Let's imagine I have a function fetchUser that takes a userId as a parameter and returns an observable of a user.

As I call this method often, I want to batch the ids and perform one request with multiple ids instead.

Here my troubles begin...

I can't find a way to do that without sharing an observable between the different calls to fetchUser.

import { Subject, from } from "rxjs"
import { bufferTime, mergeMap, map, toArray, filter, take, share } from "rxjs/operators"

const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
    map((userId) => ({ id: userId, name: "George" })),
    toArray(),
)

const userToFetch$ = new Subject<string>()

const fetchedUser$ = userToFetch$.pipe(
    bufferTime(1000),
    mergeMap((userIds) => functionThatSimulateAFetch(userIds)),
    share(),
)

const fetchUser = (userId: string) => {
    const observable = fetchedUser$.pipe(
        map((users) => users.find((user) => user.id === userId)),
        filter((user) => !!user),
        take(1),
    )
    userToFetch$.next(userId)
    return observable
}

But that's ugly and it has multiple problems:

  • If I unsubscribe from the observable returned by fetchUser before the bufferTime timer has elapsed, the user is still fetched.
  • If I unsubscribe from all the observables returned by fetchUser before the batch fetch has finished, the request is not cancelled.
  • Error handling is more complex.
  • etc.

More generally: I don't know how to solve problems that require sharing resources using RxJS. It's difficult to find advanced examples of RxJS.

Asked Sep 11, 2018 at 16:03 by antoinestv; edited Sep 11, 2018 at 16:59 by Buggy.
  • You say you need to fetch users in batches, and this is what functionThatSimulateAFetch does, but then you have fetchUser = (userId: string) => {...}, i.e. a function that fetches a single user. What is it that you want to achieve? – Picci, Sep 12, 2018 at 5:50
  • The problem is the following: I want to fetch one user at a time, but to avoid performing too many API calls I want to batch the requests (so I use one API endpoint that returns the list of users for a given list of user ids). This is just an example, though; I encounter this kind of issue often. As soon as I share an observable using share/shareReplay, I can no longer know the source of the values in my stream, and I have to pass a context along (like in the example above). I know I am not approaching the problem the right way, which is the reason for my question! – antoinestv, Sep 12, 2018 at 8:02

4 Answers

What you have is a good start, but as with everything in RxJS, the devil is in the details.

Issues

  1. The mergeMap
        mergeMap((userIds) => functionThatSimulateAFetch(userIds)),

This is where you first go wrong. By using mergeMap here, you make it impossible to tell apart the "stream of requests" from the "stream returned by a single request":

  • You make it nearly impossible to unsubscribe from an individual request (to cancel it).
  • You make it impossible to handle errors.
  • It falls apart if your inner observable emits more than once.

Rather, what you want is to emit individual BatchEvents via a normal map (producing an observable of observables), and switchMap/mergeMap those after the filtering.

  2. Side effects when creating an observable, and emitting before subscribing
    userToFetch$.next(userId)
    return observable

Don’t do this. An observable by itself does not actually do anything. It’s a "blueprint" for a sequence of actions that happen when you subscribe to it. Written this way, the batch action is only triggered when the observable is created, so you’re in trouble if you get multiple or delayed subscriptions.

Rather, you want to create an observable from defer that emits to userToFetch$ on every subscription.

Even then, you’ll want to subscribe to your observable before emitting to userToFetch$: if you aren’t subscribed, your observable is not listening to the subject, and the event will be lost. You can do this in a defer-like observable.
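To make the ordering issue concrete, here is a minimal, dependency-free sketch. MiniSubject, Cold, eagerGet and deferredGet are hypothetical stand-ins written for this illustration (they only mimic how a hot Subject and a cold, defer-like observable behave; they are not the RxJS API).

```typescript
// Hypothetical stand-ins, NOT the RxJS API:
// MiniSubject mimics a hot Subject, Cold<T> mimics a cold observable.
type Listener<T> = (value: T) => void;
type Unsubscribe = () => void;
type Cold<T> = (listener: Listener<T>) => Unsubscribe;

class MiniSubject<T> {
  private listeners = new Set<Listener<T>>();

  next(value: T): void {
    // Only listeners registered right now see the value; nothing is replayed.
    this.listeners.forEach((listener) => listener(value));
  }

  subscribe(listener: Listener<T>): Unsubscribe {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }
}

const keys = new MiniSubject<string>();

// Anti-pattern: the side effect runs when the observable is created.
function eagerGet(id: string): Cold<string> {
  keys.next(id); // nobody is listening yet, so the key is lost
  return (listener) => keys.subscribe(listener);
}

// defer-like: listen first, then emit, once per subscription.
function deferredGet(id: string): Cold<string> {
  return (listener) => {
    const unsubscribe = keys.subscribe(listener);
    keys.next(id);
    return unsubscribe;
  };
}

const seenEager: string[] = [];
const stopEager = eagerGet("1")((key) => seenEager.push(key));
stopEager(); // seenEager is still empty: "1" was emitted before anyone listened

const seenDeferred: string[] = [];
const source = deferredGet("2");
source((key) => seenDeferred.push(key))(); // subscribe, receive "2", unsubscribe
source((key) => seenDeferred.push(key))(); // a second subscription emits "2" again
```

The eager variant never delivers its key, while the deferred variant emits once per subscription, which is the behavior the batching code relies on.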

Solution

The solution is short and not very different from your code, but structured like this:

import { Observable, Subject } from "rxjs";
import { bufferTime, first, map, share, switchMap } from "rxjs/operators";

const BUFFER_TIME = 1000;

// Assumed shapes: the batch endpoint returns a dictionary of users keyed by id
type User = { id: string, name: string };
type Users = { [id: string]: User };
type BatchEvent = { keys: Set<string>, values: Observable<Users> };

// Assumed to exist elsewhere, e.g. an injected service wrapping the batch endpoint
declare const userService: { get(ids: string[]): Observable<Users> };

/** The incoming keys */
const keySubject = new Subject<string>();

const requests: Observable<BatchEvent> =
  keySubject.asObservable().pipe(
    bufferTime(BUFFER_TIME),
    map(keys => fetchBatch(keys)),
    share(),
  );

/** Returns a single User from an ID. Batches the request */
function get(userId: string): Observable<User> {
  console.log("Creating observable for:", userId);
  // The money observable. See "defer":
  // triggers a new subject event on subscription
  const observable = new Observable<BatchEvent>(observer => {
    requests.subscribe(observer);
    // Emit *after* the subscription
    keySubject.next(userId);
  });
  return observable.pipe(
    first(v => v.keys.has(userId)),
    // There is only 1 item, so any *Map will do here
    switchMap(v => v.values),
    map(v => v[userId]),
  );
}

function fetchBatch(args: string[]): BatchEvent {
  const keys = new Set(args); // Do not batch duplicates
  const values = userService.get(Array.from(keys)).pipe(
    share(),
  );
  return { keys, values };
}

This does exactly what you were asking, including:

  • Errors are propagated to the recipients of the batch call, but nobody else
  • If everybody unsubscribes from a batch, the observable is canceled
  • If everybody unsubscribes from a batch before the request is even fired, it never fires
  • The observable behaves like HttpClient: subscribing to the observable fires a new (batched) request for data. Callers are free to pipe shareReplay or whatever though. So no surprises there.

Here is a working stackblitz Angular demo: https://stackblitz.com/edit/angular-rxjs-batch-request

In particular, notice the behavior when you "toggle" the display: You’ll notice that re-subscribing to existing observables will fire new batch requests, and that those requests will cancel (or outright not fire) if you re-toggle fast enough.

Use case

In our project, we use this for Angular Tables, where each row needs to individually fetch additional data to render. This allows us to:

  • chunk all the requests for a "single page", without needing any special knowledge of pagination
  • Potentially fetch multiple pages at once if the user paginates fast
  • re-use existing results even if page size changes

Limitations

I would not add chunking or rate limiting to this. Because the source observable is a dumb bufferTime, you run into issues:

  • The "chunking" will happen before the deduping. So if you have 100 requests for a single userId, you’ll end up firing several requests with only 1 element
  • If you rate limit, you’ll not be able to inspect your queue. So you may end up with a very long queue containing multiple same requests.

This is a pessimistic point of view though. Fixing it would mean going full out with a stateful queue/batch mechanism, which is an order of magnitude more complex.
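To illustrate the first limitation, here is a small dependency-free sketch of why the order of chunking and deduplication matters. The helper names (chunk, chunkThenDedupe, dedupeThenChunk) and the chunk size of 25 are assumptions made for this example; they stand in for a size-limited buffer and are not part of the code above.

```typescript
// Hypothetical helper: split a list into chunks of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// What a size-limited buffer followed by a Set effectively does:
// the ids are cut into chunks first, and each chunk is deduplicated afterwards.
function chunkThenDedupe(ids: string[], size: number): string[][] {
  return chunk(ids, size).map((c) => Array.from(new Set(c)));
}

// What a stateful queue could do instead: dedupe first, then chunk.
function dedupeThenChunk(ids: string[], size: number): string[][] {
  return chunk(Array.from(new Set(ids)), size);
}

// 100 requests for the same user id
const ids = Array.from({ length: 100 }, () => "42");

const naive = chunkThenDedupe(ids, 25);  // 4 batches, each containing only "42"
const better = dedupeThenChunk(ids, 25); // 1 batch containing only "42"

console.log(naive.length, better.length);
```

With chunking first, the same single id is requested four times; deduplicating first needs the whole queue to be visible, which is exactly the state a plain bufferTime does not give you.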

I think @Buggy is right.

This is how I understand the problem and what you want to achieve:

  1. There are different places in your app where you want to fetch users.
  2. You do not want to fire a fetch request every time; rather, you want to buffer the requests and send them at a certain interval, let's say 1 second.
  3. You want to be able to cancel a given buffer, so that no request to fetch a batch of users is fired for that 1-second interval.
  4. At the same time, if somebody, let's call it Code at Position X, has asked for a User and just a few milliseconds later somebody else, i.e. Code at Position Y, cancels the entire batch of requests, then Code at Position X has to receive some sort of answer, let's say a null.
  5. Moreover, you may want to be able to ask to fetch a User and then change your mind within the buffer interval, and prevent this User from being fetched (I am far from sure this is really something you want, but it seems to emerge from your question).

If this is all true, then you probably need some sort of queuing mechanism, as Buggy suggested.

There may be many implementations of such a mechanism.
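As one possible shape for such a mechanism, here is a minimal, dependency-free sketch. The PendingBatch class and its request/flush methods are hypothetical names invented for this illustration; the sketch only covers points 2 and 5 (buffering ids and withdrawing a request before the batch is sent), not the delivery of results or the cancellation of an in-flight request.

```typescript
// Hypothetical sketch of a reference-counted queue of pending ids.
class PendingBatch {
  // id -> number of callers currently interested in it
  private counts = new Map<string, number>();
  // Incremented on every flush so that stale cancels can be ignored
  private generation = 0;

  /** Register interest in `id`; returns a function that withdraws it. */
  request(id: string): () => void {
    this.counts.set(id, (this.counts.get(id) || 0) + 1);
    const generation = this.generation;
    let cancelled = false;
    return () => {
      // Ignore repeated cancels and cancels that arrive after the batch was flushed
      if (cancelled || generation !== this.generation) return;
      cancelled = true;
      const remaining = (this.counts.get(id) || 0) - 1;
      if (remaining > 0) this.counts.set(id, remaining);
      else this.counts.delete(id);
    };
  }

  /** Called once per interval: returns the ids to fetch and resets the queue. */
  flush(): string[] {
    const ids = Array.from(this.counts.keys());
    this.counts.clear();
    this.generation++;
    return ids;
  }
}

const batch = new PendingBatch();
batch.request("1");
const cancel2 = batch.request("2");
batch.request("3");
batch.request("3"); // duplicates only increase the counter
cancel2();          // the caller changed its mind before the interval elapsed

const firstBatch = batch.flush();  // ids "1" and "3"
const secondBatch = batch.flush(); // empty: nothing new was requested
console.log(firstBatch, secondBatch);
```

A timer (or an RxJS operator such as auditTime) would call flush once per interval and skip the request when the returned list is empty.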

I'm not sure this is the best way to solve this problem (at the very least it needs tests), but I will try to explain my point of view.

We have 2 queues: one for pending requests and one for future requests.
A result object helps deliver the response/error to subscribers.
Some kind of worker, running on a schedule, takes tasks from the queue and performs the request.

"If I unsubscribe from the observable returned by fetchUser before the timer of bufferTime is finished, it doesn't prevent the fetch of the user."

Unsubscribing from fetchUser cleans up the request queue, so the worker does nothing.

"If I unsubscribe from all the observables returned by fetchUser before the fetch of the batch is finished, it doesn't cancel the request."

The worker stays subscribed until isNothingRemain$ emits.

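import { BehaviorSubject, Observable, Subject, combineLatest, from, of, throwError } from "rxjs"
import { auditTime, catchError, delay, filter, map, mergeMap, switchMap, take, takeUntil, tap, toArray, withLatestFrom } from "rxjs/operators"

const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
  map((userId) => ({ id: userId, name: "George" })),
  toArray(),
  tap(() => console.log('API_CALL', userIds)),
  delay(200),
)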

class Queue {
  queue$ = new BehaviorSubject(new Map());

  private get currentQueue() {
    return new Map(this.queue$.getValue());
  }

  add(...ids) {
    const newMap = ids.reduce((acc, id) => {
      acc.set(id, (acc.get(id) || 0) + 1);
      return acc;
    }, this.currentQueue);
    this.queue$.next(newMap);
  };

  addMap(idmap: Map<any, any>) {

    const newMap = (Array.from(idmap.keys()))
      .reduce((acc, id) => {
        acc.set(id, (acc.get(id) || 0) + idmap.get(id));
        return acc;
      }, this.currentQueue);
    this.queue$.next(newMap);
  }

  remove(...ids) {
    const newMap = ids.reduce((acc, id) => {
      acc.get(id) > 1 ? acc.set(id, acc.get(id) - 1) : acc.delete(id);
      return acc;
    }, this.currentQueue)
    this.queue$.next(newMap);
  };

  removeMap(idmap: Map<any, any>) {
    const newMap = (Array.from(idmap.keys()))
      .reduce((acc, id) => {
        acc.get(id) > idmap.get(id) ? acc.set(id, acc.get(id) - idmap.get(id)) : acc.delete(id);
        return acc;
      }, this.currentQueue)
    this.queue$.next(newMap);
  };

  has(id) {
    return this.queue$.getValue().has(id);
  }

  asObservable() {
    return this.queue$.asObservable();
  }
}

class Result {
  result$ = new BehaviorSubject({ ids: new Map(), isError: null, value: null });
  select(id) {
    return this.result$.pipe(
      filter(({ ids }) => ids.has(id)),
      switchMap(({ isError, value }) => isError ? throwError(value) : of(value.find(x => x.id === id)))
    )
  }
  add({ isError, value, ids }) {
    this.result$.next({ ids, isError, value });
  }

  clear(){
    this.result$.next({ ids: new Map(), isError: null, value: null });
  }
}

const result = new Result();
const queueToSend = new Queue();
const queuePending = new Queue();
const doRequest = new Subject();

const fetchUser = (id: string) => {
  return Observable.create(observer => {
    queueToSend.add(id);
    doRequest.next();

    const subscription = result
      .select(id)
      .pipe(take(1))
      .subscribe(observer);

    // cleanup queue after got response or unsubscribe
    return () => {
      (queueToSend.has(id) ? queueToSend : queuePending).remove(id);
      subscription.unsubscribe();
    }
  })
}


// some kind of worker that takes tasks from the queue and sends requests
doRequest.asObservable().pipe(
  auditTime(1000),
  // clear outdated results
  tap(() => result.clear()),
  withLatestFrom(queueToSend.asObservable()),
  map(([_, queue]) => queue),
  filter(ids => !!ids.size),
  mergeMap(ids => {
    // abort the request if it has no subscribers left
    const isNothingRemain$ = combineLatest(queueToSend.asObservable(), queuePending.asObservable()).pipe(
      map(([queueToSendIds, queuePendingIds]) => Array.from(ids.keys()).some(k => queueToSendIds.has(k) || queuePendingIds.has(k))),
      filter(hasSameKey => !hasSameKey)
    )

    // prevent requesting the same ids again while the previous request is not complete
    queueToSend.removeMap(ids);
    queuePending.addMap(ids);
    return functionThatSimulateAFetch(Array.from(ids.keys())).pipe(
      map(res => ({ isError: false, value: res, ids })),
      takeUntil(isNothingRemain$),
      catchError(error => of({ isError: true, value: error, ids }))
    )
  }),
).subscribe(res => result.add(res))




fetchUser('1').subscribe(console.log);

const subs = fetchUser('2').subscribe(console.log);
subs.unsubscribe();

fetchUser('3').subscribe(console.log);



setTimeout(() => {
  const subs1 = fetchUser('10').subscribe(console.log);
  subs1.unsubscribe();

  const subs2 = fetchUser('11').subscribe(console.log);
  subs2.unsubscribe();
}, 2000)


setTimeout(() => {
  const subs1 = fetchUser('20').subscribe(console.log);
  subs1.unsubscribe();

  const subs21 = fetchUser('20').subscribe(console.log);
  const subs22 = fetchUser('20').subscribe(console.log);
}, 4000)


// API_CALL
// ["1", "3"]
// {id: "1", name: "George"}
// {id: "3", name: "George"}
// API_CALL
// ["20"]
// {id: "20", name: "George"}
// {id: "20", name: "George"}

stackblitz example

FYI, I tried to create a generic batched task queue using the answers of @Buggy & @Picci:

import { Observable, Subject, BehaviorSubject, from, timer } from "rxjs"
import { catchError, share, mergeMap, map, filter, takeUntil, take, bufferTime, timeout, concatMap } from "rxjs/operators"

export interface Task<TInput> {
    uid: number
    input: TInput
}

interface ErroredTask<TInput> extends Task<TInput> {
    error: any
}

interface SucceededTask<TInput, TOutput> extends Task<TInput> {
    output: TOutput
}

export type FinishedTask<TInput, TOutput> = ErroredTask<TInput> | SucceededTask<TInput, TOutput>

const taskErrored = <TInput, TOutput>(
    taskFinished: FinishedTask<TInput, TOutput>,
): taskFinished is ErroredTask<TInput> => !!(taskFinished as ErroredTask<TInput>).error

type BatchedWorker<TInput, TOutput> = (tasks: Array<Task<TInput>>) => Observable<FinishedTask<TInput, TOutput>>

export const createSimpleBatchedWorker = <TInput, TOutput>(
    work: (inputs: TInput[]) => Observable<TOutput[]>,
    workTimeout: number,
): BatchedWorker<TInput, TOutput> => (
    tasks: Array<Task<TInput>>,
) => work(
    tasks.map((task) => task.input),
).pipe(
    mergeMap((outputs) => from(tasks.map((task, index) => ({
        ...task,
        output: outputs[index],
    })))),
    timeout(workTimeout),
    catchError((error) => from(tasks.map((task) => ({
        ...task,
        error,
    })))),
)

export const createBatchedTaskQueue = <TInput, TOutput>(
    worker: BatchedWorker<TInput, TOutput>,
    concurrencyLimit: number = 1,
    batchTimeout: number = 0,
    maxBatchSize: number = Number.POSITIVE_INFINITY,
) => {
    const taskSubject = new Subject<Task<TInput>>()
    const cancelTaskSubject = new BehaviorSubject<Set<number>>(new Set())
    const cancelTask = (task: Task<TInput>) => {
        const cancelledUids = cancelTaskSubject.getValue()
        const newCancelledUids = new Set(cancelledUids)
        newCancelledUids.add(task.uid)
        cancelTaskSubject.next(newCancelledUids)
    }
    const output$: Observable<FinishedTask<TInput, TOutput>> = taskSubject.pipe(
        bufferTime(batchTimeout, undefined, maxBatchSize),
        map((tasks) => {
          const cancelledUids = cancelTaskSubject.getValue()
          return tasks.filter((task) => !cancelledUids.has(task.uid))
        }),
        filter((tasks) => tasks.length > 0),
        mergeMap(
            (tasks) => worker(tasks).pipe(
                takeUntil(cancelTaskSubject.pipe(
                    filter((uids) => {
                        for (const task of tasks) {
                            if (!uids.has(task.uid)) {
                                return false
                            }
                        }
                        return true
                    }),
                )),
            ),
            undefined,
            concurrencyLimit,
        ),
        share(),
    )
    let nextUid = 0
    return (input$: Observable<TInput>): Observable<TOutput> => input$.pipe(
        concatMap((input) => new Observable<TOutput>((observer) => {
            const task = {
                uid: nextUid++,
                input,
            }
            const subscription = output$.pipe(
                filter((taskFinished) => taskFinished.uid === task.uid),
                take(1),
                map((taskFinished) => {
                    if (taskErrored(taskFinished)) {
                        throw taskFinished.error
                    }
                    return taskFinished.output
                }),
            ).subscribe(observer)
            subscription.add(
                timer(0).subscribe(() => taskSubject.next(task)),
            )
            return () => {
                subscription.unsubscribe()
                cancelTask(task)
            }
        })),
    )
}

With our example:

import { from, of } from "rxjs"
import { map, toArray } from "rxjs/operators"
import { createBatchedTaskQueue, createSimpleBatchedWorker } from "mmr/components/rxjs/batched-task-queue"

const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
    map((userId) => ({ id: userId, name: "George" })),
    toArray(),
)

const userFetchQueue = createBatchedTaskQueue(
    createSimpleBatchedWorker(
        functionThatSimulateAFetch,
        10000,
    ),
)

const fetchUser = (userId: string) => {
    // of() emits the id as a single value; from() would iterate the string character by character
    return of(userId).pipe(
        userFetchQueue,
    )
}

I am open to any suggestions for improvement.
