Question

I have an integration flow that looks like this:

a) A poller drops messages into a queue channel.

b) A service activator picks up messages from the queue. I have specified an executor on the poller for this service activator.

c) The service activator sends the processed messages to a publish-subscribe channel.

d) The publish-subscribe channel has multiple consumers, which receive the message for further processing.

OK, so what I am wondering is what role the executor plays in this flow.

a) The service activator that polls the queue has an executor with, let's say, a fixed pool of 10 threads, and max-messages-per-poll is set to 5. I am assuming that 5 new messages will be processed in one go by 5 separate threads from the pool.

b) Assuming that's correct, what happens when the 5 messages reach the publish-subscribe channel? Say this pub-sub channel has 3 subscribers. Does that mean 3 new threads are spawned internally to pass each incoming message asynchronously to the 3 subscribers, so that the consumers can process the message in parallel? This is where it gets a little fuzzy for me. Basically, I am wondering whether the consumers' chains run in parallel once they receive the message, and if so, where the executor for that is configured.

Any comments would be appreciated. Thanks.

++++++++++++++++++++++++++++++

Based on the input below, I tried this out:

    <int:service-activator
            input-channel="filesIn"
            output-channel="readyFiles"
            ref="handler">
        <int:poller fixed-delay="3000" max-messages-per-poll="3" />
    </int:service-activator>

    <int:publish-subscribe-channel id="readyFiles" task-executor="executor">
    </int:publish-subscribe-channel>

    <int:service-activator
            id="consumer1"
            input-channel="readyFiles"
            ref="handler" />

    <int:service-activator
            id="consumer2"
            input-channel="readyFiles"
            ref="handler" />

    <int:service-activator
            id="consumer3"
            input-channel="readyFiles"
            ref="handler" />

    <task:executor id="executor" pool-size="10" rejection-policy="CALLER_RUNS" />

My handler simply slows things down a bit:

public void handle(File file) throws InterruptedException{
    log.debug(Thread.currentThread().getName() + " executing for file " + file.getName() + " ...");
    Thread.sleep(3000);
    log.debug(Thread.currentThread().getName() + " completed for file " + file.getName());
}

Because I am using a task executor on the pub-sub channel, I was expecting all 3 consumers to start together, but instead the result is sequential invocation.

Output:

18:44:11.000 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 executing for file 1.txt ...
18:44:14.001 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 completed for file 1.txt
18:44:14.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 executing for file 2.txt ...
18:44:17.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 completed for file 2.txt
18:44:17.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 executing for file 3.txt ...
18:44:20.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 completed for file 3.txt

Each subscriber is invoked after the previous one has completed.

Solution

<publish-subscribe-channel> has a task-executor option that makes the channel invoke its handlers on threads of that Executor. That means your subscribers may process the message in parallel. There is no guarantee, of course, since it depends on the nature and state of the Executor, for example its pool size and how busy it is.

I think this snippet from the source code (BroadcastingDispatcher) should help:

for (final MessageHandler handler : handlers) {
    if (this.executor != null) {
        this.executor.execute(new Runnable() {
            @Override
            public void run() {
                invokeHandler(handler, messageToSend);
            }
        });
        dispatched++;
    }
    else {
        if (this.invokeHandler(handler, messageToSend)) {
            dispatched++;
        }
    }
}
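
To see the effect of that loop outside of Spring Integration, here is a minimal sketch that uses only java.util.concurrent. It is not the framework's code: the class DispatchSketch and the methods dispatch and threadsUsed are hypothetical names made up for illustration, and the handlers are plain Consumer<String> lambdas standing in for MessageHandler. It records which threads invoke three handlers when there is no executor, when there is a pool of 10, and when the executor has a single thread.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the dispatch loop above; not Spring Integration code.
public class DispatchSketch {

    // Hands each handler to the executor if one is given, otherwise calls it
    // on the caller's thread. Simplification: every handler counts as
    // dispatched, whereas the original checks invokeHandler's return value.
    static int dispatch(List<Consumer<String>> handlers, String message, Executor executor) {
        int dispatched = 0;
        for (Consumer<String> handler : handlers) {
            if (executor != null) {
                executor.execute(() -> handler.accept(message));
            } else {
                handler.accept(message);
            }
            dispatched++;
        }
        return dispatched;
    }

    // Dispatches one message to three slow handlers and returns the names of
    // the threads that ran them.
    static Set<String> threadsUsed(Executor executor) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(3);
        Consumer<String> handler = msg -> {
            names.add(Thread.currentThread().getName());
            try {
                Thread.sleep(200); // simulate work, like the handler in the question
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.countDown();
        };
        dispatch(List.of(handler, handler, handler), "1.txt", executor);
        try {
            done.await(5, TimeUnit.SECONDS); // wait for all three handlers to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("no executor:   " + threadsUsed(null));

        ExecutorService pool = Executors.newFixedThreadPool(10);
        System.out.println("pool of 10:    " + threadsUsed(pool));
        pool.shutdown();

        ExecutorService single = Executors.newSingleThreadExecutor();
        System.out.println("single thread: " + threadsUsed(single));
        single.shutdown();
    }
}
```

Without an executor, all three handlers run one after another on the calling thread. With the pool of 10 they run on three different pool threads. With the single-thread executor they are handed off but still run sequentially, which is the "depends on the Executor" caveat in practice.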
Licensed under: CC-BY-SA with attribution