Say I wanted to separately handle the first event emitted by an Observable, but then continue my subscription.

For example, if I had the following chain:

observable.throttleTime(2000)
      .takeFirst(() => console.log('potato'))
      .switchMap((event:MouseEvent) => {
        const element:Element = event.target as Element;
        return this.innerObservable(('tomato'));
      })
      .subscribe(({result}) => console.log(result))

That fails, but how would I go about making such behaviour work?

asked May 26, 2017 at 21:05 by Abraham P

  • It's not clear what you want to do and RxJS does not have a takeFirst operator. If you are using switchMap, its projection function is passed an index parameter. I'd suggest you use that to identify the first emitted value. – cartant, commented May 26, 2017 at 22:03

5 Answers

Many RxJS 5 operators that take a callback function also pass it the index of the item going through. This means you could write, for example, the following:

observable.throttleTime(2000)
    .map((value, index) => index === 0 ? 'potato' : value)
    ...
    .subscribe(({result}) => console.log(result))

The index is passed to the callbacks of filter(), switchMap(), takeWhile(), and other operators.

You can use two subscribers, one for the default case to be applied to all events and the other for the special case of the first event:

import { of } from 'rxjs';
import { first } from 'rxjs/operators';

//emit 4 values
const source = of(1, 2, 3, 4);

/*
Subscriber One - only first value: 1
Subscriber Two - all values: 1
Subscriber Two - all values: 2
Subscriber Two - all values: 3
Subscriber Two - all values: 4
*/
const subscriberOne = source
  .pipe(first())
  .subscribe((val) => console.log(`Subscriber One - only first value: ${val}`));
const subscriberTwo = source.subscribe((val) =>
  console.log(`Subscriber Two - all values: ${val}`)
);


And to answer your question, here is a version with a special handler for the first event and a common handler for the rest of the events, excluding the first one:

import { of, skip } from 'rxjs';
import { first, publish } from 'rxjs/operators';

// emit 4 values
const source = of(1, 2, 3, 4);
// hold `others` until `connect()` is called; skip the first event of `source`
const others = publish()(source.pipe(skip(1)));
// marker to make sure first event handler ran before the others
let onceRan = false;

// general case: must be declared first so that `others.subscribe()`
// runs before `others.connect()` is called
const subscriberTwo = others.subscribe((val) =>
  console.log(`Subscriber Two - all values: ${val}, onceRan: ${onceRan}`)
);

// first event handler, `first()` => stops after the first event
const subscriberOne = source.pipe(first()).subscribe((val) => {
  console.log(`Subscriber One - only first value: ${val}, onceRan: ${onceRan}`);
  // toggle the marker
  onceRan = true;
  // release `others`
  others.connect();
});

/*
Output:
Subscriber One - only first value: 1, onceRan: false
Subscriber Two - all values: 2, onceRan: true
Subscriber Two - all values: 3, onceRan: true
Subscriber Two - all values: 4, onceRan: true
*/


A simple way to treat the first value separately is to keep a flag:

let isFirstCame = false;

observable.subscribe(({result}) => {
  if (!isFirstCame) {
    // here is our first one
    doSpecialWithTheFirst(result);
    isFirstCame = true;
  } else {
    // all the remaining ones
    doOtherWithRemaining();
  }
});
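
The flag approach above can also be wrapped in a closure so the state does not live in an outer variable. The following is only a sketch: `firstThenRest` is a hypothetical helper name, not part of RxJS, and a plain array stands in for the observable so the snippet runs without any library.

```javascript
// Hypothetical helper (not an RxJS API): returns a callback that sends the
// first value it receives to `onFirst` and every later value to `onRest`.
function firstThenRest(onFirst, onRest) {
  let seenFirst = false; // private flag, one per returned callback
  return (value) => {
    if (!seenFirst) {
      seenFirst = true;
      onFirst(value);
    } else {
      onRest(value);
    }
  };
}

// Stand-in for a stream of values; an array is used so this runs on its own.
const log = [];
const handler = firstThenRest(
  (v) => log.push(`first: ${v}`),
  (v) => log.push(`rest: ${v}`)
);
[1, 2, 3].forEach(handler);

console.log(log.join(', '));
```

Assuming the observable emits plain values, the same callback could be passed directly as the `next` handler, e.g. `observable.subscribe(firstThenRest(handleFirst, handleRest))`, where `handleFirst` and `handleRest` are placeholders for your own functions. Each call to `firstThenRest` creates a fresh flag, so separate subscriptions do not share state.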

Use publish along with first and concat operators. Here is an example:

Rx.Observable.interval(1000)
 .publish(shared => {
    return  shared.first().mapTo('FIRST')
       .concat(shared.skip(1));
})
.subscribe(x=>console.log(x))

You can create an operator that subscribes to the source observable, processes the first element, and then removes itself from the chain. Let's call it onFirst, because takeFirst is an existing operator in Java. RxJS 5 example:

Observable.prototype.onFirst = function(callback) {
  return Observable.create(observer => {
    let source = this.publish()
    let subs = source.subscribe({
      next(v) {
        subs.unsubscribe()
        source.subscribe(observer)
        callback(v)
      },
      error(e) { observer.error(e) },
      complete() { observer.complete() }
    })
    source.connect()
  })
}

// This prints
// first: 1
// 2
// 3
Observable.of(1, 2, 3)
  .onFirst(v => console.log("first: " + v))
  .subscribe(console.log)
