admin管理员组

文章数量:1402796

Now I'm implementing Transactional Outbox Pattern with 3 Brokers of Kafka. And I have problems that @KafkaListener not listens certain partitions.

My Kafka Consumer Configuration

@EnableKafka
@Configuration
class KafkaOrderConsumerConfig {
    @Bean
    fun orderConsumerFactory(): ConsumerFactory<String, OrderOutboxMessage> {
        val config = mutableMapOf<String, Any>()
        config[ConsumerConfig.GROUP_ID_CONFIG] = "ORDER_OUTBOX"
        config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
        config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java.name
        config[KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://schema-registry:8081"
        config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_VALUE_TYPE_CONFIG] = OrderOutboxMessage::class.java
        config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
        config[ConsumerConfig.CLIENT_ID_CONFIG] = "ORDER_OUTBOX_CLIENT"
        config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "kafka1:9092,kafka2:9092,kafka3:9092"
        config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
        config[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = "false"

        return DefaultKafkaConsumerFactory(config)
    }

    @Bean
    fun orderListenerContainer(): ConcurrentKafkaListenerContainerFactory<String, OrderOutboxMessage> {
        val listener = ConcurrentKafkaListenerContainerFactory<String, OrderOutboxMessage>()
        listener.consumerFactory = orderConsumerFactory()
        listener.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE

        return listener
    }
}

My @KafkaListener

@KafkaListener(topics = [KafkaTopicNames.ORDER_OUTBOX],
    groupId = "ORDER_OUTBOX",
    containerFactory = "orderListenerContainer",
    concurrency = "3",
    )
fun processOrder(record: ConsumerRecord<String, OrderOutboxMessage>, ack: Acknowledgment) {
    val outbox = record.value()

    log.info { "there's new outbox = $outbox" }

    if(ProcessStage.PENDING != ProcessStage.of(outbox.processStage.name)) {
        return
    }

    val foundOutbox = orderOutboxRepository.findByOrderId(outbox.orderId)
        ?: run {
            log.info { "No OrderNumber = ${outbox.orderId} found" }
            OrderOutbox(id = null, aggId = "noop", processStage = ProcessStage.EXCEPTION, orderId = "noop")
        }

    if(foundOutbox.processStage == ProcessStage.EXCEPTION) {
        return
    }

    val shippingMessage = generateShippingMessage(orderId = outbox.orderId)
    shippingTemplate.send(KafkaTopicNames.SHIPPING, "order-${outbox.orderId}", shippingMessage)

    foundOutbox.processStage = ProcessStage.PROCESSED
    orderOutboxRepository.save(foundOutbox)

    ack.acknowledge()
}

even though I successfully publish message, @KafkaListener doesn't listen message and change outbox process stage PENDING to PROCESSED. And number of partitions and brokers make problems are different whenever I run docker containers.

but after I add this code, the problem is fixed.

config[ConsumerConfig.GROUP_INSTANCE_ID_CONFIG] = "ORDER_OUTBOX_INSTANCE"

I saw articles about static membership of kafka. Dynamic vs. Static Consumer Membership in Apache Kafka

But still not get it. how Static Membership actually helps @KafkaListener working?

edited: Problem Still not solved. There's my Kafka-ui screenshots. as you see, there's 22 Messages but committed messages are only 10.

Sent messages Committed Messages

Now I'm implementing Transactional Outbox Pattern with 3 Brokers of Kafka. And I have problems that @KafkaListener not listens certain partitions.

My Kafka Consumer Configuration

@EnableKafka
@Configuration
class KafkaOrderConsumerConfig {
    @Bean
    fun orderConsumerFactory(): ConsumerFactory<String, OrderOutboxMessage> {
        val config = mutableMapOf<String, Any>()
        config[ConsumerConfig.GROUP_ID_CONFIG] = "ORDER_OUTBOX"
        config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
        config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java.name
        config[KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://schema-registry:8081"
        config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_VALUE_TYPE_CONFIG] = OrderOutboxMessage::class.java
        config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
        config[ConsumerConfig.CLIENT_ID_CONFIG] = "ORDER_OUTBOX_CLIENT"
        config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "kafka1:9092,kafka2:9092,kafka3:9092"
        config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
        config[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = "false"

        return DefaultKafkaConsumerFactory(config)
    }

    @Bean
    fun orderListenerContainer(): ConcurrentKafkaListenerContainerFactory<String, OrderOutboxMessage> {
        val listener = ConcurrentKafkaListenerContainerFactory<String, OrderOutboxMessage>()
        listener.consumerFactory = orderConsumerFactory()
        listener.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE

        return listener
    }
}

My @KafkaListener

@KafkaListener(topics = [KafkaTopicNames.ORDER_OUTBOX],
    groupId = "ORDER_OUTBOX",
    containerFactory = "orderListenerContainer",
    concurrency = "3",
    )
fun processOrder(record: ConsumerRecord<String, OrderOutboxMessage>, ack: Acknowledgment) {
    val outbox = record.value()

    log.info { "there's new outbox = $outbox" }

    if(ProcessStage.PENDING != ProcessStage.of(outbox.processStage.name)) {
        return
    }

    val foundOutbox = orderOutboxRepository.findByOrderId(outbox.orderId)
        ?: run {
            log.info { "No OrderNumber = ${outbox.orderId} found" }
            OrderOutbox(id = null, aggId = "noop", processStage = ProcessStage.EXCEPTION, orderId = "noop")
        }

    if(foundOutbox.processStage == ProcessStage.EXCEPTION) {
        return
    }

    val shippingMessage = generateShippingMessage(orderId = outbox.orderId)
    shippingTemplate.send(KafkaTopicNames.SHIPPING, "order-${outbox.orderId}", shippingMessage)

    foundOutbox.processStage = ProcessStage.PROCESSED
    orderOutboxRepository.save(foundOutbox)

    ack.acknowledge()
}

even though I successfully publish message, @KafkaListener doesn't listen message and change outbox process stage PENDING to PROCESSED. And number of partitions and brokers make problems are different whenever I run docker containers.

but after I add this code, the problem is fixed.

config[ConsumerConfig.GROUP_INSTANCE_ID_CONFIG] = "ORDER_OUTBOX_INSTANCE"

I saw articles about static membership of kafka. Dynamic vs. Static Consumer Membership in Apache Kafka

But still not get it. how Static Membership actually helps @KafkaListener working?

edited: Problem Still not solved. There's my Kafka-ui screenshots. as you see, there's 22 Messages but committed messages are only 10.

Sent messages Committed Messages

Share Improve this question edited Mar 24 at 6:34 박수민 asked Mar 22 at 9:54 박수민박수민 235 bronze badges 2
  • how many partitions does your topic have? Are there more consumers in this group ID beyond your @KafkaListener application? – zn43 Commented Mar 22 at 13:55
  • @zn43 1. I have 10 partitions in order-outbox.topic 2. yes. there's more consumers such as payment-outbox.topic and shipping-topic. – 박수민 Commented Mar 23 at 14:04
Add a comment  | 

1 Answer 1

Reset to default 0

I solved this problem with myself!

But still I don't know what's the problem... I solved it by rewrite all configuration code of consumers and producers.

I think the problem was configuration of consumers. My new consumer configuration code is below.

@EnableKafka
@Configuration
class KafkaOrderOutboxConsumerConfig {
    @Bean
    fun orderOutboxConsumerFactory(): ConsumerFactory<String, OrderOutboxMessage> {
        val config = mutableMapOf<String, Any>()
        config[ConsumerConfig.GROUP_ID_CONFIG] = "ORDER_OUTBOX"
        config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
        config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
        config[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = KafkaAvroDeserializer::class.java
        config[KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://schema-registry:8081"
        config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_VALUE_TYPE_CONFIG] = OrderOutboxMessage::class.java
        config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
        config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "kafka1:9092,kafka2:9092,kafka3:9092"
        config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
        config[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = "true"

        return DefaultKafkaConsumerFactory(config)
    }

    @Bean
    fun orderOutboxListenerContainer(): ConcurrentKafkaListenerContainerFactory<String, OrderOutboxMessage> {
        val listener = ConcurrentKafkaListenerContainerFactory<String, OrderOutboxMessage>()
        listener.consumerFactory = orderOutboxConsumerFactory()
        listener.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
        listener.setConcurrency(3)

        return listener
    }
}

本文标签: spring bootKafkaListener sometimes not consumes messageStack Overflow