I'm implementing the Transactional Outbox Pattern with a 3-broker Kafka cluster, and I have a problem where @KafkaListener does not consume messages from certain partitions.
My Kafka Consumer Configuration
@EnableKafka
@Configuration
class KafkaOrderConsumerConfig {

    @Bean
    fun orderConsumerFactory(): ConsumerFactory<String, OrderOutboxMessage> {
        val config = mutableMapOf<String, Any>()
        config[ConsumerConfig.GROUP_ID_CONFIG] = "ORDER_OUTBOX"
        config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
        config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java.name
        config[KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://schema-registry:8081"
        config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_VALUE_TYPE_CONFIG] = OrderOutboxMessage::class.java
        config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
        config[ConsumerConfig.CLIENT_ID_CONFIG] = "ORDER_OUTBOX_CLIENT"
        config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "kafka1:9092,kafka2:9092,kafka3:9092"
        config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
        config[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = "false"
        return DefaultKafkaConsumerFactory(config)
    }

    @Bean
    fun orderListenerContainer(): ConcurrentKafkaListenerContainerFactory<String, OrderOutboxMessage> {
        val listener = ConcurrentKafkaListenerContainerFactory<String, OrderOutboxMessage>()
        listener.consumerFactory = orderConsumerFactory()
        listener.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
        return listener
    }
}
My @KafkaListener
@KafkaListener(
    topics = [KafkaTopicNames.ORDER_OUTBOX],
    groupId = "ORDER_OUTBOX",
    containerFactory = "orderListenerContainer",
    concurrency = "3",
)
fun processOrder(record: ConsumerRecord<String, OrderOutboxMessage>, ack: Acknowledgment) {
    val outbox = record.value()
    log.info { "there's new outbox = $outbox" }
    if (ProcessStage.PENDING != ProcessStage.of(outbox.processStage.name)) {
        return
    }
    val foundOutbox = orderOutboxRepository.findByOrderId(outbox.orderId)
        ?: run {
            log.info { "No OrderNumber = ${outbox.orderId} found" }
            OrderOutbox(id = null, aggId = "noop", processStage = ProcessStage.EXCEPTION, orderId = "noop")
        }
    if (foundOutbox.processStage == ProcessStage.EXCEPTION) {
        return
    }
    val shippingMessage = generateShippingMessage(orderId = outbox.orderId)
    shippingTemplate.send(KafkaTopicNames.SHIPPING, "order-${outbox.orderId}", shippingMessage)
    foundOutbox.processStage = ProcessStage.PROCESSED
    orderOutboxRepository.save(foundOutbox)
    ack.acknowledge()
}
Even though I publish messages successfully, @KafkaListener does not consume them, so the outbox process stage never changes from PENDING to PROCESSED. The partitions and brokers that show the problem are different every time I start the Docker containers.
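As a debugging sketch (my own addition, not part of the original setup; the function and logger names are made up), the partitions each consumer actually receives per run can be logged with a rebalance listener registered on the factory via listener.containerProperties.setConsumerRebalanceListener(loggingRebalanceListener()):

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import org.slf4j.LoggerFactory

// Debugging aid: log the partitions each child consumer gains or loses,
// so assignments can be compared between container restarts.
fun loggingRebalanceListener(): ConsumerRebalanceListener = object : ConsumerRebalanceListener {
    private val log = LoggerFactory.getLogger("order-outbox-rebalance")

    override fun onPartitionsAssigned(partitions: MutableCollection<TopicPartition>) {
        log.info("Assigned partitions: {}", partitions)
    }

    override fun onPartitionsRevoked(partitions: MutableCollection<TopicPartition>) {
        log.info("Revoked partitions: {}", partitions)
    }
}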
But after I add this line, the problem is fixed:
config[ConsumerConfig.GROUP_INSTANCE_ID_CONFIG] = "ORDER_OUTBOX_INSTANCE"
I read articles about Kafka's static membership, for example:
Dynamic vs. Static Consumer Membership in Apache Kafka
But I still don't get it: how does static membership actually help @KafkaListener work?
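For reference, my understanding from those articles is that each application instance should get its own group.instance.id. A minimal sketch of what I mean (the INSTANCE_ID environment variable and the fallback value are my own naming, not from the article):

// Assumption: each deployed instance exports a unique INSTANCE_ID
// (e.g. the container hostname); fall back to a fixed value for a single local instance.
val instanceId = System.getenv("INSTANCE_ID") ?: "ORDER_OUTBOX_INSTANCE"
config[ConsumerConfig.GROUP_INSTANCE_ID_CONFIG] = instanceId

As far as I understand, Spring Kafka also suffixes the group.instance.id with the container index when concurrency is greater than 1, so the three child consumers keep unique static ids.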
Edit: the problem is still not solved. Here are my Kafka-ui screenshots; as you can see, 22 messages were sent but only 10 are committed.
(Screenshots: sent messages / committed messages)
Comments:
- How many partitions does your topic have? Are there more consumers in this group ID beyond your @KafkaListener application? – zn43
- @zn43 1. I have 10 partitions in order-outbox.topic. 2. Yes, there are more consumers, for example on payment-outbox.topic and shipping-topic. – 박수민
1 Answer
I solved this problem myself!
But I still don't know exactly what the problem was. I solved it by rewriting all of the consumer and producer configuration code.
I think the problem was in the consumer configuration. My new consumer configuration code is below.
@EnableKafka
@Configuration
class KafkaOrderOutboxConsumerConfig {

    @Bean
    fun orderOutboxConsumerFactory(): ConsumerFactory<String, OrderOutboxMessage> {
        val config = mutableMapOf<String, Any>()
        config[ConsumerConfig.GROUP_ID_CONFIG] = "ORDER_OUTBOX"
        config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
        config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
        config[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = KafkaAvroDeserializer::class.java
        config[KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://schema-registry:8081"
        config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_VALUE_TYPE_CONFIG] = OrderOutboxMessage::class.java
        config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
        config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "kafka1:9092,kafka2:9092,kafka3:9092"
        config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
        config[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = "true"
        return DefaultKafkaConsumerFactory(config)
    }

    @Bean
    fun orderOutboxListenerContainer(): ConcurrentKafkaListenerContainerFactory<String, OrderOutboxMessage> {
        val listener = ConcurrentKafkaListenerContainerFactory<String, OrderOutboxMessage>()
        listener.consumerFactory = orderOutboxConsumerFactory()
        listener.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
        listener.setConcurrency(3)
        return listener
    }
}
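I think the key change is wrapping KafkaAvroDeserializer in ErrorHandlingDeserializer, so a record that cannot be deserialized is surfaced to the container's error handling instead of the consumer getting stuck on that offset. As a sketch of how it could be paired with an error handler (assuming spring-kafka 2.8+, with the imports at the top of the file; the logger name and back-off values are my own choices), something like this could go inside orderOutboxListenerContainer() before return listener:

import org.slf4j.LoggerFactory
import org.springframework.kafka.listener.ConsumerRecordRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.FixedBackOff

val errorLog = LoggerFactory.getLogger("order-outbox-errors")
listener.setCommonErrorHandler(
    DefaultErrorHandler(
        // After retries are exhausted, log the unreadable record and skip it,
        // so one bad record does not block the rest of the partition.
        ConsumerRecordRecoverer { record, ex ->
            errorLog.error("Skipping record at ${record.topic()}-${record.partition()}@${record.offset()}", ex)
        },
        FixedBackOff(1000L, 2L) // retry twice, one second apart
    )
)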