I have an S3 sink connector that reads from a Kafka topic and writes to S3. The connector is not consuming from the topic.
This is the connector configuration:
{
"name": "my-connector",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"behavior.on.null.values": "ignore",
"s3.region": "<aws region>",
"topics.dir": "my-topic",
"flush.size": "1000",
"tasks.max": "1",
"s3.part.size": "5242880",
"timezone": "UTC",
"rotate.interval.ms": "30000",
"locale": "en-US",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"aws.access.key.id": "<aws access key>",
"errors.deadletterqueue.topic.replication.factor": "1",
"value.converter": ".apache.kafka.connect.json.JsonConverter",
"s3.bucket.name": "<aws bucket>",
"partition.duration.ms": "30000",
"schemapatibility": "NONE",
"topics": "my-topic",
"aws.secret.access.key": "<aws secret key>",
"task.class": "io.confluent.connect.s3.S3SinkTask",
"errors.deadletterqueue.topic.name": "dlq-my-topic",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"name": "my-connector",
"errors.tolerance": "all",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"rotate.schedule.interval.ms": "60000",
"timestamp.extractor": "Record"
}
}
This is the connector status:
curl localhost:8083/connectors/my-connector/status
{
"name": "my-connector",
"connector": {
"state": "RUNNING",
"worker_id": "localhost:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "localhost:8083"
}
],
"type": "sink"
}
Information provided about the Kafka consumer group is as follows:
./kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group connect-my-connector --describe
GROUP | TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID | HOST | CLIENT-ID |
---|---|---|---|---|---|---|---|---|
connect-my-connector | my-topic | 0 | 1182 | 12072 | 10890 | connector-consumer-my-connector-0-68793e0d-8312-4d20-b23c-5221ca54b0dc | ip | connector-consumer-my-connector-0 |
So it seems that Kafka Connect has an active consumer in the consumer group. What could be the reason that the connector is not consuming from Kafka?
Looking at this, the answer doesn't make sense because authentication isn't enabled in our Kafka cluster. What else could cause a connected Kafka Connect connector not to consume messages from a topic? There are no apparent errors in the logs during Connect startup, plugin loading, or connector initialization.
1 Answer
Yes, the configuration above did not consume for me either. When I changed the value.converter configuration as shown below, it started consuming and writing to S3. The data in my source topic is Avro data.
What is the data format of the data in your topic?
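If you are not sure, inspecting a few records directly can tell you. A minimal sketch, assuming a Confluent Platform installation with a Schema Registry at localhost:8081 (adjust the broker, topic name, and registry URL to your setup):
# Try to read a handful of records as Schema-Registry-encoded Avro
./kafka-avro-console-consumer --bootstrap-server kafka:9092 \
  --topic my-topic \
  --from-beginning \
  --max-messages 5 \
  --property schema.registry.url=http://localhost:8081
If the records decode cleanly here, the topic holds Avro and the connector should use AvroConverter; if the tool fails to deserialize them, the data is something else (for example plain JSON) and the converter and format.class need to match that instead.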
You may also want to set the Connect worker logging to TRACE for the package io.confluent.connect.s3 in your log4j.properties file to get more information.
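For example (a minimal sketch; the exact file is usually connect-log4j.properties in the distribution's etc or config directory, depending on how your worker is launched):
# Log the S3 sink and its storage/partitioning code at TRACE
log4j.logger.io.confluent.connect.s3=TRACE
log4j.logger.io.confluent.connect.storage=TRACE
On newer Connect versions (2.4+) you can also change log levels at runtime, without a restart, via the worker's admin endpoint, e.g. PUT /admin/loggers/io.confluent.connect.s3 with a body of {"level": "TRACE"}.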
{
"name": "s3-sink-connector",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"behavior.on.null.values": "ignore",
"s3.region": "$REGION",
"topics.dir": "pageviews",
"flush.size": "10",
"tasks.max": "1",
"s3.part.size": "5242880",
"timezone": "UTC",
"rotate.interval.ms": "600000",
"locale": "en-US",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"aws.access.key.id": "${AWS_ACCESS_KEY_ID}",
"errors.deadletterqueue.topic.replication.factor": "1",
"key.converter": ".apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"s3.bucket.name": "s3-sink-test ",
"partition.duration.ms": "30000",
"schemapatibility": "NONE",
"topics": "pageviews",
"aws.secret.access.key": "${AWS_SECRET_ACCESS_KEY}",
"task.class": "io.confluent.connect.s3.S3SinkTask",
"errors.deadletterqueue.topic.name": "dlq-my-topic",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"errors.tolerance": "all",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=mm",
"rotate.schedule.interval.ms": "60000",
"timestamp.extractor": "Record"
}
}
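Once the converter matches the data in the topic, you can update your existing connector by PUTting the full, corrected config object to localhost:8083/connectors/my-connector/config and then watch the backlog drain. A quick check, reusing the commands from your question (the aws CLI call is an assumption and needs credentials with read access to the bucket):
# Re-describe the consumer group; LAG should now be shrinking
./kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group connect-my-connector --describe
# Confirm objects are actually landing in the bucket under topics.dir
aws s3 ls s3://<aws bucket>/my-topic/ --recursive | head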