Domanda

As a simple example, I'm adding 5 items to a new RabbitMQ(v 2.6.1) queue:

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net'))
channel = connection.channel()
channel.queue_declare(queue='dw.neil',durable=True)
# add 5 messages to the queue, the numbers 1-5
for x in range(5):
    message = x+1
    channel.basic_publish(exchange='',routing_key='dw.neil', body=str(message))
    print " [x] Sent '%s'" % message
connection.close()

I purge my queue and then run the above code to add the 5 items:

nkodner@hadoop4 sports_load_v2$ python send_5.py 
 [x] Sent '1'
 [x] Sent '2'
 [x] Sent '3'
 [x] Sent '4'
 [x] Sent '5'

Now, I'm trying to simulate failed processing. Given the following code to consume from the queue. Notice that I have the call to basic_ack commented out:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net'))
channel = connection.channel()
channel.queue_declare(queue='dw.neil',durable=True)
method_frame, header_frame, body=channel.basic_get(queue='dw.neil')
print method_frame, header_frame
print "body: %s" % body
#channel.basic_ack(delivery_tag=method_frame.delivery_tag)
connection.close()

I run the receiving code to grab an item off the queue. As I'd expect, I get item #1:

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 1

Since the call to channel.basic_ack() is commented out, I would expect the unacknowledged message to be placed on the queue so that the next consumer will get it. I would hope that message #1 is the first message (again) out of the queue, with the Redelivered property set to True. Instead, message #2 is received:

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 2

And all of the other messages in the queue are received before #1 comes back with Redelivered flag set to True:

...

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 5

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=True', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 1

Are there any properties or options I could be setting so that I keep getting #1 delivered until its acknowledged?

My use-case is loading a data warehouse with sequentially-generated files. We're using message-based processing to let my program know some new files are ready and are to be loaded into the DW. We have to process the files in the order that they're generated.

È stato utile?

Soluzione

This has been addressed in RabbitMQ 2.7.0 - we were running 2.6.1.

From the release notes:

New features in this release include:

  • order preserved of messages re-queued for a consumer

Altri suggerimenti

Try using channel.basic_reject - this should push the unacknowledged message back to RabbitMQ which will treat the message as a new message. Also -- if you have a failed message stuck, you can use channel.basic_recover to tell RabbitMQ to redeliver all non-acknowledged messages.

http://www.rabbitmq.com/extensions.html#negative-acknowledgements provides distinguishing information on Basic.Reject vs Basic.Nack.

Message ordering semantics are explained at http://www.rabbitmq.com/semantics.html

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top