Question

I want two threads to work with one queue. The first thread should take an item every 2 seconds and the second thread every 3 seconds, and both should start at the same time. The problem is that both threads appear to take the element at index 0. Sometimes it happens with other elements of the queue too, not only the first. This is the console output:

  • Item 0 processed by 1 Time: 3:27:8
  • Item 0 processed by 2 Time: 3:27:8
  • Item 2 processed by 1 Time: 3:27:10
  • Item 3 processed by 2 Time: 3:27:11
  • Item 4 processed by 1 Time: 3:27:12

and so on.

Here is the code I use:

    ConcurrentQueue<int> sharedQueue = new ConcurrentQueue<int>();

    for (int i = 0; i < 10; i++)
    {
        sharedQueue.Enqueue(i);
    }


    int itemCount= 0;


    Task[] tasks = new Task[2];
    for (int i = 0; i < tasks.Length; i++)
    {
        // create the new task
        tasks[i] = new Task(() =>
        {
            while (sharedQueue.Count > 0)
            {
                // define a variable for the dequeue requests
                int queueElement;
                // take an item from the queue
                bool gotElement = sharedQueue.TryDequeue(out queueElement);
                // increment the count of items processed
                if (gotElement)
                {
                    DateTime dt = DateTime.Now;
                    Console.WriteLine("Item " + itemCount + " processed by "
                        + Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
                    Interlocked.Increment(ref itemCount);
                    if (Task.CurrentId == 1)
                        Thread.Sleep(2000);
                    else
                        Thread.Sleep(3000);
                }

            }
        });
        // start the new task
        tasks[i].Start();


    }
    // wait for the tasks to complete
    Task.WaitAll(tasks);
    // report on the number of items processed
    Console.WriteLine("Items processed: {0}", itemCount);
    // wait for input before exiting
    Console.WriteLine("Press enter to finish");
    Console.ReadLine();
}

Solution

Replace the following line:

Console.WriteLine("Item " + itemCount + " processed by " ...);

With this line:

Console.WriteLine("Item " + queueElement + " processed by " ...);

The duplicate output most likely comes from both tasks calling Console.WriteLine at nearly the same time: each reads itemCount before either Interlocked.Increment call has run, so both print the same value. The queue itself is behaving correctly; TryDequeue never hands the same element to both tasks. Printing queueElement is also more meaningful, since it is the item that was actually dequeued.
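If a running count per item is still wanted, one option is to use the value that Interlocked.Increment returns instead of reading itemCount separately. The following is only a sketch: the SequenceDemo class, its Run method and the sequenceNumbers bag are names made up for illustration, and the sleeps are left out to keep it short.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original question.
static class SequenceDemo
{
    // Drains the queue with two tasks and returns the sequence numbers, sorted.
    public static int[] Run()
    {
        var sharedQueue = new ConcurrentQueue<int>();
        for (int i = 0; i < 10; i++)
        {
            sharedQueue.Enqueue(i);
        }

        int itemCount = 0;
        var sequenceNumbers = new ConcurrentBag<int>();

        Task[] tasks = new Task[2];
        for (int i = 0; i < tasks.Length; i++)
        {
            tasks[i] = Task.Run(() =>
            {
                int queueElement;
                // TryDequeue returns false once the queue is empty.
                while (sharedQueue.TryDequeue(out queueElement))
                {
                    // Interlocked.Increment returns the incremented value, so
                    // the read and the increment happen as one atomic step.
                    int sequence = Interlocked.Increment(ref itemCount) - 1;
                    sequenceNumbers.Add(sequence);
                    Console.WriteLine("Item " + queueElement + " got sequence number "
                        + sequence + " in task " + Task.CurrentId);
                }
            });
        }

        Task.WaitAll(tasks);

        int[] result = sequenceNumbers.ToArray();
        Array.Sort(result);
        return result;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(",", Run()));
    }
}
```

Because each task works with the value returned by the increment, no two items can end up with the same sequence number, whatever the interleaving.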

OTHER TIPS

See Brian Gideon's excellent answer regarding your itemCount problem.

You might consider rewriting your code to use BlockingCollection rather than ConcurrentQueue<T>. It's much easier to work with. BlockingCollection is a wrapper for concurrent collections. In its default configuration, the backing store is a ConcurrentQueue. So you get the same concurrent queue functionality but with a much nicer interface.

BlockingCollection<int> sharedQueue = new BlockingCollection<int>();

for (int i = 0; i < 10; i++)
{
    sharedQueue.Add(i);
}

// CompleteAdding marks the queue as "complete for adding,"
// meaning that no more items will be added.
sharedQueue.CompleteAdding();

int itemCount= 0;

Task[] tasks = new Task[2];
for (int i = 0; i < tasks.Length; i++)
{
    // create the new task
    tasks[i] = new Task(() =>
    {
        foreach (var queueElement in sharedQueue.GetConsumingEnumerable())
        {
            DateTime dt = DateTime.Now;
            Console.WriteLine("Item " + queueElement + " processed by "
                + Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
            Interlocked.Increment(ref itemCount);   
            if (Task.CurrentId == 1) 
                Thread.Sleep(2000);
            else 
                Thread.Sleep(3000);                       
        }
    });

    // start the new task
    tasks[i].Start();
}

// wait for both consumers to drain the queue
Task.WaitAll(tasks);

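GetConsumingEnumerable returns an enumerable that removes and yields items from the collection. When the collection is empty it blocks until another item is added, and the loop ends once CompleteAdding has been called and the collection has been drained. It also handles cancellation well (there is an overload that takes a CancellationToken), which is a bit more difficult with ConcurrentQueue.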
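To illustrate the cancellation point, here is a rough sketch that passes a CancellationToken to GetConsumingEnumerable. The CancellationDemo class, its Run method and the 500 ms delay are assumptions made for the example. CompleteAdding is deliberately not called, so the consumer would block forever if it were not cancelled.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original answer.
static class CancellationDemo
{
    // Returns true if the consumer loop ended because of cancellation.
    public static bool Run()
    {
        using (var sharedQueue = new BlockingCollection<int>())
        using (var cts = new CancellationTokenSource())
        {
            for (int i = 0; i < 3; i++)
            {
                sharedQueue.Add(i);
            }
            // No CompleteAdding here: once the collection is empty,
            // the consumer blocks waiting for more items.

            Task<bool> consumer = Task.Run(() =>
            {
                try
                {
                    foreach (int queueElement in sharedQueue.GetConsumingEnumerable(cts.Token))
                    {
                        Console.WriteLine("Item " + queueElement + " processed");
                    }
                    return false; // loop ended normally
                }
                catch (OperationCanceledException)
                {
                    return true; // the token was cancelled during enumeration
                }
            });

            Thread.Sleep(500); // arbitrary delay, chosen for the example
            cts.Cancel();
            return consumer.Result;
        }
    }

    static void Main()
    {
        Console.WriteLine("Cancelled: " + Run());
    }
}
```

With ConcurrentQueue the same behaviour would need a hand-written loop that polls TryDequeue and checks the token itself.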

In general, any time you think of using ConcurrentQueue<T> for a producer/consumer setup like this one, you probably want BlockingCollection<T>.

Licensed under: CC-BY-SA with attribution