Question

I have this class for explaining my problem:

public class DataObserver: IDisposable
{
    private readonly List<IDisposable> _subscriptions = new List<IDisposable>();
    private readonly SomeBusinessLogicServer _server;

    public DataObserver(SomeBusinessLogicServer server, IObservable<SomeData> data)
    {
        _server = server;
        _subscriptions.Add(data.Subscribe(TryHandle));
    }

    private void TryHandle(SomeData data)
    {
        try
        {
            _server.MakeApiCallAsync(data).Wait();
        }
        catch (Exception)
        {
            // Handle exceptions somehow!
        }
    }

    public void Dispose()
    {
        _subscriptions.ForEach(s => s.Dispose());
        _subscriptions.Clear();
    }

}

A) How can I avoid blocking inside the TryHandle() function?

B) How would you publish exceptions caught inside that function for handling them properly?

Was it helpful?

Solution

The Rx Design Guidelines provide a lot of useful advice when writing your own Rx operators:

http://go.microsoft.com/fwlink/?LinkID=205219

I'm sure I'll get lambasted for linking to an external article, but this link has been good for a couple of years and it's too big to republish on SO.

OTHER TIPS

First, take a look at CompositeDisposable instead of re-implementing it yourself.

Other than that, there are many answers to your question. I have found that the best insight I've had when working with Rx is realizing that most cases where you want to subscribe are really just more chains in the observable you are building and you don't really want to subscribe but instead want to apply yet another transform to the incoming observable. And let some code that is further "on the edge of the system" and has more knowledge of how to handle errors do the actual subscribing

In the example you have presented:

A) Don't block by just transforming the IObservable<SomeData> into an IObservable<Task> (which is really better expressed as an IObservable<IObservable<Unit>>). B) Publish exceptions by just ending the observable with an error or, if you don't want the exception to end the observable, exposing an IObservable<Exception>.

Here's how I'd re-write your example, assuming you did not want the stream to end on error, but instead just keep running after reporting the errors:

public static class DataObserver
{
    public static IObservable<Exception> ApplyLogic(this IObservable<SomeData> source, SomeBusinessLogicServer server)
    {
        return source
            .Select(data =>
                {
                    // execute the async method as an observable<Unit>
                    // ignore its results, but capture its error (if any) and yield it.
                    return Observable
                        .FromAsync(() => server.MakeApiCallAsync(data))
                        .IgnoreElements()
                        .Select(_ => (Exception)null) // to cast from IObservable<Unit> to IObservable<Exception>
                        .Catch((Exception e) => Observable.Return(e));
                })
            // runs the Api calls sequentially (so they will not run concurrently)
            // If you prefer to let the calls run in parallel, then use
            // .Merge() instead of .Concat()
            .Concat() ;
    }
}


// Usage (in Main() perhaps)
IObservable<SomeData> dataStream = ...;
var subscription = dataStream.ApplyLogic(server).Subscribe(error =>
{
    Console.WriteLine("An error occurred processing a dataItem: {0}", error);
}, fatalError =>
{
    Console.WriteLine("A fatal error occurred retrieving data from the dataStream: {0}", fatalError);
});
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top