문제

주어진 JMSCorRelationID와 일치하는 메시지 만 수신하는 JVA (JRE / JDK / J2EE 1.4)에서 JMS 큐 리스너를 어떻게 인스턴스화 할 수 있습니까? 내가 픽업하고자하는 메시지는 주제가 아닌 대기열에 게시되었지만 필요한 경우 변경할 수 있습니다.

다음은 메시지를 대기열에 넣기 위해 현재 사용하고있는 코드입니다.

/**
 * publishResponseToQueue publishes Requests to the Queue.
 *
 * @param   jmsQueueFactory             -Name of the queue-connection-factory
 * @param   jmsQueue                    -The queue name for the request
 * @param   response                     -A response object that needs to be published
 * 
 * @throws  ServiceLocatorException     -An exception if a request message
 *                                      could not be published to the Topic
 */
private void publishResponseToQueue( String jmsQueueFactory,
                                    String jmsQueue,
                                    Response response )
        throws ServiceLocatorException {

    if ( logger.isInfoEnabled() ) {
        logger.info( "Begin publishRequestToQueue: " +
                         jmsQueueFactory + "," + jmsQueue + "," + response );
    }
    logger.assertLog( jmsQueue != null && !jmsQueue.equals(""),
                      "jmsQueue cannot be null" );
    logger.assertLog( jmsQueueFactory != null && !jmsQueueFactory.equals(""),
                      "jmsQueueFactory cannot be null" );
    logger.assertLog( response != null, "Request cannot be null" );

    try {

        Queue queue = (Queue)_context.lookup( jmsQueue );

        QueueConnectionFactory factory = (QueueConnectionFactory)
            _context.lookup( jmsQueueFactory );

        QueueConnection connection = factory.createQueueConnection();
        connection.start();
        QueueSession session = connection.createQueueSession( false,
                                    QueueSession.AUTO_ACKNOWLEDGE );

        ObjectMessage objectMessage = session.createObjectMessage();

        objectMessage.setJMSCorrelationID(response.getID());

        objectMessage.setObject( response );

        session.createSender( queue ).send( objectMessage );

        session.close();
        connection.close();

    } catch ( Exception e ) {
        //XC3.2  Added/Modified BEGIN
        logger.error( "ServiceLocator.publishResponseToQueue - Could not publish the " +
                      "Response to the Queue - " + e.getMessage() );
        throw new ServiceLocatorException( "ServiceLocator.publishResponseToQueue " +
                                           "- Could not publish the " +
                      "Response to the Queue - " + e.getMessage() );
        //XC3.2  Added/Modified END
    }

    if ( logger.isInfoEnabled() ) {
        logger.info( "End publishResponseToQueue: " +
                         jmsQueueFactory + "," + jmsQueue + response );
    }

}  // end of publishResponseToQueue method 
도움이 되었습니까?

해결책

큐 연결 설정은 동일하지만 큐가 있으면 수신기를 만들 때 선택기를 설정합니다.

    QueueReceiver receiver = session.createReceiver(myQueue, "JMSCorrelationID='theid'");

그 다음에

receiver.receive()

또는

receiver.setListener(myListener);

다른 팁

BTW 실제 질문은 아니지만 JMS에 대한 요청 응답을 구현하려는 경우 추천합니다. 이 기사를 읽습니다 JMS API가 상상할 수있는 것보다 훨씬 더 복잡하고 효율적으로 수행하는 것이 보이는 것보다 훨씬 어렵습니다.

특히 JMS를 효율적으로 사용합니다 단일 메시지 등에 소비자를 만들지 않도록 노력해야합니다.

또한 JMS API는 특히 풀링, 트랜잭션 및 동시 처리와 함께 올바르게 효율적으로 사용하기에 매우 복잡하기 때문에 사람들을 추천합니다. 애플리케이션 코드에서 미들웨어를 숨기십시오 사용과 같은 사용 JMS에 대한 Apache Camel의 Spring Remoting 구현

이것이 도움이되기를 바랍니다. 나는 Open MQ를 사용했습니다.

package com.MQueues;

import java.util.UUID;

import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import com.sun.messaging.BasicQueue;
import com.sun.messaging.QueueConnectionFactory;

public class HelloProducerConsumer {

public static String queueName = "queue0";
public static String correlationId;

public static String getCorrelationId() {
    return correlationId;
}

public static void setCorrelationId(String correlationId) {
    HelloProducerConsumer.correlationId = correlationId;
}

public static String getQueueName() {
    return queueName;
}

public static void sendMessage(String threadName) {
    correlationId = UUID.randomUUID().toString();
    try {

        // Start connection
        QueueConnectionFactory cf = new QueueConnectionFactory();
        QueueConnection connection = cf.createQueueConnection();
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        BasicQueue destination = (BasicQueue) session.createQueue(threadName);
        MessageProducer producer = session.createProducer(destination);
        connection.start();

        // create message to send
        TextMessage message = session.createTextMessage();
        message.setJMSCorrelationID(correlationId);
        message.setText(threadName + "(" + System.currentTimeMillis() 
                + ") " + correlationId +" from Producer");

        System.out.println(correlationId +" Send from Producer");
        producer.send(message);

        // close everything
        producer.close();
        session.close();
        connection.close();

    } catch (JMSException ex) {
        System.out.println("Error = " + ex.getMessage());
    }
}

public static void receivemessage(final String correlationId) {
    try {

        QueueConnectionFactory cf = new QueueConnectionFactory();
        QueueConnection connection = cf.createQueueConnection();
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

        BasicQueue destination = (BasicQueue) session.createQueue(getQueueName());

        connection.start();

        System.out.println("\n");
        System.out.println("Start listen " + getQueueName() + " " + correlationId +" Queue from receivemessage");
        long now = System.currentTimeMillis();

        // receive our message
        String filter = "JMSCorrelationID = '" + correlationId  + "'";
        QueueReceiver receiver = session.createReceiver(destination, filter);
        TextMessage m = (TextMessage) receiver.receive();
        System.out.println("Received message = " + m.getText() + " timestamp=" + m.getJMSTimestamp());

        System.out.println("End listen " + getQueueName() + " " + correlationId +" Queue from receivemessage");

        session.close();
        connection.close();

    } catch (JMSException ex) {
        System.out.println("Error = " + ex.getMessage());
    }
}

public static void main(String args[]) {
    HelloProducerConsumer.sendMessage(getQueueName());
    String correlationId1 = getCorrelationId();
    HelloProducerConsumer.sendMessage(getQueueName());
    String correlationId2 = getCorrelationId();
    HelloProducerConsumer.sendMessage(getQueueName());
    String correlationId3 = getCorrelationId();


    HelloProducerConsumer.receivemessage(correlationId2);

    HelloProducerConsumer.receivemessage(correlationId1);

    HelloProducerConsumer.receivemessage(correlationId3);
}
}
String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'";
QueueReceiver receiver = session.createReceiver(queue, filter);

여기서 수신기는 메시지를 얻습니다 JMSCorrelationID 와 동등하다 MessageID. 이것은 요청/ 응답 패러다임에 매우 도움이됩니다.

또는 이것을 모든 값으로 직접 설정할 수 있습니다.

QueueReceiver receiver = session.createReceiver(queue,  "JMSCorrelationID ='"+id+"'";);

당신이 할 수있는 것보다 receiver.receive(2000); 또는 receiver.setMessageListener(this);

라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top