First, in your cancelObservable
, make sure and return the result of Token.Register
as your disposable instead of returning Disposable.Empty
.
Here's a good extension method for turning CancellationTokens
into observables:
public static IObservable<Unit> AsObservable(this CancellationToken token, IScheduler scheduler)
{
return Observable.Create<Unit>(observer =>
{
var d1 = new SingleAssignmentDisposable();
return new CompositeDisposable(d1, token.Register(() =>
{
d1.Disposable = scheduler.Schedule(() =>
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
});
}));
});
}
Now, to your actual request:
public IObservable<Unit> ScheduleWork(IObservable<Unit> cancelSignal)
{
// Performs work on an interval
// stops the timer (but finishes any work in progress) when the cancelSignal is received
var workTimer = Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10))
.TakeUntil(cancelSignal)
.Select(_ =>
{
DoWork();
return Unit.Default;
})
.IgnoreElements();
// starts a timer after cancellation that will eventually throw a timeout exception.
var timeoutAfterCancelSignal = cancelSignal
.SelectMany(c => Observable.Never<Unit>().Timeout(TimeSpan.FromSeconds(5)));
// Use Amb to listen to both the workTimer
// and the timeoutAfterCancelSignal
// Since neither produce any data we are really just
// listening to see which will complete first.
// if the workTimer completes before the timeout
// then Amb will complete without error.
// However if the timeout expires first, then Amb
// will produce an error
return Observable.Amb(workTimer, timeoutAfterCancelSignal);
}
// Usage
var cts = new CancellationTokenSource();
var s = ScheduleWork(cts.Token.AsObservable(Scheduler.Default));
using (var finishedSignal = new ManualResetSlim())
{
s.Finally(finishedSignal.Set).Subscribe(
_ => { /* will never be called */},
error => { /* handle error */ },
() => { /* canceled without error */ } );
Console.ReadKey();
cts.Cancel();
finishedSignal.Wait();
}
Note, instead of cancellation tokens you can also do:
var cancelSignal = new AsyncSubject<Unit>();
var s = ScheduleWork(cancelSignal);
// .. to cancel ..
Console.ReadKey();
cancelSignal.OnNext(Unit.Default);
cancelSignal.OnCompleted();