How does Benthos handle the acknowledgement of Pub/Sub messages? How can we ack a message or leave it unacked based on custom if-else conditions?
Here is the scenario I'm trying to achieve: I have written a Benthos job that pulls messages from a Pub/Sub subscription, calls two APIs using that data, and sends a Slack alert for each failure. Finally, it should leave the message unacknowledged if either API call fails.
The problem is that even after explicitly throwing an error using throw("Intentional error"), this job still acknowledges the pulled message. Ideally, it should leave the message unacknowledged so that Pub/Sub's redelivery handles the retry. Am I missing something here?
PS: I had to throw the error explicitly because the 2nd API (event_service) sends an event reporting the status of the 1st API call (update_api_call), so the first failure has to be caught and recorded before the second call runs.
Here is my current config, which shows this issue:
input:
  gcp_pubsub:
    project: project-name
    subscription: subscription-name
    sync: true
pipeline:
  threads: 0
  processors:
    - branch:
        request_map: |
          root = {
            "A": this.A.number(),
          }
        processors:
          - try:
              - resource: update_api_call
          - catch:
              - mapping: |
                  root = {
                    "text": "@here Alert message update_api_call failure"
                  }
              - resource: slack_service
              - mapping: 'root = {"error": error()}'
        result_map: |
          root.responses.update_api_call = {
            "response": if this.error == null { this } else { null },
            "error": if this.error != null { this.error } else { meta("error") }
          }
    - branch:
        request_map: |
          root = {
            "i": uuid_v4(),
            "data": {
              "event": "event_name",
              "properties": {
                "A": this.A,
                "status": if this.responses.update_api_call.error == null { true } else { false },
                "failureReason": this.responses.update_api_call.error
              }
            }
          }
        processors:
          - try:
              - resource: event_service
          - catch:
              - mapping: |
                  root = {
                    "text": "@here Alert message send event failure",
                  }
              - resource: slack_service
              - mapping: 'root = {"error": error()}'
        result_map: |
          root.responses.event_service = {
            "response": if this.error == null { this } else { null },
            "error": if this.error != null { this.error } else { meta("error") }
          }
    # New processor to fail the message if either branch recorded an error.
    - bloblang: |
        if this.responses.update_api_call.error != null || this.responses.event_service.error != null {
          throw("Intentional error") # for leaving message unacked
        }
output:
  label: responses
  stdout:
    codec: lines
asked Feb 23 at 22:07
Tarun Kumar
1 Answer
You'll have to use the reject or the reject_errored output explicitly to get the behaviour you're looking for. You can leverage the switch output for this purpose. The throw function just sets the error flag on the message, but the output will still process it, and a successful write acks the message. Also, if you use the deleted() function, the message will be acked instead of being rejected (nacked).
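As a minimal sketch of the above, assuming the rest of the question's config stays unchanged and the throw mapping remains the last processor (so the error flag is still set when the message reaches the output), the output section could wrap stdout in reject_errored. Messages that carry an error should then be nacked back to the gcp_pubsub input, while the others are written as before:

```yaml
# Sketch only: replaces the output section of the question's config.
output:
  label: responses
  reject_errored:
    # Child output; only messages without an error flag reach it.
    stdout:
      codec: lines
```

The same idea can be expressed with the switch output the answer mentions, which leaves room for extra cases. The rejection text here is an illustrative choice, not something from the question:

```yaml
# Sketch only: switch-based alternative to reject_errored.
output:
  switch:
    retry_until_success: false
    cases:
      # Messages flagged by throw() are rejected (nacked) with the error text.
      - check: errored()
        output:
          reject: "processing failed: ${! error() }"
      # Everything else is written to stdout and acked.
      - output:
          label: responses
          stdout:
            codec: lines
```

Either way, how quickly a rejected message comes back depends on the subscription's redelivery settings, so it is worth confirming the behaviour against a test subscription before relying on it.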