I would do this as follows:
- Window the data stream using the gate stream as a closing selector
- We can use
DistinctUntilChanged
on the gate stream to ensure no repeated values - We will also force the gate stream to start closed (false) - it won't affect the output and allows a neat trick
- Then use the overload of
Select
that gives each element an index number. With this we can tell if we need to buffer or just emit the window as is, because we know that even-numbered windows are for buffering (because we made sure the gate stream starts with false) - We can use
ToList()
to buffer each even window until it closes - this is the actually equivalent of aBuffer()
that waits untilOnCompleted
- We use an identity
SelectMany
to flatten the buffered windows - Finally we concatenate the windows in order to guarantee order is preserved
It looks like this:
dataStream.Window(gateStream.StartWith(false).DistinctUntilChanged())
.Select((w, i) => i % 2 == 0 ? w.ToList().SelectMany(x => x) : w)
.Concat()
.Subscribe(Console.WriteLine);