
Below is a minimal implementation of a branch operator using the TaskFlow API.

The DAG executes either odd_task or even_task, depending on the task_id string returned by branch_on_condition. Whichever task is chosen also receives the return_int value on which the branching decision was made. Then final_task is executed. All straightforward.

from airflow.decorators import dag, task


@dag(dag_id='Example_Dag_Simple_Branch')
def simple_dag():
    
    @task(task_id='return_int')
    def return_int():
        return 3


    @task.branch(task_id='branch_on_condition')
    def branch_on_condition(upstream_value):
        if upstream_value & 1:
            return 'odd_task'
        else:
            return 'even_task'
        
        
    @task(task_id='odd_task')
    def odd_task(input_val):
        print(f"{input_val} is an odd number")
        return input_val
    
    
    @task(task_id='even_task')
    def even_task(input_val):
        print(f"{input_val} is an even number")
        return input_val        


    @task(task_id='final_task', trigger_rule='one_success')
    def final_task():
        print('final task executed')
        return
    
    
    returned_int = return_int()
    branch_value = branch_on_condition(upstream_value=returned_int)
    even_task_return = even_task(input_val=returned_int)
    odd_task_return = odd_task(input_val=returned_int)
    final_return = final_task()
    
    branch_value >> [even_task_return, odd_task_return] >> final_return
    
simple_dag()

This logs INFO - 3 is an odd number (or, if return_int returns 2 instead, INFO - 2 is an even number).
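The branch decision itself is plain Python and can be checked outside Airflow. The sketch below reuses the body of branch_on_condition as an ordinary function (without the @task.branch decorator, so nothing here touches the Airflow API). The condition `upstream_value & 1` is a bitwise parity test: it is non-zero exactly for odd integers, so it agrees with `n % 2 == 1`.

```python
def branch_on_condition(upstream_value: int) -> str:
    # Same logic as the branch callable in the DAG:
    # the lowest bit of an int is 1 exactly when the int is odd.
    if upstream_value & 1:
        return 'odd_task'
    else:
        return 'even_task'


# The bitwise test agrees with the modulo test for Python ints,
# including negative ones.
for n in range(-10, 10):
    assert bool(n & 1) == (n % 2 == 1)

print(branch_on_condition(3))  # odd_task
print(branch_on_condition(2))  # even_task
```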

Why am I unable to implement a similar pattern using dynamically mapped tasks?

If the return_int task is replaced by a task return_list that returns a list of n integers, I can map the branch over this list without any errors, but BOTH branches are executed for every element!

from airflow.decorators import dag, task


@dag(dag_id='Example_Dag_Dynamic_Branch')
def simple_dag():
    
    @task(task_id='return_list')
    def return_list():
        return [1,2,3,4,5,6,7]


    @task.branch(task_id='branch_on_condition')
    def branch_on_condition(upstream_value):
        if upstream_value & 1:
            return 'odd_task'
        else:
            return 'even_task'
        
        
    @task(task_id='odd_task')
    def odd_task(input_val):
        print(f"{input_val} is an odd number")
        return input_val
    
    
    @task(task_id='even_task')
    def even_task(input_val):
        print(f"{input_val} is an even number")
        return input_val        


    @task(task_id='final_task', trigger_rule='one_success')
    def final_task():
        print('final task executed')
        return
    
    
    returned_list = return_list()
    branch_value = branch_on_condition.expand(upstream_value=returned_list)
    even_task_return = even_task.expand(input_val=returned_list)
    odd_task_return = odd_task.expand(input_val=returned_list)
    final_return = final_task()
    
    branch_value >> [even_task_return, odd_task_return] >> final_return
    
simple_dag()

As a result, odd_task produces, for example, 7 mapped task instances that each log INFO - n is an odd number for n from 1 through 7, which is of course not true for the even values.
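For comparison, the per-element routing the mapped DAG is expected to perform can be modelled in plain Python. This is only a model of the intended result; the helper expected_logs is hypothetical and does not use Airflow. Each element goes to exactly one branch, so 1, 3, 5 and 7 should be reported by odd_task and 2, 4 and 6 by even_task, instead of all seven values being handled by odd_task.

```python
def expected_logs(values):
    """Route each value to exactly one branch, mirroring branch_on_condition.

    Hypothetical helper for illustration only: returns a dict mapping each
    branch task_id to the messages that branch should log.
    """
    logs = {'odd_task': [], 'even_task': []}
    for value in values:
        branch = 'odd_task' if value & 1 else 'even_task'
        parity = 'odd' if branch == 'odd_task' else 'even'
        logs[branch].append(f"{value} is an {parity} number")
    return logs


result = expected_logs([1, 2, 3, 4, 5, 6, 7])
print(result['odd_task'])   # messages for 1, 3, 5, 7
print(result['even_task'])  # messages for 2, 4, 6
```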

I have tried:

  • Packing the branch and downstream tasks into a task group.
  • Experimenting with .expand() and .partial().
  • Reading related questions: 1. 2. 3.

None of these really answers the question!

Finally, I am aware that this behavior could simply be a standalone if/else block within a single task. But in reality the if/else logic could become quite complex, and it would be nice to separate it into multiple tasks.
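The standalone if/else alternative mentioned above can still keep each branch body in its own function. This is a minimal sketch, assuming dispatch would become the body of a single mapped task (for example decorated with @task and called via .expand(input_val=...)). The names handle_odd, handle_even, HANDLERS and dispatch are hypothetical, and Airflow is not imported so the logic runs on its own.

```python
from typing import Callable, Dict


def handle_odd(input_val: int) -> int:
    print(f"{input_val} is an odd number")
    return input_val


def handle_even(input_val: int) -> int:
    print(f"{input_val} is an even number")
    return input_val


# Hypothetical lookup table keyed by the same strings the branch task returns.
HANDLERS: Dict[str, Callable[[int], int]] = {
    'odd_task': handle_odd,
    'even_task': handle_even,
}


def dispatch(input_val: int) -> str:
    # Decide the branch exactly as branch_on_condition does...
    branch = 'odd_task' if input_val & 1 else 'even_task'
    # ...then run only the chosen handler for this single element.
    HANDLERS[branch](input_val)
    return branch


# Stand-in for one mapped task instance per list element.
branches = [dispatch(v) for v in [1, 2, 3, 4, 5, 6, 7]]
```

The trade-off is that both handlers run inside the same task instance, so they no longer show up as separate tasks in the Airflow UI, which is the separation the question is trying to keep.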

Tags: python; In Airflow 2.10 can I use dynamic task mapping with BranchPythonOperator - Stack Overflow