How can a Spring AMQP (RabbitMQ) producer wait after sending all messages and resume once all replies are received?

StackOverflow https://stackoverflow.com/questions/23294934

Question

I am queuing all messages to a RabbitMQ queue and processing them on a remote server. Below are my producer and reply handler, both in the same class.

public class AmqpAsynchRpcItemWriter<T> implements ItemWriter<T>,
        MessageListener {

    protected String exchange;
    protected String routingKey;
    protected String queue;
    protected String replyQueue;
    protected RabbitTemplate template;

    // Reply handler 
    @Override
    public void onMessage(Message message) {

        try {
            String corrId = new String(message.getMessageProperties()
                    .getCorrelationId(), "UTF-8");
            System.out.println("received " + corrId + " from " + this.replyQueue);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    //Producer
    @Override
    public void write(List<? extends T> items) throws Exception {

        for (T item : items) {
            System.out.println(item);

            System.out.println("Queuing " + item + " to " + this.queue);

            Message message = MessageBuilder
                    .withBody(item.toString().getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                    .setReplyTo(this.replyQueue)
                    .setCorrelationId(item.toString().getBytes()).build();

            template.send(this.exchange, this.routingKey, message);

            System.out.println("Queued " + item + " to " + this.queue);

        }

        // It should wait here until all replies have arrived in onMessage. How can we do this?

    }
}

I send all messages in the write method and receive the replies in onMessage. This works, but write doesn't wait for the replies; it returns to the caller and the spring-batch step is marked completed.

I want write to block after sending all messages until all replies have arrived in onMessage. How can we do this?

Solution

You can use any number of synchronization techniques; for example, have the listener put the replies in a LinkedBlockingQueue and have the sender take (or poll with a timeout) from the queue until all the replies are received.
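
A minimal sketch of the LinkedBlockingQueue hand-off, using only the JDK so it runs without a broker. ReplyCollector, onReply and awaitReplies are hypothetical names, not part of Spring AMQP; the sketch also assumes the correlation ids within one write() call are unique.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not a Spring AMQP class): hands replies from the
// listener thread to the thread that is blocked at the end of write().
class ReplyCollector {

    private final BlockingQueue<String> replies = new LinkedBlockingQueue<>();

    // Call from onMessage() with the correlation id of each reply.
    void onReply(String correlationId) {
        replies.add(correlationId);
    }

    // Call at the end of write(). Blocks until every expected correlation id
    // has arrived; returns false if the overall timeout elapses first.
    boolean awaitReplies(Set<String> expected, long timeout, TimeUnit unit)
            throws InterruptedException {
        Set<String> pending = new HashSet<>(expected);
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!pending.isEmpty()) {
            long remaining = deadline - System.nanoTime();
            // poll() returns null when nothing arrives within the remaining time
            String correlationId = replies.poll(remaining, TimeUnit.NANOSECONDS);
            if (correlationId == null) {
                return false;
            }
            pending.remove(correlationId);
        }
        return true;
    }
}
```

In the writer from the question, write() would record item.toString() for each item it sends, onMessage() would call onReply(corrId), and write() would finish with awaitReplies(...), failing the step if it returns false.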

Or, don't use a listener at all and simply use the same RabbitTemplate to receive() from the reply queue until all the replies are received.

However, receive() returns null if the queue is empty, so you'll have to sleep between receives to avoid spinning the CPU.
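
The polling variant can be sketched in the same JDK-only way. Here the Supplier stands in for a call such as () -> template.receive(replyQueue); ReplyPoller, pollReplies and the timeout and sleep parameters are hypothetical and would need tuning for a real workload.

```java
import java.util.function.Supplier;

// Hypothetical helper: polls a non-blocking receive call until the expected
// number of replies has been seen or the timeout elapses.
class ReplyPoller {

    // 'receive' is assumed to return null when the reply queue is empty.
    // Returns the number of replies actually received.
    static <M> int pollReplies(Supplier<M> receive, int expected,
                               long timeoutMillis, long sleepMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        int received = 0;
        while (received < expected && System.currentTimeMillis() < deadline) {
            M reply = receive.get();
            if (reply == null) {
                // Queue empty: back off instead of spinning the CPU.
                Thread.sleep(sleepMillis);
            } else {
                received++;
            }
        }
        return received;
    }
}
```

The caller compares the returned count with the number of messages it sent and treats a shortfall as a timeout.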

Licensed under: CC-BY-SA with attribution