Вопрос

Реактивные расширения поставляются с множеством вспомогательных методов для преобразования существующих событий и асинхронных операций в наблюдаемые, но как бы вы реализовали IObservable<T> с нуля?

В IEnumerable есть ключевое слово lovely yield, которое делает его очень простым в реализации.

Каков правильный способ реализации IObservable<T>?

Нужно ли мне беспокоиться о потокобезопасности?

Я знаю, что существует поддержка обратного вызова в определенном контексте синхронизации, но является ли это чем-то, о чем я, как IObservable<T> автору нужно беспокоиться или это как-то встроено?

Обновить:

Вот моя C # версия решения Брайана F # на C #

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

Редактировать: Не вызывайте исключение ObjectDisposedException, если Dispose вызывается дважды

Это было полезно?

Решение

Честно говоря, я не уверен, насколько все это "правильно", но, судя по моему опыту, это довольно неплохо.Это F # код, но, надеюсь, вы почувствуете его вкус.Он позволяет вам "создать" исходный объект, для которого вы затем можете вызвать Next / Completed / Error , и он управляет подписками и пытается утверждать, когда источник или клиенты делают плохие вещи.

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

Мне будут интересны любые мысли о том, что здесь хорошо или плохо;У меня еще не было возможности ознакомиться со всеми новинками Rx от devlabs...

Мой собственный опыт подсказывает , что:

  • Те, кто подписывается на observables, никогда не должны отказываться от подписок.Нет ничего разумного, что observable может сделать, когда подписчик выбрасывает.(Это похоже на события.) Скорее всего, исключение просто всплывет в обработчике catch-all верхнего уровня или приведет к сбою приложения.
  • Источники, вероятно, должны быть "логически однопоточными".Я думаю, что может быть сложнее писать клиентов, которые могут реагировать на одновременные вызовы onNext;даже если каждый отдельный вызов поступает из другого потока, полезно избегать одновременных вызовов.
  • Определенно полезно иметь базовый / вспомогательный класс, который обеспечивает выполнение некоторых "контрактов".

Мне очень любопытно, могут ли люди дать более конкретный совет в этом направлении.

Другие советы

В официальная документация осуждает пользователей, самостоятельно внедряющих IObservable.Вместо этого ожидается, что пользователи будут использовать заводской метод Observable.Create

Когда это возможно, внедряйте новые операторы, составляя существующие операторы.В противном случае реализуйте пользовательские операторы, используя Observable .Создать

Бывает, что Observable.Create - это тривиальная оболочка вокруг внутреннего класса Reactive AnonymousObservable:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
    if (subscribe == null)
    {
        throw new ArgumentNullException("subscribe");
    }
    return new AnonymousObservable<TSource>(subscribe);
}

Я не знаю, почему они не обнародовали свою реализацию, но, эй, неважно.

Да, ключевое слово yield - прекрасное;может быть, будет что-то подобное для IObservable (часто)?[править:В книге Эрика Мейера Выступление PDC '09 он говорит "да, следите за этим пространством" в ответ на декларативный результат для генерации наблюдаемых.]

Чтобы найти что-то близкое (вместо того, чтобы создавать свое собственное), ознакомьтесь на дне из числа "(пока нет) 101 Образец Rx" wiki, где команда предлагает использовать класс Subject (T) в качестве "серверной части" для реализации IObservable (часто).Вот их пример:

public class Order
{            
    private DateTime? _paidDate;

    private readonly Subject<Order> _paidSubj = new Subject<Order>();
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }

    public void MarkPaid(DateTime paidDate)
    {
        _paidDate = paidDate;                
        _paidSubj.OnNext(this); // Raise PAID event
    }
}

private static void Main()
{
    var order = new Order();
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe

    order.MarkPaid(DateTime.Now);
}
  1. Приоткройте Отражатель и взгляните.

  2. Посмотрите несколько видеороликов C9 - это один из них показывает, как вы можете "вывести" выбранный "комбинатор"

  3. Секрет заключается в создании классов AnonymousObservable, AnonymousObserver и AnonymousDisposable (которые являются просто обходными путями для того, чтобы вы не могли создавать экземпляры интерфейсов).Они содержат нулевую реализацию, поскольку вы передаете ее с помощью действий и функций.

Например:

public class AnonymousObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

Я позволю тебе додумать остальное...это очень хорошее упражнение в понимании.

Там растет милая маленькая ниточка здесь с сопутствующими вопросами.

только одно замечание относительно этой реализации :

после появления параллельных коллекций в .net fw 4, вероятно, лучше использовать ConcurrentDictioary вместо простого словаря.

это экономит время обработки блокировок в коллекции.

ади.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top