Context: I have a custom in-house task group, TG, which is used in multiple DAGs. It contains several tasks, say A, B, C, and D. Task D sometimes fails for unforeseen reasons and retries.
My requirement is that, instead of only D retrying, the whole task group TG should be retried, for example by clearing and rerunning the entire TG (downstream + recursive).
To achieve this, I tried passing the following as a callback on task D:

    on_retry_callback=lambda context: reset_and_retry_tg(context, group_id)

The callback is implemented as follows:

    # Imports assume Airflow 2.x module paths.
    import logging
    from typing import Any

    from airflow.models.taskinstance import clear_task_instances
    from airflow.utils.session import provide_session

    LOG = logging.getLogger(__name__)


    @provide_session
    def reset_and_retry_tg(context: Any, task_group_id: str, session: Any = None) -> None:
        dag_run = context["dag_run"]
        dag = dag_run.get_dag()
        task_instances = dag_run.get_task_instances(session=session)

        # Collect the IDs of every task in the group plus everything downstream of it.
        tasks_to_clear = set()
        for task in dag.tasks:
            if task.task_id.startswith(f"{task_group_id}."):
                tasks_to_clear.add(task.task_id)
                # Use relative IDs (strings), not operator objects, so the
                # membership test against ti.task_id below can match them.
                tasks_to_clear.update(task.get_flat_relative_ids(upstream=False))
        LOG.info("Clearing tasks: %s", tasks_to_clear)

        tis_to_clear = [ti for ti in task_instances if ti.task_id in tasks_to_clear]
        if not tis_to_clear:
            LOG.warning("No tasks found in task group %s to clear.", task_group_id)
            return

        clear_task_instances(
            tis=tis_to_clear,
            session=session,
            dag=dag,
        )
        LOG.info("Cleared all tasks in task group %s to retry.", task_group_id)
        raise Exception(  # pylint: disable = broad-exception-raised
            f"Retrying entire task group '{task_group_id}' because task retries."
        )

But this does not serve my purpose: still only D retries. Any suggestions or ideas on how this can be done?
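
To make the intended selection concrete, here is a minimal, Airflow-free sketch of which task IDs the callback is meant to collect: every task whose ID starts with the group prefix, plus everything transitively downstream. The function `select_tasks_to_clear`, the `downstream` mapping, and the example DAG (including the `start` and `report` tasks) are hypothetical and only illustrate the selection logic. It does not touch Airflow's scheduler or task state, so it says nothing about why only D retries.

```python
from collections import deque
from typing import Dict, List, Set


def select_tasks_to_clear(downstream: Dict[str, List[str]], group_id: str) -> Set[str]:
    """Return every task ID in `group_id` plus all transitive downstream task IDs."""
    prefix = f"{group_id}."
    # Seed with the group's own tasks; this assumes task IDs are prefixed
    # with "<group_id>." as in the original callback.
    selected = {task_id for task_id in downstream if task_id.startswith(prefix)}
    queue = deque(selected)
    # Breadth-first walk over downstream edges, visiting each task once.
    while queue:
        current = queue.popleft()
        for child in downstream.get(current, []):
            if child not in selected:
                selected.add(child)
                queue.append(child)
    return selected


# Hypothetical DAG: start -> TG.A -> TG.B -> TG.C -> TG.D -> report
example_dag = {
    "start": ["TG.A"],
    "TG.A": ["TG.B"],
    "TG.B": ["TG.C"],
    "TG.C": ["TG.D"],
    "TG.D": ["report"],
    "report": [],
}

print(sorted(select_tasks_to_clear(example_dag, "TG")))
# prints ['TG.A', 'TG.B', 'TG.C', 'TG.D', 'report']
```

In this sketch the upstream task `start` is left alone, while `report` is included because it sits downstream of the group.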