Zipping rx iobservable com um conjunto de números infinitos
-
22-09-2019 - |
Pergunta
Eu tenho um IOBServable [nomeado linhas na amostra abaixo] da estrutura de extensões reativas e quero adicionar números de índice a cada objeto que observa.
Eu tentei implementar isso usando a função ZIP:
rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
new { Row = row, Index = index })
.Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());
.. mas infelizmente isso joga
ArgumentOutOfRangeException: O argumento especificado estava fora da faixa de valores válidos. Nome do parâmetro: descartáveis
Estou entendendo a função ZIP errada ou há um problema com o meu código?
A parte do intervalo do código não parece ser o problema e o IOBServable ainda não está recebendo nenhum evento.
Solução 2
Aparentemente, os métodos de extensão ZIP convertem o IoBservable personalizado original em um observável anônimo e a assinatura cria um sistema.Collection.Generic.anonymousObServer, o que não implementa idispossáveis. Assim, você não pode implementar o método de inscrição da maneira normal (pelo menos da maneira que eu o vi usado), o que é
public IDisposable Subscribe(IObserver<T> observer) {
// ..add to observer list..
return observer as IDisposable
}
Provavelmente, a resposta correta seria:
return Disposable.Create(() => Observers.Remove(observer));
Porém, você deve observar que a multidão provavelmente será modificada Durin concluída, portanto, crie uma cópia da lista antes de processá-las:
public void Completed()
{
foreach (var observer in Observers.ToList())
{
observer.OnCompleted();
}
}
Outras dicas
.Select tem uma sobrecarga para incluir o índice:
rows.Select((row, index) => new { row, index });
Não tenho certeza de qual é o seu problema, isso funciona para você (e o que está faltando aqui que você está fazendo?):
static void Main(string[] args)
{
var rows = new List<int> { 4,5,1,2,5 }.ToObservable();
rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
new { Row = row, Index = index })
.Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());
Console.ReadLine();
}
static void ProcessRow(int row, int index) {
Console.WriteLine("Row {0}, Index {1}", row, index);
}
static void Completed() {
}