How does Benthos handle the acknowledgement of pubsub messages? How can we manage ack/unack based on custom if-else conditions?

Here is the scenario I'm trying to achieve: I have written a Benthos job that pulls messages from a Pub/Sub subscription, calls two APIs using that data, and sends a Slack alert for each failure. Finally, it should leave the message unacknowledged if any error is encountered while calling the two APIs.

The problem is that even after explicitly throwing an error using throw("Intentional error"), this job still acknowledges the pulled message. Ideally, it should leave the message unacknowledged so that Pub/Sub's redelivery handles the retry. Am I missing something here?

PS: I had to throw the error explicitly because the second API (event_service) sends an event describing the status of the first API call (update_api_call), so the second branch has to run even when the first one fails.

Here is my current config, which exhibits this issue:

input:
  gcp_pubsub:
    project: project-name
    subscription: subscription-name
    sync: true

pipeline:
  threads: 0
  processors:
    - branch:
        request_map: |
          root = {
            "A": this.A.number(),
          }
        processors:
          - try:
              - resource: update_api_call
          - catch:
              - mapping: |
                  root = {
                    "text": "@here Alert message update_api_call failure"
                  }
              - resource: slack_service
              - mapping: 'root = {"error": error()}'
        result_map: |
          root.responses.update_api_call = {
            "response": if this.error == null { this } else { null },
            "error": if this.error != null { this.error } else { meta("error") }
          }

    - branch:
        request_map: |
          root = {
            "i": uuid_v4(),
            "data": {
              "event": "event_name",
              "properties": {
                "A": this.A,
                "status": if this.responses.update_api_call.error == null { true } else { false },
                "failureReason": this.responses.update_api_call.error
              }
            }
          }
        processors:
          - try:
              - resource: event_service
          - catch:
              - mapping: |
                  root = {
                    "text": "@here Alert message send event failure",
                  }
              - resource: slack_service
              - mapping: 'root = {"error": error()}'
        result_map: |
          root.responses.event_service = {
            "response": if this.error == null { this } else { null },
            "error": if this.error != null { this.error } else { meta("error") }
          }

    # New processor to fail the message if either branch recorded an error.
    - bloblang: |
        if this.responses.update_api_call.error != null || this.responses.event_service.error != null {
          throw("Intentional error") # for leaving message unacked
        }

output:
  label: responses
  stdout:
    codec: lines

Asked Feb 23 at 22:07 by Tarun Kumar

1 Answer

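The throw function just sets the error flag on the message; the output will still process it, and because stdout writes it successfully, the message is acknowledged. To get the behaviour you're looking for, you'll have to reject errored messages explicitly at the output, using either the reject_errored output or the reject output. You can leverage the switch output to route only errored messages to reject. A rejected message is nacked, so Pub/Sub redelivers it. Also, if you drop the message with the deleted() function, it will be acked instead of being rejected (nacked).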
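As a minimal sketch of the switch approach, the output section from the question could be rewritten roughly as follows. The case order and the wording of the rejection message are assumptions for illustration; only the stdout settings come from the original config.

```yaml
output:
  label: responses
  switch:
    # Do not retry rejected messages inside the switch; let the nack
    # travel back to the gcp_pubsub input instead.
    retry_until_success: false
    cases:
      # Messages carrying the error flag set by throw() are nacked.
      - check: errored()
        output:
          reject: "processing failed: ${! error() }"
      # Everything else is written to stdout and then acknowledged.
      - output:
          stdout:
            codec: lines
```

If no custom rejection message is needed, wrapping the existing stdout output in reject_errored should give the same result with less configuration.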
