I was unable to find a convenient built-in way to do this. Instead, I assume that each Mule instance runs on a separate box, and I use the server.host
value (stored under a shared Redis key) to determine which instance does the processing:
<mule xmlns:redis="http://www.mulesoft.org/schema/mule/redis"
xmlns="http://www.mulesoft.org/schema/mule/core"
version="CE-3.5.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.mulesoft.org/schema/mule/redis http://www.mulesoft.org/schema/mule/redis/3.4/mule-redis.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd">
<!-- Define the Redis connector configuration that both flows reference via
     config-ref="redis-instance".
     NOTE(review): no host or port is specified here, so the connector's defaults
     presumably apply. Every Mule instance must point at the SAME Redis server for
     the processor election to work; confirm and set the connection explicitly if needed. -->
<redis:config name="redis-instance" />
<!--
  topicConsumer: subscribes to the Redis channel "topic.channel" and acts on a
  message only when this Mule instance is the currently elected processor.

  Steps:
    1. Stash the incoming Redis message in the session variable "redisPayload",
       because the selector flow overwrites the payload with its own Redis lookups.
    2. Call topicProcessorSelector, which stores the elected host name in the
       session variable "subscriberProcessor".
    3. Restore the original message as the payload so that anything placed in the
       branches below works on the Redis message, not on the election key's value.
    4. Compare the elected host with server.host and either process or do nothing.
-->
<flow name="topicConsumer">
    <!-- listen to the Redis channel (topic) -->
    <redis:subscribe config-ref="redis-instance">
        <redis:channels>
            <redis:channel>topic.channel</redis:channel>
        </redis:channels>
    </redis:subscribe>

    <!-- save the original payload (message from Redis) before it gets replaced -->
    <set-session-variable variableName="redisPayload" value="#[payload]" />

    <!-- select the processor; this replaces the payload with the Redis key lookup result -->
    <flow-ref name="topicProcessorSelector"/>

    <!-- put the original Redis message back so the branches below see the real message -->
    <set-payload value="#[sessionVars['redisPayload']]" />

    <choice>
        <!-- this instance is the elected processor -->
        <when expression="#[sessionVars['subscriberProcessor'] == server.host]">
            <logger level="INFO" message="processing on #[server.host]"/>
        </when>
        <!-- another instance is the elected processor -->
        <otherwise>
            <logger level="INFO" message="take no action"/>
        </otherwise>
    </choice>
</flow>
<!--
  topicProcessorSelector: decides which Mule instance is the processor for topic
  messages and stores that host name in the session variable "subscriberProcessor".

  The election state lives in the Redis key "topic_processor":
    * If the key exists, its value is the elected host.
    * If the key is missing, this instance tries to claim it for server.host with a
      10 second expiry. The claim uses ifNotExists="true" (Redis SETNX), so when
      several instances race only one write succeeds. The key is then read back and
      the stored value, not this instance's own host name, is used as the winner.

  Side effect: the message payload is replaced by the Redis lookup result. Callers
  that need the original payload must save and restore it themselves.
  Runs with the synchronous processing strategy so the caller sees the session
  variable as soon as the flow-ref returns.
-->
<flow name="topicProcessorSelector" processingStrategy="synchronous">
    <!-- look up the currently elected processor -->
    <redis:get config-ref="redis-instance"
               key="topic_processor"/>

    <choice>
        <!-- no key: nobody is elected right now, so try to claim the role -->
        <when expression="#[payload instanceof org.mule.transport.NullPayload]">
            <!-- atomic claim: the write only happens if the key is still absent -->
            <redis:set config-ref="redis-instance"
                       key="topic_processor"
                       expire="10"
                       ifNotExists="true"
                       value="#[server.host]"/>
            <!-- read back the key to learn who actually won the race -->
            <redis:get config-ref="redis-instance"
                       key="topic_processor"/>
            <!-- NOTE(review): assumes the key has not expired between the set and this
                 get (10 second window); confirm this is acceptable for your load -->
            <byte-array-to-string-transformer/>
            <set-session-variable variableName="subscriberProcessor" value="#[payload]" />
        </when>
        <otherwise>
            <!-- use the existing key; convert the byte array from Redis into a String -->
            <byte-array-to-string-transformer/>
            <set-session-variable variableName="subscriberProcessor" value="#[payload]" />
        </otherwise>
    </choice>
</flow>
</mule>