Rx BehaviorSubject + Scan Pushing Prior Event To New Subscriber?

I want to have a stream to which I can push reducer functions. Each time a reducer function is pushed, a state object should be passed to the reducer, the reducer should return a

Solution 1:

Can you check whether everything behaves as you expect if you replace

export let downstream = upstream.scan((state, reducer) => {
  return reducer(state);
}, initialState);

with

export let downstream = upstream.scan((state, reducer) => {
  return reducer(state);
}, initialState).shareReplay(1);

jsfiddle here : http://jsfiddle.net/cqaumutp/

If so, you are another victim of the hot vs. cold nature of Rx.Observable, or maybe more accurately the lazy instantiation of observables.

In short (not so short), what happens every time you subscribe is that a chain of subscriptions is created by walking up the chain of operators. Each operator subscribes to its source, and so on up to the starting source. In your case, when you subscribe to the scan, scan subscribes to upstream, which is the end of the chain. Since upstream is a subject, on subscription it simply registers the subscriber. Other sources would do other things (such as registering a listener on a DOM node or a socket).
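
To make the per-subscription behaviour concrete, here is a small dependency-free sketch. It does not use Rx at all: `createSubject` and `scan` are hypothetical stand-ins written for this illustration, and they leave out completion, errors and unsubscription. The only point is that the accumulator lives inside `subscribe`, so each subscriber gets its own state.

```javascript
// Hypothetical stand-in for Rx.Subject: it only registers subscribers
// and forwards each pushed value to them.
function createSubject() {
  const observers = [];
  return {
    subscribe(observer) { observers.push(observer); },
    onNext(value) { observers.forEach(observer => observer(value)); },
  };
}

// Hypothetical stand-in for scan: the state variable is created inside
// subscribe, so every subscription starts again from the seed.
function scan(source, accumulator, seed) {
  return {
    subscribe(observer) {
      let state = seed; // fresh state for this subscriber only
      source.subscribe(value => {
        state = accumulator(state, value);
        observer(state);
      });
    },
  };
}

const upstream = createSubject();
const downstream = scan(upstream, (state, reducer) => reducer(state), { counter: 0 });
const increment = state => ({ counter: state.counter + 1 });

const seenByA = [];
const seenByB = [];

upstream.onNext(increment);                          // nobody subscribed yet: lost
downstream.subscribe(s => seenByA.push(s.counter));
upstream.onNext(increment);                          // A's own state: 0 -> 1
downstream.subscribe(s => seenByB.push(s.counter));
upstream.onNext(increment);                          // A: 1 -> 2, B starts over: 0 -> 1

console.log(seenByA); // [ 1, 2 ]
console.log(seenByB); // [ 1 ]
```

Subscriber B never sees A's count, because its subscription built a separate chain with a separate accumulator.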

The point here is that every time you subscribe to the scan, you start anew, i.e. with initialState. If you want later subscribers to see the values produced by the first subscription to the scan, you have to use the share operator (or shareReplay(1), as in the replacement above, which also replays the latest value to late subscribers). On the first subscription to the shared observable, it passes your subscription request on to the scan. On the second and subsequent subscriptions it does not; it only registers the new observer and forwards to it all the values coming from that single subscription to the scan.
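
The sharing behaviour can be sketched the same way, again without Rx. `createSubject`, `scan` and `shareLatest` are hypothetical helpers for this illustration only; `shareLatest` roughly imitates what shareReplay(1) does (one upstream subscription, latest value replayed to late subscribers) and ignores reference counting, completion and errors.

```javascript
// Hypothetical stand-in for Rx.Subject.
function createSubject() {
  const observers = [];
  return {
    subscribe(observer) { observers.push(observer); },
    onNext(value) { observers.forEach(observer => observer(value)); },
  };
}

// Hypothetical stand-in for scan: state is created per subscription.
function scan(source, accumulator, seed) {
  return {
    subscribe(observer) {
      let state = seed;
      source.subscribe(value => {
        state = accumulator(state, value);
        observer(state);
      });
    },
  };
}

// Rough imitation of shareReplay(1): subscribe to the source only once,
// remember the latest value, and replay it to late subscribers.
function shareLatest(source) {
  const observers = [];
  let connected = false;
  let hasLatest = false;
  let latest;
  return {
    subscribe(observer) {
      if (hasLatest) observer(latest); // replay for late subscribers
      observers.push(observer);
      if (!connected) {
        connected = true;              // only the first subscriber reaches the source
        source.subscribe(value => {
          hasLatest = true;
          latest = value;
          observers.forEach(o => o(value));
        });
      }
    },
  };
}

const upstream = createSubject();
const downstream = shareLatest(
  scan(upstream, (state, reducer) => reducer(state), { counter: 0 })
);
const increment = state => ({ counter: state.counter + 1 });

const seenByA = [];
const seenByB = [];

downstream.subscribe(s => seenByA.push(s.counter)); // connects to the scan once
upstream.onNext(increment);                         // shared state: 0 -> 1
upstream.onNext(increment);                         // shared state: 1 -> 2
downstream.subscribe(s => seenByB.push(s.counter)); // B gets the replayed 2
upstream.onNext(increment);                         // both receive 3

console.log(seenByA); // [ 1, 2, 3 ]
console.log(seenByB); // [ 2, 3 ]
```

Here the second subscriber joins the existing accumulation instead of restarting from the seed.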


Solution 2:

I have a solution that seems to be giving me the results I'm looking for. I would appreciate it if others could verify that it's an appropriate solution.

import Rx from 'rx';
import { Map } from 'immutable';

let initialState = Map({ counter: 0 });

export let upstream = new Rx.Subject();

let downstreamSource = upstream.scan((state, reducer) => {
  return reducer(state);
}, initialState);

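// The BehaviorSubject caches the latest state and replays it to each new subscriber.
export let downstream = new Rx.BehaviorSubject(initialState);
// Subscribe to the scan exactly once, so all subscribers share one accumulated state.
downstreamSource.subscribe(downstream);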

let increment = state => {
  return state.update('counter', counter => counter + 1);
};

upstream.onNext(increment);

downstream.subscribe(state => {
  console.log('subscriptionA', state.get('counter'));
});

upstream.onNext(increment);

setTimeout(() => {
  downstream.subscribe(state => {
    console.log('subscriptionB', state.get('counter'));
  });
}, 3000);
