I get the following error when trying to create a stream in KSQL that has a STRUCT as the key:

Key format does not support schema.
format: KAFKA
schema: Persistence{columns=[`topic` STRUCT<`topic` STRING, `ID` INTEGER> KEY], features=[]}
reason: The 'KAFKA' format does not support type 'STRUCT'

The topic's contents (produced by the Lenses MQTT Source Connector using ByteArrayConverter), as read with kcat:

kcat -C -b broker:29092 -t t10 -f 'Key is %k, and message payload is: %s \n'

Key is Struct{topic=m/bgmdev/af1/t5/test,id=0}, and message payload is: {"val":39}
Key is Struct{topic=m/bgmdev/af1/t5/test,id=0}, and message payload is: {"val":40}

I tried creating the stream with ksql. Note that topic is a reserved word, so I had to wrap it in backticks:

 CREATE STREAM TEST50 (`topic` STRUCT<`topic` VARCHAR, id INT> KEY, val INT)
  WITH (KAFKA_TOPIC='t10', PARTITIONS=1, VALUE_FORMAT='JSON');

This fails with the error:

Key format does not support schema.
format: KAFKA
schema: Persistence{columns=[`topic` STRUCT<`topic` STRING, `ID` INTEGER> KEY], features=[]}
reason: The 'KAFKA' format does not support type 'STRUCT'

I followed the format from:

/

Create a stream with a struct key in KSQL

I also noticed this from a much earlier version:

I also tried to force the KEY format as JSON:

 CREATE STREAM TEST51 (`topic` STRUCT<`topic` VARCHAR, id INT> KEY, val INT)
  WITH (KAFKA_TOPIC='t10', PARTITIONS=1, KEY_FORMAT='JSON', VALUE_FORMAT='JSON');

ksql> describe TEST51;

Name                 : TEST51
 Field | Type
---------------------------------------------------------
 topic | STRUCT<topic VARCHAR(STRING), ID INTEGER> (key)
 VAL   | INTEGER
---------------------------------------------------------

The stream is created, but I don't get any messages coming in, and there are no errors in the logs, even after turning up the logging levels. I'm sure I'm doing something wrong when creating the STRUCT.
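
One thing that may be worth checking: the kcat output above shows the key bytes as the text Struct{topic=m/bgmdev/af1/t5/test,id=0}, which is not JSON. A stream declared with KEY_FORMAT='JSON' would then likely fail to deserialize every key, and ksqlDB typically skips such records and reports them in its processing log rather than as a query error, which would match seeing no rows. A minimal diagnostic sketch, assuming the topic t10 from above and a hypothetical stream name TEST_RAW_KEY, reads the key as a plain string instead:

```sql
-- Read from the start of the topic so existing records are included.
SET 'auto.offset.reset' = 'earliest';

-- Show the raw records and the key/value formats ksqlDB detects for them.
PRINT 't10' FROM BEGINNING LIMIT 2;

-- TEST_RAW_KEY and RAW_KEY are hypothetical names, not from the original post.
-- The key is declared as a STRING, which the KAFKA key format does support.
CREATE STREAM TEST_RAW_KEY (RAW_KEY STRING KEY, VAL INT)
  WITH (KAFKA_TOPIC='t10', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

-- If rows appear here, the key is plain text rather than a serialized STRUCT.
SELECT RAW_KEY, VAL FROM TEST_RAW_KEY EMIT CHANGES LIMIT 2;
```

If this query returns rows whose RAW_KEY looks like Struct{...}, the key was probably written as the string form of a Connect struct, and the connector's key converter, not the STRUCT declaration, may be the thing to change.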
