Question

I have the following code (simplified for posting purposes).

public class SomeDataObject
{
    public delegate void ReadyEventHandler();
    public delegate void ErrorEventHandler();

    public event ReadyEventHandler Ready;
    public event ErrorEventHandler Error;
    ...
}

public class ConsumerClass
{
    private SomeDataObject dataObject;

    private Task<List<string>> GetStrings()
    {
        List<string> results = new List<string>();
        var tcs = new TaskCompletionSource<List<string>>();

        SomeDataObject.ReadyEventHandler ReadyHandler = null;
        SomeDataObject.ErrorEventHandler ErrorHandler = null;

        ReadyHandler += () =>
        {
            for (int i = 0; i < dataObject.ItemCount; i++)
                results.Add(dataObject[i].ToString());

            tcs.TrySetResult(results);
        };

        ErrorHandler += () =>
        {
            tcs.TrySetException(new Exception("oops!"));
        };

        dataObject.Ready += ReadyHandler;
        dataObject.Error += ErrorHandler;

        dataObject.DoRequest();

        return tcs.Task;
    }
}

The idea is that when the DoRequest call is made, SomeDataObject will get some data and raise either the Ready or the Error event (details not important!). If data is available, then ItemCount indicates how many items are available.

I am new to Rx and cannot find any comparable example. So is it possible to convert this into Rx so that IObservable<string> is returned instead of Task<List<string>> using Observable.Create somehow?

Regards Alan

Solution

Matthew's answer is close but has some problems. First, it is eager, which is not normally in the spirit of Rx/functional programming. Next, I think you will want to be able to release the event handlers when the consumer disposes. Finally, the use of a subject should be treated as a code smell, and in this case it points to the two problems above :-)

Here I use Observable.Create (which should be your #1 go-to tool in the toolbox, with subjects being your last resort) to connect lazily, and also to disconnect and release the event handlers when the subscription is disposed.

private IObservable<string> GetStrings()
{
    return Observable.Create<string>(o=>
    {
        SomeDataObject.ReadyEventHandler ReadyHandler = null;
        SomeDataObject.ErrorEventHandler ErrorHandler = null;

        ReadyHandler += () =>
        {
            for (int i = 0; i < dataObject.ItemCount; i++)
                o.OnNext(dataObject[i].ToString());

            o.OnCompleted();
        };

        ErrorHandler += () =>
        {
            o.OnError(new Exception("oops!"));
        };

        dataObject.Ready += ReadyHandler;
        dataObject.Error += ErrorHandler;

        dataObject.DoRequest();

        return Disposable.Create(()=>
            {
                dataObject.Ready -= ReadyHandler;
                dataObject.Error -= ErrorHandler;
            });
    });
}

I would also consider making dataObject a parameter of the method. Sharing state in an asynchronous system is a common source of problems.
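To make the lazy connection and the handler release concrete, here is a minimal sketch, not a definitive implementation. It assumes the System.Reactive package is referenced. FakeDataObject, RequestCount and Demo are hypothetical names invented for illustration: the fake raises Ready synchronously from DoRequest and has no Error event, which the real SomeDataObject may not match.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical stand-in for SomeDataObject (assumption: Ready fires
// synchronously inside DoRequest; the Error event is omitted for brevity).
public class FakeDataObject
{
    public delegate void ReadyEventHandler();
    public event ReadyEventHandler Ready;

    private readonly string[] items = { "a", "b", "c" };

    public int RequestCount { get; private set; }
    public int ItemCount => items.Length;
    public string this[int i] => items[i];

    public void DoRequest()
    {
        RequestCount++;
        Ready?.Invoke();
    }
}

public static class Demo
{
    // Same shape as the answer's method, with dataObject passed as a parameter.
    public static IObservable<string> GetStrings(FakeDataObject dataObject)
    {
        return Observable.Create<string>(o =>
        {
            FakeDataObject.ReadyEventHandler readyHandler = () =>
            {
                for (int i = 0; i < dataObject.ItemCount; i++)
                    o.OnNext(dataObject[i]);

                o.OnCompleted();
            };

            dataObject.Ready += readyHandler;
            dataObject.DoRequest();

            // Runs when the subscription is disposed or the sequence terminates.
            return Disposable.Create(() => { dataObject.Ready -= readyHandler; });
        });
    }

    public static void Main()
    {
        var source = new FakeDataObject();
        IObservable<string> strings = GetStrings(source);

        // Nothing has been requested yet: Observable.Create is lazy.
        Console.WriteLine(source.RequestCount); // prints 0

        var received = new List<string>();
        IDisposable subscription = strings.Subscribe(s => received.Add(s));
        subscription.Dispose();

        Console.WriteLine(source.RequestCount);        // prints 1
        Console.WriteLine(string.Join(",", received)); // prints a,b,c
    }
}
```

Each new subscription triggers its own DoRequest call, so a caller who needs the results more than once may want to cache them or share the subscription.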

OTHER TIPS

In response to your comments on Lee's (perfectly lovely and tick-worthy) answer, here's how to modify his answer to get a single List<string> response and block for it:

private IObservable<List<string>> GetStrings(SomeDataObject dataObject)
{
    return Observable.Create<List<string>>(o=>
    {
        SomeDataObject.ReadyEventHandler ReadyHandler = null;
        SomeDataObject.ErrorEventHandler ErrorHandler = null;

        ReadyHandler = () =>
        {
            var results = new List<string>(dataObject.ItemCount);
            for (int i =0; i < dataObject.ItemCount; i++)
                results.Add(dataObject[i].ToString());

            o.OnNext(results);
            o.OnCompleted();
        };

        ErrorHandler = () =>
        {
            o.OnError(new Exception("oops!"));
        };

        dataObject.Ready += ReadyHandler;
        dataObject.Error += ErrorHandler;

        dataObject.DoRequest();

        return Disposable.Create(()=>
            {
                dataObject.Ready -= ReadyHandler;
                dataObject.Error -= ErrorHandler;
            });
    });
}

Now you can block on this with:

var results = GetStrings(dataObject).Wait();

If using .NET 4.5, then in an async method you can also do:

var results = await GetStrings(dataObject);

I think the code below will do what you want. A ReplaySubject is used to ensure that the caller gets all of the results, even if the SomeDataObject events start immediately.

private IObservable<string> GetStrings()
{
    ReplaySubject<string> results = new ReplaySubject<string>();

    SomeDataObject.ReadyEventHandler ReadyHandler = null;
    SomeDataObject.ErrorEventHandler ErrorHandler = null;

    ReadyHandler += () =>
    {
        for (int i = 0; i < dataObject.ItemCount; i++)
            results.OnNext(dataObject[i].ToString());

        results.OnCompleted();
    };

    ErrorHandler += () =>
    {
        results.OnError(new Exception("oops!"));
    };

    dataObject.Ready += ReadyHandler;
    dataObject.Error += ErrorHandler;

    dataObject.DoRequest();

    return results;
}
Licensed under: CC-BY-SA with attribution