I want to migrate a huge table from Cassandra using Flink, so I do:
@Override
public Source<HistoryReadout, CassandraSplit, CassandraEnumeratorState> getCassandraSource(ClusterBuilder clusterBuilder, String keyspace) {
    String query = String.format("SELECT * FROM %s.tablename;", keyspace);
    long maxSplitMemorySize = MemorySize.ofMebiBytes(10).getBytes();
    return new OPTCassandraSource<>(clusterBuilder,
            maxSplitMemorySize,
            HistoryReadout.class,
            query,
            () -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)});
}
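For intuition, `maxSplitMemorySize` bounds how much estimated table data one split covers, so the enumerator cuts the token ring into roughly `tableSize / maxSplitMemorySize` splits. A rough sketch of that arithmetic (an illustration with an assumed 10 GiB table size, not the connector's exact algorithm):

```java
public class SplitMath {
    // Rough sketch: splits ≈ ceil(tableSizeBytes / maxSplitBytes).
    static long estimateSplits(long tableSizeBytes, long maxSplitBytes) {
        return (tableSizeBytes + maxSplitBytes - 1) / maxSplitBytes;
    }

    public static void main(String[] args) {
        long tenMiB = 10L * 1024 * 1024;        // maxSplitMemorySize from the snippet above
        long tenGiB = 10L * 1024 * 1024 * 1024; // hypothetical table size estimate
        System.out.println(estimateSplits(tenGiB, tenMiB)); // 1024
    }
}
```

So with a 10 MiB split size, a large table naturally produces many token-range sub-queries, which is why the connector fires so many of them.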
and a standard Source → Transform → Sink pipeline:
Source<HistoryReadout, CassandraSplit, CassandraEnumeratorState> cassandraSource = getCassandraSource(clusterBuilder, keyspace);
// cassandraStream is created from the source in the usual way:
DataStream<HistoryReadout> cassandraStream = env.fromSource(
        cassandraSource, WatermarkStrategy.noWatermarks(), "cassandra-source");
DataStream<Tuple2<String, HistoryReadout>> processedStream = cassandraStream.process(
        new ProcessFunction<>() {
            @Override
            public void processElement(HistoryReadout readout, Context context,
                                       Collector<Tuple2<String, HistoryReadout>> out) {
                System.out.println("In process element " + readout);
                try {
                    String tableName = HistoryReadoutCassandraServiceUtil.getRowTableName(keyspace, readout);
                    System.out.println(tableName);
                    out.collect(new Tuple2<>(tableName, readout));
                } catch (Exception e) {
                    System.err.printf("Error generating table name for record: %s, Error: %s%n",
                            readout, e.getMessage());
                }
            }
        }
);
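For context, `HistoryReadoutCassandraServiceUtil.getRowTableName` is not shown in the question; here is a self-contained, purely hypothetical stand-in (routing each readout to a per-month table) just to make the `Tuple2<tableName, readout>` routing above concrete:

```java
import java.time.LocalDate;

public class TableNameSketch {
    // Hypothetical stand-in for HistoryReadoutCassandraServiceUtil.getRowTableName:
    // route a readout to a per-month table such as "ks.history_2024_06".
    static String rowTableName(String keyspace, LocalDate readoutDate) {
        return String.format("%s.history_%d_%02d",
                keyspace, readoutDate.getYear(), readoutDate.getMonthValue());
    }

    public static void main(String[] args) {
        System.out.println(rowTableName("ks", LocalDate.of(2024, 6, 15))); // ks.history_2024_06
    }
}
```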
but from what I can see while debugging, the connector keeps fetching data with new `token(...)` range queries and never reaches the processing stage. I need to process this huge table in batches. What am I doing wrong here? Thanks.
Tags: cassandra, flink (from Stack Overflow: "Flink Java Api process holw huge table")