I establish a Server-Sent Events (SSE) connection using the code below. The GET request must carry a "Last-Event-ID" HTTP header that indicates the last fully processed event. Occasionally the server closes the connection, after which I need to reconnect. On each retry, however, I need to update the value of the "Last-Event-ID" header.
The code below works, including the retries, but it does not update the value of the "Last-Event-ID" header before retrying. It always uses the value that was assigned initially.
Can anybody point me in the right direction on how to implement this correctly?
ParameterizedTypeReference<ServerSentEvent<String>> type
        = new ParameterizedTypeReference<>() {
};

var req = webClient.get()
        .uri("/the/end/point");
String lastEventId = loadLastId();
if (lastEventId != null && !lastEventId.isEmpty()) {
    req = req.header("Last-Event-ID", lastEventId);
}
req
    .retrieve()
    .bodyToFlux(type)
    .retryWhen(Retry
        .backoff(
            maxRetries,
            Duration.ofSeconds(retryBackoffStep)
        )
        .maxBackoff(Duration.ofSeconds(10))
        .transientErrors(true)
        .filter(t -> {
            log.warn("SSE Notification Channel: connection error detected", t);
            return !(t instanceof ClientAuthorizationException)
                && !(t instanceof WebClientResponseException.Forbidden)
                && !(t instanceof WebClientResponseException.NotFound);
        })
        .doBeforeRetry(signal -> {
            // How can I do the following?
            // req.header("Last-Event-ID", loadLastId());
            log.warn("SSE Notification Channel: retry SSE connection, attempt {}",
                signal.totalRetriesInARow() + 1);
        })
        .doAfterRetry(signal ->
            log.warn("SSE Notification Channel: retried SSE connection, attempt {}, successful = {}",
                signal.totalRetriesInARow() + 1, signal.failure() == null))
        .onRetryExhaustedThrow((retryBackoffSpec, signal) -> {
            log.error("SSE Notification Channel: SSE connection retries exhausted");
            return signal.failure();
        })
    )
    .subscribe(
        this::handleContent,
        this::handleError,
        () -> log.error("SSE Notification Channel: SSE connection closed")
    );
asked Mar 14 at 11:42
Maarten Boekhold
1 Answer
Once a request is sent, you cannot modify it.
To send a modified request (e.g. with a new header or a different request body), you need to create a new request.
Flux.defer() defers the creation of the request until the Flux is subscribed to. Every retry resubscribes, so the supplier passed to Flux.defer() runs again and creates a new request with the current header value.
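Flux<ServerSentEvent<String>> requestFlux = Flux.defer(() -> {
    // Runs on every subscription, so each retry builds a fresh request
    var req = webClient.get()
            .uri("/the/end/point");
    String lastEventId = loadLastId();
    if (lastEventId != null && !lastEventId.isEmpty()) {
        req = req.header("Last-Event-ID", lastEventId);
    }
    return req.retrieve().bodyToFlux(type);
});

requestFlux
    .retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(retryBackoffStep))
        // ... same maxBackoff/filter/doBeforeRetry settings as in the question
    )
    .subscribe(this::handleContent, this::handleError);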
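The difference between building the request once and building it per subscription can be seen without Reactor or a server. The following is only a plain-Java analogy under stated assumptions: the class `DeferredRequestDemo`, the `lastId` holder (standing in for `loadLastId()`), and the `connect` method (which pretends the server closes the connection after one event) are all hypothetical names made up for illustration. A `Supplier` plays the role of the lambda passed to `Flux.defer()`, and the loop plays the role of `retryWhen` resubscribing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Plain-Java analogy (no Reactor): a "request" is reduced to the header value it carries.
class DeferredRequestDemo {

    // Hypothetical stand-in for loadLastId(): ID of the last fully processed event.
    static final AtomicReference<String> lastId = new AtomicReference<>("0");

    // Simulated connection attempt: records the header that was sent, "processes"
    // one event (advancing lastId), then fails as if the server closed the connection.
    static void connect(String lastEventIdHeader, List<String> sentHeaders) {
        sentHeaders.add(lastEventIdHeader);
        lastId.updateAndGet(id -> String.valueOf(Integer.parseInt(id) + 1));
        throw new IllegalStateException("connection closed by server");
    }

    // Calls the factory once per attempt, similar to a retry resubscribing upstream.
    static List<String> runWithRetries(Supplier<String> requestFactory, int attempts) {
        List<String> sentHeaders = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            try {
                connect(requestFactory.get(), sentHeaders);
            } catch (IllegalStateException e) {
                // connection dropped: fall through to the next attempt
            }
        }
        return sentHeaders;
    }

    // Header value is read once, before any attempt (the behaviour in the question).
    static List<String> eager(int attempts) {
        lastId.set("0");
        String header = lastId.get();
        return runWithRetries(() -> header, attempts);
    }

    // Header value is re-read on every attempt (the behaviour Flux.defer() gives).
    static List<String> deferred(int attempts) {
        lastId.set("0");
        return runWithRetries(lastId::get, attempts);
    }

    public static void main(String[] args) {
        System.out.println("eager:    " + eager(3));    // prints eager:    [0, 0, 0]
        System.out.println("deferred: " + deferred(3)); // prints deferred: [0, 1, 2]
    }
}
```

In the eager variant every attempt sends the stale ID, which matches the symptom described in the question. In the deferred variant each attempt picks up whatever ID was stored most recently. The real WebClient code relies on the same idea, provided `loadLastId()` returns an up-to-date value each time it is called.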