Filtrado del receptor de mensajes JMS por JMSCorrelationID
Pregunta
¿Cómo puedo crear una instancia de un escucha de colas JMS en java (JRE / JDK / J2EE 1.4) que solo recibe mensajes que coinciden con un JMSCorrelationID determinado? Los mensajes que busco recoger se han publicado en una cola y no en un tema, aunque eso puede cambiar si es necesario.
Aquí está el código que estoy usando actualmente para poner el mensaje en la cola:
/**
* publishResponseToQueue publishes Requests to the Queue.
*
* @param jmsQueueFactory -Name of the queue-connection-factory
* @param jmsQueue -The queue name for the request
* @param response -A response object that needs to be published
*
* @throws ServiceLocatorException -An exception if a request message
* could not be published to the Topic
*/
private void publishResponseToQueue( String jmsQueueFactory,
String jmsQueue,
Response response )
throws ServiceLocatorException {
if ( logger.isInfoEnabled() ) {
logger.info( "Begin publishRequestToQueue: " +
jmsQueueFactory + "," + jmsQueue + "," + response );
}
logger.assertLog( jmsQueue != null && !jmsQueue.equals(""),
"jmsQueue cannot be null" );
logger.assertLog( jmsQueueFactory != null && !jmsQueueFactory.equals(""),
"jmsQueueFactory cannot be null" );
logger.assertLog( response != null, "Request cannot be null" );
try {
Queue queue = (Queue)_context.lookup( jmsQueue );
QueueConnectionFactory factory = (QueueConnectionFactory)
_context.lookup( jmsQueueFactory );
QueueConnection connection = factory.createQueueConnection();
connection.start();
QueueSession session = connection.createQueueSession( false,
QueueSession.AUTO_ACKNOWLEDGE );
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setJMSCorrelationID(response.getID());
objectMessage.setObject( response );
session.createSender( queue ).send( objectMessage );
session.close();
connection.close();
} catch ( Exception e ) {
//XC3.2 Added/Modified BEGIN
logger.error( "ServiceLocator.publishResponseToQueue - Could not publish the " +
"Response to the Queue - " + e.getMessage() );
throw new ServiceLocatorException( "ServiceLocator.publishResponseToQueue " +
"- Could not publish the " +
"Response to the Queue - " + e.getMessage() );
//XC3.2 Added/Modified END
}
if ( logger.isInfoEnabled() ) {
logger.info( "End publishResponseToQueue: " +
jmsQueueFactory + "," + jmsQueue + response );
}
} // end of publishResponseToQueue method
Solución
La configuración de la conexión de la cola es la misma, pero una vez que tenga la QueueSession, configure el selector al crear un receptor.
QueueReceiver receiver = session.createReceiver(myQueue, "JMSCorrelationID='theid'");
entonces
receiver.receive()
o
receiver.setListener(myListener);
Otros consejos
Por cierto, si bien no es la pregunta real que hizo, si está intentando implementar una respuesta de solicitud a través de JMS, le recomendaría leer este artículo ya que la API de JMS es un poco más compleja de lo que podría imaginar y hacerlo de manera eficiente es mucho más difícil de lo que parece.
En particular, para utilizar JMS de manera eficiente debería intente evitar crear consumidores para un solo mensaje, etc.
También porque la API JMS es muy compleja de usar de manera correcta y eficiente, especialmente con la agrupación, las transacciones y el procesamiento simultáneo, recomiendo a la gente oculta el middleware de su código de aplicación , por ejemplo, utilizando Implementación de Spring Remoting de Apache Camel para JMS
Espero que esto ayude. Usé Open MQ.
package com.MQueues;
import java.util.UUID;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.sun.messaging.BasicQueue;
import com.sun.messaging.QueueConnectionFactory;
public class HelloProducerConsumer {
public static String queueName = "queue0";
public static String correlationId;
public static String getCorrelationId() {
return correlationId;
}
public static void setCorrelationId(String correlationId) {
HelloProducerConsumer.correlationId = correlationId;
}
public static String getQueueName() {
return queueName;
}
public static void sendMessage(String threadName) {
correlationId = UUID.randomUUID().toString();
try {
// Start connection
QueueConnectionFactory cf = new QueueConnectionFactory();
QueueConnection connection = cf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
BasicQueue destination = (BasicQueue) session.createQueue(threadName);
MessageProducer producer = session.createProducer(destination);
connection.start();
// create message to send
TextMessage message = session.createTextMessage();
message.setJMSCorrelationID(correlationId);
message.setText(threadName + "(" + System.currentTimeMillis()
+ ") " + correlationId +" from Producer");
System.out.println(correlationId +" Send from Producer");
producer.send(message);
// close everything
producer.close();
session.close();
connection.close();
} catch (JMSException ex) {
System.out.println("Error = " + ex.getMessage());
}
}
public static void receivemessage(final String correlationId) {
try {
QueueConnectionFactory cf = new QueueConnectionFactory();
QueueConnection connection = cf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
BasicQueue destination = (BasicQueue) session.createQueue(getQueueName());
connection.start();
System.out.println("\n");
System.out.println("Start listen " + getQueueName() + " " + correlationId +" Queue from receivemessage");
long now = System.currentTimeMillis();
// receive our message
String filter = "JMSCorrelationID = '" + correlationId + "'";
QueueReceiver receiver = session.createReceiver(destination, filter);
TextMessage m = (TextMessage) receiver.receive();
System.out.println("Received message = " + m.getText() + " timestamp=" + m.getJMSTimestamp());
System.out.println("End listen " + getQueueName() + " " + correlationId +" Queue from receivemessage");
session.close();
connection.close();
} catch (JMSException ex) {
System.out.println("Error = " + ex.getMessage());
}
}
public static void main(String args[]) {
HelloProducerConsumer.sendMessage(getQueueName());
String correlationId1 = getCorrelationId();
HelloProducerConsumer.sendMessage(getQueueName());
String correlationId2 = getCorrelationId();
HelloProducerConsumer.sendMessage(getQueueName());
String correlationId3 = getCorrelationId();
HelloProducerConsumer.receivemessage(correlationId2);
HelloProducerConsumer.receivemessage(correlationId1);
HelloProducerConsumer.receivemessage(correlationId3);
}
}
String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'";
QueueReceiver receiver = session.createReceiver(queue, filter);
Aquí el receptor obtendrá los mensajes para los que JMSCorrelationID
es igual a MessageID
. Esto es muy útil en el paradigma de solicitud / respuesta.
o puede establecer esto directamente en cualquier valor:
QueueReceiver receiver = session.createReceiver(queue, "JMSCorrelationID ='"+id+"'";);
De lo que puede hacer receiver.receive (2000);
o receiver.setMessageListener(this);