Question

I use ActiveMQ as the JMS server and Atomikos as the transaction manager.

In the ActiveMQ admin web console I see that one message was enqueued, but two (!) messages were dequeued.

However, the JMS consumer processes the message only once; there is no duplicate processing. When I use the plain Spring JmsTransactionManager, one message is enqueued and one is dequeued. The problem appears only with the Atomikos JTA transaction manager.

What is the problem? How do I configure Atomikos so that the message is not counted as dequeued twice?

My configuration is below. It is almost the same as in the tutorial. BTW, Atomikos's Spring integration example works well, but it uses Spring 2.0 whereas I use Spring 3.1.

<bean id="activeMqXaConnectionFactory" class="org.apache.activemq.spring.ActiveMQXAConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<bean id="atomikosQueueConnectionFactory" class="com.atomikos.jms.QueueConnectionFactoryBean" init-method="init">
    <property name="resourceName" value="xaMq"/>
    <property name="xaQueueConnectionFactory" ref="activeMqXaConnectionFactory"/>
</bean>

<bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="atomikosQueueConnectionFactory"/>
</bean>

<bean id="jmsDefaultContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="singleConnectionFactory"/>
    <property name="messageListener" ref="consumer"/>
    <property name="concurrentConsumers" value="1"/>
    <property name="destinationName" value="q.jtaxa"/>
    <property name="receiveTimeout" value="3000"/>
    <property name="recoveryInterval" value="5000"/>
    <property name="transactionManager" ref="transactionManager"/>
    <property name="sessionTransacted" value="true"/>
</bean>

<!-- Transactions -->
<!--Construct Atomikos UserTransactionManager, needed to configure Spring-->
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init"
      destroy-method="close">
    <!--  when close is called, should we force transactions to terminate or not? -->
    <property name="forceShutdown" value="true"/>
</bean>

<!-- Also use Atomikos UserTransactionImp, needed to configure Spring  -->
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
    <property name="transactionTimeout" value="300"/>
</bean>

<!-- Configure the Spring framework to use JTA transactions from Atomikos -->
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="atomikosTransactionManager"/>
    <property name="userTransaction" ref="atomikosUserTransaction"/>
    <property name="nestedTransactionAllowed" value="true"/>
</bean>

The consumer class is:

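import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

@Component
public class Consumer implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // Only text messages are handled; any other message type is silently ignored.
        if (message instanceof TextMessage) {
            try {
                TextMessage textMsg = (TextMessage) message;
                System.out.println("         " + textMsg.getText());
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    }
}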

Solution

I found what was misconfigured: the "atomikosQueueConnectionFactory" bean. I had followed the tutorial, but the class should be AtomikosConnectionFactoryBean, not QueueConnectionFactoryBean. I deleted atomikosQueueConnectionFactory and added atomikosConnectionFactory:

<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init">
    <property name="uniqueResourceName" value="amq1"/>
    <property name="xaConnectionFactory" ref="activeMqXaConnectionFactory"/>
</bean>

It works fine after that change. I found the proper configuration here.
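
Since the atomikosQueueConnectionFactory bean is deleted, any bean that referenced it has to point at the new one. The answer does not show this step, so the fragment below is only a sketch under the assumption that the singleConnectionFactory wrapper from the question is kept as it was and simply re-pointed:

```xml
<!-- Assumption: same wrapper bean as in the question; only the ref changes
     from the removed atomikosQueueConnectionFactory to atomikosConnectionFactory. -->
<bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="atomikosConnectionFactory"/>
</bean>
```

The jmsDefaultContainer bean would then need no change, because it refers to singleConnectionFactory rather than to the Atomikos bean directly.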

Licensed under: CC-BY-SA with attribution