Primo passo esitante con le estensioni reattivi
-
20-09-2019 - |
Domanda
Sto lottando con il mio primo "ciao mondo" semplice applicazione RX. Sto usando VS2010 RC, più l'ultimo RX download.
Quello che segue è la semplice console app;
class Program
{
static void Main(string[] args)
{
var channel = new MessageChannel()
.Where(m => m.process)
.Subscribe((MyMessage m) => Console.WriteLine(m.subject));
//channel.GenerateMsgs();
}
}
public class MyMessage
{
public string subject;
public bool process;
}
public class MessageChannel: IObservable<MyMessage>
{
List<IObserver<MyMessage>> observers = new List<IObserver<MyMessage>>();
public IDisposable Subscribe(IObserver<MyMessage> observer)
{
observers.Add(observer);
return observer as IDisposable;
}
public void GenerateMsgs()
{
foreach (IObserver<MyMessage> observer in observers)
{
observer.OnNext(new MyMessage() {subject = "Hello!", process = true});
}
}
}
ho un ArgumentNullException alla clausola WHERE. Ecco la pila;
System.ArgumentNullException was unhandled
Message=Value cannot be null.
Parameter name: disposable
Source=System.Reactive
ParamName=disposable
StackTrace:
at System.Collections.Generic.AnonymousObservable`1.Disposable.Set(IDisposable disposable)
at System.Collections.Generic.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0()
at System.Threading.Scheduler.NowScheduler.Schedule(Action action)
at System.Collections.Generic.AnonymousObservable`1.Subscribe(IObserver`1 observer)
at ConsoleApplication1.Program.Main(String[] args) in C:\Users\Jason\documents\visual studio 2010\Projects\ConsoleApplication1\ConsoleApplication1\Program.cs:line 18
at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args)
at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args)
at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.ThreadHelper.ThreadStart()
InnerException:
Soluzione
Questa linea sembra essere la causa del polverone:
return observer as IDisposable;
Non sono tenuti ad assumere la l'osservatore è usa e getta, si suppone per restituire un oggetto usa e getta che sa di "annullamento dell'iscrizione".
Il metodo restituisce un riferimento a un interfaccia IDisposable. In questo modo l'osservatore a cancellarsi (cioè, per non ricevere notifiche) prima che il provider ha terminato inviando loro e ha chiamato la Metodo OnCompleted dell'abbonato.
È possibile farlo funzionare facendo qualcosa di simile:
public class MessageChannel: IObservable<MyMessage>
{
class Subscription : IDisposable {
MessageChannel _c;
IObservable<MyMessage> _obs;
public Subscription(MessageChannel c, IObservable<MyMessage> obs) {
_c = c; _obs = obs;
}
public void Dispose() {
_c.Unsubscribe(_obs);
}
}
public IDisposable Subscribe(IObserver<MyMessage> observer)
{
observers.Add(observer);
return new Subscription(this, observer);
}
void Unsubscribe(IObservable<MyMessage> obs) {
observers.Remove(obs);
}
}
Altri suggerimenti
!! Bandiera rossa !!
Vorrei suggerire che non implementare IObserver<T>
o IObservable<T>
da soli. Favorire l'utilizzo di Observable.Create<T>
o come ultima risorsa utilizzare i tipi Subject
. Ci sono un sacco di cose che dovete considerare per implementare correttamente queste interfacce, che viene gestito per voi da tipi e operatori Rx corrette.
In questo esempio io vi esorto a goccia tipo MessageChannel e scambiarlo per
class Program
{
static void Main(string[] args)
{
var channel = GenerateMsgs()
.Where(m => m.process)
.Subscribe((MyMessage m) => Console.WriteLine(m.subject));
}
public IObservable<MyMessage> GenerateMsgs()
{
return Observable.Create<MyMessage>(observer=>
{
observer.OnNext(new MyMessage() {subject = "Hello!", process = true});
});
}
}
public class MyMessage
{
public string subject;
public bool process;
}
In un'ulteriore ispezione di un progetto di sistema si può avere un qualche tipo di servizio che espone "canali" come sequenze osservabili.
public interface OrderService
{
IObservable<OrderRequest> OrderRequests();
IObservable<Order> ProcessedOrders();
IObservable<OrderRejection> OrdersRejections();
}
In tal modo negando la necessità di queste implementazioni personalizzate di IObserver<T>
o IObservable<T>
.