Question

I want to receive a message after a certain amount of time in one of my workers. I decided to go with Node and RabbitMQ after discovering so-called dead letter exchanges.

The message seems to get send to the queue in DeadExchange, but the consumer is never receiving the message after the elapsed time in the WorkQueue in the WorkExchange. Either the bindQueue is off, or the dead-letter'ing doesn't work?

I've tried a lot of different values now. Can someone please point out what I'm missing?

var amqp = require('amqplib');
var url = 'amqp://dev.rabbitmq.com';

amqp.connect(url).then(function(conn) {
    //Subscribe to the WorkQueue in WorkExchange to which the "delayed" messages get dead-letter'ed (is that a verb?) to.
    return conn.createChannel().then(function(ch) {
        return ch.assertExchange('WorkExchange', 'direct').then(function() {
            return ch.assertQueue('WorkQueue', {
                autoDelete: false,
                durable: true
            })
        }).then(function() {
            return ch.bindQueue('WorkQueue', 'WorkExchange', '');
        }).then(function() {
            console.log('Waiting for consume.');

            return ch.consume('WorkQueue', function(msg) {
                console.log('Received message.');
                console.log(msg.content.toString());
                ch.ack(msg);
            });
        });
    })
}).then(function() {
    //Now send a test message to DeadExchange to a random (unique) queue.
    return amqp.connect(url).then(function(conn) {
        return conn.createChannel();
    }).then(function(ch) {
        return ch.assertExchange('DeadExchange', 'direct').then(function() {
            return ch.assertQueue('', {
                arguments: {
                    'x-dead-letter-exchange': 'WorkExchange',
                    'x-message-ttl': 2000,
                    'x-expires': 10000
                }
            })
        }).then(function(ok) {
            console.log('Sending delayed message');

            return ch.sendToQueue(ok.queue, new Buffer(':)'));
        });
    })
}).then(null, function(error) {
    console.log('error\'ed')
    console.log(error);
    console.log(error.stack);
});

I'm using amqp.node (https://github.com/squaremo/amqp.node) which is amqplib in npm. Although node-amqp (https://github.com/postwait/node-amqp) seems to be so much more popular, it doesn't implement the full protocol and there are quite some outstanding issues regarding reconnecting.

dev.rabbitmq.com is running RabbitMQ 3.1.3.

Was it helpful?

Solution 3

There was a bug in Channel#assertQueue in AMQP.Node which just got fixed, see https://github.com/squaremo/amqp.node/commit/3749c66b448875d2df374e6a89946c0bdd0cb918. The fix is on GitHub but not in npm just yet.

OTHER TIPS

This is a working code.When a message spends more than ttl in DeadExchange, it is pushed to WorkExchange. The key to success is defining the right routing key. The exchange-queue to which you wish to send post ttl, should be bounded with a routing key(note: not default), and 'x-dead-letter-routing-key' attributes value should match that route-key.

var amqp = require('amqplib');
var url = 'amqp://localhost';

amqp.connect(url).then(function(conn) {
    //Subscribe to the WorkQueue in WorkExchange to which the "delayed" messages get dead-letter'ed (is that a verb?) to.
    return conn.createChannel().then(function(ch) {
        return ch.assertExchange('WorkExchange', 'direct').then(function() {
            return ch.assertQueue('WorkQueue', {
                autoDelete: false,
                durable: true
            })
        }).then(function() {
            return ch.bindQueue('WorkQueue', 'WorkExchange', 'rk1');
        }).then(function() {
            console.log('Waiting for consume.');

            return ch.consume('WorkQueue', function(msg) {
                console.log('Received message.');
                console.log(msg.content.toString());
                ch.ack(msg);
            });
        });
    })
}).then(function() {
    //Now send a test message to DeadExchange to DEQ queue.
    return amqp.connect(url).then(function(conn) {
        return conn.createChannel();
    }).then(function(ch) {
        return ch.assertExchange('DeadExchange', 'direct').then(function() {
            return ch.assertQueue('DEQ', {
                arguments: {
                    'x-dead-letter-exchange': 'WorkExchange',
                    'x-dead-letter-routing-key': 'rk1',
                    'x-message-ttl': 15000,
                    'x-expires': 100000
                }
            })
        }).then(function() {
            return ch.bindQueue('DEQ', 'DeadExchange', '');
        }).then(function() {
            console.log('Sending delayed message');

            return ch.publish('DeadExchange', '', new Buffer("Over the Hills and Far Away!"));
        });
    })
}).then(null, function(error) {
    console.log('error\'ed')
    console.log(error);
    console.log(error.stack);
});

Here's an example using AMQP Connection Manager for Node. I noticed no examples seemed to match what we were doing in our code, so I made a repo with a simple example and one with retry counts via republishing back to the main exchange: https://github.com/PritchardAlexander/node-amqp-dead-letter-queue

Here's the simple example:

const amqp = require('amqp-connection-manager');
const username = encodeURIComponent('queue');
const password = encodeURIComponent('pass');
const port = '5672';
const host = 'localhost';
const connectionString = `amqp://${username}:${password}@${host}:${port}`;

// Ask the connection manager for a ChannelWrapper.  Specify a setup function to
// run every time we reconnect to the broker.
connection = amqp.connect([connectionString]);

// A channel is your ongoing connection to RabbitMQ.
// All commands go through your channel.
connection.createChannel({
  json: true,
  setup: function (channel) {
    channel.prefetch(100);

    // Setup EXCHANGES - which are hubs you PUBLISH to that dispatch MESSAGES to QUEUES
    return Promise.all([
      channel.assertExchange('Test_MainExchange', 'topic', {
        durable: false,
        autoDelete: true,
        noAck: false
      }),
      channel.assertExchange('Test_DeadLetterExchange', 'topic', {
        durable: false,
        autoDelete: true,
        maxLength: 1000,
        noAck: true // This means dead letter messages will not need an explicit acknowledgement or rejection
      })
    ])
    // Setup QUEUES - which are delegated MESSAGES by EXCHANGES.
    // The MESSAGES then need to be CONSUMED.
    .then(() => {
      return Promise.all([
        channel.assertQueue(
          'Test_MainQueue',
          options = {
            durable: true,
            autoDelete: true,
            exclusive: false,
            messageTtl: 1000*60*60*1,
            deadLetterExchange: 'Test_DeadLetterExchange'
          }
        ),
        channel.assertQueue('Test_DeadLetterQueue',
          options = {
            durable: false,
            autoDelete: true,
            exclusive: false
          }
        )
      ]);
    })
    // This glues the QUEUES and EXCHANGES together
    // The last parameter is a routing key. A hash/pound just means: give me all messages in the exchange.
    .then(() => {
      return Promise.all([
        channel.bindQueue('Test_MainQueue', 'Test_MainExchange', '#'),
        channel.bindQueue('Test_DeadLetterQueue', 'Test_DeadLetterExchange', '#')
      ]);
    })
    // Setup our CONSUMERS
    // They pick MESSAGES off of QUEUES and do something with them (either ack or nack them)
    .then(() => {
      return Promise.all([
        channel.consume('Test_MainQueue', (msg) => {
          const stringifiedContent = msg.content ? msg.content.toString() : '{}';
          console.log('Test_MainQueue::CONSUME ' + stringifiedContent);

          const messageData = JSON.parse(stringifiedContent);
          if (messageData.value === 0) {
            console.log('Test_MainQueue::REJECT ' + stringifiedContent);
            // the 'false' param at the very end means, don't retry! dead letter this instead!
            return channel.nack(msg, true, false);
          }
          return channel.ack(msg);
        })
      ]),
      channel.consume('Test_DeadLetterQueue', (msg) => {
        const stringifiedContent = msg.content ? msg.content.toString() : '{}';
        console.log('');
        console.log('Test_DeadLetterQueue::CONSUME ' + stringifiedContent);
        console.log('');
      });
    })
    .then(() => {
      setInterval(function () {
        const messageData = {
          text: 'Dead letter if 0',
          value: Math.floor(Math.random()*5)
        };
        const stringifiedMessage = JSON.stringify(messageData);

        // Publish message to exchange
        if (channel.publish('Test_MainExchange', '', new Buffer(stringifiedMessage))) {
          console.log(`Sent ${stringifiedMessage}`);
        } else {
          console.log(`Failed to send ${stringifiedMessage}`);
        };
      }, 300);
    });
  }
});
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top