Join sorted Flux Producers efficiently

I need to merge large data streams, each available as a Flux, where every entry contains a timestamp and a value. If the timestamps match, the values need to be summed up. The data in each Flux is sorted by timestamp in ascending order.

For smaller streams I would use the groupBy operator, but since the fluxes hold many entries this isn't efficient.
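
For reference, the groupBy approach for small streams would look roughly like this (a sketch of the idea only, using the flux1, flux2, flux3 defined further below):

    // Sketch of the groupBy approach mentioned above; impractical for large fluxes
    // because every distinct timestamp opens its own group.
    Flux.merge(flux1, flux2, flux3)
            .groupBy(Data::getTimestamp)
            .flatMap(group -> group.reduce(
                    (d1, d2) -> new Data(d1.getTimestamp(), d1.getValue() + d2.getValue())));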

I would like to exploit the fact that the entries in each Flux are ordered, but I can't find the right construct. What are the tools to achieve something like this? Below is some pseudocode of what I want to do:

    var flux1 = Flux.just(
            new Data(ZonedDateTime.parse("2025-01-01T00:00:00"), 1.0),
            new Data(ZonedDateTime.parse("2025-03-01T00:00:00"), 1.0)
    );


    var flux2 = Flux.just(
            new Data(ZonedDateTime.parse("2025-02-01T00:00:00"), 2.0),
            new Data(ZonedDateTime.parse("2025-03-01T00:00:00"), 2.0),
            new Data(ZonedDateTime.parse("2025-04-01T00:00:00"), 2.0)
    );

    var flux3 = Flux.just(
            new Data(ZonedDateTime.parse("2025-02-01T00:00:00"), 5.0)
    );

    var input = List.of(flux1, flux2, flux3);


    // Pseudocode: Flux has no blocking next()-style pull, this only sketches the intended merge loop.
    var output = Flux.create(sink -> {
        List<Data> nextEntries = input.stream().map(Flux::next).toList(); // "peek" the head of each flux

        do {
            ZonedDateTime nextTimestamp = nextEntries.stream().map(Data::getTimestamp).min(ZonedDateTime::compareTo).get();
            List<Integer> affectedStreams = IntStream.range(0, input.size())
                    .filter(i -> nextTimestamp.equals(nextEntries.get(i).getTimestamp()))
                    .boxed().toList();
            double nextOutput = affectedStreams.stream().mapToDouble(i -> nextEntries.get(i).getValue()).sum();
            sink.next(new Data(nextTimestamp, nextOutput));
            affectedStreams.forEach(i -> nextEntries.set(i, input.get(i).next())); // advance only the consumed streams
        } while (!allFluxAreConsumed);
    });

    // expected output:
    // [
    //      Data(ZonedDateTime.parse("2025-01-01T00:00:00"), 1.0),
    //      Data(ZonedDateTime.parse("2025-02-01T00:00:00"), 7.0),
    //      Data(ZonedDateTime.parse("2025-03-01T00:00:00"), 3.0),
    //      Data(ZonedDateTime.parse("2025-05-01T00:00:00"), 2.0)
    // ]
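
For reference, Data here is assumed to be a simple value holder along these lines:

    // Assumed shape of the Data class used in the pseudocode above.
    public class Data {
        private final ZonedDateTime timestamp;
        private final double value;

        public Data(ZonedDateTime timestamp, double value) {
            this.timestamp = timestamp;
            this.value = value;
        }

        public ZonedDateTime getTimestamp() { return timestamp; }
        public double getValue() { return value; }
    }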

asked Jan 21 at 16:06 – Bernard
  • What about the last element in flux2, on 2025-04-01? It does not appear in the expected output. Is it a mistake? Shouldn't the last expected output be on 2025-04-01 instead of 2025-05-01? – amanin, Jan 21 at 18:09

1 Answer


You can get the expected result by stacking the following operators:

  1. Combine the input fluxes while preserving the overall date ordering using the mergeComparing operator. WARNING: if one of the input fluxes is slow, it will slow down the entire downstream pipeline.
  2. Use windowUntilChanged to group adjacent records that share the same timestamp.
  3. Use reduce on each window to merge the records as you wish.

Which gives something like this:

Flux.mergeComparing(Comparator.comparing(Data::datetime), flux1, flux2, flux3)
    .windowUntilChanged(Data::datetime)
    .flatMap(window -> window.reduce((d1, d2) -> new Data(d1.datetime(), d1.value() + d2.value())));

You can test it with a unit test like so:

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.time.LocalDateTime;
import java.util.Comparator;

public class MergeSorted {

    record Data(LocalDateTime datetime, Integer value) {}

    @Test
    public void test() {

        var flux1 = Flux.just(
                new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
                new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 1)
        );


        var flux2 = Flux.just(
                new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 2),
                new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 2),
                new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2)
        );

        var flux3 = Flux.just(
                new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 5)
        );

        var mergeSum = Flux.mergeComparing(Comparator.comparing(Data::datetime), flux1, flux2, flux3)
                           .windowUntilChanged(Data::datetime)
                           .flatMap(window -> window.reduce((d1, d2) -> new Data(d1.datetime(), d1.value() + d2.value())));

        StepVerifier.create(mergeSum)
                .expectNext(
                              new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
                              new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 7),
                              new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 3),
                              new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2)
                )
                .verifyComplete();
    }
}
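
If the number of input fluxes is only known at runtime, as with the input list in the question, the same pipeline can be fed an array of publishers. The following is a sketch under that assumption, reusing the Data record and fluxes from the test above; it uses concatMap instead of flatMap to guarantee that the windows are emitted strictly in source order (flatMap is usually fine here but does not formally guarantee it):

// Sketch only: the same pipeline over a dynamically sized list of fluxes.
List<Flux<Data>> input = List.of(flux1, flux2, flux3);

@SuppressWarnings("unchecked")
Flux<Data>[] sources = input.toArray(new Flux[0]);

Flux<Data> mergeSum = Flux
        .mergeComparing(Comparator.comparing(Data::datetime), sources)
        .windowUntilChanged(Data::datetime)
        // concatMap subscribes to each window sequentially, preserving output order
        .concatMap(window -> window.reduce(
                (d1, d2) -> new Data(d1.datetime(), d1.value() + d2.value())));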
