I have built a complex Flink job for data enrichment. The job consumes messages from external systems via Kafka, and the goal is to enrich these messages with reference data stored in a PostgreSQL database. After enrichment, additional validation and transformation are performed before the enriched data is sent to Kafka sinks.

The setup includes:

  1. Three Input Kafka Streams:

    • Each with different types of raw data requiring enrichment and transformation.
  2. Six Reference Tables in PostgreSQL:

    • The tables are updated daily, and the data is read using the PostgreSQL CDC connector.
  3. Enrichment Process:

    • The reference tables are joined and converted to DataStreams.
    • Kafka streams are enriched using broadcast streams and processed using BroadcastProcessFunction.

Here’s how I implemented this job:


Step 1: PostgreSQL CDC Table Creation

I use the PostgreSQL CDC connector to create tables for the reference data:

tableEnv.executeSql(
    "CREATE TABLE meters (" +
    "  id BIGINT," +
    "  res_spec_id BIGINT," +
    "  serial STRING" +
    ") WITH (" +
    "  'connector' = 'postgres-cdc'," +
    "  'hostname' = 'XXXX'," +
    "  'port' = '5432'," +
    "  'username' = 'XXXX'," +
    "  'password' = 'XXXX'," +
    "  'database-name' = 'name'," +
    "  'schema-name' = 'schema'," +
    "  'decoding.plugin.name' = 'pgoutput'," +
    "  'table-name' = 'meters'," +
    "  'slot.name' = 'meters_slot'," +
    "  'debezium.publication.name' = 'flink_publication'," +
    "  'scan.incremental.snapshot.enabled' = 'true'" +
    ")"
);

This is repeated for all six tables.
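Since only the table name, the column list, and the slot name change between the six statements, one way I could avoid repeating the DDL would be to generate it from a small helper like the following (just a sketch, not what I currently have -- the createCdcTableDdl method and its parameters are hypothetical):

public static String createCdcTableDdl(String tableName, String columns) {
    // Hypothetical helper: builds the CDC DDL for one reference table and
    // derives the replication slot name from the table name.
    return "CREATE TABLE " + tableName + " (" + columns + ") WITH (" +
           "  'connector' = 'postgres-cdc'," +
           "  'hostname' = 'XXXX'," +
           "  'port' = '5432'," +
           "  'username' = 'XXXX'," +
           "  'password' = 'XXXX'," +
           "  'database-name' = 'name'," +
           "  'schema-name' = 'schema'," +
           "  'decoding.plugin.name' = 'pgoutput'," +
           "  'table-name' = '" + tableName + "'," +
           "  'slot.name' = '" + tableName + "_slot'," +
           "  'debezium.publication.name' = 'flink_publication'," +
           "  'scan.incremental.snapshot.enabled' = 'true'" +
           ")";
}

// One call per reference table instead of six copies of the DDL:
tableEnv.executeSql(createCdcTableDdl("meters", "id BIGINT, res_spec_id BIGINT, serial STRING"));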


Step 2: Joining Reference Tables

The reference tables are joined to enrich the data. For example:

public static Table createMeterTable(StreamTableEnvironment tableEnv) {
    return tableEnv.sqlQuery(
        "SELECT * FROM meters mm " +
        "JOIN meter_chars mc ON mm.id = mc.meter_id " +
        "JOIN resource_specs rs ON rs.id = mm.res_spec_id"
    );
}

public static Table createUpTable(StreamTableEnvironment tableEnv) {
    return tableEnv.sqlQuery(
        "SELECT * FROM usage_points up " +
        "JOIN up_chars uc ON up.id = uc.usage_point_id"
    );
}

Step 3: Converting to DataStreams

After creating the joined tables, I convert them to changelog DataStreams:

public DataStream<Row> getMeterStream(StreamTableEnvironment tableEnv) {
    Table meterTable = createMeterTable(tableEnv);
    return tableEnv.toChangelogStream(meterTable);
}

Step 4: Broadcasting Reference Data

Broadcast streams are created from the joined reference data:

public BroadcastStream<Row> getMeterBroadcastStream(DataStream<Row> meterStream) {
    MapStateDescriptor<MeterKey, List<MeterDeployment>> meterStateDescriptor = 
        EnrichmentBroadcastStateConfig.createMeterStateDescriptor();
    return meterStream.broadcast(meterStateDescriptor);
}

public static MapStateDescriptor<MeterKey, List<MeterDeployment>> createMeterStateDescriptor() {
    return new MapStateDescriptor<>(
        "meterStateDescriptor",
        TypeInformation.of(new TypeHint<MeterKey>() {}),
        TypeInformation.of(new TypeHint<List<MeterDeployment>>() {})
    );
}

Step 5: Enriching Kafka Streams

The input Kafka streams are connected to broadcast streams using BroadcastProcessFunction:

public SingleOutputStreamOperator<ReadingBlockDto> enrichBlockWithMeters(
        DataStream<RawBlock> rawBlocks,
        BroadcastStream<Row> meterBroadcast,
        OutputTag<RawBlock> rejectedTag) {
    MapStateDescriptor<MeterKey, List<MeterDeployment>> meterStateDescriptor =
        EnrichmentBroadcastStateConfig.createMeterStateDescriptor();
    return rawBlocks.connect(meterBroadcast)
                    .process(new BlockMeterEnrichmentFunction(meterStateDescriptor, rejectedTag));
}

Step 6: Enrichment Function

Here’s an example of the BroadcastProcessFunction used for enrichment:

public class BlockMeterEnrichmentFunction extends BroadcastProcessFunction<RawBlock, Row, ReadingBlockDto> {
    private final MapStateDescriptor<MeterKey, List<MeterDeployment>> meterStateDescriptor;
    private final OutputTag<RawBlock> rejectedTag;

    public BlockMeterEnrichmentFunction(MapStateDescriptor<MeterKey, List<MeterDeployment>> meterStateDescriptor, OutputTag<RawBlock> rejectedTag) {
        this.meterStateDescriptor = meterStateDescriptor;
        this.rejectedTag = rejectedTag;
    }

    @Override
    public void processElement(RawBlock rawBlock, ReadOnlyContext ctx, Collector<ReadingBlockDto> collector) throws Exception {
        ReadOnlyBroadcastState<MeterKey, List<MeterDeployment>> meterState = ctx.getBroadcastState(meterStateDescriptor);

        List<MeterDeployment> meterDeployments = meterState.get(new MeterKey(rawBlock.getSerial(), rawBlock.getVendorId()));
        // Perform enrichment logic...
    }

    @Override
    public void processBroadcastElement(Row row, Context ctx, Collector<ReadingBlockDto> out) throws Exception {
        MeterDeployment meterDeployment = MeterDeployment.builder() // parse columns
                .build();

        BroadcastState<MeterKey, List<MeterDeployment>> broadcastState = ctx.getBroadcastState(meterStateDescriptor);
        MeterKey key = new MeterKey(meterDeployment.getSerial(), meterDeployment.getVendorId());

        List<MeterDeployment> meterDeployments = broadcastState.get(key);
        if (meterDeployments == null) {
            meterDeployments = new ArrayList<>();
        }

        switch (row.getKind()) {
            case DELETE:
                meterDeployments.remove(meterDeployment);
                if (meterDeployments.isEmpty()) {
                    broadcastState.remove(key);
                } else {
                    broadcastState.put(key, meterDeployments);
                }
                break;
            case UPDATE_BEFORE:
                meterDeployments.remove(meterDeployment);
                broadcastState.put(key, meterDeployments);
                break;
            case UPDATE_AFTER:
            case INSERT:
                meterDeployments.add(meterDeployment);
                broadcastState.put(key, meterDeployments);
                break;
        }
    }
}

Additional Context

  • Flink Version: 1.20
  • CDC Version: 3.2
  • Flink Operator: 1.10
  • State Backend: RocksDB
  • Running on Kubernetes.

Questions

1. Is There a Better Way to Handle This?

I use the Table API because I need the PostgreSQL CDC connector. However:

  • The BroadcastProcessFunction logic is duplicated for every Kafka stream, which seems inefficient.
  • Is there an alternative approach to the broadcast pattern for repeatedly joining the same reference data with different streams?

2. Memory Management

I am using RocksDB as the state backend, but I’m unclear about where certain components are stored:

  • a) Where are the CDC tables created with tableEnv.executeSql stored? Are they in memory, and if so, which memory?
  • b) Do joins between the CDC tables require separate memory, or are they performed lazily?
  • c) Where is the broadcast state stored? Is it in RocksDB or somewhere else?

3. RocksDB Resource Management

  • Since RocksDB is my state backend, is its usage limited by the memory and disk allocated to my TaskManager pods in Kubernetes?
  • Can I inspect or monitor RocksDB state usage (e.g., disk, memory, or compaction metrics), or is it essentially a black box?

I would appreciate any guidance on improving this architecture or insights into Flink’s state and memory management. Thank you!

1 Answer

There's a lot to potentially react to here; I'm going to limit myself to sharing some relevant information.

2a) Where are the CDC tables created with tableEnv.executeSql stored? Are they in memory, and if so, which memory?

Those tables are stored in Postgres. Flink tables are nothing more than metadata describing data stored externally to Flink.

2b) Do joins between the CDC tables require separate memory, or are they performed lazily?

Flink will materialize in its state backend whatever it needs to retain from the records streaming in from those tables to produce the desired results. In this case, these so-called regular joins will need to permanently store in RocksDB every record from both sides of the join -- this is the most expensive type of join. I've made a video about streaming joins to explain this in more detail.
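To make the laziness concrete -- a small illustration using the tables from your Step 2, assuming env is the StreamExecutionEnvironment your StreamTableEnvironment was created from:

// Registering the CDC tables and defining the join are both lazy: they only build
// catalog metadata and a query plan.
Table joined = tableEnv.sqlQuery(
    "SELECT * FROM meters mm JOIN meter_chars mc ON mm.id = mc.meter_id");

// Still nothing has been read from Postgres. Records start flowing -- and join state
// starts accumulating in RocksDB -- only once the pipeline is actually executed:
DataStream<Row> joinedStream = tableEnv.toChangelogStream(joined);
// ... connect the enrichment functions and sinks ...
env.execute("enrichment-job");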

2c) Where is the broadcast state stored? Is it in RocksDB or somewhere else?

Flink always stores broadcast state on the heap -- so it's in memory. And each parallel instance of the job will checkpoint its own copy of the broadcast state. Broadcast state should only be used for relatively small state that cannot be key-partitioned.
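If the reference data can be partitioned by the enrichment key (serial + vendor), an alternative is to key both streams by that key and keep the reference rows in keyed MapState, which does live in RocksDB. This is only a rough sketch: it reuses your types but assumes a getId() accessor on MeterDeployment, and it leaves out the row-kind handling and the question of blocks that arrive before their reference rows:

public class KeyedMeterEnrichmentFunction
        extends KeyedCoProcessFunction<MeterKey, RawBlock, MeterDeployment, ReadingBlockDto> {

    // Keyed state, scoped to the current MeterKey and stored in the RocksDB state backend.
    private transient MapState<Long, MeterDeployment> deployments;

    @Override
    public void open(Configuration parameters) {
        deployments = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("deployments", Long.class, MeterDeployment.class));
    }

    @Override
    public void processElement1(RawBlock block, Context ctx, Collector<ReadingBlockDto> out) throws Exception {
        // Only the deployments stored for this block's MeterKey are visible here.
        for (MeterDeployment deployment : deployments.values()) {
            // Perform enrichment logic...
        }
    }

    @Override
    public void processElement2(MeterDeployment deployment, Context ctx, Collector<ReadingBlockDto> out) throws Exception {
        // Upsert on INSERT/UPDATE_AFTER; a full version would also handle DELETE and
        // UPDATE_BEFORE, as in your BroadcastProcessFunction.
        deployments.put(deployment.getId(), deployment);
    }
}

Both inputs would be keyed by the same MeterKey before connect(). The trade-off versus broadcast state is that a block arriving before its reference rows has nothing to join against, so such records need to be buffered in state or routed to a side output.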

3) RocksDB resource management

RocksDB keeps its working state on the local disks of the task managers, with an in-memory cache held in the task managers' off-heap (managed) memory -- so yes, both its memory and its disk usage are bounded by the resources you give the TaskManager pods.

There is an extensive set of metrics available.
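For example, the RocksDB native metrics can be switched on with the standard state.backend.rocksdb.metrics.* options. They are normally set in the cluster configuration (e.g. the flinkConfiguration of your FlinkDeployment); the Java form below is just for illustration, and the selection of metrics is only an example:

Configuration conf = new Configuration();
// Size of the live data and SST files on the TaskManager's local disk.
conf.setString("state.backend.rocksdb.metrics.estimate-live-data-size", "true");
conf.setString("state.backend.rocksdb.metrics.total-sst-files-size", "true");
// Memory used by the block cache and memtables.
conf.setString("state.backend.rocksdb.metrics.block-cache-usage", "true");
conf.setString("state.backend.rocksdb.metrics.cur-size-all-mem-tables", "true");
// Compaction activity.
conf.setString("state.backend.rocksdb.metrics.num-running-compactions", "true");
conf.setString("state.backend.rocksdb.metrics.compaction-pending", "true");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

These show up as regular Flink task manager metrics, so they can be exported to whatever metrics backend you already scrape.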

1) Is there a better way to handle this?

Without taking the time to carefully consider your requirements, I'll just share a couple of pointers, but in general I would look for a pure Table API solution, avoiding regular joins and broadcast state (if feasible). Maybe the approach to enrichment described here would work, and be less expensive. Or maybe you can use temporal joins (described in the video linked above).
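For the temporal-join route the query would look roughly like this. It assumes the Kafka input is also registered as a table (called raw_blocks here, with an event-time column block_time and a watermark) and that meters is a versioned table whose declared primary key includes serial -- none of that is in your current job, so the names are placeholders:

Table enriched = tableEnv.sqlQuery(
    "SELECT b.*, m.res_spec_id " +
    "FROM raw_blocks AS b " +
    "JOIN meters FOR SYSTEM_TIME AS OF b.block_time AS m " +
    "ON b.serial = m.serial"
);

Unlike the regular joins in Step 2, an event-time temporal join only retains the reference-row versions needed around the current watermark, so its state stays bounded, and the whole pipeline can remain in the Table API rather than dropping down to broadcast state.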
