Question

Is there a way to batch a collection of items from the blocking collection. E.G.

I have a messaging bus publisher calling blockingCollection.Add()

And a consuming thread which is created like this:

Task.Factory.StartNew(() =>
        {
            foreach (string value in blockingCollection.GetConsumingEnumerable())
                {
                    Console.WriteLine(value);
                }
        });

However, I only want the Console to write after the blocking collection has 10 items on it, whereas GetConsumingEnumerable() always fires after each item is added. I could write my own queue for this but I'd like to use the blocking collection if possible?

Was it helpful?

Solution

Not sure what the project requirements are but I'd recommend TPL DataFlow BatchBlock.

You would instantiate a BatchBlock<string>, bind it to an ActionBlock<string> and then post to the batch block.

A pseudo code might look something like this:

var bb = new BatchBlock<string>(10);
var ab = new ActionBlock<string[]>(msgArray=>{ 
    foreach(var msg in msgArray) 
        Console.Writeline(msg);
});

bb.LinkTo(ab);

foreach (string value in blockingCollection.GetConsumingEnumerable())
{
      bb.Post(value);
}

Using DataFlow you might even want to replace the BlockingCollection with a BufferBlock or just post to the buffer block directly without first adding to the blocking collection, since batch block is already thread-safe.

OTHER TIPS

A quick solution would be something like this

public class ConsoleQueue
{
    private readonly List<string> _values = new List<string>();

    public void FlushQueueIfFull()
    {
        if (_values.Count < 10) return;
        foreach (var value in _values)
        {
            Console.WriteLine(value);
        }
        _values.Clear();
    }

    public void Push(string message)
    {
        _values.Add(message);
        FlushQueueIfFull();
    }
}

then you can use it like this

        var queue = new ConsoleQueue();

        Task.Factory.StartNew(() =>
        {
            foreach (string value in blockingCollection.GetConsumingEnumerable())
            {
                queue.Push(value);
            }
        });

You can easily extend it to cover thread safety etc

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top