I need to change the current offset of a Kafka topic at runtime, from inside the message-receiving method annotated with @KafkaListener. The idea is to move the offset to an arbitrary position in the topic where a specific marker message is located.
My consumer implements ConsumerSeekAware, and I've overridden onPartitionsAssigned(), registerSeekCallback(), onPartitionsRevoked(), unregisterSeekCallback() and onIdleContainer().
The problem is that when I start listening to a topic that already contains a number of messages, I receive the messages, but calls to callback.seek() and callback.seekToEnd() do not alter the offset.
What should I do to be able to modify the offset from inside the method that receives the messages from the topic?
Here is my simplified implementation:
// Callback registered for the current consumer thread
private val _callbacksForThread: ThreadLocal<ConsumerSeekCallback> = ThreadLocal()

// Seek callback for each assigned partition
private val _callbacks: MutableMap<TopicPartition, ConsumerSeekCallback> = ConcurrentHashMap()

override fun onPartitionsAssigned(
    assignments: Map<TopicPartition?, Long?>,
    callback: ConsumerSeekCallback,
) {
    // Remember this thread's callback for every partition it was assigned
    assignments.entries.forEach { entry ->
        _callbacks[entry.key!!] = _callbacksForThread.get()
    }
}

override fun registerSeekCallback(callback: ConsumerSeekCallback) {
    _callbacksForThread.set(callback)
}

// Seek every assigned partition to the given absolute offset
private fun moveOffsetTo(offset: Long) {
    _callbacks.forEach { (tp, callback) ->
        callback.seek(tp.topic(), tp.partition(), offset)
    }
}

// Seek every assigned partition to its last existing message
private fun moveOffsetToLastMessage() {
    _callbacks.forEach { (tp, callback) ->
        callback.seekToEnd(tp.topic(), tp.partition())
        // A negative offset with toCurrent = false is relative to the end
        callback.seekRelative(tp.topic(), tp.partition(), -1, false)
    }
}

@KafkaListener(topics = ["a-topic"], groupId = "a-group-id")
fun listenKafka(consumerRecord: ConsumerRecord<String, SomeDtoClass>, ack: Acknowledgment) {
    // Some logic that calls moveOffsetTo(...) and moveOffsetToLastMessage(...)
    // for finding a message containing a specific SomeDtoClass content
}
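The positions that moveOffsetTo() and moveOffsetToLastMessage() are meant to produce can be modeled without a broker. The following is a minimal sketch under stated assumptions: PartitionModel and seekToLastRecord() are hypothetical names invented for illustration, not Kafka or Spring classes, and the model ignores commits, rebalances and out-of-range handling. It only shows that the end offset is one past the last existing message, so "the last message" sits at end offset minus one.

```kotlin
// Hypothetical in-memory model of a consumer position on one partition.
// Not a Kafka or Spring class; it only illustrates the offset arithmetic.
class PartitionModel(private val recordCount: Long) {
    // The end offset is the offset the next produced record would receive.
    val endOffset: Long
        get() = recordCount

    // Offset of the next record the consumer would read.
    var position: Long = 0L
        private set

    // Absolute seek, which is what moveOffsetTo(offset) intends.
    fun seek(offset: Long) {
        position = offset
    }

    // Seek to the end: nothing is left to read until new records arrive.
    fun seekToEnd() {
        position = endOffset
    }

    // Seek to the last existing record, which is what
    // moveOffsetToLastMessage() intends (end offset - 1, never below 0).
    fun seekToLastRecord() {
        position = (endOffset - 1L).coerceAtLeast(0L)
    }
}

fun main() {
    val partition = PartitionModel(recordCount = 10L) // offsets 0..9 exist

    partition.seek(4L)
    println(partition.position) // 4

    partition.seekToEnd()
    println(partition.position) // 10

    partition.seekToLastRecord()
    println(partition.position) // 9
}
```

In this model the next record read after seekToLastRecord() is the one at offset 9, whereas after seekToEnd() nothing is read until a new record is produced.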