admin (administrators group)

Article count: 1305163

We are using Google Cloud Pub/Sub with the Python `pubsub_v1.PublisherClient`. We have also enabled key ordering on both the subscription and the Python publisher. Since enabling key ordering, we are seeing duplicate published messages quite often — anywhere from 30 to 50% of the messages.

Here is the code with some sample messages:

from google.cloud import pubsub_v1
import dataclasses
import json
import typing


class GCPubsubPublisher:
    """Thin wrapper around a Google Cloud Pub/Sub publisher client.

    Serializes arbitrary payloads to UTF-8 JSON bytes and publishes them,
    optionally blocking until the service acknowledges the message.
    """

    def __init__(self, publisher_client: pubsub_v1.PublisherClient):
        self.publisher_client = publisher_client

    def serialize_data(self, data: typing.Any) -> bytes:
        """Serialize ``data`` to UTF-8 encoded bytes.

        Dataclass instances are converted to dicts, non-string values are
        JSON-encoded, and plain strings are passed through unchanged before
        being UTF-8 encoded.

        Args:
            data: Payload to serialize. Must not be ``None``.

        Returns:
            The UTF-8 encoded payload.

        Raises:
            ValueError: If ``data`` is ``None`` or cannot be serialized.
        """
        if data is None:
            raise ValueError("data cannot be None")
        try:
            if dataclasses.is_dataclass(data):
                data = dataclasses.asdict(data)
            if not isinstance(data, str):
                data = json.dumps(data)
            return data.encode("utf-8")
        except Exception as exc:
            # ValueError is a subclass of Exception, so existing callers that
            # catch Exception keep working while the error type gets precise.
            raise ValueError(f"{data} cannot be serialized, Error: {exc}") from exc

    def publish(
        self,
        topic: str,
        data: typing.Any,
        async_wait_for_result: bool,
        ordering_key: str = "",
        # Fixed annotation: the previous ``typing.Dict[str, typing.Any]``
        # incorrectly declared every kwarg *value* to be a dict.
        **kwargs: typing.Any,
    ) -> str | None:
        """Publish ``data`` to ``topic``.

        Args:
            topic: Fully qualified topic path
                (``projects/<project>/topics/<topic>``).
            data: Payload; serialized via :meth:`serialize_data`.
            async_wait_for_result: When ``True``, block until the publish
                completes and return the server-assigned message id.
            ordering_key: Pub/Sub ordering key; ``""`` disables ordering
                for this message.
            **kwargs: Forwarded to ``PublisherClient.publish`` (message
                attributes).

        Returns:
            The generated message id when ``async_wait_for_result`` is
            ``True``, otherwise ``None``.
        """
        serialized_data = self.serialize_data(data)
        future = self.publisher_client.publish(
            topic=topic, data=serialized_data, ordering_key=ordering_key, **kwargs
        )

        # NOTE: future.result() BLOCKS the calling thread until the service
        # acknowledges the message, then returns the generated message_id
        # (the old comment's "waits asynchronously" was wrong).
        # NOTE(review): with message ordering enabled, a failed publish pauses
        # the ordering key; resume_publish() must be called after an error —
        # confirm error handling at the call sites.
        return future.result() if async_wait_for_result else None

    @staticmethod
    def build() -> "GCPubsubPublisher":
        """Construct a publisher with message ordering enabled (300s timeout)."""
        publisher_options = pubsub_v1.types.PublisherOptions(
            enable_message_ordering=True, timeout=300
        )
        publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)
        return GCPubsubPublisher(publisher)

    @staticmethod
    def get_topic_path(project_name: str, topic_name: str) -> str:
        """Return the fully qualified topic path for ``project_name``/``topic_name``."""
        return pubsub_v1.PublisherClient.topic_path(project_name, topic_name)


if __name__ == "__main__":
    pubsub_client = GCPubsubPublisher.build()

    # (ordering_key, payload) pairs — one message per submitted form.
    submissions = [
        (
            "50030469_bill_of_sales",
            {
                "application_id": "50030469",
                "form_path": "gs://form_path/bill_of_sales.pdf",
                "timestamp": "2025-01-31T19:24:22+0000",
                "form_hash": "9191b4f4ea85d3a2be0f6c2f1cf513a5a40e301bc6306fdca49ee8c88144b696",
            },
        ),
        (
            "50030469_csc",
            {
                "application_id": "50030469",
                "form_path": "gs://form_path/csc.pdf",
                "timestamp": "2025-01-31T19:24:22+0000",
                "form_hash": "9191b4f4ea85d3a2be0f6c2f1cf513a5a40e301bc6306fdca49ee8c88144b696",
            },
        ),
    ]

    # Publish each form under its own ordering key, blocking for the
    # server-assigned message id before moving on to the next one.
    for ordering_key, payload in submissions:
        message_id = pubsub_client.publish(
            topic="projects/project_name/topics/form_submission",
            data=payload,
            async_wait_for_result=True,
            ordering_key=ordering_key,
        )

Here are the duplicate messages — you can see in the log output below that the message ID is exactly the same for both of them:

Not sure whether this is related, but the message ID comes through as both `message_id` and `messageId`. We never explicitly pass a message ID — not sure why there are two different fields for it.

Tags: fastapi, Google Pub/Sub with Python publisher duplicating messages, Stack Overflow