Question

Suppose I have an object that observes an IObservable so that it's always aware of the current state of some external source. Internally my object has a method that uses that external value as part of the operation:

public class MyObject
{
  public MyObject(IObservable<T> externalSource) { ... }

  public void DoSomething()
  {
    DoSomethingWith(CurrentT);
  }
}

What's the idiomatic 'reactive' way of using IObservable for 'tracking current state' instead of 'responding to a stream of events'?

Idea #1 is to just monitor the observable and write down values as they come in.

public class MyObject
{
  private T CurrentT;
  public MyObject(IObservable<T> externalSource) 
  {
    externalSource.Subscribe((t) => { CurrentT = t; });
  }

  public void DoSomething()
  {
    DoSomethingWith(CurrentT);
  }
}

And that's fine, but keeping track of the state in a class member seems very un-reactive-y.

Idea #2 is to use a BehaviorSubject.

public class MyObject
{
  private readonly BehaviorSubject<T> bs;
  public MyObject(BehaviorSubject<T> externalSource) 
  {
    this.bs = externalSource;
  }

  public void DoSomething()
  {
    DoSomethingWith(bs.Value);
  }
}

Using subjects directly seems to be frowned upon, but at least in this case I can store the BehaviorSubject in a readonly field.

The BehaviorSubject (or ReplaySubject) does seem like it was made for this purpose, but is there some other better way here? And if I should use the subject, would it make more sense to take the subject as an injected parameter, or take the original observable and build the subject locally in the constructor?

(By the way, I'm aware of the need to deal with the first value if the source observable hasn't fired yet. Don't get hung up on that; that's not what I'm asking about.)


Solution

I'd go with a generic solution utilizing the ReactiveUI library. RUI has a standard way of mapping IObservable<T> to an INotifyPropertyChanged stateful property.

public class ObservableToINPCObject<T> : ReactiveObject, IDisposable
{
    ObservableAsPropertyHelper<T> _ValueHelper;
    public T Value {
        get { return _ValueHelper.Value; }
    }

    public ObservableToINPCObject(IObservable<T> source, T initial = default(T))
    {
        _ValueHelper = source.ToProperty(this, p=>p.Value, initial);
    }

    public void Dispose(){
        _ValueHelper.Dispose();
    }
}

_ValueHelper holds the current state of the observable and automatically raises the correct INPC notification when the state changes. That's quite a bit of boilerplate handled for you.

Add an extension method:

public static class ObservableToINPCObject {
    public static ObservableToINPCObject<T> ToINPC<T>
        ( this IObservable<T> source, T init = default(T) )
        {
            return new ObservableToINPCObject<T>(source, init);
        }
}

now given an

IObservable<int> observable;

you can do

var obj = observable.ToINPC(10);

and to get the latest value

Console.WriteLine(obj.Value);

Also, since Value is a property that supports INPC, you can use it in data binding. I use ToProperty all the time for exposing my observables as properties for WPF data binding.

OTHER TIPS

To be Rx-ish, I'd suggest avoiding the second option and going with your first, modified in one of two ways.

Either (1) make your class disposable so that you can cleanly close off the subscription to the observables or (2) make a method that lets you clean up individual observables.

(1)

public class MyObject : IDisposable
{
    private T CurrentT;
    private IDisposable Subscription;
    public MyObject(IObservable<T> externalSource) 
    {
        Subscription = externalSource
            .Subscribe((t) => { CurrentT = t; });
    }

    public void Dispose()
    {
        Subscription.Dispose();
    }

    public void DoSomething()
    {
        DoSomethingWith(CurrentT);
    }
}

(2)

public class MyObject
{
    private T CurrentT;

    public IDisposable Observe(IObservable<T> externalSource) 
    {
        return externalSource
            .Subscribe((t) => { CurrentT = t; });
    }

    public void DoSomething()
    {
        DoSomethingWith(CurrentT);
    }
}

Both allow proper clean-up, and neither uses a subject.
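As a rough, runnable illustration of approach (1), here is a minimal sketch that assumes the System.Reactive NuGet package is referenced. The names LatestValue<T>, Current, and Demo are made up for this example and are not part of Rx, and the Subject<int> is only a stand-in for whatever external source you actually have. It does not deal with thread safety: the cached field is written on whichever thread the source notifies on.

```csharp
using System;
using System.Reactive.Subjects; // assumption: System.Reactive NuGet package

// Hypothetical generic form of option (1): caches the latest value
// and owns the subscription so it can be disposed cleanly.
public sealed class LatestValue<T> : IDisposable
{
    private readonly IDisposable _subscription;
    private T _current;

    public LatestValue(IObservable<T> source, T initial = default(T))
    {
        _current = initial;
        // Overwrite the cached value every time the source pushes one.
        _subscription = source.Subscribe(t => _current = t);
    }

    public T Current { get { return _current; } }

    public void Dispose() { _subscription.Dispose(); }
}

public static class Demo
{
    public static void Main()
    {
        var source = new Subject<int>();        // stand-in for the external source
        var latest = new LatestValue<int>(source, -1);

        Console.WriteLine(latest.Current);      // -1 (nothing pushed yet)
        source.OnNext(42);
        Console.WriteLine(latest.Current);      // 42

        latest.Dispose();
        source.OnNext(7);                       // ignored: subscription was disposed
        Console.WriteLine(latest.Current);      // 42
    }
}
```

The subject appears only in the demo to drive the example; the class itself depends on nothing but IObservable<T>, which is the point of option (1).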

Licensed under: CC-BY-SA with attribution