Comment fusionner deux observables pour que le résultat se termine lorsque l'un des observables se termine?

StackOverflow https://stackoverflow.com/questions/4887955

  •  28-10-2019
  •  | 
  •  

Question

J'ai ce code:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = s1.Merge(s2).Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // I wish ss finished here.
s2.OnCompleted(); // Yet it does so here. =(

J'ai résolu mon problème en utilisant OnError (new OperationCanceledException ()), mais j'aimerais une meilleure solution (il doit y avoir un combinateur, non?).

Était-ce utile?

La solution

Ou ceci, qui est également assez chouette:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        return Observable.CreateWithDisposable<T>(obs =>
        {
            var compositeDisposable = new CompositeDisposable();
            var subject = new Subject<T>();

            compositeDisposable.Add(subject.Subscribe(obs));
            compositeDisposable.Add(source.Subscribe(subject));
            compositeDisposable.Add(right.Subscribe(subject));


            return compositeDisposable;

        });     
    }
}

Cela utilise un sujet qui s'assurera qu'un seul OnCompleted est poussé vers l'observateur dans CreateWithDisposable ();

Autres conseils

Au lieu de réécrire Merge pour terminer lorsque l'un des flux se termine, je suggérerais de convertir les événements onCompleted en événements onNext et d'utiliser var ss = s1.Merge(s2).TakeUntil(s1ors2complete) où s1ors2complete produit une valeur lorsque s1 ou s2 se termine. Vous pouvez également simplement enchaîner .TakeUntil(s1completes).TakeUntil(s2completes) au lieu de créer s1ors2complete. Cette approche offre une meilleure composition qu'une extension MergeWithCompleteOnEither car elle peut être utilisée pour modifier n'importe quel opérateur "terminé quand les deux sont terminés" en un opérateur "terminer quand tout se termine".

En ce qui concerne la conversion des événements onNext en événements onCompleted, il existe plusieurs façons de le faire. La méthode CompositeDisposable semble être une bonne approche, et un peu de recherche permet de trouver ce fil de discussion intéressant sur conversion entre les notifications onNext, onError et onCompleted . Je créerais probablement une méthode d'extension appelée ReturnTrueOnCompleted en utilisant xs.SkipWhile(_ => true).concat(Observable.Return(True)) et votre fusion devient alors:

var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
var ss = s1.Merge(s2).TakeUntil(s1ors2complete).Finally(() => Console.WriteLine("Finished!"));

Vous pouvez également envisager d'utiliser un opérateur tel que Zip qui se termine automatiquement quand l'un des flux d'entrée se termine.

En supposant que vous n'ayez besoin de la sortie d'aucun des flux, vous pouvez utiliser Amb combiné avec un peu de magie de Materialize:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();

var ss = Observable.Amb(
        s1.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted), 
        s2.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted)
    )
    .Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());

s1.OnCompleted(); // ss will finish here and s2 will be unsubscribed from

Si vous avez besoin des valeurs, vous pouvez utiliser Do sur les deux sujets.

Essayez ceci:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        var completed = Observable.Throw<T>(new StreamCompletedException());

        return 
            source.Concat(completed)
            .Merge(right.Concat(completed))
            .Catch((StreamCompletedException ex) => Observable.Empty<T>());

    }

    private sealed class StreamCompletedException : Exception
    {
    }
}

Ce que cela fait, c'est concaténer un IObservable qui lèvera une exception lorsque la source ou la bonne source se terminera.Nous pouvons ensuite utiliser la méthode d'extension Catch pour renvoyer un Observable vide pour terminer automatiquement le flux lorsque l'un ou l'autre se termine.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top