Question

I'm writing a module that acts as a wrapper around amqplib. The motivation is that we have an existing, well-defined exchange/queue/bindings setup, and I just want to expose the consume method to allow consumption of incoming data.

To that end, my module takes a callback argument of the form callback(channel, msg). In the module, after setting up the exchange, queue and bindings, I have the following:

module.exports = function (options, callback) {

  /* connection, exchange and queue set up here */

  // consume messages from primary queue
  ok = ok.then(function() {
    var q = opts.pq;
    console.log('Subscribing to', q);
    return ch.consume(q, function (message) {
      callback(ch, message);
    });
  });

  return ok;

};

In the callback, I'm processing the message, and on success, calling channel.ack(msg).
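To make that concrete, a minimal sketch of the kind of callback I mean is below; processMessage is a hypothetical stand-in for the real work (a DB write in my case), not part of amqplib.

// sketch only: processMessage is a hypothetical placeholder for the real work
function handleMessage (ch, msg) {
  // msg.content is a Buffer delivered by amqplib
  var payload = JSON.parse(msg.content.toString());

  processMessage(payload, function (err) {
    if (err) {
      return; // error handling omitted here; the message stays unacked
    }
    // on success, acknowledge just this delivery
    ch.ack(msg);
  });
}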

Everything appears to run OK, in that there are no errors; however, the RabbitMQ admin console shows all the processed messages as Unacked. If I then kill my app, the Unacked messages go back onto the queue (thankfully).

Why are my messages not being acked? Am I doing something wrong in the ack'ing? Should I be supplying the allUpTo parameter as true in the call to ack?
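My understanding of the two forms, for reference (amqplib's Channel#ack takes an optional allUpTo flag):

ch.ack(msg);        // acknowledge only this message
ch.ack(msg, true);  // acknowledge this message and all earlier unacked deliveries on the channel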

Env details

node -v
v0.8.26

npm ls
...
amqplib@0.1.1

Solution

For completeness, I'm answering my own question with what I found out. This turned out to be down to differences in default settings between node-amqp and amqp.node, plus a slightly flawed implementation in my consumer.

When subscribing to a queue, node-amqp uses prefetch=1 by default, meaning only one message is in transit at any time; another will not be delivered until the previous one has been acknowledged. amqp.node, however, defaults to prefetch=0, meaning all messages are sent to the consumer as quickly as possible, and each can be acked individually at some later point when the consumer is done with it.
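To make the difference concrete, here is a minimal sketch (assuming ch is an open amqplib channel and 'my.queue' is a placeholder queue name):

// With amqp.node, omitting this call leaves prefetch=0: the broker pushes
// every ready message at once, and each sits Unacked until it is acked.
// Calling it caps in-flight deliveries, which is what node-amqp did by default.
ch.prefetch(1);

ch.consume('my.queue', function (msg) {
  // process the message, then acknowledge this delivery
  ch.ack(msg);
});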

This is what I was seeing in the management console, which caused alarm and resulted in this question. A discussion of this can be seen in this GitHub issue.

ASIDE - Given the information above, I left my consumer running and let it ack the messages as and when it was ready. This threw up a separate issue with my implementation: given the way the messages were being delivered (i.e. all at once), the message handler was bombarding the DB with requests for new connections. As a result, the DB server got overloaded and ultimately the consumer died. To resolve that, I just switched back to prefetch=1, as before, meaning the consume promise method from above now looks as follows:

module.exports = function (options, callback) {

  /* connection, exchange and queue set up here */

  // consume messages from primary queue
  ok = ok.then(function() {
    var q = opts.pq;
    console.log('Subscribing to', q);

    ch.prefetch(1);   // <-- only get 1 message at a time

    return ch.consume(q, function (message) {
      callback(ch, message);
    }, { noAck: false });
  });

  return ok;

};

Worth bearing in mind if anyone faces a similar situation.

Licensed under: CC-BY-SA with attribution