Question

I got a wierd problem with NServiceBus retying the message X number of times although no exception is thrown in the handler. There is some information out there dealing with the NHibernate session and the NSB ambiant transaction. Since no error is thrown I'm not 100% sure of the problem and thus can't really decide on what to do.

I got NSB configured with Castle Windsor like so:

IWindsorContainer container = new WindsorContainer(new XmlInterpreter());
container.Install(new ContainerInstaller());
container.Install(new UnitOfWorkInstaller(AppDomain.CurrentDomain.BaseDirectory, Castle.Core.LifestyleType.Scoped));
container.Install(new FactoryInstaller(AppDomain.CurrentDomain.BaseDirectory));
container.Install(new RepositoryInstaller(AppDomain.CurrentDomain.BaseDirectory));

Configure.With()
    .CastleWindsorBuilder(container)
    .FileShareDataBus(Properties.Settings.Default.DataBusFileSharePath)
    .MsmqTransport()
        .IsTransactional(true)
        .PurgeOnStartup(false)
    .UnicastBus()
         .LoadMessageHandlers()
         .ImpersonateSender(false)
     .JsonSerializer();

The UnitOfWorkInstaller registers the unit of work (the NHibernate session) like so:

public void Install(IWindsorContainer container, IConfigurationStore store)
{
    var fromAssemblyDescriptor = AllTypes.FromAssemblyInDirectory(new AssemblyFilter(_installationPath));
    container.Register(fromAssemblyDescriptor
        .IncludeNonPublicTypes()
        .Pick()
        .If(t => t.GetInterfaces().Any(i => i == typeof(IUnitOfWork)) && t.Namespace.StartsWith("Magma"))
        .WithService.AllInterfaces()
        .Configure(con => con.LifeStyle.Is(_lifeStyleType).UsingFactoryMethod(k => k.Resolve<IUnitOfWorkFactory>().Create())));
}

So each time a message arrives the same unit of work is used for all the repositories. I read that manually rolling back the current transaction results in an error (I don't really know why) and I also know that NSB creates a child container for every transport message and that this child container is disposed after the processing of the message. The problem is that when the child container is disposed the unit of work is disposed this way:

    public void Dispose()
    {
        if (!_isDisposed)
        {
            DiscardSession();
            _isDisposed = true;
        }
    }

    private void DiscardSession()
    {
        if (_transaction != null && _transaction.IsActive)
        {
            _transaction.Dispose();
        }
        if (Session != null)
        {
            Session.Dispose();
        }
    }

My handlers are structured like this: (the _unitOfWork is passed as a constructor dependency)

    public void Handle(<MessageType> message)
    {
        using (_unitOfWork)
        {
            try
            {
                // do stuff
                _unitOfWork.Commit();
            }
            catch (Exception ex)
            {
                _unitOfWork.Rollback();

                // rethrow so the message stays in the queue
                throw;
            }
        }
    }

I found out that if I don't commit the unit of work (which flushes the session and commit the transaction) I got an error saying that the message has retried beyond the max retry count bla bla bla...

So it seems to be linked with the NHibernate session and the way it's created and disposed of but since it's created within the unit of work I can't really use the session factory. I read that I could use the IMessageModule to create and dispose the session but again I don't know if this is the right way to go since I don't understand what's causing the error in the first place.

So to recap:

  • I'm using a scoped unit of work so that all the handler dependencies using it will share the same instance (thx to the child container, BTW: I've setup the unit of work as transient thinking that the child container will treat all transient object as singleton within that container but I saw that the unit of work wasn't shared so this is why it's setup as scoped)

  • I'm wrapping my handler in a using(_unitOfWork) { } statement to dispose the unit of work after each processing.

  • When the unit of work get's disposed, the NHibernate session is also disposed

  • If I don't explicitly call Commit on the _unitOfWork, the message retries beyond the max retry count and then an error is thrown.

What is causing this behavior? and is the IMessageModule is the answer for this?

Was it helpful?

Solution

I think I narrowed it down a bit... I removed all of the using(_unitOfWork), the _unitOfWork.Commit() and the _unitOfWork.Rollback() and let the NSB TransactionScope do the work of commit or rollback the transaction since the NHibernate's Session was enlisting in the NSB transaction scope.

I also started to use the NHibernate session's transaction (Session.Transaction) instead of grabbing a reference to it through Session.BeginTransaction() and using this one. I've copy/pasted my UoW implementation so you can see the differences (the old code is in comments).

I don't know if my changes account for anything but using the Session's transaction and removing the flushes since it's taking care of within the transaction commit seems to have solved the problem... I don't have to explicitly call the Commit method in order for the message to be successfully processed anymore. Here is my UoW implementation:

public class NHibernateUnitOfWork : INHibernateUnitOfWork
{
    //private ITransaction _transaction;
    private bool _isDisposed;
    private bool _isInError;

    public ISession Session { get; protected set; }

    public NHibernateUnitOfWork(ISession session)
    {
        Contract.Requires(session != null, "session");
        Session = session;

        //_transaction = Session.BeginTransaction();

        // create a new transaction as soon as the session is available
        Session.BeginTransaction();
        _isDisposed = false;
        _isInError = false;
    }

    public void MarkCreated(Object entity)
    {
        // assert stuff

        try
        {
            Session.SaveOrUpdate(entity);
            //Session.Flush();
        }
        catch (HibernateException ex)
        {
            HandleError();
            throw;
        }
    }

    public void MarkUpdated(Object entity)
    {
        // assert stuff

        try
        {
            Session.Update(entity);
            //Session.Flush();
        }
        catch (HibernateException ex)
        {
            HandleError();
            throw;
        }
    }

    public void MarkSavedOrUpdated(Object entity)
    {
        // assert stuff

        try
        {
            Session.SaveOrUpdate(entity);
            //Session.Flush();
        }
        catch (HibernateException)
        {
            HandleError();
            throw;
        }
    }

    public void MarkDeleted(Object entity)
    {
        // assert stuff

        try
        {
            Session.Delete(entity);
            //Session.Flush();
        }
        catch (HibernateException ex)
        {
            HandleError();
            throw;
        }
    }

    public void Commit()
    {
        // assert stuff

        try
        {
            //Session.Flush();
            //_transaction.Commit();
            Session.Transaction.Commit();
        }
        catch (HibernateException ex)
        {
            HandleError();
            throw;
        }
    }

    public void Rollback()
    {
        // assert stuff

        try
        {
            //if (!_transaction.WasRolledBack)
            //{
            //    _transaction.Rollback();
            //}
            Session.Transaction.Rollback();
        }
        catch (HibernateException ex)
        {
            HandleError();
            throw;
        }
    }

    public void Dispose()
    {
        if (!_isDisposed)
        {
            DiscardSession();
            _isDisposed = true;
        }
    }

    private void DiscardSession()
    {
        //if (_transaction != null && _transaction.IsActive)
        //{
        //    _transaction.Dispose();
        //}
        if (Session != null)
        {
            try
            {
                // rollback all uncommitted changes
                if (Session.Transaction != null && Session.Transaction.IsActive)
                {
                    Session.Transaction.Rollback();
                }
                //Session.Clear();
                Session.Close();
            }
            catch (Exception)
            { }
            finally
            {
                Session.Dispose();
            }
        }
    }

    private void HandleError()
    {
        _isInError = true;
        //if (_transaction != null && _transaction.IsActive)
        //{
        //    _transaction.Rollback();
        //}
        if (Session.Transaction != null && Session.Transaction.IsActive)
        {
            Session.Transaction.Rollback();
        }
    }

    // assert methods
}

Does any of this makes sense? I still don't know what caused the error in the first place but it seems to have to do with disposing the NHibernate Session before the transaction scope finishes.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top