我使用的Rx写了一个简单的Silverlight客户端库为我的WCF Web服务,但是我发现有时候我缺少完成的事件。

public IObservable<XElement> GetReport(string reportName)
{
    return from client in Observable.Return(new WebServiceClient())
           from request in Observable.ToAsync<string>(client.GetReportDataAsync)(reportName)
           from result in Observable.FromEvent<GetReportDataCompletedEventArgs>(client, "GetReportDataCompleted").Take(1)
           from close in this.CloseClient(client)
           select result.EventArgs.Result;
}

我认为这个问题是一个事实,即Web服务调用,并返回之前,订阅完成事件引起的。我无法弄清楚如何获得的Rx之前订阅事件的异步调用。我试图StartWith但要求输入和输出类型是相同的,任何想法?

有帮助吗?

解决方案

好像最好答案是使用Observable.CreateWithDisposable()

e.g。

public IObservable<XElement> GetReport(string reportName)
{
    return from client in Observable.Return(new WebServiceClient())
            from completed in Observable.CreateWithDisposable<GetReportDataCompletedEventArgs>(observer =>
                {
                    var subscription = Observable.FromEvent<GetReportDataCompletedEventArgs>(client, "GetReportDataCompleted")
                        .Take(1)
                        .Select(e => e.EventArgs)
                        .Subscribe(observer);
                    client.GetReportDataAsync(reportName);
                    return subscription;
                })
            from close in this.CloseClient(client)
            select completed.Result;
}

为了使这更容易的工作与我重构了CreateWithDisposable成可以与我所有的Web服务调用,包括事件参数类型自动确定事件名称中使用一个共同的功能:

private IObservable<T> CallService<T>(ICommunicationObject serviceClient, Action start) where T : AsyncCompletedEventArgs
{
    if (typeof(T) == typeof(AsyncCompletedEventArgs))
    {
        throw new InvalidOperationException("Event arguments type cannot be used to determine event name, use event name overload instead.");
    }

    string completedEventName = typeof(T).Name.TrimEnd("EventArgs");
    return CallService<T>(serviceClient, start, completedEventName);
}

private IObservable<T> CallService<T>(ICommunicationObject serviceClient, Action start, string completedEventName) where T : AsyncCompletedEventArgs
{
    return Observable.CreateWithDisposable<T>(observer =>
    {
        var subscription = Observable.FromEvent<T>(serviceClient, completedEventName).Take(1).Select(e => e.EventArgs).Subscribe(observer);
        start();
        return subscription;
    });
}

// Example usage:
public IObservable<XElement> GetReport(string reportName)
{
    return from client in Observable.Return(new WebServiceClient())
            from completed in this.CallService<GetReportDataCompletedEventArgs>(client, () => client.GetReportDataAsync(reportName))
            from close in this.CloseClient(client)
            select completed.Result;
}

/// <summary>
/// Asynchronously closes the web service client
/// </summary>
/// <param name="client">The web service client to be closed.</param>
/// <returns>Returns a cold observable sequence of a single success Unit.</returns>
private IObservable<AsyncCompletedEventArgs> CloseClient(WebServiceClient client)
{
    return this.CallService<AsyncCompletedEventArgs>(client, client.CloseAsync, "CloseCompleted");
}

希望这可以帮助其他人!

其他提示

我需要所以这里使用一般WebClient.DownloadStringAsync我的版本。

首先,包裹事件:

public static IObservable<IEvent<DownloadStringCompletedEventArgs>>
    GetDownloadStringObservableEvent(this WebClient wc)
{
    return Observable.FromEvent<DownloadStringCompletedEventArgs>(
        wc, "DownloadStringCompleted");
}

然后创建扩展方法:

public static IObservable<string> GetDownloadString(this WebClient wc, Uri uri)
{
    return Observable.CreateWithDisposable<string>(
        observer => {
            // Several downloads may be going on simultaneously. The token allows
            // us to establish that we're retrieving the right one.
            Guid token = Guid.NewGuid();
            var stringDownloaded = wc.GetDownloadStringObservableEvent()
                    .Where(evt => ((Guid)evt.EventArgs.UserState) == token)
                    .Take(1);        //implicitly unhooks handler after event is received
            bool errorOccurred = false;
            IDisposable unsubscribe =
                stringDownloaded.Subscribe(
                    // OnNext action
                    ev => {
                        // Propagate the exception if one is reported.
                        if (ev.EventArgs.Error != null) {
                            errorOccurred = true;
                            observer.OnError(ev.EventArgs.Error);
                        } else if (!ev.EventArgs.Cancelled) {
                            observer.OnNext(ev.EventArgs.Result);
                        }
                    },
                    // OnError action (propagate exception)
                    ex => observer.OnError(ex),
                    // OnCompleted action
                    () => {
                        if (!errorOccurred) {
                            observer.OnCompleted();
                        }
                    });
            try {
                wc.DownloadStringAsync(uri, token);
            } catch (Exception ex) {
                observer.OnError(ex);
            }
            return unsubscribe;
        }
    );
}

用法很简单:

wc.GetDownloadString(new Uri("http://myservice"))
    .Subscribe(resultCallback , errorCallback);
许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top