Question

Is there a Streams equivalent to Observable.Throttle? If not -- is there any reasonably elegant way to achieve a similar effect?

Was it helpful?

Solution 2

The rate_limit package provides throttling and debouncing of Streams.

OTHER TIPS

There's no such method on streams for now. A enhancement request has been filed, you can star issue 8492.

However, you can do that with the where method. In the following exemple, I have defined a ThrottleFilter class to ignore events during a given duration :

import 'dart:async';

class ThrottleFilter<T> {
  DateTime lastEventDateTime = null;
  final Duration duration;

  ThrottleFilter(this.duration);

  bool call(T e) {
    final now = new DateTime.now();
    if (lastEventDateTime == null ||
        now.difference(lastEventDateTime) > duration) {
      lastEventDateTime = now;
      return true;
    }
    return false;
  }
}

main() {
  final sc = new StreamController<int>();
  final stream = sc.stream;

  // filter stream with ThrottleFilter
  stream.where(new ThrottleFilter<int>(const Duration(seconds: 10)).call)
    .listen(print);

  // send ints to stream every second, but ThrottleFilter will give only one int
  // every 10 sec.
  int i = 0;
  new Timer.repeating(const Duration(seconds:1), (t) { sc.add(i++); });
}

The following version is closer to what Observable.Throttle does:

class Throttle extends StreamEventTransformer {
  final duration;
  Timer lastTimer;

  Throttle(millis) :
    duration = new Duration(milliseconds : millis);


  void handleData(event, EventSink<int> sink) {
    if(lastTimer != null){
      lastTimer.cancel();
    }
    lastTimer = new Timer(duration, () => sink.add(event));
  }
}

main(){
  //...
  stream.transform(new Throttle(500)).listen((_) => print(_));
  //..
}

@Victor Savkin's answer is nice, but I always try to avoid reinventing the wheel. So unless you really only need that throttle I'd suggest using the RxDart package. Since you are dealing with Streams and other reactive objects RxDart has a lot of nice goodies to offer besides throttling.

We can achieve a 500 millisecond throttle several ways:

  1. throttleTime from ThrottleExtensions<T> Stream<T> extensions: stream.throttleTime(Duration(milliseconds: 500)).listen(print);
  2. Combining ThrottleStreamTransformer with TimerStream: stream.transform(ThrottleStreamTransformer((_) => TimerStream(true, const Duration(milliseconds: 500)))).listen(print);
  3. Using Debounce Extensions / DebounceStreamTransformer: stream.debounceTime(Duration(milliseconds: 500)).listen(print);

There are some subtle differences regarding delays, but all of them throttles. As an example about throttleTime vs. debounceTime see What is the difference between throttleTime vs debounceTime in RxJS and when to choose which?

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top