Pregunta

Tengo un IObservable [filas con nombre en la muestra más abajo] de marco reactiva extensiones y quiero añadir números de índice para cada objeto que observa.

He tratado de poner en práctica esta función Zip usando:

rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) => 
    new { Row = row, Index = index })
    .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

.. pero lamentablemente esto arroja

ArgumentOutOfRangeException: Argumento especificado está fuera del rango de valores válidos. Nombre de parámetro: desechables

¿Estoy entendiendo la función equivocada postal o hay un problema con mi código?

La parte Rango del código no parece ser el problema y la IObservable aún no está recibiendo ningún evento.

¿Fue útil?

Solución 2

Al parecer, los métodos de extensión Zip convierte la costumbre IObservable original a un anónimo observable y que lo han suscrito crea una System.Collections.Generic.AnonymousObserver, que no implementa IDisposable. Por lo tanto, no se puede poner en práctica el método Suscribirse la forma normal (al menos la forma en que he visto que se usa), que es

public IDisposable Subscribe(IObserver<T> observer) {
  // ..add to observer list..
  return observer as IDisposable
}

Lo más probable es la respuesta correcta sería:

return Disposable.Create(() => Observers.Remove(observer));

Debería aunque tenga en cuenta que el collction probablemente será modificada durin método Completo, por tanto, crear una copia de la lista antes de procesarlos:

public void Completed()
{
    foreach (var observer in Observers.ToList())
    {
        observer.OnCompleted();
    }
 }

Otros consejos

.Seleccionar tiene una sobrecarga para incluir el índice:

rows.Select((row, index) => new { row, index });

No estoy seguro de cuál es tu problema, hace este trabajo para usted (y lo que falta aquí que usted está haciendo?):

    static void Main(string[] args)
    {
        var rows = new List<int> { 4,5,1,2,5 }.ToObservable();
        rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
            new { Row = row, Index = index })
            .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

        Console.ReadLine();
    }
    static void ProcessRow(int row, int index) {
        Console.WriteLine("Row {0}, Index {1}", row, index);
    }
    static void Completed() {
    }
Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top