문제

Is there a way to batch a collection of items from the blocking collection. E.G.

I have a messaging bus publisher calling blockingCollection.Add()

And a consuming thread which is created like this:

Task.Factory.StartNew(() =>
        {
            foreach (string value in blockingCollection.GetConsumingEnumerable())
                {
                    Console.WriteLine(value);
                }
        });

However, I only want the Console to write after the blocking collection has 10 items on it, whereas GetConsumingEnumerable() always fires after each item is added. I could write my own queue for this but I'd like to use the blocking collection if possible?

도움이 되었습니까?

해결책

Not sure what the project requirements are but I'd recommend TPL DataFlow BatchBlock.

You would instantiate a BatchBlock<string>, bind it to an ActionBlock<string> and then post to the batch block.

A pseudo code might look something like this:

var bb = new BatchBlock<string>(10);
var ab = new ActionBlock<string[]>(msgArray=>{ 
    foreach(var msg in msgArray) 
        Console.Writeline(msg);
});

bb.LinkTo(ab);

foreach (string value in blockingCollection.GetConsumingEnumerable())
{
      bb.Post(value);
}

Using DataFlow you might even want to replace the BlockingCollection with a BufferBlock or just post to the buffer block directly without first adding to the blocking collection, since batch block is already thread-safe.

다른 팁

A quick solution would be something like this

public class ConsoleQueue
{
    private readonly List<string> _values = new List<string>();

    public void FlushQueueIfFull()
    {
        if (_values.Count < 10) return;
        foreach (var value in _values)
        {
            Console.WriteLine(value);
        }
        _values.Clear();
    }

    public void Push(string message)
    {
        _values.Add(message);
        FlushQueueIfFull();
    }
}

then you can use it like this

        var queue = new ConsoleQueue();

        Task.Factory.StartNew(() =>
        {
            foreach (string value in blockingCollection.GetConsumingEnumerable())
            {
                queue.Push(value);
            }
        });

You can easily extend it to cover thread safety etc

라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top