Question

I am testing Spring AMQP with Spring Integration support. I have the following configuration and test:

<rabbit:connection-factory id="connectionFactory" />
<rabbit:queue name="durableQ"/>
<int:channel id="consumingChannel">
    <int:queue capacity="2"/> <!-- Messages are acked as soon as they are placed in this queue -->
</int:channel>

<int-amqp:inbound-channel-adapter 
    channel="consumingChannel"
    queue-names="durableQ" 
    connection-factory="connectionFactory"
    concurrent-consumers="1"
    acknowledge-mode="AUTO"
    />


public static void main(String[] args) {
    System.out.println("Starting consumer with integration..");
    AbstractApplicationContext context = new ClassPathXmlApplicationContext(
            "classpath:META-INF/spring/integration/spring-integration-context-consumer.xml");

    PollableChannel consumingChannel = context.getBean("consumingChannel", PollableChannel.class);
    int count = 0;
    while (true) {
        Message<?> msg = consumingChannel.receive(1000);
        System.out.println((count++) + " \t -> " + msg);

        try { // sleep to check the number of messages in the queue
            Thread.sleep(50000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

With this configuration, it is evident that messages are acked, and hence removed from the queue, as soon as they arrive at consumingChannel. I verified this by adding a long sleep after receive and checking the queue size. There is no further control over it.

Now, if I set acknowledge-mode=MANUAL, there seems to be no way to do a manual ack via Spring Integration.

I need to process the message and then ack it manually after processing, so that the message stays persisted in durableQ until it is acked.

Is there any way to handle MANUAL ack with spring-amqp-integration? I want to avoid passing a ChannelAwareMessageListener to the inbound-channel-adapter, since I want to keep control of the consumer's receive.

Update:

It doesn't even seem to be possible when using my own listener-container with the inbound-channel-adapter:

<!-- The adapter below creates a default direct channel (a Spring Integration channel) named "adapter"; messages are received from this channel as in the test above -->
<int-amqp:inbound-channel-adapter id="adapter" listener-container="amqpListenerContainer" /> 

<bean id="amqpListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="durableQ" />
    <property name="acknowledgeMode" value="MANUAL" />

    <!-- messageListener is not allowed when the container is used with the adapter, so I cannot supply my own ChannelAwareMessageListener; no channel is exposed in onMessage, hence no way to ack -->
    <property name="messageListener" ref="listener"/>
</bean>
<bean id="listener" class="com.sd.springint.rmq.MsgListener"/>

The above configuration throws an error because the messageListener property is not allowed (see the inline comment on the tag). So the purpose of using a listener-container, which was to expose the channel via a ChannelAwareMessageListener, is defeated.

It seems to me that Spring Integration cannot be used for manual acknowledgement (I know, that is a strong claim!). Can anyone help me validate this, or is there a specific approach or configuration that I am missing?

Solution

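The problem is that you are using an async handoff via a QueueChannel. With AUTO, the container acks when the listener thread returns, and here that happens as soon as the message has been placed in the queue. It is generally better to control concurrency in the container (for example, concurrent-consumers="2") and to avoid async handoffs in your flow (use DirectChannels). That way, AUTO ack works just fine, because the ack is sent only after the downstream flow has completed on the listener thread. Instead of receiving from the PollableChannel, subscribe a new MessageHandler() to a SubscribableChannel.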
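To make the timing difference concrete, here is a minimal plain-Java simulation; it does not use Spring or RabbitMQ at all. The class `AckTimingDemo` and its methods are hypothetical names invented for this illustration, and `deliver` only models the rule "AUTO acks once the listener call returns normally". It is a sketch of the ordering, not the container's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class AckTimingDemo {

    /** Models AUTO mode: the (simulated) container acks once the listener returns normally. */
    public static void deliver(String msg, Consumer<String> listener, List<String> events) {
        listener.accept(msg);      // the listener thread runs whatever is wired downstream
        events.add("ack " + msg);  // the ack is recorded only after that call returns
    }

    /** Direct handoff: the handler runs on the listener's call stack, so it finishes before the ack. */
    public static List<String> directHandoff() {
        List<String> events = new ArrayList<>();
        deliver("m1", msg -> events.add("handle " + msg), events);
        return events;
    }

    /** Queue handoff: the listener only enqueues and returns; the handler runs after the ack. */
    public static List<String> queueHandoff() {
        List<String> events = new ArrayList<>();
        Queue<String> buffer = new ArrayDeque<>();  // stands in for the QueueChannel
        deliver("m1", buffer::add, events);         // returns immediately, so the ack follows at once
        String msg = buffer.poll();                 // the consumer picks the message up later
        events.add("handle " + msg);
        return events;
    }

    public static void main(String[] args) {
        System.out.println("direct: " + directHandoff()); // direct: [handle m1, ack m1]
        System.out.println("queue:  " + queueHandoff());  // queue:  [ack m1, handle m1]
    }
}
```

In the queue variant the ack precedes the handling, which matches what the question observed: the message leaves durableQ before the polling consumer has processed it.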

Update:

You normally don't need to deal with Messages directly in an SI application, but the equivalent of your test with a DirectChannel would be:

    SubscribableChannel channel = context.getBean("fromRabbit", SubscribableChannel.class);

    channel.subscribe(new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("Got " + message);
        }
    });

OTHER TIPS

A MANUAL ack is possible only via Channel.basicAck(), so you need access to the Channel on which the message was received.

Try experimenting with the advice-chain of <int-amqp:inbound-channel-adapter>:

  1. Implement an Advice, for example a MethodBeforeAdvice.
  2. The advice-chain on the container is applied to ContainerDelegate#invokeListener.
  3. The first argument of that method is the Channel.
  4. Within that advice, you should be able to put the Channel into MessageProperties.headers.
  5. Configure <int-amqp:inbound-channel-adapter> with mapped-request-headers so that the Channel header is mapped.
  6. Finally, invoke basicAck() on that Channel header from the Spring Integration Message anywhere in your downstream flow.
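
The idea behind these steps can be sketched in plain Java without Spring or the RabbitMQ client. Everything here is a hypothetical stand-in: `AckChannel` mimics only the `basicAck(long deliveryTag, boolean multiple)` call of the real Channel, the header names "channel" and "deliveryTag" are made up, and `beforeInvokeListener` merely plays the role the advice would play. It is a sketch of the data flow under those assumptions, not working adapter configuration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeaderAckSketch {

    /** Hypothetical stand-in for the RabbitMQ Channel; only the ack call is modelled. */
    public interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple);
    }

    /** Steps 1 to 4: what the "before" advice would do, i.e. copy the channel into the headers. */
    public static Map<String, Object> beforeInvokeListener(AckChannel channel, long deliveryTag) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("channel", channel);          // hypothetical header name
        headers.put("deliveryTag", deliveryTag);  // basicAck needs the tag of the delivery
        return headers;
    }

    /** Step 6: a downstream handler processes the payload first, then acks via the header. */
    public static void handle(String payload, Map<String, Object> headers, List<String> events) {
        events.add("processed " + payload);
        AckChannel channel = (AckChannel) headers.get("channel");
        long tag = (Long) headers.get("deliveryTag");
        channel.basicAck(tag, false);  // false: acknowledge only this one delivery
    }

    /** Wires the pieces together with a recording channel so the order of events is visible. */
    public static List<String> run() {
        List<String> events = new ArrayList<>();
        AckChannel channel = (tag, multiple) -> events.add("basicAck " + tag);
        Map<String, Object> headers = beforeInvokeListener(channel, 7L);
        handle("order-1", headers, events);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [processed order-1, basicAck 7]
    }
}
```

The point of the sketch is only the ordering: because the channel travels with the message, the ack can be issued after processing, wherever in the flow that happens.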
Licensed under: CC-BY-SA with attribution