admin管理员组

文章数量:1410731

I am trying to handle deserialization errors in my kafka consumer setup. I used the ErrorHandlingDeserializer which seems to working fine. But now it is repeatedly trying to post to dead letter topic and results in SerializationException.

How do I configure such that deserialization errors are not posted to dead letter topic?

2025-03-08 11:39:21.492 - - ERROR o.s.k.r.DeadLetterPublishingRecovererFactory <my_consumer_group>-0-C-1 - Record: topic = <my_topic>, partition = 0, offset = 0, main topic = <my_topic> threw an error at topic <my_topic> and won't be retried. Sending to DLT with name <my_topic>.deadLetter. 
.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
    at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2921)
    at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.checkDeser(KafkaMessageListenerContainer.java:2969)
    at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2821)
    at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2744)
    at io.micrometer.observation.Observation.observe(Observation.java:565)
    at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2742)
    at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2595)
    at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2481)
    at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2123)
    at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1478)
    at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1442)
    at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1313)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: .springframework.kafka.support.serializer.DeserializationException: failed to deserialize
    at .springframework.kafka.support.serializer.SerializationUtils.deserializationException(SerializationUtils.java:158)
    at .springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:218)
    at .apache.kafkamon.serialization.Deserializer.deserialize(Deserializer.java:73)
    at .apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
    at .apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
    at .apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
    at .apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
    at .apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262)
    at .apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
    at .apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1649)
    at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1624)
    at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1421)
    ... 3 common frames omitted
Caused by: java.lang.IllegalArgumentException: com.google.protobuf.InvalidProtocolBufferException: Type of the Any message does not match the given class.
    at xxxx.MySerde.deserialize(MySerde.java:48)
    at .apache.kafkamon.serialization.Deserializer.deserialize(Deserializer.java:62)
    at .springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:215)
    ... 14 common frames omitted
Caused by: com.google.protobuf.InvalidProtocolBufferException: Type of the Any message does not match the given class.
    at com.google.protobuf.Any.unpack(Any.java:110)
    at xxxx.MySerde.unpack(MySerde.java:54)
    ... 17 common frames omitted

I read that I can create instance of DeadLetterPublishingRecoverer something like this. But the issue is we have custom names for (retry and) dead letter topics, and I am not sure I can get access to that here in the custom recoverer.

@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(){
return new DeadLetterPublishingRecoverer(kafkaTemplate,
  (r, e) -> {
    var ex = ErrorHandlingUtils.unwrapIfNeeded(e);
    if (ex instanceof SerializationException) {
      return null;
    } else {
      // publish to dead letter queue
      return new TopicPartition(????, r.partition());
    }
  });

References:

  • Spring Kafka version: 3.1.1
  • Spring Kafka - CommonErrorHandler ignored on DeserializationException
  • Spring Kafka error: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer'

本文标签: spring kafkaHow to avoid posting to dead letter topic on deserialization errorsStack Overflow