I am creating a DAG in Airflow, and within this DAG I need to trigger another DAG using a TimeSensor. The goal is to set the target time to between 2:00 AM and 3:00 AM; if the TimeSensor is triggered after 2:00 AM, it should wait until 2:00 AM the next day. However, I am encountering the following error:
TypeError: '>' not supported between instances of 'datetime.time' and 'datetime.datetime'
My code:
from airflow.models import DAG
from datetime import datetime, time, timedelta
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.operators.python_operator import PythonOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.time_sensor import TimeSensor
from airflow.utils.state import State
from airflow.models import DagRun
import pytz
import pendulum


def calculate_next_target_time():
    # Next 2:00 AM in Istanbul; moves to tomorrow if 2:00 AM has already passed today.
    now = pendulum.now('Europe/Istanbul')
    target_time = now.replace(hour=2, minute=0, second=0, microsecond=0)
    if now >= target_time:
        target_time = target_time.add(days=1)
    return target_time


def wait_for_time():
    # Logs the computed target so it shows up in the task log.
    next_target_time = calculate_next_target_time()
    print(f"Next target time: {next_target_time}")
    return next_target_time


with DAG(
    '_dag',
    default_args={
        'depends_on_past': False,
        'email': [''],
        'email_on_failure': False,
        'email_on_retry': False,
    },
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    dummy_task = SSHOperator(
        task_id="dummy_task",
        command="dummy-task",
        ssh_conn_id="dummy",
    )
    wait_for_2am = TimeSensor(
        task_id='wait_for_2am',
        poke_interval=60,
        timeout=3000,
        mode='poke',
        target_time=calculate_next_target_time(),
        dag=dag,
    )
    wait_for_2am_task = PythonOperator(
        task_id="wait_for_2am_task",
        python_callable=wait_for_time,
        dag=dag,
    )
    service_trigger = TriggerDagRunOperator(
        task_id="service_trigger",
        trigger_dag_id="other_dag",
    )
    dummy_task >> wait_for_2am_task >> wait_for_2am >> service_trigger
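The type mismatch behind the error can be reproduced without Airflow. The sketch below assumes that TimeSensor compares the current time of day (a `datetime.time`) against `target_time`, which is what the message suggests; the helper `compare` and the sample values are hypothetical and only there for illustration. Since `calculate_next_target_time()` returns a full pendulum date and time, the comparison has mixed types.

```python
from datetime import datetime, time

# Stand-in for the time of day the sensor derives from "now" (assumed behavior).
current_time_of_day = time(1, 30)

# Stand-in for what calculate_next_target_time() returns: a full date and time.
target = datetime(2021, 1, 2, 2, 0)


def compare(current, target_value):
    """Return the comparison result, or the TypeError text if the types do not match."""
    try:
        return current > target_value
    except TypeError as exc:
        return str(exc)


# time vs. datetime: Python refuses the comparison and raises TypeError.
mismatch = compare(current_time_of_day, target)

# time vs. time: the comparison works (01:30 is not later than 02:00).
matched = compare(current_time_of_day, target.time())

print(mismatch)
print(matched)
```

Passing only the time part would remove the error, but a time-only target cannot express "tomorrow at 2:00 AM": once the clock is past 2:00 AM, a time-of-day check is already satisfied.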
How can I solve this problem? TimeSensor only accepts a time of day (such as an hour), not a full date and time.
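One possible direction, offered as a sketch rather than a confirmed fix: compute the next 2:00 AM as a full datetime and give it to a sensor that waits for a datetime instead of a time of day. Airflow 2 ships `DateTimeSensor` in `airflow.sensors.date_time`, whose `target_time` takes a datetime or a templated string; whether it suits this DAG and Airflow version is an assumption. The block below only covers the date arithmetic, uses the standard library, and does not import Airflow. The function name `next_occurrence` is hypothetical, and a fixed UTC+3 offset stands in for `Europe/Istanbul`.

```python
from datetime import datetime, time, timedelta, timezone

# Assumption: a fixed UTC+3 offset stands in for 'Europe/Istanbul' in this sketch.
UTC3 = timezone(timedelta(hours=3))


def next_occurrence(now: datetime, at: time = time(2, 0)) -> datetime:
    """Return the next moment strictly after `now` whose wall-clock time is `at`."""
    candidate = datetime.combine(now.date(), at, tzinfo=now.tzinfo)
    if now >= candidate:
        # Today's slot has already passed (or is exactly now): use tomorrow's.
        candidate = datetime.combine(
            now.date() + timedelta(days=1), at, tzinfo=now.tzinfo
        )
    return candidate


# Triggered at 01:30: wait until 02:00 the same day.
before = next_occurrence(datetime(2021, 1, 1, 1, 30, tzinfo=UTC3))
# Triggered at 02:30: wait until 02:00 the next day.
after = next_occurrence(datetime(2021, 1, 1, 2, 30, tzinfo=UTC3))

print(before.isoformat())  # 2021-01-01T02:00:00+03:00
print(after.isoformat())   # 2021-01-02T02:00:00+03:00
```

Two caveats about the original DAG apply here as well. Calling the function directly in the sensor's arguments evaluates it when the DAG file is parsed, not when the task runs, so the target would need to be computed at run time (for example through templating). The sensor's `timeout=3000` is also only 50 minutes, which is shorter than a wait until the next day's 2:00 AM.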