How to make MessageListener stop listening for messages in JMS when it receives a certain message?

StackOverflow https://stackoverflow.com/questions/18787939

  •  28-06-2022
  •  | 
  •  

Вопрос

I have a message listener that is receiving some TextMessages. When it receives an ObjectMessage, I want it to stop listening to the queue. My problem is that when I call, consumer.close() inside the onMessage(Message msg) method, the ObjectMessage does not seem to be removed from the Queue. If I use some marker to tell the consuemr to close after the onMessage() method, the listener may consume another message before it actually closes. Any suggestions? Here is some code. The Session, Connection, and InitialContext have not been closed yet.

public class MyListener implements MessageListener{
    MessageConsumer consumer;

    public MyListener(MessageConsumer mc){
        consumer = mc;
    }

    @Override
    public void onMessage(Message msg) {
        try{
            if(msg instanceof ObjectMessage){
                consumer.close();
            }
            if (msg instanceof TextMessage){
                TextMessage tmsg = (TextMessage) msg;
                String xml = tmsg.getText();
                // do some stuff                
            }

       }catch(Exception e){
           e.printStackTrace();
       }
    }
Это было полезно?

Решение

Do not use an asynchronous MessageListener.

Instead use the normal synchronous receive method in your main thread in a loop. If you get your special message, you can acknowledge and break from the loop to close the session, and to terminate the program.

Другие советы

It is correct that you can't call connection.stop() or connection.close() from onMessage() according to JMS specification, but you can call connection.close() and connection.stop() from other thread, so in my case I just setup volatile variable from onMessage() when need to stop the connection, and check this variable in other thread where I can call connection.stop() and connection.close() without getting the exception or deadlock.

> From JMS 2.0 spec:

6.1.5. Pausing delivery of incoming messages If any message listeners are running when stop is invoked, stop must wait until all of them have returned before it may return. While these message listeners are completing, they must have the full services of the connection available to them. A message listener must not attempt to stop its own connection as this would lead to deadlock. The JMS provider must detect this and throw a javax.jms.IllegalStateException.

> From JMS 1.1:

4.3.4 Pausing Delivery of Incoming Messages If MessageListeners are running when stop is invoked, stop must wait until all of them have returned before it may return. While these MessageListeners are completing, they must have the full services of the connection available to them.

Read the documentation first. You may have another thread(s) accessing MessageConsumer and the thread that calls close() will block until the completion of other(s).

After a few hours trying to solve this, I think I found a way to stop an asynchronous message consumer (MessageListener). The solution involves using Java locks (synchronized statements, and wait/notify methods).

First, on your main thread you need to lock the message listener after starting your JMS connection and invoke the message listener “wait” method. On your message listener, you need to lock again the message listener and then invoke the “notify all” method.

  // Main thread ...
  public static void main(String[] args) {
    // ...
    try {
      Connection jmsConn;
      MessageConsumer msgConsumer;
      MessageListener msgListener;
      // ...
      msgConsumer.setMessageListener(msgListener);
      // ...
      synchronized (msgListener) {
        jmsConn.start();
        msgListener.wait();
      }
      jmsConn.stop();
      //...
    } catch (Exception e) {
      // ...
    }
  }


  // MessageListener onMessage...
  public void onMessage(Message jmsMsg) {
    try {
      // ...
      synchronized (this) {
        this.notifyAll();
      }
    } catch (Exception e) {
      // ...
    }
  }

Miguel Abraham

This may be a little old, but as I had the same issue and came across it then I thought I would post my findings to help others.

I had the same issues as the question, I had created a JMS receiver class which set an async listener;

TopicSubscriber receiver = myTopicSession.createSubscriber(myTopic);  
JmsMessageListener listener = new JmsMessageListener();
receiver.setMessageListener(listener);

And then I could not terminate the listener in a nice way.

I found the solution was to actually close the connection to my topic. And that this would terminate the listener thread as well.

myTopicConnection.close();

It meant that in my main thread I had to keep a link to the JMS receiver class I had created and then call a close() method to shut it down.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top