Here is a simple configuration that shows one way to build a fork/join with two HTTP outbound endpoints. To add a third endpoint, set MULE_CORRELATION_GROUP_SIZE to 3 and set the MULE_CORRELATION_SEQUENCE of the third async flow-ref to 3.
    <flow name="fork">
        <vm:inbound-endpoint path="fork.in" />
        <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
                      value="2" />
        <all enableCorrelation="IF_NOT_SET">
            <async>
                <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                              value="1" />
                <flow-ref name="parallel1" />
            </async>
            <async>
                <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                              value="2" />
                <flow-ref name="parallel2" />
            </async>
        </all>
    </flow>

    <sub-flow name="parallel1">
        <logger level="INFO" message="parallel1: processing started" />
        <http:outbound-endpoint address="..."
                                exchange-pattern="request-response" />
        <logger level="INFO" message="parallel1: processing finished" />
        <flow-ref name="join" />
    </sub-flow>

    <sub-flow name="parallel2">
        <logger level="INFO" message="parallel2: processing started" />
        <http:outbound-endpoint address="..."
                                exchange-pattern="request-response" />
        <logger level="INFO" message="parallel2: processing finished" />
        <flow-ref name="join" />
    </sub-flow>

    <sub-flow name="join">
        <collection-aggregator timeout="6000"
                               failOnTimeout="true" />
        <combine-collections-transformer />
        <logger level="INFO"
                message="Continuing processing of: #[message.payloadAs(java.lang.String)]" />
    </sub-flow>
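For the third endpoint mentioned at the top, the changes might look roughly like the fragment below. This is only a sketch: the sub-flow name parallel3 is a hypothetical name chosen to mirror parallel1 and parallel2, and the address is left elided just as in the original config.

```xml
<!-- In the "fork" flow: raise the group size from 2 to 3 -->
<set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
              value="3" />

<!-- Inside <all>, alongside the two existing <async> blocks -->
<async>
    <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                  value="3" />
    <!-- "parallel3" is a hypothetical name, not part of the original config -->
    <flow-ref name="parallel3" />
</async>

<!-- Third sub-flow, same shape as parallel1 and parallel2 -->
<sub-flow name="parallel3">
    <logger level="INFO" message="parallel3: processing started" />
    <http:outbound-endpoint address="..."
                            exchange-pattern="request-response" />
    <logger level="INFO" message="parallel3: processing finished" />
    <flow-ref name="join" />
</sub-flow>
```

The join sub-flow itself should not need to change, since the aggregator relies on the group size property to know how many messages to wait for.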
EDIT: In the above config, the aggregator in the join sub-flow times out after 6 seconds (timeout="6000"). This may be too short for your actual use case, so increase it as you see fit. It is also set to fail on timeout (failOnTimeout="true"), which may not be the behaviour you want when not all of the outbound HTTP endpoint interactions succeed; that is up to you to decide based on your use case.
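As an illustration of the other choice, a more lenient join could look like the sketch below. The 30-second value is an arbitrary assumption, and the comment about failOnTimeout="false" reflects my understanding that the aggregator then forwards whatever partial group it has collected rather than raising an error; check this against your Mule version.

```xml
<sub-flow name="join">
    <!-- Assumed values: wait up to 30 s; on timeout, continue with the
         responses collected so far instead of failing -->
    <collection-aggregator timeout="30000"
                           failOnTimeout="false" />
    <combine-collections-transformer />
    <logger level="INFO"
            message="Continuing processing of: #[message.payloadAs(java.lang.String)]" />
</sub-flow>
```

With this variant, whatever follows the join has to cope with a collection that may hold fewer results than the group size.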