Frage

Ich habe ein IObservable [namens Zeilen in der Probe unten] aus Reactive Extensions Framework und ich mag jedes Objekt Indexnummern hinzuzufügen, es beobachtet.

Ich habe versucht, dies mit Zip-Funktion zu implementieren:

rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) => 
    new { Row = row, Index = index })
    .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

.. aber leider führt diese

ArgumentOutOfRangeException: Angegebene Argument war der Bereich der gültigen Werte aus. Parametername: Disposables

Bin ich die Zip-Funktion falsch zu verstehen oder ist es ein Problem mit meinem Code?

Der Range Teil des Codes scheint nicht das Problem zu sein und die IObservable ist noch keine Ereignisse zu empfangen.

War es hilfreich?

Lösung 2

Offenbar Zip Erweiterungsmethoden wandeln das ursprüngliche benutzerdefinierte IObservable einen anonymen beobachtbar und Anmeldung es eine System.Collections.Generic.AnonymousObserver schafft, die IDisposable nicht implementiert. So können Sie nicht implementieren die Methode, um die normale Art und Weise Abonnieren (atleast, wie ich gesehen habe es benutzt), die

ist
public IDisposable Subscribe(IObserver<T> observer) {
  // ..add to observer list..
  return observer as IDisposable
}

Wahrscheinlicher ist die richtige Antwort wäre:

return Disposable.Create(() => Observers.Remove(observer));

Sie sollten jedoch zur Kenntnis, dass die collction wahrscheinlich durin Completed-Verfahren modifiziert werden, schaffen so eine Kopie der Liste vor der Verarbeitung:

public void Completed()
{
    foreach (var observer in Observers.ToList())
    {
        observer.OnCompleted();
    }
 }

Andere Tipps

.Select hat eine Überlastung des Index enthalten:

rows.Select((row, index) => new { row, index });

Ich bin nicht sicher, was dein Problem ist, funktioniert das für Sie (und was fehlt hier, dass Sie tun?):

    static void Main(string[] args)
    {
        var rows = new List<int> { 4,5,1,2,5 }.ToObservable();
        rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
            new { Row = row, Index = index })
            .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

        Console.ReadLine();
    }
    static void ProcessRow(int row, int index) {
        Console.WriteLine("Row {0}, Index {1}", row, index);
    }
    static void Completed() {
    }
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top