Question

I have following two cases of code for activeMQ listeners. I was expecting case 2 to be faster but it is not ...

case 1: Here i am creating multiple listeners of activemq in main function

public class FileReceiver {


    public static void main(String[] args) {
         List<MessageReceiver> messageReceiverList = new ArrayList<MessageReceiver>();

         MessageReceiver msgReceiver1 = new MessageReceiver();
         messageReceiverList.add(msgReceiver1);
         msgReceiver1.run();

         MessageReceiver msgReceiver2 = new MessageReceiver();
         messageReceiverList.add(msgReceiver2);
         msgReceiver2.run();

         MessageReceiver msgReceiver3 = new MessageReceiver();
         messageReceiverList.add(msgReceiver3);
         msgReceiver3.run();

    }
}

case 2: Here i am creating multiple listeners of activemq in multiple threads

public class FileReceiver {


    public static void main(String[] args) {
         List<MessageReceiver> messageReceiverList = new ArrayList<MessageReceiver>();


         for(int i = 0 ; i < 3 ; i++) {
             MessageReceiver msgReceiver = new MessageReceiver();
             Thread thread = new Thread(msgReceiver);
             thread.start();
             messageReceiverList.add(msgReceiver);
             System.out.println("Listener " + i + " started.");
         }

    }
}

Here is the MessageReceiver class i am using ...

class MessageReceiver implements Runnable,MessageListener {

    private int numMsgsReceived = 0;

    private void start() {
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://localhost:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            ActiveMQSession session = (ActiveMQSession) connection
                    .createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination ftQueue = session.createQueue("TEMP.DEST");
            MessageConsumer consumer = session.createConsumer(ftQueue);
            consumer.setMessageListener(this);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void onMessage(Message msg) {
        incNumMsgsReceived();
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void run() {
        start();

    }
}

In both of the cases i am able to receive only 30 messages per second. Shouldn't the case 2 be faster as it is running on three separate threads?

Was it helpful?

Solution 2

MessageListener onMessage is called from the context of the dispatch thread inside of the Session that created the Consumer. It doesn't really matter how many threads you create because the thread that's actually invoking the onMessage call is always the same. In your case the extra threads will do no real work for you since the message continue to come from the same three Connection / Session instances so you have three session threads dispatching the same messages in either case.

OTHER TIPS

Actually, you are using 3 different connections/sessions to create your 3 MessageListeners, therefore you are concurrently consuming messages in both case 1 and case 2. That is why you are seeing the same throughput.

You don't need to create threads to consume concurrently from a queue. For every session you have you have a thread. In the following URL they explain this:

http://activemq.apache.org/multiple-consumers-on-a-queue.html

So, you should see a difference if you create message listeners from the same session in case 1, and from different sessions in case 2.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top