Question

I have an Apache Camel route that processes a POJO on the exchange body.

Please look at the sequence of lines marked 1 to 3.

    from("direct:foo")
        .to("direct:doSomething")         // 1 (POJO on the exchange body)
        .to("direct:storeInHazelcast")    // 2 (destroys my pojo! it gets -1)
        .to("direct:doSomethingElse")     // 3 (Where is my POJO??)
    ;

Now I need to use the put operation of the Hazelcast component, which unfortunately requires setting the body to the value -1.

    from("direct:storeInHazelcast")
            .setBody(constant(-1))
            .setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.PUT_OPERATION))
            .setHeader(HazelcastConstants.OBJECT_ID, constant(LAST_FLIGHT_UPDATE_SEQ))
            .to("hazelcast:map:MyNumber")
    ;

For the line marked 2, I would like to send a COPY of the exchange to the storeInHazelcast route.

First I tried .multicast(), but the exchange body was still overwritten (with -1).

        // shouldn't this copy the exchange?
        .multicast().to("direct:storeInHazelcast").end()

Then I tried .wireTap(), which works in "fire and forget" (async) mode, but I actually need it to block and wait for the sub-route to complete. Can you make wireTap block?

        // this works but I need it to be sync processing (not async)
        .wireTap("direct:storeInHazelcast").end()

So I'm looking for some tips here. As far as I can tell, multicast() should have copied the exchange, but the setBody() in my storeInHazelcast route seems to overwrite the body of the original exchange.

Alternatively maybe there is some other way to do this.

Thanks in advance. Camel 2.10

Solution

I think I have stumbled on the answer: line 2 can use enrich() from the DSL like this:

    .enrich("direct:storeInHazelcast", new KeepOriginalAggregationStrategy())

where:

public class KeepOriginalAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        return oldExchange;
    }
}

Interestingly, I found an aggregation strategy named UseOriginalAggregationStrategy(), but I couldn't see how to specify the parameter named Exchange original from the DSL.

    .enrich("direct:storeInHazelcast",
        new UseOriginalAggregationStrategy(???, false))

In the absence of some sort of getExchange() method in the DSL, I can't see how to use this aggregation strategy here (but if anyone can advise how, please do).
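To see why returning oldExchange keeps the POJO, here is a minimal plain-Java sketch of the idea. It is a toy model, not the Camel API: ToyExchange, EnrichModel and their methods are hypothetical names invented for illustration. The assumption is that enrich() hands a copy of the exchange to the sub-route and then lets the aggregation strategy pick which exchange continues down the main route.

```java
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

// Toy stand-in for a Camel Exchange (hypothetical, NOT the Camel API).
final class ToyExchange {
    Object body;

    ToyExchange(Object body) {
        this.body = body;
    }

    // Shallow copy: a new exchange object that refers to the same body.
    ToyExchange copy() {
        return new ToyExchange(body);
    }
}

final class EnrichModel {
    // Models enrich(): the sub-route works on a copy, then the strategy
    // decides which exchange the main route continues with.
    static ToyExchange enrich(ToyExchange original,
                              UnaryOperator<ToyExchange> subRoute,
                              BinaryOperator<ToyExchange> strategy) {
        ToyExchange reply = subRoute.apply(original.copy());
        return strategy.apply(original, reply);
    }

    // Models the storeInHazelcast route, which replaces the body with -1.
    static ToyExchange storeInHazelcast(ToyExchange exchange) {
        exchange.body = -1;
        return exchange;
    }
}
```

With a strategy of (oldEx, newEx) -> oldEx the result still carries the POJO; with (oldEx, newEx) -> newEx it carries -1. Note that the copy is shallow in this model: replacing the body on the copy is harmless, but a sub-route that mutated the POJO object itself would still be visible on the original.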

OTHER TIPS

You can do without writing your own aggregation strategy by using

.enrich("direct:storeInHazelcast", AggregationStrategies.useOriginal())

Save the body in a header and restore it afterwards.

from("direct:foo")
    .to("direct:doSomething")         // 1 (POJO on the exchange body)
    .setHeader("old_body", body())    // save body
    .to("direct:storeInHazelcast")    // 2 (overwrites the body with -1)
    .setBody(header("old_body"))      // restore the body
    .removeHeader("old_body")         // clean up the header
    .to("direct:doSomethingElse")     // 3 (POJO is back on the body)
;

This is a fairly common pattern for components that overwrite the body.

I also had this requirement (to perform synchronous, in-only processing on another route). To achieve it, I wrote a custom Processor that programmatically sends a copy of the Exchange. I think this results in a nicer DSL, because the semantics at the point of use are clearer than with enrich.

This static helper method creates the processor:

public static Processor synchronousWireTap(String uri) {
    return exchange -> {
        Exchange copy = exchange.copy();

        exchange.getContext().createProducerTemplate().send(uri, copy);

        // Unlike the other send methods, ProducerTemplate.send(String, Exchange) does not rethrow
        // an exception set on the exchange. We want any unhandled exception to be rethrown, so we do it here.
        Throwable thrown = copy.getException(Throwable.class);

        if (thrown != null) {
            throw new CamelExecutionException(thrown.getMessage(), exchange, thrown);
        }
    };
}

And here's an example of use:

from("direct:foo")
    .to("direct:doSomething")                               // 1 (POJO on the exchange body)
    .process(synchronousWireTap("direct:storeInHazelcast")) // 2 (Does not destroy POJO because a copy of the exchange gets sent to this uri)
    .to("direct:doSomethingElse")                           // 3 (POJO is still there)

Note that this custom processor is not quite a synchronous analogue of the standard wireTap(), which is fully in-only: this processor rethrows any unhandled exception that occurs on the target route, while leaving the message itself untouched. That was my requirement. I wanted to perform some other processing synchronously on another route and be notified if it failed, but otherwise not have the message on my main route affected (roughly the equivalent of calling a void method in procedural code).
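The contract described here (run synchronously on a copy, let failures propagate, leave the original message alone) can be sketched in plain Java without Camel. This is only an illustration under that assumption: TapMessage and SyncTapModel are hypothetical names, not Camel classes, and IllegalStateException merely stands in for CamelExecutionException.

```java
import java.util.function.Consumer;

// Toy stand-in for a Camel Exchange (hypothetical, NOT the Camel API).
final class TapMessage {
    Object body;

    TapMessage(Object body) {
        this.body = body;
    }

    // Shallow copy: a new message object that refers to the same body.
    TapMessage copy() {
        return new TapMessage(body);
    }
}

final class SyncTapModel {
    // Runs the target synchronously on a copy of the message.
    // Changes to the copy stay on the copy; a failure in the target
    // is wrapped and rethrown to the caller.
    static void synchronousTap(TapMessage original, Consumer<TapMessage> target) {
        TapMessage copy = original.copy();
        try {
            target.accept(copy);
        } catch (RuntimeException e) {
            throw new IllegalStateException("tap target failed: " + e.getMessage(), e);
        }
    }
}
```

Calling SyncTapModel.synchronousTap(msg, c -> c.body = -1) leaves msg.body unchanged, while a target that throws makes the call itself throw, which mirrors the "void method" analogy.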

You can use the copy="true" option of wireTap to copy the exchange, as described at http://camel.apache.org/wire-tap.html, or you can create your own processor to do the same.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow