Rather than breaking into the BufferBlock, why not insert a TransformManyBlock into the chain that does this for you? You can use a HashSet, whose Add method returns true only if the item hasn't already been added. It ends up being quite simple, but storage requirements obviously grow over time, because every distinct item seen so far stays in the set.
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks.Dataflow;

    void Main()
    {
        var bb = new BufferBlock<string>();
        var db = DataflowEx.CreateDistinctBlock<string>();
        var ab = new ActionBlock<string>(x => Console.WriteLine(x));
        bb.LinkTo(db);
        db.LinkTo(ab);
        // Duplicates are dropped by db, so each distinct word is printed once:
        // this, is, a, test
        bb.Post("this");
        bb.Post("this");
        bb.Post("this");
        bb.Post("is");
        bb.Post("is");
        bb.Post("a");
        bb.Post("test");
    }

    public class DataflowEx
    {
        public static TransformManyBlock<T, T> CreateDistinctBlock<T>()
        {
            var hs = new HashSet<T>();
            //hs will be captured in the closure of the delegate
            //supplied to the TransformManyBlock below and therefore
            //will have the same lifespan as the returned block.
            //Look up the term "c# closure" for more info.
            //Note: HashSet<T> is not thread-safe; this relies on the
            //block's default MaxDegreeOfParallelism of 1.
            return new TransformManyBlock<T, T>(
                x => Enumerable.Repeat(x, hs.Add(x) ? 1 : 0));
        }
    }
The reason this works is that, just like LINQ's SelectMany, the TransformManyBlock effectively flattens lists of lists. The TransformManyBlock takes a delegate that returns an IEnumerable<T>, but offers the items in the returned IEnumerable<T> one at a time. By returning an IEnumerable<T> that has either 0 or 1 items in it, we can create Where-like behaviour, either allowing an item through or preventing it from passing, depending on whether or not some predicate is satisfied. In this case, the predicate is whether or not we can add the item to the captured HashSet.
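
To make the Where-like behaviour concrete, here is a minimal sketch of the same trick generalised to an arbitrary predicate. The names DataflowFilterSketch and CreateWhereBlock are hypothetical helpers made up for illustration; they are not part of TPL Dataflow. The sketch assumes the block's default settings (one item processed at a time, order preserved).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

public static class DataflowFilterSketch
{
    // Hypothetical helper (not a library API): lets an item through only
    // when the predicate returns true, by returning a sequence of 1 or 0 items.
    public static TransformManyBlock<T, T> CreateWhereBlock<T>(Func<T, bool> predicate)
    {
        // Typed explicitly so the IEnumerable-returning constructor overload is chosen.
        Func<T, IEnumerable<T>> selector =
            x => predicate(x) ? new[] { x } : Enumerable.Empty<T>();
        return new TransformManyBlock<T, T>(selector);
    }

    public static void Main()
    {
        var results = new List<int>();
        var evens = CreateWhereBlock<int>(n => n % 2 == 0);
        var sink = new ActionBlock<int>(n => results.Add(n));

        // Propagate completion so we can wait for the pipeline to drain.
        evens.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 1; i <= 6; i++) evens.Post(i);
        evens.Complete();
        sink.Completion.Wait();

        Console.WriteLine(string.Join(", ", results)); // prints "2, 4, 6"
    }
}
```

The distinct block above is then just the special case where the predicate is hs.Add, with the HashSet captured in the closure.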