admin管理员组文章数量:1350050
In a Quarkus app I have the following handler which will retry to deserialize the Kafka message approximately every few seconds until there's success or the application is manually stopped.
@ApplicationScoped
@Identifier("event-deserialization-failure-handler")
public class EventDeserializationFailureHandler implements DeserializationFailureHandler<GenericData.Record> {
public EventDeserializationFailureHandler() {}
@Override
public GenericData.Record decorateDeserialization(Uni<GenericData.Record> deserialization, String topic, boolean isKey,
String deserializer, byte[] data, Headers headers) {
return deserialization
.onFailure().retry().withBackOff(Duration.ofSeconds(1), Duration.ofMillis(5)).indefinitely()
.await().indefinitely();
}
}
It works as expected except upon app shutdown. After attempting to shutdown (in IDE or via SIGINT/Ctrl+C), the process is still running requiring a hard kill
command.
A similar result will happen if I do something like this and attempt to stop it before it completes:
.onFailure().retry().atMost(Long.MAX_VALUE).await().indefinitely();
I've also tried passing my own Executor to the retry()
using .withExecutor(myExecutor)
and shutting it down via
@PreDestroy
void cleanup() {
myExecutor.shutdown();
}
But that cleanup method never gets called during the non-kill shutdown process.
本文标签:
版权声明:本文标题:mutiny - Kafka consumer - Deserialization Failure - Blocking indefinite retry prevents Quarkus app from completely shutting down 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1743871812a2553607.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论