If I understand you correctly you need to use this one: DefaultTransactionSynchronizationFactory
And here is a snapshot how to configure it:
SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
new ExpressionEvaluatingTransactionSynchronizationProcessor();
syncProcessor.setBeanFactory(mock(BeanFactory.class));
PollableChannel queueChannel = new QueueChannel();
syncProcessor.setBeforeCommitExpression(new SpelExpressionParser().parseExpression("#bix"));
syncProcessor.setBeforeCommitChannel(queueChannel);
syncProcessor.setAfterCommitChannel(queueChannel);
syncProcessor.setAfterCommitExpression(new SpelExpressionParser().parseExpression("#baz"));
DefaultTransactionSynchronizationFactory syncFactory =
new DefaultTransactionSynchronizationFactory(syncProcessor);
adapter.setTransactionSynchronizationFactory(syncFactory);
Transaction boundaries are covered in the SourcePollingChannelAdapter#adviceChain
, so it should be configure like this:
TransactionInterceptor txAdvice =
new TransactionInterceptor(transactionManager,
new MatchAlwaysTransactionAttributeSource(new DefaultTransactionAttribute()));
adapter.setAdviceChain(Collections.singletonList(txAdvice));
So, now each 'poll' will be wrapped with transaction and your syncFactory
will do the stuff.