I have two merged observables with a scan after the merge. The first one is a simple range and the other is a Subject. Whenever the Subject emits a new value with onNext, I concatenate that value in the scan and return the new array as the accumulator. If I dispose of my subscription and then subscribe again, it replays the values from the range, but I have lost the ones from the Subject. In the code below I want my second subscription to have a final value of [1, 2, 3, 4, 5].

What would be the best way to do this? Right now I have another Subject where I store that final value and subscribe to that, but it feels wrong.

Here's a simple version that demonstrates what is happening:

var Rx = require('rx');

var source = Rx.Observable.range(1, 3);

var adder = new Rx.Subject();

var merged = source.merge(adder)
                    .scan([], function(accum, x) {
                        return accum.concat(x);
                    });

var subscription1 = merged.subscribe(function(x) {console.log(x)});
adder.onNext(4);
adder.onNext(5);

subscription1.dispose();

console.log('After Disposal');

var subscription2 = merged.subscribe(function(x) {console.log(x)});

This outputs:

[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]
[ 1, 2, 3, 4 ]
[ 1, 2, 3, 4, 5 ]
After Disposal
[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]

asked May 31, 2015 at 23:12 by gabo
  • Could you be a little more descriptive about what you are looking to do? The pattern you describe will be very brittle in the wild, there is likely a better way to structure it. – paulpdaniels Commented Jun 1, 2015 at 1:37
  • I'm looking for a stream to contain my models, for example the todo list in a todo app. I could have my main route/view that displays all todos and then navigate away from it, so I would dispose my subscription when that view unloads, but when I get back to that route I will be subscribing again. I agree that it seems brittle, but that's where I'm stuck. – gabo Commented Jun 1, 2015 at 3:12
  • Note: for anyone trying to use this scan sample, the order of parameters has changed in more recent RxJS, and the [] (seed value) is now the last parameter: pipe(scan((acc, x) => ([...acc, x]), [])) – Simon_Weaver Commented Aug 22, 2018 at 0:51

2 Answers

A Subject is a hot Observable, which is why the second subscription won't see events coming from the Subject. The range Observable is cold, so each "execution instance" is entirely owned by each subscription. A Subject's "execution instance", on the other hand, is a singleton that is independent of its subscribers, so a subscription only sees events emitted while it is subscribed; the second subscription therefore misses 4 and 5.
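
To make the hot/cold distinction concrete, here is a minimal sketch in plain JavaScript that does not use RxJS at all. `coldRange` and `HotSubject` are hypothetical stand-ins written only for this illustration, not library APIs: the cold source re-runs its producer for every subscriber, while the hot one forwards values only to whoever is subscribed at the moment of emission.

```javascript
// Hypothetical stand-ins for illustration only; these are not RxJS APIs.

// Cold: every subscribe call re-runs the producer from the start.
function coldRange(start, count) {
  return {
    subscribe: function (observer) {
      for (var i = 0; i < count; i++) {
        observer(start + i);
      }
    }
  };
}

// Hot: one shared execution; values go only to the current observers.
function HotSubject() {
  this.observers = [];
}
HotSubject.prototype.subscribe = function (observer) {
  var observers = this.observers;
  observers.push(observer);
  // Return a dispose function, loosely mirroring subscription.dispose().
  return function dispose() {
    var index = observers.indexOf(observer);
    if (index !== -1) observers.splice(index, 1);
  };
};
HotSubject.prototype.onNext = function (value) {
  this.observers.slice().forEach(function (observer) { observer(value); });
};

var range = coldRange(1, 3);
var adder = new HotSubject();

// First subscriber is present while 4 and 5 are emitted.
var first = [];
range.subscribe(function (x) { first.push(x); });
var dispose = adder.subscribe(function (x) { first.push(x); });
adder.onNext(4);
adder.onNext(5);
dispose();

// Second subscriber arrives after the emissions and misses them.
var second = [];
range.subscribe(function (x) { second.push(x); });
adder.subscribe(function (x) { second.push(x); });

console.log(first);  // [ 1, 2, 3, 4, 5 ]
console.log(second); // [ 1, 2, 3 ]
```

The second array mirrors the output after "After Disposal" in the question: the range values come back, the Subject values do not.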

There are a couple of ways of solving this.

  1. Use a ReplaySubject. You would have to specify the buffer size. If you don't have a good limit for that buffer, using an unlimited buffer might cause memory problems.
  2. Avoid using Subject. In other words, replace the hot Observable with a cold one. Given the explanation above, every subscription would then own its own execution and see the same events. Normally a Subject can be replaced by an Observable, but it depends on your overall architecture, and you might need to rewrite a lot. In the worst case, such as a circular dependency of Observables, you cannot avoid using a Subject.
  3. Rearrange code such that subscriptions start before the Subject starts emitting, so all subscriptions get a chance to see "live" emissions from the hot Observables.
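
The buffer-size trade-off in option 1 can be sketched in plain JavaScript as follows. `BoundedReplay` is a hypothetical class written for this illustration and is not the RxJS ReplaySubject; it only shows the idea that a late subscriber receives the most recent buffered values and that anything older than the limit is gone.

```javascript
// Hypothetical sketch of a bounded replay buffer; not the RxJS ReplaySubject.
function BoundedReplay(bufferSize) {
  this.bufferSize = bufferSize;
  this.buffer = [];
  this.observers = [];
}
BoundedReplay.prototype.onNext = function (value) {
  this.buffer.push(value);
  // Drop the oldest values once the buffer exceeds its limit.
  while (this.buffer.length > this.bufferSize) this.buffer.shift();
  this.observers.slice().forEach(function (observer) { observer(value); });
};
BoundedReplay.prototype.subscribe = function (observer) {
  // Late subscribers first receive whatever is still buffered.
  this.buffer.forEach(function (value) { observer(value); });
  this.observers.push(observer);
};

var replayTwo = new BoundedReplay(2);
replayTwo.onNext(4);
replayTwo.onNext(5);
replayTwo.onNext(6);

// Subscribes after three emissions; only the last two are replayed.
var lateValues = [];
replayTwo.subscribe(function (x) { lateValues.push(x); });
replayTwo.onNext(7);

console.log(lateValues); // [ 5, 6, 7 ]
```

With a limit of 2, the value 4 is lost to the late subscriber; an unlimited buffer would keep it, at the cost of holding every value in memory.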

However, if my interpretation of this problem is correct, you only need the last event emitted by merged, so you could use a variant of alternative (1) where you replay only the last event. That would be a matter of adding .shareReplay(1) to merged, which will make it a hot replayed Observable.
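
The "replay only the last event" idea can be illustrated without RxJS. The sketch below is a simplification under stated assumptions: `createReplayedScan` is a hypothetical helper, not an RxJS operator, and unlike a reference-counted operator it never disconnects from its input. It keeps the scan-style accumulator alive independently of subscribers and hands the latest accumulated array to each new subscriber.

```javascript
// Hypothetical helper for illustration; not an RxJS operator.
// It keeps a scan-style accumulator alive independently of subscribers
// and replays the latest accumulated value to each new subscriber.
function createReplayedScan(seed, reducer) {
  var latest = seed;
  var hasValue = false;
  var observers = [];
  return {
    onNext: function (value) {
      latest = reducer(latest, value);
      hasValue = true;
      observers.slice().forEach(function (observer) { observer(latest); });
    },
    subscribe: function (observer) {
      // Replay the most recent accumulated value, if there is one.
      if (hasValue) observer(latest);
      observers.push(observer);
      return function dispose() {
        var index = observers.indexOf(observer);
        if (index !== -1) observers.splice(index, 1);
      };
    }
  };
}

var list = createReplayedScan([], function (accum, x) {
  return accum.concat(x);
});

var seenByFirst = [];
var disposeFirst = list.subscribe(function (x) { seenByFirst.push(x); });
[1, 2, 3, 4, 5].forEach(function (x) { list.onNext(x); });
disposeFirst();

// A new subscriber immediately receives the last accumulated array.
var seenBySecond = [];
list.subscribe(function (x) { seenBySecond.push(x); });

console.log(seenBySecond); // [ [ 1, 2, 3, 4, 5 ] ]
```

Because the accumulator lives outside any single subscription, disposing the first subscriber does not reset it, which is the behavior the question asks for.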

From your description it sounds like you are looking for either BehaviorSubject or ReplaySubject (or their corresponding operators, publishValue() and replay()), rather than scan.

You could use the following to hook up your state each time you revisit a page (since you mentioned it, I will use a todo app example):

var todos = buttonClicked
  .map(function(e) { return newTodoBox.text(); })
   //This will preserve the state of the todo list across subscriptions
  .replay();

var todoSubscription;

pageActivated.subscribe(function() {
  todoSubscription = new Rx.CompositeDisposable(
                          //Add the items to the list
                          todos.subscribe(addItem),
                          //This enables the replay to actually start
                          //receiving values
                          todos.connect());
  /*Do other activation work*/
});

pageDeactivated.subscribe(function() {
   todoSubscription && todoSubscription.dispose();
   /*Do other clean up work*/
});

Here is a working code sample that I think mirrors what you are looking for. Notice that in this case I don't need scan, since the list gets fully rehydrated by the replay() further up the chain. If you add some todo items, then disconnect and connect again, the items will reappear.

var buttonClicked = Rx.Observable.fromEvent($('#add'), 'click');

var disconnect = Rx.Observable.fromEvent($('#disconnect'), 'click');

var connect = Rx.Observable.fromEvent($('#connect'), 'click');

var newTodoBox = $('#todos')[0];

var mylist = $('#mylist');

function addItem(e) {
  mylist.append('<li>' + e + '</li>');
}

var todos = buttonClicked
  .map(function(e) {
    return newTodoBox.value;
  })
  .replay();

var todoSubscription;

disconnect.subscribe(function() {
  mylist.empty();
  todoSubscription && todoSubscription.dispose();
});

connect.startWith(true).subscribe(function() {
  todoSubscription = new Rx.CompositeDisposable(
    todos.subscribe(addItem),
    todos.connect());
});
<head>
  <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
</head>

<body>
  <input id="todos" />
  <button id="add">Add Todo</button>
  <div>
    <ul id='mylist'></ul>
  </div>
  <div>
    <button id='disconnect'>Disconnect</button>
    <button id='connect'>Connect</button>
  </div>
</body>

Tags: javascript, How to store accumulated result of a scan with rxjs, Stack Overflow