Question

I'm playing around with a simple console app that creates one thread, and I do some inter-thread communication between the main thread and the worker thread.

I'm posting objects from the main thread to a concurrent queue, and the worker thread dequeues them and does some processing.

What strikes me as odd is what I see when I profile this app, even though I have two cores: one core is 100% free while the other core has done all the work, and both threads have been running on that same core. (Profiler screenshot: both threads scheduled on a single core.)

Why is this?

Is it because I use a wait handle that is set when I post a message and reset when the processing is done?

This is my sample code, now using 2 worker threads. It still behaves the same: main, worker1, and worker2 all run on the same core. Ideas?

[EDIT] It sort of works now; at least I get twice the performance compared to yesterday. The trick was to slow down the consumer just enough to avoid signaling via the AutoResetEvent.

using System;
using System.Collections.Concurrent;
using System.Threading;

public class SingleThreadDispatcher
{
    public long Count;
    private readonly ConcurrentQueue<Action> _queue = new ConcurrentQueue<Action>();
    private volatile bool _hasMoreTasks;
    private volatile bool _running = true;
    private int _status;
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);
    public SingleThreadDispatcher()
    {
        var thread = new Thread(Run)
        {
            IsBackground = true,
            Name = "worker" + Guid.NewGuid(),           
        };

        thread.Start();
    }

    private void Run()
    {
        while (_running)
        {
            _signal.WaitOne();
            do
            {
                _hasMoreTasks = false;

                Action task;
                while (_queue.TryDequeue(out task) && _running)
                {
                    Count++;
                    task();
                }
                // Wait a short while to give _hasMoreTasks a chance to be set to true.
                // This avoids the round trip to the AutoResetEvent; that is, if there is
                // intense pressure on the pool, we let some new tasks arrive and be
                // processed without signaling.
                if (!_hasMoreTasks)
                    Thread.Sleep(5);

                Interlocked.Exchange(ref _status, 0);
            } while (_hasMoreTasks);
        }
    }

    public void Schedule(Action task)
    {
        _hasMoreTasks = true;
        _queue.Enqueue(task);

        SetSignal();
    }

    private void SetSignal()
    {
        // Only touch the AutoResetEvent if the consumer is not already signaled/active.
        if (Interlocked.Exchange(ref _status, 1) == 0)
        {
            _signal.Set();
        }
    }
}
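
For reference, here is a minimal driver in the spirit of the test described above. Program and the empty task are hypothetical, meant only to show the tight producer loop that gets profiled:

using System;

public static class Program
{
    public static void Main()
    {
        var dispatcher = new SingleThreadDispatcher();

        // Tight producer loop: the main thread posts tasks as fast as the
        // consumer can drain them, which is the ping-pong scenario above.
        for (int i = 0; i < 1000000; i++)
        {
            dispatcher.Schedule(() => { });  // empty task; measures dispatch overhead
        }

        // Count is read without synchronization; good enough for a rough check.
        Console.WriteLine("Scheduled all; processed so far: " + dispatcher.Count);
        Console.ReadLine();  // keep the process alive so the background worker can run
    }
}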

Solution

Is it because I use a wait handle that is set when I post a message and reset when the processing is done?

Without seeing your code it is hard to say for sure, but from your description it appears that the two threads you wrote act as co-routines: when the main thread is running, the worker thread has nothing to do, and vice versa. It looks like the .NET scheduler is smart enough not to load the second core when this happens.

You can change this behavior in several ways - for example:

  • by doing some work on the main thread before waiting on the handle, or
  • by adding more worker threads that compete for the tasks your main thread posts, so that several of them can get work at the same time (see the sketch after this list).
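
As a minimal sketch of the second option: several workers block on one shared queue, so whichever worker is free picks up the next task. MultiWorkerDispatcher and its members are hypothetical names, and a SemaphoreSlim stands in for the AutoResetEvent to keep the wake-up logic simple:

using System;
using System.Collections.Concurrent;
using System.Threading;

public class MultiWorkerDispatcher
{
    private readonly ConcurrentQueue<Action> _queue = new ConcurrentQueue<Action>();
    // The semaphore count mirrors the queue length, so each Release wakes one worker.
    private readonly SemaphoreSlim _available = new SemaphoreSlim(0);

    public MultiWorkerDispatcher(int workerCount)
    {
        for (int i = 0; i < workerCount; i++)
        {
            var thread = new Thread(Run) { IsBackground = true, Name = "worker" + i };
            thread.Start();
        }
    }

    private void Run()
    {
        while (true)
        {
            _available.Wait();   // non-busy wait until at least one task is queued
            Action task;
            if (_queue.TryDequeue(out task))
                task();
        }
    }

    public void Schedule(Action task)
    {
        _queue.Enqueue(task);
        _available.Release();    // wake exactly one waiting worker
    }
}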

OTHER TIPS

OK, I've figured out what the problem is. The producer and the consumer are pretty much equally fast in this case. This results in the consumer finishing all its work quickly and then looping back to wait on the AutoResetEvent. The next time the producer sends a task, it has to touch the AutoResetEvent and set it.

The solution was to add a very small delay in the consumer, making it slightly slower than the producer. Now, when the producer sends a task, it notices that the consumer is already active and just posts to the worker queue without touching the AutoResetEvent.

The original behavior resulted in a sort of ping-pong effect, which can be seen in the screenshot.

Dasblinkelight (probably) has the right answer.

Apart from that, this would also be the correct behaviour when one of your threads is I/O-bound (that is, it's not stuck on the CPU) - in that case, you have nothing to gain from using multiple cores, and .NET is smart enough to just switch contexts on one core.

This is often the case for UI threads - a UI thread has very little work to do, so there usually isn't much reason for it to occupy a whole core by itself. And yes, if your concurrent queue is not used properly, it could simply mean that the main thread waits for the worker thread - again, in that case, there's no need to switch cores, since the original thread is waiting anyway.

You should use BlockingCollection rather than ConcurrentQueue. By default, BlockingCollection uses a ConcurrentQueue under the hood, but it has a much easier-to-use interface. In particular, it does non-busy waits. In addition, BlockingCollection supports cancellation, so your consumer becomes very simple. Here's an example:

using System;
using System.Collections.Concurrent;
using System.Threading;

public class SingleThreadDispatcher
{
    public long Count;
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();

    public SingleThreadDispatcher()
    {
        var thread = new Thread(Run)
        {
            IsBackground = true,
            Name = "worker" + Guid.NewGuid(),
        };

        thread.Start();
    }

    private void Run()
    {
        try
        {
            foreach (var task in _queue.GetConsumingEnumerable(_cancellation.Token))
            {
                Count++;
                task();
            }
        }
        catch (OperationCanceledException)
        {
            // Thrown when _cancellation.Cancel() is called; exit the worker thread.
        }
    }

    public void Schedule(Action task)
    {
        _queue.Add(task);
    }
}

The loop with GetConsumingEnumerable does a non-busy wait on the queue, so there's no need for a separate event. It waits for an item to be added to the queue, or stops if you cancel the cancellation token.

To stop it normally, you just call _queue.CompleteAdding(). That tells the consumer that no more items will be added to the queue. The consumer will empty the queue and then exit.

If you want to quit early, just call _cancellation.Cancel(). That will cause GetConsumingEnumerable to stop by throwing an OperationCanceledException - that's why the Run method above wraps the loop in a try/catch.
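
A small usage sketch, under the assumption that you expose those two calls through wrapper methods; Stop and Abort below are hypothetical additions to the class, not part of the original answer:

using System;

// Hypothetical additions to SingleThreadDispatcher, exposing shutdown:
//     public void Stop()  { _queue.CompleteAdding(); }  // drain queue, then exit
//     public void Abort() { _cancellation.Cancel(); }   // exit as soon as possible

public static class ShutdownDemo
{
    public static void Main()
    {
        var dispatcher = new SingleThreadDispatcher();
        for (int i = 0; i < 100; i++)
        {
            int n = i;  // capture the loop variable for the closure
            dispatcher.Schedule(() => Console.WriteLine("task " + n));
        }
        dispatcher.Stop();  // graceful shutdown: already-queued tasks still run
    }
}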

In general, you shouldn't ever have to use ConcurrentQueue directly. BlockingCollection is easier to use and provides equivalent performance.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow