Question

I have a Spring configured web app that receives JMS messages via the following listener:

public class EntityPersister implements MessageListener {

    @Resource
    private EntityManager entityManager;

    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            Object entity = createEntity(textMessage);
            entityManager.persist(entity);
            entityManager.flush(); //for debugging only
        }
    }
}

When I execute this listener in my application I get a NoTransactionException from the line entityManager.flush().

What do I need to configure so that the entity manager takes part in the already existing JTA transaction?

I already tried @Transactional on the above implementation but with no success.

ActiveMQ is used as the JMS provider. The spring configuration is:

<bean id="jmsConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close">
    <property name="uniqueResourceName" value="atomikos-activemq" />
    <property name="xaConnectionFactory">
        <!-- ActiveMQ wird als JMS Provider genutzt -->
        <bean id="activeMQXAConnectionFactory"
            class="org.apache.activemq.spring.ActiveMQXAConnectionFactory">
            <property name="brokerURL">
                <value>tcp://localhost:61616</value>
            </property>
        </bean>
    </property>
    <property name="maxPoolSize" value="2" />
    <property name="localTransactionMode" value="false" />
</bean>

<bean id="entityPersister" class="EntityPersister" />

<bean id="jmsContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="destinationName" ref="entityDestinationName" />
    <property name="messageListener" ref="entityPersister" />
    <property name="sessionTransacted" value="true" />
    <property name="transactionManager" ref="txManager" />
</bean>

OpenJPA is used as the JPA Provider. The persistence unit is:

<persistence-unit name="somePU" transaction-type="JTA">
    <jta-data-source>managedDataSource</jta-data-source>
    <non-jta-data-source>nonManagedDataSource</non-jta-data-source>
    <!-- some entity class listed here -->
    <properties>
        <property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema(ForeignKeys=true)" />
    </properties>
</persistence-unit>

The spring configuration is:

<!-- Construct Atomikos UserTransactionManager, needed to configure Spring -->
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
    init-method="init" destroy-method="close">
    <!-- when close is called, should we force transactions to terminate or 
        not? -->
    <property name="forceShutdown" value="true" />
</bean>

<!-- Also use Atomikos UserTransactionImp, needed to configure Spring -->
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
    <property name="transactionTimeout" value="300" />
</bean>

<!-- Configure the Spring framework to use JTA transactions from Atomikos -->
<bean id="txManager"
    class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="atomikosTransactionManager" />
    <property name="userTransaction" ref="atomikosUserTransaction" />
</bean>

<!-- enable the configuration of transactional behavior based on annotations -->
<tx:annotation-driven transaction-manager="txManager" />

<bean id="entityManagerFactory"
    class="org.springframework.orm.jpa.LocalEntityManagerFactoryBean">
    <property name="persistenceUnitName" value="somePU" />
    <property name="jpaPropertyMap">
        <map>
            <entry key="openjpa.ManagedRuntime" value="jndi" />
        </map>
    </property>
</bean>

<bean id="entityManager" factory-bean="entityManagerFactory"
    factory-method="createEntityManager" />

OpenJPA looks up the XA and non-XA datasource from the persistence unit via JNDI.

Was it helpful?

Solution

If you are using setMessageListener(), the message reception in onMessage happens asynchronously in a separate thread.

The injection of the entityManager happens in a different thread. Normally a transaction is thread-bound. So the injection thread has probably already done its work, and its associated transaction was closed, before message reception starts. Even if that transaction still existed, if would not have been enlisted in the same global transaction used by the message-receiving thread.

You could verify it by creating/getting an entityManager in onMessage explicitly. This way all XA transaction-aware resources should enlist within the same global transaction, because all of these are opened from the same thread.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top