Question

I've got a (concurrent) priority queue with a timestamp (in the future) as the key and, as the value, a function that should be called (or an item that should be processed) when that time is reached. I don't want to attach a timer to each item, because there are lots of them. I'd rather go with a single scheduler thread/task.

What would be a good strategy to do so?


With a thread running a scheduler... (pseudo-code follows)

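// scheduler
readonly object _threadLock = new object();

while (true)
{
    lock (_threadLock) // Monitor.Wait must be called while holding the lock
    {
        if (queue.Empty)
        {
            Monitor.Wait(_threadLock); // sleep until an element is enqueued
            continue;
        }

        var time = GetWaitingTimeForNextElement();
        if (time > 0)
        {
            Monitor.Wait(_threadLock, time); // sleep until due or pulsed
            continue;
        }
    }

    // dequeue and process element (outside the lock)
}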

...and pulsing when adding elements (to an empty queue or adding a new first element)?

// element enqueued (Pulse must be called while holding the lock)
lock (_threadLock) Monitor.Pulse(_threadLock);

Or with somehow chained (Task.ContinueWith(...)) Tasks using Task.Delay(int, CancellationToken)? This would need some logic to abort the wait if a new first element is enqueued, or to create a new task if none is running. It feels like there is a simpler solution I'm not getting right now. :)
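A minimal sketch of that Task.Delay variant, written as a single async loop instead of chained continuations. The names DelayScheduler, Enqueue and RunAsync are made up for illustration, and the BCL PriorityQueue<TElement, TPriority> (.NET 6+) guarded by a lock stands in for the concurrent priority queue; treat it as one possible shape under those assumptions, not a finished implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler: one loop, one pending delay, cancelled whenever an item is added.
public sealed class DelayScheduler
{
    private readonly object _gate = new object();
    private readonly PriorityQueue<Action, DateTime> _queue = new PriorityQueue<Action, DateTime>();
    private CancellationTokenSource _wake = new CancellationTokenSource();

    public void Enqueue(DateTime dueUtc, Action action)
    {
        CancellationTokenSource wake;
        lock (_gate)
        {
            _queue.Enqueue(action, dueUtc);
            wake = _wake;
        }
        wake.Cancel(); // abort the current wait (outside the lock) so the loop re-reads the head
    }

    public async Task RunAsync(CancellationToken stop)
    {
        while (!stop.IsCancellationRequested)
        {
            Action? ready = null;
            TimeSpan wait = Timeout.InfiniteTimeSpan; // empty queue: wait until woken
            CancellationToken wakeToken;

            lock (_gate)
            {
                // A used wake-up signal is replaced by a fresh one.
                if (_wake.IsCancellationRequested) _wake = new CancellationTokenSource();
                wakeToken = _wake.Token;

                if (_queue.TryPeek(out _, out var due))
                {
                    wait = due - DateTime.UtcNow;
                    if (wait <= TimeSpan.Zero) ready = _queue.Dequeue();
                }
            }

            if (ready != null) { ready(); continue; } // process outside the lock

            using var linked = CancellationTokenSource.CreateLinkedTokenSource(wakeToken, stop);
            try { await Task.Delay(wait, linked.Token); }
            catch (OperationCanceledException) { /* woken early or stopping */ }
        }
    }
}
```

Every Enqueue wakes the loop, even when the new item is not the new head; that costs one extra pass through the loop but keeps the logic short. Very distant due times would need clamping, since Task.Delay rejects delays beyond its maximum.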


Or using a timer (very-pseudo-code, just to get the idea)...

var x = new System.Timers.Timer();

x.Elapsed += (sender, args) =>
{
    // dequeue and process item(s)

    x.Interval = GetWaitingTimeForNextElement(); // does this reset the timer anyway?
};

x.Start(); // Start() returns void, so it cannot be chained onto the constructor

...and updating the interval when adding elements (like above).

// element enqueued 
x.Interval = updatedTime;

I'm also concerned about the precision of the wait methods/timers: millisecond resolution is quite coarse (although it might work). Is there a better alternative?

Ergo...

That's again a bunch of questions/thoughts, sorry for that, but there are so many options and concerns that it's hard to get an overview. So to summarize: what is the best way to implement a (precise) time scheduling system for dynamically incoming items?

I appreciate all hints and answers! Thanks a lot.

Solution

I would suggest doing it like this:

  1. Create a class called TimedItemsConcurrentPriorityQueue<TKey, TValue> that inherits from ConcurrentPriorityQueue<TKey, TValue>.

  2. Add an event called ItemReady to your TimedItemsConcurrentPriorityQueue<TKey, TValue> class that is raised whenever an item becomes ready for processing according to its timestamp. A single timer is enough: re-arm it as needed by shadowing Enqueue, Insert, Remove and any other methods that change the queue (or modify the source of ConcurrentPriorityQueue<TKey, TValue> to make those methods virtual so you can override them).

  3. Instantiate a single instance of TimedItemsConcurrentPriorityQueue<TKey, TValue>, let's call that variable itemsWaitingToBecomeReady.

  4. Instantiate a single BlockingCollection<T>, let's call that variable itemsReady. Use the constructor that takes an IProducerConsumerCollection<T> and pass it a new instance of ConcurrentPriorityQueue<TKey, TValue> (it implements IProducerConsumerCollection<KeyValuePair<TKey,TValue>>).

  5. Whenever the ItemReady event is raised by itemsWaitingToBecomeReady, dequeue that item and enqueue it to itemsReady.

  6. Process the items in itemsReady using the BlockingCollection<T>.GetConsumingEnumerable method using a new task like this:

Task.Factory.StartNew(() =>
{
  // GetConsumingEnumerable blocks until an item is available
  foreach (var item in itemsReady.GetConsumingEnumerable())
  {
    // ... process item
  }
});
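
The steps above could be sketched roughly as follows. This is only an illustration under some assumptions: TimedQueue<T> and Demo are hypothetical names, the BCL PriorityQueue<TElement, TPriority> (.NET 6+) guarded by a lock stands in for ConcurrentPriorityQueue<TKey, TValue>, and itemsReady uses the default FIFO BlockingCollection<T> rather than a priority-queue-backed one.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for TimedItemsConcurrentPriorityQueue: a locked PriorityQueue plus one timer.
public sealed class TimedQueue<T> : IDisposable
{
    private readonly object _gate = new object();
    private readonly PriorityQueue<T, DateTime> _items = new PriorityQueue<T, DateTime>();
    private readonly Timer _timer;

    public event Action<T>? ItemReady;

    public TimedQueue()
    {
        // Created disarmed; Rearm() points it at the earliest due time.
        _timer = new Timer(_ => OnTick(), null, Timeout.Infinite, Timeout.Infinite);
    }

    public void Enqueue(DateTime dueUtc, T item)
    {
        lock (_gate)
        {
            _items.Enqueue(item, dueUtc);
            Rearm();
        }
    }

    private void OnTick()
    {
        var ready = new List<T>();
        lock (_gate)
        {
            while (_items.TryPeek(out _, out var due) && due <= DateTime.UtcNow)
                ready.Add(_items.Dequeue());
            Rearm();
        }
        foreach (var item in ready) ItemReady?.Invoke(item); // raise outside the lock
    }

    // Must be called with _gate held.
    private void Rearm()
    {
        if (!_items.TryPeek(out _, out var due))
        {
            _timer.Change(Timeout.Infinite, Timeout.Infinite); // nothing pending
            return;
        }
        var wait = due - DateTime.UtcNow;
        if (wait < TimeSpan.Zero) wait = TimeSpan.Zero;
        _timer.Change(wait, Timeout.InfiniteTimeSpan); // one-shot at the head's due time
    }

    public void Dispose() => _timer.Dispose();
}

public static class Demo
{
    public static async Task<List<string>> RunAsync()
    {
        using var itemsWaitingToBecomeReady = new TimedQueue<string>();
        using var itemsReady = new BlockingCollection<string>();
        var processed = new List<string>();

        itemsWaitingToBecomeReady.ItemReady += item => itemsReady.Add(item);

        var consumer = Task.Factory.StartNew(() =>
        {
            foreach (var item in itemsReady.GetConsumingEnumerable())
                processed.Add(item);
        }, TaskCreationOptions.LongRunning);

        var now = DateTime.UtcNow;
        itemsWaitingToBecomeReady.Enqueue(now.AddMilliseconds(200), "second");
        itemsWaitingToBecomeReady.Enqueue(now.AddMilliseconds(50), "first");

        await Task.Delay(800);
        itemsReady.CompleteAdding(); // lets the consumer loop finish
        await consumer;
        return processed;
    }
}
```

The timer is one-shot and re-armed after every change, so only one timer exists no matter how many items are queued. Due times further away than the timer's maximum interval would need clamping before the call to Change.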
Licensed under: CC-BY-SA with attribution