admin管理员组

文章数量:1356900

I am trying to work with api.countrylayer payload which returns a JSON list of countries. The JSON batch list should be split by each country for further processing. Therefore I am using the unnest function in Flink SQL, but it seems to manipulate the output.

payload:

[{"name":"Afghanistan","topLevelDomain"
[".af"],"alpha2Code":"AF","alpha3Code":"AFG","callingCodes":["93"],"capital":"Kabul"...},
{"name":"\u00c5land Islands","topLevelDomain":[".ax"],"alpha2Code":"AX","alpha3Code":"ALA","callingCodes":["358"],"capital":"Mariehamn"...},
{"name":"Albania","topLevelDomain":[".al"],"alpha2Code":"AL","alpha3Code":"ALB","callingCodes":["355"],"capital":"Tirana"...}]

processing:

tableEnv.executeSql("create view countryview as" +
    " select " +
    " country " +
    " from kafkaCountryLayer " +
    " cross join unnest(json_query(countries, '$' returning array<string>)) as countrylayer (country))");

Table table = tableEnv.sqlQuery("select " +
    " country " +
    " from countryview ");

table.executeInsert("print_output")

print_output result

[{name=Afghanistan, topLevelDomain=[.af], alpha2Code=AF, alpha3Code=AFG, callingCodes=[93], capital=Kabul...}]
[{name=Åland Islands, topLevelDomain=[.ax], alpha2Code=AX, alpha3Code=ALA, callingCodes=[358], capital=Mariehamn...}]
[{name=Albania, topLevelDomain=[.al], alpha2Code=AL, alpha3Code=ALB, callingCodes=[355], capital=Tirana...}]

expected result

{"name":"Afghanistan","topLevelDomain":[".af"],"alpha2Code":"AF","alpha3Code":"AFG","callingCodes":["93"],"capital":"Kabul"...}

{"name":"\u00c5land Islands","topLevelDomain":[".ax"],"alpha2Code":"AX","alpha3Code":"ALA","callingCodes":["358"],"capital":"Mariehamn"...}

{"name":"Albania","topLevelDomain":[".al"],"alpha2Code":"AL","alpha3Code":"ALB","callingCodes":["355"],"capital":"Tirana"...}

Is it a bug or am I using the function incorrectly? Can someone help me to get the expected result?

本文标签: Flink SQL issue with unnestoutput is changed by functionStack Overflow