Question

I have written a simplified Silverlight client library for my WCF web service using Rx; however, I have noticed that I sometimes miss completed events.

/// <summary>
/// Requests the named report from the web service and exposes the outcome as an observable sequence.
/// </summary>
/// <param name="reportName">Report name passed to <c>GetReportDataAsync</c>.</param>
/// <returns>An observable that projects <c>Result</c> from the first <c>GetReportDataCompleted</c> event observed.</returns>
public IObservable<XElement> GetReport(string reportName)
{
    // A fresh client is created for every call and closed again via CloseClient below.
    return from client in Observable.Return(new WebServiceClient())
           // Kicks off the service call through Observable.ToAsync.
           from request in Observable.ToAsync<string>(client.GetReportDataAsync)(reportName)
           // NOTE(review): this event subscription is only reached after the previous clause has
           // produced a value, i.e. after the async call was started; a completion raised before
           // this point would presumably be missed (the race described in the question).
           from result in Observable.FromEvent<GetReportDataCompletedEventArgs>(client, "GetReportDataCompleted").Take(1)
           from close in this.CloseClient(client)
           select result.EventArgs.Result;
}

I believe the issue is caused by the fact that the web service is called and returns prior to subscribing to the completed event. I can't figure out how to get Rx to subscribe to the event prior to the Async call. I tried StartWith, but that requires the input and output types to be the same. Any ideas?

Was it helpful?

Solution

It seems like the best answer is to use Observable.CreateWithDisposable().

e.g.

/// <summary>
/// Requests the named report, hooking the completed event before the asynchronous call is started.
/// </summary>
/// <param name="reportName">Report name passed to <c>GetReportDataAsync</c>.</param>
/// <returns>An observable that yields <c>Result</c> of the completed event args once the client has been closed.</returns>
public IObservable<XElement> GetReport(string reportName)
{
    return Observable.Return(new WebServiceClient()).SelectMany(client =>
        Observable.CreateWithDisposable<GetReportDataCompletedEventArgs>(observer =>
            {
                // Attach to the first GetReportDataCompleted event *before* starting the call,
                // so the completion cannot slip past an unsubscribed handler.
                var firstCompletion = Observable.FromEvent<GetReportDataCompletedEventArgs>(client, "GetReportDataCompleted")
                    .Take(1)
                    .Select(e => e.EventArgs);
                var handlerRegistration = firstCompletion.Subscribe(observer);

                client.GetReportDataAsync(reportName);

                // Disposing the returned registration detaches the event handler.
                return handlerRegistration;
            })
            .SelectMany(completed =>
                this.CloseClient(client).Select(close => completed.Result)));
}

To make this easier to work with I refactored the CreateWithDisposable into a common function that can be used with all my web service calls, including automatically determining the event name from the event args type:

/// <summary>
/// Starts a web service operation and observes its completion, deriving the completed event's
/// name from the event-args type (the type name with a trailing "EventArgs" removed).
/// </summary>
/// <typeparam name="T">Event-args type of the completed event; must be more specific than <see cref="AsyncCompletedEventArgs"/>.</typeparam>
/// <param name="serviceClient">Client object passed through to the event-name overload.</param>
/// <param name="start">Action that starts the asynchronous operation.</param>
/// <returns>The observable produced by the event-name overload.</returns>
/// <exception cref="InvalidOperationException">
/// <typeparamref name="T"/> is <see cref="AsyncCompletedEventArgs"/> itself, so no event name can be derived.
/// </exception>
private IObservable<T> CallService<T>(ICommunicationObject serviceClient, Action start) where T : AsyncCompletedEventArgs
{
    if (typeof(T) == typeof(AsyncCompletedEventArgs))
    {
        throw new InvalidOperationException("Event arguments type cannot be used to determine event name, use event name overload instead.");
    }

    // string.TrimEnd only accepts characters (and would strip a character *set*, not a suffix),
    // so remove the "EventArgs" suffix explicitly with an ordinal comparison.
    const string eventArgsSuffix = "EventArgs";
    string completedEventName = typeof(T).Name;
    if (completedEventName.EndsWith(eventArgsSuffix, StringComparison.Ordinal))
    {
        completedEventName = completedEventName.Substring(0, completedEventName.Length - eventArgsSuffix.Length);
    }

    return CallService<T>(serviceClient, start, completedEventName);
}

/// <summary>
/// Subscribes to the first occurrence of the named completed event on <paramref name="serviceClient"/>,
/// then invokes <paramref name="start"/>, so the event handler is attached before the operation begins.
/// </summary>
/// <typeparam name="T">Event-args type of the completed event.</typeparam>
/// <param name="serviceClient">Object that raises the completed event.</param>
/// <param name="start">Action that starts the asynchronous operation.</param>
/// <param name="completedEventName">Name of the event to observe on <paramref name="serviceClient"/>.</param>
/// <returns>
/// An observable that forwards the event args of the first completed event to its observer.
/// An exception thrown synchronously by <paramref name="start"/> is delivered through <c>OnError</c>.
/// </returns>
private IObservable<T> CallService<T>(ICommunicationObject serviceClient, Action start, string completedEventName) where T : AsyncCompletedEventArgs
{
    return Observable.CreateWithDisposable<T>(observer =>
    {
        // Hook the event first; starting the call before subscribing could miss the completion.
        var subscription = Observable.FromEvent<T>(serviceClient, completedEventName)
            .Take(1)
            .Select(e => e.EventArgs)
            .Subscribe(observer);

        try
        {
            start();
        }
        catch (Exception ex)
        {
            // The operation never started, so no completed event will arrive:
            // detach the handler instead of leaking it, and report the failure in-band.
            subscription.Dispose();
            observer.OnError(ex);
        }

        return subscription;
    });
}

// Example usage:
/// <summary>
/// Requests the named report through <c>CallService</c> and closes the client afterwards.
/// </summary>
/// <param name="reportName">Report name passed to <c>GetReportDataAsync</c>.</param>
/// <returns>An observable that yields <c>Result</c> of the completed event args once <c>CloseClient</c> has produced a value.</returns>
public IObservable<XElement> GetReport(string reportName)
{
    return Observable.Return(new WebServiceClient()).SelectMany(client =>
        this.CallService<GetReportDataCompletedEventArgs>(client, () => client.GetReportDataAsync(reportName))
            .SelectMany(completed =>
                this.CloseClient(client).Select(close => completed.Result)));
}

/// <summary>
/// Starts an asynchronous close of the web service client and observes its "CloseCompleted" event.
/// </summary>
/// <param name="client">The web service client to be closed.</param>
/// <returns>
/// The observable returned by <c>CallService</c> for the "CloseCompleted" event; its element type is
/// <see cref="AsyncCompletedEventArgs"/>.
/// </returns>
private IObservable<AsyncCompletedEventArgs> CloseClient(WebServiceClient client)
{
    // AsyncCompletedEventArgs carries no event name, so the name is supplied explicitly.
    Action beginClose = client.CloseAsync;
    return this.CallService<AsyncCompletedEventArgs>(client, beginClose, "CloseCompleted");
}

Hope this helps someone else!

OTHER TIPS

I needed to use the general WebClient.DownloadStringAsync, so here is my version.

First, wrap the event:

/// <summary>
/// Exposes the <c>DownloadStringCompleted</c> event of a <see cref="WebClient"/> as an observable sequence.
/// </summary>
/// <param name="wc">The web client whose event is observed.</param>
/// <returns>The observable created by <c>Observable.FromEvent</c> for that event.</returns>
public static IObservable<IEvent<DownloadStringCompletedEventArgs>>
    GetDownloadStringObservableEvent(this WebClient wc)
{
    const string completedEventName = "DownloadStringCompleted";
    var completions = Observable.FromEvent<DownloadStringCompletedEventArgs>(wc, completedEventName);
    return completions;
}

Then create the extension method:

/// <summary>
/// Downloads the resource at <paramref name="uri"/> as a string and exposes the outcome as an observable.
/// </summary>
/// <param name="wc">The web client used for the download.</param>
/// <param name="uri">Address passed to <c>DownloadStringAsync</c>.</param>
/// <returns>
/// An observable that yields the downloaded string and then completes. It signals <c>OnError</c> when the
/// completed event reports an error or when starting the download throws, and completes without a value
/// when the completed event reports cancellation.
/// </returns>
public static IObservable<string> GetDownloadString(this WebClient wc, Uri uri)
{
    return Observable.CreateWithDisposable<string>(
        observer => {
            // Several downloads may be going on simultaneously. The token allows
            // us to establish that we're retrieving the right one.
            Guid token = Guid.NewGuid();
            var stringDownloaded = wc.GetDownloadStringObservableEvent()
                    // Guid.Equals(object) is false for null or non-Guid user state, whereas
                    // a direct (Guid) cast would throw for completions started elsewhere.
                    .Where(evt => token.Equals(evt.EventArgs.UserState))
                    .Take(1);        //implicitly unhooks handler after event is received
            bool errorOccurred = false;
            IDisposable unsubscribe =
                stringDownloaded.Subscribe(
                    // OnNext action
                    ev => {
                        // Propagate the exception if one is reported.
                        if (ev.EventArgs.Error != null) {
                            errorOccurred = true;
                            observer.OnError(ev.EventArgs.Error);
                        } else if (!ev.EventArgs.Cancelled) {
                            observer.OnNext(ev.EventArgs.Result);
                        }
                    },
                    // OnError action (propagate exception)
                    ex => observer.OnError(ex),
                    // OnCompleted action
                    () => {
                        // OnError is terminal; do not follow it with OnCompleted.
                        if (!errorOccurred) {
                            observer.OnCompleted();
                        }
                    });
            try {
                wc.DownloadStringAsync(uri, token);
            } catch (Exception ex) {
                // The download never started, so no matching completed event will arrive:
                // unhook the handler instead of leaving it attached, then report the failure.
                unsubscribe.Dispose();
                observer.OnError(ex);
            }
            return unsubscribe;
        }
    );
}

Usage is simple:

// Build the download observable first, then attach the result and error callbacks.
var download = wc.GetDownloadString(new Uri("http://myservice"));
download.Subscribe(resultCallback, errorCallback);
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top