Pergunta

As extensões reativas vêm com muitos métodos auxiliares para transformar eventos existentes e operações assíncronas em observáveis, mas como você implementaria um IOBServableu003CT> do princípio?

O IEnumerable tem a adorável palavra -chave de rendimento para torná -la muito simples de implementar.

Qual é a maneira correta de implementar o IoBServableu003CT> ?

Eu preciso me preocupar com a segurança do tópico?

Eu sei que há suporte para ser chamado de volta em um contexto de sincronização específico, mas isso é algo que eu como um IOBServableu003CT> O autor precisa se preocupar ou de alguma forma embutido?

atualizar:

Aqui está minha versão C# da solução F# de Brian

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

editar: Não jogue o objectDisposedException se o descarte for chamado duas vezes

Foi útil?

Solução

Honestamente, não tenho certeza de como é 'certo' tudo isso, mas se é muito bom com base na minha experiência até agora. É o código F#, mas espero que você tenha uma noção do sabor. Ele permite que você 'up' um objeto de origem, que você pode ligar para o próximo/concluído/erro, e gerencia assinaturas e tenta afirmar quando a fonte ou os clientes fazem coisas ruins.

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

Estarei interessado em qualquer pensamento sobre o que é bom ou ruim aqui; Ainda não tive a chance de olhar para todas as novas coisas do RX do Devlabs ...

Minhas próprias experiências sugerem que:

  • Aqueles que se inscrevem no Observables nunca devem jogar das assinaturas. Não há nada razoável que um observável possa fazer quando um assinante joga. (Isso é semelhante aos eventos.) Provavelmente, a exceção apenas borbulhará até um manipulador de captura de nível superior ou falhará o aplicativo.
  • As fontes provavelmente devem ser "logicamente simples". Eu acho que pode ser mais difícil escrever clientes que possam reagir a chamadas simultâneas no OnNext; Mesmo que cada chamada individual venha de um encadeamento diferente, é útil evitar chamadas simultâneas.
  • É definitivamente útil ter uma classe base/auxiliar que aplique alguns 'contratos'.

Estou muito curioso se as pessoas podem mostrar mais conselhos concretos nesse sentido.

Outras dicas

o documentação oficial deprecia os usuários que implementam o IoBServable. Em vez disso, espera -se que os usuários usem o método da fábrica Observable.Create

Quando possível, implemente novos operadores compondo operadores existentes. Caso contrário, implemente os operadores personalizados usando o observável.create

Acontece que observável.create é um invólucro trivial em torno da classe interna do Reactive AnonymousObservable:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
    if (subscribe == null)
    {
        throw new ArgumentNullException("subscribe");
    }
    return new AnonymousObservable<TSource>(subscribe);
}

Não sei por que eles não tornaram sua implementação pública, mas ei, tanto faz.

Sim, a palavra -chave de rendimento é adorável; Talvez haja algo semelhante para o IOBServable (OFT)? [Editar: no Eric Meijer PDC '09 Talk Ele diz "sim, assista a este espaço" a um rendimento declarativo para gerar observáveis.

Para algo próximo (em vez de rolar o seu próprio), confira o fundo do "(ainda não) 101 amostras RX"Wiki, onde a equipe sugere o uso da classe de sujeito (t) como um" back -end "para implementar um IOBServable (muitas vezes). Aqui está o exemplo deles:

public class Order
{            
    private DateTime? _paidDate;

    private readonly Subject<Order> _paidSubj = new Subject<Order>();
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }

    public void MarkPaid(DateTime paidDate)
    {
        _paidDate = paidDate;                
        _paidSubj.OnNext(this); // Raise PAID event
    }
}

private static void Main()
{
    var order = new Order();
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe

    order.MarkPaid(DateTime.Now);
}
  1. Crack Open Reflector e dê uma olhada.

  2. Assista a alguns vídeos C9 - isto Um mostra como você pode 'derivar' o 'combinador selecionado'

  3. O segredo é criar classes anonymousobservable, anonymousobserver e anonymousousdisposable (que são apenas contornadas pelo fato de você não pode instantar interfaces). Eles contêm implementação zero, à medida que você passa com ações e funcas.

Por exemplo:

public class AnonymousObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

Vou deixar você resolver o resto ... é um bom exercício de entendimento.

Há um pequeno fio agradável crescendo aqui com perguntas relacionadas.

Apenas uma observação sobre esta implementação:

Após as coleções simultâneas sendo introduzidas no .NET FW 4, provavelmente é melhor usar o ConcurrentDictioary em vez de um simples dicionário.

Economiza bloqueios de manuseio na coleção.

Adi.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top