I have an RxJS Observable that needs to be recalculated at specific times, as described by an array of DateTime objects (although for the purpose of this question they could be JavaScript Date objects, epoch milliseconds or anything else representing a specific instant in time):
const changeTimes = [
  //            yyyy, mm, dd, hh, mm
  DateTime.utc( 2018, 10, 31, 21, 45 ),
  DateTime.utc( 2018, 10, 31, 21, 50 ),
  DateTime.utc( 2018, 10, 31, 22, 0 ),
  DateTime.utc( 2018, 10, 31, 23, 0 ),
  DateTime.utc( 2018, 10, 31, 23, 30 ),
];
I'm struggling to understand how to create an Observable that would emit at the times specified in such an array.
Here's what I've thought about in an attempt to answer my own question:
- I almost certainly need to use the delay operator, where the specified delay is the time between “now” and the next future datetime.
- I somehow need to ensure that “now” is current at the time of subscription, not at the time of Observable creation (possibly by using the defer operator), although I don't want to unnecessarily create multiple Observable instances if there are multiple subscriptions.
- I'm unsure how to iterate over the array as time passes. The expand operator might be what I need, but it calls something recursively, and I'm just trying to iterate over a list.
- The timer operator seems irrelevant, since the duration between each datetime is different.
- I could map every datetime to its own delayed Observable and return them all via merge, but this becomes horribly inefficient as the number of datetimes in the array increases (there could be hundreds), so this is an absolute last resort.
How can I make an RxJS Observable that takes a list of datetimes and then emits as each one is reached in time, completing on the final one?
Asked Oct 31, 2018 at 11:25 by Alex Peters
4 Answers
I think what you summarized in the bullet points is all correct. Using delay seems obvious, but it'll make the chain hard to understand.
The solution that comes to my mind assumes that you know the changeTimes array before creating the observable chain. You can create your own "observable creation method" that returns an Observable that emits based on setTimeout, for example (this is just "pseudo code"; treat the date handling as a sketch):
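import { Observable } from 'rxjs';

const schedule = (dates: Date[]): Observable<Date> => new Observable<Date>(observer => {
  // Sort a copy of `dates` from the earliest to the latest.
  const sorted = [...dates].sort((a, b) => a.getTime() - b.getTime());
  let index = 0;
  // Named `timeoutId` so it does not shadow the global clearTimeout function.
  let timeoutId: ReturnType<typeof setTimeout> | undefined;
  const loop = () => {
    if (index >= sorted.length) {
      // Nothing left to wait for.
      observer.complete();
      return;
    }
    // Recompute the delay from "now" on every step; past dates fire immediately.
    const delay = Math.max(0, sorted[index].getTime() - Date.now());
    timeoutId = setTimeout(() => {
      observer.next(sorted[index++]);
      loop();
    }, delay);
  };
  loop();
  // Teardown: cancel the pending timeout when the subscriber unsubscribes.
  return () => clearTimeout(timeoutId);
});
...
schedule(changeTimes)
  .subscribe(...)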
The last option you mention, with merge, isn't that bad actually. I understand you're concerned that it'll create a lot of subscriptions, but if you sort the changeTimes array and then use concat instead of merge, it'll always keep only one active subscription, even if you create hundreds of Observables.
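As a rough sketch of that concat idea: the helper name emitAt is hypothetical, and plain Date objects stand in for the DateTime values. Each date becomes a deferred one-shot timer, so its delay is measured only when concat actually subscribes to it:

```typescript
import { Observable, concat, defer, timer } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical helper, not part of RxJS: emits each date when it is reached,
// then completes after the last one.
function emitAt(dates: Date[]): Observable<Date> {
  // concat subscribes in array order, so the dates must be sorted first.
  const sorted = [...dates].sort((a, b) => a.getTime() - b.getTime());
  const steps = sorted.map(date =>
    // defer postpones the "now" lookup until concat subscribes to this step.
    defer(() =>
      timer(Math.max(0, date.getTime() - Date.now())).pipe(map(() => date))
    )
  );
  // Only one inner timer is subscribed at any moment.
  return concat(...steps);
}
```

Because the delay is clamped at zero, dates that are already in the past are emitted one after another without waiting, and the sequence then continues with the first future date.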
Here's a working example:
import {Injectable} from '@angular/core';
import {Observable, Subscriber, timer} from 'rxjs';

@Injectable()
export class TimerService {
  futureDates: Date[] = [];
  futureDate: Date;
  notifier: Observable<string>;

  // Takes the next date off the queue and arms a one-shot timer for it.
  cycle = (observer: Subscriber<string>) => {
    if (this.futureDates.length > 0) {
      this.futureDate = this.futureDates.shift();
      // Measured at processing time, not when the date was queued.
      const msInFuture = this.futureDate.getTime() - Date.now();
      if (msInFuture < 0) {
        console.log(`date ${this.futureDate.toISOString()} expected to be in the future, ` +
          `but was ${-msInFuture} msec in the past, so stopping`);
        observer.complete();
      } else {
        timer(msInFuture).subscribe(x => {
          observer.next(`triggered at ${new Date().toISOString()}`);
          // Move on to the next queued date.
          this.cycle(observer);
        });
      }
    } else {
      // Queue exhausted.
      observer.complete();
    }
  }

  getTimer(): Observable<string> {
    // Demo data: two dates, 10 and 20 seconds from now.
    const now = new Date();
    const ms1 = now.getTime() + 10000;
    const ms2 = now.getTime() + 20000;
    this.futureDates.push(new Date(ms1));
    this.futureDates.push(new Date(ms2));
    this.notifier = new Observable(observer => {
      this.cycle(observer);
    });
    return this.notifier;
  }
}
In this example, the list of future times is created in the getTimer() method, but you could pass an array of dates into that method.
The key is to simply store the dates, process one at a time, and at processing time, check how far in the future that date is, and set a one-shot Rx timer for that number of milliseconds.
Given an array of DateTime objects:
const changeTimes = [
  //            yyyy, mm, dd, hh, mm
  DateTime.utc( 2018, 10, 31, 21, 45 ),
  DateTime.utc( 2018, 10, 31, 21, 50 ),
  DateTime.utc( 2018, 10, 31, 22, 0 ),
  DateTime.utc( 2018, 10, 31, 23, 0 ),
  DateTime.utc( 2018, 10, 31, 23, 30 ),
];
or better yet, an Observable that emits an array of these each time the array changes (which is what is actually happening in my scenario, although I didn't mention it in the question because it wasn't strictly relevant):
const changeTimes$: Observable<DateTime[]> = /* ... */;
the following Observable will immediately emit the next future time on subscription, emit each subsequent future time at the passage of the previous future time, then emit null and complete:
// Assumes RxJS 6+ import paths; DateTime is the same class used for changeTimes.
import { concat, defer, of } from 'rxjs';
import { delay, map, switchMap } from 'rxjs/operators';

const nextTime$ = changeTimes$.pipe(
  // sort DateTimes chronologically
  map(unsorted => [...unsorted].sort((x, y) => +x - +y)),
  // remove duplicates
  map(duplicated => duplicated.filter((item, i) => !i || +item !== +duplicated[i - 1])),
  // convert each time to a delayed Observable
  map(times => [...times, null].map((time, i) => defer(() => of(time).pipe(
    // emit the first one immediately
    // emit all others at the previously emitted time
    // (clamped at 0 so a time that has already passed never yields a negative delay)
    delay(i === 0 ? 0 : Math.max(0, +times[i - 1] - +DateTime.utc()))
  )))),
  // combine into a single Observable
  switchMap(observables => concat(...observables)),
);
- Sorting is necessary since each inner Observable (“wait X milliseconds then report a time”) is subscribed to on completion of the previous.
- Removing duplicates is not strictly necessary, but seems appropriate to cater for.
- defer is used so that the current time is calculated when the inner Observable is subscribed to.
- concat is used to execute each inner Observable in succession (thanks to martin), avoiding the overhead of a subscription for each time in the list simultaneously.
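To make those steps concrete, here is a small sketch that uses plain epoch-millisecond numbers instead of DateTime objects. The function names prepareTimes and delayFor are hypothetical, introduced only for illustration. They mirror the sort, de-duplicate, append-null and delay steps but are not part of the solution itself:

```typescript
// Hypothetical helper: sort, drop duplicates, and append the final null.
function prepareTimes(unsorted: number[]): Array<number | null> {
  const sorted = [...unsorted].sort((x, y) => x - y);
  // After sorting, duplicates are adjacent, so compare with the previous item.
  const unique = sorted.filter((t, i) => i === 0 || t !== sorted[i - 1]);
  return [...unique, null];
}

// Hypothetical helper: how long item i waits once it is subscribed to at `now`.
// Item 0 is emitted immediately; item i is emitted when item i - 1 is reached.
function delayFor(times: Array<number | null>, i: number, now: number): number {
  if (i === 0) return 0;
  const previous = times[i - 1];
  return previous === null ? 0 : Math.max(0, previous - now);
}

const times = prepareTimes([3000, 1000, 2000, 1000]);
console.log(times); // [1000, 2000, 3000, null]
console.log(delayFor(times, 1, 400)); // 600
console.log(delayFor(times, 3, 2500)); // 500
```

The trailing null is emitted once the last real time has passed, which is why its delay is computed from the final entry before it.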
The way this satisfies my original need:
I have an RxJS Observable that needs to be recalculated at specific times, as described by an array of DateTime objects...
is if I combine this with the data requiring recalculation using the combineLatest operator to trigger that recalculation at the correct times:
const timeAwareData$ = combineLatest(timeUnawareData$, nextTime$).pipe(
  tap(() => console.log('either the data has changed or a time has been reached')),
  // ...
);
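The triggering behaviour can be checked with synchronous stand-ins. In the sketch below, data$ and time$ are placeholder sources invented for illustration (they take the place of timeUnawareData$ and nextTime$), and the array form of combineLatest is used:

```typescript
import { combineLatest, of } from 'rxjs';

// Placeholder sources, assumptions for illustration only.
const data$ = of('data');
const time$ = of(1, 2, null);

const seen: Array<[string, number | null]> = [];
combineLatest([data$, time$]).subscribe(pair => seen.push(pair));

// One combined emission per time value, each paired with the latest data.
console.log(seen); // [['data', 1], ['data', 2], ['data', null]]
```

Each value from the time source produces a fresh combined emission, which is the point at which the recalculation would run.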
What I don't like about my solution
It creates a separate inner Observable, concurrently, for every time in the list. I feel like it's possible to refactor such that each inner Observable is only created after the previous one is destroyed. Any improvement tips would be gratefully received.
You may want to base your solution on these:
- Call a javascript function at a specific time of day
- Javascript: how to trigger at a specific time
- Running JavaScript at Specified Times: Timed JavaScript