Question

Let's pretend I have something like this:

<TextBox Text="{Binding Text, Mode=TwoWay}" />

And something like this:

public class MyViewModel : INotifyPropertyChanged
{
    public MyViewModel()
    {
        // run DoWork() when this.Text changes
        Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
            .Where(x => x.EventArgs.PropertyName.Equals("Text"))
            .Subscribe(async x => await DoWork());
    }

    private async Task DoWork()
    {
        await Task.Delay(this.Text.Length * 100);
    }

    public event PropertyChangedEventHandler PropertyChanged;

    private string _Text = "Hello World";
    public string Text
    {
        get { return _Text; }
        set
        {
            _Text = value;
            if (this.PropertyChanged != null)
                this.PropertyChanged(this, new PropertyChangedEventArgs("Text"));
        }
    }
}

In this scenario, the user could be typing very quickly. I need:

  1. DoWork() must not run while DoWork() is already running

  2. The user may type in spurts, some changes, pause, some changes

  3. DoWork() is NOT required for every change, only the last change

  4. There is no need for DoWork() to be called more than once per second

  5. DoWork() cannot wait for the last change if a spurt lasts longer than 1 second

  6. DoWork() should not be called while the system is idle

  7. The duration of DoWork() varies based on the length of this.Text

The question isn't if Rx can do this. I know it can. What's the proper syntax?

No correct solution

OTHER TIPS

While I kind of agree with James World, I think you can Do Better, if we use just a bit of mutable state. What if DoWork looked like this:

// Observable.Start returns IObservable<Unit>, so the field is typed accordingly
IObservable<Unit> doingWork;
public IObservable<Unit> DoWork()
{
    if (doingWork != null) return doingWork;

    doingWork = Observable.Start(() => {
        // XXX: Do work
        Thread.Sleep(1000);

        // We only kick off this 1sec timeout *after* we finish doing work
        Observable.Timer(TimeSpan.FromSeconds(1.0), DispatcherScheduler.Instance)
            .Subscribe(_ => doingWork = null);
    });

    return doingWork;
}

Now, DoWork debounces itself Automagically™, and we can get rid of this await-in-Subscribe silliness; we set the throttle to 250ms to be Quick-But-Not-Too-Quick.

At first glance this appears to violate requirement #5 above, but anyone calling DoWork too quickly just gets the previous run's result: DoWork will be called many times, but most calls won't start new work. It also means that when we aren't doing work, there is no 1-second delay after the user stops typing, as there would be with Throttle(TimeSpan.FromSeconds(1)).

    Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
        .Where(x => x.EventArgs.PropertyName.Equals("Text"))
        .Throttle(TimeSpan.FromMilliseconds(250), DispatcherScheduler.Instance)
        .SelectMany(_ => DoWork())
        .Catch<Unit, Exception>(ex => {
            Console.WriteLine("Oh Crap, DoWork failed: {0}", ex);
            return Observable.Empty<Unit>();
        })
        .Subscribe(_ => Console.WriteLine("Did work"));

I think a simpler and reusable way to solve your problem might actually be async/await-based rather than RX-based. Check out the single threaded EventThrottler class implementation I got as an answer to my 'Is there such a synchronization tool as “single-item-sized async task buffer”?' question. With that you can rewrite your DoWork() method as simply:

private void DoWork()
{
    EventThrottler.Default.Run(async () =>
    {
        await Task.Delay(1000);
        //do other stuff
    });
}

and call it every time your text changes. No RX required. Also, if you are already using WinRT XAML Toolkit - the class is in there.

Here's a copy of the throttler class code as a quick reference:

public class EventThrottler
{
    private Func<Task> next = null;
    private bool isRunning = false;

    public async void Run(Func<Task> action)
    {
        if (isRunning)
            next = action;
        else
        {
            isRunning = true;

            try
            {
                await action();

                while (next != null)
                {
                    var nextCopy = next;
                    next = null;
                    await nextCopy();
                }
            }
            finally
            {
                isRunning = false;
            }
        }
    }

    private static Lazy<EventThrottler> defaultInstance =
        new Lazy<EventThrottler>(() => new EventThrottler());
    public static EventThrottler Default
    {
        get { return defaultInstance.Value; }
    }
}

You may be surprised how hard this is as a pure RX solution. It's subtly different to the similar (and typical Rx 101 example) of submitting a throttled search in response to textbox changes - in that case, it's ok to fire off concurrent searches, cancelling all but the latest one.

In this case, once DoWork() is off and running it can't be replaced or interrupted.

The problem is that Rx streams flow in one direction and can't "talk backwards" - so events queue up against slow consumers. To drop events due to slow consumers is quite hard in Rx.

It's much easier in a world where DoWork() can be cancelled and replaced when a new (probably throttled) event arrives.
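For contrast, here is a minimal sketch of that easier, cancellable case. It assumes the System.Reactive package is referenced; SearchAsync, CancellableSearchSketch and the textChanges subject are hypothetical stand-ins of mine, not anything from the question. Switch() unsubscribes from the previous inner observable when a new one arrives, and FromAsync turns that into a cancellation of the running task.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableSearchSketch
{
    // Hypothetical cancellable work: longer text takes longer, as in the question.
    private static async Task<string> SearchAsync(string text, CancellationToken ct)
    {
        await Task.Delay(text.Length * 100, ct); // throws if cancelled mid-flight
        return "results for " + text;
    }

    public static async Task Main()
    {
        // Stands in for the filtered PropertyChanged stream.
        var textChanges = new Subject<string>();

        using (textChanges
            .Throttle(TimeSpan.FromMilliseconds(250))
            // One cold observable per value; the task starts on subscription.
            .Select(text => Observable.FromAsync(ct => SearchAsync(text, ct)))
            // Keep only the newest inner observable, cancelling the previous one.
            .Switch()
            .Subscribe(result => Console.WriteLine(result)))
        {
            textChanges.OnNext("Hello");
            await Task.Delay(300);              // pause: the "Hello" search starts
            textChanges.OnNext("Hello Wo");
            textChanges.OnNext("Hello World");  // replaces the running search
            await Task.Delay(2000);             // let the last search finish
        }
        // Only "results for Hello World" is printed; the earlier search is cancelled.
    }
}
```

This works only because a stale search can be thrown away. Once the work must run to completion, as DoWork() must here, Switch() no longer helps and you need something that holds events back instead.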

First I present a pure Rx solution. Then at the end, a simpler approach where the slow consumer is dealt with by a dispatching mechanism outside of Rx.

For the pure approach, you'll need this helper extension method, which drops events queued against a slow consumer:

public static IObservable<T> ObserveLatestOn<T>(
    this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;

        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool wasNotAlreadyActive;
            lock (gate)
            {
                wasNotAlreadyActive = !active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (wasNotAlreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}

With this available you can then do something like:

// run DoWork() when this.Text changes
Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
          .Where(x => x.EventArgs.PropertyName.Equals("Text"))
          .Sample(TimeSpan.FromSeconds(1)) // get the latest event in each second
          .ObserveLatestOn(Scheduler.Default) // drop all but the latest event
          .Subscribe(x => DoWork().Wait()); // block to avoid overlap

Remarks

To be honest, you are probably better off avoiding the pure Rx solution here, and instead DON'T call DoWork() directly from a subscriber. I would wrap it with an intermediate dispatching mechanism called from the Subscribe method that handles not calling it if it's already running - the code would be way simpler to maintain.
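As a rough illustration of the kind of intermediate dispatching mechanism I mean, here is a minimal sketch that uses only the base class library. NonOverlappingRunner and Demo are hypothetical names of mine, not part of Rx or any library, and swallowing exceptions to stderr is a simplification for the sketch.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of Rx: runs one work item at a time and
// remembers only that a re-run was requested while it was busy.
public sealed class NonOverlappingRunner
{
    private readonly Func<Task> _work;
    private readonly object _gate = new object();
    private bool _running;
    private bool _pending;

    public NonOverlappingRunner(Func<Task> work)
    {
        _work = work;
    }

    // Cheap and thread-safe, so it can be called straight from Subscribe(...).
    public void Request()
    {
        lock (_gate)
        {
            if (_running)
            {
                _pending = true; // coalesce: any number of requests become one re-run
                return;
            }
            _running = true;
        }
        _ = RunLoopAsync(); // fire and forget; the loop handles its own errors
    }

    private async Task RunLoopAsync()
    {
        while (true)
        {
            try
            {
                await _work();
            }
            catch (Exception ex)
            {
                // Sketch only: report and carry on. Real code would surface this.
                Console.Error.WriteLine(ex);
            }

            lock (_gate)
            {
                if (!_pending)
                {
                    _running = false; // nothing arrived meanwhile: go idle
                    return;
                }
                _pending = false; // something arrived: run exactly once more
            }
        }
    }
}

public static class Demo
{
    public static async Task Main()
    {
        int runs = 0;
        var runner = new NonOverlappingRunner(async () =>
        {
            Interlocked.Increment(ref runs);
            await Task.Delay(100); // stands in for DoWork()
        });

        for (int i = 0; i < 5; i++) runner.Request(); // a burst of five changes
        await Task.Delay(500);
        Console.WriteLine("runs = " + runs); // prints "runs = 2"
    }
}
```

In the view model the subscriber would then shrink to something like Subscribe(_ => runner.Request()). Because DoWork() reads this.Text when it actually runs, the single re-run sees the latest text. The sketch does not enforce the once-per-second limit by itself, so a Sample or Throttle upstream is still needed for that.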

EDIT:

After thinking on this for a few days, I didn't do any better than some of the other answers here. I'll leave the above for interest, but I think I like Filip Skakun's approach the best.

Here is what I have (the code is tested, btw). It is based on the event throttling extension I created a few years ago. I think a good name for it would be Ouroboros. The main point is that, unlike Throttle, it starts the work immediately if the cooldown has already passed.

public static IObservable<TResult> CombineVeryLatest<TLeft, TRight, TResult>(
    this IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource, 
    Func<TLeft, TRight, TResult> selector)
{
    return Observable.Defer(() =>
    {
        int l = -1, r = -1;
        return Observable.CombineLatest(
            leftSource.Select(Tuple.Create<TLeft, int>),
            rightSource.Select(Tuple.Create<TRight, int>),
                (x, y) => new { x, y })
            .Where(t => t.x.Item2 != l && t.y.Item2 != r)
            .Do(t => { l = t.x.Item2; r = t.y.Item2; })
            .Select(t => selector(t.x.Item1, t.y.Item1));
    });
}

public static IObservable<TWork> WorkSequencer<T, TWork>(
    this IObservable<T> source, Func<Task<TWork>> work)
{
    return source.Publish(src =>
    {
        var fire = new Subject<T>();
        var fireCompleted = fire.SelectMany(x => work()).Publish();
        fireCompleted.Connect();
        var whenCanFire = fireCompleted.StartWith(default(TWork));

        var subscription = src
            .CombineVeryLatest(whenCanFire, (x, flag) => x)
            .Subscribe(fire);

        return fireCompleted.Finally(subscription.Dispose);
    });
}

Then usage would be:

    private int _counter;

    public MainWindow()
    {
        InitializeComponent();
        var clicks = Observable
            .FromEventPattern(TestBn, "Click")
            .Do(_ =>
            {
                Console.WriteLine("click");
                _counter++;
            });
        clicks.WorkSequencer(DoWork).Subscribe();
    }

    private async Task<int> DoWork()
    {
        var workNumber = _counter;
        Console.WriteLine("Work Start " + workNumber);
        await Task.WhenAll(Task.Delay(_counter*100), Task.Delay(1000));
        Console.WriteLine("Work Done " + workNumber);
        return _counter;
    }

I have a couple of combinators called SubscribeWithoutOverlap that I use in the UI for this purpose. All incoming events except the last one are discarded until the work is finished. When the work is finished, the event buffer is asked for the next event.

    /// <summary>
    /// Subscribe to the observable whilst discarding all events that are
    /// received whilst the action is being processed. Can be
    /// used to improve responsiveness of UIs, for example.
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="source"></param>
    /// <param name="action"></param>
    /// <param name="scheduler"></param>
    /// <returns></returns>
    public static IDisposable SubscribeWithoutOverlap<T>
    (this IObservable<T> source, Action<T> action, IScheduler scheduler = null)
    {
        var sampler = new Subject<Unit>();
        scheduler = scheduler ?? Scheduler.Default;
        var p = source.Replay(1);

        var subscription = sampler.Select(x=>p.Take(1))
            .Switch()
            .ObserveOn(scheduler)
            .Subscribe(l =>
            {
                action(l);
                sampler.OnNext(Unit.Default);
            });

        var connection = p.Connect();
        sampler.OnNext(Unit.Default);

        return new CompositeDisposable(connection, subscription);
    }

and

    public static IDisposable SubscribeWithoutOverlap<T>
    (this IObservable<T> source, Func<T,Task> action, IScheduler scheduler = null)
    {
        var sampler = new Subject<Unit>();
        scheduler = scheduler ?? Scheduler.Default;
        var p = source.Replay(1);

        var subscription = sampler.Select(x=>p.Take(1))
            .Switch()
            .ObserveOn(scheduler)
            .Subscribe(async l =>
            {
                await action(l);
                sampler.OnNext(Unit.Default);
            });

        var connection = p.Connect();
        sampler.OnNext(Unit.Default);

        return new CompositeDisposable(connection, subscription);
    }

so the following should then meet your requirements.

IObservable<string> source;

source
   .Throttle(TimeSpan.FromMilliseconds(100))
   .Merge(source.Sample(TimeSpan.FromSeconds(1)))
   .SubscribeWithoutOverlap(DoWork);

Note the mix of Throttle and Sample to get both timing behaviors asked for in the question.

With regard to some of the other answers: if you find yourself putting complex Rx logic into your business logic, extract it into a custom combinator that has a clear purpose. You will thank yourself later when trying to understand what it does.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow