I'm trying to improve the readability of my dags using taskgroups.

I also rely on the taskflow syntax, and the ability to map function parameters / return values to xcom automatically.

Also also :) I'm a mypy / pyright user, and I try to keep the source code of my project type-annotated.

With this scenario in mind, I'm trying to understand what is the best way to describe my dags and taskgroups.

Let me share a (simplified) example of my dags:

from airflow.decorators import dag, task, task_group

@task
def source() -> str:
    return 'blablabla'

@task
def task_len(data: str) -> int:
    return len(data)

@task
def task_mul(times: int) -> str:
    return 'x' * times

@task_group
def tg(data: str) -> str:
    return task_mul(task_len(data))

@task
def sink(data: str) -> None:
    print(data)

@dag()
def dag_tg() -> None:
    sink(tg(source()))

dag_tg()

Here you can see that I have some tasks that return / consume from xcom.

I also have a taskgroup, where I want to capture the needed input xcom and describe the output one.

In the dag, my goal is to "use" the taskgroup as if it were a task that just consumes and produces xcom "directly".

This setup "works": it is loaded correctly in airflow, it runs as expected, and so on.

Yet mypy reports two errors. On the tg definition I get: Value of type variable "FReturn" of "task_group" cannot be "str". In the dag, where tg is used, I get: Argument 1 has incompatible type "DAGNode"; expected "str".

It seems that, at least from the typing perspective, the taskgroup decorator expects a function that returns a DAGNode (this is also visible on .py#L182 ).
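To make the first error concrete without depending on airflow, here is a minimal sketch that imitates the shape of the annotation as I understand it. Everything in it is a hypothetical stand-in: the DAGNode and XComArg classes, the FReturn type variable, and the toy task_group decorator are my assumptions about how the real ones are typed, not airflow's actual code.

```python
from typing import Callable, TypeVar


class DAGNode:
    """Hypothetical stand-in for airflow's DAGNode base class."""


class XComArg(DAGNode):
    """Hypothetical stand-in for what a @task call yields at parse time."""

    def __init__(self, label: str) -> None:
        self.label = label


# Assumption: a *constrained* type variable, so a type checker accepts
# only functions whose return type is None or DAGNode.
FReturn = TypeVar("FReturn", None, DAGNode)


def task_group(fn: Callable[..., FReturn]) -> Callable[..., DAGNode]:
    """Toy decorator: callers always see a DAGNode, whatever the body returns."""

    def wrapper(*args: object, **kwargs: object) -> DAGNode:
        result = fn(*args, **kwargs)
        # A group that returns nothing is still represented by a node.
        return result if result is not None else DAGNode()

    return wrapper


@task_group
def tg_node(data: str) -> DAGNode:
    # Annotated with the node type: satisfies the FReturn constraint.
    return XComArg(f"tg({data})")


# The variant below mirrors my tg. A checker should reject it because
# "str" is not one of FReturn's constraints, even though nothing
# prevents it from running:
#
# @task_group
# def tg_str(data: str) -> str: ...
```

Under these assumptions, the second error follows from the first: the decorated group is typed as returning a DAGNode, so passing its result to a function annotated with a str parameter is flagged as well.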

So my question is twofold:

  • Assuming the airflow annotations are right: am I using the taskgroup "wrong"? Should I avoid using taskflow-style functions in a group? Does this dag work just by chance?

  • Assuming my usage is correct: should the airflow annotations be improved to support this scenario?

Tags: python. Title: Typing issue using a taskgroup in a taskflow-based dag (Stack Overflow)