I implemented the following pipeline:
[Image: DAG graph]
As the graph shows, "tracker" and "tracker_second" are both BranchPythonOperators. Each one receives the list of tasks to execute through the same templated parameter, params.manual_run_task_list:
tracker = BranchPythonOperator(
    task_id="tracker",
    python_callable=get_task_for_update,
    op_kwargs={"manual_run_task_list": "{{ params.manual_run_task_list }}"},
    trigger_rule=TriggerRule.NONE_FAILED,
)

tracker_second = BranchPythonOperator(
    task_id="tracker_second",
    python_callable=get_task_for_update,
    op_kwargs={"manual_run_task_list": "{{ params.manual_run_task_list }}"},
    trigger_rule=TriggerRule.NONE_FAILED,
)
I need to run only specific tasks, whose names are passed in the manual_run_task_list parameter. Tasks from the first and second groups should not overlap.
During testing, I discovered strange behavior from the BranchPythonOperator. I passed the following parameters:
{
    "manual_run_task_list": [
        "task_1",
        "task_22",
        "task_33",
        "task_444",
        "task_555",
        "task_666"
    ]
}
Three tasks from the first group and three tasks from the second group should run, but in practice, the outcome is different:
[Image: graph result]
While inspecting the XComs, I found that "tracker_second" had an unexpected value under skipmixin_key:
"tracker":
    return_value: ['task_1', 'task_22', 'task_33', 'task_444', 'task_555', 'task_666']
    skipmixin_key: {'followed': ['task_22', 'task_33', 'task_1']}
"tracker_second":
    return_value: ['task_1', 'task_22', 'task_33', 'task_444', 'task_555', 'task_666']
    skipmixin_key: {'followed': ['task_333', 'task_444', 'task_555', 'task_666', 'task_111', 'task_222']}
"tracker_second" followed all of its downstream tasks, not just the requested ones as "tracker" did.
Can you please help me understand the reason for this behavior?
Apache-Airflow Version: 2.10.4
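A possible explanation, offered as a reading of Airflow 2.x rather than a confirmed diagnosis: when a branch operator returns task IDs, SkipMixin.skip_all_except appears to keep not only those IDs but also every task downstream of them, so that a join task behind a chosen branch is not skipped. Because task_1, task_22 and task_33 sit upstream of "tracker_second", every task behind "tracker_second" is a descendant of a returned ID and therefore counts as followed. The sketch below is a simplified pure-Python model of that rule, not Airflow's own code. The graph topology and the first_group_other task are assumptions, since the DAG image is not reproduced here.

```python
# Simplified model of the "followed" computation. NOT Airflow's actual code.
# Assumed topology: tracker -> first group -> tracker_second -> second group.
# "first_group_other" is a hypothetical task standing in for any first-group
# task that was not requested.
FIRST_GROUP = ["task_1", "task_22", "task_33", "first_group_other"]
SECOND_GROUP = ["task_111", "task_222", "task_333", "task_444", "task_555", "task_666"]

# task_id -> list of direct downstream task_ids
GRAPH = {
    "tracker": FIRST_GROUP,
    **{task_id: ["tracker_second"] for task_id in FIRST_GROUP},
    "tracker_second": SECOND_GROUP,
    **{task_id: [] for task_id in SECOND_GROUP},
}


def descendants(task_id):
    """Return every task reachable downstream of task_id."""
    seen = set()
    stack = list(GRAPH[task_id])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(GRAPH[current])
    return seen


def followed(branch_task, returned_ids):
    """Direct downstream tasks of branch_task that would not be skipped."""
    keep = set(returned_ids)
    for task_id in returned_ids:
        # Descendants of a returned task are kept as well.
        keep |= descendants(task_id)
    return sorted(t for t in GRAPH[branch_task] if t in keep)


requested = ["task_1", "task_22", "task_33", "task_444", "task_555", "task_666"]
print(followed("tracker", requested))
print(followed("tracker_second", requested))
```

In this model, "tracker" skips first_group_other while "tracker_second" follows all six of its downstream tasks, which matches the skipmixin_key values above.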
Solution
One workaround is to pass a separate task list to each group instead of a single manual_run_task_list for all tasks. That is, "tracker" receives a list containing only first-group tasks, and "tracker_second" receives a list containing only second-group tasks. Example:
tracker = BranchPythonOperator(
    task_id="tracker",
    python_callable=get_task_for_update,
    op_kwargs={"manual_run_task_list": "{{ params.manual_run_task_FIRST_GROUP }}"},
    trigger_rule=TriggerRule.NONE_FAILED,
)

tracker_second = BranchPythonOperator(
    task_id="tracker_second",
    python_callable=get_task_for_update,
    op_kwargs={"manual_run_task_list": "{{ params.manual_run_task_SECOND_GROUP }}"},
    trigger_rule=TriggerRule.NONE_FAILED,
)
However, this seems to complicate the structure of the pipeline, so I would like to avoid that.
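A possible way to keep a single manual_run_task_list is to filter it inside the callable, so that each branch operator returns only the IDs of its own direct downstream tasks. The following is a minimal sketch, not a tested fix. It assumes the callable receives a real Python list (for example via render_template_as_native_obj=True on the DAG, or by parsing the rendered string) and that the Airflow context exposes the running operator as context["task"] with a downstream_task_ids attribute. The names select_direct_downstream and get_task_for_update_filtered are hypothetical, and the body of the original get_task_for_update is not shown in this post.

```python
from types import SimpleNamespace


def select_direct_downstream(requested_task_ids, direct_downstream_ids):
    """Keep only requested IDs that are direct downstream tasks, preserving order."""
    allowed = set(direct_downstream_ids)
    return [task_id for task_id in requested_task_ids if task_id in allowed]


def get_task_for_update_filtered(manual_run_task_list, **context):
    """Hypothetical branch callable: return only this operator's own targets.

    Assumes manual_run_task_list is already a list and that context["task"]
    is the operator currently running.
    """
    return select_direct_downstream(
        manual_run_task_list,
        context["task"].downstream_task_ids,
    )


# Stand-in for the Airflow context, so the sketch runs without Airflow installed.
fake_tracker_second = SimpleNamespace(
    downstream_task_ids={
        "task_111", "task_222", "task_333", "task_444", "task_555", "task_666",
    }
)
requested = ["task_1", "task_22", "task_33", "task_444", "task_555", "task_666"]
print(get_task_for_update_filtered(requested, task=fake_tracker_second))
```

With this filtering, "tracker_second" would return only second-group IDs, so no returned ID sits upstream of it, and both operators can keep reading the same parameter.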