For completeness, answering my own question with what I found out. This was actually down to differences in default settings between node-amqp and amqp.node and a slightly flawed implementation in my consumer.
When subscribing to a queue, node-amqp by default uses prefetch=1, meaning only one message is in transit at any time; another will not be delivered until the previous one has been acknowledged. amqp.node, however, defaults to prefetch=0, meaning all messages are sent to the consumer as quickly as possible, each to be acknowledged individually whenever the consumer is done with it.
This is what I was seeing in the management console, which caused alarm and resulted in this question. A discussion of this can be seen in this github issue.
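To make the difference concrete, here is a broker-free sketch that simulates how the prefetch window limits in-flight messages. Everything here (the function name, the in-memory queue) is hypothetical and only illustrative; with a real broker this limit is enforced server-side per channel.

function simulateDelivery(messages, prefetch) {
  var inFlight = 0;     // unacknowledged messages held by the consumer
  var maxInFlight = 0;  // high-water mark, for illustration
  var queue = messages.slice();

  // deliver as many messages as the prefetch window allows (0 = unlimited)
  function pump() {
    while (queue.length && (prefetch === 0 || inFlight < prefetch)) {
      queue.shift();
      inFlight++;
      maxInFlight = Math.max(maxInFlight, inFlight);
    }
  }

  // an ack frees a slot in the window, so the broker can deliver another
  function ack() {
    inFlight--;
    pump();
  }

  pump();
  while (inFlight > 0) ack();
  return maxInFlight;
}

console.log(simulateDelivery([1, 2, 3, 4, 5], 0)); // prefetch=0: all 5 in flight at once
console.log(simulateDelivery([1, 2, 3, 4, 5], 1)); // prefetch=1: never more than 1

With prefetch=0 the whole backlog lands on the consumer immediately (what I was seeing in the management console); with prefetch=1 delivery is paced by acknowledgements.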
ASIDE - Given the information above, I left my consumer running and let it acknowledge the messages as and when it was ready. This threw up a separate issue with my implementation. Because the messages were being delivered all at once, the message handler bombarded the DB with requests for new connections. As a result, the DB server got overloaded and the consumer ultimately died. To resolve that, I switched back to prefetch=1, as before, meaning the consume promise method from above is now as follows:
module.exports = function (options, callback) {
  /* connection, exchange and queue set up here; `ch` is the channel
     and `ok` the promise chain from that setup */
  // consume messages from the primary queue
  ok = ok.then(function () {
    var q = options.pq; // primary queue name
    console.log('Subscribing to', q);
    ch.prefetch(1); // <-- only 1 unacknowledged message in flight at a time
    return ch.consume(q, function (message) {
      callback(ch, message);
    }, { noAck: false }); // manual acks: the handler must call ch.ack(message)
  });
  return ok;
};
Worth bearing in mind if anyone faces a similar situation.