Question

I'm trying to do a fully async read of a web end-point using Rx. I made it work, in an ugly way, using something like this:

        var reqUri = new Uri(string.Format("https://cds.cern.ch/record/{0}/export/xm?ln=en", docID));
        var wr = WebRequest.CreateHttp(reqUri);

        var s = Observable
                .FromAsyncPattern<WebResponse>(
                    wr.BeginGetResponse,
                    wr.EndGetResponse)
                .Invoke()
                .Catch(Observable.Return<WebResponse>(null))
                .Select(ExtractString)
                .Select(ParseToMD);

ExtractString is a blocking method that opens the response stream and reads it to the end. I'd like to do that part asynchronously as well, but I'm having some trouble. I think the following should work:

        var s = Observable
                .FromAsyncPattern<WebResponse>(
                    wr.BeginGetResponse,
                    wr.EndGetResponse)
                .Invoke()
                .SelectMany(resp => Observable.Using(() => resp.GetResponseStream(), strm => Observable.Return(strm)))
                .SelectMany(resp => Observable.Using(() => new StreamReader(resp), strm => Observable.Return(strm)))
                .SelectMany(strm => Observable.FromAsync(tkn => strm.ReadToEndAsync()))
                .Select(ParseToMD);

If I put a breakpoint somewhere in there, it sometimes works; other times it just hangs. So I have two questions. First, is this the right way to do something like this? Second, what is the best way to debug something like this? Is there some tracing I could watch that shows the events going through the system? For now I have just been setting breakpoints in the lambda functions.

The ParseToMD function just converts a string into a class of data.


Solution

You can trace the stream lifetimes using the Spy method I present in this question.
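
If that method isn't to hand, a much smaller tracing operator can be sketched with the built-in Do and Finally operators. The Trace name and its log parameter below are hypothetical: they are not part of Rx and not the Spy method itself. The sketch assumes the System.Reactive package.

```csharp
using System;
using System.Reactive.Linq;

public static class TraceExtensions
{
    // Hypothetical helper: logs subscription, every notification, and teardown.
    public static IObservable<T> Trace<T>(
        this IObservable<T> source, string name, Action<string> log)
    {
        // Defer so that "subscribed" is logged once per subscription.
        return Observable.Defer(() =>
        {
            log(name + ": subscribed");
            return source
                .Do(
                    x => log(name + ": OnNext(" + x + ")"),
                    ex => log(name + ": OnError(" + ex.Message + ")"),
                    () => log(name + ": OnCompleted"))
                // Runs on completion, on error, or when the subscriber unsubscribes.
                .Finally(() => log(name + ": terminated or disposed"));
        });
    }
}
```

Adding .Trace("response", Console.WriteLine) after .Invoke() and .Trace("text", Console.WriteLine) before .Select(ParseToMD) should show which stage stops producing notifications when the pipeline hangs.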

You aren't far off here. I think the problem might be that you are being a bit over-zealous with Observable.Using. Observable.Using disposes its resource as soon as the inner observable terminates, and each Observable.Return completes immediately after emitting its value. That sets up a race in which the response stream (or the reader) can be disposed before ReadToEndAsync has finished. I think this might behave more reliably:

        var reqUri = new Uri(
            string.Format("https://cds.cern.ch/record/{0}/export/xm?ln=en", docID));
        var wr = WebRequest.CreateHttp(reqUri);

        var s = Observable.FromAsyncPattern<WebResponse>(
                wr.BeginGetResponse,
                wr.EndGetResponse)
            .Invoke()
            .Select(response => response.GetResponseStream())
            .SelectMany(stream =>
                // The reader (and the stream it wraps) is disposed when the read observable terminates.
                Observable.Using(
                    () => new StreamReader(stream),
                    reader => Observable.StartAsync(() => reader.ReadToEndAsync())))
            .Select(ParseToMD);

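
The disposal race described above can be seen in isolation with a small sketch. It assumes the System.Reactive package and uses a MemoryStream as a stand-in for the response stream. The UsingRaceDemo class and its method name are made up for the illustration.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;

public static class UsingRaceDemo
{
    // Returns whether the stream handed out by Using + Return is still
    // readable once the subscription has run to completion.
    public static bool StreamReadableAfterCompletion()
    {
        MemoryStream captured = null;

        Observable
            .Using(
                () => new MemoryStream(new byte[] { 1, 2, 3 }),
                stream => Observable.Return(stream))
            // Anything that keeps the stream beyond OnNext (such as an
            // asynchronous read) is holding a resource that Using disposes
            // as soon as Return completes.
            .Subscribe(stream => captured = stream);

        return captured.CanRead;
    }
}
```

With the default scheduling for Observable.Return, calling UsingRaceDemo.StreamReadableAfterCompletion() should report that the stream is no longer readable. That is the state the downstream ReadToEndAsync can find itself in when it runs after Return has completed.
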
Licensed under: CC-BY-SA with attribution