JMS-Nachrichtenempfänger Filterung durch JMSCorrelationID
Frage
Wie kann ich eine JMS-Warteschlange Zuhörer in Java (JRE / JDK / J2EE 1.4) instanziiert, die nur Nachrichten empfängt, die einen bestimmten JMSCorrelationID Vorstellungen? Die Nachrichten, die ich bin auf der Suche wurden in eine Warteschlange veröffentlicht zu holen und nicht ein Thema, aber das kann bei Bedarf geändert werden.
Hier ist der Code, den ich zur Zeit mit der Nachricht in der Warteschlange zu setzen:
/**
* publishResponseToQueue publishes Requests to the Queue.
*
* @param jmsQueueFactory -Name of the queue-connection-factory
* @param jmsQueue -The queue name for the request
* @param response -A response object that needs to be published
*
* @throws ServiceLocatorException -An exception if a request message
* could not be published to the Topic
*/
private void publishResponseToQueue( String jmsQueueFactory,
String jmsQueue,
Response response )
throws ServiceLocatorException {
if ( logger.isInfoEnabled() ) {
logger.info( "Begin publishRequestToQueue: " +
jmsQueueFactory + "," + jmsQueue + "," + response );
}
logger.assertLog( jmsQueue != null && !jmsQueue.equals(""),
"jmsQueue cannot be null" );
logger.assertLog( jmsQueueFactory != null && !jmsQueueFactory.equals(""),
"jmsQueueFactory cannot be null" );
logger.assertLog( response != null, "Request cannot be null" );
try {
Queue queue = (Queue)_context.lookup( jmsQueue );
QueueConnectionFactory factory = (QueueConnectionFactory)
_context.lookup( jmsQueueFactory );
QueueConnection connection = factory.createQueueConnection();
connection.start();
QueueSession session = connection.createQueueSession( false,
QueueSession.AUTO_ACKNOWLEDGE );
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setJMSCorrelationID(response.getID());
objectMessage.setObject( response );
session.createSender( queue ).send( objectMessage );
session.close();
connection.close();
} catch ( Exception e ) {
//XC3.2 Added/Modified BEGIN
logger.error( "ServiceLocator.publishResponseToQueue - Could not publish the " +
"Response to the Queue - " + e.getMessage() );
throw new ServiceLocatorException( "ServiceLocator.publishResponseToQueue " +
"- Could not publish the " +
"Response to the Queue - " + e.getMessage() );
//XC3.2 Added/Modified END
}
if ( logger.isInfoEnabled() ) {
logger.info( "End publishResponseToQueue: " +
jmsQueueFactory + "," + jmsQueue + response );
}
} // end of publishResponseToQueue method
Lösung
Die Warteschlange Verbindungsaufbau ist die gleiche, aber wenn man die QueueSession haben, stellen Sie den Wähler, wenn ein Empfänger zu schaffen.
QueueReceiver receiver = session.createReceiver(myQueue, "JMSCorrelationID='theid'");
dann
receiver.receive()
oder
receiver.setListener(myListener);
Andere Tipps
BTW, während seine nicht die eigentliche Frage, die Sie gefragt - wenn Sie Anforderungsantwort über JMS zu implementieren versuchen, würde ich empfehlen, JMS effizient zu nutzen, sollten Sie versuchen zu verhindern, dass Verbraucher für eine einzelne Nachricht usw. zu schaffen.
Auch weil der JMS-API so sehr komplex ist korrekt und effizient zu nutzen - vor allem mit Pooling-Transaktionen und die gleichzeitigen Verarbeitung - Ich empfehle Leute versteckt die Middleware von ihrem Anwendungscode wie über mit Frühling Remoting-Implementierung Apache Camel für JMS
Hope dies dazu beitragen wird. Ich verwendete öffnen MQ.
package com.MQueues;
import java.util.UUID;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.sun.messaging.BasicQueue;
import com.sun.messaging.QueueConnectionFactory;
public class HelloProducerConsumer {
public static String queueName = "queue0";
public static String correlationId;
public static String getCorrelationId() {
return correlationId;
}
public static void setCorrelationId(String correlationId) {
HelloProducerConsumer.correlationId = correlationId;
}
public static String getQueueName() {
return queueName;
}
public static void sendMessage(String threadName) {
correlationId = UUID.randomUUID().toString();
try {
// Start connection
QueueConnectionFactory cf = new QueueConnectionFactory();
QueueConnection connection = cf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
BasicQueue destination = (BasicQueue) session.createQueue(threadName);
MessageProducer producer = session.createProducer(destination);
connection.start();
// create message to send
TextMessage message = session.createTextMessage();
message.setJMSCorrelationID(correlationId);
message.setText(threadName + "(" + System.currentTimeMillis()
+ ") " + correlationId +" from Producer");
System.out.println(correlationId +" Send from Producer");
producer.send(message);
// close everything
producer.close();
session.close();
connection.close();
} catch (JMSException ex) {
System.out.println("Error = " + ex.getMessage());
}
}
public static void receivemessage(final String correlationId) {
try {
QueueConnectionFactory cf = new QueueConnectionFactory();
QueueConnection connection = cf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
BasicQueue destination = (BasicQueue) session.createQueue(getQueueName());
connection.start();
System.out.println("\n");
System.out.println("Start listen " + getQueueName() + " " + correlationId +" Queue from receivemessage");
long now = System.currentTimeMillis();
// receive our message
String filter = "JMSCorrelationID = '" + correlationId + "'";
QueueReceiver receiver = session.createReceiver(destination, filter);
TextMessage m = (TextMessage) receiver.receive();
System.out.println("Received message = " + m.getText() + " timestamp=" + m.getJMSTimestamp());
System.out.println("End listen " + getQueueName() + " " + correlationId +" Queue from receivemessage");
session.close();
connection.close();
} catch (JMSException ex) {
System.out.println("Error = " + ex.getMessage());
}
}
public static void main(String args[]) {
HelloProducerConsumer.sendMessage(getQueueName());
String correlationId1 = getCorrelationId();
HelloProducerConsumer.sendMessage(getQueueName());
String correlationId2 = getCorrelationId();
HelloProducerConsumer.sendMessage(getQueueName());
String correlationId3 = getCorrelationId();
HelloProducerConsumer.receivemessage(correlationId2);
HelloProducerConsumer.receivemessage(correlationId1);
HelloProducerConsumer.receivemessage(correlationId3);
}
}
String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'";
QueueReceiver receiver = session.createReceiver(queue, filter);
Hier wird der Empfänger die Nachrichten erhalten, für die JMSCorrelationID
ist gleich MessageID
. Dies ist sehr hilfreich bei der Request / Response-Paradigma.
oder Sie können diese auf einen beliebigen Wert direkt eingestellt:
QueueReceiver receiver = session.createReceiver(queue, "JMSCorrelationID ='"+id+"'";);
Dann können Sie entweder tun receiver.receive(2000);
oder receiver.setMessageListener(this);