I'm trying to use Spring Integration to receive MQTT messages and process them in custom code. Due to other dependencies, I have to use spring-integration-mqtt 1.0.0.M1.
For testing, I use this broker: tcp://broker.mqttdashboard.com:1883
It has a nice interface for sending messages.
When my application receives a message, I get the following logs in the console:
DEBUG 17:46:16 org.springframework.integration.channel.DirectChannel : preSend on channel 'startCase', message: [Payload=hellas][Headers={timestamp=1395420376914, id=5adcb1cf-094e-c593-60d1-9b12f82736b6, mqtt_qos=1, mqtt_topic=edw1, mqtt_retained=false, mqtt_duplicate=false}]
DEBUG 17:46:16 org.springframework.integration.channel.DirectChannel : preSend on channel 'logger', message: [Payload=hellas][Headers={timestamp=1395420376914, id=5adcb1cf-094e-c593-60d1-9b12f82736b6, mqtt_qos=1, mqtt_topic=edw1, mqtt_retained=false, mqtt_duplicate=false}]
DEBUG 17:46:16 org.springframework.integration.handler.LoggingHandler : org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=hellas][Headers={timestamp=1395420376914, id=5adcb1cf-094e-c593-60d1-9b12f82736b6, mqtt_qos=1, mqtt_topic=edw1, mqtt_retained=false, mqtt_duplicate=false}]
DEBUG 17:46:16 org.springframework.integration.handler.LoggingHandler : hellas
DEBUG 17:46:16 org.springframework.integration.channel.DirectChannel : postSend (sent=true) on channel 'logger', message: [Payload=hellas][Headers={timestamp=1395420376914, id=5adcb1cf-094e-c593-60d1-9b12f82736b6, mqtt_qos=1, mqtt_topic=edw1, mqtt_retained=false, mqtt_duplicate=false}]
DEBUG 17:46:16 org.springframework.integration.handler.ServiceActivatingHandler : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@25669ccf] received message: [Payload=hellas][Headers={timestamp=1395420376914, id=5adcb1cf-094e-c593-60d1-9b12f82736b6, mqtt_qos=1, mqtt_topic=edw1, mqtt_retained=false, mqtt_duplicate=false}]
ERROR 17:46:16 org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter : Lost connection:MqttException; retrying...
DEBUG 17:46:16 org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter : Attempting reconnect
DEBUG 17:46:17 org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter : Connected and subscribed to [Ljava.lang.String;@7fd6b8ce
Nothing else. Because of the first line, I assume the message is actually received: 'hellas' is the payload of the message I sent.
Here is my spring-integration configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
       xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           ">
    <import resource="customer-propertyPlaceholder-config.xml" />
    <bean id="clientFactory"
          class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
        <property name="userName" value="${mqtt.username}"/>
        <property name="password" value="${mqtt.password}"/>
    </bean>
    <!-- intercept and log every message -->
    <int:logging-channel-adapter id="logger" level="ERROR"/>
    <int:wire-tap channel="logger"/>
    <int-mqtt:message-driven-channel-adapter id="startCaseAdapter"
        client-id="${mqtt.clientId}"
        url="${mqtt.url}"
        topics="${mqtt.topic}"
        channel="startCase" auto-startup="true" />
    <int:channel id="startCase" />
    <int:service-activator id="startCaseService" input-channel="startCase" ref="mqttCaseService" method="startCase" />
    <bean id="mqttCaseService" class="foo.bar.customer.mqtt.MqttCaseService" />
</beans>
So I'd expect MqttCaseService#startCase to be called after a message is received. But this never happens; the breakpoint at the beginning of this method is never hit.
After having seen the DEBUG output, maybe it's helpful to add the service I want to call.
public interface MqttCaseService {
    public void startCase(MqttMessage message);
}
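For comparison, here is a minimal sketch of a handler that takes the payload as a plain String, which is how it appears in the log output above (Payload=hellas). Whether the parameter type is related to the lost connection is only an assumption and has not been confirmed. The class name StringPayloadCaseService and its getLastPayload() accessor are hypothetical and exist only for illustration.

```java
// Hypothetical variant of the service, for illustration only.
// Assumption: the inbound adapter delivers the MQTT payload as a String,
// as the DEBUG log lines above suggest.
public class StringPayloadCaseService {

    // Remembers the most recent payload so the call can be verified.
    private String lastPayload;

    // Same method name as in the service-activator configuration,
    // but taking the payload as a String.
    public void startCase(String payload) {
        this.lastPayload = payload;
        System.out.println("startCase called with payload: " + payload);
    }

    public String getLastPayload() {
        return lastPayload;
    }

    // Stand-alone check without Spring or a broker.
    public static void main(String[] args) {
        new StringPayloadCaseService().startCase("hellas");
    }
}
```

To try it in the configuration above, the mqttCaseService bean would point its class attribute at this class; the service-activator element itself would stay unchanged.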