Say I wanted to separately handle the first event emitted by an Observable, but then continue my subscription.
For example, if I had the following chain:
observable.throttleTime(2000)
.takeFirst(() => console.log('potato'))
.switchMap((event:MouseEvent) => {
const element:Element = event.target as Element;
return this.innerObservable(('tomato'));
})
.subscribe(({result}) => console.log(result))
That fails, but how would I go about making such behaviour work?
asked May 26, 2017 at 21:05 by Abraham P
It's not clear what you want to do, and RxJS does not have a takeFirst operator. If you are using switchMap, its projection function is passed an index parameter. I'd suggest you use that to identify the first emitted value. – cartant, commented May 26, 2017 at 22:03
5 Answers
Many RxJS 5 operators that take a callback function also pass along the index of the item going through. This means you could write, for example, the following:
observable.throttleTime(2000)
.map((value, index) => index === 0 ? 'potato' : value)
...
.subscribe(({result}) => console.log(result))
Indices are passed, for example, to the filter(), switchMap() and takeWhile() operators, among others.
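As a minimal sketch of the index-based approach, the projection can be pulled out into a plain function. The name tagFirst is hypothetical and not part of RxJS; it only relies on being called with (value, index), which is also how Array.prototype.map calls it, so it can be tried here without an Observable.

```typescript
// Hypothetical helper (not part of RxJS): replaces the first item with a
// marker string and passes every later item through unchanged.
function tagFirst<T>(value: T, index: number): T | string {
  return index === 0 ? 'potato' : value;
}

// Tried with a plain array, which calls the function with (value, index)
// just as the index-aware operators described above do.
const tagged = [10, 20, 30].map(tagFirst);
console.log(tagged); // [ 'potato', 20, 30 ]
```

The same function could be handed to an index-aware operator in place of the inline arrow function shown above.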
You can use two subscribers, one for the default case to be applied to all events and the other for the special case of the first event:
import { of } from 'rxjs';
import { first } from 'rxjs/operators';
//emit 4 values
const source = of(1, 2, 3, 4);
/*
Subscriber One - only first value: 1
Subscriber Two - all values: 1
Subscriber Two - all values: 2
Subscriber Two - all values: 3
Subscriber Two - all values: 4
*/
const subscriberOne = source
.pipe(first())
.subscribe((val) => console.log(`Subscriber One - only first value: ${val}`));
const subscriberTwo = source.subscribe((val) =>
console.log(`Subscriber Two - all values: ${val}`)
);
see it live on stackblitz
And to answer your question, where there is a special handler for the first event and then a common handler for the rest of the events, excluding the first one:
import { of, skip } from 'rxjs';
import { first, publish } from 'rxjs/operators';
// emit 4 values
const source = of(1, 2, 3, 4);
// hold `others` until `connect()` is called; skip the first `source` event
const others = publish()(source.pipe(skip(1)));
// marker to make sure first event handler ran before the others
let onceRan = false;
// general case, has to be declared before so `others.subscribe()`
// runs before `others.connect()` is called
const subscriberTwo = others.subscribe((val) =>
console.log(`Subscriber Two - all values: ${val}, onceRan: ${onceRan}`)
);
// first event handler, `first()` => stops after the first event
const subscriberOne = source.pipe(first()).subscribe((val) => {
console.log(`Subscriber One - only first value: ${val}, onceRan: ${onceRan}`);
// toggle the marker
onceRan = true;
// release `others`
others.connect();
});
/*
Output:
Subscriber One - only first value: 1, onceRan: false
Subscriber Two - all values: 2, onceRan: true
Subscriber Two - all values: 3, onceRan: true
Subscriber Two - all values: 4, onceRan: true
*/
see it live on stackblitz
A simple solution to treat the first event separately would be:
let isFirstCame = false;
observable
  .subscribe(({result}) => {
    if (!isFirstCame) {
      // here is our first one
      doSpecialWithTheFirst(result);
      isFirstCame = true;
    } else {
      // all the remaining ones
      doOtherWithRemaining();
    }
  })
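The flag can also be kept out of the surrounding scope by wrapping both handlers in a closure. This is a minimal sketch, assuming a hypothetical helper named firstThenRest (not an RxJS API) whose return value would be passed to subscribe(). Values are fed in by hand here as a stand-in for a real subscription.

```typescript
// Hypothetical helper: builds a single callback that routes the first value
// to `onFirst` and every later value to `onRest`.
function firstThenRest<T>(
  onFirst: (value: T) => void,
  onRest: (value: T) => void
): (value: T) => void {
  let seenFirst = false; // private to the returned callback
  return (value: T) => {
    if (!seenFirst) {
      seenFirst = true;
      onFirst(value);
    } else {
      onRest(value);
    }
  };
}

// Stand-in for observable.subscribe(handler): call the handler directly.
const log: string[] = [];
const handler = firstThenRest<number>(
  (v) => log.push(`first: ${v}`),
  (v) => log.push(`rest: ${v}`)
);
[1, 2, 3].forEach((v) => handler(v));
console.log(log); // [ 'first: 1', 'rest: 2', 'rest: 3' ]
```

Each call to firstThenRest creates its own flag, so a fresh callback is needed for every subscription.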
Use publish along with first and concat operators. Here is an example:
Rx.Observable.interval(1000)
.publish(shared => {
return shared.first().mapTo('FIRST')
.concat(shared.skip(1));
})
.subscribe(x=>console.log(x))
You can create an operator that subscribes to the source observable, processes the first element and then removes itself from the chain. Let's call it onFirst, because takeFirst is an existing operator in Java. RxJS 5 example:
Observable.prototype.onFirst = function(callback) {
return Observable.create(observer => {
let source = this.publish()
let subs = source.subscribe({
next(v) {
subs.unsubscribe()
source.subscribe(observer)
callback(v)
},
error(e) { observer.error(e) },
complete() { observer.complete() }
})
source.connect()
})
}
// This prints
// first: 1
// 2
// 3
Observable.of(1, 2, 3)
.onFirst(v => console.log("first: " + v))
.subscribe(console.log)