The advice to avoid using Subject in Rx is often overstated. There has to be a source for events in Rx, and it's fine for it to be a Subject.
The issue with Subject generally arises when it is used in between two Rx queries that could otherwise be joined, or where there is already a well-defined conversion to IObservable<T> (such as Observable.FromEventXXX or Observable.FromAsyncXXX, etc.).
If you want, you can do away with the Dictionary and multiple Subjects with the approach below. This uses a single subject and returns a filtered query to the client.
It's not "better" per se; whether this makes sense will depend on the specifics of your scenario. It does save spawning lots of subjects, and it gives you a nice option for monitoring all results in a single stream. If you were dispatching results serially (say from a message queue) this could make sense.
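// requires: System.Reactive.Linq, System.Reactive.Subjects, System.Reactive.Threading.Tasks
// you only need to synchronize if you are receiving results in parallel
// (not readonly: the field is reassigned when the connection is reinitialised)
private ISubject<Tuple<int,IResult>, Tuple<int,IResult>> results =
    Subject.Synchronize(new Subject<Tuple<int,IResult>>());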
private Task<IResult> SendMessageAndWaitForResponse(
    int endpoint, object message, TimeSpan timeout)
{
    // subscribe before sending (ToTask subscribes immediately),
    // so that a fast response cannot be missed
    var response = results.Where(r => r.Item1 == endpoint)
                          .Select(r => r.Item2)
                          .Take(1)
                          .Timeout(timeout)
                          .ToTask();

    // your message processing here, I'm just echoing a second later
    Task.Delay(TimeSpan.FromSeconds(1)).ContinueWith(t => {
        CompleteWaitForResponseResponse(endpoint, new Result { Value = message });
    });

    return response;
}

public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
    results.OnNext(Tuple.Create(endpoint, value));
}
Where I defined a class for results like this:
public class Result : IResult
{
    public object Value { get; set; }
}

public interface IResult
{
    object Value { get; set; }
}
EDIT - In response to additional questions in the comments.
No need to dispose of the single Subject - it won't leak and will be garbage collected when it goes out of scope.
ToTask does accept a cancellation token, but that's really for cancellation from the client side. If the remote side disconnects, you can send the error to all clients with results.OnError(exception); you'll want to instantiate a new subject instance at the same time.
Something like:
private void OnRemoteError(Exception e)
{
    results.OnError(e);
}
This will manifest as a faulted task to all clients in the expected manner.
It's pretty thread safe too, because clients subscribing to a subject that has previously sent OnError will get an error back immediately - it's dead from that point. Then when ready you can reinitialise with:
private void OnInitialiseConnection()
{
    // ... your connection logic
    // reinitialise the subject...
    results = Subject.Synchronize(new Subject<Tuple<int,IResult>>());
}
For individual client errors, you could consider:
- Extending your IResult interface to include errors as data. You can then optionally project this to a fault for just that client by extending the Rx query in SendMessageAndWaitForResponse. For example, add an Exception and HasError property to IResult so that you can do something like:

    return results.Where(r => r.Item1 == endpoint)
                  .SelectMany(r => r.Item2.HasError
                      ? Observable.Throw<IResult>(r.Item2.Exception)
                      : Observable.Return(r.Item2))
                  .Take(1)
                  .Timeout(timeout)
                  .ToTask();
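The Exception and HasError members mentioned above aren't defined anywhere in this answer. As a minimal sketch, assuming HasError is simply derived from whether an Exception has been set (that choice is my assumption, not a requirement), the extended types might look something like this:

```csharp
using System;

public interface IResult
{
    object Value { get; set; }

    // Hypothetical additions: the error travels as data alongside the value.
    Exception Exception { get; set; }
    bool HasError { get; }
}

public class Result : IResult
{
    public object Value { get; set; }

    // Null when the request succeeded.
    public Exception Exception { get; set; }

    // Assumption: a result counts as an error exactly when an exception is attached.
    public bool HasError
    {
        get { return Exception != null; }
    }
}
```

A failure for a single client could then be published through the existing method, e.g. CompleteWaitForResponseResponse(endpoint, new Result { Exception = new InvalidOperationException("remote call failed") }), and only the task filtered to that endpoint would fault.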