Question

I am trying to use rabbitmq using spring amqp, below is my configuration.

<rabbit:connection-factory id="rabbitConnectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" />

<rabbit:admin connection-factory="rabbitConnectionFactory" />

<rabbit:queue name="${rabbitmq.import.queue}" />

<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}" />

<beans:bean id="importExchangeMessageListener"
    class="com.stockopedia.batch.foundation.ImportMessageListener" />

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="5">
    <rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener" />
</rabbit:listener-container>

This is simple Message Listener class,

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class ImportMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        System.out.println("consumer output: " + message);
    }

}

This is producer (which is itemWriter of spring batch),

public class ImportItemWriter<T> implements ItemWriter<T> {

    private AmqpTemplate template;

    public AmqpTemplate getTemplate() {
        return template;
    }

    public void setTemplate(AmqpTemplate template) {
        this.template = template;
    }

    public void write(List<? extends T> items) throws Exception {
        for (T item : items) {
            Object reply = template.convertSendAndReceive(item.toString());
            System.out.println("producer output: " + reply);
        }
    }

}

When I run my spring batch job, ImportItemWriter.write gets called. But ImportMessageListener.onMessage does not work. It doesnt print the message. I get below output for all items on console

producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
Was it helpful?

Solution

Your consumer is not sending a result...

@Override
public void onMessage(Message message) {
    System.out.println("consumer output: " + message);
}

Change it to a simple POJO; the container's MessageListenerAdapter will take care of the conversion for you, and send the result.

@Override
public String handleMessage(String message) {
    System.out.println("consumer output: " + message);
    return "result";
}

EDIT:

You also haven't set up any exchange or routing to your queue. If you want to use default exchange/routing, use...

convertSendAndReceive("", queueName, item.toString());

EDIT2:

Or, set the routingKey on the template to the queue name and then you can use the simpler method.

The ...sendAndReceive() methods are meant for request/reply scenarios so blocking is required. To do it asynchronously, you have to use one of the ...send() methods, and wire up your own SimpleListenerContainer to receive the replies; you will have to do your own correlation. Use

public void convertAndSend(Object message, MessagePostProcessor postProcessor)

and in your message post processor, set the replyTo and correlationId headers...

message.getMessageProperties().setReplyTo("foo");
message.getMessageProperties().setCorrelationId("bar");

Or, build the Message object yourself (e.g by using the MessageBuilder) and use the send method...

template.send(MessageBuilder.withBody("foo".getBytes())
            .setReplyTo("bar")
            .setCorrelationId("baz".getBytes())
            .build());

Each request needs a unique correlationId so you can correlate the response.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top