Pregunta

Yo uso ActiveMQ como un corredor para entregar mensajes.Estos mensajes están intentados para ser escritos en una dabatasa.A veces, la base de datos es inalcanzable o baja.En ese caso, quiero retroceder mi mensaje para volver a intentarlo más adelante este mensaje y quiero continuar leyendo otros mensajes.

Este código funciona bien, excepto un punto: el mensaje de replizada me está bloqueando de leer los demás:

private Connection getConnection() throws JMSException {
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setMaximumRedeliveries(3); // will retry 3 times to dequeue rollbacked messages
    redeliveryPolicy.setInitialRedeliveryDelay(5 *1000);  // will wait 5s to read that message

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    Connection connection = connectionFactory.createConnection();
    ((ActiveMQConnection)connection).setUseAsyncSend(true);
    ((ActiveMQConnection)connection).setDispatchAsync(true);
    ((ActiveMQConnection)connection).setRedeliveryPolicy(redeliveryPolicy);
    ((ActiveMQConnection)connection).setStatsEnabled(true);
    connection.setClientID("myClientID");
    return connection;
}

creo mi sesión de esta manera:

session = connection.createSession(true, Session.SESSION_TRANSACTED);

rollback es fácil de preguntar:

session.rollback();

Vamos a imaginar que tengo 3 mensajes en mi cola:

1: ok
2: KO (will need to be treated again : the message I want to rollback)
3: ok
4: ok

Mi consumidor hará (secuencia lineal):

commit 1 
rollback 2
wait 5s
rollback 2
wait 5s
rollback 2
put 2 in dead letter queue (ActiveMQ.DLQ)
commit 3
commit 4

pero quiero:

commit 1
rollback 2
commit 3
commit 4
wait 5s
rollback 2
wait 5s
rollback 2
wait 5s
put 2 in dead letter queue (ActiveMQ.DLQ)

Entonces, ¿cómo puedo configurar a mi consumidor para retrasar mis mensajes revertidos más tarde?

¿Fue útil?

Solución

Este es realmente el comportamiento esperado, porque el cliente no maneja los reintentos de mensajes, no el corredor.Por lo tanto, dado que tiene 1 sesión obligada, y su política de reintento se configura para los 3 reintentos antes de DLQ, entonces todo el proceso de reintento bloquea ese hilo en particular.

Entonces, mi primera pregunta es que si el inserto de la base de datos falla, ¿no esperaría que todo el resto de sus insertos de DB fallarán por una razón similar?

Si no, la forma de desplazarse es para establecer la política de reintento para que la cola sea 0 reintentos, con un DLQ específico, de modo que los mensajes fallarán de inmediato e ingrese a DLQ.Luego, tenga otro proceso que retire el DLQ cada 5 segundos y reprocese y / o la vuelva a colocar en la cola principal para su procesamiento.

Otros consejos

¿Está utilizando el <strictOrderDispatchPolicy /> en el archivo de configuración de activemq xml?No estoy seguro de si esto afectará el orden de los mensajes para la entrega o no.Si está utilizando un mensaje de pedido estricto, intente comentar esa política para ver si eso cambia el comportamiento.

bruce

Tuve el mismo problema, no he encontrado la solución aquí, así que decidí publicarlo aquí después de que encontré uno para las personas que luchan con lo mismo. Esto se fija antes de la versión 5.6 cuando establece la propiedad sin bloqueardeRedivery a True en Connection Factory:

<property name="nonBlockingRedelivery" value="true" />

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top