我已经从反应性的扩展框架的IObservable [下面的样品在名为行数],我想索引号添加到它观察的每个对象。

我试图实现此使用Zip功能:

rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) => 
    new { Row = row, Index = index })
    .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

..但不幸的是,这将引发

ArgumentOutOfRangeException: 指定的参数是有效的值的范围的出来。 参数名称:一次性

我是否了解邮编功能失常或有一个问题,我的代码?

的码的范围部分似乎并不成为问题和的IObservable尚未接收到任何事件。

有帮助吗?

解决方案 2

显然,邮编扩展方法转换原来的自定义的IObservable到匿名可观察到的和订阅它创建了一个System.Collections.Generic.AnonymousObserver,它不实现IDisposable。 因此,就无法实现订阅方法以正常的方式(至少我已经看到了它使用的方式),这是

public IDisposable Subscribe(IObserver<T> observer) {
  // ..add to observer list..
  return observer as IDisposable
}

更可能的是正确的答案将是:

return Disposable.Create(() => Observers.Remove(observer));

您应该不过请注意,collction可能都灵完成法进行修改,所以处理它们之前创建列表的副本:

public void Completed()
{
    foreach (var observer in Observers.ToList())
    {
        observer.OnCompleted();
    }
 }

其他提示

。选择具有过载到包括索引:

rows.Select((row, index) => new { row, index });

我不知道你的问题是什么,这是否对你的工作(和缺少了什么在这里,你在做什么?):

    static void Main(string[] args)
    {
        var rows = new List<int> { 4,5,1,2,5 }.ToObservable();
        rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
            new { Row = row, Index = index })
            .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

        Console.ReadLine();
    }
    static void ProcessRow(int row, int index) {
        Console.WriteLine("Row {0}, Index {1}", row, index);
    }
    static void Completed() {
    }
许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top