Question

Here we have an observable sequence in .NET using Rx.

var aSource = new Subject<int>();

var bSource = new Subject<int>();

var paired = Observable
    .Merge(aSource, bSource)
    .GroupBy(i => i)
    .SelectMany(g => g.Buffer(2).Take(1));

paired.Subscribe(g => Console.WriteLine("{0}:{1}", g.ElementAt(0), g.ElementAt(1)));

aSource.OnNext(4);
bSource.OnNext(1);
aSource.OnNext(2);
bSource.OnNext(5);
aSource.OnNext(3);
bSource.OnNext(3);
aSource.OnNext(5);
bSource.OnNext(2);
aSource.OnNext(1);
bSource.OnNext(4);

Output: 3:3 5:5 2:2 1:1 4:4

We get an event every time a pair of numbers with the same id arrives.

Perfect! Just what I want.

Groups of two, paired by value.

Next question....

How do I get a SelectMany/Buffer for sequences of values?

So 1,2,3,4,5 arrives at both aSource and bSource via OnNext(). Then fire Console.WriteLine() for 1-5. Then, when 2,3,4,5,6 arrives, we get another Console.WriteLine(). Any clues, anyone?

Immediately, the Rx forum suggests looking at .Window()

http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html

On the surface this looks perfect. In my case I need a window of value 4.

Where in the query sequence does it belong to get this effect?

var paired = Observable.Merge(aSource, bSource).GroupBy(i => i).SelectMany(g => g.Buffer(2).Take(1));

Desired output: 1,2,3,4,5 : 1,2,3,4,5 2,3,4,5,6 : 2,3,4,5,6

Regards,

Daniel


Solution

Assuming events arrive randomly at the sources, use my answer to "Reordering events with Reactive Extensions" to get the events in order.

Then use Observable.Buffer to create a sliding buffer:

// get this using the OrderedCollect/Sort in the referenced question
IObservable<int> orderedSource;

// then subscribe to this
orderedSource.Buffer(5, 1);
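
To make the sliding buffer concrete, here is a minimal sketch that combines the pairing query from the question with Buffer(5, 1). It assumes values already arrive in ascending order at both sources, so the reordering step from the referenced answer is left out. The names SlidingPairsSketch, PairedWindows and Demo are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SlidingPairsSketch
{
    // Hypothetical helper: emits a value once BOTH sources have produced it,
    // then collects the paired values into sliding buffers of `size`.
    // Assumption: values reach both sources in ascending order.
    public static IObservable<IList<int>> PairedWindows(
        IObservable<int> a, IObservable<int> b, int size)
    {
        return Observable
            .Merge(a, b)
            .GroupBy(i => i)
            .SelectMany(g => g.Buffer(2).Take(1)) // pair by value, as in the question
            .Select(pair => pair[0])              // collapse each pair to its value
            .Buffer(size, 1)                      // sliding buffer that advances one value at a time
            .Where(w => w.Count == size);         // ignore partial buffers flushed on completion
    }

    public static List<string> Demo()
    {
        var aSource = new Subject<int>();
        var bSource = new Subject<int>();
        var lines = new List<string>();

        using (PairedWindows(aSource, bSource, 5)
            .Subscribe(w => lines.Add(string.Join(",", w))))
        {
            // Push 1..6 into both sources; each value pairs up on its second arrival.
            for (var i = 1; i <= 6; i++)
            {
                aSource.OnNext(i);
                bSource.OnNext(i);
            }
        }

        return lines; // "1,2,3,4,5" then "2,3,4,5,6"
    }
}
```

Printing each entry of SlidingPairsSketch.Demo() gives one line per full window. If the values can arrive out of order, the paired stream would have to be sorted before the final Buffer call, as the answer describes.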

OTHER TIPS

Here is an extension method that fires once it has received n inputs with the same id.

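using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RxExtension
{
    public static IObservable<TSource> MergeBuffer<TSource>(
        this IObservable<TSource> source,
        Func<TSource, int> keySelector,
        Func<IList<TSource>, TSource> mergeFunction,
        int bufferCount)
    {
        return Observable.Create<TSource>(o =>
        {
            // Items received so far, grouped by key.
            var buffer = new Dictionary<int, IList<TSource>>();
            return source.Subscribe(i =>
            {
                var index = keySelector(i);
                if (buffer.ContainsKey(index))
                {
                    buffer[index].Add(i);
                }
                else
                {
                    buffer.Add(index, new List<TSource> { i });
                }

                // Fire once THIS key has collected bufferCount items, then reset it.
                // (Checking buffer.Count would count distinct keys instead.)
                if (buffer[index].Count == bufferCount)
                {
                    o.OnNext(mergeFunction(buffer[index]));
                    buffer.Remove(index);
                }
            },
            o.OnError,       // forward errors to the subscriber
            o.OnCompleted);  // forward completion to the subscriber
        });
    }
}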

Calling the extension.

mainInput = Observable.Merge(inputNodes.ToArray()).MergeBuffer<NodeData>(x => x.id, x => MergeData(x), 1);
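
As a point of comparison, the same idea (emit a merged item once n items share a key) can be sketched with the built-in GroupBy and Buffer operators. This is an assumption-laden alternative, not the author's method: BufferByKey, GroupedBufferSketch and Demo are hypothetical names, and the example merges integers by summing them.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class GroupedBufferSketch
{
    // Hypothetical operator: groups items by key and, each time a key has
    // collected `count` items, emits merge(items) for that key.
    public static IObservable<TResult> BufferByKey<TSource, TKey, TResult>(
        this IObservable<TSource> source,
        Func<TSource, TKey> keySelector,
        Func<IList<TSource>, TResult> merge,
        int count)
    {
        return source
            .GroupBy(keySelector)
            .SelectMany(g => g
                .Buffer(count)                        // non-overlapping buffers per key
                .Where(items => items.Count == count) // ignore a partial buffer on completion
                .Select(merge));
    }

    public static List<int> Demo()
    {
        var input = new Subject<int>();
        var results = new List<int>();

        using (input
            .BufferByKey(x => x, items => items.Sum(), 2)
            .Subscribe(x => results.Add(x)))
        {
            foreach (var value in new[] { 1, 2, 1, 3, 2, 1, 1 })
            {
                input.OnNext(value);
            }
        }

        return results; // 2 (1+1), 4 (2+2), 2 (1+1)
    }
}
```

Because Buffer(count) starts a fresh buffer after each emission, a key that keeps receiving items fires again every count items, which matches removing the key from the dictionary after each OnNext.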
Licensed under: CC-BY-SA with attribution