Context: I have a custom in-house task group, TG, which is used in multiple DAGs. It contains several tasks, say A, B, C, and D. Task D sometimes fails for unforeseen reasons and retries.

My requirement: instead of only D retrying, I want to retry the whole task group TG, for example by clearing and rerunning all of TG (downstream + recursive).
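To make the requirement concrete, here is a minimal sketch in plain Python (no Airflow imports) of which tasks "TG plus downstream, recursive" should select. The function name `tasks_to_rerun`, the `example_dag` mapping, and the task names `start`, `report`, and `notify` are hypothetical and only illustrate the selection. It assumes that tasks inside a group carry the `TG.` prefix in their task ids.

```python
from collections import deque


def tasks_to_rerun(downstream, group_id):
    """Return ids of all tasks in the group plus every transitive downstream task.

    `downstream` maps each task id to the ids of its direct downstream tasks.
    """
    prefix = f"{group_id}."
    # Start with every task whose id is prefixed by the group id.
    selected = {task_id for task_id in downstream if task_id.startswith(prefix)}
    queue = deque(selected)
    # Breadth-first walk over downstream edges to collect transitive descendants.
    while queue:
        current = queue.popleft()
        for child in downstream.get(current, ()):
            if child not in selected:
                selected.add(child)
                queue.append(child)
    return selected


# Hypothetical DAG: start -> TG.A -> TG.B -> TG.C -> TG.D -> report -> notify
example_dag = {
    "start": ["TG.A"],
    "TG.A": ["TG.B"],
    "TG.B": ["TG.C"],
    "TG.C": ["TG.D"],
    "TG.D": ["report"],
    "report": ["notify"],
    "notify": [],
}

print(sorted(tasks_to_rerun(example_dag, "TG")))
```

In this sketch the upstream task `start` is left alone, while everything inside TG and everything after it is selected for rerun.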

To achieve this, I tried passing:

on_retry_callback=lambda context: reset_and_retry_tg(context, group_id)

from task D as a callback, implemented as follows:

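# Imports assumed for Airflow 2.x; adjust the paths to the installed version.
import logging
from typing import Any

from airflow.models.taskinstance import clear_task_instances
from airflow.utils.session import provide_session

LOG = logging.getLogger(__name__)

@provide_session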
def reset_and_retry_tg(context: Any, task_group_id: str, session: Any = None) -> None:
    dag_run = context["dag_run"]
    dag = dag_run.get_dag()
    task_instances = dag_run.get_task_instances(session=session)
    tasks_to_clear = set()
    for task in dag.tasks:
        if task.task_id.startswith(f"{task_group_id}."):
            tasks_to_clear.add(task.task_id)
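            # Collect downstream task ids; get_flat_relatives() returns operator
            # objects, which would never match ti.task_id in the filter below.
            tasks_to_clear.update(task.get_flat_relative_ids(upstream=False))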

    LOG.info("Clearing tasks: %s", tasks_to_clear)

    tis_to_clear = [ti for ti in task_instances if ti.task_id in tasks_to_clear]
    if not tis_to_clear:
        LOG.warning("No tasks found in task group %s to clear.", task_group_id)
        return

    clear_task_instances(
        tis=tis_to_clear,
        session=session,
        dag=dag,
    )

    LOG.info("Cleared all tasks in task group %s to retry.", task_group_id)
    raise Exception(  # pylint: disable = broad-exception-raised
        f"Retrying entire task group '{task_group_id}' because task retries."
    )

But this does not serve my purpose: still only D retries. Any suggestions or ideas on how this can be done?
