admin管理员组文章数量:1302360
I am using Apache Airflow (2.2.3) taskflow API in conjunction with the TriggerDagRunOperator.
I am trying to dynamically pass a variable to the conf option of TriggerDagRunOperator using jinja. Please see simple example below.
@dag(dag_id='my_dag_trigger')
def first_taskflow():
@task(multiple_outputs=True)
def create_some_values():
return {'v1': value1, 'v2', value2}
@task
def trigger_dag(**kwargs):
TriggerDagRunOperator(
task_id='my_second_dag',
conf={
'v1': "{{ task_instance.xcom_pull(task_ids='create_some_values', key='v1') }}"
}).execute(kwargs)
create_some_values() >> trigger_dag()
first_taskflow()
# ------- DAG TO BE TRIGGERED -------
@dag(dag_id='my_second_dag')
def secondary_taskflow():
@task()
def secondary_task(**context):
print(context['dag_run'].conf.get('v1'))
secondary_task()
secondary_taskflow()
When the value is passed into the triggered DAG the jinja {{ task_instance.xcom_pull(task_ids='create_some_values', key='v1') }}
statement is not returning the parameter value for v1 but returning "{{ task_instance.xcom_pull(task_ids='create_some_values', key='v1') }}" as the value.
Is there a way to pass values dynamically into the conf?
Any help greatly appreciated.
本文标签:
版权声明:本文标题:python - Apache Airflow Taskflow API: Pass XCOM value using JINJA2 into TriggerDagRunOperator conf - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1741701699a2393328.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论