I finally figured out why this was happening. I had not realized that acknowledging a message once you are done with it is not enough: you also have to reject any message you cannot process, using channel.basic_reject. This may seem obvious, but it is not the default behavior for AMQP.
Basically, we have to release the message using basic_reject with requeue set to True. The important part is the requeue argument, which prevents the message from being discarded and instead puts it back on the queue so that one of our available workers can process it.
    if success:
        # On success: mark the message as processed.
        ch.basic_ack(delivery_tag=method.delivery_tag)
    else:
        # Otherwise: reject the message and move it back to the queue.
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
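For context, here is a minimal sketch of how that ack/reject branch might sit inside a complete consumer callback. The callback signature (ch, method, properties, body) is the one pika passes to consumer callbacks; everything else is an assumption made for illustration: the JSON message format, the SUPPORTED_KINDS set, and the can_handle helper are hypothetical and not part of my original setup.

```python
import json

# Hypothetical: the kinds of work this particular worker knows how to do.
SUPPORTED_KINDS = {"resize"}


def can_handle(body):
    """Hypothetical check: True if this worker is able to process the message."""
    payload = json.loads(body)
    return payload.get("kind") in SUPPORTED_KINDS


def on_message(ch, method, properties, body):
    """Consumer callback using the (ch, method, properties, body) signature."""
    try:
        success = can_handle(body)
    except Exception:
        # Anything we cannot even parse counts as "could not process".
        success = False

    if success:
        # On success: mark the message as processed.
        ch.basic_ack(delivery_tag=method.delivery_tag)
    else:
        # Otherwise: hand the message back to the broker for another worker.
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
```

With pika 1.x this would typically be registered with something like channel.basic_consume(queue="tasks", on_message_callback=on_message), where "tasks" is a placeholder queue name. Keep in mind that a message no worker can ever handle will keep coming back as long as requeue is True.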
I found some really useful information in this article, and there are more technical details on the reject keyword in this blog post.