Question

I need to design a custom dataflow block that acts like a buffer but makes items available only after some timeout. I would put incoming messages into a queue and start a timer. When the timer fires, I would move an item from the queue into a BufferBlock, which would make it available to Dataflow consumers.

If I move an item from the internal queue into the output BufferBlock from the timer handler, it obviously won't be thread-safe, because the timer handler can collide with an enqueue call and corrupt the queue. MSDN claims that dataflow is based on the Actor idea, which assumes that messages are processed single-threaded, thus solving the synchronization problem. But if I introduce a timer handler, this breaks that assumption. I can use an old-school lock on the queue, or use ConcurrentQueue, but I am curious whether there is a more idiomatic dataflow way to manage the timer so that it does not conflict with the dataflow block's Post() call.

Or, to extend this question: is there an elegant way to have a dataflow block handle several different types of messages and still offer a thread-safe model?

Solution

I need to design a custom dataflow block that acts like a buffer but makes items available only after some timeout.

So, something like this?

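public static TransformBlock<T, T> Delay<T>(TimeSpan delay)
{
  return new TransformBlock<T, T>(async x =>
  {
    // hold each item for the requested time, then pass it through unchanged
    await Task.Delay(delay);
    return x;
  },
  new ExecutionDataflowBlockOptions
  {
    // let every item wait independently instead of one after another
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
  });
}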

If you still think you need a custom block, be sure to read the Guide to Implementing Custom Dataflow Blocks, which describes all the locks you need to worry about.
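
To see how the block above behaves from the consumer side, here is a minimal usage sketch. The class name DelayBlockDemo and the method RunAsync are made up for illustration. It assumes the default ordering behaviour of TransformBlock, under which items come out in the order they were posted even though each one waits in parallel.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DelayBlockDemo
{
    // Same shape as the Delay<T> helper in the answer above.
    public static TransformBlock<T, T> Delay<T>(TimeSpan delay)
    {
        return new TransformBlock<T, T>(async x =>
        {
            await Task.Delay(delay);
            return x;
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });
    }

    // Hypothetical helper: posts 0..count-1 and collects whatever the block emits.
    public static async Task<List<int>> RunAsync(int count, TimeSpan delay)
    {
        var block = Delay<int>(delay);

        for (int i = 0; i < count; i++)
            block.Post(i);
        block.Complete();

        var received = new List<int>();
        while (await block.OutputAvailableAsync())
        {
            while (block.TryReceive(out int item))
                received.Add(item);
        }

        await block.Completion;
        return received;
    }
}
```

Because the delays overlap, posting several items at once should take roughly one delay in total rather than one delay per item.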

OTHER TIPS

From the MSDN page you referenced:

Because the runtime manages dependencies between data, you can often avoid the requirement to synchronize access to shared data.

What this means is that when you're using dataflow blocks in your code, you usually don't have to worry about synchronization, because the blocks do that for you.

But when you're writing a custom dataflow block, then you do need to handle synchronization yourself. For example, imagine you were implementing BufferBlock. Calling Post() on that block has to be synchronized somehow, because two source blocks could call Post() at the same time. And nobody will handle that synchronization for you, so your implementation of Post()* would need to use locks or ConcurrentQueue or something like that.

* Actually, you don't implement Post(), you implement OfferMessage().
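
For comparison, the manual approach from the question (a queue plus a timer) could be sketched roughly like this. TimedReleaseBuffer, Enqueue, Output and ReleaseOne are hypothetical names, and this is not a real dataflow block because it does not implement ITargetBlock<T>. It only shows where the synchronization would live: ConcurrentQueue makes the producer and the timer callback safe against each other, and BufferBlock.Post() is already thread-safe.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of TPL Dataflow.
public sealed class TimedReleaseBuffer<T> : IDisposable
{
    private readonly ConcurrentQueue<T> _pending = new ConcurrentQueue<T>();
    private readonly BufferBlock<T> _output = new BufferBlock<T>();
    private readonly Timer _timer;

    public TimedReleaseBuffer(TimeSpan period)
    {
        // The callback runs on a thread-pool thread and may overlap with Enqueue().
        _timer = new Timer(_ => ReleaseOne(), null, period, period);
    }

    // Consumers read released items from here.
    public IReceivableSourceBlock<T> Output
    {
        get { return _output; }
    }

    // Safe to call from any thread.
    public void Enqueue(T item)
    {
        _pending.Enqueue(item);
    }

    private void ReleaseOne()
    {
        // TryDequeue is atomic, so overlapping callbacks never release the same item twice.
        if (_pending.TryDequeue(out T item))
            _output.Post(item);
    }

    public void Dispose()
    {
        // Items still pending (or dequeued by a callback that is still running) are dropped.
        _timer.Dispose();
        _output.Complete();
    }
}
```

Note that if two timer callbacks ever overlap, the release order is not strictly guaranteed, which is one more reason to prefer building on existing blocks.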

But, if I understand your requirement correctly, you can actually implement your block without any manual synchronization, by taking advantage of the synchronization that already exists in TDF. You would implement your block by using two BufferBlocks, a helper Task and DataflowBlock.Encapsulate():

public static IPropagatorBlock<T, T> CreateDelayedBlock<T>(TimeSpan delay)
{
    var source = new BufferBlock<T>();
    var target = new BufferBlock<T>();

    Task.Run(
        async () =>
        {
            try
            {
                while (await source.OutputAvailableAsync())
                {
                    T item;
                    if (source.TryReceive(out item))
                    {
                        await Task.Delay(delay);
                        await target.SendAsync(item);
                    }
                    else
                    {
                        // this shouldn't happen
                        // nobody else should be able to receive from source
                    }
                }

                // rethrows here if source was faulted
                await source.Completion;
                target.Complete();
            }
            catch (Exception ex)
            {
                // if source failed, fail target
                ((IDataflowBlock)target).Fault(ex);
            }
        });

    return DataflowBlock.Encapsulate(source, target);
}
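
A rough usage sketch for the encapsulated block, in case it helps. DelayedBlockDemo and RunAsync are hypothetical names, and the helper is repeated in condensed form only so that the example compiles on its own. The block is linked to an ActionBlock with PropagateCompletion so that completing the input eventually completes the sink.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DelayedBlockDemo
{
    // Condensed copy of CreateDelayedBlock<T> from the answer above.
    public static IPropagatorBlock<T, T> CreateDelayedBlock<T>(TimeSpan delay)
    {
        var source = new BufferBlock<T>();
        var target = new BufferBlock<T>();

        Task.Run(async () =>
        {
            while (await source.OutputAvailableAsync())
            {
                if (source.TryReceive(out T item))
                {
                    await Task.Delay(delay);
                    await target.SendAsync(item);
                }
            }
            target.Complete();
        });

        return DataflowBlock.Encapsulate(source, target);
    }

    // Hypothetical helper: pushes 0..count-1 through the block into a sink.
    public static async Task<List<int>> RunAsync(int count, TimeSpan delay)
    {
        var delayed = CreateDelayedBlock<int>(delay);
        var results = new List<int>();

        // ActionBlock processes one item at a time by default, so the list needs no lock.
        var sink = new ActionBlock<int>(x => results.Add(x));
        delayed.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
            delayed.Post(i);
        delayed.Complete();

        await sink.Completion;
        return results;
    }
}
```

One thing to keep in mind: the helper loop waits for each item in turn, so N items posted together take roughly N times the delay to come out. If every item should be held for the same time after it arrives, the TransformBlock version with unbounded parallelism is the closer fit.
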
Licensed under: CC-BY-SA with attribution