Первый прерывистый шаг с реактивными расширениями

StackOverflow https://stackoverflow.com/questions/2258658

  •  20-09-2019
  •  | 
  •  

Вопрос

Я борюсь со своим первым простым приложением RX "hello world".Я использую VS2010 RC, плюс последнюю версию RX.

Ниже приведено простое консольное приложение;

    class Program
    {
        static void Main(string[] args)
        {

            var channel = new MessageChannel()
                .Where(m => m.process)
                .Subscribe((MyMessage m) => Console.WriteLine(m.subject));

            //channel.GenerateMsgs();
        }
    }

    public class MyMessage
    {
        public string subject;
        public bool process;
    }

    public class MessageChannel: IObservable<MyMessage>
    {
        List<IObserver<MyMessage>> observers = new List<IObserver<MyMessage>>();

        public IDisposable Subscribe(IObserver<MyMessage> observer)
        {
            observers.Add(observer);
            return observer as IDisposable;
        }

        public void GenerateMsgs()
        {
            foreach (IObserver<MyMessage> observer in observers)
            {
                observer.OnNext(new MyMessage() {subject = "Hello!", process = true});
            }
        }
    }

Я получаю исключение ArgumentNullException в предложении Where.Вот эта стопка;

System.ArgumentNullException was unhandled
  Message=Value cannot be null.
Parameter name: disposable
  Source=System.Reactive
  ParamName=disposable
  StackTrace:
       at System.Collections.Generic.AnonymousObservable`1.Disposable.Set(IDisposable disposable)
       at System.Collections.Generic.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0()
       at System.Threading.Scheduler.NowScheduler.Schedule(Action action)
       at System.Collections.Generic.AnonymousObservable`1.Subscribe(IObserver`1 observer)
       at ConsoleApplication1.Program.Main(String[] args) in C:\Users\Jason\documents\visual studio 2010\Projects\ConsoleApplication1\ConsoleApplication1\Program.cs:line 18
       at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args)
       at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args)
       at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
       at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
       at System.Threading.ThreadHelper.ThreadStart()
  InnerException: 
Это было полезно?

Решение

Эта строка, похоже, вызывает ажиотаж:

return observer as IDisposable;

Вы не должны предполагать, что наблюдатель является одноразовым, вы должны вернуть одноразовый объект, который знает об "отмене подписки".

Метод возвращает ссылку на IDisposable интерфейс.Это позволяет наблюдателю отказаться от подписки (то есть прекратить получать уведомления) до того, как провайдер завершит их отправку и вызовет метод OnCompleted подписчика.

Вы можете заставить это работать, сделав что-то вроде:

public class MessageChannel: IObservable<MyMessage>
{
    class Subscription : IDisposable {
        MessageChannel _c;
        IObservable<MyMessage> _obs;
        public Subscription(MessageChannel c, IObservable<MyMessage> obs) { 
            _c = c; _obs = obs;
        }
        public void Dispose() {
            _c.Unsubscribe(_obs);
        }
    }

    public IDisposable Subscribe(IObserver<MyMessage> observer)
    {
        observers.Add(observer);
        return new Subscription(this, observer);
    }

    void Unsubscribe(IObservable<MyMessage> obs) {
        observers.Remove(obs);
    }
}

Другие советы

!!Красный флаг!!

Я настоятельно рекомендую вам не реализовывать IObserver<T> или IObservable<T> сам.Отдавайте предпочтение использованию Observable.Create<T> или в крайнем случае используйте Subject типы.Есть много вещей, которые вам нужно учитывать, чтобы правильно реализовать эти интерфейсы, которые обрабатываются правильными типами и операторами Rx.

В этом примере я бы посоветовал вам отказаться от типа MessageChannel и заменить его на

class Program
{
    static void Main(string[] args)
    {
        var channel = GenerateMsgs()
            .Where(m => m.process)
            .Subscribe((MyMessage m) => Console.WriteLine(m.subject));
    }

    public IObservable<MyMessage> GenerateMsgs()
    {
        return Observable.Create<MyMessage>(observer=>
        {
            observer.OnNext(new MyMessage() {subject = "Hello!", process = true});
        });
    }
}

public class MyMessage
{
    public string subject;
    public bool process;
}

При дальнейшем рассмотрении конструкции системы у вас может возникнуть своего рода Служба, которая представляет «каналы» как наблюдаемые последовательности.

public interface OrderService
{
    IObservable<OrderRequest> OrderRequests();
    IObservable<Order> ProcessedOrders();
    IObservable<OrderRejection> OrdersRejections();
}

Таким образом, устраняется необходимость в этих пользовательских реализациях IObserver<T> или IObservable<T>.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top