Question

I am new to tasks, parallelism, scheduling, and so on. Here I created two BlockingCollection instances: one collects the input and the other collects the output. I added 100 items, so I expected each count to be either 100 or 0, or at least the two counts to add up to 100.

However, I found that both counts are 0. Please help me understand these concepts in simple language.

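    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    class Program
    {
    static void Main(string[] args)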
    {
        new Program().run();
    }
    void run()
    {
        int threadCount = 4;
        Task[] workers = new Task[threadCount];

        Task.Factory.StartNew(consumer);
        // We can do other work in parallel
        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i;
            Task task = new Task(() => worker(workerId));
            workers[i] = task;
            task.Start();
        }

        for (int i = 0; i < 100; ++i)
        {
            Console.WriteLine("Queueing work item {0}", i);
            inputQueue.Add(i);
            Thread.Sleep(50);
        }

        Console.WriteLine("Stopping adding.");
        inputQueue.CompleteAdding();
        Console.WriteLine("The count in InputQueue= {0}", inputQueue.Count);// 0
        Task.WaitAll(workers);
        outputQueue.CompleteAdding();
        Console.WriteLine("The count in OutputQueue= {0}", outputQueue.Count); // 0
        Console.WriteLine("Done.");

        Console.ReadLine();
    }

    void worker(int workerId)
    {
        Console.WriteLine("Worker {0} is starting.", workerId);

        foreach (var workItem in inputQueue.GetConsumingEnumerable())
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
            Thread.Sleep(100);          // Simulate work.
            outputQueue.Add(workItem);  // Output completed item.
        }

        Console.WriteLine("Worker {0} is stopping.", workerId);
    }

    void consumer()
    {
        Console.WriteLine("Consumer is starting.");

        foreach (var workItem in outputQueue.GetConsumingEnumerable())
        {
            Console.WriteLine("Consumer is using item {0}", workItem);
            Thread.Sleep(25);
        }

        Console.WriteLine("Consumer is finished.");
    }

    BlockingCollection<int> inputQueue = new BlockingCollection<int>();
    BlockingCollection<int> outputQueue = new BlockingCollection<int>();
    }

Solution

You have a three-stage pipeline:

run -- (inputQueue) -- worker -- (outputQueue) -- consumer

All three stages run at the same time, so as soon as an item is in inputQueue it can immediately be taken out and moved to outputQueue, and as soon as an item is put into outputQueue it can immediately be taken out and processed. Count only reports what is sitting in a queue at the instant you read it.
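As a minimal sketch of that last point, the snippet below fills a BlockingCollection while nothing is consuming from it, and then again after draining it. The class and method names (`CountDemo`, `CountWithoutConsumer`, `CountAfterDraining`) are made up for illustration and are not part of the original program.

```csharp
using System;
using System.Collections.Concurrent;

static class CountDemo
{
    // Adds 100 items while no consumer is running, then reads Count.
    public static int CountWithoutConsumer()
    {
        var queue = new BlockingCollection<int>();
        for (int i = 0; i < 100; ++i)
            queue.Add(i);
        queue.CompleteAdding();
        return queue.Count; // nothing has taken any item out yet
    }

    // Adds 100 items, takes every one of them out, then reads Count.
    public static int CountAfterDraining()
    {
        var queue = new BlockingCollection<int>();
        for (int i = 0; i < 100; ++i)
            queue.Add(i);
        queue.CompleteAdding();
        foreach (var item in queue.GetConsumingEnumerable())
        {
            // Each iteration removes one item from the queue.
        }
        return queue.Count; // the queue has been emptied
    }

    static void Main()
    {
        Console.WriteLine(CountWithoutConsumer()); // 100
        Console.WriteLine(CountAfterDraining());   // 0
    }
}
```

In the program from the question the workers and the consumer are already running while items are being added, so the queues behave like the second case almost all the time.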

So to get a count of 100 you would need to add up

(100 - number of items "run" has put into "inputQueue") +
"inputQueue.Count" +
some number between 0 and "threadCount", representing items that have been taken out of "inputQueue" but have not yet been put into "outputQueue" +
"outputQueue.Count" +
the number of items "consumer" has taken out of "outputQueue"

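Because your thread count and wait times are well balanced for the load, both queues are almost always empty. At the moment you stop adding, the formula above would most likely come out as something like 0 + 0 + 4 + 0 + 96, with the last few items (at most 4, one per worker) still being processed by the workers and everything else already processed by consumer. By the time the second count is printed, Task.WaitAll has returned, so no item is inside a worker any more and consumer has most likely emptied outputQueue as well. If the later stages could not keep up, for example with only 2 slower worker threads and a slower consumer, items would back up in the queues and you might see numbers like 0 + 48 + 2 + 25 + 25: 48 waiting to be processed by a worker, 2 being processed by a worker, 25 waiting to be processed by consumer, and 25 already processed by consumer.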
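The accounting can be made visible with a few counters. This is only a sketch under assumptions: the class name `PipelineAccounting`, the counters `produced`, `inFlight`, and `consumed`, and the shortened delays are all introduced here for illustration. The mid-run snapshot reads five values one after another without any locking, so its sum can be slightly off from 100; only the final state is exact.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class PipelineAccounting
{
    public const int Total = 100;

    // Runs the pipeline and returns the five terms of the formula
    // as they stand after every stage has finished.
    public static int[] Run()
    {
        var inputQueue = new BlockingCollection<int>();
        var outputQueue = new BlockingCollection<int>();
        int produced = 0, inFlight = 0, consumed = 0; // hypothetical counters

        Task consumer = Task.Run(() =>
        {
            foreach (var item in outputQueue.GetConsumingEnumerable())
                Interlocked.Increment(ref consumed);
        });

        var workers = new Task[4];
        for (int i = 0; i < workers.Length; ++i)
        {
            workers[i] = Task.Run(() =>
            {
                foreach (var item in inputQueue.GetConsumingEnumerable())
                {
                    Interlocked.Increment(ref inFlight);
                    Thread.Sleep(2);          // simulate work
                    outputQueue.Add(item);
                    Interlocked.Decrement(ref inFlight);
                }
            });
        }

        for (int i = 0; i < Total; ++i)
        {
            inputQueue.Add(i);
            Interlocked.Increment(ref produced);
        }
        inputQueue.CompleteAdding();

        // Approximate snapshot while workers and consumer are still busy.
        Console.WriteLine("mid-run: {0} + {1} + {2} + {3} + {4}",
            Total - produced, inputQueue.Count, inFlight, outputQueue.Count, consumed);

        Task.WaitAll(workers);
        outputQueue.CompleteAdding();
        consumer.Wait(); // wait until the consumer has taken every item

        return new[] { Total - produced, inputQueue.Count, inFlight, outputQueue.Count, consumed };
    }

    static void Main()
    {
        Console.WriteLine("final:   " + string.Join(" + ", Run()));
    }
}
```

Once every stage has finished, the first four terms are 0 and all 100 items show up in the last term, which is why both Count values in the question print 0.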

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow