Trigger a task iff direct upstream failed

Let's say I have a dag with multiple tasks

task1 >> task2 >> [task3, task4]

I want task1 to be executed at the beginning.

Then task2.

If task2 ends with success, then execute task3, else execute task4.

I've tried to set the trigger rule all_failed on task4 and, if task2 fails or ends with success, everything works as desired.
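
Something like this (a minimal sketch of the setup; the EmptyOperator tasks and DAG arguments are placeholders, not my actual code):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="example_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
) as dag:
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")
    # all_failed on task4, so it should run only when task2 fails
    task4 = EmptyOperator(task_id="task4", trigger_rule=TriggerRule.ALL_FAILED)

    task1 >> task2 >> [task3, task4]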

However if task1 fails, task2 and task3 are marked as upstream_failed, BUT task4 is executed.

According to https://airflow.apache.org/docs/apache-airflow/2.9.3/core-concepts/dags.html#trigger-rules this is the expected behavior ("all_failed: All upstream tasks are in a failed or upstream_failed state"), but then it seems to me there is no way to set a trigger rule such as "execute task4 only if the direct upstream failed (and only failed)".

How can I achieve that?

asked Feb 10 at 16:35 by Vito De Tullio

3 Answers

The issue arises because all_failed considers both failed and upstream_failed states, which means task4 runs even if task1 fails. Airflow does not have a built-in "only the direct upstream failed" rule, but you can use BranchPythonOperator to branch dynamically on task2's outcome, for example by reading task2's state from the DAG run and returning the task_id to follow:

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.trigger_rule import TriggerRule

def task2_logic():
    pass  # your task2 logic here

def branch_task(**context):
    # Illustrative: choose the branch from task2's actual state -- task3 on
    # success, task4 on failure, and skip both if task2 never ran.
    state = context["dag_run"].get_task_instance("task2").state
    if state == "success":
        return "task3"
    if state == "failed":
        return "task4"
    return None

dag = DAG(dag_id="branching_dag", start_date=datetime(2024, 1, 1), schedule_interval=None)

task1 = EmptyOperator(task_id="task1", dag=dag)

task2 = PythonOperator(
    task_id="task2",
    python_callable=task2_logic,
    dag=dag,
)

branch = BranchPythonOperator(
    task_id="branching",
    python_callable=branch_task,
    trigger_rule=TriggerRule.ALL_DONE,  # run even when task2 fails
    dag=dag,
)

task3 = EmptyOperator(task_id="task3", dag=dag)
task4 = EmptyOperator(task_id="task4", dag=dag)

task1 >> task2 >> branch
branch >> [task3, task4]

I think you can use a ShortCircuitOperator between task1 and task2 in this case, which allows you to skip all downstream tasks (including task3 and task4) if its python_callable returns False (here, when task1's state is FAILED):

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule


def validate(**context):
    # Let downstream tasks run only if task1 did not fail.
    task = context['dag_run'].get_task_instance('task1')
    return task.state != State.FAILED

dag = DAG(
    dag_id="dag",
    start_date=datetime(2025, 2, 12),
    schedule_interval='@once',
)

with dag:
    task1 = PythonOperator(
        task_id='task1',
        python_callable=lambda: True
    )

    short_circuit = ShortCircuitOperator(
        task_id='short_circuit',
        python_callable=validate,
        trigger_rule=TriggerRule.ALL_DONE
    )

    task2 = PythonOperator(
        task_id='task2',
        python_callable=lambda: False 
    )

    task1 >> short_circuit >> task2 # >> [task3, task4]

You'll need to modify the task dependencies and trigger rules to achieve the behavior you want. Add a start task as an anchor point using EmptyOperator, then introduce two additional empty operators (task2_success and task2_failure) that act as gates:

  • task2_success only proceeds if task2 succeeds

  • task2_failure only proceeds if task2 fails

Both task3 and task4 should use the ALL_SUCCESS trigger rule but depend on different gates:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

def task1_function():
    # Your task1 logic here
    pass

def task2_function():
    # Your task2 logic here
    pass

def task3_function():
    # Your task3 logic here
    pass

def task4_function():
    # Your task4 logic here
    pass

with DAG(
    'conditional_execution_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None
) as dag:

    # Create a starting point
    start = EmptyOperator(task_id='start')

    # Main tasks
    task1 = PythonOperator(
        task_id='task1',
        python_callable=task1_function
    )

    task2 = PythonOperator(
        task_id='task2',
        python_callable=task2_function
    )

    # Create branching conditions using EmptyOperator
    task2_success = EmptyOperator(
        task_id='task2_success',
        trigger_rule=TriggerRule.ALL_SUCCESS
    )

    task2_failure = EmptyOperator(
        task_id='task2_failure',
        trigger_rule=TriggerRule.ONE_FAILED
    )

    task3 = PythonOperator(
        task_id='task3',
        python_callable=task3_function,
        trigger_rule=TriggerRule.ALL_SUCCESS
    )

    task4 = PythonOperator(
        task_id='task4',
        python_callable=task4_function,
        trigger_rule=TriggerRule.ALL_SUCCESS
    )

    # Set dependencies
    start >> task1 >> task2
    task2 >> [task2_success, task2_failure]
    task2_success >> task3
    task2_failure >> task4

With this setup:
  • If task1 fails, everything downstream is marked as upstream_failed and nothing else executes
  • If task1 succeeds but task2 fails, only the task2_failure gate passes, leading to task4
  • If both task1 and task2 succeed, only the task2_success gate passes, leading to task3
