Question

I would like to create the following combinator:

public static IObservable<U> HeadTailSelect<T, U>
    (this IObservable<T> source, Func<T, IObservable<T>, U> fn)
{

}

The selector should be passed the current event (the head) and an observable of all future events (the tail). It must be guaranteed that whenever the tail is subscribed to, at any time in the future, the first event it receives is the one that immediately followed the head.

I'm aware that this will require some buffering, but I'm not sure how to put it together.

This has some nice properties. For example, you can write

IObservable<IObservable<Unit>> windows =
    source
        .HeadTailSelect((h, tail) => Observable
            .Interval(TimeSpan.FromSeconds(1))
            .TakeUntil(tail)
            .Select(_ => Unit.Default)
        );

This avoids the race condition in which events are missed in the window between responding to the first event and TakeUntil being subscribed.

Also, bonus karma for any ideas on how to test the implementation.

The implementation must pass the following test case, although passing it may not be sufficient to prove that race conditions are avoided.

public class HeadTailSelect : ReactiveTest
{
    TestScheduler _Scheduler = new TestScheduler();

    [Fact]
    public void ShouldWork()
    {

        var o = _Scheduler.CreateColdObservable
            (OnNext(10, "A")
            , OnNext(11, "B")
            , OnNext(12, "C")
            , OnNext(13, "D")
            , OnNext(14, "E")
            , OnNext(15, "F")
            , OnCompleted<string>(700)
            );

        var data = o.HeadTailSelect((head, tail) => tail.Take(2).ToList())
            .SelectMany(p=>p)
            .Select(l=>String.Join("-", l));


        var actual = _Scheduler.Start(() =>
            data
        , created: 0
        , subscribed: 1
        , disposed: 1000
        );

        actual.Messages.Count()
                .Should()
                .Be(7);

        var messages = actual.Messages.Take(6)
                             .Select(v => v.Value.Value)
                             .ToList();

        messages[0].Should().Be("B-C");
        messages[1].Should().Be("C-D");
        messages[2].Should().Be("D-E");
        messages[3].Should().Be("E-F");
        messages[4].Should().Be("F");
        messages[5].Should().Be("");

    }
}

No correct solution

OTHER TIPS

Here is a candidate solution that passes the above test. However, I'm not certain that it satisfies the requirements.

/// <summary>
/// Pass the head and tail of the observable to the
/// selector function. Note that 
/// </summary>
/// <typeparam name="T"></typeparam>
/// <typeparam name="U"></typeparam>
/// <param name="source"></param>
/// <param name="fn"></param>
/// <returns></returns>
public static IObservable<U> HeadTailSelect<T, U>
    (this IObservable<T> source, Func<T, IObservable<T>, U> fn)
{
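    return Observable.Create<U>(observer =>
    {
        // Create the subject per subscription: a subject shared across
        // subscriptions would mix their events and stay completed after the first run.
        var tail = new Subject<T>();
        return source.Subscribe(
            v =>
            {
                // Publish v to whatever is already subscribed to the tail before
                // calling fn, so anything fn subscribes sees only events after v.
                tail.OnNext(v);
                var u = fn(v, tail);
                observer.OnNext(u);
            },
            e => { tail.OnCompleted(); observer.OnError(e); },
            () => { tail.OnCompleted(); observer.OnCompleted(); });
    });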
}

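Note that u will most likely be some kind of IObservable built from the tail, and it should be subscribed to immediately. The tail is a plain Subject with no buffering, so any event that arrives before the subscription is lost. If the subscription happens immediately, I think everything should be OK.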
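For the stricter requirement in the question (subscribing to the tail at any later time must still start with the event right after the head), some buffering seems unavoidable. The following is only a minimal sketch of one way to do it, assuming unbounded memory use is acceptable: each head gets its own ReplaySubject<T>, which records every later event. The names HeadTailSelectSketch and HeadTailSelectBuffered are hypothetical, and unlike the candidate above this version forwards errors to the tails instead of completing them.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HeadTailSelectSketch
{
    // Hypothetical variant of HeadTailSelect; not part of Rx.
    public static IObservable<U> HeadTailSelectBuffered<T, U>(
        this IObservable<T> source, Func<T, IObservable<T>, U> fn)
    {
        return Observable.Create<U>(observer =>
        {
            // One replaying tail per head seen so far, private to this subscription.
            var tails = new List<ReplaySubject<T>>();

            return source.Subscribe(
                v =>
                {
                    // Record the new event in the tails of all earlier heads.
                    foreach (var t in tails) t.OnNext(v);

                    // The tail for this head starts empty, so it never contains v itself.
                    var tail = new ReplaySubject<T>();
                    tails.Add(tail);
                    observer.OnNext(fn(v, tail.AsObservable()));
                },
                e =>
                {
                    // Assumption: tails should see the source error too.
                    foreach (var t in tails) t.OnError(e);
                    observer.OnError(e);
                },
                () =>
                {
                    foreach (var t in tails) t.OnCompleted();
                    observer.OnCompleted();
                });
        });
    }
}
```

Because every tail keeps all later events for as long as the subscription lives, memory grows roughly quadratically with the number of events. That is probably fine for short streams or tests, but a long-running source would need some way to release tails that are no longer in use.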

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow