admin管理员组

文章数量:1350050

In a Quarkus app I have the following handler which will retry to deserialize the Kafka message approximately every few seconds until there's success or the application is manually stopped.

@ApplicationScoped
@Identifier("event-deserialization-failure-handler")
public class EventDeserializationFailureHandler implements DeserializationFailureHandler<GenericData.Record> {

    public EventDeserializationFailureHandler() {}
    
    @Override
    public GenericData.Record decorateDeserialization(Uni<GenericData.Record> deserialization, String topic, boolean isKey,
                                                      String deserializer, byte[] data, Headers headers) {
        return deserialization
                    .onFailure().retry().withBackOff(Duration.ofSeconds(1), Duration.ofMillis(5)).indefinitely()
                    .await().indefinitely();
    }
}

It works as expected except upon app shutdown. After attempting to shutdown (in IDE or via SIGINT/Ctrl+C), the process is still running requiring a hard kill command.

A similar result will happen if I do something like this and attempt to stop it before it completes:

.onFailure().retry().atMost(Long.MAX_VALUE).await().indefinitely();

I've also tried passing my own Executor to the retry() using .withExecutor(myExecutor) and shutting it down via

@PreDestroy
void cleanup() {
    myExecutor.shutdown();
}

But that cleanup method never gets called during the non-kill shutdown process.

本文标签: