Let's imagine I have a function fetchUser which takes a userId as parameter and returns an observable of the user.
As I call this method often, I want to batch the ids and perform one request with multiple ids instead.
This is where my troubles began...
I can't find a way to do that without sharing an observable between the different calls of fetchUser.
import { Subject, from } from "rxjs"
import { bufferTime, mergeMap, map, toArray, filter, take, share } from "rxjs/operators"
const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
  map((userId) => ({ id: userId, name: "George" })),
  toArray(),
)
const userToFetch$ = new Subject<string>()
const fetchedUser$ = userToFetch$.pipe(
  bufferTime(1000),
  mergeMap((userIds) => functionThatSimulateAFetch(userIds)),
  share(),
)
const fetchUser = (userId: string) => {
  const observable = fetchedUser$.pipe(
    map((users) => users.find((user) => user.id === userId)),
    filter((user) => !!user),
    take(1),
  )
  userToFetch$.next(userId)
  return observable
}
But that's ugly and it has multiple problems:
- If I unsubscribe from the observable returned by fetchUser before the bufferTime timer has finished, it doesn't prevent the fetch of the user.
- If I unsubscribe from all the observables returned by fetchUser before the fetch of the batch has finished, it doesn't cancel the request.
- Error handling is more complex
- etc
More generally: I don't know how to solve problems that require sharing resources using RxJS. It's difficult to find advanced examples of RxJS.
Asked Sep 11, 2018 at 16:03 by antoinestv; edited Sep 11, 2018 at 16:59 by Buggy.
You say you need to fetch users in batches, and this is what functionThatSimulateAFetch does, but then you have fetchUser = (userId: string) => {...}, i.e. a function to fetch one single user. What is it that you want to achieve? – Picci, commented Sep 12, 2018 at 5:50
The problem is the following: I want to fetch one user at a time, but to avoid performing too many API calls I want to batch the requests (so I use one API endpoint that returns the list of users for a given list of user ids). This is just an example, of course; I encounter this kind of issue often. As soon as I share an observable using share/shareReplay I can no longer know the source of the values in my stream, and I have to pass a context etc. (like in the example above). I know I am not approaching the problem in the right way, which is the reason for my question! – antoinestv, commented Sep 12, 2018 at 8:02
4 Answers
What you have is good, but as with everything RxJS, the devil is in the details.
Issues
- The switchMapping
mergeMap((userIds) => functionThatSimulateAFetch(userIds)),
This is where you first go wrong. By using a mergeMap here, you are making it impossible to tell apart the "stream of requests" from the "stream returned by a single request":
- You are making it near impossible to unsubscribe from an individual request (to cancel it)
- You are making it impossible to handle errors
- It falls apart if your inner observable emits more than once.
Rather, what you want is to emit individual BatchEvents, via a normal map (producing an observable of observables), and switchMap/mergeMap those after the filtering.
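To make the "observable of observables" idea concrete, here is a minimal sketch, assuming RxJS 6 or later. The names idBatch$, fetchBatch and select are hypothetical, and fetchBatch is a stand-in that fails whenever a batch contains the id "bad". There is no share() here, so each caller builds its own batch objects; the point is only that errors reach the callers of the failing batch and nobody else.

```typescript
import { Observable, Subject, of, throwError } from "rxjs";
import { filter, map, mergeMap, take } from "rxjs/operators";

type Batch = { keys: Set<string>; values: Observable<string[]> };

// Hypothetical batch call: fails whenever the batch contains the id "bad".
const fetchBatch = (ids: string[]): Observable<string[]> =>
  ids.indexOf("bad") >= 0
    ? throwError(new Error("batch failed"))
    : of(ids.map((id) => `user-${id}`));

const idBatch$ = new Subject<string[]>();

// map (not mergeMap): each event carries its own, not yet subscribed, request.
const batch$: Observable<Batch> = idBatch$.pipe(
  map((ids) => ({ keys: new Set(ids), values: fetchBatch(ids) })),
);

// Filter first, flatten afterwards: only the matching batch is subscribed to.
const select = (id: string): Observable<string[]> =>
  batch$.pipe(
    filter((batch) => batch.keys.has(id)),
    take(1),
    mergeMap((batch) => batch.values),
  );

const log: string[] = [];
select("1").subscribe({
  next: (users) => log.push(`1: next ${users.join(",")}`),
  error: () => log.push("1: error"),
});
select("bad").subscribe({
  next: () => log.push("bad: next"),
  error: () => log.push("bad: error"),
});

idBatch$.next(["1", "2"]);
idBatch$.next(["bad", "3"]);
console.log(log); // ["1: next user-1,user-2", "bad: error"]
```

The caller waiting for "1" is served by the first batch and never sees the error of the second one.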
- Side effects when creating an observable & Emitting before subscribing
userToFetch$.next(userId)
return observable
Don’t do this. An observable by itself does not actually do anything. It’s a "blueprint" for a sequence of actions that happen when you subscribe to it. By doing this, you only create a batch action when the observable is created, and you’re screwed if you get multiple or delayed subscriptions.
Rather, you want to create an observable with defer that emits to userToFetch$ on every subscription.
Even then you’ll want to subscribe to your observable before emitting to userToFetch$: if you aren’t subscribed, your observable is not listening to the subject, and the event will be lost. You can do this in a defer-like observable.
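The difference between the two orderings can be seen in a small sketch, assuming RxJS 6 or later. The names key$, emitThenReturn and subscribeThenEmit are hypothetical; the second function is one possible shape of the "defer-like" observable described above, not necessarily the only one.

```typescript
import { Observable, Subject } from "rxjs";
import { filter, take } from "rxjs/operators";

const key$ = new Subject<string>();

// Emits while building the observable: nobody is listening yet, so the key is lost.
const emitThenReturn = (id: string): Observable<string> => {
  key$.next(id);
  return key$.pipe(filter((key) => key === id), take(1));
};

// Runs on every subscription: listen to the subject first, then emit the key.
const subscribeThenEmit = (id: string): Observable<string> =>
  new Observable<string>((observer) => {
    const subscription = key$
      .pipe(filter((key) => key === id), take(1))
      .subscribe(observer);
    key$.next(id);
    return subscription;
  });

const received: string[] = [];
emitThenReturn("a").subscribe((key) => received.push(`early:${key}`));
const late$ = subscribeThenEmit("b");
late$.subscribe((key) => received.push(`first:${key}`));
late$.subscribe((key) => received.push(`second:${key}`));
console.log(received); // ["first:b", "second:b"]
```

The subscriber of emitThenReturn("a") never receives anything, while each subscription to late$ triggers and receives its own emission.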
Solution
Short, and not very different from your code, but structure it like this.
import { Observable, Subject } from "rxjs";
import { bufferTime, first, map, share, switchMap } from "rxjs/operators";
const BUFFER_TIME = 1000;
// Assumed shapes: a batch response holds the users keyed by id (see `v[userId]` below)
type User = { id: string, name: string };
type Users = { [id: string]: User };
type BatchEvent = { keys: Set<string>, values: Observable<Users> };
/** Your own batch API; only its signature is assumed here */
declare const userService: { get(ids: string[]): Observable<Users> };
/** The incoming keys */
const keySubject = new Subject<string>();
const requests: Observable<BatchEvent> =
  keySubject.asObservable().pipe(
    bufferTime(BUFFER_TIME),
    map(keys => fetchBatch(keys)),
    share(),
  );
/** Returns a single User from an ID. Batches the request */
function get(userId: string): Observable<User> {
  console.log("Creating observable for:", userId);
  // The money observable. See "defer":
  // triggers a new subject event on subscription
  const observable = new Observable<BatchEvent>(observer => {
    requests.subscribe(observer);
    // Emit *after* the subscription
    keySubject.next(userId);
  });
  return observable.pipe(
    first(v => v.keys.has(userId)),
    // There is only 1 item, so any *Map will do here
    switchMap(v => v.values),
    map(v => v[userId]),
  );
}
function fetchBatch(args: string[]): BatchEvent {
  const keys = new Set(args); // Do not batch duplicates
  const values = userService.get(Array.from(keys)).pipe(
    share(),
  );
  return { keys, values };
}
This does exactly what you were asking, including:
- Errors are propagated to the recipients of the batch call, but nobody else
- If everybody unsubscribes from a batch, the observable is canceled
- If everybody unsubscribes from a batch before the request is even fired, it never fires
- The observable behaves like HttpClient: subscribing to the observable fires a new (batched) request for data. Callers are free to pipe shareReplay or whatever though. So no surprises there.
Here is a working stackblitz Angular demo: https://stackblitz.com/edit/angular-rxjs-batch-request
In particular, notice the behavior when you "toggle" the display: You’ll notice that re-subscribing to existing observables will fire new batch requests, and that those requests will cancel (or outright not fire) if you re-toggle fast enough.
Use case
In our project, we use this for Angular Tables, where each row needs to individually fetch additional data to render. This allows us to:
- chunk all the requests for a "single page", without needing any special knowledge of pagination
- Potentially fetch multiple pages at once if the user paginates fast
- re-use existing results even if page size changes
Limitations
I would not add chunking or rate limiting into this. Because the source observable is a dumb bufferTime, you run into issues:
- The "chunking" will happen before the deduping. So if you have 100 requests for a single userId, you’ll end up firing several requests with only 1 element
- If you rate limit, you’ll not be able to inspect your queue. So you may end up with a very long queue containing multiple same requests.
This is a pessimistic point of view though. Fixing it would mean going full out with a stateful queue/batch mechanism, which is an order of magnitude more complex.
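The ordering problem can be illustrated without RxJS at all. The sketch below uses hypothetical helpers chunk and dedupe on a queue that contains the same id several times, and compares "chunk, then dedupe each chunk" (what a size-limited buffer followed by a Set would do) with "dedupe, then chunk" (what a stateful queue could do).

```typescript
// Hypothetical helpers, for illustration only.

// Split a list into consecutive chunks of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Remove duplicates while keeping first-seen order.
function dedupe<T>(items: T[]): T[] {
  return Array.from(new Set(items));
}

const queued = ["a", "a", "a", "a", "b", "c"];

// Chunk the raw queue, then dedupe inside each chunk: three requests.
const chunkFirst = chunk(queued, 2).map((ids) => dedupe(ids));
// Dedupe the whole queue, then chunk: two requests.
const dedupeFirst = chunk(dedupe(queued), 2);

console.log(chunkFirst);  // [["a"], ["a"], ["b", "c"]]
console.log(dedupeFirst); // [["a", "b"], ["c"]]
```

With chunking first, the id "a" is requested twice in two single-element batches, which is the waste described in the first bullet.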
I think @Buggy is right.
This is the way I understand the problem and what you want to achieve
- There are different places in your app where you want to fetch users
- You do not want to fire a fetch request all the time, rather you want to buffer them and send them at a certain interval of time, let's say 1 second
- You want to cancel a certain buffer and avoid that for that 1 second interval a request to fetch a batch of users is fired
- At the same time, if somebody, let's call it Code at Position X, has asked for a User and just a few milliseconds later somebody else, i.e. Code at Position Y, cancels the entire batch of requests, then Code at Position X has to receive some sort of answer, let's say a null
- Moreover, you may want to be able to ask to fetch a User and then change your mind, if within the interval of the buffer time, and avoid this User being fetched (I am far from sure this is really something you want, but it seems somehow to emerge from your question)
If this is all true, then you probably have to have some sort of queuing mechanism, as Buggy suggested.
Then there may be many implementations of such mechanism.
I'm not sure if this is the best way to solve this problem (at least it needs tests), but I will try to explain my point of view.
We have 2 queues: one for pending requests and one for future requests.
result helps deliver the response/error to subscribers.
Some kind of worker, running on some schedule, takes a task from the queue to do the request.
"If I unsubscribe from the observable returned by fetchUser before the timer of bufferTime is finished, it doesn't prevent the fetch of the user."
Unsubscribing from fetchUser will clean up the request queue, and the worker will do nothing.
"If I unsubscribe from all the observables returned by fetchUser before the fetch of the batch is finished, it doesn't cancel the request."
The worker stays subscribed until isNothingRemain$ emits.
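import { BehaviorSubject, Observable, Subject, combineLatest, from, of, throwError } from "rxjs"
import { auditTime, catchError, delay, filter, map, mergeMap, switchMap, take, takeUntil, tap, toArray, withLatestFrom } from "rxjs/operators"
const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(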
map((userId) => ({ id: userId, name: "George" })),
toArray(),
tap(() => console.log('API_CALL', userIds)),
delay(200),
)
class Queue {
queue$ = new BehaviorSubject(new Map());
private get currentQueue() {
return new Map(this.queue$.getValue());
}
add(...ids) {
const newMap = ids.reduce((acc, id) => {
acc.set(id, (acc.get(id) || 0) + 1);
return acc;
}, this.currentQueue);
this.queue$.next(newMap);
};
addMap(idmap: Map<any, any>) {
const newMap = (Array.from(idmap.keys()))
.reduce((acc, id) => {
acc.set(id, (acc.get(id) || 0) + idmap.get(id));
return acc;
}, this.currentQueue);
this.queue$.next(newMap);
}
remove(...ids) {
const newMap = ids.reduce((acc, id) => {
acc.get(id) > 1 ? acc.set(id, acc.get(id) - 1) : acc.delete(id);
return acc;
}, this.currentQueue)
this.queue$.next(newMap);
};
removeMap(idmap: Map<any, any>) {
const newMap = (Array.from(idmap.keys()))
.reduce((acc, id) => {
acc.get(id) > idmap.get(id) ? acc.set(id, acc.get(id) - idmap.get(id)) : acc.delete(id);
return acc;
}, this.currentQueue)
this.queue$.next(newMap);
};
has(id) {
return this.queue$.getValue().has(id);
}
asObservable() {
return this.queue$.asObservable();
}
}
class Result {
result$ = new BehaviorSubject({ ids: new Map(), isError: null, value: null });
select(id) {
return this.result$.pipe(
filter(({ ids }) => ids.has(id)),
switchMap(({ isError, value }) => isError ? throwError(value) : of(value.find(x => x.id === id)))
)
}
add({ isError, value, ids }) {
this.result$.next({ ids, isError, value });
}
clear(){
this.result$.next({ ids: new Map(), isError: null, value: null });
}
}
const result = new Result();
const queueToSend = new Queue();
const queuePending = new Queue();
const doRequest = new Subject();
const fetchUser = (id: string) => {
return Observable.create(observer => {
queueToSend.add(id);
doRequest.next();
const subscription = result
.select(id)
.pipe(take(1))
.subscribe(observer);
// cleanup queue after got response or unsubscribe
return () => {
(queueToSend.has(id) ? queueToSend : queuePending).remove(id);
subscription.unsubscribe();
}
})
}
// some kind of worker that take task from queue and send requests
doRequest.asObservable().pipe(
auditTime(1000),
// clear outdated results
tap(()=>result.clear()),
withLatestFrom(queueToSend.asObservable()),
map(([_, queue]) => queue),
filter(ids => !!ids.size),
mergeMap(ids => {
// abort the request if it have no subscribers
const isNothingRemain$ = combineLatest(queueToSend.asObservable(), queuePending.asObservable()).pipe(
map(([queueToSendIds, queuePendingIds]) => Array.from(ids.keys()).some(k => queueToSendIds.has(k) || queuePendingIds.has(k))),
filter(hasSameKey => !hasSameKey)
)
// prevent requesting the same ids if the previous request is not complete
queueToSend.removeMap(ids);
queuePending.addMap(ids);
return functionThatSimulateAFetch(Array.from(ids.keys())).pipe(
map(res => ({ isError: false, value: res, ids })),
takeUntil(isNothingRemain$),
catchError(error => of({ isError: true, value: error, ids }))
)
}),
).subscribe(res => result.add(res))
fetchUser('1').subscribe(console.log);
const subs = fetchUser('2').subscribe(console.log);
subs.unsubscribe();
fetchUser('3').subscribe(console.log);
setTimeout(() => {
const subs1 = fetchUser('10').subscribe(console.log);
subs1.unsubscribe();
const subs2 = fetchUser('11').subscribe(console.log);
subs2.unsubscribe();
}, 2000)
setTimeout(() => {
const subs1 = fetchUser('20').subscribe(console.log);
subs1.unsubscribe();
const subs21 = fetchUser('20').subscribe(console.log);
const subs22 = fetchUser('20').subscribe(console.log);
}, 4000)
// API_CALL
// ["1", "3"]
// {id: "1", name: "George"}
// {id: "3", name: "George"}
// API_CALL
// ["20"]
// {id: "20", name: "George"}
// {id: "20", name: "George"}
stackblitz example
FYI, I tried to create a generic batched task queue using the answers of @buggy & @picci:
import { Observable, Subject, BehaviorSubject, from, timer } from "rxjs"
import { catchError, share, mergeMap, map, filter, takeUntil, take, bufferTime, timeout, concatMap } from "rxjs/operators"
export interface Task<TInput> {
uid: number
input: TInput
}
interface ErroredTask<TInput> extends Task<TInput> {
error: any
}
interface SucceededTask<TInput, TOutput> extends Task<TInput> {
output: TOutput
}
export type FinishedTask<TInput, TOutput> = ErroredTask<TInput> | SucceededTask<TInput, TOutput>
const taskErrored = <TInput, TOutput>(
taskFinished: FinishedTask<TInput, TOutput>,
): taskFinished is ErroredTask<TInput> => !!(taskFinished as ErroredTask<TInput>).error
type BatchedWorker<TInput, TOutput> = (tasks: Array<Task<TInput>>) => Observable<FinishedTask<TInput, TOutput>>
export const createSimpleBatchedWorker = <TInput, TOutput>(
work: (inputs: TInput[]) => Observable<TOutput[]>,
workTimeout: number,
): BatchedWorker<TInput, TOutput> => (
tasks: Array<Task<TInput>>,
) => work(
tasks.map((task) => task.input),
).pipe(
mergeMap((outputs) => from(tasks.map((task, index) => ({
...task,
output: outputs[index],
})))),
timeout(workTimeout),
catchError((error) => from(tasks.map((task) => ({
...task,
error,
})))),
)
export const createBatchedTaskQueue = <TInput, TOutput>(
worker: BatchedWorker<TInput, TOutput>,
concurrencyLimit: number = 1,
batchTimeout: number = 0,
maxBatchSize: number = Number.POSITIVE_INFINITY,
) => {
const taskSubject = new Subject<Task<TInput>>()
const cancelTaskSubject = new BehaviorSubject<Set<number>>(new Set())
const cancelTask = (task: Task<TInput>) => {
const cancelledUids = cancelTaskSubject.getValue()
const newCancelledUids = new Set(cancelledUids)
newCancelledUids.add(task.uid)
cancelTaskSubject.next(newCancelledUids)
}
const output$: Observable<FinishedTask<TInput, TOutput>> = taskSubject.pipe(
bufferTime(batchTimeout, undefined, maxBatchSize),
map((tasks) => {
const cancelledUids = cancelTaskSubject.getValue()
return tasks.filter((task) => !cancelledUids.has(task.uid))
}),
filter((tasks) => tasks.length > 0),
mergeMap(
(tasks) => worker(tasks).pipe(
takeUntil(cancelTaskSubject.pipe(
filter((uids) => {
for (const task of tasks) {
if (!uids.has(task.uid)) {
return false
}
}
return true
}),
)),
),
undefined,
concurrencyLimit,
),
share(),
)
let nextUid = 0
return (input$: Observable<TInput>): Observable<TOutput> => input$.pipe(
concatMap((input) => new Observable<TOutput>((observer) => {
const task = {
uid: nextUid++,
input,
}
const subscription = output$.pipe(
filter((taskFinished) => taskFinished.uid === task.uid),
take(1),
map((taskFinished) => {
if (taskErrored(taskFinished)) {
throw taskFinished.error
}
return taskFinished.output
}),
).subscribe(observer)
subscription.add(
timer(0).subscribe(() => taskSubject.next(task)),
)
return () => {
subscription.unsubscribe()
cancelTask(task)
}
})),
)
}
With our example:
import { from, of } from "rxjs"
import { map, toArray } from "rxjs/operators"
import { createBatchedTaskQueue, createSimpleBatchedWorker } from "mmr/components/rxjs/batched-task-queue"
const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
  map((userId) => ({ id: userId, name: "George" })),
  toArray(),
)
const userFetchQueue = createBatchedTaskQueue(
  createSimpleBatchedWorker(
    functionThatSimulateAFetch,
    10000,
  ),
)
const fetchUser = (userId: string) => {
  // of() emits the id as a single value; from() would iterate over the string's characters
  return of(userId).pipe(
    userFetchQueue,
  )
}
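When a single id is wrapped into an observable before being piped through the queue, as fetchUser does, the choice of creation function matters: from treats a string as a sequence of characters, while of emits its argument as one value. A quick check, assuming RxJS 6 or later (the variable names are only for illustration):

```typescript
import { from, of } from "rxjs";
import { toArray } from "rxjs/operators";

let fromValues: string[] = [];
let ofValues: string[] = [];

// from() iterates over the string and emits one character at a time...
from("42").pipe(toArray()).subscribe((values) => (fromValues = values));
// ...while of() emits the whole string as a single value.
of("42").pipe(toArray()).subscribe((values) => (ofValues = values));

console.log(fromValues); // ["4", "2"]
console.log(ofValues);   // ["42"]
```

With from, the user id "42" would therefore be queued as two tasks, one for "4" and one for "2".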
I am open to any improvement suggestions