I am having trouble pulling XCom values inside dynamically mapped tasks.

Here is my code snippet:

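# Imports assume Airflow 2.x module paths
from airflow import DAG
from airflow.decorators import task, task_group
from airflow.models.param import Param
from airflow.operators.bash import BashOperator

with DAG(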
  params={
    "shas": Param(
      type="array",
      items={
        "type": "string",
      },
    ),
  },
) as dag:
  # Get the input data into the processing chain
  @task(task_id="start", task_display_name="Initial input data")
  def get_input_data(**context) -> list[str]:
    return context["params"]["shas"]

  @task_group(group_id="process_sha")
  def process_sha(sha) -> None:

    @task(task_id="collect_input_data", task_display_name="Prepare Input Data")
    def collect_input_data(sha, **context):
      return sha

    bash_task = BashOperator(
      task_id="bash_task",
      map_index_template="{{ task_instance.xcom_pull(task_ids='collect_input_data') }}",
      bash_command=r"""
        echo "{{ task_instance.xcom_pull(task_ids='collect_input_data') }}"
      """,
    )

    collect_input_data(sha) >> bash_task


  input_data = get_input_data()
  process_sha.expand(sha=input_data)

The value returned by the xcom_pull call is always None. When I pull an XCom from a task outside the dynamically mapped task group, the value is as expected.

So, what is the correct way to get the sha value into the BashOperator? And is there a way to do it without the extra Python task collect_input_data, by accessing the sha parameter passed to the task group directly?
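
One possible explanation for the None, sketched as a toy model rather than Airflow's actual implementation: tasks declared inside a task group get the group ID prepended to their task ID by default, and mapped task instances store their XCom values per map index. The class ToyXComStore, the helper qualified_task_id, and the sample sha values below are hypothetical names made up for this illustration. The sketch only shows why a lookup by the bare ID finds nothing while the prefixed ID plus a map index does.

```python
# Toy model of an XCom lookup; this is NOT Airflow's implementation.
from typing import Optional


class ToyXComStore:
    """Keeps return values keyed by (task_id, map_index), like XCom rows."""

    def __init__(self) -> None:
        self._rows: dict[tuple[str, int], str] = {}

    def push(self, task_id: str, map_index: int, value: str) -> None:
        self._rows[(task_id, map_index)] = value

    def pull(self, task_id: str, map_index: int) -> Optional[str]:
        # A key that was never stored yields None.
        return self._rows.get((task_id, map_index))


def qualified_task_id(group_id: str, task_id: str) -> str:
    # Task groups prefix child task IDs with the group ID by default.
    return f"{group_id}.{task_id}"


store = ToyXComStore()
shas = ["a1b2c3", "d4e5f6"]  # hypothetical input values
for index, sha in enumerate(shas):
    # Each mapped instance stores its value under the prefixed ID and its own index.
    store.push(qualified_task_id("process_sha", "collect_input_data"), index, sha)

bare = store.pull("collect_input_data", 0)  # bare ID, as in the question
prefixed = store.pull("process_sha.collect_input_data", 1)  # prefixed ID plus map index
print(bare, prefixed)
```

If that reading is right, the template would presumably need the prefixed ID and the current map index, for example task_instance.xcom_pull(task_ids='process_sha.collect_input_data', map_indexes=task_instance.map_index). As for dropping the extra Python task, passing sha straight into a templated BashOperator field such as env may work, since task-group arguments are resolved per mapped instance at run time. Both suggestions are assumptions that have not been tested against the Airflow version used here.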

Tags: Apache Airflow XCOM pull returns None in the context of dynamically mapped tasks - Stack Overflow