At Tapjoy, analytics is core to our platform. On an average day, we're processing over 2 million messages per minute through our analytics pipeline. These messages are generated by various user events in our platform and eventually aggregated for a close-to-realtime view of the system.
For our technology stack, we use Kafka to store the messages, Spark to consume and aggregate the data, and Postgres to store the aggregates. In order to deliver the data in a timely and accurate manner, we aimed to build a system that was scalable, highly available, performant, and cost-effective.
In this post we look at how we handled the at-least-once semantics of our Kafka pipeline through real-time deduping in order to ensure the integrity / accuracy of the data.
Messaging systems tend to provide one of two types of delivery semantics for consumers: at-least-once and at-most-once. At-most-once semantics means messages may get lost; at-least-once semantics means messages may be duplicated. For critical systems, data loss is unacceptable. For example, if we're reporting ad conversions that our partners will use to make spend decisions then our data must be accurate.
At Tapjoy, every messaging system we use provides at-least-once delivery guarantees: SQS, RabbitMQ, Kafka, etc. However, we want to process messages exactly once in order to avoid over-reporting analytics. This poses the question: if the same message gets written to Kafka twice, how do you ensure that message is only processed once by our client application code?
This is where dedupe comes into play.
Given the scale of the pipeline and the downstream dependence on accurate data, any dedupe solution must be:
- Scalable: This is more or less a given -- we need to be able to scale this solution as we continue to track more analytics data or during periods of high throughput when our product goes live on a large partner.
- Highly Available: Most of our infrastructure is designed to be highly available so we can sustain failures and keep the platform running. Given the real-time constraints of this system, we want to be able to quickly fail over to another cluster / set of servers in case, for example, an availability zone goes awry in AWS.
- Persistent: We need the dedupe datastore to persist even when other services in the stack (like Spark) restart. If we lost the data on every restart, we'd be increasing the potential for dupes every time we deploy.
- Fast: 2 million messages / minute is a lot to dedupe and we don't want to run into a situation where our Spark ETL can't keep up.
- Cost-effective: We need to keep a keen eye on cost especially as the solution scales up. Sure, we could decide to add 15 servers in order to have high performance but economically it's not acceptable.
- Efficient storage: We want to guard against the possibility that we would receive the same message over and over again if a message producer was broken. A goal we set for ourselves was the ability to dedupe over a period of 12 to 24 hours. This means that if a message comes in at 08:00 AM, we should be able to detect any duplicate messages that come in by 08:00 PM. To store data over a broad time range, but keep storage costs down, we need a way to evict old data in the dedupe store. We'll be looking for a technology that supports Least Recently Used, or LRU, eviction.
At a high-level, the approach we're looking at isn't much different than a standard dedupe algorithm. For every message:
- Grab the message's transaction id (the id that identifies the event that occurred)
- Perform a "set if not already set" atomic command on the transaction id in our data store
- If the key was successfully set, then process the message; otherwise, it's a duplicate so ignore
This is the approach we use in a lot of places and it's the general strategy we use here.
In Spark, ETL jobs are run in multiple stages. At any point, a stage may fail and need to be retried or a job may get killed and the application needs to get restarted. The impact of this is that if a job dies after we've marked the transaction ids as having been seen, then we need to make sure the job is able to process that same message when it's re-run.
To address this issue, our dedupe process is tweaked to the following:
- Grab the message's transaction id (the id that identifies the event that occurred)
- Attempt a "set if not already set" command where the key is the transaction id and the value is the Kafka partition + offset of the message (that's the important part)
- If the key was successfully set, then process the message. If the key already existed and the value matches the current Kafka partition + offset, process it as a retry due to a previous failure; otherwise, it's a duplicate so ignore.
The Kafka partition + offset is what we refer to as the "owner id" in our system -- it's basically which message in Kafka owns the transaction id that's being processed. This allows ETL jobs to get retried multiple times if failures are encountered.
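The tweaked flow can be sketched in a few lines. This is a minimal in-memory illustration rather than production code: a plain Ruby Hash stands in for the data store, and the `DedupeStore` class, its `claim` method, and the `"partition,offset"` owner id string are names assumed for the example.

```ruby
# In-memory stand-in for the dedupe store (a Hash instead of a real data store).
class DedupeStore
  def initialize
    @owners = {} # transaction id => owner id ("partition,offset")
  end

  # Returns :new, :retry, or :duplicate for the given message.
  def claim(transaction_id, partition, offset)
    owner_id = "#{partition},#{offset}"
    unless @owners.key?(transaction_id)
      # "Set if not already set" succeeded: this Kafka message owns the id.
      # (A real store would have to perform this check-and-set atomically.)
      @owners[transaction_id] = owner_id
      return :new
    end
    # Same owner means we are re-running after a failure; otherwise it's a dupe.
    @owners[transaction_id] == owner_id ? :retry : :duplicate
  end
end
```

Both `:new` and `:retry` mean "process the message"; only `:duplicate` is dropped.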
- Scalability: Similar to Memcached, Redis can be scaled by sharding your keys across multiple servers. Since both are constrained by available memory on a single box, the only way to scale out to support more keys is by adding more servers.
- Availability: ElastiCache (and Redis) support high availability through replication and automatic failover. Amazon's ElastiCache exposes a DNS endpoint and will automatically fail over to the backup if the primary Redis server fails. This is a bit cleaner than Memcached, where we must write to multiple servers from the client in order to achieve high availability. That adds a bit more complexity than we would prefer on the client.
- Persistence: Since Redis can store the data both in memory and on disk, process restarts avoid data loss. On the other hand, Memcached loses all of the data on restart. We would face the same limitation if we stored the data in-process in Spark.
- Fast / Cheap / Efficient storage: This is largely dependent on how the client uses the technology regardless of whether it's Redis, Memcached, or something else. However, the Redis API provides more flexibility around how commands are executed, which leads to much better performance. Details of Redis optimizations are reviewed later in this post.
Other solutions, such as a memory-mapped hashmap in the Spark ETL process or use of our Postgres cluster, were considered. However, each fell short in different areas, ranging from difficulty in scaling / providing high availability to ensuring efficient performance and data storage.
All things considered, Redis proved to be the best fit for our needs.
Optimizing storage efficiency
As mentioned earlier, we wanted a solution that allowed us to detect duplicate messages that were 12-24 hours apart. With 2 million messages / minute, memory quickly becomes a valuable resource. As a result, it was important for us to store data in the most effective manner in order to make best use of the available memory in Redis.
In the case of Tapjoy's analytics pipeline, message transaction ids are UUIDs and are mapped to the Kafka partition + offset when it was first seen (e.g. partition 3, offset 81418110694).
In a naive implementation, the UUID keys are 36 bytes (32 characters with 4 dashes) and the values that represent the Kafka owner id are a comma-delimited string like "1,64942845052" (13 bytes). With an approximate 64 byte overhead per key in Redis, the memory required for 24 hours of data would be:
    (overhead + key_bytes + value_bytes) * keys_per_minute * 60 * 24
    (64 + 36 + 13) * 2,000,000 * 60 * 24 = 300GB
That's going to cost a lot to have that memory, so let's see if we can do better...
Key / Value efficiency
To get our first improvement in memory efficiency, consider the best way to represent the keys / values being stored.
Under the hood, UUIDs are actually made from 16 bytes of information. If we take those 16 bytes and store them in their raw binary form, this results in more than a 50% reduction in memory usage of just the key data. This would mean that instead of representing a UUID as "ce059644-18a0-4f27-bc2b-c2a2d4d4e7bf", we could represent it as "\xce\x05\x96D\x18\xa0O'\xbc+\xc2\xa2\xd4\xd4\xe7\xbf".
This translation can be done like so:
    import java.nio.ByteBuffer
    import java.util.UUID

    val uuid = UUID.fromString("ce059644-18a0-4f27-bc2b-c2a2d4d4e7bf")
    val hi = uuid.getMostSignificantBits  // first 8 bytes of the UUID
    val lo = uuid.getLeastSignificantBits // last 8 bytes of the UUID
    ByteBuffer.allocate(16).putLong(hi).putLong(lo).array
    // => Array(-50, 5, -106, 68, 24, -96, 79, 39, -68, 43, -62, -94, -44, -44, -25, -65)
Using this as a reference, we can use a similar algorithm to represent the owner id. We know the owner id is a combination of the partition id and the offset id. As a result, the question becomes: what's the smallest number of bytes we need to represent both of those values?
Partitions are represented in Kafka as integers (4 bytes), but our maximum partition number is only 6. If we assume that we'll never go above 32,767 partitions for a single Kafka topic, then we can represent the partition in 2 bytes (2^15 - 1).
Offsets are represented in Kafka as longs (8 bytes), but the only requirement that we have is that we don't see the same offset value within a partition over a 24 hour period. The reason we have this requirement is that we only intend on storing up to 24 hours worth of message ids in Redis. Therefore, if we end up effectively reusing offset values, it's okay as long as it happens outside of that 24 hour window.
Given the above, if we target 6 bytes (48 bits) to represent the Kafka offset, this means we'll roll over every 2^48 messages (281,474,976,710,656 messages). This is way beyond what we normally see in a 24 hour period (we see about 2B messages).
This means our owner id effectively gets calculated like so:
    // 2-byte partition followed by the low 6 bytes of the 8-byte offset
    ByteBuffer.allocate(2).putShort(partition.toShort).array ++
      ByteBuffer.allocate(8).putLong(offset).array.slice(2, 8)
The end result is that instead of requiring 13 bytes to represent Partition 1 / Offset 64942845052, it only requires 8 bytes.
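For clients outside the JVM, the same packing can be sketched with Ruby's `pack` / `unpack1`. This is an illustrative equivalent under stated assumptions, not the code we run: the helper names `pack_uuid`, `pack_owner_id`, and `unpack_owner_id` are made up for the example, and the partition is packed as an unsigned 16-bit value, which produces the same bytes as the signed short for any partition up to 32,767.

```ruby
# Pack a UUID string into its 16 raw bytes.
def pack_uuid(uuid)
  [uuid.delete('-')].pack('H*')
end

# Pack the partition (2 bytes, big-endian) plus the low 6 bytes of the
# 8-byte big-endian offset into an 8-byte owner id.
def pack_owner_id(partition, offset)
  [partition].pack('n') + [offset].pack('Q>').byteslice(2, 6)
end

# Reverse of pack_owner_id: returns [partition, offset modulo 2**48].
def unpack_owner_id(bytes)
  partition = bytes.byteslice(0, 2).unpack1('n')
  offset = ("\x00\x00".b + bytes.byteslice(2, 6)).unpack1('Q>')
  [partition, offset]
end

key = pack_uuid('ce059644-18a0-4f27-bc2b-c2a2d4d4e7bf')
owner = pack_owner_id(1, 64_942_845_052)
puts key.bytesize   # 16
puts owner.bytesize # 8
```

Decoding is only needed for debugging; the dedupe check itself just compares the stored bytes with the bytes of the current message's owner id.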
Given all of this, the new calculation is:
    (per_key_overhead + key_bytes + value_bytes) * keys_per_minute * 1440
    (64 + 16 + 8) * 2,000,000 * 60 * 24 = 230GB
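To make the arithmetic easy to re-run with different traffic numbers, the flat-key estimate can be sketched as a small function. The name `flat_key_bytes` is illustrative, and reading the rounded figures as binary gigabytes (2^30 bytes) is an assumption that happens to match the numbers quoted here.

```ruby
MINUTES_PER_DAY = 60 * 24
GIB = 1024**3

# Bytes needed to hold one day's worth of flat (non-bucketed) Redis keys.
def flat_key_bytes(overhead:, key_bytes:, value_bytes:, keys_per_minute: 2_000_000)
  (overhead + key_bytes + value_bytes) * keys_per_minute * MINUTES_PER_DAY
end

naive   = flat_key_bytes(overhead: 64, key_bytes: 36, value_bytes: 13)
compact = flat_key_bytes(overhead: 64, key_bytes: 16, value_bytes: 8)

puts naive                         # 325440000000
puts compact                       # 253440000000
puts((naive / GIB.to_f).round)     # 303
puts((compact / GIB.to_f).round)   # 236
```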
Great, we've saved ourselves 70GB -- but the amount of memory being stored is still pretty high! Let's see if we can do any better...
Reduce number of keys
At this point, the only thing we could potentially do to reduce the amount of memory required is reduce the number of keys that we're actually storing. At first blush, this seems impossible -- if we need to store 2B unique message ids in Redis, then how can we reduce the number of keys? This is where Redis's Hash operations come into play.
Our original inspiration for using Redis's Hash data type to reduce the total number of 1st-level Redis keys being stored came from a blog post by the Instagram Engineering team. Under the hood, Redis will try to use a ziplist data structure to store the keys/values for Hashes. This structure is significantly more memory efficient than using normal Redis keys if you don't have different TTL requirements for each key.
To utilize Redis Hash keys, we effectively need to create buckets of message ids. In our case, we can bucket messages using a timestamp within the message. We know through our own testing, and Instagram's own results, that we see diminishing returns with buckets containing more than 1,000 keys. We also don't want to exceed the hash-max-ziplist-entries value (named hash-max-zipmap-entries in older Redis releases) since the data would no longer be stored as a ziplist. In evaluating memory usage, we find that storing between 100 and 1,000 keys per bucket is ideal. This gives us room to grow without having too much of an impact on memory consumption.
If we target 100 keys per bucket, the next piece is to figure out how many buckets we're going to need. That calculation is:
    buckets = keys_per_minute / keys_per_bucket
    buckets_per_min = 2,000,000 / 100 = 20,000
    buckets_per_sec = 20,000 / 60 = 333
We can now run our new calculation for how much memory will be consumed:
    (per_key_overhead + key_bytes + value_bytes) * keys_per_minute * 1440
      + (per_bucket_overhead + bucket_key_bytes) * buckets_per_minute * 1440

    (1 + 16 + 8) * 2,000,000 * 60 * 24 + (64 + 14) * 20,000 * 60 * 24
      = 72,000,000,000 + 2,246,400,000 bytes
      = 69GB (counting 1GB as 2^30 bytes)
In the above formula:
- per_bucket_overhead is the same as the per_key_overhead used in previous calculations
- per_key_overhead is now the number of additional bytes Redis's Hash data type requires to store the key
- The bucket key is assumed to be 14 bytes (combination of message timestamp + 3-digit bucket slice for sharding purposes)
With this change, we get our biggest savings -- we're now at 30% of previous usage!
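The bucketed estimate can be sketched the same way, which makes it easy to see how little the bucket size matters once the per-field overhead dominates. The function name `bucketed_bytes` and its keyword names are illustrative; the default values are the ones used in the calculation above.

```ruby
# One day of memory, in bytes, when message ids are stored as fields of
# Redis Hash buckets instead of as top-level keys.
def bucketed_bytes(keys_per_minute: 2_000_000, keys_per_bucket: 100,
                   field_overhead: 1, key_bytes: 16, value_bytes: 8,
                   bucket_overhead: 64, bucket_key_bytes: 14)
  minutes_per_day = 60 * 24
  buckets_per_minute = keys_per_minute / keys_per_bucket
  fields  = (field_overhead + key_bytes + value_bytes) * keys_per_minute * minutes_per_day
  buckets = (bucket_overhead + bucket_key_bytes) * buckets_per_minute * minutes_per_day
  fields + buckets
end

total = bucketed_bytes
puts total                             # 74246400000
puts((total / (1024.0**3)).round(1))   # 69.1
```

Going from 100 to 1,000 keys per bucket only shrinks the second term, from about 2.2 billion bytes to about 0.2 billion, so the per-field cost is what drives the total.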
Alright, now that we've got our technology and we know how we're storing the data, the next piece is to figure out how we do it fast.
Recall we're processing 2M messages per minute in our Spark cluster. Relative to everything else that happens in our ETL every minute, we have to make sure the dedupe process doesn't add too much time to our jobs. If deduping was too expensive, our ETL could start falling behind.
For our system, we wanted to target a total time of 1s to be spent generating requests for Redis, storing the data, and removing the dupes every minute.
Despite the fact that we're using this bucketed approach above, we still need to write 2M keys to Redis. The naive implementation here is to send 2 million HSETNX commands and 2 million HGET commands to Redis. For example:
    message_ids.each do |message_id, owner_id|
      bucket = bucket_key_for(message_id)           # hypothetical helper: picks the Hash bucket
      client.hsetnx(bucket, message_id, owner_id)   # claim owner
      client.hget(bucket, message_id)               # determine winner
    end
As you might imagine, this is pretty slow. This requires 4M separate commands to be processed by Redis and 4M responses to be processed by the client. Even if you were to shard the buckets across multiple Redis servers and parallelize commands, the performance is unacceptable.
For 2M messages sharded across 3 Redis instances, it takes about 26s. Way beyond what we're trying to target.
Redis offers pipelining that allows commands to continue to get processed by Redis even while the client is reading the results. As documented by Redis, this can have a pretty good impact on your performance. For example:
    client.pipelined do |pipeline|
      message_ids.each do |message_id, owner_id|
        bucket = bucket_key_for(message_id)             # hypothetical helper: picks the Hash bucket
        pipeline.hsetnx(bucket, message_id, owner_id)   # claim owner
        pipeline.hget(bucket, message_id)               # determine winner
      end
    end
If we take the same approach as above, but with the commands sent via a pipeline instead, it takes about 3s to process that same 2M messages. Pretty good! -- about an 8x performance improvement, though still above our target.
Redis supports Lua scripting that allows us to run logic as close to the data as possible (within the Redis process itself!) and control what actually gets returned to the client. The reasons Lua scripting might offer us a benefit here are:
- By being close to the data, we can reduce both the number of round trips with the client and the number of reads in Redis. When we call HSETNX multiple times on the same bucket, Redis is forced to read through and parse the contents of the Hash key multiple times. If we can contain that to a single command, we can reduce processing time.
- By controlling the results, we can reduce the amount of data sent back to the client by only returning message ids that were detected as dupes. We're currently transmitting all message ids / owner ids back to the client. This puts a lot of strain on the client to process. In the common case, we don't really get any dupes -- so there's really no need to have all of that data come back to the client.
The dedupe lua script we landed on ended up looking like so:
    HMGET(bucket, message_ids)
    message_ids.each do
      if existing owner matches or not set
        not dupe
      else
        dupe
      end
    end
    HMSET(bucket, new_message_ids_and_values)
    return dupes
By switching to a Lua script, we bring the time required down to 1.4s -- about as close as we're going to get to our target!
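To make the script's logic concrete, here is a sketch that emulates it against an in-memory Hash rather than Redis. It is an illustration of the pseudocode, not the Lua script itself: the function name `dedupe_bucket` is made up, and the handling of the same message id appearing twice within one batch is an assumption the pseudocode doesn't spell out.

```ruby
# Emulates the per-bucket script logic against an in-memory Hash.
#   bucket: Hash of message_id => owner_id already stored for this bucket
#   batch:  Array of [message_id, owner_id] pairs from the current job
# Returns only the message ids detected as dupes.
def dedupe_bucket(bucket, batch)
  existing = bucket.values_at(*batch.map(&:first)) # one bulk read (like HMGET)
  dupes = []
  fresh = {}
  batch.each_with_index do |(message_id, owner_id), i|
    # Assumption: an id claimed earlier in this same batch also counts as owned.
    owner = existing[i] || fresh[message_id]
    if owner.nil?
      fresh[message_id] = owner_id # first sighting: claim it
    elsif owner != owner_id
      dupes << message_id          # owned by a different Kafka message
    end                            # same owner: a retry, not a dupe
  end
  bucket.merge!(fresh)             # one bulk write (like HMSET)
  dupes
end
```

In the common case `dupes` is empty, which is exactly why returning only the dupes keeps the response to the client small.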
Update: Expanded on this to explain our need for sharding
So far we've built a solution that addresses our needs today -- great! At this point, the question is: how do we architect this solution to allow us to scale to 10x the traffic we're handling today? Instead of 2 million messages / minute, how could we meet our memory and performance requirements while processing 20 million messages / minute? This is where sharding comes into play.
First, let's talk about memory requirements. If we assume that 10x the amount of traffic is roughly equivalent to 10x memory usage over a 24 hour period, we're suddenly talking about storing 700GB worth of data in Redis. Since we use AWS, the largest possible instance available in ElastiCache only has 237GB. This means we need a way to be able to store message ids across multiple Redis instances -- and always have the same message ids stored on the same Redis instance.
At the same time, we don't necessarily want to scale up 237GB at a time. Instead, we want to add capacity as we need it -- which means using smaller servers. This helps us keep costs down as much as we can. In a sharding solution, by simply adding new Redis clusters to increase memory capacity, we've effectively created limitless capacity to store messages.
As we scale, there are going to be some concerns around performance. If we scale out to 10x traffic, we expect the processing time to dedupe all of those messages to be 10x as well (14 seconds instead of 1.4 seconds!). To meet our real-time needs, this is unacceptable.
The naive solution is to simply parallelize requests to our Redis cluster. However, there's a big gotcha here: Redis is single threaded. This means a Redis server is only able to process a single request at a time. To truly parallelize requests to Redis and gain the performance benefits of doing so, we need to run requests across multiple Redis clusters.
So what does a sharding solution like this actually look like? Let's say, for example, we're going to use 3 Redis servers and 300 bucket keys per second. Based on the contents of a message, we're going to have to choose a Redis instance / bucket shard to store the message id. To choose a shard, you typically use a hashing function. For example:
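One way to sketch such a hashing function, using the 3 servers and 300 bucket keys per second from the example: the choice of CRC32, the `shard_for` name, the `:` separator in the bucket key, and the rule that maps a bucket slice to a server are all assumptions made for illustration.

```ruby
require 'zlib'

REDIS_SERVERS = 3        # from the example above
BUCKETS_PER_SECOND = 300 # from the example above

# Picks the Redis server and the Hash bucket key for a message.
# timestamp is the message's time in whole seconds.
def shard_for(message_id, timestamp, servers: REDIS_SERVERS, buckets: BUCKETS_PER_SECOND)
  slice = Zlib.crc32(message_id) % buckets # stable 0..299 slice for this id
  {
    server: slice % servers,               # every bucket lives on exactly one server
    bucket_key: format('%d:%03d', timestamp, slice)
  }
end

shard = shard_for('ce059644-18a0-4f27-bc2b-c2a2d4d4e7bf', 1_700_000_000)
puts shard[:bucket_key].bytesize # 14
```

Because the server is derived from the shard count, calling `shard_for` with `servers: 4` sends many of the same message ids to a different server.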
If the number of shards changes, then the hashing function is going to start storing messages in different shards than we were before.
In order to account for this scenario, we effectively have the concept of a "previous" Redis configuration and a "current" Redis configuration. "Previous" configurations apply to all messages with a timestamp prior to some maximum time. Any messages with transaction times after that maximum time will start using the "current" (new) configuration. This allows us to effectively transition from one cluster to another or from one hashing algorithm to another without losing any of the data we've stored from previous hours.
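A minimal sketch of that selection, assuming each configuration records the first timestamp at which it applies: the `ShardConfig` struct, the `config_for` function, the cutover value, and treating the cutover second itself as belonging to the "current" configuration are all illustrative choices.

```ruby
# A sharding layout plus the first timestamp (inclusive) at which it applies.
ShardConfig = Struct.new(:name, :servers, :effective_from)

# Picks the newest config whose effective_from is <= the message timestamp.
def config_for(timestamp, configs)
  configs.select { |c| c.effective_from <= timestamp }
         .max_by(&:effective_from)
end

configs = [
  ShardConfig.new('previous', 3, 0),
  ShardConfig.new('current', 4, 1_700_000_000) # hypothetical cutover time
]

puts config_for(1_699_999_999, configs).name # previous
puts config_for(1_700_000_000, configs).name # current
```

Messages that arrive late but carry an older timestamp keep resolving to the "previous" layout, so their earlier entries are still found.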
Deduping at scale is a hard problem and there are certainly other effective solutions employed elsewhere in the industry. In fact, even the folks at Kafka are working on a standard solution. Typically you hope that either your actions are idempotent or accuracy isn't so critical that a complex system like this is necessary. In our case, this was important to ensuring the integrity of our analytics pipeline.
Hopefully, this post serves as a helpful example of tackling a thorny engineering problem ("Oh no, I have a huge pipeline of data flying around and I can't keep track of whether I've seen this before!") in a deliberate way.