Zippare Rx IObservable con infinito numero di set
-
22-09-2019 - |
Domanda
Ho un IObservable [nome righe nell'esempio riportato di seguito] dal Reattiva estensioni quadro e voglio aggiungere i numeri di indice per ogni oggetto che osserva.
Ho cercato di attuare questo utilizza Zip funzione:
rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
new { Row = row, Index = index })
.Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());
..ma purtroppo questo getta
ArgumentOutOfRangeException:Argomento specificato l'intervallo di valori validi.Nome del parametro:monouso
Sto comprensione Zip funzione di sbagliato o c'è un problema con il mio codice?
La Gamma parte del codice non sembra essere il problema e la IObservable non è ancora di ricevere eventuali eventi.
Soluzione 2
A quanto pare, Zip metodi di estensione converte l'originale personalizzato IObservable a un anonimo osservabili e Sottoscrizione, crea un Sistema.Collezioni.Generico.AnonymousObserver, che non implementa IDisposable.Pertanto, non è possibile implementare il metodo di Sottoscrizione modo normale (almeno il modo in cui l'ho visto usato), che è
public IDisposable Subscribe(IObserver<T> observer) {
// ..add to observer list..
return observer as IDisposable
}
Più probabile che la risposta corretta sarebbe:
return Disposable.Create(() => Observers.Remove(observer));
Si deve però notare che la collction sarà probabilmente modificato durin Completato-metodo, in modo da creare una copia della lista prima del trattamento:
public void Completed()
{
foreach (var observer in Observers.ToList())
{
observer.OnCompleted();
}
}
Altri suggerimenti
.Select dispone di un sovraccarico per includere l'indice:
rows.Select((row, index) => new { row, index });
Non sono sicuro che cosa il vostro problema è che fa questo lavoro per voi (e cosa manca qui che si fa?):
static void Main(string[] args)
{
var rows = new List<int> { 4,5,1,2,5 }.ToObservable();
rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
new { Row = row, Index = index })
.Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());
Console.ReadLine();
}
static void ProcessRow(int row, int index) {
Console.WriteLine("Row {0}, Index {1}", row, index);
}
static void Completed() {
}