Question

I have a Spring MVC application. I want to use spring ampq to execute my asynchronous tasks. My REST backend uses @Transactional annotation to manage the transactions. After doing its work it pushes a task to the exhange and then returns. Then the task is received by a consumer, which in its first attempt receives a "org.hibernate.HibernateException: No Session found for current thread" error. Then spring-ampq retries to send the task to the consumer and this time it works, without any session error.

How can I make this scenario work in the first attempt?

My spring ampq configuration is similar to this (http://projects.spring.io/spring-amqp/):

<rabbit:connection-factory id="connectionFactory" />

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
    exchange="myExchange" routing-key="foo.bar"/>

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue name="myQueue" />

<rabbit:topic-exchange name="myExchange">
    <rabbit:bindings>
        <rabbit:binding queue="myQueue" pattern="foo.*" />
    </rabbit:bindings>
</rabbit:topic-exchange>


<rabbit:listener-container connection-factory="connectionFactory"  advice-chain="retryInterceptor">
    <rabbit:listener ref="foo" method="listen" queue-names="myQueue" />
</rabbit:listener-container>

<bean id="foo" class="foo.Foo" />

<bean id="retryInterceptor" class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer"/>
    <property name="retryOperations" ref="retryTemplate"/>
    <property name="messageKeyGeneretor" ref="bodyBasedKeyGenerator"/>
</bean>
<bean id="bodyBasedKeyGenerator" class="com.mydomain.util.BodyBasedKeyGenerator"/>
<bean id="rejectAndDontRequeueRecoverer" class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="5000"/>
            <property name="maxInterval" value="120000"/>
            <property name="multiplier" value="2"/>
        </bean>
    </property>
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="1"/>
        </bean>
    </property>
</bean>          

Note: foo.Foo listen() method is also annotated with @Transactional.

My transaction manager is configured as follows:

<bean id="mySessionFactory" class="org.springframework.orm.hibernate4.LocalSessionFactoryBean">
    <property name="dataSource" ref="myDataSource"/>
    <property name="packagesToScan" value="com.mydomain.bean"/>
    <property name="hibernateProperties">
        <props>
            <prop key="hibernate.dialect">org.hibernate.spatial.dialect.postgis.PostgisDialect
            </prop>
            <prop key="hibernate.show_sql">false</prop>
            <prop key="hibernate.hbm2ddl.auto">update</prop>
        </props>
    </property>
</bean>

<bean id="transactionManager" class="org.springframework.orm.hibernate4.HibernateTransactionManager">
    <property name="sessionFactory" ref="mySessionFactory"/>
</bean>

<tx:annotation-driven transaction-manager="transactionManager"/>

Note: The session factory is initiazed deep in the dao factory. I am using the same dao factory both in the REST service and rabbit consumer.

My error log is as follows:

06:48:16,141 DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Storing delivery for Consumer: tag=[amq.ctag-K3ZRHRBVttW20ROENd9l4g], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@192.168.1.21:5672/,3), acknowledgeMode=AUTO local queue size=0
06:48:16,142 DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Received message: (Body:'73'; ID:null; Content:application/x-java-serialized-object; Headers:{}; Exchange:exchange; RoutingKey:foo.dummy.dummy2; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:1)
06:48:16,143 DEBUG [org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor] - Executing proxied method in stateful retry: public abstract void org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$ContainerDelegate.invokeListener(com.rabbitmq.client.Channel,org.springframework.amqp.core.Message) throws java.lang.Exception(2fe83585)
06:48:16,146 TRACE [org.springframework.retry.support.RetryTemplate] - RetryContext retrieved: [RetryContext: count=0, lastException=null, exhausted=false]
06:48:16,146 DEBUG [org.springframework.retry.support.RetryTemplate] - Retry: count=0
06:48:16,147 DEBUG [org.springframework.beans.factory.support.DefaultListableBeanFactory] - Returning cached instance of singleton bean 'transactionManager'
06:48:16,147 DEBUG [org.springframework.orm.hibernate4.HibernateTransactionManager] - Creating new transaction with name [foo.Foo.listen]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; ''
06:48:16,147 TRACE [org.hibernate.internal.SessionImpl] - Opened session at timestamp: 13881430961
06:48:16,147 DEBUG [org.springframework.orm.hibernate4.HibernateTransactionManager] - Opened new Session [SessionImpl(PersistenceContext[entityKeys=[],collectionKeys=[]];ActionQueue[insertions=[] updates=[] deletions=[] collectionCreations=[] collectionRemovals=[] collectionUpdates=[] unresolvedInsertDependencies=UnresolvedEntityInsertActions[]])] for Hibernate transaction
06:48:16,147 DEBUG [org.springframework.orm.hibernate4.HibernateTransactionManager] - Preparing JDBC Connection of Hibernate Session [SessionImpl(PersistenceContext[entityKeys=[],collectionKeys=[]];ActionQueue[insertions=[] updates=[] deletions=[] collectionCreations=[] collectionRemovals=[] collectionUpdates=[] unresolvedInsertDependencies=UnresolvedEntityInsertActions[]])]
06:48:16,147 DEBUG [org.hibernate.engine.transaction.spi.AbstractTransactionImpl] - begin
06:48:16,147 DEBUG [org.hibernate.engine.jdbc.internal.LogicalConnectionImpl] - Obtaining JDBC connection
06:48:16,147 DEBUG [org.hibernate.engine.jdbc.internal.LogicalConnectionImpl] - Obtained JDBC connection
06:48:16,147 DEBUG [org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction] - initial autocommit status: true
06:48:16,147 DEBUG [org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction] - disabling autocommit
06:48:16,147 DEBUG [org.springframework.orm.hibernate4.HibernateTransactionManager] - Exposing Hibernate transaction as JDBC transaction [org.hibernate.engine.jdbc.internal.proxy.ConnectionProxyHandler@60c58418[valid=true]]
06:48:16,147 TRACE [org.springframework.transaction.support.TransactionSynchronizationManager] - Bound value [org.springframework.jdbc.datasource.ConnectionHolder@52a971e3] for key [org.apache.commons.dbcp.BasicDataSource@3c250cce] to thread [SimpleAsyncTaskExecutor-1]
06:48:16,147 TRACE [org.springframework.transaction.support.TransactionSynchronizationManager] - Bound value [org.springframework.orm.hibernate4.SessionHolder@7274187a] for key [org.hibernate.internal.SessionFactoryImpl@68e26d2e] to thread [SimpleAsyncTaskExecutor-1]
06:48:16,147 TRACE [org.springframework.transaction.support.TransactionSynchronizationManager] - Initializing transaction synchronization
06:48:16,147 TRACE [org.springframework.transaction.interceptor.TransactionInterceptor] - Getting transaction for [foo.Foo.listen]
06:48:16,153 DEBUG [org.springframework.beans.factory.support.DefaultListableBeanFactory] - Returning cached instance of singleton bean 'org.springframework.cache.interceptor.CacheInterceptor#0'
06:48:16,153 TRACE [org.hibernate.internal.SessionImpl] - Opened session at timestamp: 13881430961
06:48:16,154 TRACE [org.springframework.transaction.interceptor.TransactionInterceptor] - Completing transaction for [foo.Foo.listen] after exception: org.hibernate.HibernateException: No Session found for current thread
06:48:16,154 TRACE [org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] - Applying rules to determine whether transaction should rollback on org.hibernate.HibernateException: No Session found for current thread
06:48:16,154 TRACE [org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] - Winning rollback rule is: null
06:48:16,154 TRACE [org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] - No relevant rollback rule found: applying default rules
06:48:16,154 TRACE [org.springframework.orm.hibernate4.HibernateTransactionManager] - Triggering beforeCompletion synchronization
06:48:16,154 DEBUG [org.springframework.orm.hibernate4.HibernateTransactionManager] - Initiating transaction rollback
06:48:16,154 DEBUG [org.springframework.orm.hibernate4.HibernateTransactionManager] - Rolling back Hibernate transaction on Session [SessionImpl(PersistenceContext[entityKeys=[],collectionKeys=[]];ActionQueue[insertions=[] updates=[] deletions=[] collectionCreations=[] collectionRemovals=[] collectionUpdates=[] unresolvedInsertDependencies=UnresolvedEntityInsertActions[]])]
06:48:16,154 DEBUG [org.hibernate.engine.transaction.spi.AbstractTransactionImpl] - rolling back
06:48:16,154 DEBUG [org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction] - rolled JDBC Connection
06:48:16,154 DEBUG [org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction] - re-enabling autocommit
06:48:16,154 TRACE [org.hibernate.engine.transaction.internal.TransactionCoordinatorImpl] - after transaction completion
06:48:16,154 TRACE [org.hibernate.internal.SessionImpl] - after transaction completion
06:48:16,154 TRACE [org.springframework.orm.hibernate4.HibernateTransactionManager] - Triggering afterCompletion synchronization
06:48:16,154 TRACE [org.springframework.transaction.support.TransactionSynchronizationManager] - Clearing transaction synchronization
06:48:16,154 TRACE [org.springframework.transaction.support.TransactionSynchronizationManager] - Removed value [org.springframework.orm.hibernate4.SessionHolder@7274187a] for key [org.hibernate.internal.SessionFactoryImpl@68e26d2e] from thread [SimpleAsyncTaskExecutor-1]
06:48:16,154 TRACE [org.springframework.transaction.support.TransactionSynchronizationManager] - Removed value [org.springframework.jdbc.datasource.ConnectionHolder@52a971e3] for key [org.apache.commons.dbcp.BasicDataSource@3c250cce] from thread [SimpleAsyncTaskExecutor-1]
06:48:16,154 TRACE [org.hibernate.engine.jdbc.internal.proxy.ConnectionProxyHandler] - Handling invocation of connection method [isReadOnly]
06:48:16,154 DEBUG [org.springframework.orm.hibernate4.HibernateTransactionManager] - Closing Hibernate Session [SessionImpl(PersistenceContext[entityKeys=[],collectionKeys=[]];ActionQueue[insertions=[] updates=[] deletions=[] collectionCreations=[] collectionRemovals=[] collectionUpdates=[] unresolvedInsertDependencies=UnresolvedEntityInsertActions[]])] after transaction
06:48:16,154 TRACE [org.hibernate.internal.SessionImpl] - Closing session
06:48:16,154 TRACE [org.hibernate.engine.jdbc.internal.LogicalConnectionImpl] - Closing logical connection
06:48:16,154 TRACE [org.hibernate.engine.jdbc.internal.JdbcResourceRegistryImpl] - Closing JDBC container [org.hibernate.engine.jdbc.internal.JdbcResourceRegistryImpl@5ef23a26]
06:48:16,154 DEBUG [org.hibernate.engine.jdbc.internal.LogicalConnectionImpl] - Releasing JDBC connection
06:48:16,154 DEBUG [org.hibernate.engine.jdbc.internal.LogicalConnectionImpl] - Released JDBC connection
06:48:16,154 DEBUG [org.hibernate.engine.jdbc.internal.proxy.ConnectionProxyHandler] - HHH000163: Logical connection releasing its physical connection
06:48:16,154 DEBUG [org.hibernate.engine.jdbc.internal.proxy.ConnectionProxyHandler] - HHH000163: Logical connection releasing its physical connection
06:48:16,154 DEBUG [org.hibernate.engine.jdbc.internal.proxy.ConnectionProxyHandler] - HHH000163: Logical connection releasing its physical connection
06:48:16,154 TRACE [org.hibernate.engine.jdbc.internal.LogicalConnectionImpl] - Logical connection closed
06:48:16,155 DEBUG [org.springframework.retry.support.RetryTemplate] - Checking for rethrow: count=1
06:48:16,155 DEBUG [org.springframework.retry.support.RetryTemplate] - Rethrow in retry for policy: count=1
org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener method 'listen' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:457)
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:358)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:546)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:472)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:61)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:110)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor$MethodInvocationRetryCallback.doWithRetry(StatefulRetryOperationsInterceptor.java:173)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:255)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:188)
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:145)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
    at $Proxy51.invokeListener(Unknown Source)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:611)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:454)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:474)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:458)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:61)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:551)
    at java.lang.Thread.run(Thread.java:662)
Caused by: org.hibernate.HibernateException: No Session found for current thread
    at org.springframework.orm.hibernate4.SpringSessionContext.currentSession(SpringSessionContext.java:97)
    at org.hibernate.internal.SessionFactoryImpl.getCurrentSession(SessionFactoryImpl.java:978)
    at com.mydomain.dao.ListingDaoImpl.getListing(ListingDaoImpl.java:38)
    at com.mydomain.dataaccess.DataProvider.getListing(DataProvider.java:94)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:198)
    at $Proxy29.getListing(Unknown Source)
    at foo.Foo.listen(Foo.java:32)
    at foo.Foo$$FastClassByCGLIB$$1e4b772c.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:698)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:631)
    at foo.Foo$$EnhancerByCGLIB$$714aa208.listen(<generated>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:273)
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:451)
    ... 26 more

The listener code is as follows:

package foo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;

@Transactional(readOnly = true)
public class Foo {

    @Autowired
    private IDataProvider dataProvider;

    @Transactional
    public void listen(Long listingId) throws Exception {

        dataProvider.getListing(listingId);
    }
}

The code for the REST backend is similar to this:

@Controller
@Transactional(readOnly = true)
@RequestMapping(value = { "/myController" })
public class MyController extends BaseController {

    @Autowired
    private RabbitTemplate exchangeTemplate;

    @Transactional(readOnly = false)
    @RequestMapping(method = RequestMethod.POST)
    public @ResponseBody MyClass update(@RequestBody MyClass myclass, HttpServletRequest request) throws Exception {
        getAuthorizationManager().authorizeUserFromRequest(request);
        List<MyClass> result = getDataProvider().update(myclass);
        exchangeTemplate.convertAndSend("foo.Foo.update", myclass.getId())
        return result;
    }   
}
Was it helpful?

Solution 2

The problem turned out to be in the initialization of sessionfactory. I had a static getDao function and the SessionFactory instance was also static:

@Repository
public class DaoFactory {

    private static SessionFactory sessionFactory;

    public SessionFactory getSessionFactory() {
        return sessionFactory;
    }

    @Autowired
    public void setSessionFactory(SessionFactory sessionFactory) {
        DaoFactory.sessionFactory = sessionFactory;
    }

    public static HibernateDaoBase getDao(Class<?> className) {
     .
     .

I got it working by making the getDao function non-static and autowiring the DaoFactory class into the DataProvider class:

@Repository
public class DaoFactory {

    @Autowired
    private SessionFactory sessionFactory;

    public HibernateDaoBase getDao(Class<?> className) {
     .
     .

and

@Repository
@EnableCaching
public class DataProvider implements IDataProvider {

    @Autowired
    private DaoFactory daoFactory;

OTHER TIPS

OK. Now I see where is an issue. You are using LocalSessionFactoryBean, who is based on SpringSessionContext, who, in turn, wants to have a SessionHolder in the transactional resources within ThreadLocal. But as far as AMQP Listener works within his own Thread, there is no any hook which register sessionFactory in the transactional resources. Something like OpenSessionInViewFilter for Spring MVC.

Try to use

hibernate.current_session_context_class = local

as hibernateProperties for LocalSessionFactoryBean. I have never used LocalSessionFactoryBean. The JTA SessionFactory from EE container JNDI always works perfectly for all cases.

UPDATE:

Well, as far as you may use the same IDataProvider from MVC and from AMQP Listener, and the issue is around the CurrentSessionContext, where there is no currentSession within AMQP Thread, you have to do sessionFactory.openSession() manually from code of your IDataProvider implementation and only in case, if there is no currentSession within currentSessionContext. Something like this:

Session hibernateSession = null;
try {
   hibernateSession = this.sessionFactory.getCurrentSession();
}
catch(HibernateException he) {
    hibernateSession = this.sessionFactory.openSession();
}

Of course, you can write your onw SpringSessionContext variant and move that code to the currentSession()

I don't understand why it works the second attempt from Retry for you. In my test-case it fails anyway.

Suppose there is something in your configuration, which takes care about currentSession, but you don't mention it.

HTH

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top