I have the following scenario:

I have a Quarkus Kafka consumer that receives events in JSON format, and I have it configured with an exponential backoff. While testing, I noticed that if the process is in a wait state (i.e. it is backing off for 2 or 4 minutes, or however long, before retrying) and I then kill the process and restart it from scratch, that event is no longer consumed from the topic. Is there a way to tell Quarkus/Kafka to move on to the next offset on the topic ONLY once it has received an ack (i.e. once I have processed the event successfully)?
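To make the behaviour I am after concrete, here is a minimal plain-Java sketch of "commit only after a successful ack". It is an in-memory model, not the Kafka or SmallRye API: the class `CommitAfterAckSketch`, the `consume` method and the `committedOffset` field are hypothetical names invented for illustration, and a handler returning `false` stands in for the process being killed while it is still backing off.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical in-memory model of one topic partition; not the Kafka or SmallRye API.
public class CommitAfterAckSketch {

    // Stands in for the broker-side committed offset, which survives a consumer restart.
    static int committedOffset = 0;

    // Processes records starting at the committed offset and commits only after
    // the handler reports success. Returns the position at which the run stopped.
    static int consume(List<String> topic, Predicate<String> handler) {
        int position = committedOffset;
        while (position < topic.size()) {
            boolean acked = handler.test(topic.get(position));
            if (!acked) {
                // Models the process dying mid-retry: nothing is committed for this record.
                break;
            }
            position++;
            committedOffset = position; // commit only after a successful ack
        }
        return position;
    }

    public static void main(String[] args) {
        List<String> topic = List.of("event-0", "event-1", "event-2");

        // First run: event-1 is never acknowledged and the process "dies".
        consume(topic, record -> !record.equals("event-1"));
        System.out.println("committed after first run: " + committedOffset);

        // Restart: consumption resumes at event-1, so it is delivered again.
        consume(topic, record -> true);
        System.out.println("committed after restart: " + committedOffset);
    }
}
```

In this model the restarted consumer sees event-1 again because its offset was never committed. That redelivery after a restart is what I expected from my real consumer and am not getting.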

My application.properties looks as follows:

mp.messaging.incoming.student-in.connector=smallrye-kafka
mp.messaging.incoming.student-in.topic=academia-student-event-topic
mp.messaging.incoming.student-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.student-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.student-in.health-enabled=true
mp.messaging.incoming.student-in.health-readiness-enabled=true
mp.messaging.incoming.student-in.enable.auto.commit=false
mp.messaging.incoming.student-in.max.poll.records=1
mp.messaging.incoming.student-in.client.id=TESTKafkaConsumer
mp.messaging.incoming.student-in.retry=true
mp.messaging.incoming.student-in.retry-attempts=-1
mp.messaging.incoming.student-in.retry-max-wait=20
mp.messaging.incoming.student-in.failure-strategy=ignore

and my test code looks as follows:

int retryCount = 0;
  
  @Incoming("student-in")
  @Retry(maxRetries = -1, delay = 1, delayUnit = ChronoUnit.MINUTES)
  @ExponentialBackoff(maxDelay=0)
  public void receive(ConsumerRecord<String, String> event) throws Exception, RetryException {
    LG.info("Got a TEST event from " + event.topic() + " with retry " + retryCount + " With key of: " + event.key() + " and value of: " + event.value());
    topic = event.topic();
    try {
      if (retryCount < 2) {
        retryCount++;
        throw new RetryException("RETRY_EXCEPTION", "Unable to send to target, retrying ");
      }
      else {
        LG.info("Stopping Retries, getting next event");
        retryCount = 0;   
      }
    }
    catch (RetryException rEx) {
      LG.error("Catching Retry Exception: " + rEx.getMessage());
      throw rEx;
    }
    catch (Exception ex) {
      LG.debug("Retry Exception Gen Ex", ex);
      throw new RetryException("RETRY_EXCEPTION Gen Ex", "Unable to send to target FMS, retrying " + ex.getMessage());
    }
  }