
In Flink I want to migrate a huge table out of Cassandra, so I build the source like this:

@Override
public Source<HistoryReadout, CassandraSplit, CassandraEnumeratorState> getCassandraSource(ClusterBuilder clusterBuilder, String keyspace) {
    String query = String.format("SELECT * FROM %s.tablename;", keyspace);

    // Target size per split; the enumerator uses it to decide how many
    // token ranges to generate for the table.
    long maxSplitMemorySize = MemorySize.ofMebiBytes(10).getBytes();
    return new OPTCassandraSource<>(clusterBuilder,
            maxSplitMemorySize,
            HistoryReadout.class,
            query,
            () -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)});
}
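For context, HistoryReadout is a plain DataStax-mapper POJO, and the ClusterBuilder I pass in is nothing special; a minimal sketch of it (contact point and port are placeholders for my real cluster config):

ClusterBuilder clusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        // Placeholder connection settings
        return builder
                .addContactPoint("cassandra-host")
                .withPort(9042)
                .build();
    }
};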

and wire up a standard source → transform → sink pipeline:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Source<HistoryReadout, CassandraSplit, CassandraEnumeratorState> cassandraSource = getCassandraSource(clusterBuilder, keyspace);
DataStream<HistoryReadout> cassandraStream = env.fromSource(
        cassandraSource, WatermarkStrategy.noWatermarks(), "Cassandra Source");
DataStream<Tuple2<String, HistoryReadout>> processedStream = cassandraStream.process(
        new ProcessFunction<>() {
            @Override
            public void processElement(HistoryReadout readout, Context context, Collector<Tuple2<String, HistoryReadout>> out) {
                System.out.println("In process element: " + readout);
                try {
                    // Route each readout to its target table
                    String tableName = HistoryReadoutCassandraServiceUtil.getRowTableName(keyspace, readout);
                    System.out.println(tableName);
                    out.collect(new Tuple2<>(tableName, readout));
                } catch (Exception e) {
                    System.err.printf("Error generating table name for record: %s, Error: %s%n", readout, e.getMessage());
                }
            }
        }
);
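The sink itself is not relevant to the problem; while debugging I just print and execute (a minimal sketch):

processedStream.print();
env.execute("cassandra-table-migration");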

But from what I can see while debugging, the connector just keeps issuing new per-split range queries of roughly this shape (partition key elided):

SELECT * FROM keyspace.tablename WHERE (token(...) >= ...) AND (token(...) < ...);

and the job never reaches the process function. I need to process this huge table in manageable batches. What am I doing wrong here? Thanks.
