sftp - kafka consumer not able to read JsonConverter messages that are published to a topic using SftpCsvSourceConnector

I am trying to consume messages from a topic to which they are published, JSON-converted, by the SftpCsvSourceConnector. Below are my SFTP CSV source connector config and my Kafka consumer code.

SFTP CSV source connector config:
name=csvsourceconnector_POC
kafka.topic=sftp-testing-topic
batch.size=1000
tasks.max=1
connector.class=io.confluent.connect.sftp.SftpCsvSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
schema.generation.enabled=true
value.converter.schema.registry=localhost:8081/
value.converter.schemas.enable=true
errors.tolerance=NONE
errors.log.enable=true
errors.log.include.messages=true
cleanup.policy=MOVE
behavior.on.error=FAIL
sftp.host=abc.xyz
sftp.username=username
sftp.password=password
sftp.port=22
input.path=/path/to/data
error.path=/path/to/error
finished.path=/path/to/finished
input.file.pattern=csv-sftp-source.csv
Message that got published onto the topic:
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "string", "optional": true, "field": "column01" },
      { "type": "string", "optional": true, "field": "column02" },
      { "type": "string", "optional": true, "field": "column03" }
    ],
    "optional": false,
    "name": "defaultValueschemaname"
  },
  "payload": {
    "column01": "C00",
    "column02": "priorityCode",
    "column03": "US"
  }
}
POCKafkaConsumerService.java
public class POCKafkaConsumerService {

    @KafkaListener(topics = "${spring.kafka.consumer.topic}",
            groupId = "${spring.kafka.consumer.group-id}",
            properties = { "spring.json.value.default.type=com.example.CsvRecord" })
    public void consumeMessage(@Payload CsvRecord message,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
        System.out.println("Received message: " + message);
    }
}
application.properties:
spring.json.value.default.type=com.example.CsvRecord
spring.kafka.properties.schema.registry.url=localhost:8081/
spring.kafka.consumer.bootstrap-server=localhost:9092
group-id=abc-consumergroup-1
topic=sftp-testing-topic
key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
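For comparison, a sketch of how these settings usually look with Spring Boot's standard `spring.kafka.*` property prefixes (this layout is an assumption about what was intended; note that `spring.kafka.consumer.topic` is not a built-in Spring Boot key, just a custom placeholder resolved by the `@KafkaListener` annotation above):

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=abc-consumergroup-1
spring.kafka.consumer.topic=sftp-testing-topic
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.CsvRecord
```

Without the `spring.kafka.consumer.` prefixes, Spring Boot auto-configuration does not pick up keys such as `group-id` or `key-deserializer`, and the `${spring.kafka.consumer.topic}` and `${spring.kafka.consumer.group-id}` placeholders in the listener would not resolve.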
CsvRecord.java
@Data
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
public class CsvRecord {
@JsonProperty("column01")
private String column01;
@JsonProperty("column02")
private String column02;
@JsonProperty("column03")
private String column03;
}
With the above code, I am not able to read/deserialize the JSON message into a CsvRecord object: all the member variables of the CsvRecord object are set to null. My objective is to deserialize the JSON message produced by the SFTP source connector and store it in MongoDB. How do I achieve that?
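For context on what a fix might involve: with `value.converter.schemas.enable=true`, the Connect JsonConverter wraps every record value in a `{"schema": ..., "payload": ...}` envelope, so a plain `JsonDeserializer` tries to bind the envelope itself (not the payload) to `CsvRecord`, and every field comes back null. A minimal sketch of unwrapping the payload with Jackson follows; the `EnvelopeUnwrapper` class and `unwrapPayload` helper are hypothetical names, not part of Spring Kafka:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical helper: extracts the "payload" node from a Kafka Connect
// JsonConverter envelope so it can be bound to a domain class such as CsvRecord.
public class EnvelopeUnwrapper {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static JsonNode unwrapPayload(String json) throws Exception {
        JsonNode root = MAPPER.readTree(json);
        // With schemas.enable=true the value is {"schema": ..., "payload": ...};
        // fall back to the root node when no envelope is present.
        return root.has("payload") ? root.get("payload") : root;
    }

    public static void main(String[] args) throws Exception {
        String value = "{\"schema\":{\"type\":\"struct\"},"
                + "\"payload\":{\"column01\":\"C00\",\"column02\":\"priorityCode\",\"column03\":\"US\"}}";
        JsonNode payload = unwrapPayload(value);
        System.out.println(payload.get("column01").asText()); // C00
        // MAPPER.treeToValue(payload, CsvRecord.class) would then yield a populated object.
    }
}
```

Alternatively, setting `value.converter.schemas.enable=false` on the connector makes it publish the bare payload JSON, which the Spring `JsonDeserializer` can bind directly; the resulting object could then be written to MongoDB with, for example, Spring Data MongoDB's `MongoTemplate.save(...)` or a repository.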