Question

In another thread I posted a question about the logic for releasing groups of messages from an aggregator. I got some helpful responses, but I wasn't able to get the logic to work the way I wanted, so I am going to step back and ask a more basic, general question to understand how aggregators work.

Here is what I am trying to achieve. I want to send these messages to my Spring Integration flow, where they are routed through an aggregator:

in.send(MessageBuilder.withPayload("a1").setCorrelationId("1").build());
in.send(MessageBuilder.withPayload("b1").setCorrelationId("1").build());
in.send(MessageBuilder.withPayload("c1").setCorrelationId("1").build());

in.send(MessageBuilder.withPayload("a2").setCorrelationId("2").build());
in.send(MessageBuilder.withPayload("b2").setCorrelationId("2").build());
in.send(MessageBuilder.withPayload("c2").setCorrelationId("2").build());

in.send(MessageBuilder.withPayload("d1").setCorrelationId("1").build());
in.send(MessageBuilder.withPayload("d2").setCorrelationId("2").build());

in is the input channel to the flow. As you can see, the first 3 messages belong to group 1 and the next 3 belong to group 2. Once I send these, I expect all 6 messages to be held in the aggregator. When I send the 7th message, it is supposed to release group 1, so at that point the first 3 messages along with the 7th should be released while the group 2 messages stay in the aggregator. Finally, I send the 8th message, which should release the group 2 messages.

My release policy looks like this:

public static class DReleasePolicy {
    public boolean release(List<String> messages){
        boolean release = false;
        for(String m : messages){
            if(m.startsWith("d")){
                release = true;
                break;
            }
        }
        return release;
    }
}

The flow looks like this:

<int:channel id="in" />

<int:service-activator
             id="sa"
             input-channel="in"
             output-channel="logger"
             expression="payload" 
             />

<int:logging-channel-adapter 
             id="logger"
             logger-name="com.test"
             expression="'incmonig message => ' + payload" 
             />

<int:aggregator 
             id="aggregator"
             input-channel="logger"
             output-channel="buffered" 
             release-strategy="releaser"
             />

<int:channel id="buffered" />

<int:splitter 
             id="splitter"
             input-channel="buffered"
             output-channel="finallogger"
             />

<int:logging-channel-adapter
             id="finallogger"
             expression="'released from aggregator => ' + payload"
             />

So that's the expected behavior, but in the output all I see is:

[20:33:41:676] [main] INFO  com.test - incmonig message => a1 
[20:33:41:681] [main] INFO  com.test - incmonig message => c1 
[20:33:41:682] [main] INFO  com.test - incmonig message => b2 
[20:33:41:683] [main] INFO  com.test - incmonig message => d1 
[20:33:41:684] [main] INFO  com.test - released from aggregator => a2 
[20:33:41:685] [main] INFO  com.test - released from aggregator => c2 

So basically a2, b1, c2 and d1 didn't even come through, and only some of the group 2 messages were released. If I turn on debugging I see all the messages coming through, but only the ones logged above are processed by the aggregator. The others simply aren't, so there isn't much logging information to go on.

I would appreciate any help in identifying the problem here.


Solution

The problem is you have two distinct subscribers to logger - the logging adapter and the aggregator.

When there is more than one subscriber to a point-to-point channel, the messages are round-robin distributed to the subscribers, so the aggregator is only going to get every other message.

a1 goes to the logger, b1 goes to the aggregator, c1 to the logger, etc.
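To make the alternation concrete, here is a minimal plain-Java sketch of round-robin distribution between two subscribers. It is only a model, not Spring Integration's dispatcher code; the class and method names are hypothetical, and it assumes the logging adapter is the first subscriber, which is what the a1 line in your output suggests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of round-robin dispatch; hypothetical names, not Spring Integration code.
final class RoundRobinSketch {

    // Hands each payload to exactly one subscriber, rotating through the subscribers in order.
    static List<List<String>> distribute(List<String> payloads, int subscriberCount) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < subscriberCount; i++) {
            received.add(new ArrayList<>());
        }
        int next = 0;
        for (String payload : payloads) {
            received.get(next).add(payload);
            next = (next + 1) % subscriberCount;
        }
        return received;
    }

    public static void main(String[] args) {
        // Same send order as in the question.
        List<String> sent = Arrays.asList("a1", "b1", "c1", "a2", "b2", "c2", "d1", "d2");
        List<List<String>> received = distribute(sent, 2);
        // Assumption: subscriber 0 is the logging adapter, subscriber 1 is the aggregator.
        System.out.println("logging adapter: " + received.get(0)); // [a1, c1, b2, d1]
        System.out.println("aggregator:      " + received.get(1)); // [b1, a2, c2, d2]
    }
}
```

Under that model the aggregator only ever sees b1 for group 1, so group 1 never gets a "d" message and is never released, while group 2 is released as soon as d2 arrives. That is consistent with the a2 and c2 lines in your output.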

Channel adapters that don't specify a channel subscribe to a channel matching their id instead, so your logging adapter is itself a consumer of logger.

I didn't notice you had done this in your earlier question.

If you want to log everything going through a channel, add a wire-tap instead.
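A minimal sketch of the wire-tap variant, assuming the rest of your flow stays as posted. The channel name toAggregator is hypothetical; any unused channel id would do.

```xml
<!-- "toAggregator" is a hypothetical channel name. The wire-tap sends a copy
     of every message to "logger" without consuming it from this channel. -->
<int:channel id="toAggregator">
    <int:interceptors>
        <int:wire-tap channel="logger" />
    </int:interceptors>
</int:channel>

<int:service-activator
             id="sa"
             input-channel="in"
             output-channel="toAggregator"
             expression="payload"
             />

<!-- The logging adapter is now the only subscriber to "logger". -->
<int:logging-channel-adapter
             id="logger"
             logger-name="com.test"
             expression="'incmonig message => ' + payload"
             />

<!-- The aggregator is now the only subscriber to "toAggregator". -->
<int:aggregator
             id="aggregator"
             input-channel="toAggregator"
             output-channel="buffered"
             release-strategy="releaser"
             />
```

With this arrangement each channel has a single subscriber, so nothing is round-robined away from the aggregator.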

Alternatively, make logger a <publish-subscribe-channel/> instead; that way, both subscribers (logger and aggregator) will get all messages.
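A sketch of the publish-subscribe variant, assuming the aggregator keeps input-channel="logger" as in your config. The adapter id loggingAdapter is hypothetical; the adapter is given its own id and an explicit channel so that it no longer creates a point-to-point channel called logger.

```xml
<!-- Explicit publish-subscribe channel: every subscriber receives every message. -->
<int:publish-subscribe-channel id="logger" />

<!-- "loggingAdapter" is a hypothetical id; the adapter names its channel explicitly. -->
<int:logging-channel-adapter
             id="loggingAdapter"
             channel="logger"
             logger-name="com.test"
             expression="'incmonig message => ' + payload"
             />
```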

As I mentioned in my comments on the other question/answer, if you had looked at DEBUG logging you would have seen messages alternating between the logging handler and the aggregating handler.

As ever with this kind of problem, DEBUG logging will often make it obvious what's happening.

Licensed under: CC-BY-SA with attribution