¿Cómo fusionar dos observables para que el resultado se complete cuando se completa cualquiera de los observables?

StackOverflow https://stackoverflow.com/questions/4887955

  •  28-10-2019
  •  | 
  •  

Pregunta

Tengo este código:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = s1.Merge(s2).Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // I wish ss finished here.
s2.OnCompleted(); // Yet it does so here. =(

He resuelto mi problema usando OnError (nuevo OperationCanceledException ()), pero me gustaría una mejor solución (tiene que haber un combinador, ¿verdad?).

¿Fue útil?

Solución

O esto, que también es bastante ordenado:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        return Observable.CreateWithDisposable<T>(obs =>
        {
            var compositeDisposable = new CompositeDisposable();
            var subject = new Subject<T>();

            compositeDisposable.Add(subject.Subscribe(obs));
            compositeDisposable.Add(source.Subscribe(subject));
            compositeDisposable.Add(right.Subscribe(subject));


            return compositeDisposable;

        });     
    }
}

Esto usa un sujeto que se asegurará de que solo un OnCompleted sea empujado al Observador en CreateWithDisPosable ();

Otros consejos

En lugar de reescribir la fusión para terminar cuando se complete cualquiera de la transmisión, sugeriría convertir los eventos de OnCompleted a eventos OnNext y usar var ss = s1.Merge(s2).TakeUntil(s1ors2complete) donde S1ORS2Complete produce un valor cuando termina S1 o S2. También podrías simplemente encadenar .TakeUntil(s1completes).TakeUntil(s2completes) en lugar de crear S1ors2Complete. Este enfoque proporciona una mejor composición que una extensión de MergeWithCompleteMeOne, ya que se puede usar para modificar cualquier operador "completo cuando se complete" en un operador "completo cuando se complete".

En cuanto a cómo convertir los eventos de OnNext en eventos OnCompleted, hay algunas maneras de hacerlo. El métodoisable compuesto suena como un buen enfoque, y un poco de búsqueda encuentra este hilo interesante sobre Convertir entre OnNext, OnError y Notificaciones OnCompleted. Probablemente crearía un método de extensión llamado returnTrueONCompleted usando xs.SkipWhile(_ => true).concat(Observable.Return(True)) Y tu fusión se convierte en:

var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
var ss = s1.Merge(s2).TakeUntil(s1ors2complete).Finally(() => Console.WriteLine("Finished!"));

También puede mirar el uso de un operador como zip que Completa automáticamente Cuando se completa una de las transmisiones de entrada.

Suponiendo que no necesita la salida de ninguna de las transmisiones, puede usar Amb combinado con algo de magia de Materialize:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();

var ss = Observable.Amb(
        s1.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted), 
        s2.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted)
    )
    .Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());

s1.OnCompleted(); // ss will finish here and s2 will be unsubscribed from

Si necesita los valores, puede usar Do en los dos sujetos.

Prueba esto:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        var completed = Observable.Throw<T>(new StreamCompletedException());

        return 
            source.Concat(completed)
            .Merge(right.Concat(completed))
            .Catch((StreamCompletedException ex) => Observable.Empty<T>());

    }

    private sealed class StreamCompletedException : Exception
    {
    }
}

Lo que esto hace es concatenar un IOBServable que lanzará una excepción cuando se complete la fuente o la fuente correcta. Luego podemos usar el método de extensión de captura para devolver un observable vacío para completar automáticamente la transmisión cuando se complete.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top