Hello and welcome to our new Tech and Knowledge Center! For one of our first discussions, we want to talk a bit about High Performance Reliable Queuing. There's a lot to cover here, so I will break this discussion up into two posts: this first one deals with making it work, and the next one will get into making it fast. We're very much looking forward to keeping these posts engaging and relevant, and also to hearing what you have to say and what questions we can answer. So with that, let's dig in.

A queuing system, in its simplest form, is made up of three parts: the publisher, the consumer, and the message bus.
If you've ever tried to learn about a message bus, you've probably heard about stock ticker applications. A stock ticker application is the "hello world" of message buses: discrete events are emitted by a publisher and received by many consumers. There aren't that many updates to the stock information, maybe a few a second, and it's not that critical if some of the consumers miss some of the events.
But what if it is bad if a published event is never consumed? What if each message is its own precious little snowflake that is special and unique and can't be lost or delivered more than once? And what if there are a lot of snowflakes?
Simple Queuing Use Case
At Tapjoy we generate a lot of events from our API tier. For every ad clicked and every app started, we generate events which are recorded and analyzed by people who love that sort of thing. This data is the lifeblood of our business. Every event is a very important snowflake.
The theory here is very simple. All of the API servers generate these events and send them to a magical, infinitely scalable message bus that never loses them. On the other side we have a number of consumer processes that grab these events from the bus and, in the technical parlance, do some stuff with them. Multiple groups of consumers can subscribe to the event stream.
In our hello world example, most message buses scale pretty well: get a message, send it to some places if they are listening. However, when we add two fairly simple requirements, the performance profile of the queuing system as a whole changes drastically:
- Events are not disposable. We need them all.
- Events are unique and can't be processed more than once.
The key concept to remember is that these are not necessarily requirements of the message bus but of the queuing system, which includes the publishers and consumers.
Gluing it all together
In order to satisfy requirement #1, we need to handle failure at all levels of the queuing system.
- Problems with the producers
- Problems with the message bus
- Problems with the consumers
Handoffs - Message acking
The first step in making sure a message makes it all the way through our queuing system is to have each layer confirm receipt of a message. With RabbitMQ this can be done with publisher confirms on the way in and consumer acknowledgements on the way out. With message acking, each part of the queuing system knows where the boundaries of its responsibility are, where "responsibility" means storing the message in a persistent way that is recoverable in the case of crashes on either side of the transaction.
The publisher owns the message until it receives an ack from the bus after publishing a message. The bus is on the hook from when it sends the publisher an ack until it receives an ack from the consumer, and the consumer owns it after it acks the message.
It is important to note that while these boundaries are clear, there are several times in the flow from publisher to consumer where two parts of our system have responsibility for a message simultaneously. For example, the publisher owns the message "until it receives an ack" whereas the bus owns the message once "it sends the publisher an ack". The flight time of the ack itself is when both the bus and publisher own the message.
There is a similar window on the handoff to the consumer. It is in these windows where failures can cause subsequent duplicate message delivery. We will deal with those later.
Producer reliability - Local Shoveling
As mentioned earlier, publishers are on the hook for the messages they send until they receive acks from the message bus. That means a publisher has to store each message on disk during that time and can only delete it after getting an ack for that message. This is sometimes referred to as shoveling. Our implementation writes every outgoing message to a file on the publisher. This file is packaged into a message (or messages) and sent to the bus, based on either a time interval or a message limit. Once the ack is received, the file is removed.
All of this requires fsyncing the files to disk to be sure they survive an OS crash. There are several publisher processes on each box which will send any message file they find, so individual process crashes are handled by work stealing. Shutdown hooks ensure that any files left over are processed and sent when a machine shuts down.
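To make the shoveling flow concrete, here is a minimal sketch of a publisher-side shovel. The class and function names are hypothetical (this is not Tapjoy's actual implementation), and a real version would also flush on a timer and handle nacks, but the ownership rule is the same: fsync before claiming the message, delete the file only after the bus acks.

```python
import json
import os
import tempfile


class LocalShovel:
    """Sketch of a publisher-side shovel (hypothetical names).

    Messages are appended to a spool file and fsynced so they survive an
    OS crash; the file is deleted only after the bus acks the batch.
    """

    def __init__(self, spool_dir):
        self.spool_dir = spool_dir
        self._open_new_spool()

    def _open_new_spool(self):
        fd, self.path = tempfile.mkstemp(dir=self.spool_dir, suffix=".spool")
        self.file = os.fdopen(fd, "w")

    def write(self, message):
        # Append one message per line and fsync before claiming ownership.
        self.file.write(json.dumps(message) + "\n")
        self.file.flush()
        os.fsync(self.file.fileno())

    def flush_to_bus(self, publish_and_wait_for_ack):
        """publish_and_wait_for_ack(batch) should return True only once the
        bus has acknowledged the batch (e.g. a RabbitMQ publisher confirm)."""
        self.file.close()
        with open(self.path) as f:
            batch = [json.loads(line) for line in f]
        if publish_and_wait_for_ack(batch):
            os.remove(self.path)  # safe to delete: the bus owns the messages now
        self._open_new_spool()
```

A real deployment would call `flush_to_bus` on a time interval or message-count threshold, as described above.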
Message Bus reliability - Durability and Persistence
As part of the AMQP spec, RabbitMQ has an option for both exchanges and queues called durability, which indicates that the configuration of an exchange or queue should be saved and recreated in the event of a broker restart or crash. However, durability only applies to the configuration of queues and exchanges, and not to the messages themselves. In order to have the broker save the message contents, each message must be sent with persistence enabled. This will indicate to the broker that the message must be persisted to disk before it is acked. The bus is responsible for any acknowledged persistent message in the case of a crash.
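Putting durability, persistence, and publisher confirms together looks roughly like this with the pika client (the exchange and queue names are illustrative, not ours). This is a configuration sketch, not a production publisher:

```python
import pika

# Connection details are illustrative.
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = conn.channel()

# durable=True: the broker recreates this exchange/queue after a restart,
# but this alone does NOT save message contents.
channel.exchange_declare(exchange="events", exchange_type="fanout", durable=True)
channel.queue_declare(queue="analytics", durable=True)
channel.queue_bind(queue="analytics", exchange="events")

# Enable publisher confirms so the broker acks each publish.
channel.confirm_delivery()

# delivery_mode=2 marks the message itself persistent: the broker must
# write it to disk before acknowledging it.
channel.basic_publish(
    exchange="events",
    routing_key="",
    body=b'{"event": "click"}',
    properties=pika.BasicProperties(delivery_mode=2),
)
conn.close()
```

Note that all three pieces are needed: durable exchange/queue, persistent messages, and confirms, or a broker crash can silently drop data.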
Consumer Reliability - just write it down
The consumer side of message responsibility is less interesting. Before acking any given message, the consumer writes it to a file, sends it to S3, tells its best friend, or does whatever it needs to do to make sure the message is safe. Once the consumer acks the message, its part in the story is over.
As mentioned above, there are times in the message flow where more than one part of the system has responsibility for the message. To guarantee at-least-once delivery, this can lead to duplicate messages being sent after intermittent failures. Due to the distributed, and somewhat chaotic, nature of the event publishers, as well as the distributed and concurrent nature of the consumers, there is no single place through which all messages pass to be checked for duplicates. The bus gets all of the messages, but it does not have the business logic to distinguish a duplicate from two messages that merely look very similar. To effectively solve this, we need cooperation on both sides of the message exchange.
Before each message is sent, the publisher computes a checksum of the message body. The goal here is not cryptographic-grade hashing, just a very high probability of unique checksums. It also has to be fast, since it will be done for every message. For this we'll use the ubiquitous and relatively fast md5sum. The md5sum of the message body is calculated immediately before publication and added as a header to the message in a field called 'messageId'. Yay, clever naming.
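In Python this fingerprinting step is a one-liner with the standard library (the helper name is ours, not part of any API):

```python
import hashlib


def message_id(body: bytes) -> str:
    # md5 here is a dedup fingerprint, not a security measure.
    return hashlib.md5(body).hexdigest()


# Computed immediately before publish and attached as a header.
headers = {"messageId": message_id(b'{"event": "click", "ts": 1}')}
```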
We mentioned that the consumer's reliability story wasn't particularly interesting. Well, here's something a little more fun. When each message is received, and before it is acked, the consumer checks the header for the 'messageId' field. Simply put, if the consumer has seen this messageId before, it knows to a reasonable degree of confidence that this is a duplicate delivery, and it can ack the message without processing it.
Not so simply put: how does it know? Where does it put the messageIds to keep track of them? How long does it have to keep old ones before it's sure they won't be redelivered?
In the end we decided to put them in memcached. We found it was a good tradeoff of simplicity, speed and data safety. For our use case, duplicate messages are fairly rare and their impact is not mission critical. They aren’t great but they’re not “wake up your devops guys at 2am” bad. So memcached is a good compromise for this.
Consumers take each message header and try to add it to memcached. It’s stored with a useless value, since we really only want the key itself and the fast lookup. If the key already exists, the client throws an exception and we have a duplicate. If not, we've stored the key for someone else to find it later.
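The check-and-claim logic can be sketched as follows. The stand-in class below mimics memcached's `add` command, which only stores a key if it doesn't already exist; note that some real clients signal an existing key by raising an exception, as described above, while others return false. All names here are illustrative:

```python
class FakeMemcached:
    """Stand-in for a memcached client's add(): stores a key only if
    it is absent. Real memcached performs this check-and-set atomically
    on the server, which is what makes concurrent consumers safe."""

    def __init__(self):
        self._store = {}

    def add(self, key, value):
        if key in self._store:
            return False  # duplicate: key already claimed
        self._store[key] = value
        return True


def handle(message, headers, cache, process, ack):
    # Try to claim the messageId; if another delivery already has,
    # skip processing but still ack so the bus stops redelivering.
    if cache.add(headers["messageId"], 1):
        process(message)
    ack()
```

The value stored is deliberately meaningless; only the key's existence and the fast lookup matter.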
The memcached server is sized based on message throughput and md5sum size to store headers for a certain amount of time. We use about a day. This is probably vast overkill, since it's unlikely for messages to be sitting around that long and then suddenly get sent again after a failure, but as a wise man who wasn't me once said, "the nice thing about overkill is you're sure it's dead."
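As a back-of-the-envelope sizing example, here is the arithmetic with purely illustrative numbers (the throughput and per-item overhead below are assumptions, not Tapjoy's actual figures):

```python
# All figures are illustrative assumptions for the sizing arithmetic.
msgs_per_sec = 5_000      # assumed publish rate
retention_secs = 86_400   # keep messageIds for ~1 day
key_bytes = 32            # md5 hex digest length
overhead_bytes = 58       # rough per-item memcached bookkeeping cost

keys_per_day = msgs_per_sec * retention_secs
total_bytes = keys_per_day * (key_bytes + overhead_bytes)
# ~432 million keys/day, on the order of tens of gigabytes of cache
```

The exercise is the point here: the retention window, not the key size, dominates the memory bill.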
So there we have it. It works, but it is slow. In our next post we’ll figure out where and how to optimize this to make it a little more fleet of foot. See you then…