Eric Day

Thoughts, code, and other oddments.

Gearman Persistent Queues & Replication

January 14th, 2009

Now that the core of Gearman is rewritten in C, I’ve started working on a new module to provide persistent queues and queue replication. The current Gearman implementation is fine when the jobs can be unreliable, but there are many cases when we want to guarantee that jobs get run.

Because Gearman is such a flexible system, there are a couple of places where we can plug in a persistent queue and/or replication. In most cases, these features are only going to be useful for background jobs, since failed foreground jobs can be detected and restarted by a client. Many applications using Gearman want to just throw a job in the queue and go back to doing something else, not caring when it gets done. Enter persistent queues. It may also be the case that a job should be run multiple times, most likely on a separate set of servers for backup and geographic redundancy. I’m currently working on the worker-wrapper method (see below) since it suits the application I’m working on (scratching my own itch), but eventually I’ll get around to the others as well. If you have the resources to work on one of the others, find me on IRC (as eday) and I’d be more than happy to help out where I can.

Application Specific Queues/Replication

Some applications may be so specific that they require a custom persistent queue and/or replication system. I’m trying to write the worker-wrapper module generically enough to fit most applications, but sometimes there is only so much abstraction you can do. If you really need to write your own, please post what you have done as a use case on the wiki!

Queues inside gearmand

This was my first thought, but as I ran the various cases through my head it turned out not to be optimal. For small background workloads this makes sense, but if the workloads are large and they are written in both the gearmand location and the worker location, you start wasting I/O resources. Also, you usually only have a couple of gearmand instances running, and if you have large jobs or a huge number of jobs, disk I/O could be a limiting factor. Keeping the persistent queues outside of gearmand also means you don’t need any real disk on those machines, just a fast network connection.

Replication at this level gets a bit tricky as well, since you may want the job run only once, and the worker has no way to track this unless some unique key is inserted into the application workload. To do this properly, protocol extensions would most likely need to be introduced so multiple gearmand instances can sync up. This led me down the path to my next idea, a worker-wrapper.

Worker-Wrapper

This solution made the most sense to me, at least for the application I’m working on. I’m going to have many jobs, each ranging from a few kilobytes to multiple megabytes. Disk I/O is an issue, so ideally the workload will only be written once (it is also being stored permanently on the worker). This means a worker-level queue that can share its store with the normal worker code would work out best. The diagram below shows how jobs will flow.

[Diagram: Gearman Worker-Wrapper Queue & Replication]

The blue items show what a normal Gearman worker already does, and the persistent queue items are added in yellow. The first persistent queue step is to add a unique ID to each job as it comes in. The worker then adds the job to a persistent queue through a user-supplied callback (this way you can use Drizzle, MySQL, Tokyo Cabinet, BDB, …). All the callback needs to do is ensure that the key/value pair has been committed to disk. Next, if we are only looking at persistent queues, we run the normal worker function and then call another callback to mark the job as complete. The worker can reply to the calling client as queued after the “queue_add()” call, at which point the client can go about its business knowing that the job is on disk and will get run (unless the worker machine fails hard and all data is lost).
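The persistent-queue path described above can be sketched roughly as follows. This is only an illustration in Python, not the C module itself: `DictQueue`, `PersistentWorker`, `accept()`, and `run()` are hypothetical names made up for the sketch, and only “queue_add()” and “queue_complete()” come from the description above. An in-memory dict stands in for the real store, which would have to commit to disk before returning.

```python
import uuid


class DictQueue:
    """Stand-in store (hypothetical); a real callback would commit to disk."""

    def __init__(self):
        self.jobs = {}

    def queue_add(self, job_id, workload):
        self.jobs[job_id] = workload

    def queue_complete(self, job_id):
        self.jobs.pop(job_id, None)


class PersistentWorker:
    """Hypothetical wrapper around a normal worker function."""

    def __init__(self, queue, worker_fn):
        self.queue = queue
        self.worker_fn = worker_fn

    def accept(self, workload):
        """Assign a unique ID and persist the job; the ID is the 'queued' reply."""
        job_id = uuid.uuid4().hex
        self.queue.queue_add(job_id, workload)
        return job_id

    def run(self, job_id):
        """Run the normal worker function, then mark the job complete."""
        result = self.worker_fn(self.queue.jobs[job_id])
        self.queue.queue_complete(job_id)
        return result


queue = DictQueue()
worker = PersistentWorker(queue, worker_fn=lambda data: data[::-1])
job_id = worker.accept(b"hello")       # the client could be told "queued" here
pending_before = job_id in queue.jobs
result = worker.run(job_id)
pending_after = job_id in queue.jobs
```

Splitting `accept()` from `run()` mirrors the point that the client is acknowledged as soon as the job is stored, not when it finishes.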

Now for the replication steps shown in red. If a job is to be replicated, it is sent to at least one other worker in its replication group (a set of workers that queue the same set of jobs). This ensures that if the original worker machine dies hard before the job is run, another worker can continue its distribution. This means that the client queue acknowledgment is moved beyond that step as well, so the client knows the job was persistently written to at least two workers. The replication workers talk to each other using the normal Gearman client/worker interface, where a replication worker is set up specially to take jobs with the ID already assigned. In the diagram above, the new replication workers register themselves as “rep_1”, “rep_2”, and so on.
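A rough sketch of that ordering (store locally, replicate to at least one peer, and only then acknowledge the client) might look like the following. The `Node` class, its `submit()` and `replicate()` methods, and the `alive` flag are all hypothetical and exist only for illustration; plain method calls stand in for the Gearman client/worker interface the real replication workers would use.

```python
import uuid


class Node:
    """One worker in a replication group (hypothetical in-memory model)."""

    def __init__(self, name):
        self.name = name
        self.jobs = {}      # job_id -> workload, stand-in for the persistent queue
        self.peers = []     # other nodes in the same replication group
        self.alive = True   # lets the example simulate a dead peer

    def replicate(self, job_id, workload):
        """Replication entry point: takes a job whose ID is already assigned."""
        if not self.alive:
            raise ConnectionError(f"{self.name} is unreachable")
        self.jobs[job_id] = workload

    def submit(self, workload):
        """Client entry point: persist, replicate once, then acknowledge."""
        job_id = uuid.uuid4().hex
        self.jobs[job_id] = workload
        for peer in self.peers:
            try:
                peer.replicate(job_id, workload)
                return job_id          # acknowledged: the job is on two workers
            except ConnectionError:
                continue               # try the next peer in the group
        raise RuntimeError("no replication peer accepted the job")


rep_1, rep_2, rep_3 = Node("rep_1"), Node("rep_2"), Node("rep_3")
rep_1.peers = [rep_2, rep_3]
rep_2.alive = False                    # simulate a peer that died hard
job_id = rep_1.submit(b"payload")
```

In this sketch the job still sits in the local queue if no peer accepts it; whether the client should then get an error or a weaker acknowledgment is a policy choice the post does not settle.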

Once the job has been replicated at least once and the client queue reply has been sent, it then continues down the line: running the job, marking the job as run, and then (possibly in parallel) flushing the job to other replication workers. Other workers who get the job through the replication worker function will also queue and run the job as shown, which means you’ll have a live backup if the original node ever dies. Depending on queue management, you can also spin up new replication workers by replaying the queue (possibly from a snapshot) to handle more load.

In the event of a worker process crash, a worker flush process will be run to scan for jobs that still need to be replicated and run. The diagram for a worker flush will look much like the one above, but without the registered Gearman worker functions (just job running and replication).
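As a sketch of what such a flush pass could do, the following Python walks a store and finishes whatever a crash left undone. The per-job `replicated` and `run` flags, the `flush()` function, and the two callbacks are assumptions made for this example; the post does not specify how job state is recorded.

```python
def flush(store, run_job, replicate_job):
    """Finish jobs left incomplete by a crash; return the IDs that needed work."""
    touched = []
    for job_id, entry in sorted(store.items()):
        if entry["replicated"] and entry["run"]:
            continue                                  # nothing left to do
        if not entry["replicated"]:
            replicate_job(job_id, entry["workload"])
            entry["replicated"] = True
        if not entry["run"]:
            run_job(job_id, entry["workload"])
            entry["run"] = True
        touched.append(job_id)
    return touched


# Hypothetical queue contents after a crash.
store = {
    "a": {"workload": b"1", "replicated": False, "run": False},
    "b": {"workload": b"2", "replicated": True, "run": False},
    "c": {"workload": b"3", "replicated": True, "run": True},
}
ran, sent = [], []
touched = flush(
    store,
    run_job=lambda job_id, workload: ran.append(job_id),
    replicate_job=lambda job_id, workload: sent.append(job_id),
)
```

Note that a job interrupted mid-run would be run again by a pass like this, so worker functions would need to tolerate being repeated.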

By keeping the key/value queue storage as a callback, you can combine the queue store with your regular worker data store. For example, if the job contains an e-mail, the queue can store the full job and workload, and then the worker function can mark the blob as permanent (so “queue_complete()” doesn’t remove it). Further queues for that blob can then access the important bits through offsets. This means you only need to write large job workloads once. One other thing to mention is that this type of module can be used as queue only, or replication only (in the latter case, it’s basically a job multi-cast).
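The shared-store idea could look something like this in Python. `BlobStore`, `mark_permanent()`, and `read()` are hypothetical names invented for the sketch; only “queue_add()” and “queue_complete()” are taken from the text, and a dict again stands in for a real on-disk store.

```python
class BlobStore:
    """Queue store that doubles as the worker's data store (hypothetical)."""

    def __init__(self):
        self.blobs = {}
        self.permanent = set()

    def queue_add(self, key, workload):
        self.blobs[key] = workload

    def mark_permanent(self, key):
        """Called by the worker function to keep the blob after completion."""
        self.permanent.add(key)

    def queue_complete(self, key):
        """Remove the job's blob unless the worker marked it permanent."""
        if key not in self.permanent:
            self.blobs.pop(key, None)

    def read(self, key, offset, length):
        """Read part of a stored blob by offset, without copying it elsewhere."""
        return self.blobs[key][offset:offset + length]


store = BlobStore()
store.queue_add("msg-1", b"Subject: hi\r\n\r\nbody text")
store.queue_add("tmp-1", b"scratch data")

store.mark_permanent("msg-1")          # the e-mail stays after completion
store.queue_complete("msg-1")
store.queue_complete("tmp-1")          # an ordinary job is removed

subject = store.read("msg-1", 9, 2)    # bytes following "Subject: "
```

The large workload is written once by `queue_add()`, and later readers only pass around a key plus offsets.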

This is my current state of mind and what I’m working on. Any further comments, ideas, or suggestions would be very much appreciated! I’ll post further details here and on the Gearman wiki once I have something working (should be in the next couple weeks).

Posted in Drizzle, Gearman, Main, MySQL

One Response to "Gearman Persistent Queues & Replication"

  1. Wow, this looks great! Have you managed to do any more work on it? I am looking into queue replication for an application for a client of mine that is using ActiveMQ.

    I have designed the app so that one can choose between ActiveMQ and MQSeries (Websphere). ActiveMQ offers transparent failover but without queue replication. To get that it is assumed you will put the queue store on a shared filesystem (NFS, CIFS etc). IBM do have a Q replication facility but it is an add-on that is quite expensive. My current client doesn’t want to use either facility so we are being encouraged to come up with an application-centric solution.

    I prefer a generic solution, even if it is home-grown, but I quickly came to realise that it is a tough problem. I have been researching how people tackle this problem and came across your site. Very interesting. Thanks for making it available.
