Frage

I have two Mule instances subscribed to the same topic on a queue but I only want each message consumed once.

I can achieve this by funneling messages to unique queue and processing from there, but to reduce the operational complexity I want to set up the message consumer flow running on each Mule instance to defer to one of the instances.

This would be akin to an ActiveMQ failover setup (where only one instance is running at a time and idle instances only awaken when the running instance fails to respond) or a master/slave arrangement where I would grant one of the instances command over the others. Or like a VM transport that is inter-instance instead of intra-instance.

This would need to be done without any Mule Enterprise Edition components (relying only upon Mule Community Edition components) using Mule versions 3.4. or 3.5.

War es hilfreich?

Lösung

I was unable to find a convenient built-in way to do this. Instead I assume that each mule instance will run on a separate box and use the server.host value to determine which instance does the processing:

<mule xmlns:redis="http://www.mulesoft.org/schema/mule/redis" 
    xmlns="http://www.mulesoft.org/schema/mule/core"  
    version="CE-3.5.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.mulesoft.org/schema/mule/redis http://www.mulesoft.org/schema/mule/redis/3.4/mule-redis.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd">

    <!-- define redis instance -->
    <redis:config name="redis-instance" />

    <flow name="topicConsumer">
        <!-- listen to redis channel (topic) -->
        <redis:subscribe config-ref="redis-instance">
            <redis:channels>
                <redis:channel>topic.channel</redis:channel>
            </redis:channels>
        </redis:subscribe>

        <!-- save original payload (message from Redis) -->
        <set-session-variable variableName="redisPayload" value="#[payload]" />

        <!-- select processor -->
        <flow-ref name="topicProcessorSelector"/>

        <choice>
            <when expression="#[sessionVars['subscriberProcessor'] == server.host]">
                <logger level="INFO" message="processing on #[server.host]"/>
            </when>
            <otherwise>
                <logger level="INFO" message="take no action"/>
            </otherwise>
        </choice>
    </flow>

    <flow name="topicProcessorSelector" processingStrategy="synchronous">
        <!-- get key -->
        <redis:get config-ref="redis-instance" 
            key="topic_processor"/>

        <!-- if no key, then add this instance as the processor -->
        <choice>
            <when expression="#[payload instanceof org.mule.transport.NullPayload]">
                <!-- set key -->
                <redis:set config-ref="redis-instance" 
                    key="topic_processor"
                    expire="10"
                    value="#[server.host]">
                </redis:set>
                <set-session-variable variableName="subscriberProcessor" value="#[server.host]" />
            </when>
            <otherwise>
                <!-- use existing key -->
                <byte-array-to-string-transformer/>
                <set-session-variable variableName="subscriberProcessor" value="#[payload]" />
            </otherwise>
        </choice>
    </flow>
</mule>
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top