Question

I have a IObservable [named rows in the sample below] from Reactive extensions framework and I want to add index numbers to each object it observes.

I've tried to implement this using Zip function:

rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) => 
    new { Row = row, Index = index })
    .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

.. but unfortunately this throws

ArgumentOutOfRangeException: Specified argument was out of the range of valid values. Parameter name: disposables

Am I understanding the Zip function wrong or is there a problem with my code?

The Range part of the code doesn't seem to be the problem and the IObservable isn't yet receiving any events.

Was it helpful?

Solution 2

Apparently, Zip extension methods converts the original custom IObservable to an anonymous observable and Subscribing to it creates an System.Collections.Generic.AnonymousObserver, which doesn't implement IDisposable. Thus, you cannot implement the Subscribe method the normal way (atleast the way I've seen it used), which is

public IDisposable Subscribe(IObserver<T> observer) {
  // ..add to observer list..
  return observer as IDisposable
}

More likely the correct answer would be:

return Disposable.Create(() => Observers.Remove(observer));

You should though note that the collction will probably be modified durin Completed-method, so create a copy of the list before processing them:

public void Completed()
{
    foreach (var observer in Observers.ToList())
    {
        observer.OnCompleted();
    }
 }

OTHER TIPS

.Select has an overload to include the index:

rows.Select((row, index) => new { row, index });

I am not sure what your problem is, does this work for you (and what's missing here that you are doing?):

    static void Main(string[] args)
    {
        var rows = new List<int> { 4,5,1,2,5 }.ToObservable();
        rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
            new { Row = row, Index = index })
            .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

        Console.ReadLine();
    }
    static void ProcessRow(int row, int index) {
        Console.WriteLine("Row {0}, Index {1}", row, index);
    }
    static void Completed() {
    }
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top