Multithreading consumption of an ordered queue


I have a stream of object deltas (that is, JSON objects describing changes to other objects) coming from a third party's messaging queue. I need to apply these to the appropriate objects in a database (translating deltas to state). The deltas are inherently ordered.

As it is, I intend to pipe these deltas into our own RabbitMQ cluster, whence a group of Java servers will pull them and then apply them to the database (the Java is where the database update logic is centralized).

The application of the deltas needs to be multithreaded, but I want to ensure that the deltas for a given object are always applied in order. To truly guarantee this, only one thread can ever process deltas for a given object.

To that end, as I read them off the third party queue and before I place them in RabbitMQ, I figured I could split the deltas into queues by the uuid of the corresponding object. Basically, each delta has an object_uuid field; I'd hash that uuid, take the result modulo, say, 50, and use that as the routing key, so that I would have 50 queues of ordered deltas within RabbitMQ.
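A minimal sketch of that bucketing step, assuming object_uuid is a standard textual UUID. The class name `DeltaRouter` and the `deltas.` routing-key prefix are made up for illustration and are not part of RabbitMQ:

```java
import java.util.UUID;

// Hypothetical helper: maps an object UUID to one of a fixed number of routing keys.
final class DeltaRouter {
    private DeltaRouter() {}

    static String routingKey(String objectUuid, int queueCount) {
        UUID uuid = UUID.fromString(objectUuid);
        // Combine both 64-bit halves so the whole UUID influences the bucket.
        long mixed = uuid.getMostSignificantBits() ^ uuid.getLeastSignificantBits();
        // floorMod keeps the bucket in [0, queueCount) even when mixed is negative.
        int bucket = (int) Math.floorMod(mixed, (long) queueCount);
        // "deltas." is an assumed naming convention for the routing keys.
        return "deltas." + bucket;
    }
}
```

Because the mapping depends only on the UUID and the queue count, every delta for a given object lands in the same queue. Changing the queue count later would remap objects to different queues, so the count should be fixed up front or changed only after the queues have drained.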

At that point, it is simply (heh) a matter of ensuring that I have a single consumer per queue (although I can still have multiple queues per consumer). I thought the 'exclusive' parameter to queue declarations in AMQP might give me the desired behavior, and it kind of does, but unfortunately it comes with the prohibitive side effect that the queue is deleted when the consumer disconnects (this is a fleet of Java servers that come up and down with every release -- queues must persist between releases).

This can't be an uncommon dilemma, but I don't see anything that quite fits the problem. Is there no construct in RabbitMQ or AMQP that I can use to my advantage here? Is there a way I can rethink this problem that avoids the issue? Or do I need to look at distributed locking solutions?


There is 1 answer

kha On

This is what I understand from your question: you have N objects with M* states each. You want the N objects to be handled on different processes/threads, but the M* states that belong to any one object n (from N) must be applied in sequence.

Your proposed solution looks good to me. What I would do is this:

For each object, create a separate queue (call it N') named after the object's UUID, along the lines of your post.

You then have a server/distributor which serves three purposes:

  1. Create a persistent queue N'
  2. Assign this queue to one agent picked at random from your agent pool, and tell that agent about the assignment
  3. Listen for periodic heartbeat messages from these agents to make sure they're alive. If an agent stops sending them, pick another available agent at random and assign the queue to it instead.

To make this distributor also crash/restart safe, it probably should, on startup, check existing queues and make sure they're assigned to some agent. If not, it should assign them ASAP.
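The assignment and heartbeat bookkeeping described above could be sketched roughly as follows. This is an in-memory illustration only: the `Distributor` class, its method names, and the timeout-based liveness rule are assumptions, and a real distributor would also have to persist its state and notify agents of assignments.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical in-memory distributor: tracks heartbeats and queue ownership.
final class Distributor {
    private final Map<String, Long> lastHeartbeatMillis = new HashMap<>(); // agent -> last heartbeat time
    private final Map<String, String> queueOwner = new HashMap<>();        // queue -> owning agent
    private final long timeoutMillis;
    private final Random random = new Random();

    Distributor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called whenever a heartbeat arrives from an agent.
    synchronized void heartbeat(String agentId, long nowMillis) {
        lastHeartbeatMillis.put(agentId, nowMillis);
    }

    // Keeps the current owner if it is alive; otherwise picks a live agent at random.
    // Returns the owner, or null when no agent is currently alive.
    synchronized String ensureAssigned(String queue, long nowMillis) {
        String current = queueOwner.get(queue);
        if (current != null && isAlive(current, nowMillis)) {
            return current;
        }
        List<String> live = new ArrayList<>();
        for (String agent : lastHeartbeatMillis.keySet()) {
            if (isAlive(agent, nowMillis)) {
                live.add(agent);
            }
        }
        if (live.isEmpty()) {
            queueOwner.remove(queue);
            return null;
        }
        String chosen = live.get(random.nextInt(live.size()));
        queueOwner.put(queue, chosen);
        return chosen;
    }

    private boolean isAlive(String agentId, long nowMillis) {
        Long last = lastHeartbeatMillis.get(agentId);
        return last != null && nowMillis - last <= timeoutMillis;
    }
}
```

On startup the distributor would call `ensureAssigned` for every existing queue, which covers the restart case. Note that a missed heartbeat does not prove the old agent has stopped working, so the old agent must also stop consuming once it loses the assignment, or ordering can still be violated.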

Your agents themselves are responsible for:

  1. Listening to the queue assignment from the distributor
  2. Processing the items (M* states) in their assigned queues
  3. Sending heartbeats to the distributor

Ensuring that each queue has exactly one agent is the distributor's job. It shouldn't be anyone else's responsibility, and you can't really safely hand it to a messaging technology; at least, I'm not aware of any queue that can handle this. The above solution should hopefully help with your problem.

Another problem that may arise is when to delete a queue. You can do that in the agents themselves once they've processed everything, but you need to make sure they only ever do it while holding a lock shared with the distributor. What you don't want is to delete a queue you think is empty just as the distributor is about to write into it.
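The race can be illustrated with a small in-process stand-in, where the emptiness check and the delete happen under the same lock as the write. `QueueRegistry` and its methods are hypothetical; across separate processes the `synchronized` monitor would have to be replaced by a distributed lock or an equivalent atomic operation.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-process model: one monitor guards publish, poll and delete.
final class QueueRegistry {
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Distributor side: creates the queue if needed and appends, all under the lock.
    synchronized void publish(String queueName, String delta) {
        queues.computeIfAbsent(queueName, k -> new ArrayDeque<>()).add(delta);
    }

    // Agent side: takes the next delta, or returns null if there is none.
    synchronized String poll(String queueName) {
        Queue<String> q = queues.get(queueName);
        return q == null ? null : q.poll();
    }

    // Agent side: deletes only if the queue is still empty at the moment of deletion.
    synchronized boolean deleteIfEmpty(String queueName) {
        Queue<String> q = queues.get(queueName);
        if (q == null || !q.isEmpty()) {
            return false;
        }
        queues.remove(queueName);
        return true;
    }
}
```

Because `publish` and `deleteIfEmpty` cannot interleave, a delta is either written before the check (so the delete is refused) or written after the delete (so the queue is recreated), and nothing is lost.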