I have a setup where I’m using a Neo4j source connector to propagate change events to a Kafka topic. My Spring Boot application consumes these messages. However, when a transaction in Neo4j involves multiple changes, the connector sends each change as a separate Kafka message (but with the same txId and a sequence number). I want all changes from a single transaction to be delivered to my application as a single Kafka message so that I can process them together.

Sample message from Kafka:

{
  "id": "CJUg4WrNW0Y7ttlh8lbkxfwAAAAAAAAEMAAAAAAAAAACAAABkh21o5c=",
  "txId": 1072,
  "seq": 2,
  "event": {
    "elementId": "4:9520e16a-cd5b-463b-b6d9-61f256e4c5fc:2073",
    "eventType": "NODE",
    "operation": "CREATE",
    "labels": [
      "Environment"
    ],
    "keys": {},
    "state": {
      "before": null,
      "after": {
        "labels": [
          "Environment"
        ],
        "properties": {
          "name": {
            "B": null,
            "I64": null,
            "F64": null,
            "S": "Dev",
            "BA": null,
            "TLD": null,
            "TLDT": null,
            "TLT": null,
            "TZDT": null,
            "TOT": null,
            "TD": null,
            "SP": null,
            "LB": null,
            "LI64": null,
            "LF64": null,
            "LS": null,
            "LTLD": null,
            "LTLDT": null,
            "LTLT": null,
            "LZDT": null,
            "LTOT": null,
            "LTD": null,
            "LSP": null
          },
          "id": {
            "B": null,
            "I64": null,
            "F64": null,
            "S": "78b90e78-9b79-4330-9d02-7895f349964b",
            "BA": null,
            "TLD": null,
            "TLDT": null,
            "TLT": null,
            "TZDT": null,
            "TOT": null,
            "TD": null,
            "SP": null,
            "LB": null,
            "LI64": null,
            "LF64": null,
            "LS": null,
            "LTLD": null,
            "LTLDT": null,
            "LTLT": null,
            "LZDT": null,
            "LTOT": null,
            "LTD": null,
            "LSP": null
          }
        }
      }
    }
  }
}
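
The only fields needed for grouping are txId and seq; the event payload can be carried along as-is. As a minimal sketch of mapping the envelope in the consumer (the class and field names below are mine, not part of the connector, and assume Jackson is on the classpath):

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.JsonNode;

// Hypothetical DTO for the connector's change-event envelope.
// Only the fields needed for grouping are mapped; everything else is ignored.
@JsonIgnoreProperties(ignoreUnknown = true)
public class ChangeEventMessage {
    public String id;      // opaque change identifier
    public long txId;      // transaction id shared by all changes in the same transaction
    public long seq;       // position of this change within the transaction
    public JsonNode event; // raw event payload (element id, labels, before/after state)
}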

Current Setup

I am running Neo4j server 5.22 with version 5.1.1 of the Neo4j Connector for Kafka.

Spring Boot Application: Consumes messages from the Kafka topic and processes them.

Problem

When a Neo4j transaction involves multiple changes (e.g., creating multiple nodes or relationships), the connector sends each change as a separate Kafka message. This makes it difficult to process the changes as a single unit in my application.

Example Scenario

  1. A Neo4j transaction creates 3 nodes and 2 relationships.
  2. The connector sends 5 separate Kafka messages (one for each change).
  3. I want these 5 changes to be sent as a single Kafka message.

What I’ve Tried

I looked into the Neo4j connector configuration but couldn’t find a way to group changes by transaction.

I considered aggregating messages in my Spring Boot application in an in-memory buffer, but this feels error-prone and complex.

1 Answer

Unfortunately, this is not supported. The Neo4j Connector for Kafka publishes change messages to the target topic(s) one by one, exactly as it receives them from CDC. This is by design: bundling all of a transaction's change events into a single message could exceed broker message-size limits or exhaust memory, especially for large transactions.

If this is a must for you, the in-application aggregation you already suggested might be the best option; a rough sketch follows.
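
As a minimal illustration (not connector functionality): a Spring Kafka listener can buffer events by txId and flush a transaction's batch after a quiet period, since the messages themselves do not announce how many changes a transaction contains. The topic name, group id, timeout, and processBatch hook below are placeholders; the sketch also assumes a JSON deserializer is configured for ChangeEventMessage and that @EnableScheduling is active. A production version would additionally need to handle rebalances, offset commits, and buffer size limits.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class TransactionAggregator {

    private static final long QUIET_PERIOD_MS = 2_000; // assumed flush timeout

    // txId -> buffered changes for that transaction
    private final Map<Long, List<ChangeEventMessage>> buffers = new ConcurrentHashMap<>();
    // txId -> timestamp of the last message seen for that transaction
    private final Map<Long, Long> lastSeen = new ConcurrentHashMap<>();

    @KafkaListener(topics = "neo4j-cdc", groupId = "aggregator") // placeholder topic/group
    public void onMessage(ChangeEventMessage msg) {
        buffers.computeIfAbsent(msg.txId, k -> new ArrayList<>()).add(msg);
        lastSeen.put(msg.txId, System.currentTimeMillis());
    }

    // Periodically flush transactions that have been quiet long enough.
    @Scheduled(fixedDelay = 500)
    public void flushCompleted() {
        long now = System.currentTimeMillis();
        for (Long txId : List.copyOf(lastSeen.keySet())) {
            Long ts = lastSeen.get(txId);
            if (ts != null && now - ts > QUIET_PERIOD_MS) {
                List<ChangeEventMessage> batch = buffers.remove(txId);
                lastSeen.remove(txId);
                if (batch != null) {
                    batch.sort((a, b) -> Long.compare(a.seq, b.seq)); // restore transaction order
                    processBatch(txId, batch); // application-specific handling
                }
            }
        }
    }

    private void processBatch(long txId, List<ChangeEventMessage> batch) {
        // process all changes from one Neo4j transaction as a single unit
    }
}

The quiet-period flush trades a little latency for simplicity; a session-windowed aggregation keyed by txId in Kafka Streams would be another common way to get the same grouping.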