See Brian Gideon's excellent answer regarding your itemCount problem.
You might consider rewriting your code to use BlockingCollection<T> rather than ConcurrentQueue<T>. It's much easier to work with. BlockingCollection<T> is a wrapper for concurrent collections. In its default configuration, the backing store is a ConcurrentQueue<T>, so you get the same concurrent queue functionality but with a much nicer interface.

    BlockingCollection<int> sharedQueue = new BlockingCollection<int>();

    for (int i = 0; i < 10; i++)
    {
        sharedQueue.Add(i);
    }

    // CompleteAdding marks the queue as "complete for adding,"
    // meaning that no more items will be added.
    sharedQueue.CompleteAdding();

    int itemCount = 0;

    Task[] tasks = new Task[2];
    for (int i = 0; i < tasks.Length; i++)
    {
        // create the new task
        tasks[i] = new Task(() =>
        {
            // Each iteration removes one item from the queue; the loop ends
            // once the queue is empty and marked complete for adding.
            foreach (var queueElement in sharedQueue.GetConsumingEnumerable())
            {
                DateTime dt = DateTime.Now;
                Console.WriteLine("Item " + itemCount + " processed by "
                    + Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
                Interlocked.Increment(ref itemCount);
                if (Task.CurrentId == 1)
                    Thread.Sleep(2000);
                else
                    Thread.Sleep(3000);
            }
        });
        // start the new task
        tasks[i].Start();
    }

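GetConsumingEnumerable returns an enumerable that removes and returns the next item from the queue on each iteration. It blocks while the queue is empty and ends once the queue is empty and has been marked complete for adding. It also handles cancellation well, which is a bit more difficult with ConcurrentQueue<T>.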
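To illustrate the cancellation point, here is a minimal sketch that passes a CancellationToken to GetConsumingEnumerable. The class name CancellationSketch and the helper ConsumeUntilCancelled are hypothetical names made up for this example, and the 500 ms delay is an arbitrary choice. The queue is deliberately never marked complete, so the consumer would block forever without the token.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

static class CancellationSketch
{
    // Hypothetical helper: consumes items until the token is cancelled
    // (or the queue is completed and empty) and returns what it processed.
    public static List<int> ConsumeUntilCancelled(
        BlockingCollection<int> queue, CancellationToken token)
    {
        var processed = new List<int>();
        try
        {
            // The enumerator throws OperationCanceledException when the
            // token is cancelled while it is waiting for the next item.
            foreach (int item in queue.GetConsumingEnumerable(token))
            {
                processed.Add(item);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation requested: stop consuming and fall through.
        }
        return processed;
    }

    static void Main()
    {
        using (var queue = new BlockingCollection<int>())
        using (var cts = new CancellationTokenSource())
        {
            queue.Add(1);
            queue.Add(2);

            // CompleteAdding is never called here, so only the token
            // can end the loop. Cancel after an arbitrary 500 ms.
            cts.CancelAfter(500);

            List<int> processed = ConsumeUntilCancelled(queue, cts.Token);
            Console.WriteLine("Processed " + processed.Count + " items before cancellation");
        }
    }
}
```

With a bare ConcurrentQueue<T> you would have to write the wait-and-poll loop yourself and check the token inside it; here the blocking wait and the cancellation check are both handled by the enumerator.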
In general, any time you think of using ConcurrentQueue<T>, you probably want BlockingCollection<T>.