Question

I use ActiveMQ as a broker to deliver messages. These messages are intended to be written to a database. Sometimes the database is unreachable or down. In that case, I want to roll back the message so that it is retried later, and I want to keep reading the other messages in the meantime.

This code works fine, except for one point: the rolled-back message blocks me from reading the others:

private Connection getConnection() throws JMSException {
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setMaximumRedeliveries(3); // redeliver a rolled-back message at most 3 times
    redeliveryPolicy.setInitialRedeliveryDelay(5 * 1000);  // wait 5 s before the first redelivery

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    Connection connection = connectionFactory.createConnection();
    ((ActiveMQConnection)connection).setUseAsyncSend(true);
    ((ActiveMQConnection)connection).setDispatchAsync(true);
    ((ActiveMQConnection)connection).setRedeliveryPolicy(redeliveryPolicy);
    ((ActiveMQConnection)connection).setStatsEnabled(true);
    connection.setClientID("myClientID");
    return connection;
}

I create my session this way:

session = connection.createSession(true, Session.SESSION_TRANSACTED);

Requesting a rollback is simple:

session.rollback();

Let's imagine I have 4 messages in my queue:

1: ok
2: KO (needs to be processed again: this is the message I want to roll back)
3: ok
4: ok

My consumer currently does this (linear sequence):

commit 1 
rollback 2
wait 5s
rollback 2
wait 5s
rollback 2
put 2 in dead letter queue (ActiveMQ.DLQ)
commit 3
commit 4

But I want:

commit 1
rollback 2
commit 3
commit 4
wait 5s
rollback 2
wait 5s
rollback 2
wait 5s
put 2 in dead letter queue (ActiveMQ.DLQ)

So, how can I configure my consumer to defer the redelivery of rolled-back messages until later?


Solution

This is actually expected behavior, because message redelivery is handled by the client, not the broker. Since you have a single session, and your redelivery policy is set up for 3 retries before the message goes to the DLQ, the entire retry process blocks that particular thread.

So my first question is: if one database insert fails, wouldn't you expect all of the remaining DB inserts to fail for a similar reason?

If not, the way around it is to set the redelivery policy for that queue to 0 retries, with a specific DLQ, so that failed messages go into the DLQ immediately. Then have another process that pulls from the DLQ every 5 seconds and either reprocesses the messages or puts them back on the main queue for processing.
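
To make the resulting order of events concrete, here is a minimal in-memory sketch of that flow. It does not use any ActiveMQ classes: the two queues are plain Java collections, the 5-second wait is only recorded as an event rather than actually slept, and the class name, the MAX_ATTEMPTS cap and the "give up" step are assumptions made for illustration, not part of ActiveMQ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** In-memory sketch of "fail fast to a DLQ, retry later". No ActiveMQ API is used. */
class DlqRetrySketch {
    // Hypothetical cap on processing attempts, mirroring the 3 tries in the question.
    static final int MAX_ATTEMPTS = 3;

    /** Runs the simulation and returns the ordered list of events. */
    static List<String> run(List<Integer> messages, Predicate<Integer> dbInsertWorks) {
        Deque<Integer> mainQueue = new ArrayDeque<>(messages);
        Deque<Integer> dlq = new ArrayDeque<>();
        Map<Integer, Integer> attempts = new HashMap<>();
        List<String> events = new ArrayList<>();

        while (!mainQueue.isEmpty() || !dlq.isEmpty()) {
            // Main consumer: zero redeliveries, so a failure goes straight to the DLQ
            // and the next message is read without waiting.
            while (!mainQueue.isEmpty()) {
                int msg = mainQueue.poll();
                if (dbInsertWorks.test(msg)) {
                    events.add("commit " + msg);
                } else {
                    attempts.merge(msg, 1, Integer::sum);
                    events.add("rollback " + msg);
                    dlq.add(msg);
                }
            }
            // Retry worker: runs after the delay and puts messages that are still
            // under the cap back on the main queue.
            if (!dlq.isEmpty()) {
                events.add("wait 5s"); // recorded only, the sketch does not sleep
                while (!dlq.isEmpty()) {
                    int msg = dlq.poll();
                    if (attempts.get(msg) < MAX_ATTEMPTS) {
                        mainQueue.add(msg);
                    } else {
                        events.add("give up on " + msg);
                    }
                }
            }
        }
        return events;
    }

    public static void main(String[] args) {
        // Message 2 always fails, as in the question's example.
        run(List.of(1, 2, 3, 4), msg -> msg != 2).forEach(System.out::println);
    }
}
```

With message 2 always failing, the recorded events follow the order asked for in the question: messages 3 and 4 are committed before the first wait, and message 2 is only retried afterwards. In a real setup the retry worker would be a separate JMS consumer on the DLQ, and the attempt count would have to travel with the message (for example in a message property), which this sketch leaves out.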

OTHER TIPS

Are you using the <strictOrderDispatchPolicy /> in the ActiveMQ XML config file? I'm not sure if this will affect the order of messages for redelivery or not. If you are using strict order dispatch, try commenting out that policy to see if that changes the behavior.

Bruce

I had the same problem and didn't find a solution here, so I decided to post the one I found for people struggling with the same issue. As of version 5.6, this is fixed by setting the nonBlockingRedelivery property to true on the connection factory:

<property name="nonBlockingRedelivery" value="true" />
Licensed under: CC-BY-SA with attribution