I have the following scenario:
I have a Quarkus Kafka consumer receiving events in JSON format, configured with an exponential backoff. While testing I noticed that if the process is in a wait state (i.e. it is in backoff for 2 or 4 minutes or whatever before retrying), and I then kill the process and restart it from scratch, that event is no longer consumed from the topic. Is there a way I can tell Quarkus/Kafka to ONLY move on to the next offset on the topic once it has gotten an ack (i.e. I have processed the event successfully)?
My application.properties looks as follows:
mp.messaging.incoming.student-in.connector=smallrye-kafka
mp.messaging.incoming.student-in.topic=academia-student-event-topic
mp.messaging.incoming.student-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.student-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.student-in.health-enabled=true
mp.messaging.incoming.student-in.health-readiness-enabled=true
mp.messaging.incoming.student-in.enable.auto.commit=false
mp.messaging.incoming.student-in.max.poll.records=1
mp.messaging.incoming.student-in.client.id=TESTKafkaConsumer
mp.messaging.incoming.student-in.retry=true
mp.messaging.incoming.student-in.retry-attempts=-1
mp.messaging.incoming.student-in.retry-max-wait=20
mp.messaging.incoming.student-in.failure-strategy=ignore
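For completeness, I have not set a commit strategy on the channel. My understanding (which may be wrong) is that the connector also accepts a commit-strategy attribute; if it matters, I would expect it to look something like the line below, though I am not sure which value would give me the behaviour I want:

mp.messaging.incoming.student-in.commit-strategy=throttled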
and my test code looks as follows:
int retryCount = 0;

@Incoming("student-in")
@Retry(maxRetries = -1, delay = 1, delayUnit = ChronoUnit.MINUTES)
@ExponentialBackoff(maxDelay = 0)
public void receive(ConsumerRecord<String, String> event) throws Exception, RetryException {
    LG.info("Got a TEST event from " + event.topic() + " with retry " + retryCount + " With key of: " + event.key() + " and value of: " + event.value());
    topic = event.topic();
    try {
        if (retryCount < 2) {
            retryCount++;
            throw new RetryException("RETRY_EXCEPTION", "Unable to send to target, retrying ");
        } else {
            LG.info("Stopping Retries, getting next event");
            retryCount = 0;
        }
    } catch (RetryException rEx) {
        LG.error("Catching Retry Exception: " + rEx.getMessage());
        throw rEx;
    } catch (Exception ex) {
        LG.debug("Retry Exception Gen Ex", ex);
        throw new RetryException("RETRY_EXCEPTION Gen Ex", "Unable to send to target FMS, retrying " + ex.getMessage());
    }
}
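To make the question more concrete, the behaviour I am after is roughly the following, sketched with SmallRye's Message API and manual acknowledgment. This is only my guess at an approach, not something I have working; processEvent is a placeholder for my real processing logic:

import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@Incoming("student-in")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> receive(Message<String> msg) {
    try {
        // processEvent is a placeholder for my actual processing
        processEvent(msg.getPayload());
        // acknowledge only after successful processing, so the offset
        // should only be committed once the event has really been handled
        return msg.ack();
    } catch (Exception ex) {
        // negative-acknowledge so the record is not silently committed
        return msg.nack(ex);
    }
}

Is something along these lines the right way to get "commit only after ack", or is there a configuration-only way to achieve it with my current @Retry/@ExponentialBackoff setup?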