
I have a pipeline that consists of a Kinesis input stream, a Flink aggregator, and a Kinesis output stream (sink).

The objects written to the input stream contain the fields `source`, `target`, `fieldName` and `count`. The data is written with a partition key that includes a random UUID:

```java
dto.getSource() + ";" + dto.getTarget() + ";"
        + dto.getFieldName() + ";" + UUID.randomUUID()
```
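For context on why the UUID suffix matters, here is a small, self-contained sketch of how Kinesis maps a partition key to a shard: the key is hashed with MD5 into a 128-bit integer, and each shard owns a contiguous range of that hash key space. The equal-sized ranges and the `shardFor` and `distribution` helpers are hypothetical simplifications for illustration; an on-demand stream manages its real shard ranges itself.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.UUID;

public class PartitionKeyDistribution {

    // MD5 of the partition key, read as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 => unsigned interpretation
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Hypothetical: shardCount shards splitting the 2^128 space into equal ranges.
    static int shardFor(String partitionKey, int shardCount) {
        BigInteger space = BigInteger.ONE.shiftLeft(128);
        BigInteger rangeSize = space.divide(BigInteger.valueOf(shardCount));
        int index = hashKey(partitionKey).divide(rangeSize).intValue();
        return Math.min(index, shardCount - 1); // last shard absorbs any remainder
    }

    // Counts how many of `records` generated keys land on each shard.
    static int[] distribution(String keyPrefix, int records, int shardCount, boolean randomSuffix) {
        int[] perShard = new int[shardCount];
        for (int i = 0; i < records; i++) {
            String key = randomSuffix ? keyPrefix + UUID.randomUUID() : keyPrefix;
            perShard[shardFor(key, shardCount)]++;
        }
        return perShard;
    }

    public static void main(String[] args) {
        String prefix = "source;target;fieldName;";
        // With a UUID suffix the same logical key is spread over all shards.
        System.out.println(Arrays.toString(distribution(prefix, 10_000, 4, true)));
        // Without it, every record for that key hits a single shard.
        System.out.println(Arrays.toString(distribution(prefix, 10_000, 4, false)));
    }
}
```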

Then, in Flink, `count` is aggregated by the key `source:target:fieldName` over a 1-minute tumbling window, and the results are sent to a Kinesis sink defined like this:

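```java
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
import org.apache.flink.formats.json.JsonSerializationSchema;

KinesisStreamsSink<MyDto> sink = KinesisStreamsSink.<MyDto>builder()
        .setFailOnError(false)
        .setKinesisClientProperties(consumerConfig)
        .setPartitionKeyGenerator(new KeyOperator())
        .setSerializationSchema(new JsonSerializationSchema<>())
        // Java string literals need double quotes; 'output' would not compile
        .setStreamName(props.getProperty("output"))
        .build();
```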

(`consumerConfig` contains only the region.)
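To make the shape of the output traffic concrete, here is a plain-Java sketch (no Flink dependency; Java 16+ for records) of a 1-minute tumbling-window sum keyed by `source:target:fieldName`. The `Event` and `Aggregate` records and the `aggregate` helper are hypothetical stand-ins, not the Flink API. What it illustrates is that every aggregate for a window is produced at the same moment, the window end, rather than spread across the minute.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    static final long WINDOW_MS = 60_000L; // 1-minute tumbling window

    // Hypothetical input record: the question's fields plus an event timestamp.
    record Event(String source, String target, String fieldName, long count, long timestampMs) {}

    // Hypothetical output record: one aggregate per key per window.
    record Aggregate(String key, long windowEndMs, long count) {}

    // Groups events by (window, source:target:fieldName) and sums `count`,
    // mimicking a keyed 1-minute tumbling window with a sum.
    static List<Aggregate> aggregate(List<Event> events) {
        // windowStart -> (key -> summed count); TreeMap keeps output ordered.
        Map<Long, Map<String, Long>> windows = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs() - Math.floorMod(e.timestampMs(), WINDOW_MS);
            String key = e.source() + ":" + e.target() + ":" + e.fieldName();
            windows.computeIfAbsent(windowStart, w -> new TreeMap<>())
                   .merge(key, e.count(), Long::sum);
        }
        List<Aggregate> out = new ArrayList<>();
        // Every aggregate of a window is stamped with the same instant: the window end.
        windows.forEach((start, perKey) ->
            perKey.forEach((key, sum) -> out.add(new Aggregate(key, start + WINDOW_MS, sum))));
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("s1", "t1", "f", 2, 1_000),
            new Event("s1", "t1", "f", 3, 59_000),
            new Event("s2", "t1", "f", 1, 30_000),
            new Event("s1", "t1", "f", 4, 61_000));
        aggregate(events).forEach(System.out::println);
    }
}
```

For the sample events in `main`, the first window yields one aggregate per distinct key, all stamped with the same window end, and the event at 61 s falls into the next window.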

The `KeyOperator` class overrides both `getKey` and `apply`, and each of them returns:

```java
String.format(
        "%s:%s:%s:%s", value.getSource(), value.getTarget(), value.getFieldName(),
        UUID.randomUUID());
```
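A minimal plain-Java sketch of that partition key generation, with `java.util.function.Function` standing in for the connector's partition key generator interface and a hypothetical `MyDto` record. It also contrasts the random key with a deterministic `source:target:fieldName` key: Flink expects the key selector passed to `keyBy` to be deterministic, so if the same UUID-suffixed `getKey` were used for the aggregation itself, records would not be grouped by `source:target:fieldName` as intended.

```java
import java.util.UUID;
import java.util.function.Function;

public class KeyOperatorSketch {

    // Hypothetical stand-in for the question's MyDto.
    record MyDto(String source, String target, String fieldName, long count) {}

    // Stand-in for KeyOperator: builds the sink partition key as
    // source:target:fieldName:<random UUID>, so it changes on every call.
    static class KeyOperator implements Function<MyDto, String> {
        @Override
        public String apply(MyDto value) {
            return String.format("%s:%s:%s:%s",
                value.source(), value.target(), value.fieldName(), UUID.randomUUID());
        }
    }

    // Deterministic aggregation key (no UUID), as a keyBy selector would need.
    static String aggregationKey(MyDto value) {
        return String.format("%s:%s:%s", value.source(), value.target(), value.fieldName());
    }

    public static void main(String[] args) {
        MyDto dto = new MyDto("s1", "t1", "f", 5);
        KeyOperator keys = new KeyOperator();
        System.out.println(keys.apply(dto));     // differs on every call
        System.out.println(keys.apply(dto));
        System.out.println(aggregationKey(dto)); // always "s1:t1:f"
    }
}
```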

Both the input stream and the output stream use on-demand capacity mode. On the monitoring pages of the Kinesis streams, I can see that the traffic to the output stream is about half the volume of the traffic to the input stream, which is expected because of the aggregation.

What I don't understand is why the input stream shows no throttling at all, while the throttling on the output stream is significant, sometimes 20% to 50%. Any suggestions? Thanks.
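Kinesis Data Streams documents a per-shard write limit of 1,000 records per second or 1 MiB per second. The sketch below is back-of-the-envelope arithmetic under hypothetical numbers (the record counts, a 200-byte average record, and a 2-second flush are all assumptions) comparing traffic spread evenly over a minute with half as many records delivered in a short burst, which is one pattern a tumbling window can produce. It also assumes keys spread evenly across shards.

```java
public class ShardCapacitySketch {

    // Documented per-shard write limits for Kinesis Data Streams.
    static final double RECORDS_PER_SEC_PER_SHARD = 1_000.0;
    static final double BYTES_PER_SEC_PER_SHARD = 1024.0 * 1024.0; // 1 MiB

    // Shards' worth of write capacity needed for `records` of `avgRecordBytes`
    // each arriving over `seconds`, assuming an even spread across shards.
    static long shardsNeeded(long records, long avgRecordBytes, double seconds) {
        double recordRate = records / seconds;
        double byteRate = records * (double) avgRecordBytes / seconds;
        double byRecords = recordRate / RECORDS_PER_SEC_PER_SHARD;
        double byBytes = byteRate / BYTES_PER_SEC_PER_SHARD;
        // Whichever limit is hit first determines the requirement.
        return (long) Math.ceil(Math.max(byRecords, byBytes));
    }

    public static void main(String[] args) {
        // Hypothetical: 120,000 input records spread over one minute, and
        // 60,000 aggregates (half the volume) written within ~2 s of a window end.
        System.out.println("input  shards: " + shardsNeeded(120_000, 200, 60.0));
        System.out.println("output shards: " + shardsNeeded(60_000, 200, 2.0));
    }
}
```

With these assumed numbers the output needs 30 shards' worth of write capacity during the burst versus 2 for the input, even though it carries half as many records.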
