質問

私は私の最初の単純な「Hello World」のRXアプリケーションに苦しんでいます。私はVS2010 RC、プラス最新RXのダウンロードを使用しています。

以下は、単純なコンソールアプリケーションである

    class Program
    {
        static void Main(string[] args)
        {

            var channel = new MessageChannel()
                .Where(m => m.process)
                .Subscribe((MyMessage m) => Console.WriteLine(m.subject));

            //channel.GenerateMsgs();
        }
    }

    public class MyMessage
    {
        public string subject;
        public bool process;
    }

    public class MessageChannel: IObservable<MyMessage>
    {
        List<IObserver<MyMessage>> observers = new List<IObserver<MyMessage>>();

        public IDisposable Subscribe(IObserver<MyMessage> observer)
        {
            observers.Add(observer);
            return observer as IDisposable;
        }

        public void GenerateMsgs()
        {
            foreach (IObserver<MyMessage> observer in observers)
            {
                observer.OnNext(new MyMessage() {subject = "Hello!", process = true});
            }
        }
    }

私は、WHERE句で例外ArgumentNullExceptionを取得します。ここでは、スタックがあります。

System.ArgumentNullException was unhandled
  Message=Value cannot be null.
Parameter name: disposable
  Source=System.Reactive
  ParamName=disposable
  StackTrace:
       at System.Collections.Generic.AnonymousObservable`1.Disposable.Set(IDisposable disposable)
       at System.Collections.Generic.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0()
       at System.Threading.Scheduler.NowScheduler.Schedule(Action action)
       at System.Collections.Generic.AnonymousObservable`1.Subscribe(IObserver`1 observer)
       at ConsoleApplication1.Program.Main(String[] args) in C:\Users\Jason\documents\visual studio 2010\Projects\ConsoleApplication1\ConsoleApplication1\Program.cs:line 18
       at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args)
       at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args)
       at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
       at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
       at System.Threading.ThreadHelper.ThreadStart()
  InnerException: 
役に立ちましたか?

解決

この行は大騒ぎを引き起こしているようだ。

return observer as IDisposable;

あなたは、観察者が使い捨てである、あなたが「退会」を知っている使い捨てのオブジェクトを返すことになっています。

を仮定することになっていません
  

メソッドへの参照を返します   IDisposableインターフェイス。この可能   退会するオブザーバー(つまり、   通知の受信を停止します)   プロバイダが終了する前に   それらを送信すると呼ばれます   加入者のOnCompletedメソッドます。

あなたはそれが何かをすることによって動作させることができます:

public class MessageChannel: IObservable<MyMessage>
{
    class Subscription : IDisposable {
        MessageChannel _c;
        IObservable<MyMessage> _obs;
        public Subscription(MessageChannel c, IObservable<MyMessage> obs) { 
            _c = c; _obs = obs;
        }
        public void Dispose() {
            _c.Unsubscribe(_obs);
        }
    }

    public IDisposable Subscribe(IObserver<MyMessage> observer)
    {
        observers.Add(observer);
        return new Subscription(this, observer);
    }

    void Unsubscribe(IObservable<MyMessage> obs) {
        observers.Remove(obs);
    }
}

他のヒント

!!レッドフラグ!!

私は強くあなたがIObserver<T>を実装するかを自分でIObservable<T>していないことを示唆しています。 Observable.Create<T>の使用を好むか、最後の手段としてSubject型を使用します。あなたは正しく、正しい受信の種類や事業者によって自動的に処理され、これらのインタフェースを実装するために検討する必要があるものがたくさんあります。

この例では、私はMessageChannelタイプをドロップすることができ促すだろうと

のためにそれを交換します
class Program
{
    static void Main(string[] args)
    {
        var channel = GenerateMsgs()
            .Where(m => m.process)
            .Subscribe((MyMessage m) => Console.WriteLine(m.subject));
    }

    public IObservable<MyMessage> GenerateMsgs()
    {
        return Observable.Create<MyMessage>(observer=>
        {
            observer.OnNext(new MyMessage() {subject = "Hello!", process = true});
        });
    }
}

public class MyMessage
{
    public string subject;
    public bool process;
}

は、システム設計の更なる検査では、あなたは、観察可能なシーケンスとして「チャンネル」を公開するサービスのいくつかの並べ替えを持っていることがあります。

public interface OrderService
{
    IObservable<OrderRequest> OrderRequests();
    IObservable<Order> ProcessedOrders();
    IObservable<OrderRejection> OrdersRejections();
}

このようにIObserver<T>またはIObservable<T>のこれらのカスタム実装の必要性を否定ます。

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top