문제

I have a situation where I have multiple producers and multiple consumers. The producers enters a job into a queue. I chose the BlockingCollection and it works great since I need the consumers to wait for a job to be found. However, if I use the GetConsumingEnumerable() feature the order of the items in the collection change... this is not what I need.

It even says in MSDN http://msdn.microsoft.com/en-us/library/dd287186.aspx that it does not preserve the order of the items.

Does anyone know an alternative for this situation?

I see that the Take method is available but does it also provide a 'wait' condition for the consumer threads?

It says http://msdn.microsoft.com/en-us/library/dd287085.aspx

'A call to Take may block until an item is available to be removed.' Is it better to use TryTake? I really need the thread to wait and keep checking for a job.

도움이 되었습니까?

해결책

Take blocks the thread till something comes available.

TryTake as the name implies tries to do so but returns a bool if it fails or succeeds. Allowing for more flex using it:

while(goingOn){
   if( q.TryTake(out var){
      Process(var)
   }
   else{
      DoSomething_Usefull_OrNotUseFull_OrEvenSleep();
   }
}

instead of

while(goingOn){
   if( var x = q.Take(){
      //w'll wait till this ever will happen and then we:
      Process(var)
   }
}

My votes are for TryTake :-)

EXAMPLE:

    public class ProducerConsumer<T> {

        public struct Message {
            public T Data;
        }

        private readonly ThreadRunner _producer;
        private readonly ThreadRunner _consumer;

        public ProducerConsumer(Func<T> produce, Action<T> consume) {
            var q = new BlockingCollection<Message>();
            _producer = new Producer(produce,q);
            _consumer = new Consumer(consume,q);
        }

        public void Start() {
            _producer.Run();
            _consumer.Run();
        }

        public void Stop() {
            _producer.Stop();
            _consumer.Stop();
        }

        private class Producer : ThreadRunner {

            public Producer(Func<T> produce, BlockingCollection<Message> q) : base(q) {
                _produce = produce;
            }

            private readonly Func<T> _produce;

            public override void Worker() {
                try {
                    while (KeepRunning) {
                        var item = _produce();
                        MessageQ.TryAdd(new Message{Data = item});
                    }
                }
                catch (ThreadInterruptedException) {
                    WasInterrupted = true;
                }
            }
        }

        public abstract class ThreadRunner {

            protected readonly BlockingCollection<Message> MessageQ;

            protected ThreadRunner(BlockingCollection<Message> q) {
                MessageQ = q;
            }

            protected Thread Runner;
            protected bool KeepRunning = true;

            public bool WasInterrupted;

            public abstract void Worker();

            public void Run() {
                Runner = new Thread(Worker);
                Runner.Start();
            }

            public void Stop() {
                KeepRunning = false;
                Runner.Interrupt();
                Runner.Join();
            }

        }

        class Consumer : ThreadRunner {

            private readonly Action<T> _consume;

            public Consumer(Action<T> consume,BlockingCollection<Message> q) : base(q) {
                _consume = consume;
            }

            public override void Worker() {
                try {
                    while (KeepRunning) {
                        Message message;
                        if (MessageQ.TryTake(out message, TimeSpan.FromMilliseconds(100))) {
                            _consume(message.Data);
                        }
                        else {
                            //There's nothing in the Q so I have some spare time...
                            //Excellent moment to update my statisics or update some history to logfiles
                            //for now we sleep:
                            Thread.Sleep(TimeSpan.FromMilliseconds(100));
                        }
                    }
                }
                catch (ThreadInterruptedException) {
                    WasInterrupted = true;
                }
            }
        }
    }

}

USAGE:

[Fact]
public void ConsumerShouldConsume() {

    var produced = 0;
    var consumed = 0;

    Func<int> produce = () => {
        Thread.Sleep(TimeSpan.FromMilliseconds(100));
        produced++;
        return new Random(2).Next(1000);
    };

    Action<int> consume = c => { consumed++; };

    var t = new ProducerConsumer<int>(produce, consume);
    t.Start();
    Thread.Sleep(TimeSpan.FromSeconds(5));
    t.Stop();

    Assert.InRange(produced,40,60);
    Assert.InRange(consumed, 40, 60);

}
라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top