Come unire due osservabili in modo che il risultato sia completato quando si completa uno degli osservabili?

StackOverflow https://stackoverflow.com/questions/4887955

  •  28-10-2019
  •  | 
  •  

Domanda

Ho questo codice:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = s1.Merge(s2).Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // I wish ss finished here.
s2.OnCompleted(); // Yet it does so here. =(

Ho risolto il mio problema usando ONError (New OperationCanceleDexception ()), ma vorrei una soluzione migliore (ci deve essere un combinator giusto?).

È stato utile?

Soluzione

O questo, che è anche abbastanza pulito:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        return Observable.CreateWithDisposable<T>(obs =>
        {
            var compositeDisposable = new CompositeDisposable();
            var subject = new Subject<T>();

            compositeDisposable.Add(subject.Subscribe(obs));
            compositeDisposable.Add(source.Subscribe(subject));
            compositeDisposable.Add(right.Subscribe(subject));


            return compositeDisposable;

        });     
    }
}

Questo utilizza un argomento che si assicurerà che solo uno oncompleted venga spinto all'osservatore nella createwithdisposible ();

Altri suggerimenti

Invece di riscrivere unisciti per finire quando uno stream completa, suggerirei di convertire gli eventi on completati in eventi OnNext e utilizzare var ss = s1.Merge(s2).TakeUntil(s1ors2complete) dove S1ors2Complete produce un valore quando termina S1 o S2. Potresti anche solo catena .TakeUntil(s1completes).TakeUntil(s2completes) Invece di creare S1ors2Complete. Questo approccio fornisce una composizione migliore di un'estensione di unione di fusione in quanto può essere utilizzata per modificare qualsiasi operatore "completato quando entrambi completi" in un operatore "completato quando si completa".

Per quanto riguarda come convertire gli eventi OnNext in eventi on completati, ci sono alcuni modi per farlo. Il metodo compositedisposable suona come un buon approccio e un po 'di ricerca trova questo thread interessante su Convertire tra le notifiche Onnext, Onerror e OnCompleted. Probabilmente creerei un metodo di estensione chiamato returnTrueonCompleted usando xs.SkipWhile(_ => true).concat(Observable.Return(True)) E la tua unione diventa allora:

var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
var ss = s1.Merge(s2).TakeUntil(s1ors2complete).Finally(() => Console.WriteLine("Finished!"));

Potresti anche guardare usando un operatore come Zip che completa automaticamente Quando si completa uno dei flussi di input.

Supponendo che tu non abbia bisogno dell'output di nessuno dei flussi, puoi usare Amb combinato con un po 'di magia da Materialize:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();

var ss = Observable.Amb(
        s1.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted), 
        s2.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted)
    )
    .Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());

s1.OnCompleted(); // ss will finish here and s2 will be unsubscribed from

Se hai bisogno dei valori, puoi usare Do sui due soggetti.

Prova questo:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        var completed = Observable.Throw<T>(new StreamCompletedException());

        return 
            source.Concat(completed)
            .Merge(right.Concat(completed))
            .Catch((StreamCompletedException ex) => Observable.Empty<T>());

    }

    private sealed class StreamCompletedException : Exception
    {
    }
}

Ciò che fa è concatenare un Iobservable che lancerà un'eccezione quando la fonte o la fonte giusta si completano. Possiamo quindi utilizzare il metodo di estensione Catch per restituire un osservabile vuoto per completare automaticamente il flusso al termine.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top