سؤال

I have the following route:

<route id="my-aggregator">
    <from uri="direct:aggregator" />

    <aggregate strategyRef="myAggregationStrategy" completionSize="3">
        <correlationExpression>
            <simple>${header.id}</simple>
        </correlationExpression>
        <to uri="bean:payloadProcessor?method=process" />
    </aggregate>
</route>

As you can see, it waits until it has received 3 exchanges that all have a matching header.id value. Once it receives 3 such messages, the following code executes:

public class MyAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange e1, Exchange e2) {
        // The first (of 3) message will have a `List<Widget>` on its body. Extract it here.
        List<Widget> redWidgets = ??? // e1.getIn().getBody()? e2.getIn().getBody()? something else?

        // The second (of 3) message will have a `List<Widget>` on its body. Extract it here.
        List<Widget> blueWidgets = ??? // e1.getIn().getBody()? e2.getIn().getBody()? something else?

        // The third (of 3) message will have a `List<Widget>` on its body. Extract it here.
        List<Widget> greenWidgets = ??? // e1.getIn().getBody()? e2.getIn().getBody()? something else?

        WidgetPayload payload = new WidgetPayload(redWidgets, blueWidgets, greenWidgets);

        // Which "output" exchange do I set payload to, so that it gets routed on to the
        // "payloadProcessor" bean?
        e1.getOut().setBody(payload);   /* or */    e2.getOut().setBody(payload);

        // What do we even return?
        return e1; /* or return e2; */
    }
}

So in the aggregator, I'm trying to get access to each List<Widget> that was sent to the aggregator by various routes, combine them into a WidgetPayload instance, and then set the WidgetPayload as the outbound/return value of the outbound exchange. But I can't figure out the correct way to do all this. Specifically:

  1. How to extract each of the 3 List<Widget> from e1 and e2?
  2. What do e1 and e2 even represent? In-Out? New/Old? Something else?
  3. What method calls on e1/e2 do I make so that payload is forwarded on to the payloadProcessor bean?
هل كانت مفيدة؟

المحلول

Look at the code example the page I mentioned; the first argument is the aggregated exchange. It is null on the first invocation of an aggregation. You have to create your WidgetPayload on the first invocation and then keep adding widgets to it with every sub-sequent invocation. Then, when you get to the route step after the aggregator, the body of the exchange is your aggregated WidgetPayload.

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top