admin管理员组

文章数量:1410717

I'm using Debezium Server to capture changes from a PostgreSQL database and publish them to Google Cloud Pub/Sub. I want to set a table-specific ordering key so that messages related to the same entity are processed in order.

What I’ve Done:

  1. Implemented a custom Kafka Connect SMT (Single Message Transformation) to derive the ordering key based on the table name.
  2. Configured Debezium's Pub/Sub sink to use this key.
  3. I expect the ordering key in the Pub/Sub message metadata to be set dynamically from each record's value.

Code Implementation: Custom Transformation (TableSpecificOrderingKey.java)

package com.example.debezium.transforms;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.Transformation;

public class TableSpecificOrderingKey<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // Ensure the record value is a Struct
        if (record.value() == null || !(record.value() instanceof Struct)) {
            return record;
        }

        Struct valueStruct = (Struct) record.value();
        Struct sourceStruct = valueStruct.getStruct("source");
        if (sourceStruct == null) {
            return record;
        }

        // Retrieve table name and the "after" struct
        String tableName = sourceStruct.getString("table");
        Struct afterStruct = valueStruct.getStruct("after");
        if (afterStruct == null) {
            return record;
        }

        // Determine the ordering value based on table-specific logic
        Object orderingValue = determineOrderingValue(tableName, afterStruct);
        if (orderingValue == null) {
            return record;
        }

        // Debug logs (optional)
        System.out.println("Pramod Record: " + record);
        System.out.println("Pramod orderingValue: " + orderingValue);

        // Add the ordering key as a header using a plain string value
        Headers headers = record.headers();
        headers.addString("debezium.ordering_key", orderingValue.toString());

        return record;
    }

    private Object determineOrderingValue(String tableName, Struct afterStruct) {
        if ("table_x".equalsIgnoreCase(tableName)) {
            return afterStruct.get("id");
        } else if (tableName.toLowerCase().startsWith("table_y")) {
            return afterStruct.get("uuid");
        } else {
            return afterStruct.get("table_x_id");
        }
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef(); // No extra configuration required.
    }

    @Override
    public void close() {
        // No-op
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }
}

Debezium Sink Configuration (application.properties)

# --- Debezium Configuration ---
debezium.format.key=json
debezium.format.value=json
debezium.format.header=json
...

# --- Sink Configuration: Pubsub ---
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=test-app
debezium.sink.pubsub.ordering.enabled=true
debezium.sink.pubsub.ordering.key=debezium.ordering_key

...
# --- Transforms ---
debezium.source.transforms=routeDynamic,tableSpecificOrderingKey


debezium.source.transforms.routeDynamic.type=org.apache.kafka.connect.transforms.RegexRouter
debezium.source.transforms.routeDynamic.regex=^test_local_prefix\.([^\.]+)\..*$
debezium.source.transforms.routeDynamic.replacement=$1_cdc_development

debezium.source.transforms.tableSpecificOrderingKey.type=com.example.debezium.transforms.TableSpecificOrderingKey

Problem: Even after applying this transformation and configuring the sink to use debezium.ordering_key, I still see that the ordering key in the Pub/Sub metadata is being set as the literal string "debezium.ordering_key", rather than the extracted value.

Here’s an example of a received Pub/Sub message:

ack_id: "...."
message {
  data: "{"before":null,"after":{"id":17965,"table_x_id":18206},"source":{"version":"3.0.7.Final","connector":"postgresql","name":"test_local_prefix","ts_ms":1741692368644,"snapshot":"false","db":"test_local","sequence":"[\"59661152400\",\"59661152520\"]","ts_us":1741692368644992,"ts_ns":1741692368644992000,"schema":"level","table":"metrics","txId":738646,"lsn":59661152520,"xmin":null},"transaction":null,"op":"u","ts_ms":1741692370921,"ts_us":1741692370921005,"ts_ns":1741692370921005673}"
  attributes {
    key: "debezium.ordering_key"
    value: "{\"schema\":{\"type\":\"string\",\"optional\":false},\"payload\":\"18206\"}"
  }
  message_id: "14324175668615445"
  publish_time {
    seconds: 1741692374
    nanos: 426000000
  }
  ordering_key: "debezium.ordering_key"
}

where 18206 is the expected ordering key extracted from table_x_id.

How can I ensure that the ordering key extracted in the transformation is correctly set as the ordering key in the Pub/Sub message metadata? Any insights would be greatly appreciated!

本文标签: apache kafka connect - How to Set Table-Specific Ordering Key in Debezium for Google Pub/Sub Sink - Stack Overflow