Question

I'm trying to implement a consumer in C#. There are many publishers which could be executing concurrently. I've created three examples: one with Rx and a Subject, one with BlockingCollection, and a third using ToObservable on the BlockingCollection. They all do the same thing in this simple example, and I want them to work with multiple producers.

What are the different qualities of each approach?

I'm already using Rx, so I'd prefer this approach. But I'm concerned that OnNext has no thread-safety guarantee, and I don't know what the queuing semantics of Subject and the default scheduler are.

Is there a thread safe subject?

Are all messages going to be processed?

Are there any other scenarios where this won't work? Is it processing concurrently?

void SubjectOnDefaultScheduler()
{
    var observable = new Subject<long>();
    observable.
        ObserveOn(Scheduler.Default).
        Subscribe(i => { DoWork(i); });
    observable.OnNext(1);
    observable.OnNext(2);
    observable.OnNext(3);
}

Not Rx, but easily adapted to use Rx/subscribe. It takes an item and then processes it; this should happen serially.

void BlockingCollectionAndConsumingTask()
{
    var blockingCollection = new BlockingCollection<long>();
    var taskFactory = new TaskFactory();
    taskFactory.StartNew(() =>
    {
        foreach (var i in blockingCollection.GetConsumingEnumerable())
        {
            DoWork(i);
        }
    });
    blockingCollection.Add(1);
    blockingCollection.Add(2);
    blockingCollection.Add(3);
}

Using a blocking collection a bit like a subject seems like a good compromise. I'm guessing this will implicitly schedule onto a task, so that I can use async/await. Is that correct?

void BlockingCollectionToObservable()
{
    var blockingCollection = new BlockingCollection<long>();
    blockingCollection.
        GetConsumingEnumerable().
        ToObservable(Scheduler.Default).
        Subscribe(i => { DoWork(i); });
    blockingCollection.Add(1);
    blockingCollection.Add(2);
    blockingCollection.Add(3);
}

Solution

Subject is not thread-safe: OnNext calls issued concurrently from different threads will call the subscribed observers concurrently. Personally I find this quite surprising given the extent to which other areas of Rx enforce the correct semantics. I can only assume it was done for performance reasons.

Subject is something of a half-way house, though, in that it does enforce termination with OnError or OnCompleted: after either of these is raised, subsequent OnNext calls are no-ops. And this behaviour is thread-safe.
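To make the "half-way house" concrete, here is a minimal sketch in plain C# with no Rx dependency. TerminatingSubject<T>, CountingObserver<T> and TerminationDemo are hypothetical names, the sketch forwards to a single observer for brevity, and it is not Rx's source: it only illustrates making termination thread-safe with Interlocked while leaving OnNext unserialized.

```csharp
using System;
using System.Threading;

// Hypothetical: termination is thread-safe, but OnNext calls are NOT serialized.
public sealed class TerminatingSubject<T>
{
    private readonly IObserver<T> _observer;
    private int _stopped; // 0 = active, 1 = terminated

    public TerminatingSubject(IObserver<T> observer)
    {
        _observer = observer;
    }

    public void OnNext(T value)
    {
        // Dropped once termination is visible. Concurrent OnNext calls still overlap,
        // and an OnNext that passed this check just before termination can overlap it too.
        if (Volatile.Read(ref _stopped) == 0)
            _observer.OnNext(value);
    }

    public void OnError(Exception error)
    {
        // Only the first terminal call (OnError or OnCompleted) is forwarded.
        if (Interlocked.Exchange(ref _stopped, 1) == 0)
            _observer.OnError(error);
    }

    public void OnCompleted()
    {
        if (Interlocked.Exchange(ref _stopped, 1) == 0)
            _observer.OnCompleted();
    }
}

// Hypothetical test observer that just counts notifications.
public sealed class CountingObserver<T> : IObserver<T>
{
    public int NextCount;
    public int CompletedCount;

    public void OnNext(T value) { Interlocked.Increment(ref NextCount); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Interlocked.Increment(ref CompletedCount); }
}

public static class TerminationDemo
{
    public static CountingObserver<int> Run()
    {
        var observer = new CountingObserver<int>();
        var subject = new TerminatingSubject<int>(observer);
        subject.OnNext(1);
        subject.OnCompleted();
        subject.OnNext(2);      // ignored: already terminated
        subject.OnCompleted();  // ignored: only the first terminal call is forwarded
        return observer;
    }
}
```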

However, if you apply Observable.Synchronize() to a Subject, it will force outgoing calls to obey the Rx grammar. In particular, concurrent OnNext calls will block until the call already in progress has returned.

The underlying mechanism is the standard .NET lock (Monitor). When the lock is contended by multiple threads, they are usually granted it on a first-come, first-served basis, but Monitor does not guarantee fairness, and under certain conditions that ordering is violated. Either way, you will definitely get the serialized access you are looking for.
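The lock-based serialization can be sketched without Rx. This is an illustration of the idea, not Rx's implementation of Synchronize: SynchronizedObserver<T>, OverlapDetector and SynchronizeDemo are hypothetical names, and the wrapper simply takes one Monitor lock around every call into the wrapped observer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: every call into the inner observer takes the same Monitor
// lock, so concurrent producers are forced to take turns (no fairness guarantee).
public sealed class SynchronizedObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private readonly object _gate = new object();

    public SynchronizedObserver(IObserver<T> inner)
    {
        _inner = inner;
    }

    public void OnNext(T value) { lock (_gate) { _inner.OnNext(value); } }
    public void OnError(Exception error) { lock (_gate) { _inner.OnError(error); } }
    public void OnCompleted() { lock (_gate) { _inner.OnCompleted(); } }
}

// Hypothetical observer that records whether OnNext was ever entered concurrently.
public sealed class OverlapDetector : IObserver<int>
{
    private int _inside;
    public volatile bool Overlapped;
    public int Count;

    public void OnNext(int value)
    {
        if (Interlocked.Exchange(ref _inside, 1) == 1)
            Overlapped = true;
        Thread.SpinWait(10000);  // widen the window in which an overlap could occur
        Count++;                 // plain increment: only safe because calls are serialized
        Interlocked.Exchange(ref _inside, 0);
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class SynchronizeDemo
{
    // Several producers call OnNext concurrently through the synchronized wrapper.
    public static OverlapDetector Run(int producers, int itemsPerProducer)
    {
        var detector = new OverlapDetector();
        var synchronized = new SynchronizedObserver<int>(detector);
        var tasks = new Task[producers];
        for (int p = 0; p < producers; p++)
        {
            tasks[p] = Task.Run(() =>
            {
                for (int i = 0; i < itemsPerProducer; i++)
                    synchronized.OnNext(i);
            });
        }
        Task.WaitAll(tasks);
        return detector;
    }
}
```

Note that the producers' own threads run the observer code here, which matches the point that with Synchronize you process messages on the calling thread.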

ObserveOn's behaviour is platform-specific. Where available, you can supply a SynchronizationContext, and OnNext calls are Posted to it. With a scheduler, it ends up putting calls onto a ConcurrentQueue<T> and dispatching them serially via the scheduler, so the thread of execution depends on the scheduler. Either way, the queuing also enforces the correct semantics.
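A rough sketch of the queue-and-drain idea, again in plain C# and not taken from Rx's ObserveOn source. SerialDispatcher<T> and SerialDispatcherDemo are hypothetical; the thread pool stands in for a scheduler, and an Interlocked counter ensures at most one drain loop runs at a time, so the handler is never entered concurrently even with several producers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical ObserveOn-like dispatcher: producers enqueue, one drain loop delivers.
public sealed class SerialDispatcher<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly Action<T> _onNext;
    private int _pending; // items enqueued but not yet fully processed

    public SerialDispatcher(Action<T> onNext)
    {
        _onNext = onNext;
    }

    // Safe to call from many producer threads at once.
    public void Post(T item)
    {
        _queue.Enqueue(item);
        // Only the caller that moves the counter from 0 to 1 starts a drain loop.
        if (Interlocked.Increment(ref _pending) == 1)
            ThreadPool.QueueUserWorkItem(_ => Drain());
    }

    private void Drain()
    {
        // Runs on one pool thread at a time; exception handling omitted for brevity.
        do
        {
            // Each counted item was enqueued before it was counted, so this succeeds.
            if (!_queue.TryDequeue(out var item))
                throw new InvalidOperationException("Queue unexpectedly empty.");
            _onNext(item);
        }
        while (Interlocked.Decrement(ref _pending) != 0);
    }
}

public static class SerialDispatcherDemo
{
    // Returns how many items were handled and whether the handler ever overlapped.
    public static (int Processed, bool Overlapped) Run(int producers, int itemsPerProducer)
    {
        int inside = 0, processed = 0;
        bool overlapped = false;
        using (var done = new CountdownEvent(producers * itemsPerProducer))
        {
            var dispatcher = new SerialDispatcher<int>(item =>
            {
                if (Interlocked.Exchange(ref inside, 1) == 1) overlapped = true;
                processed++;                       // not atomic: relies on serial dispatch
                Interlocked.Exchange(ref inside, 0);
                done.Signal();
            });
            var tasks = Enumerable.Range(0, producers)
                .Select(_ => Task.Run(() =>
                {
                    for (int i = 0; i < itemsPerProducer; i++) dispatcher.Post(i);
                }))
                .ToArray();
            Task.WaitAll(tasks);
            done.Wait();                           // every posted item has been handled
        }
        return (processed, overlapped);
    }
}
```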

In both cases (Synchronize and ObserveOn), you certainly won't lose messages. With ObserveOn, you implicitly choose the thread you'll process messages on through your choice of scheduler or context; with Synchronize, you'll process messages on the calling thread. Which is better depends on your scenario.

There's more to consider as well - such as what you want to do if your producers out-pace your consumer.
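One common option when producers out-pace the consumer is to bound the buffer so that producers block. Below is a minimal sketch with a bounded BlockingCollection<T>, assuming that blocking producers is acceptable in your scenario; BoundedPipelineDemo is a hypothetical name, and Thread.SpinWait stands in for a slow DoWork.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedPipelineDemo
{
    // Returns the number of items the single consumer processed.
    public static int Run(int producers, int itemsPerProducer, int capacity)
    {
        // With boundedCapacity set, Add blocks while the buffer is full.
        using (var buffer = new BlockingCollection<int>(boundedCapacity: capacity))
        {
            int processed = 0;

            // Single consumer: items are processed serially, in the order they are taken.
            var consumer = Task.Run(() =>
            {
                foreach (var item in buffer.GetConsumingEnumerable())
                {
                    Thread.SpinWait(1000); // stand-in for a slow DoWork(item)
                    processed++;
                }
            });

            // Several concurrent producers; each Add waits whenever the buffer is full.
            var producerTasks = Enumerable.Range(0, producers)
                .Select(_ => Task.Run(() =>
                {
                    for (int i = 0; i < itemsPerProducer; i++)
                        buffer.Add(i);
                }))
                .ToArray();

            Task.WaitAll(producerTasks);
            buffer.CompleteAdding();   // lets GetConsumingEnumerable finish once drained
            consumer.Wait();
            return processed;
        }
    }
}
```

Calling CompleteAdding after the producers finish is what lets the consumer's foreach end once every queued item has been processed; without it the consuming task keeps waiting for more items.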

You might want to have a look at Rxx Consume as well: http://rxx.codeplex.com/SourceControl/changeset/view/63470#1100703

Sample code showing the Synchronize behaviour (NuGet packages Rx-Testing and NUnit). It's a bit hokey, relying on Thread.Sleep, but reliably provoking the bad behaviour is fiddly and I was lazy :)

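using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;

[TestFixture]
public class SubjectTests
{
    [Test]
    public void SubjectDoesNotRespectGrammar()
    {
        var subject = new Subject<int>();
        var spy = new ObserverSpy(Scheduler.Default);
        var sut = subject.Subscribe(spy);
        // With the plain Subject this test is expected to fail: the two OnNext calls overlap.
        // Swap the following with the preceding to make this test pass
        //var sut = subject.Synchronize().Subscribe(spy);

        // Two producers calling OnNext concurrently
        Task.Factory.StartNew(() => subject.OnNext(1));
        Task.Factory.StartNew(() => subject.OnNext(2));

        // Crude: give the calls time to run (each spends about 1s inside OnNext)
        Thread.Sleep(2000);

        Assert.IsFalse(spy.ConcurrencyViolation);
    }

    // Observer that flags any overlapping OnNext calls
    private class ObserverSpy : IObserver<int>
    {
        private readonly IScheduler _scheduler;
        private int _inOnNext;

        public volatile bool ConcurrencyViolation;

        public ObserverSpy(IScheduler scheduler)
        {
            _scheduler = scheduler;
        }

        public void OnNext(int value)
        {
            // Atomically mark "inside OnNext"; if it was already set, another call overlaps
            var isInOnNext = Interlocked.CompareExchange(ref _inOnNext, 1, 0);

            if (isInOnNext == 1)
            {
                ConcurrencyViolation = true;
                return;
            }

            // Stay inside OnNext for about a second so a concurrent call has time to arrive
            using (var wait = new ManualResetEvent(false))
            {
                _scheduler.Schedule(TimeSpan.FromSeconds(1), () => wait.Set());
                wait.WaitOne();
            }

            Interlocked.Exchange(ref _inOnNext, 0);
        }

        public void OnError(Exception error)
        {
        }

        public void OnCompleted()
        {
        }
    }
}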
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow