I implemented the following pipeline:

DAG Graph

As the graph shows, "tracker" and "tracker_second" are BranchPythonOperators. Both receive the list of tasks to execute through the same templated parameter, params.manual_run_task_list:

from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

# Both branch operators use the same callable and read the same parameter.
tracker = BranchPythonOperator(
    task_id="tracker",
    python_callable=get_task_for_update,
    op_kwargs={"manual_run_task_list": "{{ params.manual_run_task_list }}"},
    trigger_rule=TriggerRule.NONE_FAILED,
)
tracker_second = BranchPythonOperator(
    task_id="tracker_second",
    python_callable=get_task_for_update,
    op_kwargs={"manual_run_task_list": "{{ params.manual_run_task_list }}"},
    trigger_rule=TriggerRule.NONE_FAILED,
)

I need to run only specific tasks, whose names are passed in the manual_run_task_list parameter. Tasks from the first and second groups should not overlap.

During testing, I discovered strange behavior from the BranchPythonOperator. I passed the following parameters:

{
    "manual_run_task_list": [
        "task_1",
        "task_22",
        "task_33",
        "task_444",
        "task_555",
        "task_666"
    ]
}

Three tasks from the first group and three tasks from the second group should run, but in practice, the outcome is different:

Graph-Result

While inspecting XCom, I found an unexpected value stored under the skipmixin_key key for "tracker_second":

"tracker": 
    return_value: ['task_1', 'task_22', 'task_33', 'task_444', 'task_555', 'task_666']
    skipmixin_key:  {'followed': ['task_22', 'task_33', 'task_1']}

"tracker_second": 
    return_value: ['task_1', 'task_22', 'task_33', 'task_444', 'task_555', 'task_666']
    skipmixin_key:  {'followed': ['task_333', 'task_444', 'task_555', 'task_666', 'task_111', 'task_222']}

"tracker_second" followed every task in its group, whereas "tracker" followed only the requested ones.

Can you please help me understand the reason for this behavior?

Apache-Airflow Version: 2.10.4
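
A possible explanation, based on my reading of how branching works in Airflow 2.x and worth verifying against the 2.10.4 source: after a branch callable returns, SkipMixin.skip_all_except appears to widen the returned ids with every task downstream of them, and only then decides which direct downstream tasks to skip. Since the shared list also contains first-group tasks that sit upstream of "tracker_second", the whole second group would end up in the keep set. The toy model below is plain Python, not Airflow code; the graph shape (each first-group task feeding "tracker_second") and the extra task task_2 are assumptions made for illustration.

```python
def flat_downstream(task_id, edges):
    """Return every task reachable from task_id by following edges."""
    seen = set()
    stack = list(edges.get(task_id, ()))
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(edges.get(current, ()))
    return seen


def followed(branch_task, returned_ids, edges):
    """Toy model of the branch decision (an assumption, not Airflow's code).

    The returned ids are widened with all of their downstream tasks; a direct
    downstream task of the branch is followed only if it is in that set.
    """
    keep = set(returned_ids)
    for task_id in returned_ids:
        keep |= flat_downstream(task_id, edges)
    return sorted(t for t in edges.get(branch_task, ()) if t in keep)


# Assumed topology: tracker -> first group -> tracker_second -> second group.
FIRST_GROUP = ["task_1", "task_22", "task_33", "task_2"]  # task_2 is hypothetical
SECOND_GROUP = ["task_111", "task_222", "task_333",
                "task_444", "task_555", "task_666"]
EDGES = {"tracker": FIRST_GROUP, "tracker_second": SECOND_GROUP}
for first_group_task in FIRST_GROUP:
    EDGES[first_group_task] = ["tracker_second"]

REQUESTED = ["task_1", "task_22", "task_33", "task_444", "task_555", "task_666"]

print(followed("tracker", REQUESTED, EDGES))
print(followed("tracker_second", REQUESTED, EDGES))
```

In this model "tracker" follows only the three requested first-group tasks, while "tracker_second" follows its entire group, because task_1, task_22 and task_33 are upstream of everything in the second group. That is the same pattern as the skipmixin_key values above.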

Solution

One solution is to pass a separate task list to each group instead of a single list for all tasks: "tracker" receives the list of first-group tasks and "tracker_second" receives the list of second-group tasks. Example:

tracker = BranchPythonOperator(
    task_id="tracker",
    python_callable=get_task_for_update,
    op_kwargs={"manual_run_task_list": "{{ params.manual_run_task_FIRST_GROUP }}"},
    trigger_rule=TriggerRule.NONE_FAILED,
)
tracker_second = BranchPythonOperator(
    task_id="tracker_second",
    python_callable=get_task_for_update,
    op_kwargs={"manual_run_task_list": "{{ params.manual_run_task_SECOND_GROUP }}"},
    trigger_rule=TriggerRule.NONE_FAILED,
)

However, this seems to complicate the structure of the pipeline, so I would like to avoid that.
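
If keeping one shared list matters, one option might be to let the callable drop any requested id that is not directly downstream of the branch task running it. The sketch below assumes the list arrives as a real Python list (as the return_value in XCom suggests) and that the callable asks for the task context variable, whose downstream_task_ids attribute holds the ids of the direct downstream tasks. The body of get_task_for_update shown here is hypothetical, since the original was not posted, and SimpleNamespace objects stand in for the operators so the snippet runs without Airflow.

```python
from types import SimpleNamespace


def get_task_for_update(manual_run_task_list, task, **_):
    """Keep only the requested ids that are direct downstream of this branch.

    `task` is expected to be the operator currently executing (in Airflow it
    would come from the task context); only its downstream_task_ids is used.
    """
    allowed = set(task.downstream_task_ids)
    return [task_id for task_id in manual_run_task_list if task_id in allowed]


# Stand-ins for the two branch operators; the group membership is assumed.
tracker_stub = SimpleNamespace(
    downstream_task_ids={"task_1", "task_22", "task_33"}
)
tracker_second_stub = SimpleNamespace(
    downstream_task_ids={"task_111", "task_222", "task_333",
                         "task_444", "task_555", "task_666"}
)

REQUESTED = ["task_1", "task_22", "task_33", "task_444", "task_555", "task_666"]

print(get_task_for_update(REQUESTED, tracker_stub))
print(get_task_for_update(REQUESTED, tracker_second_stub))
```

With this filter each branch would return only ids from its own group, so the operator definitions can keep reading the same params.manual_run_task_list. I have not run this against a real DAG, so treat it as a starting point.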
