Pregunta

Las extensiones reactivas vienen con una gran cantidad de métodos de ayuda para convertir los eventos existentes y las operaciones asincrónicas en observables sino cómo se implementa una IObservable a partir de cero?

IEnumerable tiene la palabra clave de rendimiento preciosa para que sea muy fácil de implementar.

¿Cuál es la forma correcta de aplicar IObservable ?

¿Es necesario preocuparse por la seguridad de rosca?

Sé que hay soporte para conseguir llamado de nuevo en un contexto de sincronización específica pero esto es algo que como IObservable Autor necesidad de preocuparse por esto de alguna manera o incorporada?

actualización:

Esta es mi versión de C # # F solución de Brian

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

editar No tire ObjectDisposedException si Desechar se llama dos veces

¿Fue útil?

Solución

Sinceramente, no estoy seguro de cómo 'derecho' es todo esto, pero si se siente muy bien basado en mi experiencia hasta ahora. Es F # código, pero esperamos que pueda hacerse una idea del sabor. Le permite 'nuevo hasta' un objeto de origen, que luego se puede llamar Siguiente / Completo / error en, y gestiona suscripciones y trata de hacer valer cuando la fuente o clientes hacen cosas malas.

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

Voy a estar interesado en alguna idea sobre lo que es bueno o malo aquí; No he tenido la oportunidad de mirar a todas las cosas nuevas Rx de devlabs aún ...

Mis propias experiencias sugieren que:

  • Los que suscriben a los observables nunca se debe tirar de las suscripciones. No hay nada razonable un observable puede hacer cuando un abonado de lanza. (Esto es similar a los eventos.) Lo más probable es la excepción se acaba de burbuja hasta un nivel superior cajón de sastre manejador o chocar la aplicación.
  • Fuentes probablemente debería ser "roscado lógicamente sencillo". Creo que puede ser más difícil de escribir clientes que pueden reaccionar a las llamadas OnNext concurrentes; incluso si cada llamada proviene de un hilo diferente, es útil para evitar llamadas simultáneas.
  • Es sin duda útil tener una clase base / ayudante que hace cumplir algunos 'contratos'.

Estoy muy ansioso por ver si la gente puede mostrar un asesoramiento más concreto a lo largo de estas líneas.

Otros consejos

El documentación oficial deprecates usuarios ejecución IObservable sí mismos. En lugar de ello, se espera que los usuarios utilicen el método de fábrica Observable.Create

  

Cuando sea posible, implementar nuevos operadores mediante la composición de los operadores existentes. De lo contrario, aplicar operadores personalizados utilizando Observable.Create

Sucede que Observable.Create es una envoltura alrededor de trivial AnonymousObservable clase interna del Reactivo:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
    if (subscribe == null)
    {
        throw new ArgumentNullException("subscribe");
    }
    return new AnonymousObservable<TSource>(subscribe);
}

No sé por qué no hacer pública su aplicación, pero bueno, lo que sea.

Sí, la palabra clave de rendimiento es precioso; tal vez habrá algo similar para IObservable (OFT)? [Editar: En PDC '09 charla dice "sí, mira esto espacio" a un rendimiento declarativa para generar observables.]

Para algo cercano (en vez de rodar su propio), echa un vistazo a la parte inferior de la " (todavía no) 101 Rx muestras " wiki, donde el equipo sugiere el uso del Sujeto (T) como clase un "back-end" para implementar un IObservable (OFT). Aquí está su ejemplo:

public class Order
{            
    private DateTime? _paidDate;

    private readonly Subject<Order> _paidSubj = new Subject<Order>();
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }

    public void MarkPaid(DateTime paidDate)
    {
        _paidDate = paidDate;                
        _paidSubj.OnNext(this); // Raise PAID event
    }
}

private static void Main()
{
    var order = new Order();
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe

    order.MarkPaid(DateTime.Now);
}
  1. Grieta Reflector abierto y echar un vistazo.

  2. ver algunos videos C9 - esta uno muestra cómo se puede 'deriva' Select 'Combinator'

  3. El secreto es crear AnonymousObservable, AnonymousObserver y AnonymousDisposable clases, (que son sólo trabajan rodeos por el hecho de que no se puede crear instancias de las interfaces). Contienen aplicación cero, a medida que pasan que con acciones y Funcs.

Por ejemplo:

public class AnonymousObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

Voy a dejar de hacer ejercicio el resto ... es un muy buen ejercicio de comprensión.

Hay un pequeño hilo de crecimiento aquí relacionada con preguntas.

simplemente una observación con respecto a esta aplicación:

después de colecciones concurrentes están introduciendo en FW .net 4 es probable que sea mejor utilizar ConcurrentDictioary en lugar de un simple diccionario.

guarda la manipulación de cerraduras en la colección.

adi.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top