Pregunta

I'm new to NEventStore and event sourcing in general. In a project I want to use NEventStore for persisting events generated by our aggregates, but I have some problem to correctly handle concurrency.

How can I write to the same stream using an optimistic lock?

Let's say I have 2 instances of the same aggregate that are loaded at revision 1 from 2 different threads. Then the first thread call command A and the second thread call command B . Using an optimistic lock one of the aggregate should fail with a concurrency exception.

I thought to use the maxRevision to open the stream from the point that the aggregate is loaded, but seems that the CommitChanges never fail, also if I pass an old revision.

What I'm missing? Is optimistic lock possible/correct when using NEventStore/Event Sourcing?

Here is the code that I have used to reproduce the problem:

namespace NEventStore.Example
{
    using System;
    using System.Transactions;
    using NEventStore;
    using NEventStore.Dispatcher;
    using NEventStore.Persistence.SqlPersistence.SqlDialects;

    internal static class MainProgram
    {
        private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
        private static IStoreEvents store;

        private static void Main()
        {
            using (var scope = new TransactionScope())
            using (store = WireupEventStore())
            {
                Client1(revision: 0);

                Client2(revision: 0);

                scope.Complete();
            }

            Console.WriteLine(Resources.PressAnyKey);
            Console.ReadKey();
        }

        private static IStoreEvents WireupEventStore()
        {
             return Wireup.Init()
                .UsingInMemoryPersistence()
                .Build();
        }

        private static void Client1(int revision)
        {
            using (var stream = store.OpenStream(StreamId, 0, revision))
            {
                var @event = new SomeDomainEvent { Value = "Client 1 - event 1." };

                stream.Add(new EventMessage { Body = @event });


                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void Client2(int revision)
        {
            using (var stream = store.OpenStream(StreamId, 0, revision))
            {
                var @event = new SomeDomainEvent { Value = "Client 2 - event 1." };

                stream.Add(new EventMessage { Body = @event });


                stream.CommitChanges(Guid.NewGuid());
            }
        }
    }
}

I expect client 2 to fail because I open the stream with an old revision.

UPDATE 26/08/2013: I have tested the same code using Sql server and seems to work as expected.

namespace NEventStore.Example
{
    using System;
    using System.Transactions;
    using NEventStore;
    using NEventStore.Dispatcher;
    using NEventStore.Persistence.SqlPersistence.SqlDialects;

    internal static class MainProgram
    {
        private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
        private static IStoreEvents store;

        private static void Main()
        {
            using (store = WireupEventStore())
            {
                OpenOrCreateStream();

                AppendToStream_Client1(revision: 1);

                AppendToStream_Client2(revision: 1); // throws an error
                // AppendToStream_Client2(revision: 2); // works
            }

            Console.WriteLine(Resources.PressAnyKey);
            Console.ReadKey();
        }

        private static IStoreEvents WireupEventStore()
        {
             return Wireup.Init()
                .LogToOutputWindow()
                .UsingInMemoryPersistence()
                .UsingSqlPersistence("EventStore") // Connection string is in app.config
                    .WithDialect(new MsSqlDialect())
                    .InitializeStorageEngine()
                    .UsingJsonSerialization()
                .Build();
        }

        private static void OpenOrCreateStream()
        {
            using (var stream = store.OpenStream(StreamId, 0, int.MaxValue))
            {
                var @event = new SomeDomainEvent { Value = "Initial event." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void AppendToStream_Client1(int revision)
        {
            using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
            {
                var @event = new SomeDomainEvent { Value = "Second event 1." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void AppendToStream_Client2(int revision)
        {
            using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
            {
                var @event = new SomeDomainEvent { Value = "Second event 2." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }
    }
}

So back to my question: to enable optimistic lock should I use revision when opening the stream? There are other possible implementations or guidelines?

thanks

¿Fue útil?

Solución

Firstly, the in-memory persistence implementation, whose primary purpose is testing, is not transaction aware. In your original example, client 2 will simply append it's event to the stream. Try running the above with a persistence store that supports transactions (SQL & Raven, but not Mongo).

Secondly, specifying the min/max revision when opening a stream is used for different purposes:

  1. When re-hydrating an aggregate, and no snapshots are available, you would specify (min:0, max:int.MaxValue), as you are interested in retrieving all of the events.
  2. When re-hydrating an aggregate and a snapshot is available, you would specify (min:snapshot.Version, max:int.MaxValue) to get all events that have occurred since the snapshot.
  3. When saving an aggregate, you would specify (min:0, max:Aggregate.Version). The Aggregate.Version is derived during re-hydration. If same aggregate is re-hydrated at the same time somewhere else and saved, you'll have a race condition and a ConcurrencyException will occur.

Support for most of this would be encapsulated in a domain framework. See AggregateBase and EventStoreRepository in CommonDomain

Thirdly, and most importantly, updating >1 stream in a single transaction is a code smell. If you are doing DDD/ES, the stream represents a single aggregate root which, by definition, is a consistency boundary. Creating/updating more than one AR in a transaction breaks this. NEventStore's transaction support was (reluctantly) added so it could work with other tools, i.e. transactionally read a command from MSMQ/NServiceBus/whatever and handle it, or, transactionally dispatch a commit message to a queue and mark it as such. Personally, I'd would recommend that you do your best to avoid 2PC.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top