admin管理员组

文章数量:1182737

I'm using Sinks.many().multicast().onBackpressureBuffer(int bufferSize) in Reactor. This method buffers up to bufferSize items before the first subscriber is registered. However, if the buffer overflows, it discards new items and keeps only the first bufferSize items.

I need the opposite behavior: I want it to keep only the last bufferSize items (i.e., something like a "retain the newest" policy). I know that Flux.onBackpressureBuffer(int maxSize, BufferOverflowStrategy) can be used with BufferOverflowStrategy.DROP_OLDEST, but that applies while the subscriber is active, and doesn't match my exact use case.

Specifically, I only need the buffering before the first subscriber is registered. Once the subscriber has subscribed, I want to consume those last bufferSize items and then continue only with new items.

I want to emphasize, that I need buffering only during warming up phase - i.e. before the first subscriber has registered. I don't need items to be buffered and replayed for each subscribers - only for the first.

To handle the stream after that, I’m already using .onBackpressureLatest(), which retains only the most recent item.

Is there a way to configure the sink (or some other approach) to achieve this "keep the newest items until the first subscriber arrives, then switch to latest-only" behavior?

本文标签: