BigRubyConf was a great opportunity to meet and talk with some really great developers. The slides for my talk on high-scale job processing are available online via speakerdeck.Read More
At Tapjoy we're constantly evaluating our technical infrastructure to ensure we're using the right tools in the right places. We've spent a lot of time over the past year working on our message queuing infrastructure. As part of using the right tool for the job, we use a variety of queue systems for different purposes. We recently did a dive into our job processing infrastructure to determine the best way to improve overall stability and reliability without interrupting existing operations. Given that the existing infrastructure was processing nearly a million unique jobs an hour, this was not a trivial problem. Our evaluation looked at a few key areas: Durability, Availability and Performance.
When we talk about Durability as it relates to queuing, what we really want to know is how likely it is that once a message has been sent to the queue that it will be successfully delivered. This means that the queue needs to be as tolerant as possible to failure scenarios such as a process dying, a disk being full, an entire server disappearing, or a significant network partition or failure.
Availability in this case is how likely we'll be able to successfully write a message to the queue. The ideal failure scenario for a queuing system is one in which it fails to allow reads, but continues successfully writing new data to the queue in a durable fashion. Obviously, this is not possible at all times, so just as important is being clear when it's not able to safely write data. The worst failure case is one in which it silently fails to write data.
Finally, let's discuss Performance. There are three things to measure when looking at queue performance: write throughput, read throughput, and time spent in queue. All three are fairly simple to test, but each requires fairly specific knowledge of the particular queue systems you're working with to determine the right way to get these metrics.
Breaking down our job queue use cases, we found two distinct scenarios. In the first, it is critically important that we're eventually able to deliver all messages. In the second, it is more important that we're able to quickly process most messages. So, we have a need for a less latency sensitive, loss intolerant system and also a very latency sensitive, loss tolerant system (although the amount of loss we will tolerate is exceedingly small). Using our previous terms, we need a highly Durable, highly Available system and a high Performance, highly Available system.
With those parameters in place we set out to look at a variety of queue systems to see what fit our two use cases the best. We quickly narrowed down our choices to two systems, Amazon's SQS and RabbitMQ based on our existing infrastructure and in house expertise. You can read more about our use of RabbitMQ as a message bus in one of our previous posts. So let's talk more about SQS as it relates to job processing.
We chose SQS for our first use case, where we were willing to trade a bit of Performance (specifically latency/time in queue) for high Durability and high Availability. SQS, for those who aren't familiar, is Amazon Web Service's message queue system. It provides a simple HTTP API for both publishing, and consuming messages, which hides the complexities of managing a widely distributed, highly durable job queue system.
What makes a job queue?
Without going too far off the rails here, let's talk a bit about job queues. Job queues are a subset of the use cases for generalized Message queue systems. In a job queue, we don't care (as much) about things like fanout (multiple delivery) or strict message ordering. However, we do care a lot about things like message durability, retry, timeouts, and locking. Let's look at how those concepts map to SQS.
We want to ensure that once a message has been fetched from the queue, that it won't be fetched again until it has been completed (or processing reached an error condition). SQS handles this with a concept called Visibility. When you create a queue, you specify the Visibility of messages that go into that queue. As a message is fetched from the queue it is marked as "Not Visible", and is ineligible to be fetched again. Once a timeout has been reached internally, SQS will automatically make that message "Visible" again and eligible to be reprocessed. This sounds like the exact behavior we want here! But wait, there's a catch…
Despite the Visibility concept, SQS only guarantees "at least once" not "only once" delivery. In practice, you may receive the same message multiple times if you're fetching from the queue from multiple connections nearly simultaneously. In a significantly scaled job processing system, this is almost guaranteed to occur. The good news is that this is a relatively easy problem to solve with memcached. In short, we look up a message by ID in memcached. If it’s not found, we add it and begin processing. Thanks to memcached this is an atomic operation. This process ensures that we get incredibly close to "only once" delivery of our jobs.
Message durability is just baked into SQS. You don't have to specify on a per-message basis that messages should persist, or what strategy to use for that persistence. There is a hard cap that messages can only live in a queue for 14 days before they will be expunged, but we're building a job queue! We want to churn through these things as quickly as we can; however, we want to tolerate some general delays as well. A limit of 14 days has been more than sufficient for our usage thus far.
Retrying messages is an incredibly common feature in a job system. SQS handles the simple case automatically and provides some information for you to handle more advanced things on your own. By default, any message that hits its visibility timeout for any reason is put back into the queue for reprocessing. All messages are automatically retried by default. If you want to do more advanced things like exponential back-off or maximum retries, you'll need to do a bit of work. To facilitate this, SQS has a concept of per-message "Receives". Every message has an ApproximateReceiveCount, which is the number of times that message has been fetched from the queue. By default this data is not sent down with the message; however, when fetching a message, you can request additional parameters by name. Using the receive count, you can then implement more advanced error recovery scenarios in your processing code.
The official Ruby AWS SDK provides a wrapper around SQS itself, but it doesn't really provide any of the higher level "job processing" code we need to be able to use this as a full-fledged job processing system. After some research, we couldn't identify any existing Ruby based solutions that interfaced with SQS and fit into our overall infrastructure in a way we were comfortable with. Given that we have a lot of in-house experience with this problem space, we went ahead and rolled our own job processor, which we'll hopefully be releasing to the public soon.
Over the last year, we've found SQS to be well within our expectations of availability. There have been a few brief partial outages largely affecting the ability to fetch messages from the queue. There have been two instances of being unable to write to a particular queue (both of which were temporary and isolated to a specific queue). In the failed write scenarios, the service provided adequate information back to the client so that we were able to handle the failures at the client, and not lose messages.
Remember earlier when I mentioned measuring the time a message spends in the queue? Let's get back to that.
Time spent to get a single message out of a particular queue is variable in SQS. To keep up the level of durability that SQS provides, it distributes a message across many servers. Each time you attempt to fetch messages from a queue, you only hit a subset of the servers that the queue is distributed across. The likelihood that you get no messages back for a particular request increases as the total number of messages in the queue decreases. It may take several requests to get a message when a queue is nearly empty. In an attempt to reduce the impact of this, the SQS team implemented long-polling. Long-polling lets you poll many servers on each request for messages, thereby reducing the number of empty requests. The tradeoff is that each request is much longer than the non-long-polling requests. In practice, we found issues when combining this setting with fetching batches of messages, and have not yet gone back to investigate how to balance these two options.
In theory we would be able to push more messages (better write and read volume) through our RabbitMQ infrastructure than we can with SQS for the same number of clients and consumers. But, we would be trading durability and availability for performance. Coupled with the significantly lower operational costs of SQS, we've found this to be a definite win. At some point in the future that equation could turn over; however, we don't anticipate that being anytime soon.
Wow, we left out something really important. Regardless of what solution we use, we need to have solid monitoring and alerting around the state of our job system. Since we’re in AWS that means Cloudwatch. Cloudwatch is a solid tool for providing high-level information about the health of the system. It also allows you to configure simple alerts based on thresholds. We mainly use this to watch for sudden queue growth, which is often a symptom of a bigger problem elsewhere in the system we should be looking into. Cloudwatch gives us a solid base-line, but it's not without a few warts.
There has been a secondary effect on our system based on our use of Cloudwatch for high-level monitoring. We have adopted a queue-per-job strategy for managing our queues. This is somewhat less than ideal in a few cases. In an ideal job queue system, you can design your queues around priorities or resources. This lets you scale your worker pools based on the performance or availability of particular resources, or based on the number of high-priority jobs going into the system. You then separately monitor the number of each type of job going into, and coming out of the system. This lets you inspect the volume of particular types of jobs, while still scaling out the system based on performance or resource availability. But using just Cloudwatch for monitoring, you're unable to know anything at a level more granular than "queue". We're actively working on adapting our internal monitoring tools to do the job type tracking, which will allow us to begin to collapse our queues down and still be able to easily inspect the system.
In conclusion, we use SQS to process a staggering amount of jobs everyday. It is one part of our infrastructure that we now spend very little time worrying about as it mostly “just works”. That isn’t to say it’s perfect, but we’ve found that the tradeoffs Amazon made with SQS work well for our job processing infrastructure. Stay tuned. We’ll continue to share more information, experiences, and code around these problems in the coming months.
A queuing system, in its simplest form, is made up of three parts: the publisher, the consumer and the message bus.Read More