Question

I have a small question in regards to RX. I have a stream of symbols coming in from keyboard, and i need to partition them into groups. A new group should be started when a ';' symbols comes from the stream. In simpler terms, I need an operator kinda like Buffer, but that fires when a certain condition is true, rather than after some time delay or event count. Is there a way to build it with operators already present in RX, or should I enroll my own?

Was it helpful?

Solution

Here's a source.

var source = new[] { 'a', 'b', ';', 'c', 'd', 'e', ';' }.ToObservable();

Here's what you're asking for:

var groups = source
    // Group the elements by some constant (0)
    // and end the group when we see a semicolon
    .GroupByUntil(x => 0, group => group.Where(x => x == ';'))

Here's a way to use it:

groups
    // Log that we're on the next group now.
    .Do(x => Console.WriteLine("Group: "))
    // Merge / Concat all the groups together
    // {{a..b..;}..{c..d..e..;}} => {a..b..;..c..d..e..;}
    .Merge()
    // Ignore the semicolons? This is optional, I suppose.
    .Where(x => x != ';')
    // Log the characters!
    .Do(x => Console.WriteLine("  {0}", x))
    // Make it so, Number One!
    .Subscribe();

Output:

Group:
  a
  b
Group:
  c
  d
  e

OTHER TIPS

We can use the Buffer override with boundary observable where the boundary observable is our initial stream filtered to only semicolon entries.

//this is our incoming stream
IObservable<char> keyboardStream;

//if the observable is cold we need to do this 
//in case of it being hot (as I would expect a keyboard stream to be) we do not need to do it
var persistedStream = keyboardStream.Publish().RefCount();

var bufferedBySemicolon = persistedStream.Buffer(persistedStream .Where(c=>c==';'));

Here's a non-RefCount version of Nikolai's answer. This provides more explicit synchronization of subscription and disposition, and should remove a race condition which occurs when your source is observed on a different thread than which your consumer is subscribed on (which is often times the case when you're dealing with UIs).

var groups = Observable.Create(o => {

    var publishedSource = source.Publish();

    return new CompositeDisposable(
        publishedSource.Buffer(publishedSource.Where(c => c == ';')).Subscribe(o),
        publishedSource.Connect()
        );

});
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top