What I'm trying to achieve is this (with Angular 2/TypeScript):

  • Observable A produces a stream of events.

  • For each event of Observable A, make 8 different HTTP calls (8 switchMaps).

  • After all 8 requests return, do something (subscribe to a zip of the 8 switchMaps).

  • Repeat the 8 requests for each event of Observable A (taken care of by switchMap and zip).

Code: (full code at https://plnkr.co/edit/44yqw0RYzC7v1TFACMx1)

let source = Observable
.interval(5000)
.take(100);

let requests = [];

for(let i=0; i<8;i++) {
  let request = source.switchMap(x=> http.get('https://jsonplaceholder.typicode.com/users/'+(i+1))).publish();
  request.subscribe(res => console.log(res.json()));
  requests.push(request);
}

Observable.zip(requests)
.subscribe(console.log("All requests completed"));

requests.forEach(r => r.connect());

The problem is that my zip subscription never fires. I logged the subscription to each of the 8 switchMaps, and the logs show all eight HTTP calls returning successfully each time Observable A emits. (I can also see the 8 calls returning in the network tab of the debug tools.)

But zip never emits anything.


If I try a different (less ideal) approach:

  • Subscribe to Observable A once (no switchMap)
  • Within that subscription, create 8 Observables, one per HTTP call, and subscribe to a forkJoin of the 8 Observables

Code: (full code at https://plnkr.co/edit/GqQde1Ae2licBjtL0jcj)

let source = Observable
.interval(5000)
.take(100);

 source.subscribe(x=> {
   console.log(x);
   let requests = [];

   for(let i=0; i<8;i++) {
     let request = http.get('https://jsonplaceholder.typicode.com/users/'+(i+1)).publish();
     request.subscribe(res => console.log(res.json()));
     requests.push(request);
   }

   Observable.forkJoin(requests)
   .subscribe(console.log("All requests completed"));

   requests.forEach(r => r.connect());

 });

This works, but with the obvious pitfall that I'm creating 8+1 nested observables/subscriptions each time Observable A emits.

(In both cases I'm using publish/connect to share/reuse subscriptions, but the problem exists even without it)

asked Jul 4, 2017 at 8:43 by flak37, edited Jul 4, 2017 at 12:04

Comments:
  • Show some code. – Robin Dijkhof, Jul 4, 2017 at 8:52
  • Added it @RobinDijkhof. Took some time to distil the main logic from my application. – flak37, Jul 4, 2017 at 12:04

1 Answer

Your first example would work if you called zip correctly, with multiple arguments, and passed a function to subscribe (not the result of calling console.log, which is undefined).

Observable.zip(...requests) // <-- spread this 
    .subscribe(() => console.log("All requests completed")); // <-- pass a function

requests.forEach(r => r.connect());
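
Both points can be illustrated without RxJS. The following is only a rough sketch in plain TypeScript: zipArrays is a simplified model of zip over plain arrays (not RxJS's implementation), and FakeStream and record are hypothetical stand-ins for an observable's subscribe and for console.log. It shows how spreading changes what counts as a source, and that a call written inside subscribe(...) runs immediately instead of being registered as a handler.

```typescript
// Simplified model over plain arrays, NOT RxJS's implementation:
// row i of the result holds element i of every source.
function zipArrays<T>(...sources: T[][]): T[][] {
  if (sources.length === 0) return [];
  const length = Math.min(...sources.map(s => s.length));
  const rows: T[][] = [];
  for (let i = 0; i < length; i++) {
    rows.push(sources.map(s => s[i]));
  }
  return rows;
}

// Hypothetical data: three "requests", each answering twice.
const responses: string[][] = [["a1", "a2"], ["b1", "b2"], ["c1", "c2"]];

// Spread: three sources, so each row combines one value from each source.
const spread = zipArrays(...responses);
// Not spread: the outer array is the only source, so each row has one entry.
const notSpread = zipArrays(responses);

// Hypothetical stand-in for subscribe(): keeps the handler and calls it later.
// The parameter is typed `unknown` so the mistaken call below still compiles.
class FakeStream {
  private handlers: Array<(value: string) => void> = [];

  subscribe(handler?: unknown): void {
    if (typeof handler === "function") {
      this.handlers.push(handler as (value: string) => void);
    }
  }

  emit(value: string): void {
    this.handlers.forEach(h => h(value));
  }
}

// Stand-in for console.log that records messages so they can be inspected.
const log: string[] = [];
const record = (message: string): void => {
  log.push(message);
};

const wrong = new FakeStream();
wrong.subscribe(record("wrong")); // record runs right now; undefined is passed

const right = new FakeStream();
right.subscribe(() => record("right")); // record runs only when a value arrives

const logBeforeEmit = log.slice(); // contains only "wrong"
wrong.emit("x");
right.emit("x");
const logAfterEmit = log.slice(); // "right" is added by the emit
```

In this model the message in the mistaken call is recorded once, at subscribe time, and never again on emit. That matches the behaviour of console.log("...") written directly inside subscribe(...): it prints when that line runs, not when the requests finish.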

Tags: javascript; RxJS zip not working while forkJoin does (Stack Overflow)