Issue while setting current context using MapBundleOperator in Flink
I'm using Flink's MapBundleFunction to do mini-batch processing with the DataStream API. To achieve this I key the stream on an attribute (of Flink's internal RowData type) and keep accumulating elements in a HashMap of type Map<RowData, List<MyPojo>>. When finishBundle is triggered, I iterate over all the keys, set the key on the context for every iteration using

```java
ctx.setCurrentKey(currentKey);
```

and then perform the transformations and update the state.
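For context, the accumulation happens in the addInput override of MapBundleFunction<K, V, IN, OUT>; a minimal sketch of it (the real method carries more logic):

```java
// Sketch of the accumulation side: addInput returns the new bundle value
// for the key, so we append the incoming element to the (possibly null)
// list buffered so far for that key.
@Override
public List<MyPojo> addInput(List<MyPojo> value, MyPojo input) {
    List<MyPojo> bundle = (value == null) ? new ArrayList<>() : value;
    bundle.add(input);
    return bundle;
}
```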
Here are the key components of the code:
- Setter of the key based on which the stream is being keyed:
```java
public void setStateKey(String id) {
    this.stateKey = GenericRowData.of(id);
}
```
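As a sanity check, two keys built this way do agree on hashCode/equals within a single JVM; a standalone check along these lines (the id is illustrative):

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

// Builds two keys the same way setStateKey does and compares their
// hashCode/equals. Within one JVM these print identical hashes.
public class KeyHashCheck {
    public static void main(String[] args) {
        RowData k1 = GenericRowData.of("some-id");
        RowData k2 = GenericRowData.of("some-id");
        System.out.println(k1.hashCode() + " vs " + k2.hashCode()
                + ", equals = " + k1.equals(k2));
    }
}
```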
- Keying of the stream:
```java
stream1
    .union(stream2)
    .keyBy(MyPojo::getStateKey)
    .transform("M", TypeInformation.of(MyPojo.class), microBatchAccumulator)
    .sinkTo(kafkaSink);
```
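The microBatchAccumulator is a MapBundleOperator; it's wired up roughly like this (a sketch with assumed names; the trigger count of 1000 is illustrative, not my real config):

```java
import java.util.List;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.bundle.MapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;

// Wires the bundle function into an operator that flushes (i.e. calls
// finishBundle) once 1000 elements have been buffered.
KeySelector<MyPojo, RowData> keySelector = MyPojo::getStateKey;
MapBundleOperator<RowData, List<MyPojo>, MyPojo, MyPojo> microBatchAccumulator =
        new MapBundleOperator<>(
                myBundleFunction,                // my MapBundleFunction (finishBundle shown below)
                new CountBundleTrigger<>(1000L), // illustrative flush threshold
                keySelector);
```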
- Implementation of the finishBundle function:
```java
@Override
public void finishBundle(Map<RowData, List<MyPojo>> buffer, Collector<MyPojo> collector) throws Exception {
    buffer.forEach((currentKey, currentValue) -> {
        log.info("Hash of the current Key -> {}", currentKey.hashCode());
        ctx.setCurrentKey(currentKey);
        // ...some code...
    });
}
```
This setup works fine locally, but when deployed on Kubernetes it fails with the following exception:
```
Caused by: java.lang.IllegalArgumentException: Key group 193 is not in KeyGroupRange{startKeyGroup=144, endKeyGroup=179}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
    at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.state.InternalKeyContextImpl.setCurrentKeyGroupIndex(InternalKeyContextImpl.java:74) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.setCurrentKey(AbstractKeyedStateBackend.java:249) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:430) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:383) ~[flink-dist-1.19.0.jar:1.19.0]
    ... 18 more
```
The hash code that I'm logging also appears to be inconsistent across restarts for the same string ids being processed.
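From the stack trace, my understanding is that the state backend recomputes the key group from the key's hashCode and the job's max parallelism, so an unstable hash would put the key outside the subtask's assigned range; roughly (a sketch using Flink's own KeyGroupRangeAssignment utility, with my max parallelism of 720):

```java
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// The key group is derived from the key's hashCode and the max parallelism.
// In the exception above this came out as 193, while the subtask only owns
// key groups 144..179.
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(currentKey, 720);
```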
- Flink version: 1.19.0
- Parallelism: 20
- Max parallelism: 720