I have two observables and I want to listen to the one that emits its first value last. Is there an operator for this? Something like this:
let obs1 = Rx.Observable.timer(500,500);
let obs2 = Rx.Observable.timer(1000,1000); // I want the values from this one
let sloth = Rx.Observable.sloth(obs1,obs2);
where the sloth observable would emit the values from obs2, as it is the one that emits its first value last.
If there is no such operator, is there another way?
asked Apr 24, 2017 at 15:14 by n00dl3 (edited Apr 24, 2017 at 15:36)
5 Answers
I see this possibility for now, but I'm curious whether someone finds anything else:
let obs1 = Rx.Observable.timer(500,500).map(i=>`cheetah ${i}`);
let obs2 = Rx.Observable.timer(1000,1000).map(i=>`sloth ${i}`);
let sloth = Rx.Observable.merge(
  obs1.take(1).mapTo(obs1),
  obs2.take(1).mapTo(obs2)
).takeLast(1).mergeAll();
sloth.subscribe(data=>console.log(data))
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>
Edit: as pointed out by @user3743222 (very nice nickname :-D), this would not work for hot observables, but the following should be fine:
let obs1 = Rx.Observable.timer(500,500).map(i=>`cheetah ${i}`).publish();
let obs2 = Rx.Observable.timer(1000,1000).map(i=>`sloth ${i}`).publish();
obs1.connect();
obs2.connect();
let sloth = Rx.Observable.merge(
  obs1.take(1).map((val)=>obs1.startWith(val)),
  obs2.take(1).map((val)=>obs2.startWith(val))
).takeLast(1).mergeAll();
sloth.subscribe(data=>console.log(data));
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>
I like your solution (though I suspect you might never see the first emitted value if you have a hot stream; if the source is cold, all seems good). Can you make a jsfiddle to check that out? If you don't miss any value, your solution is the best. If you do, it might be possible to correct it by adding the skipped value back to the source: obs1.take(1).map(val => obs1.startWith(val)).
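The hot-stream concern can be illustrated without RxJS. The following is only a rough sketch: Node's built-in EventEmitter stands in for a hot observable, and the names hot, seenByLateSubscriber and seenWithReplay are made up for the illustration. It shows that a listener attached after the first value has been consumed misses that value, and that putting the value back in front (the startWith idea) restores it.

```javascript
const { EventEmitter } = require('events');

// Stand-in for a hot source: it emits whether or not anyone is listening.
const hot = new EventEmitter();

const seenByLateSubscriber = []; // plain re-subscription after the first value
const seenWithReplay = [];       // re-subscription with the first value put back

// Rough analogue of take(1): react to the first value only, then subscribe again.
hot.once('value', first => {
  // Subscribing now is too late for `first`; only later values arrive.
  hot.on('value', v => seenByLateSubscriber.push(v));

  // Rough analogue of startWith(first): prepend the consumed value.
  seenWithReplay.push(first);
  hot.on('value', v => seenWithReplay.push(v));
});

[0, 1, 2].forEach(v => hot.emit('value', v));

console.log(seenByLateSubscriber); // [ 1, 2 ]
console.log(seenWithReplay);       // [ 0, 1, 2 ]
```

With a cold source the second subscription would restart the sequence from its beginning, which is why the original version appears to work there.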
Otherwise, for a generic but lengthy solution, the key is that you have state, so you also need the scan operator. We tag each source with an index and keep a state that records which sources have already started. When all but one have started, we know the index of the one that hasn't, and we pick only the values from that one. Note that this should work whether the sources are hot or cold, as everything is done in one pass, i.e. there are no multiple subscriptions.
// The sources to compare (obs1 ... obsn); ids run from 1 to n
const sources = [obs1, obs2];
const n = sources.length;
Rx.Observable.merge(
  // tag every value with the id of the source it came from
  ...sources.map((obs, i) => obs.map(val => ({val, sourceId: i + 1})))
).scan(
  (acc, valueStruct) => {
    acc.valueStruct = valueStruct;
    acc.alreadyEmitted[valueStruct.sourceId - 1] = true;
    if (acc.alreadyEmitted.filter(Boolean).length === n - 1) {
      // exactly one source has not started yet: that is the one to follow
      acc.lastSourceId = 1 + acc.alreadyEmitted.findIndex(element => element === false);
    }
    return acc;
  }, {alreadyEmitted: new Array(n).fill(false), lastSourceId: 0, valueStruct: null}
)
// filter on the source id rather than on the value, so falsy values such as 0 are kept
.filter(acc => acc.valueStruct.sourceId === acc.lastSourceId)
.map(acc => acc.valueStruct.val)
Maybe there is a shorter way, I don't know. I'll try to put that in a fiddle to see if it actually works; if you do so first, let me know.
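To see the state logic on its own, here is a small sketch in plain JavaScript with no RxJS involved. The merged stream is simulated as an array of tagged values in arrival order; the values and the choice of three sources are made up for the illustration.

```javascript
// Number of sources; source ids run from 1 to n (assumed for this example).
const n = 3;

// Simulated merged stream, in arrival order (made-up values).
const merged = [
  { val: 'a0', sourceId: 1 },
  { val: 'b0', sourceId: 2 },
  { val: 'a1', sourceId: 1 },
  { val: 'c0', sourceId: 3 }, // source 3 is the last one to start
  { val: 'b1', sourceId: 2 },
  { val: 'c1', sourceId: 3 },
];

// Same state as in the scan seed: which sources have started, and who is last.
const state = { alreadyEmitted: new Array(n).fill(false), lastSourceId: 0 };
const output = [];

for (const { val, sourceId } of merged) {
  state.alreadyEmitted[sourceId - 1] = true;
  if (state.alreadyEmitted.filter(Boolean).length === n - 1) {
    // All but one source have started: the missing one is the slowest.
    state.lastSourceId = 1 + state.alreadyEmitted.indexOf(false);
  }
  // Only values from the slowest source pass through.
  if (sourceId === state.lastSourceId) output.push(val);
}

console.log(output); // [ 'c0', 'c1' ]
```

Once the last source has started, the count of started sources becomes n, so lastSourceId is no longer recomputed and keeps pointing at that source.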
How about this?
let obs1 = Rx.Observable.timer(500,500);
let obs2 = Rx.Observable.timer(1000,1000);
let sloth = Rx.Observable.race(
  obs1.take(1).concat(obs2),
  obs2.take(1).concat(obs1)
).skip(1);
And as a function with support for multiple Observables:
let sloth = (...observables) =>
  observables.length === 1 ?
    observables[0] :
  observables.length === 2 ?
    Rx.Observable.race(
      observables[0].take(1).concat(observables[1]),
      observables[1].take(1).concat(observables[0])
    ).skip(1) :
    // reduce already returns a single Observable, so no [0] index is needed
    observables.reduce((prev, current) => sloth(prev, current));
I had the same issue and was able to solve it using a combination of merge and skipUntil. The pipe(last()) stops you receiving multiple results if both complete at the same time.
Try pasting the following into https://rxviz.com/:
const { timer, merge } = Rx;
const { mapTo, skipUntil, last } = RxOperators;
let obs1 = timer(500).pipe(mapTo('1'));
let obs2 = timer(1000).pipe(mapTo('2')); // I want the values from this one
let sloth = merge(
  obs1.pipe(skipUntil(obs2)),
  obs2.pipe(skipUntil(obs1))
).pipe(last());
sloth
Using RxJS 6 and ReplaySubject:
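// interval is only needed for the usage example below
import { interval, of, ReplaySubject } from 'rxjs';
import { multicast, take, mapTo, mergeAll, reduce, switchMap } from 'rxjs/operators';

function lastOf(...observables) {
  // Make every source replay its latest value, and start it right away
  const replayable = observables
    .map(o => {
      let r = o.pipe(multicast(new ReplaySubject(1)));
      r.connect();
      return r;
    });
  // For each source, emit its index once its first value arrives
  const racing = replayable
    .map((v, i) => v.pipe(
      take(1),
      mapTo(i),
    ));
  return of(...racing).pipe(
    mergeAll(),
    reduce((_, val) => val),       // keep the index that arrived last
    switchMap(i => replayable[i]), // then follow that source
  );
}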
Use:
const fast = interval(500);
const medium = interval(1000);
const slow = interval(2000);
lastOf(fast, slow, medium).subscribe(console.log);
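As a sanity check on which source lastOf should pick here, the selection step can be mimicked in plain JavaScript without RxJS. This is only a sketch: the firstEmissions array is a hand-written model of when each source emits its first value (interval(p) first emits after p ms), not something produced by the library.

```javascript
// Hand-written model of the first emission of each argument to
// lastOf(fast, slow, medium): its index in the argument list and its time in ms.
const firstEmissions = [
  { index: 0, time: 500 },  // fast
  { index: 1, time: 2000 }, // slow
  { index: 2, time: 1000 }, // medium
];

// mergeAll() delivers the indices in order of arrival,
// which is modelled here by sorting on the emission time.
const arrivalOrder = [...firstEmissions]
  .sort((a, b) => a.time - b.time)
  .map(e => e.index);

// reduce((_, val) => val) keeps only the last index to arrive.
const winner = arrivalOrder.reduce((_, val) => val);

console.log(arrivalOrder); // [ 0, 2, 1 ]
console.log(winner);       // 1, i.e. the `slow` source
```

In the real pipeline, switchMap then subscribes to the replayable version of that source, so the subscriber should see the values of slow.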