Erroneous behavior when using Interlocked.Decrement along with monitor.wait and monitor.pulse in a multithreaded environment

StackOverflow https://stackoverflow.com/questions/14062137

Question

I am trying to implement a multithreaded library that would run simultaneous tasks using threadpool. Basically it will add tasks to threadpool from the collection parameter it receive and then will wait until last task that is being processed sends a pulse signal. I had success in my earlier tests but I encountered a weird issue when I wanted to test with tasks that are really short to process. Somehow either pulse signal is sent before wait command takes in place in the main thread or something else is going on that I just can't simply see regardless of my efforts for syncronization.

In order to remediate my problem I have implemented another "less desirable" solution because of the potential performance benefits I am trading off which is working well as of now, but wanted to know why my first approach doesn't work in such cases in the first place even though performance wise there are not much of a difference between the two.

To illustrate, I am adding both solutions after I simplified the processes below. Can someone help me to point what is going wrong?

Thanks in advance.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;

namespace TestcodeBenchmark
{
    class Program
    {
        static int remainingTasks = 10000000;
        static Stopwatch casioF91W = new Stopwatch();
        static Random rg = new Random();
        static readonly object waitObject = new object();


        static void Main(string[] args)
        {
            TestLoop(30, remainingTasks);
            Console.ReadKey();
        }

        private static void TestLoop(int loopCount, int remainingCountResetNumber)
        {
            for (int i = 0; i < loopCount; i++)
            {
                remainingTasks = remainingCountResetNumber;
                //When this method is called it eventualy stuck at Monitor.Wait line
                TestInterlocked();

                remainingTasks = remainingCountResetNumber;
                //When this method is called it processes stuff w/o any issues.
                TestManualLock();
                Console.WriteLine();
            }
        }

        private static void TestInterlocked()
        {
            casioF91W.Restart();
            //for (int i = 0; i < remainingTasks; i++)
            //{
            //    ThreadPool.QueueUserWorkItem(delegate { TestInterlockedDecrement(); });
            //}
            int toStart = remainingTasks;
            //for (int i = 0; i < remainingTasks; i++)
            for (int i = 0; i < toStart; i++)
            {
                if (!ThreadPool.QueueUserWorkItem(delegate { TestInterlockedDecrement(); }))
                    Console.WriteLine("Queue failed");
            }
            //lock waitObject to be able to call Monitor.Wait
            lock (waitObject)
            {
                //if waitObject is locked then no worker thread should be able to send a pulse signal
                //however, if pulse signal was sent before locking here remainingTasks should be
                //zero so don't wait if all tasks are processed already
                if (remainingTasks != 0)
                {
                    //release the lock on waitObject and wait pulse signal from the worker thread that 
                    //finishes last task
                    Monitor.Wait(waitObject);
                }
            }
            casioF91W.Stop();
            Console.Write("Interlocked:{0}ms ", casioF91W.ElapsedMilliseconds);
        }

        private static void TestInterlockedDecrement()
        {
            //process task
            //TestWork();
            //Once processing finishes decrement 1 from remainingTasks using Interlocked.Decrement
            //to make sure it is atomic and therefore thread safe. If resulting value is zero
            //send pulse signal to wake main thread.            
            if (Interlocked.Decrement(ref remainingTasks) == 0)
            {
                //Acquire a lock on waitObject to be able to send pulse signal to main thread. If main 
                //thread acquired the lock earlier, this will wait until main thread releases it
                lock (waitObject)
                {
                    //send a pulse signal to main thread to continue
                    Monitor.PulseAll(waitObject);
                }
            }
        }

        private static void TestManualLock()
        {
            casioF91W.Restart();

            //Acquire the lock on waitObject and don't release it until all items are added and
            //Wait method is called. This will ensure wait method is called in main thread
            //before any worker thread can send pulse signal by requiring worker threads to
            //lock waitObject to be able to modify remainingTasks            
            lock (waitObject)
            {
                for (int i = 0; i < remainingTasks; i++)
                {
                    ThreadPool.QueueUserWorkItem(delegate { TestManualDecrement(); });
                }
                Monitor.Wait(waitObject);
            }
            casioF91W.Stop();
            Console.Write("ManualLock:{0}ms ", casioF91W.ElapsedMilliseconds);
        }

        private static void TestManualDecrement()
        {
            //TestWork();
            //try to acquire lock on wait object.
            lock (waitObject)
            {
                //if lock is acquired, decrement remaining tasks by and then check
                //whether resulting value is zero.
                if (--remainingTasks == 0)
                {
                    //send a pulse signal to main thread to continue
                    Monitor.PulseAll(waitObject);
                }
            }
        }

        private static void TestWork()
        {
            //Uncomment following to simulate some work.
            //int i = rg.Next(100, 110);
            //for (int j = 0; j < i; j++)
            //{

            //}
        }
    }
}
Was it helpful?

Solution

When you start your tasks, you loop is starting remainingTasks tasks. By the time you near 10000, however, some of the tasks have completed and decremented this number to less than 10000, so you do not start the proper number of tasks. If I modify your loop to save how many tasks should be started, the code runs successfully. (Note that you should also be checking the return value of QueueUserWorkItem.)

        int toStart = remainingTasks;
        for (int i = 0; i < toStart; i++)
        {
            if (!ThreadPool.QueueUserWorkItem(delegate { TestInterlockedDecrement(); }))
                Console.WriteLine("Queue failed");
        }
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top