I need to merge large data streams, available as Flux instances, which each contain a timestamp and a value. If the timestamps match, the values need to be summed up. The data in each Flux is sorted by timestamp in ascending order.
For smaller streams I would use the groupBy function, but since the Flux instances hold many entries this isn't efficient.
I would like to exploit the fact that the entries in the flux are ordered, but I can't find the right construct. What are the tools to achieve something like this? Below is some pseudocode of what I want to do:
var flux1 = Flux.just(
        new Data(ZonedDateTime.parse("2025-01-01T00:00:00"), 1.0),
        new Data(ZonedDateTime.parse("2025-03-01T00:00:00"), 1.0)
);
var flux2 = Flux.just(
        new Data(ZonedDateTime.parse("2025-02-01T00:00:00"), 2.0),
        new Data(ZonedDateTime.parse("2025-03-01T00:00:00"), 2.0),
        new Data(ZonedDateTime.parse("2025-04-01T00:00:00"), 2.0)
);
var flux3 = Flux.just(
        new Data(ZonedDateTime.parse("2025-02-01T00:00:00"), 5.0)
);

var input = List.of(flux1, flux2, flux3);
var output = Flux.create(sink -> {
    List<Data> nextEntries = input.stream().map(Flux::next).toList();
    do {
        ZonedDateTime nextTimestamp = nextEntries.stream().map(Data::getTimestamp).min(ZonedDateTime::compareTo).get();
        List<Integer> affectedStreams = IntStream.range(0, input.size()).filter(i -> nextTimestamp == nextEntries[i].getTimestamp()).toList();
        double nextOutput = affectedStreams.stream().mapToDouble(i -> nextEntries[i].getValue()).sum();
        sink.next(new Data(nextTimestamp, nextOutput));
        affectedStreams.forEach(i -> nextEntries[i] = input.get(i).next());
    } while (!allFluxAreConsumed);
});
// expected output:
// [
// Data(ZonedDateTime.parse("2025-01-01T00:00:00"), 1.0),
// Data(ZonedDateTime.parse("2025-02-01T00:00:00"), 7.0),
// Data(ZonedDateTime.parse("2025-03-01T00:00:00"), 3.0),
// Data(ZonedDateTime.parse("2025-05-01T00:00:00"), 2.0)
// ]
asked Jan 21 at 16:06 by Bernard
1 Answer
You can get the expected result by stacking the following operators:
- Combine the input fluxes, preserving the overall date ordering, using the mergeComparing operator. WARNING: if one of the input fluxes is slow, it will slow down the entire downstream pipeline.
- Use windowUntilChanged to group adjacent records that share the same timestamp.
- Use reduce on each window to merge records as you wish.
Which gives something like this:
Flux.mergeComparing(Comparator.comparing(Data::datetime), flux1, flux2, flux3)
        .windowUntilChanged(Data::datetime)
        .flatMap(window -> window.reduce((d1, d2) -> new Data(d1.datetime(), d1.value() + d2.value())));
You can test it with a unit test like so:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.time.LocalDateTime;
import java.util.Comparator;

public class MergeSorted {

    record Data(LocalDateTime datetime, Integer value) {}

    @Test
    public void test() {
        var flux1 = Flux.just(
                new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
                new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 1)
        );
        var flux2 = Flux.just(
                new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 2),
                new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 2),
                new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2)
        );
        var flux3 = Flux.just(
                new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 5)
        );

        var mergeSum = Flux.mergeComparing(Comparator.comparing(Data::datetime), flux1, flux2, flux3)
                .windowUntilChanged(Data::datetime)
                .flatMap(window -> window.reduce((d1, d2) -> new Data(d1.datetime(), d1.value() + d2.value())));

        StepVerifier.create(mergeSum)
                .expectNext(
                        new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
                        new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 7),
                        new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 3),
                        new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2)
                )
                .verifyComplete();
    }
}
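For comparison, the pull-based loop the question sketches in pseudocode can be made runnable with plain iterators and a min-heap, using only the JDK (no Reactor). This is a blocking sketch, not reactive code, and the names `mergeSum` and `Head` are illustrative, not from any library:

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class MergeSortedIterators {

    record Data(LocalDateTime datetime, double value) {}

    // K-way merge of timestamp-sorted iterators, summing values that share a timestamp.
    static List<Data> mergeSum(List<Iterator<Data>> inputs) {
        // Each heap entry pairs the current head element with the iterator it came from.
        record Head(Data data, Iterator<Data> source) {}
        PriorityQueue<Head> heap =
                new PriorityQueue<>(Comparator.comparing((Head h) -> h.data().datetime()));
        for (Iterator<Data> it : inputs) {
            if (it.hasNext()) heap.add(new Head(it.next(), it));
        }
        List<Data> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head first = heap.poll();
            LocalDateTime ts = first.data().datetime();
            double sum = first.data().value();
            if (first.source().hasNext()) heap.add(new Head(first.source().next(), first.source()));
            // Drain every other head carrying the same timestamp and advance its iterator.
            while (!heap.isEmpty() && heap.peek().data().datetime().equals(ts)) {
                Head h = heap.poll();
                sum += h.data().value();
                if (h.source().hasNext()) heap.add(new Head(h.source().next(), h.source()));
            }
            out.add(new Data(ts, sum));
        }
        return out;
    }

    public static void main(String[] args) {
        var s1 = List.of(new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1.0),
                         new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 1.0));
        var s2 = List.of(new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 2.0),
                         new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 2.0),
                         new Data(LocalDateTime.parse("2025-04-01T00:00:00"), 2.0));
        var s3 = List.of(new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 5.0));
        // Sums: 01-01 -> 1.0, 02-01 -> 7.0, 03-01 -> 3.0, 04-01 -> 2.0
        mergeSum(List.of(s1.iterator(), s2.iterator(), s3.iterator()))
                .forEach(d -> System.out.println(d.datetime() + " -> " + d.value()));
    }
}
```

Because each element is touched once and the heap holds at most one entry per input, this runs in O(n log k) for n total elements across k streams, which is what makes it attractive for large inputs compared with a groupBy over the whole merged stream.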
flux2, on 2025-04-01? It does not appear in the expected output. Is it a mistake? Shouldn't the last expected output be on 2025-04-01 instead of 2025-05-01? – amanin, Commented Jan 21 at 18:09