admin管理员组

文章数量:1302360

I am using Apache Airflow (2.2.3) taskflow API in conjunction with the TriggerDagRunOperator.

I am trying to dynamically pass a variable to the conf option of TriggerDagRunOperator using jinja. Please see simple example below.

@dag(dag_id='my_dag_trigger')

def first_taskflow():

    @task(multiple_outputs=True)
    def create_some_values():

        return {'v1': value1, 'v2', value2}

    @task
    def trigger_dag(**kwargs):

        TriggerDagRunOperator(
            task_id='my_second_dag',
            conf={
                'v1': "{{ task_instance.xcom_pull(task_ids='create_some_values', key='v1') }}"
            }).execute(kwargs)

    create_some_values() >> trigger_dag()

first_taskflow()

# ------- DAG TO BE TRIGGERED -------

@dag(dag_id='my_second_dag')

def secondary_taskflow():

    @task()
    def secondary_task(**context):
        print(context['dag_run'].conf.get('v1'))

    secondary_task()

secondary_taskflow()

When the value is passed into the triggered DAG the jinja {{ task_instance.xcom_pull(task_ids='create_some_values', key='v1') }} statement is not returning the parameter value for v1 but returning "{{ task_instance.xcom_pull(task_ids='create_some_values', key='v1') }}" as the value.

Is there a way to pass values dynamically into the conf?

Any help greatly appreciated.

本文标签: