Rx BehaviorSubject + Scan Pushing Prior Event To New Subscriber?
Solution 1:
Can you try to see if everything is conform to your expectations if you replace
export let downstream = upstream.scan((state, reducer) => {
return reducer(state);
}, initialState);
by
export let downstream = upstream.scan((state, reducer) => {
return reducer(state);
}, initialState).shareReplay(1);
jsfiddle here : http://jsfiddle.net/cqaumutp/
If so, you are another victim of the hot vs. cold nature of Rx.Observable
, or maybe more accurately the lazy instantiation of observables.
In short (not so short), what happens everytime you do a subscribe
, is that a chain of observables is created by going upstream the chain of operators. Each operator subscribes to its source, and returns another observable up to the starting source. In your case, when you subscribe to scan
, scan
subscribes to upstream
which is the last one. upstream
being a subject, on subscription it just registers the subscriber. Other sources would do other things (like register a listener on a DOM node, or a socket, or whatever).
The point here is that every time you subscribe to the scan
, you start anew, i.e. with the initialState
. If you want to use the values from the first subscription to the scan
, you have to use the share
operator. On the first subscription to the share
, it will pass your subscription request on to the scan
. On the second and subsequent ones, it will not, it will register it, and forward to the associated observer all the values coming from the scan
firstly subscribed to.
Solution 2:
I have a solution that seems to be giving me the results I'm looking for. I would appreciate it if others could verify that it's an appropriate solution.
import Rx from 'rx';
import { Map } from 'immutable';
let initialState = Map({ counter: 0 });
export let upstream = new Rx.Subject();
let downstreamSource = upstream.scan((state, reducer) => {
return reducer(state);
}, initialState);
export let downstream = new Rx.BehaviorSubject(initialState);
downstreamSource.subscribe(downstream);
let increment = state => {
return state.update('counter', counter => counter + 1);
};
upstream.onNext(increment);
downstream.subscribe(state => {
console.log('subscriptionA', state.get('counter'));
});
upstream.onNext(increment);
setTimeout(() => {
downstream.subscribe(state => {
console.log('subscriptionB', state.get('counter'));
});
}, 3000);
Post a Comment for "Rx BehaviorSubject + Scan Pushing Prior Event To New Subscriber?"