Question

How do I implement a mechanism that rejects a message after a configurable number of requeue attempts?

In other words, when I subscribe to a queue, I want to guarantee that the same message is not redelivered more than X times.

My code sample:

q.subscribe({ack: true}, function(data, headers, deliveryInfo, message) {
  try {
    doSomething(data);
  } catch (e) {
    message.reject(true); // reject and requeue the message
  }
});

Solution

In my opinion, the best solution is to handle these errors in your application and reject the message once the app has decided that it can't process it.

If you don't want to lose information, the app should reject the message only after it has sent the same message to an error queue.

The code below is not tested:

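q.subscribe({ack: true}, function () {
  var args = arguments;
  var self = this;
  // First attempt; doWork is expected to return a promise with a fail() method (as in the Q library).
  var promise = doWork.apply(self, args);
  // Chain up to MAX_RETRIES further attempts; each one runs only if the previous attempt failed.
  for (var numOfRetries = 0; numOfRetries < MAX_RETRIES; numOfRetries++) {
    promise = promise.fail(function () { return doWork.apply(self, args); });
  }

  // Every attempt failed: copy the message to the error queue, then reject it.
  promise.fail(function () {
    sendMessageToErrorQueue.apply(self, args);
    rejectMessage.apply(self, args);
  });
});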
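
For a synchronous doSomething like the one in the question, the same idea can be sketched without promises. This is only an illustration under assumptions: makeHandler and sendToErrorQueue are hypothetical names, and the message object is assumed to expose acknowledge() and reject(requeue), in the same way the question's code calls message.reject(true).

```javascript
// Hypothetical helper: builds a subscribe callback that runs doWork up to
// maxRetries + 1 times (one initial attempt plus maxRetries retries).
function makeHandler(doWork, sendToErrorQueue, maxRetries) {
  return function (data, headers, deliveryInfo, message) {
    for (var attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        doWork(data);
        message.acknowledge(); // success: acknowledge and stop retrying
        return;
      } catch (e) {
        // Swallow the error and fall through to the next attempt.
      }
    }
    // Every attempt failed: keep a copy first, then reject without requeueing.
    sendToErrorQueue(data, headers);
    message.reject(false);
  };
}
```

It would be wired up as q.subscribe({ack: true}, makeHandler(doSomething, publishToErrorQueue, 3)), where publishToErrorQueue stands for whatever function publishes to your error queue. Because the retries happen inside the process, the broker never redelivers the message, so no delivery counter is needed.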

Other tips

One possible solution is to hash the message with a hash function you define, then look that hash up in a cache object. If the hash is not there yet, start its count at 0; every failed attempt adds one to the count, and once the count exceeds the configurable maximum the message is discarded. Here's a quick and dirty prototype (note that the mcache object must be in scope for all subscribers):

var mcache = {}, maxRetries = 3;

q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) {
  var messagehash = hash(message);
  if(mcache[messagehash] === undefined){
    mcache[messagehash] = 0;
  }
  if(mcache[messagehash] > maxRetries) {
    q.shift(true,false); //reject true, requeue false (discard message)
    delete mcache[messagehash]; //don't leak memory
  } else {
    try{
      doSomething(data);
      q.shift(false); //reject false
      delete mcache[messagehash]; //don't leak memory
    } catch(e) {
      mcache[messagehash]++;
      q.shift(true,true); //reject true, requeue true
    }
  }
});

If the message has a GUID, you can simply return that from the hash function.
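
The hash function is left undefined above. A minimal sketch of one possibility follows, using Node's built-in crypto module. The name messageKey and its (data, guid) signature are assumptions made for illustration and differ from the hash(message) call in the prototype; the payload is assumed to be a Buffer or JSON-serializable.

```javascript
var crypto = require("crypto");

// Hypothetical key function: prefer an explicit GUID, otherwise hash the payload.
function messageKey(data, guid) {
  if (typeof guid === "string" && guid.length > 0) {
    return guid;
  }
  // JSON.stringify depends on key order, so objects with the same fields
  // in a different order produce different keys.
  var payload = Buffer.isBuffer(data) ? data : String(JSON.stringify(data));
  return crypto.createHash("sha256").update(payload).digest("hex");
}
```

Note that a content hash treats two distinct messages with identical payloads as the same message, so they would share one retry counter; a GUID set by the publisher avoids that.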

License: CC-BY-SA with attribution
Not affiliated with StackOverflow