I need to change the current offset of a Kafka topic at runtime from inside the message-receiving method annotated with @KafkaListener. The idea is to move the offset to an arbitrary position in the topic where a specific marker message is located.

My consumer implements ConsumerSeekAware, and I've overridden onPartitionsAssigned(), registerSeekCallback(), onPartitionsRevoked(), unregisterSeekCallback() and onIdleContainer().

The problem is that when I start listening to a topic that already contains a number of messages, I receive the messages, but calls to callback.seek() and callback.seekToEnd() do not change the offset.

What should I do to be able to modify the offset from inside the method that receives the messages from the topic?

Here is my simplified implementation:

private val _callbacksForThread: ThreadLocal<ConsumerSeekCallback> = ThreadLocal()
private val _callbacks: MutableMap<TopicPartition, ConsumerSeekCallback> = ConcurrentHashMap()

override fun onPartitionsAssigned(
    assignments: Map<TopicPartition?, Long?>,
    callback: ConsumerSeekCallback,
) {
    assignments.entries.forEach { entry ->
        _callbacks[entry.key!!] = _callbacksForThread.get()
    }
}

override fun registerSeekCallback(callback: ConsumerSeekCallback) {
    _callbacksForThread.set(callback)
}

private fun moveOffsetTo(offset: Long) {
    _callbacks.forEach { (tp, callback) ->
        callback.seek(tp.topic(), tp.partition(), offset)
    }
}

private fun moveOffsetToLastMessage() {
    _callbacks.forEach { (tp, callback) ->
        callback.seekToEnd(tp.topic(), tp.partition())
        callback.seekRelative(tp.topic(), tp.partition(), -1, false)
    }
}

@KafkaListener(topics = ["a-topic"], groupId = "a-group-id")
fun listenKafka(consumerRecord: ConsumerRecord<String, SomeDtoClass>, ack: Acknowledgment) {
    // Some logic that calls moveOffsetTo(...) and moveOffsetToLastMessage(...)
    // for finding a message containing a specific SomeDtoClass content
}

BR
