What is the most efficient way to guarantee non-repeatable work and minimize un-acked delays using AMQP/RabbitMQ?

StackOverflow https://stackoverflow.com/questions/20976356

Question

  • Context:

I have a handwritten job server (think a very limited equivalent of something like Gearman) that uses AMQP and RabbitMQ as its messaging broker. I use Rabbit 3 and can upgrade as necessary. I run many instances of this server, most of which consume from the same work queue.

There is a large class of work that the server needs to do that absolutely can't be repeated. There's also an idempotent (i.e. messages can be re-run if need be) mode of execution that acks only after successful execution, but there are no issues with that part.

In non-repeatable mode message processing on a client looks like this (assuming that it's consuming the appropriate queue):

  1. Receive a message.
  2. Unpack/parse/verify the message contents.
  3. Ack the message.
  4. Do the operation requested by the message.
  5. GOTO step 1.

The requirement that work be non-repeatable is so important that the inherent risks of the above scenario (messages get dropped on the floor if a job server is killed between steps 3 and 4) are acceptable.
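For illustration, the ack-before-work sequence above can be sketched as a consumer callback. This is a minimal sketch, not the actual job server code: `handle_once` and `do_work` are hypothetical names, the JSON payload format is an assumption, and `channel` is assumed to behave like a pika 1.x channel (`basic_ack`, `basic_reject`). Rejecting unparseable messages without requeue is also an assumption about what "verify" should do on failure.

```python
import json


def handle_once(channel, method, properties, body, do_work):
    """At-most-once handling: ack first, then run the job (hypothetical helper)."""
    try:
        job = json.loads(body)  # step 2: unpack/parse/verify (JSON is an assumption)
    except ValueError:
        # Assumption: a message that cannot be parsed is discarded, not requeued.
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return
    # Step 3: ack before doing any work, so the broker never redelivers this job.
    channel.basic_ack(delivery_tag=method.delivery_tag)
    # Step 4: if the process dies in here, the job is lost (the accepted risk).
    do_work(job)
```

The ordering is the whole point: once `basic_ack` has been sent, a crash inside `do_work` cannot cause the broker to hand the same job to another instance.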

  • Problem:

Sometimes, step 4 takes a very long time. Since the AMQP client has already acked its only message (assuming a qos of 1), RabbitMQ pushes it another if one exists, and that message then stays in unacked status until the job server finishes execution of its current message. This is very bad, because the work being done is very high-priority and should be done ASAP, and there are almost always available consumers who are not in the middle of doing work that can perform requested operations immediately.

Hence, my question:

While remaining a consumer on a queue, how do I guarantee that a message will only be processed once, without consuming/unacking any other messages during the processing phase?

  • What I've tried:

1. The first thing I tried was setting the QoS (sometimes called prefetch size, but that's a misnomer since there's no 'fetching' involved, just server pushes) to 1 or 0. Setting it to 1 reduces the severity of this problem, but doesn't solve it, since one message can still be made to wait for a previous message's processing to finish. Setting QoS to 0 removes the limit on the local buffer size, which makes the problem much worse.

2. I played with breaking up work queues to be separate per job server instance, but that damages scalability, since I have to add objects to the broker every time I need a new instance. I need to be able to increase the amount of work that could be done in parallel just by starting new instances of the server, no messaging topology changes required.

3. The naive solution of writing a record to some central state every time a message starts running (i.e. reimplementing message locking) is far too slow.

4. I tried cancelling the consume immediately after acking the message, nacking everything in the local buffer, and re-establishing the consume for the next message. This was very slow on the clients, generated a whole lot more network traffic, and (worst of all), made the Rabbit server experience very high load and progressively slower performance. The management API/UI also became sluggish and difficult to use, my federation exchanges began hanging, and HA started falling out of sync.

5. I thought about a complicated topological solution: First, I'd ensure that all messages had a routing key of a unique identifier (I could use headers exchanges and the message_id AMQP field later, but this is a simpler implementation). All messages would be published to a topic exchange xA, whose alternate exchange is xB. xB would be a direct exchange, and bind directly to qC, the work queue. I'd also create qD, which would only contain work that failed to process or was rejected due to a crash on a job server instance. Then, I could start a specialized daemon/constantly-running program somewhere that loops through the entire contents of qC, acks them all, and manually re-publishes each one to xA. All messages enter the system through xA anyway, and in the default state they end up in qC regardless of their routing keys.

The message lifecycle on each client would then look like:

  1. Receive a message.
  2. Unpack/parse/verify the message contents.
  3. If a message has the redelivered bit set:
    • Ack the message.
    • Republish it as-is to xA.
    • Wait for confirmation.
    • Remove binding from xA to qD with the routing key of the message received (a UID).
  4. Else:
    • Add binding from xA to qD with the routing key of the message received (a UID).
    • Do the operation requested by the message.
    • Ack message.
    • GOTO 1.

That way, if a message failed during its "do work" phase and was implicitly rejected with requeue=1, the redelivered bit would be set. The next job server instance to receive that redelivered message would ack it and republish it to xA, which would route it to qD. After the publish was confirmed (the message landed in qD), the binding would be eliminated.

That solution would solve the problem better than any of the other theories I've had. However, I think that the huge number of bindings that could potentially be created (and the add/remove binding requests/second rate) would very probably severely impact Rabbit's performance. This discussion thread seems to corroborate that theory. Ideally, I'd like a solution that doesn't send more network traffic to and from the rabbit server than it absolutely needs to, and doesn't create more persistent objects (temp messages, queues, bindings, etc) on the server than it absolutely needs to.


Solution

This answer should apply to any AMQP implementation, but I only have experience with RabbitMQ myself. There are two possible ways of doing what you want. Both have their advantages and disadvantages, so here they are in no particular order:

basic.get in an interval

Maybe you do not want to continue consuming until you are done with the last message. AMQP has a basic.get method that fetches only one message. The downside to this solution is that basic.get is non-blocking and returns a get-empty response on an empty queue, in which case you will need to wait a little bit before trying again. This may or may not be acceptable to you. Going this route I would do this:

  1. basic.get
  2. If get-empty, sleep x milliseconds and then go to 1
  3. Ack message
  4. Do work
  5. Go to 1

Notice that you only sleep when the queue is empty. As long as there is work to do nobody will sleep.
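The polling steps above could look roughly like this. It is a sketch under assumptions: `poll_once` and `run_forever` are hypothetical names, and `channel` is assumed to be a pika 1.x `BlockingChannel`, whose `basic_get` returns `(None, None, None)` when the queue is empty. The `sleep` parameter exists only so the wait can be replaced in tests.

```python
import time


def poll_once(channel, queue, do_work, idle_sleep=0.1, sleep=time.sleep):
    """Run one iteration of the polling loop; return True if a job ran."""
    # Step 1: ask the broker for at most one message.
    method, properties, body = channel.basic_get(queue=queue, auto_ack=False)
    if method is None:
        # Step 2: get-empty, so wait a little before the caller tries again.
        sleep(idle_sleep)
        return False
    # Step 3: ack before working so the job can never be redelivered.
    channel.basic_ack(delivery_tag=method.delivery_tag)
    # Step 4: do the work; nothing else is held unacked while this runs.
    do_work(body)
    return True


def run_forever(channel, queue, do_work):
    """Step 5: keep polling (hypothetical driver loop)."""
    while True:
        poll_once(channel, queue, do_work)
```

Because no consumer is registered, the broker never pushes a second message to a busy worker; the trade-off is the polling delay of up to `idle_sleep` seconds when the queue has been empty.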

Check the redelivered flag

The broker will set the redelivered flag when a message was previously delivered but not acknowledged. This means that if your worker did get the message, but did not ack it before dying, the message will go back to the queue with redelivered set to a non-zero value. Since this message may have been partly handled by the crashed worker, you do not want to act upon it.

basic.consume with a QoS of 1.

  1. Consume a message
  2. If redelivered=0, go to 5
  3. Reject the message with requeue=false
  4. Go to 1
  5. Do work
  6. Ack message
  7. Go to 1
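As a rough sketch of the steps above, again assuming a pika 1.x style channel: `on_message` and `start_worker` are hypothetical names, and `do_work` stands in for whatever the job server actually does.

```python
def on_message(channel, method, properties, body, do_work):
    """Skip anything the broker marks as redelivered; otherwise work, then ack."""
    if method.redelivered:
        # Steps 2-3: another worker may have partly handled this message,
        # so drop it (or let the broker dead-letter it) instead of re-running it.
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return
    # Step 5: do the work while the message is still unacked.
    do_work(body)
    # Step 6: ack only after the work finished.
    channel.basic_ack(delivery_tag=method.delivery_tag)


def start_worker(channel, queue, do_work):
    """Register the consumer with a QoS (prefetch count) of 1."""
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue=queue,
        on_message_callback=lambda ch, m, p, b: on_message(ch, m, p, b, do_work),
        auto_ack=False,
    )
    channel.start_consuming()  # blocks; step 7 is the broker pushing the next message
```

Since the ack comes after the work, the prefetch limit of 1 keeps the broker from pushing a second message to a busy worker, and a crash mid-job results in a redelivery that the next worker rejects rather than repeats.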

A bonus with this solution is that you may configure your broker to route rejected messages to a dead-letter exchange so they don't get lost. If you do not use a dead-letter exchange you may opt for just acking the message in step 3, but reject has a clearer semantic meaning.
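One way to set up the dead-letter exchange mentioned above is to pass RabbitMQ's `x-dead-letter-exchange` argument when declaring the work queue. The sketch below assumes a pika 1.x style channel; the function name and the queue and exchange names (`work`, `work.dlx`, `work.dead`) are made up for illustration.

```python
def declare_with_dead_letter(channel, work_queue="work",
                             dlx="work.dlx", dead_queue="work.dead"):
    """Declare a work queue whose rejected messages land in a separate queue."""
    # Exchange that receives messages rejected with requeue=False.
    channel.exchange_declare(exchange=dlx, exchange_type="fanout", durable=True)
    # Queue holding the dead-lettered messages for later inspection.
    channel.queue_declare(queue=dead_queue, durable=True)
    channel.queue_bind(queue=dead_queue, exchange=dlx)
    # The work queue itself, pointing at the dead-letter exchange.
    channel.queue_declare(
        queue=work_queue,
        durable=True,
        arguments={"x-dead-letter-exchange": dlx},
    )
```

Note that redeclaring an existing queue with different arguments fails, so for a queue that already exists a broker policy may be the more practical way to attach the dead-letter exchange.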

Licensed under: CC-BY-SA with attribution