Frage

I currently have the following Camel route:

<camelContext id="my-camel-context" xmlns="http://camel.apache.org/schema/spring">
    <propertyPlaceholder id="envProps" location="classpath:myapp.properties" />
    <route id="my-camel-route"> 
        <from uri="{{start.uri}}"/>

        <setHeader headerName="id">
            <constant>1</constant>
        </setHeader>

        <to uri="bean:preProcessor?method=process" />

        <aggregate strategyRef="myAggregationStrategy" completionSize="1">
            <correlationExpression> 
                <simple>${header.id} == 1</simple> 
            </correlationExpression>
            <to uri="bean:postProcessor?method=process" /> 
        </aggregate> 

        <to uri="bean:mailer?method=process" /> 
    </route> 
</camelContext>

<bean id="myAggregationStrategy" class="com.me.myapp.MyAggregationStrategy" />
<bean id="postProcessor" class="com.me.myapp.PostProcessor" />
<bean id="mailer" class="com.me.myapp.Mailer" />

For now, I'm not really aggregating anything meaningful (completionSize=1), I'm really just testing AggregationStrategy out. Here's my strategy:

public class MyAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange aggregatingExchange, Exchange incomingExchange) {
        AppPayload payload = null;

        if(aggregatingExchange == null)
            payload = new AppPayload(); // This should prevent it from being NULL below in PostProcessor...
        else
            payload = (AppPayload)incomingExchange.getIn().getBody();

        payload.setCargo((Order)incomingExchange.getIn().getBody());

        if(aggregatingExchange == null) {
            incomingExchange.getIn().setBody(payload);
            return incomingExchange;
        }
        else
            return aggregatingExchange;
    }
}

And also my postProcessor bean:

public class PostProcessor implement Processor {
    @Override
    public void process(Exchange exchange) {
        try {
            System.out.println("In PostProcessor...");
            AppPayload payload = (AppPayload)exchange.getIn().getBody();
            System.out.println("\t...payload acquired...");

            if(payload == null)
                System.out.println("Payload is NULL.");
        } catch(Throwable throwable) {
            System.out.println(ExceptionUtils.getFullStackTrace(throwable));
        }
    }
}

When I run this code, I see log messages from my preProcessor bean that inidcate it is executing correctly. And I also see that MyAggregationStrategy is correctly "aggregating" the message and then letting it pass on to postProcessor after the 1st message arrives (again, because completionSize=1). However, I'm getting the following output in postProcessor:

In PostProcessor...
    ...payload acquired...
Payload is NULL.

Can anyone see why payload would be NULL? Shouldn't it have been initialized up inside MyAggregationStrategy?!? I'm happy to post more code, but I believe it stems from me using the AggregationStrategy API incorrectly.

War es hilfreich?

Lösung

I believe you are getting confused with aggregatingExchange and incomingExchange. Can you try this:

public class MyAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange aggregatingExchange, Exchange incomingExchange) {
        AppPayload payload = null;

        if(aggregatingExchange == null) {
        payload = new AppPayload(); // This should prevent it from being NULL below in PostProcessor...
        } else {
            payload = (AppPayload)aggregatingExchange.getIn().getBody();
        }

        payload.setCargo((Order)incomingExchange.getIn().getBody());

        if(aggregatingExchange == null) {
            incomingExchange.getIn().setBody(payload);
            return incomingExchange;
        } else {
            return aggregatingExchange;
        }
    }
}

Andere Tipps

Adding to what @hveiga already mentioned. I had a similar issue which I resolved by adding header to my messages. However in your case I see that you are not using splitter and that you already have a header defined. So a piece of information that got from Clauss Ibssen was that the firs time exchange would be empty and we need to check for the null object.

See this for more explanation - Apache Camel - Split and aggregate - Old Exchange is always null

Track the complete explanation here - http://camel.465427.n5.nabble.com/Split-and-Aggregate-Old-Exchange-is-null-everytime-in-AggregationStrategy-td5746365.html

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top