My use case requires enriching my input with some data and sending it to an outbound endpoint.

The enrichment data is obtained by calling two web services and extracting the relevant data from their replies. The extracted data is then merged into my input XML, which is sent to an outbound endpoint.

The two web-service calls should run in parallel, since they don't depend on one another. That way I could save processing time.

Please suggest how I could achieve this parallel processing in a flow in Mule.

Note: I have tried the ALL flow control, but it seems to call the web services (sub-flows) sequentially.

Given below is my abstract flow.

<flow name="mainFlow">
    <inbound-endpoint> .....

    <some validation>

    <setting some flow variables>

    <!-- Now call the sub-flows, which process the input and make some web-service calls -->
    <all>
        <flow-ref name="myFlow1" />
        <flow-ref name="myFlow2" />
        <flow-ref name="myFlow3" />
    </all>

    <enrich the input with the data obtained from the output of the above three flows>

    <outbound-endpoint>
</flow>



<flow name="myFlow1">
    <some transformer to transform the payload provided>

    <the transformed payload is passed as input to the web-service call>

    <http:outbound-endpoint ...>

    <transform the reply from the web-service call>
</flow>



<flow name="myFlow2">
    <some transformer to transform the payload provided>

    <the transformed payload is passed as input to the web-service call>

    <http:outbound-endpoint ...>

    <transform the reply from the web-service call>
</flow>



<flow name="myFlow3">
    <some transformer to transform the payload provided to it>

    <the transformed payload is passed as input to the web-service call>

    <http:outbound-endpoint ...>

    <transform the reply from the web-service call>
</flow>

Solution

Here is a simple configuration that shows one way to make a fork/join with two HTTP outbound endpoints. To add a third endpoint, set MULE_CORRELATION_GROUP_SIZE to 3 and the MULE_CORRELATION_SEQUENCE of the third async flow-ref to 3.

<flow name="fork">
    <vm:inbound-endpoint path="fork.in" />
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
        value="2" />
    <all enableCorrelation="IF_NOT_SET">
        <async>
            <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                value="1" />
            <flow-ref name="parallel1" />
        </async>
        <async>
            <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                value="2" />
            <flow-ref name="parallel2" />
        </async>
    </all>
</flow>

<sub-flow name="parallel1">
    <logger level="INFO" message="parallel1: processing started" />
    <http:outbound-endpoint address="..."
        exchange-pattern="request-response" />
    <logger level="INFO" message="parallel1: processing finished" />
    <flow-ref name="join" />
</sub-flow>

<sub-flow name="parallel2">
    <logger level="INFO" message="parallel2: processing started" />
    <http:outbound-endpoint address="..."
        exchange-pattern="request-response" />
    <logger level="INFO" message="parallel2: processing finished" />
    <flow-ref name="join" />
</sub-flow>

<sub-flow name="join">
    <collection-aggregator timeout="6000"
        failOnTimeout="true" />
    <combine-collections-transformer />
    <logger level="INFO"
        message="Continuing processing of: #[message.payloadAs(java.lang.String)]" />
</sub-flow>
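
The three-endpoint variant mentioned at the top of this answer could be sketched as follows. This is only an illustration under assumptions: parallel3 is a hypothetical sub-flow name that mirrors parallel1 and parallel2, the address is left elided as in the config above, and the join sub-flow is reused unchanged.

```xml
<flow name="fork">
    <vm:inbound-endpoint path="fork.in" />
    <!-- Three branches now, so the aggregator must wait for three messages -->
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
        value="3" />
    <all enableCorrelation="IF_NOT_SET">
        <async>
            <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                value="1" />
            <flow-ref name="parallel1" />
        </async>
        <async>
            <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                value="2" />
            <flow-ref name="parallel2" />
        </async>
        <!-- Added branch: sequence number 3 -->
        <async>
            <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                value="3" />
            <flow-ref name="parallel3" />
        </async>
    </all>
</flow>

<!-- parallel3 is a hypothetical name; it follows the same pattern as parallel1 and parallel2 -->
<sub-flow name="parallel3">
    <logger level="INFO" message="parallel3: processing started" />
    <http:outbound-endpoint address="..."
        exchange-pattern="request-response" />
    <logger level="INFO" message="parallel3: processing finished" />
    <flow-ref name="join" />
</sub-flow>
```

The group size has to match the number of async branches; otherwise the aggregator either releases the group before the last reply arrives or waits until it times out.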

EDIT: In the above config, the aggregator times out after 6 seconds (timeout="6000", in milliseconds). This may be too short for your actual use case, so increase it as you see fit. It is also set to fail on timeout, which may not be the behaviour you want when some of the outbound HTTP endpoint interactions do not succeed: it's up to you to decide based on your use case.
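
For example, a more lenient version of the join sub-flow might look like the sketch below. The 30-second timeout is an arbitrary value chosen for illustration, not a recommendation, and the behaviour described in the comments is my understanding of failOnTimeout="false": the incomplete group is passed on as it is rather than raising an error, so check it against your Mule version.

```xml
<sub-flow name="join">
    <!-- Assumption: 30000 ms is only an illustrative timeout -->
    <!-- failOnTimeout="false": continue with whatever replies arrived in time -->
    <collection-aggregator timeout="30000"
        failOnTimeout="false" />
    <combine-collections-transformer />
    <logger level="INFO"
        message="Continuing processing of: #[message.payloadAs(java.lang.String)]" />
</sub-flow>
```

With this setting, whatever comes after the join has to cope with a collection that may contain fewer replies than the group size.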
