Question

Teaser: guys, this question is not about how to implement a retry policy. It's about the correct completion of a TPL Dataflow block.

This question is mostly a continuation of my previous question, "Retry policy within ITargetBlock". The answer to that question was @svick's smart solution that uses a TransformBlock (source) and a TransformManyBlock (target). The only problem left is to complete this block the right way: wait for all the retries to finish first, and then complete the target block. Here is what I ended up with (it's just a snippet; don't pay too much attention to the non-thread-safe retries set):

var retries = new HashSet<RetryableMessage<TInput>>();

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);

                retries.Remove(message);
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;

                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);

source.LinkTo(target);

source.Completion.ContinueWith(async _ =>
{
    while (target.InputCount > 0 || retries.Any())
        await Task.Delay(100);

    target.Complete();
});

The idea is to poll and check whether there are still messages waiting to be processed or messages that still require retrying. But I don't like the idea of polling in this solution.

Yes, I can encapsulate the logic of adding/removing retries in a separate class, and even, for example, perform some action when the set of retries becomes empty, but how do I deal with the target.InputCount > 0 condition? There is no callback that gets called when the block has no pending messages, so it seems that checking target.InputCount in a loop with a small delay is the only option.

Does anybody know a smarter way to achieve this?


Solution 2

Combining hwcverwe's answer and JamieSee's comment could be the ideal solution.

First, you need to create more than one event:

var signal  = new ManualResetEvent(false);
var completedEvent = new ManualResetEvent(false);

Then, you have to create an observer, and subscribe to the TransformManyBlock, so you are notified when a relevant event happens:

var observer = new RetryingBlockObserver<TOutput>(completedEvent);
var observable = target.AsObservable();
observable.Subscribe(observer);

The observer can be quite simple:

private class RetryingBlockObserver<T> : IObserver<T> {
    private ManualResetEvent completedEvent;

    public RetryingBlockObserver(ManualResetEvent completedEvent) {
        this.completedEvent = completedEvent;
    }

    public void OnCompleted() {
        completedEvent.Set();
    }

    public void OnError(Exception error) {
        //TODO
    }

    public void OnNext(T value) {
        //TODO
    }
}

And you can wait for either the signal, or completion (exhaustion of all the source items), or both:

source.Completion.ContinueWith(_ => {

    // WaitAll takes an array of handles and blocks until every one of them is set.
    WaitHandle.WaitAll(new WaitHandle[] { completedEvent, signal });
    // Or WaitHandle.WaitAny, depending on your needs!

    target.Complete();
});

You can inspect the return value to understand what happened: WaitHandle.WaitAny returns the index of the event that was set, while WaitAll only returns true once all of them are set. You can also add other events to the code and pass them to the observer, so that it can set them when needed. That way you can differentiate your behaviour and respond differently when, for example, an error is raised.
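To make the difference between the two calls concrete, here is a minimal sketch. The WaitDemo class and its method names are made up for this illustration and are not part of the code above; only WaitHandle.WaitAny and WaitHandle.WaitAll are real framework calls.

```csharp
using System;
using System.Threading;

// Hypothetical helper, for illustration only.
public static class WaitDemo
{
    // Index of the first signaled handle, or WaitHandle.WaitTimeout if the
    // timeout elapses before any handle is set.
    public static int FirstSignaled(TimeSpan timeout, params WaitHandle[] handles)
    {
        return WaitHandle.WaitAny(handles, timeout);
    }

    // True only if every handle was signaled within the timeout.
    public static bool AllSignaled(TimeSpan timeout, params WaitHandle[] handles)
    {
        return WaitHandle.WaitAll(handles, timeout);
    }
}
```

With completedEvent and signal passed in that order, FirstSignaled tells you which one fired first, which is what you need if you want to react differently to each event.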

Other tips

Maybe a ManualResetEvent can do the trick for you.

Add a public property to TransformManyBlock:

private ManualResetEvent _signal  = new ManualResetEvent(false);
public ManualResetEvent Signal { get { return _signal; } }

And here you go:

var retries = new HashSet<RetryableMessage<TInput>>();

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);

            // Sets the state of the event to signaled, allowing one or more waiting threads to proceed
            if(!retries.Any()) Signal.Set(); 
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);

                retries.Remove(message);

                // Sets the state of the event to signaled, allowing one or more waiting threads to proceed
                if(!retries.Any()) Signal.Set(); 
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;

                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);

source.LinkTo(target);

source.Completion.ContinueWith(_ =>
{
    // Blocks the current thread until the current WaitHandle receives a signal.
    target.Signal.WaitOne();

    target.Complete();
});

I am not sure where your target.InputCount is set. At the place where you change target.InputCount, you can add the following code:

if(InputCount == 0)  Signal.Set();
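
If you would rather not block a thread at all, the same idea can be sketched with a counter and a TaskCompletionSource instead of a ManualResetEvent. This is only a sketch under assumptions: PendingTracker and its members are hypothetical names, not part of TPL Dataflow, and it assumes every message is counted once before it reaches the target block and released once when it is finished for good (success, or retries exhausted). A message that is waiting for a retry simply stays counted, so neither the retries set nor InputCount has to be checked.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of TPL Dataflow): counts messages that are
// still in flight, including those waiting for a retry.
public sealed class PendingTracker
{
    private readonly TaskCompletionSource<bool> _drained =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    private int _pending;
    private int _sealed; // 0 = new messages may still arrive, 1 = source finished

    // Completes once Seal() has been called and no message is pending.
    public Task Drained { get { return _drained.Task; } }

    // Call once per new message, before it reaches the target block.
    public void Enter()
    {
        Interlocked.Increment(ref _pending);
    }

    // Call when a message is finished for good (success or retries exhausted).
    public void Leave()
    {
        if (Interlocked.Decrement(ref _pending) == 0 && Volatile.Read(ref _sealed) == 1)
            _drained.TrySetResult(true);
    }

    // Call when the source has completed and no new messages will arrive.
    public void Seal()
    {
        // Interlocked.Exchange acts as a full fence, so a concurrent Leave()
        // cannot be missed by both this check and the one inside Leave().
        Interlocked.Exchange(ref _sealed, 1);
        if (Volatile.Read(ref _pending) == 0)
            _drained.TrySetResult(true);
    }
}
```

One way to wire it up, assuming the source block is where messages get wrapped: call Enter() there, call Leave() in both places where the snippet above calls retries.Remove(message), call Seal() from a source.Completion continuation, and call target.Complete() from a continuation on Drained.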
Licensed under: CC-BY-SA with attribution