I'm using Debezium Server to capture changes from a PostgreSQL database and publish them to Google Cloud Pub/Sub. I want to set a table-specific ordering key so that messages related to the same entity are processed in order.
What I’ve Done:
- Implemented a custom Kafka Connect SMT (Single Message Transformation) to derive the ordering key based on the table name.
- Configured Debezium's Pub/Sub sink to use this key.
- Expecting the ordering key in the Pub/Sub metadata to be set dynamically based on the record's value.
Code Implementation: Custom Transformation (TableSpecificOrderingKey.java)
package com.example.debezium.transforms;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.Transformation;

public class TableSpecificOrderingKey<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // Ensure the record value is a Struct (tombstones and other values pass through)
        if (record.value() == null || !(record.value() instanceof Struct)) {
            return record;
        }
        Struct valueStruct = (Struct) record.value();
        Struct sourceStruct = valueStruct.getStruct("source");
        if (sourceStruct == null) {
            return record;
        }

        // Retrieve the table name and the "after" struct
        String tableName = sourceStruct.getString("table");
        Struct afterStruct = valueStruct.getStruct("after");
        if (tableName == null || afterStruct == null) {
            return record;
        }

        // Determine the ordering value based on table-specific logic
        Object orderingValue = determineOrderingValue(tableName, afterStruct);
        if (orderingValue == null) {
            return record;
        }

        // Debug logs (optional)
        System.out.println("Pramod Record: " + record);
        System.out.println("Pramod orderingValue: " + orderingValue);

        // Add the ordering key as a header using a plain string value
        Headers headers = record.headers();
        headers.addString("debezium.ordering_key", orderingValue.toString());
        return record;
    }

    private Object determineOrderingValue(String tableName, Struct afterStruct) {
        if ("table_x".equalsIgnoreCase(tableName)) {
            return afterStruct.get("id");
        } else if (tableName.toLowerCase().startsWith("table_y")) {
            return afterStruct.get("uuid");
        } else {
            return afterStruct.get("table_x_id");
        }
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef(); // No extra configuration required.
    }

    @Override
    public void close() {
        // No-op
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }
}
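To check the table-to-column rules in isolation, here is a minimal, dependency-free sketch of the same selection logic. It is hypothetical: OrderingKeyRules and orderingValue are illustrative names, and the "after" row is modelled as a Map instead of a Kafka Connect Struct so it runs without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for the SMT's determineOrderingValue(): the "after"
// row is a Map rather than a Struct, so the rules can be exercised directly.
final class OrderingKeyRules {

    private OrderingKeyRules() {}

    /** Returns the ordering value for a row, or null when it cannot be determined. */
    static Object orderingValue(String tableName, Map<String, Object> after) {
        if (tableName == null || after == null) {
            return null; // mirrors the SMT's "leave the record unchanged" path
        }
        String column;
        if ("table_x".equalsIgnoreCase(tableName)) {
            column = "id";
        } else if (tableName.toLowerCase(Locale.ROOT).startsWith("table_y")) {
            column = "uuid";
        } else {
            column = "table_x_id"; // default rule for every other table
        }
        // Map.get returns null for a missing column, whereas Struct.get(String)
        // throws a DataException when the field is not in the schema.
        return after.get(column);
    }

    public static void main(String[] args) {
        Map<String, Object> after = new HashMap<>();
        after.put("id", 17965);
        after.put("table_x_id", 18206);
        // "metrics" matches neither named rule, so table_x_id is used.
        System.out.println(orderingValue("metrics", after)); // prints 18206
        System.out.println(orderingValue("TABLE_X", after)); // prints 17965
    }
}
```

The Map-vs-Struct difference noted in the comment matters in the real SMT: a table that falls through to the default rule but has no table_x_id column would make afterStruct.get("table_x_id") throw rather than return null.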
Debezium Sink Configuration (application.properties)
# --- Debezium Configuration ---
debezium.format.key=json
debezium.format.value=json
debezium.format.header=json
...
# --- Sink Configuration: Pubsub ---
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=test-app
debezium.sink.pubsub.ordering.enabled=true
debezium.sink.pubsub.ordering.key=debezium.ordering_key
...
# --- Transforms ---
debezium.source.transforms=routeDynamic,tableSpecificOrderingKey
debezium.source.transforms.routeDynamic.type=org.apache.kafka.connect.transforms.RegexRouter
debezium.source.transforms.routeDynamic.regex=^test_local_prefix\.([^\.]+)\..*$
debezium.source.transforms.routeDynamic.replacement=$1_cdc_development
debezium.source.transforms.tableSpecificOrderingKey.type=com.example.debezium.transforms.TableSpecificOrderingKey
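To see which Pub/Sub topic each change lands on, the routeDynamic step can be reproduced with java.util.regex. This sketch assumes Debezium's default topic naming <topic.prefix>.<schema>.<table> (here test_local_prefix.level.metrics, taken from the sample message below) and mirrors RegexRouter renaming a topic only when the whole name matches. RouteDynamicSketch is a hypothetical name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of the routeDynamic RegexRouter step using the regex and
// replacement from application.properties.
final class RouteDynamicSketch {

    // Same pattern as routeDynamic.regex, written as a Java string literal
    // (backslashes doubled).
    private static final Pattern REGEX =
            Pattern.compile("^test_local_prefix\\.([^\\.]+)\\..*$");
    private static final String REPLACEMENT = "$1_cdc_development";

    /** Returns the routed topic, or the original topic when the regex does not match. */
    static String route(String topic) {
        Matcher matcher = REGEX.matcher(topic);
        return matcher.matches() ? matcher.replaceFirst(REPLACEMENT) : topic;
    }

    public static void main(String[] args) {
        // Assumes the default <topic.prefix>.<schema>.<table> naming.
        System.out.println(route("test_local_prefix.level.metrics")); // prints level_cdc_development
    }
}
```

Under that assumption, every table in the level schema is routed to the same level_cdc_development topic, so changes from different tables share one topic and only the ordering key can keep each entity's changes in sequence.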
Problem: Even after applying this transformation and configuring the sink to use debezium.ordering_key, I still see that the ordering key in the Pub/Sub metadata is being set as the literal string "debezium.ordering_key", rather than the extracted value.
Here’s an example of a received Pub/Sub message:
ack_id: "...."
message {
  data: "{"before":null,"after":{"id":17965,"table_x_id":18206},"source":{"version":"3.0.7.Final","connector":"postgresql","name":"test_local_prefix","ts_ms":1741692368644,"snapshot":"false","db":"test_local","sequence":"[\"59661152400\",\"59661152520\"]","ts_us":1741692368644992,"ts_ns":1741692368644992000,"schema":"level","table":"metrics","txId":738646,"lsn":59661152520,"xmin":null},"transaction":null,"op":"u","ts_ms":1741692370921,"ts_us":1741692370921005,"ts_ns":1741692370921005673}"
  attributes {
    key: "debezium.ordering_key"
    value: "{\"schema\":{\"type\":\"string\",\"optional\":false},\"payload\":\"18206\"}"
  }
  message_id: "14324175668615445"
  publish_time {
    seconds: 1741692374
    nanos: 426000000
  }
  ordering_key: "debezium.ordering_key"
}
Here, 18206 is the expected ordering key: the row comes from the metrics table, so the default rule extracts it from table_x_id.
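The attributes in this sample suggest the transformation itself worked: the debezium.ordering_key header reached Pub/Sub carrying 18206, wrapped in the {"schema": ..., "payload": ...} envelope that Kafka Connect's JsonConverter produces when schemas are enabled (consistent with debezium.format.header=json). To read that value back on the subscriber side, here is a naive sketch; HeaderEnvelope and stringPayload are hypothetical names, and the regex handles only simple string payloads, so a real consumer should use a proper JSON parser.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Naive sketch: extracts a *string* payload from a header attribute serialized
// as a {"schema":...,"payload":...} envelope. Not a JSON parser: escaped
// quotes and non-string payloads are not handled.
final class HeaderEnvelope {

    private static final Pattern STRING_PAYLOAD =
            Pattern.compile("\"payload\"\\s*:\\s*\"([^\"]*)\"");

    /** Returns the string payload, or empty if none is found. */
    static Optional<String> stringPayload(String attributeValue) {
        Matcher m = STRING_PAYLOAD.matcher(attributeValue);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Attribute value copied from the sample message above.
        String attr = "{\"schema\":{\"type\":\"string\",\"optional\":false},\"payload\":\"18206\"}";
        System.out.println(stringPayload(attr).orElse("<none>")); // prints 18206
    }
}
```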
How can I ensure that the ordering key extracted in the transformation is correctly set as the ordering key in the Pub/Sub message metadata? Any insights would be greatly appreciated!
Original title: "How to Set Table-Specific Ordering Key in Debezium for Google PubSub Sink?" (Stack Overflow, tagged apache kafka connect).