Domanda

I'm looking for any help. I want to fix an insert_order_queue() function to be able to resend a message to RabbitMQ if the message wasn't actually got delivered to the server.

This is my current code:

def insert_order_queue(self, msg):
    ''' Insert message into the queue '''
    if msg:
        msg_props = pika.BasicProperties(delivery_mode=conf.rabbit_msg_props_delivery_mode,
                                         content_type=conf.rabbit_msg_props_content_type)
        logger.info('Message : %s' % msg) 
        try:
            self.channel.basic_publish(body=json.dumps(msg),
                                       exchange=conf.rabbit_exchange_name,
                                       properties=msg_props,
                                       routing_key=conf.rabbit_exchange_routing_key) 
        except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError), error:
            logger.error('AMQP Connection failed. Trying again... %s' % error)
            self._connect()
            return
    else:
        logger.error('Something wrong') 

And this is my _connect() method:

def _connect(self):
    ''' Connecting to the RabbitMQ, and declare queue '''
    logger.info('Trying to connect to RabbitMQ')
    while True:
        try:
            conn_broker = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host=conf.rabbit_server,
                    port=conf.rabbit_port,
                    virtual_host=conf.rabbit_vhost,
                    ssl=conf.rabbit_ssl, # do not set it to True if there is no ssl!
                    heartbeat_interval=conf.rabbit_heartbeat_interval,
                    credentials=pika.PlainCredentials(
                        conf.rabbit_user,
                        conf.rabbit_pass)))
            logger.info('Successfully connected to Rabbit at %s:%s' % (conf.rabbit_server, conf.rabbit_port)) 
            channel = conn_broker.channel()
            # Don't dispatch a new message to a worker until it has processed and acknowledged the previous one
            channel.basic_qos(prefetch_count=conf.rabbit_prefetch_count)
            status = channel.queue_declare(queue=conf.rabbit_queue_name,
                                           durable=conf.rabbit_queue_durable,
                                           exclusive=conf.rabbit_queue_exclusive,
                                           passive=conf.rabbit_queue_passive)
            if status.method.message_count == 0:
                logger.info("Queue empty")
            else:
                logger.info('Queue status: %s' % status)                  
            channel.queue_bind(
                queue=conf.rabbit_queue_name,
                exchange=conf.rabbit_exchange_name,
                routing_key=conf.rabbit_exchange_routing_key)  
            return channel
        except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError), error:
            time.sleep(3)
            logger.error('Exception while connecting to Rabbit %s' % error)
        else:
            break 
È stato utile?

Soluzione

There are a couple ways a message can not be "delivered"

The most obvious is "the connection to rabbit is closed" in which case you just reconnect and resend (you already have most of that logic in terms of the reconnection, just need to resend the message).

Then there are a couple variations on "no one is listening for this message". These can be handled by the immediate and mandatory flags on basic_publish. See this for more information: http://bunnyamqp.wordpress.com/2009/08/21/amqp-basic-publish-immediate-versus-mandatory/

Finally you can add a confirmation callback. Pika lets you set this callback:

https://github.com/pika/pika/blob/master/pika/channel.py#L387

From within that callback you can decide to send the message again or not.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top