admin管理员组

文章数量:1123266

I am creating a DAG in Airflow, and within this DAG, I need to trigger another DAG using a TimeSensor. The goal is to set the target time to between 2:00 AM 3:00 AM, and if the Timesensor is triggered after 2:00 AM, it should wait until the next day's 2:00 AM. However, I am encountering the following error:

TypeErrors not supported between instances of 'datetime time' and 'datetime datetime"

My code;

  from airflow.models import DAG
  from datetime import timedelta, datetime
  from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor 
  from airflow.operators.python_operator import PythonOperator 
  from airflow.providers.ssh.operators.ssh import SSHOperator 
  from airflow.operators.trigger_dagrun import TriggerDagRunOperator 
  from airflow.sensors.time_sensor import TimeSensor 
  from datetime import datetime, time 
  from airflow.utils.state import State 
  from airflow.models import DagRun 
  import pytz 
  import pendulum

  def calculate_next_target_time():
     now = pendulum.now('Europe/Istanbul')
     target_time = now.replace(hour=2, minute=0, second=0, microsecond=0)
     if now >= target_time:
        target_time = target_time.add (days=1)
     return target_time
  def wait_for_time():
    next_target_time = calculate_next_target_time()
    print(f"Next target time: (next_target_time}")
    return next_target_time

  with DAG(
    '_dag',
    default_args={
    'depends_on_past' : False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,},
     schedule_interval= None,
    start_date=datetime (2021, 1, 1), catchup=False) as dag:
    dummy_task= SSHOperator (
       task_id= "dummy_task"
       command =  "dummy-task"
       ssh_conn_id = "dummy",)

    wait_for_2am = TimeSensor (
       task_id= 'wait_for_2am',                      poke_interval=60, 
       timeout=3000, 
       mode= 'poke',
       target_time=calculate_next_target_time(),
dag=dag)

    wait_for_2am_task = PythonOperator (
      task_id= "wait_for_2am_task"
      python_callable=wait_for_time,
dag=dag)

   service_trigger = TriggerDagRunOperator (
     task_id= "service_trigger",
     trigger_dag_id= "other_dag",)

dummy_task » wait_for_2am_task » wait_for_2am » service_trigger

how can i solve this problem. Timesensor is only available for time (like hour).

本文标签: pythonHow to run a task for specific timeStack Overflow