Вопрос

I have a BlockingCollection. Producer tasks add items to it, and consumer tasks remove items.

Now I want to limit the number of items in the collection, automatically discarding old data if more items are added. The collection should never contain more than the N most recently added items at the same time.

So, if the producers add new items faster than the consumers remove them, I want the consumers to process only the newest items.

I can limit the size of a BlockingCollection in its constructor, but of course that just means it blocks when adding more items, not that it removes old items.

(I do not want blocking on the producer side, only the consumer side should block when retrieving items from an empty collection.)

My current solution is a hack, and only works for a size limit of 1:
(And I am not quite sure it works reliable at all.)

// My consumer task:
foreach (var item in blockingCollection.GetConsumingEnumerable())
{
    var lastItem = item;
    var lastItemTmp = item;
    while (blockingCollection.TryTake(out lastItemTmp))
           lastItem = lastItemTmp;
    // Now lastItem contains the most recent item in the collection, 
    // and older items have been discarded.
    // Proceed consuming lastItem ...
}

Is there a cleaner solution?

Это было полезно?

Решение

Do it this way:

void AddItemToQueue(MyClass item)
{
    while (!queue.TryAdd(item))
    {
        MyClass trash;
        queue.TryTake(out trash);
    }
}

If the queue is full when you try to add the item, an item is removed from the queue. It uses TryTake because it's possible (unlikely, but possible) that some other thread might have removed the last item from the queue before you get a chance to take one.

This assumes, of course, that you specified a limit on the number of items when you constructed the BlockingCollection.

Another way to do this, although it's more involved, is to create your own circular queue class, and have it implement the IProducerConsumerCollection interface. You can then use an instance of that class as the backing collection for your BlockingCollection. Implementing a circular queue isn't especially difficult, although there are edge cases that are tricky to get right. And you'll have to make it a concurrent data structure, although that's pretty easy to do with a lock.

If you don't expect the queue to overflow often, or if the queue is pretty low traffic (i.e. not being hit thousands of times per second), then my initial suggestion will do what you want and there won't be a performance problem. If there is a performance problem, then the circular queue is the solution.

Другие советы

I would use the Concurrent stack:

Represents a thread-safe last in-first out (LIFO) collection.

http://msdn.microsoft.com/en-us/library/dd267331%28v=vs.110%29.aspx

And I would send in the stack an object that wraps your task adding to it a timestamp. The consumer will take tasks from the stack and discard the ones with a timestamp that is older than a threshold defined by you.

Just call this method before you add your items to it.

public static void Clear<T>(this BlockingCollection<T> blockingCollection)
    {
        if (blockingCollection == null)
        {
            throw new ArgumentNullException("blockingCollection");
        }

        while (blockingCollection.Count > 0)
        {
            T item;
            blockingCollection.TryTake(out item);
        }
    }
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top