Below is a minimal implementation of a branch operator using the TaskFlow API. The DAG executes either odd_task or even_task, based on the task_id string returned by branch_on_condition. Whichever of odd_task or even_task runs also receives the return_int value on which the branching decision was made. Then final_task is executed. All straightforward.
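The control flow just described can be modelled in plain Python. This is a minimal sketch, not Airflow's scheduler. The helper name run_branch is hypothetical. The sketch only assumes what the question states: the branch callable returns the task_id to follow, the other branch is skipped, and final_task uses the one_success trigger rule.

```python
# Toy model of the single-value branch DAG (not Airflow's real scheduler).

def branch_on_condition(upstream_value: int) -> str:
    # Same parity test as the DAG: the lowest bit is 1 for odd numbers.
    if upstream_value & 1:
        return "odd_task"
    return "even_task"


def run_branch(value: int) -> dict[str, str]:
    """Hypothetical helper: the end state of each task downstream of the branch."""
    followed = branch_on_condition(value)
    states = {}
    for task_id in ("odd_task", "even_task"):
        # Only the task_id returned by the branch runs; the other is skipped.
        states[task_id] = "success" if task_id == followed else "skipped"
    upstream = (states["odd_task"], states["even_task"])
    # one_success: final_task fires once at least one upstream task succeeded.
    # "not triggered" is a toy label here, not an Airflow task state.
    states["final_task"] = "success" if "success" in upstream else "not triggered"
    return states


if __name__ == "__main__":
    print(run_branch(3))  # {'odd_task': 'success', 'even_task': 'skipped', 'final_task': 'success'}
    print(run_branch(2))  # {'odd_task': 'skipped', 'even_task': 'success', 'final_task': 'success'}
```

Because exactly one branch always succeeds for a single integer, final_task always runs in this model.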
from airflow.decorators import dag, task


@dag(dag_id='Example_Dag_Simple_Branch')
def simple_dag():
    @task(task_id='return_int')
    def return_int():
        return 3

    # Returns the task_id of the branch to follow; the other branch is skipped.
    @task.branch(task_id='branch_on_condition')
    def branch_on_condition(upstream_value):
        if upstream_value & 1:  # lowest bit set -> odd
            return 'odd_task'
        else:
            return 'even_task'

    @task(task_id='odd_task')
    def odd_task(input_val):
        print(f"{input_val} is an odd number")
        return input_val

    @task(task_id='even_task')
    def even_task(input_val):
        print(f"{input_val} is an even number")
        return input_val

    # one_success: run as soon as at least one upstream branch has succeeded.
    @task(task_id='final_task', trigger_rule='one_success')
    def final_task():
        print('final task executed')
        return

    returned_int = return_int()
    branch_value = branch_on_condition(upstream_value=returned_int)
    even_task_return = even_task(input_val=returned_int)
    odd_task_return = odd_task(input_val=returned_int)
    final_return = final_task()

    branch_value >> [even_task_return, odd_task_return] >> final_return


simple_dag()
This logs either INFO - 3 is an odd number or, if return_int is changed to return 2, INFO - 2 is an even number.
Why am I unable to implement a similar pattern using dynamically mapped tasks?
Suppose the return_int task is replaced by a return_list task that returns a list of n integers. I can dynamically map the branch over this list without errors, but BOTH branches are executed for every element!
from airflow.decorators import dag, task


@dag(dag_id='Example_Dag_Dynamic_Branch')
def simple_dag():
    @task(task_id='return_list')
    def return_list():
        return [1, 2, 3, 4, 5, 6, 7]

    # Same branch callable as before, now expected to run once per list element.
    @task.branch(task_id='branch_on_condition')
    def branch_on_condition(upstream_value):
        if upstream_value & 1:  # lowest bit set -> odd
            return 'odd_task'
        else:
            return 'even_task'

    @task(task_id='odd_task')
    def odd_task(input_val):
        print(f"{input_val} is an odd number")
        return input_val

    @task(task_id='even_task')
    def even_task(input_val):
        print(f"{input_val} is an even number")
        return input_val

    @task(task_id='final_task', trigger_rule='one_success')
    def final_task():
        print('final task executed')
        return

    returned_list = return_list()
    # One mapped branch instance per element of returned_list.
    branch_value = branch_on_condition.expand(upstream_value=returned_list)
    # Each branch task is also mapped over the full list.
    even_task_return = even_task.expand(input_val=returned_list)
    odd_task_return = odd_task.expand(input_val=returned_list)
    final_return = final_task()

    branch_value >> [even_task_return, odd_task_return] >> final_return


simple_dag()
As a result, odd_task runs as, for example, 7 mapped task instances. Each one logs INFO - n is an odd number for n from 1 through 7, which is of course not true for the even values.
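A toy model in plain Python can show the gap between what the question expects and what it observes. Both helper names, expected_routing and toy_expand, are hypothetical, and the code does not reproduce Airflow's scheduler. It only contrasts two things:

- per-element routing, which is what the branch decisions imply;
- a mapped task that fans out over the whole return_list regardless of those decisions, which matches the symptom described above.

```python
# Toy contrast between the expected routing and the observed fan-out.
# None of these functions are Airflow code; they are illustrative stand-ins.

def branch_on_condition(upstream_value: int) -> str:
    # Same parity test as the DAG.
    return "odd_task" if upstream_value & 1 else "even_task"


def expected_routing(values: list[int]) -> dict[str, list[int]]:
    """Each element should reach exactly the branch its branch instance chose."""
    routing: dict[str, list[int]] = {"odd_task": [], "even_task": []}
    for value in values:
        routing[branch_on_condition(value)].append(value)
    return routing


def odd_task(input_val: int) -> str:
    return f"INFO - {input_val} is an odd number"


def toy_expand(func, inputs: list[int]) -> list[str]:
    """Stand-in for .expand(): one call per element of the *whole* input list."""
    return [func(value) for value in inputs]


if __name__ == "__main__":
    returned_list = [1, 2, 3, 4, 5, 6, 7]
    print(expected_routing(returned_list))
    # {'odd_task': [1, 3, 5, 7], 'even_task': [2, 4, 6]}
    for line in toy_expand(odd_task, returned_list):
        print(line)  # seven lines, including "INFO - 2 is an odd number"
```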
I have tried:
- Packing the branch and downstream tasks into a task group.
- Experimenting with .expand() and .partial().
- Reading related questions: 1. 2. 3.
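Airflow's dynamic task mapping documents these semantics: .partial() fixes arguments that stay constant across mapped instances, and .expand() creates one instance per element. When several arguments are expanded, .expand() takes their cross product. The plain-Python analogy below is not Airflow code. It uses the hypothetical helper toy_partial_expand and the standard library's itertools.product.

```python
import itertools


def toy_partial_expand(func, constant_kwargs: dict, mapped_kwargs: dict[str, list]) -> list:
    """Hypothetical analogy: one call per combination of the mapped values.

    constant_kwargs are passed unchanged to every call (like .partial());
    mapped_kwargs are combined as a cross product (like .expand() over
    several arguments).
    """
    names = list(mapped_kwargs)
    results = []
    for combo in itertools.product(*(mapped_kwargs[name] for name in names)):
        kwargs = {**constant_kwargs, **dict(zip(names, combo))}
        results.append(func(**kwargs))
    return results


def label(input_val: int, branch: str) -> str:
    return f"{branch}: {input_val}"


if __name__ == "__main__":
    print(toy_partial_expand(label, {"branch": "odd_task"}, {"input_val": [1, 2, 3]}))
    # ['odd_task: 1', 'odd_task: 2', 'odd_task: 3']
    print(toy_partial_expand(lambda x, y: x + y, {}, {"x": [1, 2], "y": [10, 20]}))
    # [11, 21, 12, 22]
```

Nothing in this analogy consults a branch result; the number of calls depends only on the mapped inputs.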
None of these really answers the question!
Finally, I am aware that this behavior could simply be a standalone if/else block within a single operator. In reality, however, the if/else logic could become quite complex, and it would be nice to separate it into multiple tasks.
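The single-operator alternative mentioned above could look roughly like the plain-Python function below. The if/else lives inside one callable that would be mapped over return_list, so each element is classified and handled within the same task instance. The names classify_and_process, handle_odd and handle_even are hypothetical. In a DAG, classify_and_process would be the body of one @task, which is exactly the coupling the question wants to avoid as the logic grows.

```python
def handle_odd(input_val: int) -> str:
    return f"{input_val} is an odd number"


def handle_even(input_val: int) -> str:
    return f"{input_val} is an even number"


def classify_and_process(input_val: int) -> str:
    """Hypothetical single-task version: branch with if/else instead of a branch task."""
    if input_val & 1:  # lowest bit set -> odd
        return handle_odd(input_val)
    return handle_even(input_val)


if __name__ == "__main__":
    for line in map(classify_and_process, [1, 2, 3, 4, 5, 6, 7]):
        print(line)  # "1 is an odd number", "2 is an even number", ...
```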
Article title: python - In Airflow 2.10 can I use dynamic task mapping with BranchPythonOperator? - Stack Overflow