I have an RxJS Observable that needs to be recalculated at specific times, as described by an array of DateTime objects (although for the purpose of this question they could be JavaScript Date objects, epoch milliseconds or anything else representing a specific instant in time):

const changeTimes = [
    //            yyyy, mm, dd, hh, mm
    DateTime.utc( 2018, 10, 31, 21, 45 ),
    DateTime.utc( 2018, 10, 31, 21, 50 ),
    DateTime.utc( 2018, 10, 31, 22,  0 ),
    DateTime.utc( 2018, 10, 31, 23,  0 ),
    DateTime.utc( 2018, 10, 31, 23, 30 ),
];

I'm struggling to understand how to create an Observable that would emit at the times specified in such an array.

Here's what I've thought about in an attempt to answer my own question:

  • I almost certainly need to use the delay operator where the specified delay is the time between “now” and the next future datetime.
  • I somehow need to ensure that “now” is current at the time of subscription, not at the time of Observable creation—possibly by using the defer operator—although I don't want to unnecessarily create multiple Observable instances if there are multiple subscriptions.
  • I'm unsure how to iterate over the array as time passes—the expand operator might be what I need, but it calls something recursively, and I'm just trying to iterate over a list.
  • The timer operator seems irrelevant, since the duration between each datetime is different.
  • I could map every datetime to its own delayed Observable and return them all via merge, but this becomes horribly inefficient as the number of datetimes in the array increases (there could be hundreds), so this is an absolute last resort.

How can I make an RxJS Observable that takes a list of datetimes and then emits as each one is reached in time, completing on the final one?

asked Oct 31, 2018 at 11:25 by Alex Peters

4 Answers

I think what you summarized in the bullet points is all correct. Using delay seems obvious but it'll make the chain hard to understand.

The solution that comes to mind assumes that you know the changeTimes array before creating the observable chain. You can write your own "observable creation method" that returns an Observable which emits based on setTimeout, for example (treat this as a rough sketch rather than finished code):

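import { Observable } from 'rxjs';

const schedule = (dates: Date[]): Observable<Date> => new Observable(observer => {
  // Sort a copy of `dates` from the earliest to the latest
  const sorted = [...dates].sort((a, b) => a.getTime() - b.getTime());

  let index = 0;
  // Named `timeoutId` so it does not shadow the global clearTimeout()
  let timeoutId: ReturnType<typeof setTimeout> | undefined;

  const loop = () => {
    if (index >= sorted.length) {
      // Nothing left to wait for
      observer.complete();
      return;
    }

    // Recompute the delay against the current time on every step
    const delay = Math.max(0, sorted[index].getTime() - Date.now());

    timeoutId = setTimeout(() => {
      observer.next(sorted[index++]);
      loop();
    }, delay);
  };

  loop();

  // Teardown: cancel the pending timeout on unsubscribe
  return () => clearTimeout(timeoutId);
});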

...

schedule(changeTimes)
  .subscribe(...)

The last option you mention with merge isn't that bad actually. I understand you're concerned that it'll create a lot of subscriptions but if you sort the changeTimes array and then use concat instead of merge it'll always keep only one active subscription even if you create 100s of Observables.
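
The concat idea can be sketched roughly as follows. This is only an illustration: the helper name emitAtEach is made up here, and it assumes plain JavaScript Date objects rather than the DateTime objects from the question. It relies on timer() accepting a Date as its due time.

```typescript
import { Observable, concat, timer } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical helper (not from the original answer): emits each date
// when it is reached, in chronological order, then completes.
function emitAtEach(dates: Date[]): Observable<Date> {
  // concat subscribes strictly in order, so sort a copy first
  const sorted = [...dates].sort((a, b) => a.getTime() - b.getTime());

  // timer(date) emits once when `date` is reached and then completes.
  // It is cold: the remaining wait is worked out only when concat
  // subscribes to it, so only one timer is pending at any moment.
  return concat(...sorted.map(date => timer(date).pipe(map(() => date))));
}
```

All the inner Observables are still created up front here, but each one is only subscribed to after the previous one completes. Dates that are already in the past should fire more or less straight away.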

Here's a working example:

import {Injectable} from '@angular/core';
import {Observable, Subject, timer} from 'rxjs';

@Injectable()
export class TimerService {

  futureDates: Date[] = [];
  futureDate: Date;
  notifier: Observable<string>;

  cycle = (observer) => {
    if (this.futureDates.length > 0) {
      this.futureDate = this.futureDates.shift();

      const msInFuture = this.futureDate.getTime() - Date.now();
      if (msInFuture < 0) {
        console.log(`date ${this.futureDate.toISOString()}
            expected to be in the future, but was ${msInFuture} msec in the past, so stopping`);

        observer.complete();
      } else {
        timer(msInFuture).subscribe(x => {
          observer.next(`triggered at ${new Date().toISOString()}`);
          this.cycle(observer);
        });
      }
    } else {
        observer.complete();
    }
  }

  getTimer(): Observable<string> {
    const now = new Date();
    const ms1 = now.getTime() + 10000;
    const ms2 = now.getTime() + 20000;
    this.futureDates.push(new Date(ms1));
    this.futureDates.push(new Date(ms2));

    this.notifier = new Observable(observer => {
      this.cycle(observer);
    });

    return this.notifier;
  }
}

In this example, the list of future times is created in the getTimer() method, but you could pass an array of dates into that method.

The key is to simply store the dates, process one at a time, and at processing time, check how far in the future that date is, and set a one-shot Rx timer for that number of milliseconds.
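
One possible variation, if you would rather skip dates that have already passed than stop at the first one, is to filter the list before handing it to the service. The helper below is a hypothetical sketch and is not part of the service above:

```typescript
// Hypothetical helper: keep only the dates that are still in the future
// relative to `now` (epoch milliseconds), earliest first.
function upcoming(dates: Date[], now: number = Date.now()): Date[] {
  return dates
    .filter(date => date.getTime() > now)          // drop past dates
    .sort((a, b) => a.getTime() - b.getTime());    // filter() returned a copy
}
```

Passing `now` as a parameter is just a convenience so the function can be exercised with a fixed clock.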

Given an array of DateTime objects:

const changeTimes = [
    //            yyyy, mm, dd, hh, mm
    DateTime.utc( 2018, 10, 31, 21, 45 ),
    DateTime.utc( 2018, 10, 31, 21, 50 ),
    DateTime.utc( 2018, 10, 31, 22,  0 ),
    DateTime.utc( 2018, 10, 31, 23,  0 ),
    DateTime.utc( 2018, 10, 31, 23, 30 ),
];

or better yet, an Observable that emits an array of these each time the array changes (which is what is actually happening in my scenario, although I didn't mention it in the question because it wasn't strictly relevant):

const changeTimes$: Observable<DateTime[]> = /* ... */;

the following Observable will immediately emit the next future time on subscription, emit each subsequent future time once the previous one has passed, and finally emit null and complete:

import { concat, defer, of } from 'rxjs';
import { delay, map, switchMap } from 'rxjs/operators';

const nextTime$ = changeTimes$.pipe(
    // sort DateTimes chronologically
    map(unsorted => [...unsorted].sort((x, y) => +x - +y)),
    // remove duplicates
    map(duplicated => duplicated.filter((item, i) => !i || +item !== +duplicated[i - 1])),
    // convert each time to a delayed Observable
    map(times => [...times, null].map((time, i) => defer(() => of(time).pipe(
        // emit the first one immediately
        // emit all others at the previously emitted time
        delay(i === 0 ? 0 : +times[i - 1] - +DateTime.utc())
    )))),
    // combine into a single Observable
    switchMap(observables => concat(...observables)),
);
  • Sorting is necessary since each inner Observable (“wait X milliseconds then report a time”) is subscribed to on completion of the previous.
  • Removing duplicates is not strictly necessary, but seems appropriate to cater for.
  • defer is used so that the current time is calculated when the inner Observable is subscribed to.
  • concat is used to execute each inner Observable in succession (thanks to martin), avoiding the overhead of a subscription for each time in the list simultaneously.
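
To make the offset between "what is emitted" and "when it is emitted" easier to see, here is a small dependency-free sketch of the same sort, de-duplicate and shift steps. It assumes instants are plain epoch milliseconds, and the names sortAndDedupe, plan and Step are made up for illustration:

```typescript
// One planned emission: `value` is what gets emitted, `dueAt` is when.
interface Step {
  value: number | null;
  dueAt: number | 'immediately';
}

// Sort instants chronologically and drop exact duplicates.
function sortAndDedupe(times: number[]): number[] {
  const sorted = [...times].sort((x, y) => x - y);
  return sorted.filter((item, i) => i === 0 || item !== sorted[i - 1]);
}

// The first time is emitted immediately, every later time is emitted
// when the previous one is reached, and null follows the last time.
function plan(times: number[]): Step[] {
  const sorted = sortAndDedupe(times);
  return [...sorted, null].map((value, i): Step => ({
    value,
    dueAt: i === 0 ? 'immediately' : sorted[i - 1],
  }));
}
```

For example, plan([30, 10, 10, 20]) yields 10 immediately, 20 at time 10, 30 at time 20, and null at time 30.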

The way this satisfies my original need:

I have an RxJS Observable that needs to be recalculated at specific times, as described by an array of DateTime objects...

is by combining this with the data requiring recalculation, using the combineLatest operator to trigger that recalculation at the correct times:

const timeAwareData$ = combineLatest(timeUnawareData$, nextTime$).pipe(
    tap(() => console.log('either the data has changed or a time has been reached')),
    // ...
);

What I don't like about my solution

It creates a separate inner Observable, concurrently, for every time in the list. I feel like it's possible to refactor such that each inner Observable is only created after the previous one is destroyed. Any improvement tips would be gratefully received.
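
One way this might be approached is with concatMap, which only calls its projection function for the next value after the previous inner Observable has completed. The sketch below is an assumption-laden illustration rather than a drop-in replacement: emitLazily is a made-up name, it takes already sorted JavaScript Date objects, and it emits each time as that time is reached (the behaviour asked for in the question) instead of the "next future time" offset used above.

```typescript
import { Observable, from, timer } from 'rxjs';
import { concatMap, map } from 'rxjs/operators';

// Hypothetical helper: emits each time when it is reached, then completes.
// `sortedTimes` is assumed to be in chronological order already.
function emitLazily(sortedTimes: Date[]): Observable<Date> {
  return from(sortedTimes).pipe(
    // The timer for a given time is created only once the previous
    // timer has completed, so one inner Observable exists at a time.
    concatMap(time => timer(time).pipe(map(() => time))),
  );
}
```

With a stream of arrays such as changeTimes$, this could presumably be plugged in through switchMap, so that a new array cancels whichever timer is pending.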

You may want to base your solution on these:

  • Call a javascript function at a specific time of day
  • Javascript: how to trigger at a specific time
  • Running JavaScript at Specified Times: Timed JavaScript
