Question

I am working on a project and have decided to use Camel and ActiveMQ. I am attempting to create a route using Java and MQTT endpoints. Within this route I have also incorporated a Processor. This is what my route looks like:

from("mqtt:test?subscribeTopicName=zaq.avila.send")
            //.process(new RestProcessor())
            .to("mqtt:test?publishTopicName=zaq.avila.receive");

From my understanding, the route is consuming from zaq/avila/send, a processor is applied and then the message is published to zaq/avila/receive. The to() part does not appear to be happening, when I check the console, I see that the processor executes though no message is published to zaq/avila/receive. Also, within the web console I see that the messages in zaq/avila/send for enqueued and dequeued increment even when I only pusblished one message. In addition, if I shutdown ActiveMQ I get the following:

 INFO | Waiting as there are still 1 inflight and pending exchanges to complete,
 timeout in 7 seconds.

Also:

  WARN | Error occurred while shutting down service: Endpoint[mqtt://test?publish
 TopicName=zaq.avila.receive]. This exception will be ignored.
 java.lang.NullPointerException

These exception make me wonder that the exchange is not completing and something is missing. I need help!

Was it helpful?

Solution 2

This may not necessarily be the best answer, however, it works.

from("mqtt:test?subscribeTopicName=zaq.avila.send")
        .process(new RestProcessor())
        .to("jms:topic:zaq.avila.receive");

According to ActiveMQ Doc

MQTT messages are transformed into an JMS ByteMessage. Conversely, the body of any JMS Message is converted to a byte buffer to be the payload of an MQTT message.

I was able to publish an mqtt message to a topic, apply a processor and receive the modified message as an mqtt message even though the specified endpoint is JMS.

If anyone can think of any possible downfalls I would gladly appreciate hearing from you. In my opinion, this removes the need to publish messages as MQTT.

OTHER TIPS

Have a look into Camel MQTT component documentation. There is note that this component can be only used for consuming messages if I understand it correctly.

Note: The component currently only supports polling (consuming) feeds.

It's quite weird. I'll investigate further.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top