Question

I have two data sources.
One of them is a cached list; the other is new data pushed via IObservable<T>.

I want to use Rx to find out which operations need to be performed on the cached list A to make it identical in its order and contents to the new data.

I am looking for a function that takes an IEnumerable<T> a and an IObservable<T> b and returns an observable that pushes the operations (insertions and deletions) on a that would make it identical to b, without waiting for b to complete.

Note: I know I can't modify the list or the observable, and I don't want to.

I only want to know which operations, in which order, would turn a hypothetical list identical in order and contents to A into a list identical in order and contents to B, as soon as these operations become known.

The items in both a and b are unique and sorted, and T implements IComparable<T> and IEquatable<T>.

public static IObservable<Tuple<int, bool>> IndexDelta<T>(
    IEnumerable<T> a,
    IObservable<T> b
) where T : IEquatable<T>, IComparable<T> {
    // ???
}

I will use ints in my example.

What?!

Consider these two sequences:

A: [150, 100, 70, 30, 20]
B: [300, 200, 100, 70, 60, 50, 20]

The goal is to find a series of delete/insert operations that transform A into B. Think of A as a cached data source and B as the new data: I want to know how to apply these updates to a grid without reloading it.

Rows are sorted in both sources.

I want the output to be in the form

[(0, true), (1, true), (0, false), (3, false), (4, true), (5, true)]

I would later group these operations by boolean flag:

deleted:  [0, 3]
inserted: [0, 1, 4, 5]

which translates into plain language as

  1. Delete A0 and A3:

    A = [150, 100, 70, 30, 20] → [100, 70, 20]

  2. Insert B0, B1, B4, B5 into A:

    A = [300, 200, 100, 70, 60, 50, 20]

  3. Now A is identical to B.
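The index conventions in this example can be checked with a small sketch: group the operations by flag (here with ToLookup), apply the deletions from the highest original index of A down so they don't shift each other, then apply the insertions in ascending order of their position in B. DeltaApplier and Apply are hypothetical names, and since the operations carry only indices, the sketch assumes the items of B are at hand to insert.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DeltaApplier
{
    // Applies (index, isInsertion) operations to a copy of `a`.
    // Assumed convention (matches the worked example): deletion indices are
    // positions in the original A, insertion indices are positions in the final B.
    public static List<T> Apply<T>(IEnumerable<T> a, IReadOnlyList<T> b,
                                   IEnumerable<Tuple<int, bool>> ops)
    {
        // Group by the boolean flag: false = deleted, true = inserted.
        var byFlag = ops.ToLookup(op => op.Item2, op => op.Item1);
        var result = new List<T>(a);

        // Delete from the highest original index down, so earlier deletions
        // do not shift the positions of later ones.
        foreach (var i in byFlag[false].OrderByDescending(i => i))
            result.RemoveAt(i);

        // Insert in ascending order of final position: every item that belongs
        // before position i is already present, so i is exactly where B[i] goes.
        foreach (var i in byFlag[true].OrderBy(i => i))
            result.Insert(i, b[i]);

        return result;
    }
}
```

Called with A, B and the six operations above, it returns [300, 200, 100, 70, 60, 50, 20]. The descending order for deletions matters: removing A0 first would shift 30 from index 3 to index 2.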

Requirements

There are several important things I want to note:

  1. A is a list that is guaranteed not to change. B is a cold observable that takes some time to complete but yields its first items quite soon. Therefore, the resulting operations need to be pushed as soon as enough data is available.

  2. Items are guaranteed to be unique according to IEquatable<T> in both sources.

  3. Items are immutable and are guaranteed to be sorted in descending order according to IComparable<T> in both sources.

  4. It is preferable to optimize for new items being added at the left of B, since this is the most common scenario. However, items may also be deleted or inserted at any other position, as long as their timestamp keeps the sort order intact. Think of an iPhone camera roll.

  5. (*) I'm interested in a purely functional solution, if one is possible.
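Requirements 2 and 3 are what make a single merge-style pass possible, so a cheap guard can be worth having while debugging. A sketch, with OrderCheck and IsStrictlyDescending as hypothetical names: strictly descending order implies uniqueness, provided CompareTo agrees with Equals.

```csharp
using System;
using System.Collections.Generic;

public static class OrderCheck
{
    // True when every item compares strictly greater than the one after it.
    // Strictly descending implies no duplicates, provided CompareTo agrees with Equals.
    public static bool IsStrictlyDescending<T>(IEnumerable<T> items) where T : IComparable<T>
    {
        using (var e = items.GetEnumerator())
        {
            if (!e.MoveNext()) return true; // an empty sequence is trivially sorted
            var previous = e.Current;
            while (e.MoveNext())
            {
                if (previous.CompareTo(e.Current) <= 0) return false;
                previous = e.Current;
            }
            return true;
        }
    }
}
```

For B, the same comparison can be made item by item as values arrive, against the previously received item.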

Pseudocode Sketch

I sketched a pseudocode algorithm that implements this in an imperative way.

The Current, MoveNext, await and yield push semantics are made up, but the idea should be clear.

IObservable<Tuple<int, bool>> IndexDelta(a, b)
{
    var indexA = 0; // position of headA in A
    var indexB = 0; // position of headB in B

    while (true) {
        var headA = a.Current; // null once A is exhausted
        var headB = b.Current; // null once B has completed

        if (headA == null && headB == null) {
            yield break; // both sequences are over
        }

        var reportDeletion = () => {
            yield push Tuple.Create(indexA, false);
            await a.MoveNext(); // this one is fast
            indexA++;
        }

        var reportInsertion = () => {
            yield push Tuple.Create(indexB, true);
            await b.MoveNext(); // can take a long time
            indexB++;
        }

        if (headA == null) { // No source item at this position
            reportInsertion();
            continue;
        }

        if (headB == null) { // No fetched item at this position
            reportDeletion();
            continue;
        }

        var order = headB.CompareTo(headA); // only the sign is meaningful
        if (order == 0) { // Same item in both: keep it, advance both
            await a.MoveNext();
            await b.MoveNext();
            indexA++;
            indexB++;
        } else if (order > 0) { // Fetched item is newer than source item
            reportInsertion();
        } else { // Source item is newer than fetched item
            reportDeletion();
        }
    }
}

I believe you could implement something very similar with Subject<T>. However, I don't want to go that way, because I'm wondering whether it can be solved purely by composing Rx operators such as Aggregate, Zip or CombineLatest.
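On the composition question: the merge loop can be written as a pure transition function (state, next item of B) → (new state, operations to emit), which is the shape that Scan (or Aggregate, for a one-shot result) expects. A sketch under those assumptions; DeltaState, DeltaStep, Initial, Step and Flush are hypothetical names, and the cached list is assumed to be available as an IReadOnlyList<T>:

```csharp
using System;
using System.Collections.Generic;

// Immutable snapshot of the diff after consuming a prefix of B.
public sealed class DeltaState<T>
{
    public IReadOnlyList<T> A { get; }                      // the cached list
    public int IndexA { get; }                              // next unmatched position in A
    public int IndexB { get; }                              // number of B items consumed
    public IReadOnlyList<Tuple<int, bool>> Emitted { get; } // ops produced by the last step

    public DeltaState(IReadOnlyList<T> a, int indexA, int indexB,
                      IReadOnlyList<Tuple<int, bool>> emitted)
    {
        A = a; IndexA = indexA; IndexB = indexB; Emitted = emitted;
    }
}

public static class DeltaStep
{
    public static DeltaState<T> Initial<T>(IReadOnlyList<T> a) =>
        new DeltaState<T>(a, 0, 0, Array.Empty<Tuple<int, bool>>());

    // Pure transition: consumes one item of B and returns a new state; `s` is never mutated.
    public static DeltaState<T> Step<T>(DeltaState<T> s, T headB) where T : IComparable<T>
    {
        var ops = new List<Tuple<int, bool>>();
        var i = s.IndexA;

        // Cached items newer than headB cannot appear later in B (descending order).
        while (i < s.A.Count && s.A[i].CompareTo(headB) > 0)
            ops.Add(Tuple.Create(i++, false));

        if (i < s.A.Count && s.A[i].CompareTo(headB) == 0)
            i++;                                   // present in both: keep it
        else
            ops.Add(Tuple.Create(s.IndexB, true)); // new in B: insert it

        return new DeltaState<T>(s.A, i, s.IndexB + 1, ops);
    }

    // Once B completes, whatever is left in A must be deleted.
    public static IReadOnlyList<Tuple<int, bool>> Flush<T>(DeltaState<T> s)
    {
        var ops = new List<Tuple<int, bool>>();
        for (var i = s.IndexA; i < s.A.Count; i++)
            ops.Add(Tuple.Create(i, false));
        return ops;
    }
}
```

With System.Reactive, an untested way to wire it up would be `b.Scan(DeltaStep.Initial(cached), (s, x) => DeltaStep.Step(s, x)).SelectMany(s => s.Emitted)`; the operations from Flush on the last state still have to be appended when b completes. Note that in push order the deletion of 30 (index 3) is only known once 20 arrives, so for the example the stream is (0, true), (1, true), (0, false), (4, true), (5, true), (3, false); grouped by flag it matches the desired output.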

What are your thoughts?


Solution

This seems to work (it is a LINQPad program; Dump() prints the result):

// LINQPad "C# Program": Dump() is LINQPad's output helper.
// Needs the System.Reactive package and its namespaces
// System.Reactive.Linq and System.Reactive.Disposables.
void Main()
{
    var a = new int?[] { 150, 100, 70, 30, 20 };
    var b = new int?[] { 300, 200, 100, 70, 60, 50, 20 };
    var result = IndexDelta(a, b);
    result.Dump();
}

// Define other methods and classes here
IObservable<Tuple<int, bool>> IndexDelta(IEnumerable<int?> a, IEnumerable<int?> b)
{
    var observable = Observable.Create<Tuple<int, bool>>(o => {
        var indexA = 0; // position of headA in A
        var indexB = 0; // position of headB in B
        var aEnumerator = a.GetEnumerator();
        var bEnumerator = b.GetEnumerator();
        var aHasNext = aEnumerator.MoveNext();
        var bHasNext = bEnumerator.MoveNext();

        while (true) {
            if (!aHasNext && !bHasNext) {
                o.OnCompleted(); // both sequences are over
                break;
            }

            // Current is only valid while MoveNext() returned true;
            // null marks an exhausted sequence.
            var headA = aHasNext ? aEnumerator.Current : null;
            var headB = bHasNext ? bEnumerator.Current : null;

            Action reportDeletion = () => {
                o.OnNext(Tuple.Create(indexA, false));
                aHasNext = aEnumerator.MoveNext(); // this one is fast
                indexA++;
            };
            Action reportInsertion = () => {
                o.OnNext(Tuple.Create(indexB, true));
                bHasNext = bEnumerator.MoveNext(); // can take a long time
                indexB++;
            };

            if (headA == null) { // No source item at this position
                reportInsertion();
                continue;
            }

            if (headB == null) { // No fetched item at this position
                reportDeletion();
                continue;
            }

            switch (Math.Sign(headB.Value.CompareTo(headA.Value))) {
                case 0: // Same item in both: keep it and advance both
                    aHasNext = aEnumerator.MoveNext();
                    bHasNext = bEnumerator.MoveNext();
                    indexA++;
                    indexB++;
                    break;
                case 1: // Fetched item is newer than source item
                    reportInsertion();
                    break;
                case -1: // Source item is newer than fetched item
                    reportDeletion();
                    break;
            }
        }

        aEnumerator.Dispose();
        bEnumerator.Dispose();
        return Disposable.Empty;
    });
    return observable;
}

OTHER TIPS

This code is based on Richard's answer but works with any T.
I haven't been able to escape the curse of ToEnumerable, though (enumerating it blocks until the next item arrives), so any help is appreciated.

IObservable<Tuple<int, T, bool>> IndexDelta<T>(
    IObservable<T> first, IObservable<T> second
)
    where T : IComparable<T>, IEquatable<T>
{
    return Observable.Create<Tuple<int, T, bool>> (o => {
        var a = first.ToEnumerable ().GetEnumerator ();
        var b = second.ToEnumerable ().GetEnumerator ();

        var indexA = -1;
        var indexB = -1;

        var hasNextA = true;
        var hasNextB = true;

        var headA = default(T);
        var headB = default(T);

        Action<bool> advanceA = (bool reportDeletion) => {
            if (reportDeletion) {
                o.OnNext (Tuple.Create (indexA, headA, false));
            }

            if (hasNextA = a.MoveNext ()) {
                indexA++;
                headA = a.Current;
            }
        };

        Action<bool> advanceB = (bool reportInsertion) => {
            if (reportInsertion) {
                o.OnNext (Tuple.Create (indexB, headB, true));
            }

            if (hasNextB = b.MoveNext ()) {
                indexB++;
                headB = b.Current;
            }
        };

        advanceA (false);
        advanceB (false);

        while (true) {
            if (!hasNextA && !hasNextB) {
                o.OnCompleted ();
                break;
            }

            if (!hasNextA) {
                advanceB (true);
                continue;
            } 

            if (!hasNextB) {
                advanceA (true);
                continue;
            } 

            switch (Math.Sign (headA.CompareTo (headB))) { // CompareTo only guarantees the sign
            case 0:
                advanceA (false);
                advanceB (false);
                break;
            case 1:
                advanceA (true);
                break; 
            case -1:
                advanceB (true);
                break; 
            }          
        }

        return Disposable.Create (() => {
            a.Dispose ();
            b.Dispose ();
        });
    });     
} 
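One way to avoid ToEnumerable, assuming the first sequence is the cached list from the question (so it can be enumerated) and only the second one is pushed, is to turn the loop inside out: an IObserver<T> subscribed to the second sequence performs one merge step per OnNext and flushes the remaining deletions in OnCompleted, so nothing blocks waiting for the next item. A sketch under those assumptions; IndexDeltaObserver and Recorder are hypothetical names, and only the BCL's IObservable<T>/IObserver<T> interfaces are used:

```csharp
using System;
using System.Collections.Generic;

// Subscribe this to the pushed sequence B; it walks the cached list A with an
// enumerator and pushes (index, item, isInsertion) notifications to `output`.
public sealed class IndexDeltaObserver<T> : IObserver<T> where T : IComparable<T>
{
    private readonly IEnumerator<T> _a;
    private readonly IObserver<Tuple<int, T, bool>> _output;
    private bool _hasA;
    private int _indexA, _indexB;

    public IndexDeltaObserver(IEnumerable<T> a, IObserver<Tuple<int, T, bool>> output)
    {
        _a = a.GetEnumerator();
        _hasA = _a.MoveNext();
        _output = output;
    }

    public void OnNext(T headB)
    {
        // Cached items newer than headB can no longer appear in B: delete them now.
        while (_hasA && _a.Current.CompareTo(headB) > 0)
            DeleteHeadA();

        if (_hasA && _a.Current.CompareTo(headB) == 0)
        {
            _hasA = _a.MoveNext(); // present in both: keep it and advance A
            _indexA++;
        }
        else
        {
            _output.OnNext(Tuple.Create(_indexB, headB, true)); // new item in B
        }
        _indexB++;
    }

    public void OnCompleted()
    {
        while (_hasA) DeleteHeadA(); // B is over: the rest of A is gone
        _a.Dispose();
        _output.OnCompleted();
    }

    public void OnError(Exception error)
    {
        _a.Dispose();
        _output.OnError(error);
    }

    private void DeleteHeadA()
    {
        _output.OnNext(Tuple.Create(_indexA++, _a.Current, false));
        _hasA = _a.MoveNext();
    }
}

// Minimal observer that records notifications, used only for trying the sketch out.
public sealed class Recorder<T> : IObserver<T>
{
    public List<T> Items { get; } = new List<T>();
    public bool Completed { get; private set; }
    public void OnNext(T value) { Items.Add(value); }
    public void OnCompleted() { Completed = true; }
    public void OnError(Exception error) { throw error; }
}
```

With System.Reactive, `Observable.Create<Tuple<int, T, bool>>(o => second.Subscribe(new IndexDeltaObserver<T>(cached, o)))` would wrap it back into an IObservable<Tuple<int, T, bool>>, where `cached` stands for the IEnumerable<T> of the first source. Deletions are reported only when a smaller item of B arrives (or B completes), so they can lag behind insertions.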
Licensed under: CC-BY-SA with attribution