admin管理员组文章数量:1379475
I am having issues with XCOM communication in the context of dynamically mapped tasks.
Here is my code snippet:
with DAG(
params={
"shas": Param(
type="array",
items={
"type": "string",
},
),
},
) as dag:
# Get the input data into the processing chain
@task(task_id="start", task_display_name="Initial input data")
def get_input_data(**context) -> list[str]:
return context["params"]["shas"]
@task_group(group_id="process_sha")
def process_sha(sha) -> None:
@task(task_id="collect_input_data", task_display_name="Prepare Input Data")
def collect_input_data(sha, **context):
return sha
bash_task = BashOperator(
task_id="bash_task",
map_index_template="{{ task_instance.xcom_pull(task_ids='collect_input_data') }}",
bash_command=r"""
echo "{{ task_instance.xcom_pull('task_ids=collect_input_data') }}"
""",
)
collect_input_data(sha) >> bash_task
input_data = get_input_data()
process_sha.expand(sha=input_data)
The value returned by the xcom_pull
call is always None. When using another XCOM outside of the dynamically mapped task, the value is as expected.So, what is the correct approach to get the sha
value into the Bash operator? And is there some way to get it into the Bash operator without using this extra Python task collect_input_data
and directly access the sha
parameter passed to the task group?
本文标签: Apache Airflow XCOM pull returns None in the context of dynamically mapped tasksStack Overflow
版权声明:本文标题:Apache Airflow XCOM pull returns None in the context of dynamically mapped tasks - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1744424513a2605620.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论