سؤال

I need to proxy all the different event streams through one subject.

I came up with this code:

var mySubject,
    getObservable;

// Returns the messages on `subject` whose EventName equals `eventName`,
// translating each { Type, EventName, Data } message through flatMap.
getObservable = function (subject, eventName) {
    return subject
        .asObservable()
        .filter(function (x) {
            return x.EventName === eventName;
        })
        .flatMap(function (x) {
            // An 'onNext' message becomes an inner observable built from Data.
            if (x.Type === 'onNext') {
                return Rx.Observable.return(x.Data);
            }

            // An 'onError' message becomes an inner observable built with throw(Data).
            if (x.Type === 'onError') {
                return Rx.Observable.throw(x.Data);
            }

            // Every other Type (including 'onCompleted') maps to an empty inner
            // observable. NOTE(review): flatMap merges inner observables, so the
            // completion of an inner one presumably does not complete the outer
            // stream - confirm against the RxJS flatMap documentation.
            return Rx.Observable.empty();
        });
};

// Single subject through which every event message is sent.
mySubject = new Rx.Subject();

// Subscribe to the 'foo' messages and log each kind of notification.
getObservable(mySubject, 'foo')
    .subscribe(function(x){ 
        console.log('foo onNext ' + x); 
    }, function(x){ 
        console.log('foo onError ' + x); 
    }, function(){ 
        console.log('foo onComplete');
    });

// Subscribe to the 'bar' messages and log each kind of notification.
getObservable(mySubject, 'bar')
    .subscribe(function(x){ 
        console.log('bar onNext ' + x); 
    }, function(x){ 
        console.log('bar onError ' + x); 
    }, function(){ 
        console.log('bar onComplete');
    });

// Send a value message and then a completion message for 'foo'.
// Both travel as ordinary onNext payloads of the subject.
mySubject.onNext({Type: 'onNext', EventName: 'foo', Data: 5});
mySubject.onNext({Type: 'onCompleted', EventName: 'foo'});

// Send a value message and then an error message for 'bar'.
mySubject.onNext({Type: 'onNext', EventName: 'bar', Data: 5});
mySubject.onNext({Type: 'onError', EventName: 'bar', Data: 'Error message'});

Got output:

foo onNext 5

bar onNext 5
bar onError Error message

Expected output:

foo onNext 5  
foo onComplete

bar onNext 5
bar onError Error message

For the bar event, this works like a charm: onNext is propagated, and as soon as the error is raised the onError handler gets called and the event stream finishes. However, I can't get it to work for onCompleted.

Whenever a completion notification is raised, I do see that Rx.Observable.empty() gets called, but that doesn't cause the subscriber's onComplete handler to be called. Instead, it's calling its onNext handler.

هل كانت مفيدة؟

المحلول

The getObservable function returns an observable that subscribes to the events named eventName sent through a subject.

/**
 * Builds an observable for one named event multiplexed through `subject`.
 *
 * Messages on `subject` are objects of the form `{ Type, EventName, Data }`.
 * Messages whose `EventName` differs from `eventName` are ignored; the rest
 * are replayed on the returned observable according to `Type`:
 * 'onNext' -> onNext(Data), 'onError' -> onError(Data),
 * 'onCompleted' -> onCompleted(). Any other `Type` is dropped.
 *
 * @param {Object} subject - Rx subject carrying the multiplexed messages.
 * @param {string} eventName - Name of the event to extract.
 * @returns {Object} Observable emitting the notifications of that event.
 */
const getObservable = function (subject, eventName) {
    return Rx.Observable.create(function (observer) {
        // Hand the inner subscription back to Rx so it is released when the
        // consumer unsubscribes instead of listening on the subject forever.
        return subject
            .asObservable()
            .filter(function (x) {
                return x.EventName === eventName;
            })
            .subscribe(function (x) {
                if (x.Type === 'onNext') {
                    observer.onNext(x.Data);
                } else if (x.Type === 'onError') {
                    observer.onError(x.Data);
                } else if (x.Type === 'onCompleted') {
                    observer.onCompleted();
                }
            });
    });
};

This is a working example using data from the original question:

var mySubject,
    getObservable;

/**
 * Builds an observable for one named event multiplexed through `subject`.
 *
 * Messages on `subject` are objects of the form `{ Type, EventName, Data }`.
 * Messages whose `EventName` differs from `eventName` are ignored; the rest
 * are replayed on the returned observable according to `Type`:
 * 'onNext' -> onNext(Data), 'onError' -> onError(Data),
 * 'onCompleted' -> onCompleted(). Any other `Type` is dropped.
 *
 * @param {Object} subject - Rx subject carrying the multiplexed messages.
 * @param {string} eventName - Name of the event to extract.
 * @returns {Object} Observable emitting the notifications of that event.
 */
getObservable = function (subject, eventName) {
    return Rx.Observable.create(function (observer) {
        // Hand the inner subscription back to Rx so it is released when the
        // consumer unsubscribes instead of listening on the subject forever.
        return subject
            .asObservable()
            .filter(function (x) {
                return x.EventName === eventName;
            })
            .subscribe(function (x) {
                if (x.Type === 'onNext') {
                    observer.onNext(x.Data);
                } else if (x.Type === 'onError') {
                    observer.onError(x.Data);
                } else if (x.Type === 'onCompleted') {
                    observer.onCompleted();
                }
            });
    });
};

// Single subject through which every event message is sent.
mySubject = new Rx.Subject();

// Subscribe to the 'foo' messages; they are logged under the label "SomethingHappened".
getObservable(mySubject, 'foo')
    .subscribe(function(x){ 
        console.log('SomethingHappened onNext ' + x); 
    }, function(x){ 
        console.log('SomethingHappened onError ' + x); 
    }, function(){ 
        console.log('SomethingHappened onComplete');
    });


// Subscribe to the 'bar' messages; they are logged under the label "DataUpdated".
getObservable(mySubject, 'bar')
    .subscribe(function(x){ 
        console.log('DataUpdated onNext ' + x); 
    }, function(x){ 
        console.log('DataUpdated onError ' + x); 
    }, function(){ 
        console.log('DataUpdated onComplete');
    });

// Send a value message and then a completion message for 'foo'.
mySubject.onNext({Type: 'onNext', EventName: 'foo', Data: 5});
mySubject.onNext({Type: 'onCompleted', EventName: 'foo'});

// Send a value message and then an error message for 'bar'.
mySubject.onNext({Type: 'onNext', EventName: 'bar', Data: 5});
mySubject.onNext({Type: 'onError', EventName: 'bar', Data: 'Error message'});
<script src='https://rawgit.com/Reactive-Extensions/RxJS/master/dist/rx.all.js'></script>

نصائح أخرى

In .NET, Observable.SelectMany uses Observable.Merge to merge the streams into one composite observable. IMHO Observable.Merge completes only when all of the merged observables have completed.

e.g. http://theburningmonk.com/2010/02/rx-framework-iobservable-merge/

That could be the reason for the issue.

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top