Question

My use case requires to enrich my input with smne data and send it to an outbound endpoint.

The data for enriching is obtained by making calls to two web-services and then extract the data from the reply. This extracted data is enriched into my input XML and sent to an outbound endpoint.

The two web-service calls that I need to make needs to be parallel, as they don't have dependency on another. This way I could save my processing time.

Please suggest how I could achieve this parallel processing in a flow in Mule.

Note: I have tried using ALL flow control, but it seems that is calling the web-services (sub-flows) sequentially.

Given below is my abstract flow.

<flow name="mainFlow">
    <inbound-endpoint> .....

    <some validation>

    <setting some flow variables>

    <!-- Now make calls to the sub-flows which has some processing of the input and make some web-service calls -->
    <all>
        <flow-ref name="myFlow1" />
        <flow-ref name="myFlow2" />
        <flow-ref name="myFlow3" />
    </all>

    <enrich the input with the data obtained from the output of the above three flows>

    <outbound-endpoint>
</flow>



<flow name="myFlow1">
    <some transformer to transform the payload provided >

    < the tran sformed payload is passed as input to the web-service call>

    <http:outbound-endpoint ...>

    <transform the reply from the web-service call>
</flow>



<flow name="myFlow2">
    <some transformer to transform the payload provided >

    < the tran sformed payload is passed as input to the web-service call>

    <http:outbound-endpoint ...>

    <transform the reply from the web-service call>
</flow>



<flow name="myFlow3">
    <some transformer to transform the payload provided to it>

    < the tran sformed payload is passed as input to the web-service call>

    <http:outbound-endpoint ...>

    <transform the reply from the web-service call>
</flow>
Was it helpful?

Solution

Here is a simple configuration that shows one way to make a fork/join with two HTTP outbound endpoints. To add a third endpoint, set MULE_CORRELATION_GROUP_SIZE to 3 and the MULE_CORRELATION_SEQUENCE of the third async flow-ref to 3.

<flow name="fork">
    <vm:inbound-endpoint path="fork.in" />
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
        value="2" />
    <all enableCorrelation="IF_NOT_SET">
        <async>
            <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                value="1" />
            <flow-ref name="parallel1" />
        </async>
        <async>
            <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                value="2" />
            <flow-ref name="parallel2" />
        </async>
    </all>
</flow>

<sub-flow name="parallel1">
    <logger level="INFO" message="parallel1: processing started" />
    <http:outbound-endpoint address="..."
        exchange-pattern="request-response" />
    <logger level="INFO" message="parallel1: processing finished" />
    <flow-ref name="join" />
</sub-flow>

<sub-flow name="parallel2">
    <logger level="INFO" message="parallel2: processing started" />
    <http:outbound-endpoint address="..."
        exchange-pattern="request-response" />
    <logger level="INFO" message="parallel2: processing finished" />
    <flow-ref name="join" />
</sub-flow>

<sub-flow name="join">
    <collection-aggregator timeout="6000"
        failOnTimeout="true" />
    <combine-collections-transformer />
    <logger level="INFO"
        message="Continuing processing of: #[message.payloadAs(java.lang.String)]" />
</sub-flow>

EDIT: In the above config, the aggregator times out after 6 seconds. This is potentially too short for your actual use case: increase as you see fit. Also it is set to fail on time-out, which is maybe not the behaviour you desire in case not all the outbound HTTP endpoint interactions succeeded: it's up to you to decide based on your use case.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top