I run Kafka with automatic topic creation disabled. All topics must be created through an external API. I want to implement topic auto-creation on the client side =) Roughly like this:
try { producer.send(...) } catch (UnknownTopicException e) { ... external API call ... }
By default, KafkaProducer does not throw an exception right away when sending to an unknown topic. It just keeps logging UNKNOWN_TOPIC_OR_PARTITION warnings:
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {unknown_topic=UNKNOWN_TOPIC_OR_PARTITION}
I worked around this by decreasing the max.block.ms setting. Now KafkaProducer no longer retries before the exception is thrown:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) {
        var properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "***:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty("max.block.ms", "0");
        try (var producer = new KafkaProducer<String, String>(properties)) {
            System.out.println("recordMetadata: " + producer.send(new ProducerRecord<>("unknown_topic", "hello world")).get());
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
This doesn't seem to be the best solution. What other options are there?
asked Mar 31 at 13:03 by Makrushin Evgenii
2 Answers
The best way to check whether the topic exists is to call AdminClient before sending anything.
It has the describeTopics method. If the topic does not exist, the future it returns fails with an ExecutionException whose cause is UnknownTopicOrPartitionException (the UNKNOWN_TOPIC_OR_PARTITION error), so the way to go is to catch that.
https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/admin/AdminClient.html#describeTopics-java.util.Collection-org.apache.kafka.clients.admin.DescribeTopicsOptions-
You could just modify your try block:
//...
try (var adminClient = AdminClient.create(properties);
     var producer = new KafkaProducer<String, String>(properties))
{
    if (topicExists(adminClient, "yourTopic"))
    {
        System.out.println("recordMetadata: " + producer.send(new ProducerRecord<>("yourTopic", "hi!")).get());
        //...
    }
    else // the topic doesn't exist: the exception was caught and false was returned
    {
        // your missing-topic logic here
    }
    //...
} catch (ExecutionException | InterruptedException e) {
    throw new RuntimeException(e);
}
//...
// get() can also throw InterruptedException; it is declared here and handled by the caller's catch block
private static boolean topicExists(AdminClient adminClient, String topic) throws InterruptedException
{
    try
    {
        // on kafka-clients 3.1+, allTopicNames() replaces the deprecated all()
        DescribeTopicsResult result = adminClient.describeTopics(java.util.List.of(topic));
        result.all().get();
        return true; // OK, it exists
    }
    catch (ExecutionException e)
    {
        if (e.getCause() instanceof UnknownTopicOrPartitionException)
            return false; // UnknownTopicOrPartition means it doesn't exist
        else
            throw new RuntimeException(e);
    }
}
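Since the topics in the question can only be created through an external API, the existence check can be paired with a request-and-wait step. The following is a minimal sketch using only the JDK, not a definitive implementation. The exists predicate stands in for a check built on describeTopics (such as topicExists above), requestCreation stands in for the external API call, and the name TopicEnsurer, the attempt count and the delay are all assumptions made for illustration.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical helper: check a topic, request its creation once, then poll until it is visible. */
final class TopicEnsurer {
    private TopicEnsurer() {}

    /**
     * @param exists          stand-in for an existence check (e.g. one built on describeTopics)
     * @param requestCreation stand-in for the external topic-creation API call
     * @return true if the topic was already visible or became visible within maxAttempts polls
     */
    static boolean ensureTopic(String topic,
                               Predicate<String> exists,
                               Consumer<String> requestCreation,
                               int maxAttempts,
                               long delayMs) {
        if (exists.test(topic)) {
            return true; // already there, nothing to request
        }
        requestCreation.accept(topic); // ask the external API exactly once
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                Thread.sleep(delayMs); // creation is assumed to be asynchronous, so give it time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
            if (exists.test(topic)) {
                return true;
            }
        }
        return false; // still not visible; the caller decides what to do with the record
    }
}
```

If ensureTopic returns false, the record can be handled by whatever fallback fits the application, for example a retry later or a redirect to another topic.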
Thanks to @aran for the hint about describeTopics(). I wrote this decorator for KafkaProducer. It redirects records to a backoff topic if the destination topic is unknown or if its creation was requested a short time ago:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaProducerAutoCreateDecorator<K, V> extends KafkaProducer<K, V> {
    // topic -> epoch millis of the last successful describeTopics call
    private final Map<String, Long> describedTopics = new ConcurrentHashMap<>();
    // topic -> epoch millis of the last creation request
    private final Map<String, Long> requestedTopics = new ConcurrentHashMap<>();
    private final AdminClient adminClient;
    private static final long describedTopicsTtlMs = 1000;
    private static final long requestedTopicsTtlMs = 1000;
    private static final String defaultTopic = "test"; // backoff topic

    public KafkaProducerAutoCreateDecorator(Properties properties) {
        super(properties);
        this.adminClient = AdminClient.create(properties);
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        if (!isTopicExist(record.topic())) {
            // redirect to the backoff topic, keeping the key as well as the value
            record = new ProducerRecord<>(defaultTopic, record.key(), record.value());
        }
        return super.send(record, callback);
    }

    private boolean isTopicExist(String topic) {
        long now = Instant.now().toEpochMilli();
        Long describedAt = describedTopics.get(topic);
        if (describedAt != null && now - describedAt < describedTopicsTtlMs) {
            return true; // described recently, trust the cached answer
        }
        Long requestedAt = requestedTopics.get(topic);
        if (requestedAt != null && now - requestedAt < requestedTopicsTtlMs) {
            return false; // creation requested recently, do not ask again yet
        }
        try {
            adminClient.describeTopics(List.of(topic)).all().get();
            describedTopics.put(topic, Instant.now().toEpochMilli());
            requestedTopics.remove(topic);
            return true;
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                // TODO: request topic creation
                requestedTopics.put(topic, Instant.now().toEpochMilli());
                return false;
            }
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        adminClient.close();
        super.close();
    }
}
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) {
        var properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "***:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // a typed list instead of a raw ProducerRecord[] avoids unchecked warnings
        List<ProducerRecord<String, String>> records = List.of(
                new ProducerRecord<String, String>("test", "..."),
                new ProducerRecord<String, String>("unknown_topic", "..."),
                new ProducerRecord<String, String>("test", "..."),
                new ProducerRecord<String, String>("unknown_topic", "...")
        );
        try (var producer = new KafkaProducerAutoCreateDecorator<String, String>(properties)) {
            for (var record : records) {
                System.out.println(producer.send(record).get());
            }
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
I'm not sure what happens if a topic is deleted after it has been described.
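One way to reason about the deleted-topic case is to look at the cache in isolation. Below is a minimal sketch using only the JDK, under stated assumptions: TopicExistenceCache, the injectable clock and the lookup predicate (a stand-in for the describeTopics call) are hypothetical names, not part of the Kafka client. A cached answer is trusted only for ttlMs, so in this sketch a deletion is noticed at the first check after the entry expires, and records sent before that still target the deleted topic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

/** Hypothetical TTL cache for "does this topic exist?" answers. */
final class TopicExistenceCache {
    private final Map<String, Long> positiveUntil = new ConcurrentHashMap<>();
    private final Map<String, Long> negativeUntil = new ConcurrentHashMap<>();
    private final Predicate<String> lookup; // stand-in for a describeTopics-based check
    private final LongSupplier clockMs;     // injectable clock, e.g. System::currentTimeMillis
    private final long ttlMs;

    TopicExistenceCache(Predicate<String> lookup, LongSupplier clockMs, long ttlMs) {
        this.lookup = lookup;
        this.clockMs = clockMs;
        this.ttlMs = ttlMs;
    }

    boolean exists(String topic) {
        long now = clockMs.getAsLong();
        Long positive = positiveUntil.get(topic);
        if (positive != null && now < positive) {
            return true; // fresh positive answer, possibly stale if the topic was just deleted
        }
        Long negative = negativeUntil.get(topic);
        if (negative != null && now < negative) {
            return false; // fresh negative answer, avoids hammering the broker
        }
        boolean found = lookup.test(topic); // cache miss or expired entry: ask again
        if (found) {
            positiveUntil.put(topic, now + ttlMs);
            negativeUntil.remove(topic);
        } else {
            negativeUntil.put(topic, now + ttlMs);
            positiveUntil.remove(topic);
        }
        return found;
    }
}
```

Passing the clock in as a LongSupplier is a design choice made here so that expiry can be exercised without sleeping; in production code System::currentTimeMillis would be the obvious argument.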