Question

I'm using the following DefaultMessageListenerContainer to create a durable subscription so that I still receive messages sent during downtime. This works well.

@Bean
ConnectionFactory connectionFactory() {
    SingleConnectionFactory connectionFactory = new SingleConnectionFactory(new ActiveMQConnectionFactory(
            AMQ_BROKER_URL));
    connectionFactory.setClientId(CLIENT_ID);
    return connectionFactory;
}

@Bean
DefaultMessageListenerContainer container(final MessageListener messageListener,
        final ConnectionFactory connectionFactory) {
    return new DefaultMessageListenerContainer() {
        {
            setMessageListener(messageListener);
            setConnectionFactory(connectionFactory);
            setDestinationName(JMS_TARGET);
            setPubSubDomain(true);
            setSessionTransacted(true);
            setSubscriptionDurable(true);
            setDurableSubscriptionName(SUBSCRIPTION_ID);                        
            setConcurrentConsumers(1);
        }
    };
}

The question is: What is the best way to remove the subscription when I don't need it anymore? Would it also be possible to remove the subscription temporarily (and miss some messages), then enable it again later?

The only way that has worked so far is to shut down the DMLC and call unsubscribe afterwards.

dmlc.shutdown();
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.unsubscribe(SUBSCRIPTION_ID);

Is that a sensible solution? How could the subscription be reinitiated?

I've already seen this answer, but I really don't know how to do this. Or would it be better to subscribe and unsubscribe in a totally different way?


Solution

Before I give my answer, a word of caution: wanting to temporarily remove a durable subscription suggests that you may not really need one. Durable subscriptions are for consumers that need to receive messages sent while they were offline. If you only need messages while your consumer is connected, a non-durable subscription is the better choice.

Note that this only makes sense for topics (1:n communication, pub/sub). Queues work slightly differently, as they are used for 1:1 communication (setting aside advanced techniques such as load balancing).

Durable subscriptions incur a notable resource overhead on the message broker, and repeatedly removing and recreating them might lead to expensive initialization each time.

Now if you really want to (temporarily) unsubscribe from your durable topic (also works for "normal" topics), you'll have to extend DefaultMessageListenerContainer:

public class MyContainer extends DefaultMessageListenerContainer {
    public Session retrieveSession() {
        return getSession(); // this is protected, so we wrap it (could also make getSession public)
    }
}

When you instantiate MyContainer (instead of DefaultMessageListenerContainer) in the container method, make sure to store the reference:

protected MyContainer listenerContainer;
...
listenerContainer = new MyContainer(...);
return listenerContainer;

You can then call listenerContainer.retrieveSession().unsubscribe(...), passing the durable subscription name, to unsubscribe.

Please also note that you may only call this after all of the session's consumers for that topic are closed and no messages are in transit. To quote the documentation directly: "It is erroneous for a client to delete a durable subscription while there is an active (not closed) consumer for the subscription, or while a consumed message is part of a pending transaction or has not been acknowledged in the session."
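To make the lifecycle above concrete, here is a minimal in-memory sketch of the semantics involved. It is a toy model, not the JMS or Spring API: the class ToyTopic and all of its methods are hypothetical names made up for illustration, and it assumes a single topic and ignores transactions and acknowledgements. It shows that a durable subscription keeps messages published while its consumer is closed, that unsubscribing is rejected while a consumer is still open, and that messages published after a successful unsubscribe are missed even if the same name is subscribed again later.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Toy model of one topic with durable subscriptions (hypothetical, not the JMS API). */
final class ToyTopic {
    // Messages stored per durable subscription while no consumer is attached.
    private final Map<String, Queue<String>> backlog = new HashMap<>();
    // Subscription names that currently have an open consumer.
    private final Set<String> activeConsumers = new HashSet<>();

    /** Opens a consumer, creating the subscription on first use; returns and clears the backlog. */
    List<String> openConsumer(String name) {
        Queue<String> stored = backlog.computeIfAbsent(name, k -> new ArrayDeque<>());
        activeConsumers.add(name);
        List<String> missed = new ArrayList<>(stored);
        stored.clear();
        return missed;
    }

    /** Closes the consumer; the subscription itself stays registered. */
    void closeConsumer(String name) {
        activeConsumers.remove(name);
    }

    /** Stores the message for every registered subscription whose consumer is closed. */
    void publish(String message) {
        for (Map.Entry<String, Queue<String>> entry : backlog.entrySet()) {
            if (!activeConsumers.contains(entry.getKey())) {
                entry.getValue().add(message);
            }
        }
    }

    /** Removes the subscription and its backlog; rejected while a consumer is still open. */
    void unsubscribe(String name) {
        if (activeConsumers.contains(name)) {
            throw new IllegalStateException("consumer for '" + name + "' is still open");
        }
        backlog.remove(name);
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.openConsumer("sub");                      // registers the subscription
        topic.closeConsumer("sub");                     // consumer goes offline
        topic.publish("m1");                            // kept for the offline subscriber
        System.out.println(topic.openConsumer("sub"));  // prints [m1]

        try {
            topic.unsubscribe("sub");                   // consumer still open
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        topic.closeConsumer("sub");
        topic.unsubscribe("sub");                       // now allowed
        topic.publish("m2");                            // nobody is subscribed, so it is dropped
        System.out.println(topic.openConsumer("sub"));  // prints []
    }
}
```

In terms of the question, the order in this toy is: close the consumer first, then unsubscribe, and later subscribe again under the same name. Everything published in between is lost, which is exactly the "miss some messages" trade-off mentioned in the question.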

Licensed under: CC-BY-SA with attribution