Question

In my .NET 4.0 library I have a piece of code that sends data over the network and waits for a response. In order to not block the calling code the method returns a Task<T> that completes when the response is received so that the code can call the method like this:

// Send the 'message' to the given 'endpoint' and then wait for the response
Task<IResult> task = sender.SendMessageAndWaitForResponse(endpoint, message);
task.ContinueWith(
    t => 
    {
        // Do something with t.Result ...
    });

The underlying code uses a TaskCompletionSource so that it can wait for the response message without having to spin up a thread only to have it sit there idling until the response comes in:

private readonly Dictionary<int, TaskCompletionSource<IResult>> m_TaskSources
    = new Dictionary<int, TaskCompletionSource<IResult>>();

public Task<IResult> SendMessageAndWaitForResponse(int endpoint, object message)
{
    var source = new TaskCompletionSource<IResult>(TaskCreationOptions.None);
    m_TaskSources.Add(endpoint, source);

    // Send the message here ...

    return source.Task;
}

When the response is received it is processed like this:

public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
    if (m_TaskSources.ContainsKey(endpoint))
    {
        var source = m_TaskSources[endpoint];
        source.SetResult(value);
        m_TaskSources.Remove(endpoint);
    }
}

Now I want to add a time-out so that the calling code won't wait indefinitely for the response. However on .NET 4.0 that is somewhat messy because there is no easy way to time-out a task. So I was wondering if Rx would be able to do this easier. So I came up with the following:

private readonly Dictionary<int, Subject<IResult>> m_SubjectSources
    = new Dictionary<int, Subject<IResult>>();

private Task<IResult> SendMessageAndWaitForResponse(int endpoint, object message, TimeSpan timeout)
{
    var source = new Subject<IResult>();
    m_SubjectSources.Add(endpoint, source);

    // Send the message here ...

    return source.Timeout(timeout).ToTask();
}

public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
    if (m_SubjectSources.ContainsKey(endpoint))
    {
        var source = m_SubjectSources[endpoint];
        source.OnNext(value);
        source.OnCompleted();
        m_SubjectSources.Remove(endpoint);
    }
}

This all seems to work without issue, however I've seen several questions stating that Subject should be avoided so now I'm wondering if there is a more Rx-y way to achieve my goal.

Was it helpful?

Solution

The advice to avoid using Subject in Rx is often overstated. There has to be a source for events in Rx, and it's fine for it to be a Subject.

The issue with Subject is generally when it is used in between two Rx queries that could otherwise be joined, or where there is already a well-defined conversion to IObservable<T> (such as Observable.FromEventXXX or Observable.FromAsyncXXX etc.

If you want, you can do away with the Dictionary and multiple Subjects with the approach below. This uses a single subject and returns a filtered query to the client.

It's not "better" per se, Whether this makes sense will depend on the specifics of your scenario, but it saves spawning lots of subjects, and gives you a nice option for monitoring all results in a single stream. If you were dispatching results serially (say from a message queue) this could make sense.

// you only need to synchronize if you are receiving results in parallel
private readonly ISubject<Tuple<int,IResult>, Tuple<int,IResult>> results =
    Subject.Synchronize(new Subject<Tuple<int,IResult>>());

private Task<IResult> SendMessageAndWaitForResponse(
    int endpoint, object message, TimeSpan timeout)
{           
    // your message processing here, I'm just echoing a second later
    Task.Delay(TimeSpan.FromSeconds(1)).ContinueWith(t => {
        CompleteWaitForResponseResponse(endpoint, new Result { Value = message }); 
    });

    return results.Where(r => r.Item1 == endpoint)
                  .Select(r => r.Item2)
                  .Take(1)
                  .Timeout(timeout)
                  .ToTask();
}

public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
    results.OnNext(Tuple.Create(endpoint,value));
}

Where I defined a class for results like this:

public class Result : IResult
{
    public object Value { get; set; }
}

public interface IResult
{
    object Value { get; set; }
}

EDIT - In response to additional questions in the comments.

  • No need to dispose of the single Subject - it won't leak and will be garbage collected when it goes out of scope.

  • ToTask does accept a cancellation token - but that's really for cancellation from the client side.

  • If the remote side disconnects, you can send an the error to all clients with results.OnError(exception); - you'll want to instantiate a new subject instance at the same time.

Something like:

private void OnRemoteError(Exception e)
{
    results.OnError(e);        
}

This will manifest as a faulted task to all clients in the expected manner.

It's pretty thread safe too because clients subscribing to a subject that has previously sent OnError will get an error back immediately - it's dead from that point. Then when ready you can reinitialise with:

private void OnInitialiseConnection()
{
    // ... your connection logic

    // reinitialise the subject...
    results = Subject.Synchronize(new Subject<Tuple<int,IResult>>());
}

For individual client errors, you could consider:

  • Extending your IResult interface to include errors as data
  • You can then optionally project this to a fault for just that client by extending the Rx query in SendMessageAndWaitForResponse. For example, and an Exception and HasError property to IResult so that you can do something like:

    return results.Where(r => r.Item1 == endpoint)
                .SelectMany(r => r.Item2.HasError
                    ? Observable.Throw<IResult>(r.Item2.Exception)
                    : Observable.Return(r.Item2))
                .Take(1)
                .Timeout(timeout)
                .ToTask();
    
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top