airflow - Trigger a task iff direct upstream failed
Let's say I have a DAG with multiple tasks:

task1 >> task2 >> [task3, task4]

I want task1 to be executed at the beginning, then task2. If task2 ends with success, then execute task3, else execute task4.

I've tried to set the TriggerRule all_failed on task4 and, if task2 fails or ends with success, everything works as desired. However, if task1 fails, task2 and task3 are marked as upstream_failed, BUT task4 is executed.

According to https://airflow.apache.org/docs/apache-airflow/2.9.3/core-concepts/dags.html#trigger-rules this is the expected behavior ("all_failed: All upstream tasks are in a failed or upstream_failed state"), but then it seems to me there is no way to set a trigger rule such as "execute task4 only if upstream is failed (and only failed)".
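For reference, a minimal sketch of the setup I'm describing (the EmptyOperator tasks and the DAG boilerplate are just illustrative):

from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(dag_id="trigger_rule_example", start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")  # default trigger rule: all_success
    task4 = EmptyOperator(task_id="task4", trigger_rule=TriggerRule.ALL_FAILED)
    task1 >> task2 >> [task3, task4]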
How can I achieve that?
3 Answers
The issue arises because all_failed considers both failed and upstream_failed states, which means task4 runs even if task1 fails. Airflow does not have a built-in "only direct upstream failed" rule, but you can use BranchPythonOperator to dynamically branch based on task2's outcome. For the branch to run and decide even when task2 fails, it needs trigger_rule=ALL_DONE; the callable bodies below are a sketch of that approach.
from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule

def task2_logic(**context):
    pass  # your task2 logic here

def branch_task(**context):
    # Sketch: follow task3 on success, task4 on an actual failure,
    # and skip both branches otherwise (e.g. task2 is upstream_failed).
    state = context['dag_run'].get_task_instance('task2').state
    if state == State.SUCCESS:
        return 'task3'
    if state == State.FAILED:
        return 'task4'
    return []

dag = DAG(dag_id='branch_dag', start_date=datetime(2025, 2, 10), schedule_interval='@once')
task1 = DummyOperator(task_id="task1", dag=dag)
task2 = PythonOperator(
    task_id='task2',
    python_callable=task2_logic,
    dag=dag
)
branch = BranchPythonOperator(
    task_id='branching',
    python_callable=branch_task,
    trigger_rule=TriggerRule.ALL_DONE,  # run the branch even when task2 fails
    dag=dag
)
task3 = DummyOperator(task_id="task3", dag=dag)
task4 = DummyOperator(task_id="task4", dag=dag)
task1 >> task2 >> branch
branch >> [task3, task4]
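One note on the sketch above: whichever branch the callable does not return is marked as skipped rather than upstream_failed, so downstream trigger rules stay predictable, and returning an empty list (or None) skips both branches, which covers the case where task2 is itself upstream_failed because task1 failed.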
I think you can use a ShortCircuitOperator between task1 and task2 in this case, which allows you to skip all downstream tasks, including task3 and task4, when its python_callable returns False (here, when task1's state is FAILED):
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule

def validate(**context):
    # Only continue downstream if task1 did not end in the FAILED state.
    task = context['dag_run'].get_task_instance('task1')
    return task.state != State.FAILED

dag = DAG(
    dag_id="dag",
    start_date=datetime(2025, 2, 12),
    schedule_interval='@once',
)

with dag:
    task1 = PythonOperator(
        task_id='task1',
        python_callable=lambda: True
    )
    short_circuit = ShortCircuitOperator(
        task_id='short_circuit',
        python_callable=validate,
        trigger_rule=TriggerRule.ALL_DONE  # run the check even when task1 fails
    )
    task2 = PythonOperator(
        task_id='task2',
        python_callable=lambda: False
    )
    task1 >> short_circuit >> task2  # >> [task3, task4]
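When the callable returns False, the ShortCircuitOperator marks everything downstream as skipped rather than upstream_failed. If you are on Airflow 2.3 or later and want downstream trigger rules to still be evaluated after the short-circuit, ShortCircuitOperator also accepts ignore_downstream_trigger_rules=False.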
You'll need to modify the task dependencies and trigger rules to achieve the behavior you want. Add a start task as an anchor point using EmptyOperator, then introduce two additional empty operators (task2_success and task2_failure) that act as gates:

- task2_success only proceeds if task2 succeeds
- task2_failure only proceeds if task2 fails

Both task3 and task4 should use the ALL_SUCCESS trigger rule, but depend on different gates:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

def task1_function():
    # Your task1 logic here
    pass

def task2_function():
    # Your task2 logic here
    pass

def task3_function():
    # Your task3 logic here
    pass

def task4_function():
    # Your task4 logic here
    pass

with DAG(
    'conditional_execution_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None
) as dag:
    # Create a starting point
    start = EmptyOperator(task_id='start')

    # Main tasks
    task1 = PythonOperator(
        task_id='task1',
        python_callable=task1_function
    )
    task2 = PythonOperator(
        task_id='task2',
        python_callable=task2_function
    )

    # Create branching conditions using EmptyOperator
    task2_success = EmptyOperator(
        task_id='task2_success',
        trigger_rule=TriggerRule.ALL_SUCCESS
    )
    task2_failure = EmptyOperator(
        task_id='task2_failure',
        trigger_rule=TriggerRule.ONE_FAILED
    )

    task3 = PythonOperator(
        task_id='task3',
        python_callable=task3_function,
        trigger_rule=TriggerRule.ALL_SUCCESS
    )
    task4 = PythonOperator(
        task_id='task4',
        python_callable=task4_function,
        trigger_rule=TriggerRule.ALL_SUCCESS
    )

    # Set dependencies
    start >> task1 >> task2
    task2 >> [task2_success, task2_failure]
    task2_success >> task3
    task2_failure >> task4
- If task1 fails, everything downstream is marked as upstream_failed and nothing else executes.
- If task1 succeeds but task2 fails, only the task2_failure gate passes, leading to task4.
- If both task1 and task2 succeed, only the task2_success gate passes, leading to task3.