admin管理员组文章数量:1410731
I am trying to handle deserialization errors in my kafka consumer setup. I used the ErrorHandlingDeserializer
which seems to working fine. But now it is repeatedly trying to post to dead letter topic and results in SerializationException
.
How do I configure such that deserialization errors are not posted to dead letter topic?
2025-03-08 11:39:21.492 - - ERROR o.s.k.r.DeadLetterPublishingRecovererFactory <my_consumer_group>-0-C-1 - Record: topic = <my_topic>, partition = 0, offset = 0, main topic = <my_topic> threw an error at topic <my_topic> and won't be retried. Sending to DLT with name <my_topic>.deadLetter.
.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2921)
at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.checkDeser(KafkaMessageListenerContainer.java:2969)
at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2821)
at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2744)
at io.micrometer.observation.Observation.observe(Observation.java:565)
at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2742)
at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2595)
at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2481)
at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2123)
at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1478)
at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1442)
at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1313)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: .springframework.kafka.support.serializer.DeserializationException: failed to deserialize
at .springframework.kafka.support.serializer.SerializationUtils.deserializationException(SerializationUtils.java:158)
at .springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:218)
at .apache.kafkamon.serialization.Deserializer.deserialize(Deserializer.java:73)
at .apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
at .apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
at .apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
at .apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
at .apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262)
at .apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
at .apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1649)
at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1624)
at .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1421)
... 3 common frames omitted
Caused by: java.lang.IllegalArgumentException: com.google.protobuf.InvalidProtocolBufferException: Type of the Any message does not match the given class.
at xxxx.MySerde.deserialize(MySerde.java:48)
at .apache.kafkamon.serialization.Deserializer.deserialize(Deserializer.java:62)
at .springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:215)
... 14 common frames omitted
Caused by: com.google.protobuf.InvalidProtocolBufferException: Type of the Any message does not match the given class.
at com.google.protobuf.Any.unpack(Any.java:110)
at xxxx.MySerde.unpack(MySerde.java:54)
... 17 common frames omitted
I read that I can create instance of DeadLetterPublishingRecoverer
something like this. But the issue is we have custom names for (retry and) dead letter topics, and I am not sure I can get access to that here in the custom recoverer.
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(){
return new DeadLetterPublishingRecoverer(kafkaTemplate,
(r, e) -> {
var ex = ErrorHandlingUtils.unwrapIfNeeded(e);
if (ex instanceof SerializationException) {
return null;
} else {
// publish to dead letter queue
return new TopicPartition(????, r.partition());
}
});
References:
- Spring Kafka version: 3.1.1
- Spring Kafka - CommonErrorHandler ignored on DeserializationException
- Spring Kafka error: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer'
本文标签: spring kafkaHow to avoid posting to dead letter topic on deserialization errorsStack Overflow
版权声明:本文标题:spring kafka - How to avoid posting to dead letter topic on deserialization errors? - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1744895894a2631061.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论