I am trying to consume messages from a topic to which records are published, converted to JSON, by the SFTP CSV source connector. Below are my SFTP CSV source connector config and my Kafka consumer code.

SFTP CSV source connector config:

name=csvsourceconnector_POC
kafka.topic=sftp-testing-topic
batch.size=1000
tasks.max=1
connector.class=io.confluent.connect.sftp.SftpCsvSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
schema.generation.enabled=true
value.converter.schema.registry=localhost:8081
value.converter.schemas.enable=true
errors.tolerance=NONE
errors.log.enable=true
errors.log.include.messages=true
cleanup.policy=MOVE
behavior.on.error=FAIL
sftp.host=abc.xyz
sftp.username=username
sftp.password=password
sftp.port=22
input.path=/path/to/data
error.path=/path/to/error
finished.path=/path/to/finished
input.file.pattern=csv-sftp-source.csv
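
For context, the input file picked up by the connector is a plain CSV whose values match the record shown below; a representative data row looks like this (whether the first row is treated as a header depends on the connector's csv settings, which I left at their defaults):

C00,priorityCode,US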

Message that got published onto the topic (since value.converter.schemas.enable=true, each record is wrapped in a schema/payload envelope):

{"schema" : "{"type": "struct", "fields" : [{"type" : "string", "optional" : true, "field":"column01"}, {"type" : "string", "optional" : true, "field":"column02"}, {"type" : "string", "optional" : true, "field":"column03"}], "optional" : false, "name" : "defaultValueschemaname"}, 
"payload" : {
"column01" : "C00", 
"column02" : "priorityCode",
"column03" : "US"
}
}
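
To show what I mean by the envelope, here is a minimal standalone Jackson sketch (not part of my consumer code; it assumes the CsvRecord class further below and Jackson, which my Spring Boot app already has). It only illustrates that the column fields live under the "payload" node, not at the top level of the message value:

// Standalone illustration only: CsvRecord corresponds to the "payload" node
// of the envelope above, not to the whole message value.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import com.example.CsvRecord;

public class EnvelopeIllustration {

    public static void main(String[] args) throws Exception {
        // Same record as above, compacted into one string.
        String message =
            "{\"schema\":{\"type\":\"struct\",\"fields\":["
          + "{\"type\":\"string\",\"optional\":true,\"field\":\"column01\"},"
          + "{\"type\":\"string\",\"optional\":true,\"field\":\"column02\"},"
          + "{\"type\":\"string\",\"optional\":true,\"field\":\"column03\"}],"
          + "\"optional\":false,\"name\":\"defaultValueschemaname\"},"
          + "\"payload\":{\"column01\":\"C00\",\"column02\":\"priorityCode\",\"column03\":\"US\"}}";

        ObjectMapper mapper = new ObjectMapper();
        JsonNode envelope = mapper.readTree(message);

        // Mapping just the "payload" node onto CsvRecord fills in the columns.
        CsvRecord record = mapper.treeToValue(envelope.get("payload"), CsvRecord.class);
        System.out.println(record); // CsvRecord(column01=C00, column02=priorityCode, column03=US)
    }
}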

POCKafkaConsumerService.java

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import com.example.CsvRecord;

@Service
public class POCKafkaConsumerService {

    @KafkaListener(topics = "${spring.kafka.consumer.topic}",
                   groupId = "${spring.kafka.consumer.group-id}",
                   properties = {"spring.json.value.default.type=com.example.CsvRecord"})
    public void consumeMessage(@Payload CsvRecord message,
                               @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {

        System.out.println("Received message: " + message);
    }
}
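
For completeness, my understanding is that the properties-based consumer setup below corresponds roughly to the following programmatic configuration (this class is not in my project; bean names and values are placeholders taken from my properties):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.example.CsvRecord;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, CsvRecord> csvRecordConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "abc-consumergroup-1");

        // JSON deserializer targeting CsvRecord; trust its package in case
        // type headers are present on the records.
        JsonDeserializer<CsvRecord> valueDeserializer = new JsonDeserializer<>(CsvRecord.class);
        valueDeserializer.addTrustedPackages("com.example");

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, CsvRecord> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, CsvRecord> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(csvRecordConsumerFactory());
        return factory;
    }
}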

application.properties:

spring.json.value.default.type=com.example.CsvRecord
spring.kafka.properties.schema.registry.url=localhost:8081
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=abc-consumergroup-1
spring.kafka.consumer.topic=sftp-testing-topic
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

CsvRecord.java

package com.example;

import com.fasterxml.jackson.annotation.JsonProperty;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
public class CsvRecord {

    @JsonProperty("column01")
    private String column01;

    @JsonProperty("column02")
    private String column02;

    @JsonProperty("column03")
    private String column03;
}

With the above code I am not able to read/deserialize the JSON message into a CsvRecord object: every field in the resulting CsvRecord object is null. My objective is to deserialize the JSON messages produced by the SFTP source connector and store them in MongoDB. How do I achieve that?
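
Roughly, the kind of persistence step I am aiming for looks like the sketch below (assuming Spring Data MongoDB via spring-boot-starter-data-mongodb; CsvRecordRepository is hypothetical, and CsvRecord would presumably also need an @Id field or a dedicated document class):

import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.example.CsvRecord;

// Hypothetical Spring Data MongoDB repository for CsvRecord.
interface CsvRecordRepository extends MongoRepository<CsvRecord, String> {
}

// Sketch of a listener handing each deserialized record to MongoDB.
@Service
class CsvRecordPersistingListener {

    private final CsvRecordRepository repository;

    CsvRecordPersistingListener(CsvRecordRepository repository) {
        this.repository = repository;
    }

    @KafkaListener(topics = "${spring.kafka.consumer.topic}",
                   groupId = "${spring.kafka.consumer.group-id}",
                   properties = {"spring.json.value.default.type=com.example.CsvRecord"})
    public void consumeAndStore(CsvRecord record) {
        // Only useful once the JSON value actually deserializes into CsvRecord.
        repository.save(record);
    }
}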
