I establish a Server-Sent Events (SSE) connection using the code below. The GET request must carry a "Last-Event-ID" HTTP header that indicates the last fully processed event. Occasionally the server closes the connection, and I then need to retry it. On each retry, however, I need to update the value of the "Last-Event-ID" header.

The code below works, including the retries, but it does not update the value of the "Last-Event-ID" header before retrying. It always uses the value that was assigned initially.

Can anybody point me in the right direction on how to implement this correctly?

        ParameterizedTypeReference<ServerSentEvent<String>> type
                = new ParameterizedTypeReference<>() {
        };

        var req = webClient.get()
                .uri("/the/end/point");

        String lastEventId = loadLastId();
        if (lastEventId != null && !lastEventId.isEmpty()) {
            req = req.header("Last-Event-ID", lastEventId);
        }

        req
            .retrieve()
            .bodyToFlux(type)
            .retryWhen(Retry
                .backoff(
                    maxRetries,
                    Duration.ofSeconds(retryBackoffStep)
                )
                .maxBackoff(Duration.ofSeconds(10))
                .transientErrors(true)
                .filter(t -> {
                    log.warn("SSE Notification Channel: connection error detected", t);
                    return !(t instanceof ClientAuthorizationException)
                        && !(t instanceof WebClientResponseException.Forbidden)
                        && !(t instanceof WebClientResponseException.NotFound);
                })
                .doBeforeRetry(signal -> {
                    // How can I do the following?
                    // req.header("Last-Event-ID", loadLastId());
                    log.warn("SSE Notification Channel: retry SSE connection, attempt {}",
                                    signal.totalRetriesInARow() + 1);
                })
                .doAfterRetry(signal ->
                    log.warn("SSE Notification Channel: retried SSE connection, attempt {}, successful = {}",
                                    signal.totalRetriesInARow() + 1, signal.failure() == null))
                .onRetryExhaustedThrow((retryBackoffSpec, signal) -> {
                    log.error("SSE Notification Channel: SSE connection retries exhausted");
                    return signal.failure();
                })
            )
            .subscribe(
                this::handleContent,
                this::handleError,
                () -> log.error("SSE Notification Channel: SSE connection closed")
            );

Asked Mar 14 at 11:42 by Maarten Boekhold

1 Answer

Once a request has been sent, you cannot modify it.

To send a modified request (for example with a new header or a different request body), you have to create a new request.

Flux.defer() defers creating the request until the Flux is subscribed to.
retryWhen() retries by resubscribing, so on every retry the supplier passed to Flux.defer() runs again and builds a new request with the current "Last-Event-ID" header.

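// "type" is the ParameterizedTypeReference<ServerSentEvent<String>> from the question.
Flux<ServerSentEvent<String>> requestFlux = Flux.defer(() -> {
    // This lambda runs on every (re)subscription, so each retry builds a fresh request.
    var req = webClient.get()
        .uri("/the/end/point");

    // Read the ID at subscription time, not once up front.
    String lastEventId = loadLastId();
    if (lastEventId != null && !lastEventId.isEmpty()) {
        req = req.header("Last-Event-ID", lastEventId);
    }

    return req.retrieve().bodyToFlux(type);
});

requestFlux
    .retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(retryBackoffStep)))
    // Remaining retry configuration and handlers as in the question.
    .subscribe(this::handleContent, this::handleError);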
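
To see why building the request inside a supplier makes the difference, here is a minimal plain-Java sketch of the same idea without Reactor or Spring. It is only an analogy: the Request class, the connect() loop, and the simulated server that sends two events per connection are hypothetical stand-ins made up for illustration, not part of WebClient or Reactor. The loop plays the role of retryWhen(), and calling the supplier plays the role of resubscribing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class DeferredRequestSketch {

    /** Hypothetical stand-in for an HTTP request: only the Last-Event-ID header (null = absent). */
    static final class Request {
        final String lastEventId;

        Request(String lastEventId) {
            this.lastEventId = lastEventId;
        }
    }

    /**
     * Simulates a server that sends two events per connection and then closes it.
     * Returns the Last-Event-ID header value seen on each connection attempt.
     */
    static List<String> connect(Supplier<Request> requestFactory,
                                AtomicReference<String> lastProcessedId,
                                int attempts) {
        List<String> headersSeen = new ArrayList<>();
        for (int attempt = 0; attempt < attempts; attempt++) {
            // Each attempt asks the factory for a request, as a resubscription would.
            Request request = requestFactory.get();
            String header = request.lastEventId;
            headersSeen.add(header == null ? "<none>" : header);

            // The simulated server resumes after the given ID and delivers two events.
            int resumeAfter = header == null ? 0 : Integer.parseInt(header);
            for (int id = resumeAfter + 1; id <= resumeAfter + 2; id++) {
                lastProcessedId.set(String.valueOf(id)); // "process" the event, remember its ID
            }
            // The connection drops here and the loop moves on to the next attempt.
        }
        return headersSeen;
    }

    /** Request built once, up front: every attempt reuses the same header value. */
    static List<String> eager(int attempts) {
        AtomicReference<String> lastId = new AtomicReference<>();
        Request builtOnce = new Request(lastId.get());
        return connect(() -> builtOnce, lastId, attempts);
    }

    /** Request built inside the supplier: every attempt reads the current ID. */
    static List<String> deferred(int attempts) {
        AtomicReference<String> lastId = new AtomicReference<>();
        return connect(() -> new Request(lastId.get()), lastId, attempts);
    }

    public static void main(String[] args) {
        System.out.println("eager:    " + eager(3));    // [<none>, <none>, <none>]
        System.out.println("deferred: " + deferred(3)); // [<none>, 2, 4]
    }
}
```

The eager variant matches the behaviour described in the question: the header is fixed when the request is built, so every attempt sends the initial value. The deferred variant reads the stored ID each time the supplier is called, which is what wrapping the WebClient call in Flux.defer() achieves on each retry.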
