Question

I'm working on a project that is built using the Spring Integration framework and atomikos for distributed transactions. Recently we've been trying to run integration tests to verify that messages are being sent through our system correctly. When executing one of these integration tests, I noticed that we are getting 10 log messages indicating a new transaction is being created and 10 log messages indicating a transaction commit.

Does Spring create new transactions every time a message is handed from channel to endpoint or vice versa?

The code below accepts messages on a message-driven-channel-adapter (using the transactionManager) and sends to a router. The router then sends the message to the chain which contains a transformer, service activator (with retryAdvice) and an outbound-message-adapter.

AFAIK, I should see one transaction created when our message-driven-channel-adapter receives the msg from the queue, and then a commit when it completes handling the message. So I thought there would be 3 transactions in total. One from sending the message in the test, one from the inbound adapter, and one from receiving the message in the test.

from spring-datasource.xml

<bean id="transactionManager"
      class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="AtomikosTransactionManager" />
    <property name="userTransaction" ref="AtomikosUserTransaction" />
</bean>

from spring-context.xml

<jms:message-driven-channel-adapter
        id="inventoryQueue_inbound_adapter"
        destination-name="queue.inventory"
        channel="InventoryRouterChannel"
        error-channel="inventoryErrorChannel"
        transaction-manager="transactionManager"
        acknowledge="transacted"/>

<integration:router input-channel="InventoryRouterChannel">
    <bean class="com.inventory.InventoryRouter"/>
</integration:router>

<integration:chain input-channel="rxAddToCountWell">
    <integration:json-to-object-transformer type="com.events.RxAddToCountWell"/>
    <integration:service-activator ref="addToCountWellHandler" method="formatCountwellMessage">
        <integration:request-handler-advice-chain>
            <ref bean="retryAdvice"/>
        </integration:request-handler-advice-chain>
    </integration:service-activator>
    <jms:outbound-channel-adapter destination-name="OUTBOUND.QUEUE"/>
</integration:chain>

<bean id="retryAdvice" class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
    <property name="recoveryCallback">
        <bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
            <constructor-arg ref="inventoryErrorChannel"/>
        </bean>
    </property>
    <property name="retryTemplate">
        <bean class="org.springframework.retry.support.RetryTemplate">
            <property name="retryPolicy">
                <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                    <property name="maxAttempts" value="2"/>
                </bean>
            </property>
            <property name="backOffPolicy">
                <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                    <property name="initialInterval" value="1000"/>
                    <property name="multiplier" value="2"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

the message endpoint

@MessageEndpoint
public class AddToCountWellHandler {

    public static final Logger logger = Logger.getLogger(AddToCountWellHandler.class);

    public Message<String> formatCountwellMessage(RxAddToCountWell payload) {
        //our logic here
        //...
        return MessageBuilder.withPayload(temp).build();
    }
}

the test method

@Test
@DirtiesContext
public void addToCountWellIntegrationTest() throws InterruptedException, SystemException, NotSupportedException, HeuristicRollbackException, HeuristicMixedException, RollbackException {

    // Send the message to the handler
    transactionManager.getTransactionManager().begin();
    jmsTemplate.send("queue.inventory", new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            TextMessage message;
            try {
                message = session.createTextMessage(getJson("./doc/event_json_examples/inventory/rxAddToCountWell.json"));
            } catch (Exception e) {
                //...
            }
            message.setJMSType("rxAddToCountWell");
            return message;
        }
    });
    transactionManager.getTransactionManager().commit();

    transactionManager.getTransactionManager().begin();

    //verify that it was placed on the queue
    TextMessage output = (TextMessage) jmsTemplate.receive(COUNTWELL_QUEUE_NAME);
    assertNotNull(output);

    transactionManager.getTransactionManager().commit();
    appContext.close();
}

the logs

2014-04-08 13:55:35,018 [INFO ] org.springframework.transaction.jta.JtaTransactionManager - Using JTA UserTransaction: com.atomikos.icatch.jta.UserTransactionImp@188c838 (org.springframework.transaction.jta.JtaTransactionManager.checkUserTransactionAndTransactionManager(JtaTransactionManager.java:471))
2014-04-08 13:55:35,019 [INFO ] org.springframework.transaction.jta.JtaTransactionManager - Using JTA TransactionManager: com.atomikos.icatch.jta.UserTransactionManager@111089b (org.springframework.transaction.jta.JtaTransactionManager.checkUserTransactionAndTransactionManager(JtaTransactionManager.java:482))
2014-04-08 13:55:36,172 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#2]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,171 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,172 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,172 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#3]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,171 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:37,619 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:37,631 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#2]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:38,656 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:38,658 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:39,667 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:39,670 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:40,705 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:40,706 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:40,748 [DEBUG] com.inventory.InventoryRouter - Routing jms_type: rxAddToCountWell (com.inventory.InventoryRouter.route(InventoryRouter.java:25))
2014-04-08 13:55:40,834 [DEBUG] org.springframework.retry.support.RetryTemplate - Retry: count=0 (org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:251))
2014-04-08 13:55:40,835 [DEBUG] com.inventory.AddToCountWellHandler - Entered formatCountwellMessage... (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:31))
2014-04-08 13:55:41,179 [DEBUG] com.inventory.AddToCountWellHandler - Leaving formatCountwellMessage (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:38))
2014-04-08 13:55:41,190 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:41,193 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#3]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:43,396 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:44,406 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:45,422 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:46,448 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:47,453 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
Was it helpful?

Solution

I've identified what is happening. When I loaded my test context configuration I was loading a file that imported all 5 of my message-driven-channel-adapters. As Gary stated, each idle thread must start a transaction. So the adapters all started transactions but only one actually received a message.

I found that modifying my test context to only load the specific adapter/route resulted in only the single thread being used.

2014-04-08 17:16:38,322 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 17:16:38,492 [DEBUG] com.inventory.InventoryRouter - Routing jms_type: rxAddToCountWell (com.inventory.InventoryRouter.route(InventoryRouter.java:25))
2014-04-08 17:16:38,555 [DEBUG] org.springframework.retry.support.RetryTemplate - Retry: count=0 (org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:251))
2014-04-08 17:16:38,595 [DEBUG] com.inventory.AddToCountWellHandler - Entered formatCountwellMessage... (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:31))
2014-04-08 17:16:42,198 [DEBUG] com.inventory.AddToCountWellHandler - Leaving formatCountwellMessage (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:38))
2014-04-08 17:16:42,205 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 17:16:42,210 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 17:16:43,219 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))

OTHER TIPS

It doesn't match your configuration but it looks like you have 5 listener threads (#0..#4). 4 of them do a no-op transaction (no message to receive) while #3 processes the message. The idle threads must start a transaction before the receive "in case" there's a message.

Since there was no work done on these threads, the commits should be lightweight; you can increase the receive timeout to minimize this, at the expense of it might take longer to shut down the container (because threads are blocked in the JMS client library).

The default receiveTimeout is 1 second.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top