سؤال

My basic problem is needing to process items from a queue immediately if the queue is empty, or add an item to the queue and leave if there is an item already being processed.

I'm trying out a technique that uses peek to simplify things and am wondering what gotchas there may be that could get in the way. Thanks!

    void SequenceAction(Action action) {
       bool go = false;

       lock (_RaiseEventQueueLock) {
          _RaiseEventQueue.Enqueue(action);
          go = (_RaiseEventQueue.Count == 1); 
       }

       // 'go' can only be true if queue was empty when queue 
       //  was locked and item was enqueued.
       while (go) {
          #if naive_threadsafe_presumption 
          // Peek is threadsafe because in effect this loop owns
          //  the zeroeth item in the queue for as long as the queue 
          //  remains non-empty.
          (_RaiseEventQueue.Peek())();

          #else
          Action a;
          lock (_RaiseEventQueueLock) {
             a = _RaiseEventQueue.Peek();
          }
          a();
          #endif   

          // Lock the queue to see if any item was enqueued while
          //  the zeroeth item was being processed.
          // Note that the code processing an item can call this 
          //  function reentrantly, adding to its own todo list
          //  while insuring that that each item is processed 
          //  to completion.
          lock (_RaiseEventQueueLock) {
             _RaiseEventQueue.Dequeue();
             go = (_RaiseEventQueue.Count > 0); 
          }
       }
    }
هل كانت مفيدة؟

المحلول 2

    // If action already in progress, add new
    //  action to queue and return.
    // If no action in progress, begin processing
    //  the new action and continue processing
    //  actions added to the queue in the meantime.
    void SequenceAction(Action action) {
       lock (_SequenceActionQueueLock) {
          _SequenceActionQueue.Enqueue(action);
          if (_SequenceActionQueue.Count > 1) {
             return;
          }
       }
       // Would have returned if queue was not empty
       //  when queue was locked and item was enqueued.
       for (;;) {
          action();
          lock (_SequenceActionQueueLock) {
             _SequenceActionQueue.Dequeue();
             if (_SequenceActionQueue.Count == 0) {
                return;
             }
             action = _SequenceActionQueue.Peek();
          }
       }
    }

نصائح أخرى

Actually, your Peek is not threadsafe. Adding an item to the queue could cause the backing store (an array, eventually) to be resized. I imagine the queue is implemented in a circular buffer, with head and tail indexes for insertion and removal.

So imagine what happens if there are, say, 16 items in the queue. Insert is at 8, and Remove is at 9. The queue is full. Then this happens:

  1. Thread A calls Peek, retrieves the Remove index (9).
  2. Thread A gets swapped out.
  3. Thread B calls Enqueue and sees that it has to grow the queue.
  4. Thread B allocates a new array of 32 items and copies data from the existing array. Data is copied in order, starting at Remove and wrapping around.
  5. Thread B sets Remove to 0 and Insert to 16.
  6. Thread A gets its next timeslice and returns the item at position 9.
  7. You have just processed an event out of order, and you will end up processing it again.
  8. Worse, you will remove the item at position 0 without processing it.

You might be able to solve that problem with:

Action nextAction;
lock (_RaiseEventQueueLock)
{
    nextAction = _RaiseEventQueue.Peek();
}
nextAction();

I wouldn't stake my professional career on it, though. I'd suggest using a BlockingCollection and a producer/consumer design.

Possible fix

It occurs to me that the following should do what you intended.

private readonly object _queueLock = new object();
private readonly object _processLock = new object();

void SequenceAction(Action action)
{
    lock (_queueLock)
    {
        _RaiseEventQueue.Enqueue(action);
    }
    if (Monitor.TryEnter(_processLock))
    {
        while (true)
        {
            Action a;
            lock (_queueLock)
            {
                if (_RaiseEventQueue.Count == 0) return;
                a = _RaiseEventQueue.Dequeue();
            }
            a();
        }
        Monitor.Exit(_processLock);
    }
}
مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top