Question

I'm trying to use Spring Integration to receive MQTT messages and process them in custom code. Due to other dependencies, I have to use spring-integration-mqtt 1.0.0.M1.

For testing, I use this broker: tcp://broker.mqttdashboard.com:1883. It has a nice interface for sending messages.

When my application receives a message, I get the following logs in the console:

DEBUG 17:46:16 org.springframework.integration.channel.DirectChannel : preSend on channel 'startCase', message: [Payload=hellas][Headers={timestamp=1395420376914, id=5adcb1cf-094e-c593-60d1-9b12f82736b6, mqtt_qos=1, mqtt_topic=edw1, mqtt_retained=false, mqtt_duplicate=false}]
DEBUG 17:46:16 org.springframework.integration.channel.DirectChannel : preSend on channel 'logger', message: [Payload=hellas][Headers={timestamp=1395420376914, id=5adcb1cf-094e-c593-60d1-9b12f82736b6, mqtt_qos=1, mqtt_topic=edw1, mqtt_retained=false, mqtt_duplicate=false}]
DEBUG 17:46:16 org.springframework.integration.handler.LoggingHandler : org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=hellas][Headers={timestamp=1395420376914, id=5adcb1cf-094e-c593-60d1-9b12f82736b6, mqtt_qos=1, mqtt_topic=edw1, mqtt_retained=false, mqtt_duplicate=false}]
DEBUG 17:46:16 org.springframework.integration.handler.LoggingHandler : hellas
DEBUG 17:46:16 org.springframework.integration.channel.DirectChannel : postSend (sent=true) on channel 'logger', message: [Payload=hellas][Headers={timestamp=1395420376914, id=5adcb1cf-094e-c593-60d1-9b12f82736b6, mqtt_qos=1, mqtt_topic=edw1, mqtt_retained=false, mqtt_duplicate=false}]
DEBUG 17:46:16 org.springframework.integration.handler.ServiceActivatingHandler : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@25669ccf] received message: [Payload=hellas][Headers={timestamp=1395420376914, id=5adcb1cf-094e-c593-60d1-9b12f82736b6, mqtt_qos=1, mqtt_topic=edw1, mqtt_retained=false, mqtt_duplicate=false}]
ERROR 17:46:16 org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter : Lost connection:MqttException; retrying...
DEBUG 17:46:16 org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter : Attempting reconnect
DEBUG 17:46:17 org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter : Connected and subscribed to [Ljava.lang.String;@7fd6b8ce

Nothing else. Because of the first line, I assume the message is received somehow: 'hellas' is the payload of the message I sent.

Here is my spring-integration configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:int="http://www.springframework.org/schema/integration"
   xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
   xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    ">

<import resource="customer-propertyPlaceholder-config.xml" />


<bean id="clientFactory"
      class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="userName" value="${mqtt.username}"/>
    <property name="password" value="${mqtt.password}"/>
</bean>

<!-- intercept and log every message -->
<int:logging-channel-adapter id="logger" level="ERROR"/>
<int:wire-tap channel="logger"/>

<int-mqtt:message-driven-channel-adapter id="startCaseAdapter"
                                         client-id="${mqtt.clientId}"
                                         url="${mqtt.url}"
                                         topics="${mqtt.topic}"
                                         channel="startCase" auto-startup="true" />
<int:channel id="startCase" />


<int:service-activator id="startCaseService" input-channel="startCase" ref="mqttCaseService" method="startCase" />

<bean id="mqttCaseService" class="foo.bar.customer.mqtt.MqttCaseService" />

</beans>

So I'd expect MqttCaseService#startCase to be called after a message is received. But this never happens; the breakpoint at the beginning of this method is never hit.

After having seen the DEBUG output, maybe it's helpful to add the service I want to call.

public interface MqttCaseService {

    public void startCase(MqttMessage message);

}

Solution 2

I just ran a test against that broker and had no problems. But I am using the latest version (4.0.0.M3); the MQTT module has been moved into Spring Integration proper.

Given that you are getting the log message, it means the adapter got the message ok, so I am guessing you are getting some kind of exception invoking the service activator. DEBUG logging should help you track it down.

However, when I force an error, I get the same message as you and no stack trace, so that's a bug - we should log the exception; I have opened a JIRA issue.

The fix for the failure to log the error is available in the 1.0.0 release candidate (1.0.0.RC1 or Spring Integration 3.0.x and 2.2.x) and 4.0.0.BUILD-SNAPSHOT (4.0.0.M4 is due soon).

You should be able to find the exception by putting a breakpoint in the catch clause in MessageProducerSupport.sendMessage().
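
One candidate for such an exception, offered as an assumption rather than a confirmed diagnosis: the log shows a String payload ("hellas"), while the MqttCaseService interface in the question declares an MqttMessage parameter. If the framework cannot convert the String payload to that parameter type, the invocation would fail before the method body runs, which would match the untouched breakpoint. A minimal sketch of a service whose method accepts the String payload directly follows; the class name StringPayloadCaseService is hypothetical, and the sketch deliberately has no Spring dependency.

```java
// Hypothetical service class; the question shows only an interface,
// so the name and the body here are illustrative assumptions.
class StringPayloadCaseService {

    // Remembers the most recent payload so the call can be verified.
    private String lastPayload;

    // A single String parameter lets a service activator pass the
    // message payload as-is when the payload is already a String.
    public void startCase(String payload) {
        lastPayload = payload;
        System.out.println("startCase called with payload: " + payload);
    }

    public String getLastPayload() {
        return lastPayload;
    }
}
```

With a class like this registered as the mqttCaseService bean, the existing service-activator element could stay unchanged, since it refers to the method only by name.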

OTHER TIPS

I encountered this problem with spring-integration-mqtt-4.3.8.RELEASE. Every time I received a message, I would get the error message:

org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter : Lost connection:MqttException; retrying...

Turning on debug logging helped track down the actual problem by showing me an exception and stack trace that my own code was not handling. My message handler was trying to parse the content of the message and throwing an exception that I was not catching; the exception then propagated into the Spring MQTT library, which dropped the connection and opened it again.

The really annoying thing was that I would receive the same message (with the retained flag set to true) again each time it reconnected.

Handling all exceptions in my own code fixed the problem.
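
A minimal sketch of what that can look like, assuming the handler receives the payload as a String. The class name SafeCaseHandler, the failure counter, and the integer-parsing step are hypothetical stand-ins for the real processing logic.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical handler; names and parsing logic are illustrative only.
class SafeCaseHandler {

    private static final Logger LOG =
            Logger.getLogger(SafeCaseHandler.class.getName());

    // Counts payloads that could not be processed, so failures stay visible.
    private int failures = 0;

    // Entry point intended to be called for each incoming message.
    public void startCase(String payload) {
        try {
            process(payload);
        } catch (RuntimeException e) {
            // Log with the stack trace and return normally, so the
            // exception never propagates back into the MQTT adapter.
            failures++;
            LOG.log(Level.SEVERE, "Could not process payload: " + payload, e);
        }
    }

    // Stand-in for real parsing; here the payload must be an integer case id.
    private void process(String payload) {
        int caseId = Integer.parseInt(payload.trim());
        LOG.info("Starting case " + caseId);
    }

    public int getFailures() {
        return failures;
    }
}
```

Catching RuntimeException at the boundary is a deliberate trade-off in this sketch: a malformed message is logged and dropped instead of triggering the reconnect loop described above.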

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow