Domanda

I recently started using the TPL Dataflow library from .NET 4.5 and the whole concept of blocks is new to me. I'm implementing a producer-consumer queue in my application, and I need to protect against duplicate messages being put in the queue, and therefore the need to check if a message has already been queued. I am using a BufferBlock<Message> type (Message is a custom type). BufferBlock has the Count property but that doesn't help in this issue because the messages need to be uniquely identified.

Is there any way to check if a BufferBlock contains an item or go through all the items and inspect them? Is it possible to cast BufferBlock to something that allows iteration over the items? I'm following an example I saw on MSDN and it doesn't check if the item is in the queue, but I would think that checking the contents of a queue is a fairly needed operation. Any help is appreciated.

È stato utile?

Soluzione

Rather than breaking into the BufferBlock, why not instead insert a TransformManyBlock into the chain that does this for you? You can use a HashSet, where the Add method only returns true if the item hasn't already been added. It ends up being quite simple, but storage requirements obviously increase with time...

void Main()
{
    var bb = new BufferBlock<string>();
    var db = DataflowEx.CreateDistinctBlock<string>();
    var ab = new ActionBlock<string>(x => Console.WriteLine(x));
    bb.LinkTo(db);
    db.LinkTo(ab);
    bb.Post("this");
    bb.Post("this");
    bb.Post("this");
    bb.Post("is");
    bb.Post("is");
    bb.Post("a");
    bb.Post("test");
}

public class DataflowEx
{
    public static TransformManyBlock<T, T> CreateDistinctBlock<T>()
    {
        var hs = new HashSet<T>();
        //hs will be captured in the closure of the delegate
        //supplied to the TransformManyBlock below and therefore
        //will have the same lifespan as the returned block.
        //Look up the term "c# closure" for more info
        return new TransformManyBlock<T, T>(
                         x => Enumerable.Repeat(x, hs.Add(x) ? 1 : 0));
    }
}

The reason this works is that, just like Linq's SelectMany, the TransformManyBlock effectively flattens out lists of lists. So, the TransformManyBlock takes a delegate that returns an IEnumerable<T>, but offers the items in the returned IEnumerable<T> one at a time. By returning an IEnumerable<T> that either has 0 or 1 items in it, we can effectively create Where-like behaviour, either allowing an item through or preventing it from passing, depending on whether or not some predicate is satisfied. In this case, the predicate is whether or not we can add the item to the captured HashSet.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top