Pergunta

With the TPL Dataflow library, I would like to do something like this:

myActionBlock.Post(newValue, cancelAllPreviousPosts: true);

It appears that the cancellation token on ActionBlock cancels the whole thing; I'd have to make a new ActionBlock if I set that one. Is it possible to do a partial cancellation with ActionBlock?

Posts that have not been processed yet should not be attempted. It would be nice if there was some cancellation token available to check in the currently-executing post.

Foi útil?

Solução

Take a look at BroadcastBlock<T>, which only holds the most recent item posted to it. You can put a broadcast block in front of an ActionBlock<T>.

While posting a new item to the broadcast block won't cancel the item currently being processed by the action block, it will overwrite any existing item already held by the broadcast block; in effect discarding any older messages not yet processed by the action block. When the action block completes its current item, it will take the most recent item posted to the broadcast block.

Outras dicas

In addition to Monroe Thomas's answer it is important to understand that the ActionBlock following the BroadcastBlock needs it's BoundedCapacity limited to 1 or it will store and process every message of the broadcast block, even when it is still executing.
A code example goes here:

ActionBlock<int> ExecuteBlock = new ActionBlock<int>(async ThisNumber =>
{
  await Task.Delay(100);
  Console.WriteLine($">{ThisNumber}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

BroadcastBlock<int> ThrottleBlock = new BroadcastBlock<int>(null);
ThrottleBlock.LinkTo(ExecuteBlock, new DataflowLinkOptions { PropagateCompletion = true });

for(int IX = 0; IX < 128; IX++)
{
  await ThrottleBlock.SendAsync(IX);
  await Task.Delay(10);
}

This results in the following:

>0
>6
>12
>20
>27
>34
>41
>48
>55
>62
>68
>75
>82
>88
>95
>101
>108
>115
>122
>127

Enjoy!
-Simon

There is nothing like this directly in TPL Dataflow, but I can see several ways how you could implement it yourself:

  1. If you don't need to be able to treat the modified block as a normal Dataflow block (e.g. no support for LinkTo()), then a simple way would to write a type that wraps ActionBlock, but whose items also contain a flag that says whether they should be processed. When you specify cancelAllPreviousPosts: true, all those flags are reset, so those items are going to be skipped.

    The code could look something like this:

    class CancellableActionBlock<T>
    {
        private class Item
        {
            public T Data { get; private set; }
            public bool ShouldProcess { get; set; }
    
            public Item(T data)
            {
                Data = data;
                ShouldProcess = true;
            }
        }
    
        private readonly ActionBlock<Item> actionBlock;
        private readonly ConcurrentDictionary<Item, bool> itemSet;
    
        public CancellableActionBlock(Action<T> action)
        {
            itemSet = new ConcurrentDictionary<Item, bool>();
            actionBlock = new ActionBlock<Item>(item =>
            {
                bool ignored;
                itemSet.TryRemove(item, out ignored);
    
                if (item.ShouldProcess)
                {
                    action(item.Data);
                }
            });
        }
    
        public bool Post(T data, bool cancelAllPreviousPosts = false)
        {
            if (cancelAllPreviousPosts)
            {
                foreach (var item in itemSet.Keys)
                {
                    item.ShouldProcess = false;
                }
                itemSet.Clear();
            }
    
            var newItem = new Item(data);
            itemSet.TryAdd(newItem, true);
            return actionBlock.Post(newItem);
        }
    
        // probably other members that wrap actionBlock members,
        // like Complete() and Completion
    }
    
  2. If you want to create something that's more composable and reusable, you could create a special block just for that cancellation. You could implement that using thee BufferBlocks linked together, where the third one would have capacity of 1 and the second one unlimited capacity. This way, almost all the queued items would be in the second block, so you could perform cancellation just by swapping that block for a new one. The whole structure would be represented by Encapsulate()ing the first and the third block.

    The issues with this approach is that the cancellation has a delay of 1 item (the one that's in the third block). Also, I didn't figure out a good interface for this.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top