I would suggest doing it like this:

1. Create a class called TimedItemsConcurrentPriorityQueue<TKey, TValue> that inherits from ConcurrentPriorityQueue<TKey, TValue>.
2. Implement an event called ItemReady in your TimedItemsConcurrentPriorityQueue<TKey, TValue> class that is fired whenever an item is ready to be processed according to its timestamp. You can use a single timer and update it as needed by shadowing Enqueue, Insert, Remove and any other methods that change the contents (or by modifying the source of ConcurrentPriorityQueue<TKey, TValue> to make those methods virtual so you can override them).
3. Instantiate a single instance of TimedItemsConcurrentPriorityQueue<TKey, TValue>; let's call that variable itemsWaitingToBecomeReady.
4. Instantiate a single object of BlockingCollection<T>; let's call that variable itemsReady. Use the constructor that takes an IProducerConsumerCollection<T> and pass it a new instance of ConcurrentPriorityQueue<TKey, TValue> (it implements IProducerConsumerCollection<KeyValuePair<TKey, TValue>>).
5. Whenever the ItemReady event is fired in itemsWaitingToBecomeReady, dequeue that item and add it to itemsReady.
6. Process the items in itemsReady using the BlockingCollection<T>.GetConsumingEnumerable method in a new task like this:
    Task.Factory.StartNew(() =>
    {
        // Blocks until an item is available; ends once CompleteAdding() has been called
        foreach (var item in itemsReady.GetConsumingEnumerable())
        {
            ...
        }
    });
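
To make the moving parts concrete, here is a rough, self-contained sketch of the same idea. It is not the exact design described above: ConcurrentPriorityQueue<TKey, TValue> is not part of the base class library, so this version assumes .NET 6 or later and stands in a lock-protected PriorityQueue<TElement, TPriority> for the waiting queue, and a plain FIFO BlockingCollection<string> for itemsReady. The class name TimedItemsQueue<T>, the string items and the delays are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for TimedItemsConcurrentPriorityQueue: holds items until
// their due time and raises ItemReady for each item that has become due.
public sealed class TimedItemsQueue<T> : IDisposable
{
    private readonly object _gate = new object();
    private readonly PriorityQueue<T, DateTime> _items = new PriorityQueue<T, DateTime>();
    private readonly Timer _timer;

    public event Action<T> ItemReady;

    public TimedItemsQueue()
    {
        // Single timer, created disabled; it is re-armed whenever the contents change.
        _timer = new Timer(_ => OnTimer(), null, Timeout.Infinite, Timeout.Infinite);
    }

    public void Enqueue(T item, DateTime dueUtc)
    {
        lock (_gate)
        {
            _items.Enqueue(item, dueUtc);
            Rearm();
        }
    }

    private void OnTimer()
    {
        var ready = new List<T>();
        lock (_gate)
        {
            // Collect everything that is due by now, earliest first.
            while (_items.TryPeek(out _, out var due) && due <= DateTime.UtcNow)
                ready.Add(_items.Dequeue());
            Rearm();
        }
        // Raise the event outside the lock so handlers cannot block the queue.
        foreach (var item in ready) ItemReady?.Invoke(item);
    }

    // Points the timer at the earliest due item. Must be called while holding the lock.
    private void Rearm()
    {
        if (!_items.TryPeek(out _, out var due))
        {
            _timer.Change(Timeout.Infinite, Timeout.Infinite); // nothing waiting
            return;
        }
        var delay = due - DateTime.UtcNow;
        if (delay < TimeSpan.Zero) delay = TimeSpan.Zero;
        _timer.Change(delay, Timeout.InfiniteTimeSpan); // one-shot
    }

    public void Dispose() => _timer.Dispose();
}

public static class Program
{
    public static void Main()
    {
        using var itemsWaitingToBecomeReady = new TimedItemsQueue<string>();
        using var itemsReady = new BlockingCollection<string>(); // FIFO in this sketch

        // Step 5: move each item across as soon as it becomes ready.
        itemsWaitingToBecomeReady.ItemReady += item => itemsReady.Add(item);

        // Step 6: consume ready items on a separate task.
        var consumer = Task.Factory.StartNew(() =>
        {
            foreach (var item in itemsReady.GetConsumingEnumerable())
                Console.WriteLine(item);
        }, TaskCreationOptions.LongRunning);

        var now = DateTime.UtcNow;
        itemsWaitingToBecomeReady.Enqueue("due later", now.AddMilliseconds(400));
        itemsWaitingToBecomeReady.Enqueue("due sooner", now.AddMilliseconds(100));

        Thread.Sleep(1000);           // give both items time to become due
        itemsReady.CompleteAdding();  // lets GetConsumingEnumerable finish
        consumer.Wait();
    }
}
```

The items are handed to the consumer in due-time order rather than insertion order, because the timer always targets the earliest entry. If the ready items also need to be consumed by priority, as in step 4, you would pass a priority-ordered IProducerConsumerCollection<T> to the BlockingCollection<T> constructor instead of relying on the default queue. A real implementation would also need to re-arm the timer on removals and cope with the timer being disposed while a callback is still running.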