Question

I am trying out a competing event consumer implementation using Akka and Camel. I am using Akka 2.3.2 and Camel 5.8.0. Camel connects to an ActiveMQ broker, and a producer on the other end generates the messages. In the following code, EventManager is the master that creates a pool of consumers, and EventProcessor is the message-processing actor.

EventManager.java

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.camel.component.ActiveMQComponent;
    import org.apache.camel.CamelContext;

    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;
    import akka.actor.UntypedActor;

    import akka.camel.Camel;
    import akka.camel.CamelExtension;
    import akka.japi.Creator;
    import akka.routing.RoundRobinPool;

    public class EventManager {

        private final ActorSystem akkaSystem;

        private CamelContext camelContext = null;

        private ActorRef workRouter;

        public EventManager(ActorSystem system) {
            akkaSystem = system;

            initialize();
        }

        public void initialize() {

            Camel camel = CamelExtension.get(akkaSystem);

            camelContext = camel.context();

            ActiveMQComponent activemqComponent = ActiveMQComponent.activeMQComponent("tcp://localhost:61616");
            activemqComponent.setDeliveryPersistent(false);
            camelContext.addComponent("activemq",activemqComponent );


            int numOfWorkers = 5;

            // Distribute the message processing across a pool of 5 actors.
            // Assign to the field instead of declaring a local variable that shadows it.
            workRouter =
                    akkaSystem.actorOf(new RoundRobinPool(numOfWorkers).props(Props.create(EventProcessor.class)),
                        "workRouter");

        }

    }

EventProcessor.java

    import org.apache.log4j.Logger;

    import akka.actor.UntypedActor;
    import akka.camel.CamelMessage;
    import akka.camel.javaapi.UntypedConsumerActor;

    public class EventProcessor extends UntypedConsumerActor {

        private static final Logger LOGGER = Logger.getLogger(EventProcessor.class);

        public EventProcessor() {

        }

        public void onReceive(Object message) {
            if (message instanceof CamelMessage) {
                CamelMessage camelMessage = (CamelMessage) message;
                String body = camelMessage.getBodyAs(String.class, getCamelContext());
                LOGGER.info("Message handled by :" + this.getSelf().path().name());

                LOGGER.info("Message body:" + body);
            }
        }

        public boolean autoAck() {
            return true;
        }

        public String getEndpointUri() {
            return "activemq:queue:dest";
        }

    }

The problem I am seeing is that the messages seem to be consumed by a single actor and are not distributed across the pool. Do I need to create a separate Camel route to distribute them? I would also like to distribute the processing across different physical nodes. I would appreciate your input and any best practices.


Solution

Try setting concurrent consumers on the AMQ endpoint:

    return "activemq:queue:dest?concurrentConsumers=50";

or enable the async consumer on the endpoint:

    return "activemq:queue:dest?asyncConsumer=true";
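If you want the consumer count to be configurable rather than hard-coded in the string, a small helper could build the URI. This is only a sketch: EventEndpoints and queueUri are hypothetical names, not part of Akka or Camel, and the only thing taken from the suggestion above is the concurrentConsumers endpoint option.

```java
// Hypothetical helper (not an Akka or Camel class): builds the endpoint URI
// that EventProcessor.getEndpointUri() could return.
final class EventEndpoints {

    private EventEndpoints() {
    }

    // Appends the concurrentConsumers option from the answer to the queue URI.
    static String queueUri(String queueName, int concurrentConsumers) {
        if (queueName == null || queueName.isEmpty()) {
            throw new IllegalArgumentException("queueName must not be empty");
        }
        if (concurrentConsumers < 1) {
            throw new IllegalArgumentException("concurrentConsumers must be at least 1");
        }
        return "activemq:queue:" + queueName + "?concurrentConsumers=" + concurrentConsumers;
    }
}
```

In EventProcessor, getEndpointUri() could then return EventEndpoints.queueUri("dest", 5). The right number depends on your workload, so treat 5 (or the 50 above) as a starting point to tune.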

Also, it seems you got the version information wrong: there is no Camel version 5.8. I assume that is the ActiveMQ version.

Licensed under: CC-BY-SA with attribution