How do I create an Observable Timer that calls a method and blocks on cancellation if the method is running until it finishes?

StackOverflow https://stackoverflow.com/questions/20714219

  •  20-09-2022
  •  | 
  •  

Question

My requirements:

  1. Run method DoWork on a specified interval.
  2. If stop is called between calls to DoWork just stop the timer.
  3. If stop is called while DoWork is running, block until DoWork is finished.
  4. If DoWork takes too long to finish after stop is called, timeout.

I have a solution that seems to work so far, but I'm not super happy with it and think I may be missing something. The following is the void Main from my test app:

var source = new CancellationTokenSource();

// Create an observable sequence for the Cancel event.
var cancelObservable = Observable.Create<Int64>(o =>
{
    source.Token.Register(() =>
    {
        Console.WriteLine("Start on canceled handler.");
        o.OnNext(1);
        Console.WriteLine("End on canceled handler.");
    });

    return Disposable.Empty;
});

var observable =
    // Create observable timer.
    Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), Scheduler.Default)
        // Merge with the cancel observable so we have a composite that 
        // generates an event every 10 seconds AND immediately when a cancel is requested.
        .Merge(cancelObservable)
        // This is what I ended up doing instead of disposing the timer so that I could wait
        // for the sequence to finish, including DoWork.
        .TakeWhile(i => !source.IsCancellationRequested)
        // I could put this in an observer, but this way exceptions could be caught and handled
        // or the results of the work could be fed to a subscriber.
        .Do(l =>
        {
            Console.WriteLine("Start DoWork.");
            Thread.Sleep(TimeSpan.FromSeconds(5));
            Console.WriteLine("Finish DoWork.");
        });

var published = observable.Publish();

var disposable = published.Connect();

// Press key between Start DoWork and Finish DoWork to test the cancellation while
// running DoWork.
// Press key between Finish DoWork and Start DoWork to test cancellation between
// events.
Console.ReadKey();

// I doubt this is good practice, but I was finding that o.OnNext was blocking
// inside of register, and the timeout wouldn't work if I blocked here before
// I set it up.
Task.Factory.StartNew(source.Cancel);

// Is there a preferred way to block until a sequence is finished? My experience
// is there's a timing issue if Cancel finishes fast enough the sequence may already
// be finished by the time I get here and .Wait() complains that the sequence contains
// no elements.
published.Timeout(TimeSpan.FromSeconds(1))
    .ForEach(i => { });

disposable.Dispose();

Console.WriteLine("All finished! Press any key to continue.");
Console.ReadKey();
Était-ce utile?

La solution

First, in your cancelObservable, make sure and return the result of Token.Register as your disposable instead of returning Disposable.Empty.

Here's a good extension method for turning CancellationTokens into observables:

public static IObservable<Unit> AsObservable(this CancellationToken token, IScheduler scheduler)
{
    return Observable.Create<Unit>(observer =>
    {
        var d1 = new SingleAssignmentDisposable();
        return new CompositeDisposable(d1, token.Register(() =>
            {
                d1.Disposable = scheduler.Schedule(() =>
                {
                    observer.OnNext(Unit.Default);
                    observer.OnCompleted();
                });
            }));
    });
}

Now, to your actual request:

public IObservable<Unit> ScheduleWork(IObservable<Unit> cancelSignal)
{
    // Performs work on an interval
    // stops the timer (but finishes any work in progress) when the cancelSignal is received
    var workTimer = Observable
        .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10))
        .TakeUntil(cancelSignal)
        .Select(_ =>
        {
            DoWork();
            return Unit.Default;
        })
        .IgnoreElements();

    // starts a timer after cancellation that will eventually throw a timeout exception.
    var timeoutAfterCancelSignal = cancelSignal
        .SelectMany(c => Observable.Never<Unit>().Timeout(TimeSpan.FromSeconds(5)));

    // Use Amb to listen to both the workTimer
    // and the timeoutAfterCancelSignal
    // Since neither produce any data we are really just
    // listening to see which will complete first.
    // if the workTimer completes before the timeout
    // then Amb will complete without error.
    // However if the timeout expires first, then Amb
    // will produce an error
    return Observable.Amb(workTimer, timeoutAfterCancelSignal);
}

// Usage
var cts = new CancellationTokenSource();
var s = ScheduleWork(cts.Token.AsObservable(Scheduler.Default));

using (var finishedSignal = new ManualResetSlim())
{
    s.Finally(finishedSignal.Set).Subscribe(
        _ => { /* will never be called */},
        error => { /* handle error */ },
        () => { /* canceled without error */ } );

    Console.ReadKey();
    cts.Cancel();

    finishedSignal.Wait();
}

Note, instead of cancellation tokens you can also do:

var cancelSignal = new AsyncSubject<Unit>();
var s = ScheduleWork(cancelSignal);

// .. to cancel ..
Console.ReadKey();
cancelSignal.OnNext(Unit.Default);
cancelSignal.OnCompleted();
Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top