Question

J'utilise ActiveMQ comme courtier pour transmettre des messages.Ces messages sont destinés à être écrits dans une dabatase.Parfois, la base de données est inaccessible ou en panne.Dans ce cas, je souhaite annuler mon message pour réessayer plus tard ce message et je souhaite continuer à lire les autres messages.

Ce code fonctionne bien, sauf un point :le message annulé m'empêche de lire les autres :

private Connection getConnection() throws JMSException {
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setMaximumRedeliveries(3); // will retry 3 times to dequeue rollbacked messages
    redeliveryPolicy.setInitialRedeliveryDelay(5 *1000);  // will wait 5s to read that message

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    Connection connection = connectionFactory.createConnection();
    ((ActiveMQConnection)connection).setUseAsyncSend(true);
    ((ActiveMQConnection)connection).setDispatchAsync(true);
    ((ActiveMQConnection)connection).setRedeliveryPolicy(redeliveryPolicy);
    ((ActiveMQConnection)connection).setStatsEnabled(true);
    connection.setClientID("myClientID");
    return connection;
}

Je crée ma session de cette façon :

session = connection.createSession(true, Session.SESSION_TRANSACTED);

Le rollback est facile à demander :

session.rollback();

Imaginons que j'ai 3 messages dans ma file d'attente :

1: ok
2: KO (will need to be treated again : the message I want to rollback)
3: ok
4: ok

Mon consommateur fera (séquence linéaire) :

commit 1 
rollback 2
wait 5s
rollback 2
wait 5s
rollback 2
put 2 in dead letter queue (ActiveMQ.DLQ)
commit 3
commit 4

Mais je veux :

commit 1
rollback 2
commit 3
commit 4
wait 5s
rollback 2
wait 5s
rollback 2
wait 5s
put 2 in dead letter queue (ActiveMQ.DLQ)

Alors, comment puis-je configurer mon consommateur pour qu'il retarde mes messages annulés plus tard ?

Était-ce utile?

La solution

Ceci est en réalité attendu Comportement, car les tentatives de messagerie sont traitées par le client, pas le courtier.Donc, comme vous avez 1 session Bound, et votre stratégie de réessaye est configurée pour les 3 tentatives avant la DLQ, l'ensemble du processus de tentative bloque le thread particulier.

Donc, ma première question est que si l'insertion de la base de données échoue, ne vous attendez pas à ce que tout reste de vos inserts de DB échoue pour une raison similaire?

Si ce n'est pas le cas, le moyen de contourner la stratégie de réessation pour que cette file d'attente soit 0 de tentatives, avec une DLQ spécifique, de sorte que les messages échouent immédiatement et entrent dans la DLQ.Demandez ensuite un autre processus qui retire la DLQ toutes les 5 secondes et les retraites et / ou le remet dans la file d'attente principale pour le traitement.

Autres conseils

Utilisez-vous le <strictOrderDispatchPolicy /> dans le fichier de configuration XML ActiveMQ ?Je ne sais pas si cela affectera l'ordre des messages à renvoyer ou non.Si vous utilisez une répartition stricte des commandes, essayez de commenter cette politique pour voir si cela modifie le comportement.

Bruce

J'ai eu le même problème, je n'ai pas trouvé de solution ici, alors décidé de le poster ici après avoir trouvé un pour les personnes qui ont du mal à la même chose. Ceci est fixé avant la version 5.6 lorsque vous définissez la propriété NOBLOCKINDREDelivery sur True In Connection Factory:

<property name="nonBlockingRedelivery" value="true" />

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top