admin管理员组文章数量:1287656
Let's consider for a moment the following code
Rx.Observable.merge(
Rx.Observable.just(1),
Rx.Observable.just(1).delay(1000)
).distinctUntilChanged()
.subscribe(x => console.log(x))
We expect that 1
is logged just once. However what if we wanted to allow repetition of a value if its last emission was a configurable amount of time ago? I mean to get both events logged.
For example it would be cool to have something like the following
Rx.Observable.merge(
Rx.Observable.just(1),
Rx.Observable.just(1).delay(1000)
).distinctUntilChanged(1000)
.subscribe(x => console.log(x))
In which distinctUntilChanged()
accepts some sort of timeout to allow repetition on the next element. However such a thing does not exist and I was wondering if anybody knows an elegant way to achieve this by using high level operators without messing with a filter that would require handling state
Let's consider for a moment the following code
Rx.Observable.merge(
Rx.Observable.just(1),
Rx.Observable.just(1).delay(1000)
).distinctUntilChanged()
.subscribe(x => console.log(x))
We expect that 1
is logged just once. However what if we wanted to allow repetition of a value if its last emission was a configurable amount of time ago? I mean to get both events logged.
For example it would be cool to have something like the following
Rx.Observable.merge(
Rx.Observable.just(1),
Rx.Observable.just(1).delay(1000)
).distinctUntilChanged(1000)
.subscribe(x => console.log(x))
In which distinctUntilChanged()
accepts some sort of timeout to allow repetition on the next element. However such a thing does not exist and I was wondering if anybody knows an elegant way to achieve this by using high level operators without messing with a filter that would require handling state
2 Answers
Reset to default 10Unless I am misunderstanding I am pretty sure this could be acplished in a relatively straight-forward manner with windowTime
:
Observable
.merge(
Observable.of(1),
Observable.of(1).delay(250), // Ignored
Observable.of(1).delay(700), // Ignored
Observable.of(1).delay(2000),
Observable.of(1).delay(2200), //Ignored
Observable.of(2).delay(2300)
)
// Converts the stream into a stream of streams each 1000 milliseconds long
.windowTime(1000)
// Flatten each of the streams and emit only the latest (there should only be one active
// at a time anyway
// We apply the distinctUntilChanged to the windows before flattening
.switchMap(source => source.distinctUntilChanged())
.timeInterval()
.subscribe(
value => console.log(value),
error => console.log('error: ' + error),
() => console.log('plete')
);
See the example here (borrowed @martin's example inputs)
This is an interesting use-case. I wonder whether there's an easier solution than mine (note that I'm using RxJS 5):
let timedDistinctUntil = Observable.defer(() => {
let innerObs = null;
let innerSubject = null;
let delaySub = null;
function tearDown() {
delaySub.unsubscribe();
innerSubject.plete();
}
return Observable
.merge(
Observable.of(1),
Observable.of(1).delay(250), // ignored
Observable.of(1).delay(700), // ignored
Observable.of(1).delay(2000),
Observable.of(1).delay(2200), // ignored
Observable.of(2).delay(2300)
)
.do(undefined, undefined, () => tearDown())
.map(value => {
if (innerObs) {
innerSubject.next(value);
return null;
}
innerSubject = new BehaviorSubject(value);
delaySub = Observable.of(null).delay(1000).subscribe(() => {
innerObs = null;
});
innerObs = innerSubject.distinctUntilChanged();
return innerObs;
})
// filter out all skipped Observable emissions
.filter(observable => observable)
.switch();
});
timedDistinctUntil
.timestamp()
.subscribe(
value => console.log(value),
error => console.log('error: ' + error),
() => console.log('plete')
);
See live demo: https://jsbin./sivuxo/5/edit?js,console
The entire logic is wrapped into Observable.defer()
static method because it requires some additional variables.
A couple points how this all works:
The
merge()
is the source of items.I use
do()
to properly catch when the source pletes so I can shutdown the internal timer and send proper plete notification.The
map()
operator is where the most interesting things happen. I reemit the value that it received and then returnnull
if there's already a valid Observable (it was created less then 1000ms ago =innerObs != null
). Then I eventually create a new Subject where I'm going to reemit all items and return thisBehaviorSubject
chained with.distinctUntilChanged()
. At the end I schedule 1s delay to setinnerObs = null
which means then when another value arrives it'll return a new Observable with new.distinctUntilChanged()
.Then
filter()
will let me ignore allnull
values returned. This means it won't emit a new Observable more than once a second.Now I need to work with so called Higher-order Observables (Observables emitting Observables. For this reason I use
switch()
operator that always subscribes only to the newest Observable emitted by the source. In our case we emit Observables only max. once a second (thanks to thefilter()
used above) and this inner itself Observable can emit as many values it wants and all of them are going to be passed throughdistinctUntilChanged()
so duplicates are ignored.
The output for this demo will look like the following output:
Timestamp { value: 1, timestamp: 1484670434528 }
Timestamp { value: 1, timestamp: 1484670436475 }
Timestamp { value: 2, timestamp: 1484670436577 }
plete
As you can see the value 1
is emitted twice with cca 2s delay. However value 2
passed without any problem after 100ms thanks to distinctUntilChanged()
.
I know this isn't simple but I hope it makes sense to you :)
版权声明:本文标题:javascript - Rx distinctUntilChanged allow repetition after configurable time between events - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1741270452a2369186.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论