Question

Je me bats avec mon premier simple « bonjour monde » l'application RX. J'utilise VS2010 RC, ainsi que le dernier téléchargement RX.

Ce qui suit est l'application simple de la console;

    class Program
    {
        static void Main(string[] args)
        {

            var channel = new MessageChannel()
                .Where(m => m.process)
                .Subscribe((MyMessage m) => Console.WriteLine(m.subject));

            //channel.GenerateMsgs();
        }
    }

    public class MyMessage
    {
        public string subject;
        public bool process;
    }

    public class MessageChannel: IObservable<MyMessage>
    {
        List<IObserver<MyMessage>> observers = new List<IObserver<MyMessage>>();

        public IDisposable Subscribe(IObserver<MyMessage> observer)
        {
            observers.Add(observer);
            return observer as IDisposable;
        }

        public void GenerateMsgs()
        {
            foreach (IObserver<MyMessage> observer in observers)
            {
                observer.OnNext(new MyMessage() {subject = "Hello!", process = true});
            }
        }
    }

Je reçois un ArgumentNullException à la clause Where. Voici la pile;

System.ArgumentNullException was unhandled
  Message=Value cannot be null.
Parameter name: disposable
  Source=System.Reactive
  ParamName=disposable
  StackTrace:
       at System.Collections.Generic.AnonymousObservable`1.Disposable.Set(IDisposable disposable)
       at System.Collections.Generic.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0()
       at System.Threading.Scheduler.NowScheduler.Schedule(Action action)
       at System.Collections.Generic.AnonymousObservable`1.Subscribe(IObserver`1 observer)
       at ConsoleApplication1.Program.Main(String[] args) in C:\Users\Jason\documents\visual studio 2010\Projects\ConsoleApplication1\ConsoleApplication1\Program.cs:line 18
       at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args)
       at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args)
       at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
       at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
       at System.Threading.ThreadHelper.ThreadStart()
  InnerException: 
Était-ce utile?

La solution

Cette ligne semble être la cause de l'agitation:

return observer as IDisposable;

Vous n'êtes pas censé assumer le l'observateur est disponible, vous êtes censé retourner un objet jetable qui sait « désabonnement ».

  

La méthode renvoie une référence à un   interface IDisposable. Cela permet   l'observateur de se désabonner (à savoir,   à cesser de recevoir des notifications)   avant que le fournisseur a fini   eux et appelé l'envoi de la   La méthode de l'abonné OnCompleted.

Vous pouvez le faire fonctionner en faisant quelque chose comme:

public class MessageChannel: IObservable<MyMessage>
{
    class Subscription : IDisposable {
        MessageChannel _c;
        IObservable<MyMessage> _obs;
        public Subscription(MessageChannel c, IObservable<MyMessage> obs) { 
            _c = c; _obs = obs;
        }
        public void Dispose() {
            _c.Unsubscribe(_obs);
        }
    }

    public IDisposable Subscribe(IObserver<MyMessage> observer)
    {
        observers.Add(observer);
        return new Subscription(this, observer);
    }

    void Unsubscribe(IObservable<MyMessage> obs) {
        observers.Remove(obs);
    }
}

Autres conseils

!! drapeau rouge !!

Je suggère fortement que vous ne mettez pas en œuvre IObserver<T> ou vous IObservable<T>. Privilégier l'utilisation de Observable.Create<T> ou en dernier recours utiliser les types de Subject. Il y a beaucoup de choses que vous devez prendre en compte pour mettre en œuvre correctement ces interfaces qui est gérée pour vous par les types Rx corrects et les opérateurs.

Dans cet exemple, je vous invite à supprimer le type de MessageChannel et échanger pour

class Program
{
    static void Main(string[] args)
    {
        var channel = GenerateMsgs()
            .Where(m => m.process)
            .Subscribe((MyMessage m) => Console.WriteLine(m.subject));
    }

    public IObservable<MyMessage> GenerateMsgs()
    {
        return Observable.Create<MyMessage>(observer=>
        {
            observer.OnNext(new MyMessage() {subject = "Hello!", process = true});
        });
    }
}

public class MyMessage
{
    public string subject;
    public bool process;
}

Sur une inspection plus poussée d'une conception du système vous pouvez avoir une sorte de service qui expose des « canaux » comme des séquences observables.

public interface OrderService
{
    IObservable<OrderRequest> OrderRequests();
    IObservable<Order> ProcessedOrders();
    IObservable<OrderRejection> OrdersRejections();
}

ainsi le besoin pour ces implémentations personnalisées de IObserver<T> ou IObservable<T>.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top