Question

What would be the most idiomatic way to yield values of an Observable by a specific amount of time? For example, let's say I have an Observable created from a big Array and I want to yield a value every 2 seconds. Is a combination of interval and selectMany the best way?

Was it helpful?

Solution

For your specific example, the idea is to map each value from the array to an observable that will yield its result after a delay, then concatenate the resulting stream of observables:

var delayedStream = Rx.Observable
    .fromArray([1, 2, 3, 4, 5])
    .map(function (value) { return Rx.Observable.return(value).delay(2000); })
    .concatAll();

Other examples might indeed make use of timer or interval. It just depends.

For example, if your array is really really big, then the above will cause a fair amount of memory pressure (because it is creating N observables for a really large N). Here is an alternative that uses interval to lazily walk the array:

var delayedStream = Rx.Observable
    .interval(2000)
    .take(reallyBigArray.length) // end the observable after it pulses N times
    .map(function (i) { return reallyBigArray[i]; });

This one will yield the next value from the array every 2 seconds until it has iterated over the entire array.

OTHER TIPS

I think that using zip produce better and more readable code, still using just 3 observables.

var items = ['A', 'B', 'C'];

Rx.Observable.zip(
  Rx.Observable.fromArray(items),
  Rx.Observable.timer(2000, 2000),  
  function(item, i) { return item;}
)

For RxJS v6 getting the next one with a delay of 2 seconds.

Example 1. concatMap:

import {of} from 'rxjs';
import {concatMap, delay} from 'rxjs/operators';

of(1, 2, 3, 4, 5)
  .pipe(
    concatMap(x => of(x)
      .pipe(
        delay(2000))
    )
  )
  .subscribe({
    next(value) {
      console.log(value);
    }
  });

Example 2. map + concatAll:

import {of} from 'rxjs';
import {concatAll, delay, map} from 'rxjs/operators';

of(1, 2, 3, 4, 5)
  .pipe(
    map(x => of(x)
      .pipe(
        delay(2000))
    ),
    concatAll()
  )
  .subscribe({
    next(value) {
      console.log(value);
    }
  });

While Brandon's answer gets the gist of the idea, here's a version which yields the first item immediately, then puts time between the following items.

var delay = Rx.Observable.empty().delay(2000);

var items = Rx.Observable.fromArray([1,2,3,4,5])
  .map(function (x) {
    return Rx.Observable.return(x).concat(delay); // put some time after the item
  })
  .concatAll();

Updated for newer RxJS:

var delay = Rx.Observable.empty().delay(2000);

var items = Rx.Observable.fromArray([1,2,3,4,5])
  .concatMap(function (x) {
    return Rx.Observable.of(x).concat(delay); // put some time after the item
  });

For RxJS 5:

Rx.Observable.from([1, 2, 3, 4, 5])
  .zip(Rx.Observable.timer(0, 2000), x => x)
  .subscribe(x => console.log(x));

Agree that zip is a clean approach. Here is a reusable function to generate an interval stream for an array:

function yieldByInterval(items, time) {
  return Rx.Observable.from(items).zip(
    Rx.Observable.interval(time),
    function(item, index) { return item; }
  );
}

// test
yieldByInterval(['A', 'B', 'C'], 2000)
  .subscribe(console.log.bind(console));

This builds on farincz's answer, but is slightly shorter by using .zip as an instance method.

Also, I used Rx.Observable.from() because Rx.Observable.fromArray() is deprecated.

Since this wasn't mentioned, I think concatMap combined with delay is pretty readable.

Rx.Observable.fromArray([1, 2, 3, 4, 5])
    .concatMap(x => Rx.Observable.of(x).delay(1000));

See https://codepen.io/jmendes/pen/EwaPzw

Building on the zip solutions by farincz and user3587412, here is how it works in RxJS v6

const { zip, from, timer } = require("rxjs")
const { map } = require("rxjs/operators")

const input = [1, 2, 3, 4, 5]
const delay = 2000

zip(
    from(input),
    timer(0, delay)
).pipe(
    map(([ delayedInput, _timer ]) => delayedInput) // throw away timer index
).subscribe(
    console.log
)
//create a custom operator
const delayEach=(millis)=>(o)=>o.pipe(concatMap((x)=>of(x).pipe(delay(millis))))



of(1, 2, 3, 4, 5)
  .pipe(delayEach(1000))
  .subscribe(console.log);

RxJs 6 code that emits the first item immediately and delays the remaining items:

import { of, EMPTY, concat } from "rxjs";
import { concatMap, delay } from "rxjs/operators";

const delayed$ = EMPTY.pipe(delay(1000));

console.log("start");
of(1, 2, 3, 4)
  .pipe(concatMap(v => concat(of(v), delayed$)))
  .subscribe({
    next: console.log
  });

Full Stackblitz example

idea:

  • for each item we create an observable (using concat) that will output the item immediately (of(v)) and then emit an EMPTY observable after the delay
  • since we use concatMap all emitted observables will be emitted in the correct order

A simple one-liner:

const delayMs = 2000
from([1, 2, 3]).pipe(concatMap(x => of(x).pipe(delay(delayMs)))).subscribe(item => {

});
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top