I want my DAG to behave differently depending on the run type of dag_run: if the DAG is triggered manually, a task should receive one value; if it runs on its schedule, the task should receive a different value. This is the code I am trying:

def get_the_date(**context):
    run_type = context['dag_run'].run_type
    if run_type == "manual":
        return " {{ ds }}"
    else:
        return "2025-02-24"

with DAG(
    dag_id="mydag",
    start_date=datetime(2024, 9, 11),
    schedule_interval=SCHEDULE
)

my_task = BashOperator(
    task_id='my_task',
    bash_command='echo ""',  # use the value of the function get_the_date here
)

However, when I run this, I get the following error:

  File "/opt/airflow/dags//dag_mydag.py", line 62, in get_the_date
    run_type = context['dag_run'].run_type
KeyError: 'dag_run'

I am not sure how to proceed.

asked Mar 31 at 11:23 by Aviator

1 Answer

Your code is incomplete because it does not use the construct:

with DAG() as dag:

You left out the as dag part, and the with statement in your snippet also has no trailing colon. That said, I don't know how you got KeyError: 'dag_run'.

Otherwise the code looks fine in general: context['dag_run'].run_type is the correct way to read the run type.
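One way to end up with this KeyError (an assumption, since the question does not show where get_the_date is called) is to call the function yourself, for example while the DAG file is being parsed, instead of letting Airflow call it with the runtime context. The sketch below does not import Airflow: fake_context is a hypothetical stand-in for the runtime context, with a SimpleNamespace playing the role of dag_run. Unlike the question's version, it returns the ds value from the context rather than a template string.

```python
from types import SimpleNamespace


def get_the_date(**context):
    # Same lookup as in the question: it needs a 'dag_run' key in the context.
    run_type = context["dag_run"].run_type
    return context["ds"] if run_type == "manual" else "2025-02-24"


# Hypothetical stand-in for the context Airflow builds when a task runs.
fake_context = {"dag_run": SimpleNamespace(run_type="manual"), "ds": "2025-03-31"}

# With a context, the manual branch returns the run's ds value.
with_context = get_the_date(**fake_context)

# A scheduled run falls through to the fixed date.
scheduled = get_the_date(dag_run=SimpleNamespace(run_type="scheduled"), ds="2025-03-31")

try:
    # Calling the function with no context reproduces the error.
    get_the_date()
    error_name = None
except KeyError as exc:
    error_name = f"KeyError: {exc}"

print(with_context, scheduled, error_name)
```

In a real DAG the function would be handed to a PythonOperator through python_callable, so that Airflow supplies dag_run and ds when the task runs.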

I'm not sure whether you can plug a Python function directly into a BashOperator, but you can do either of the following:

  • use Jinja templates, which Airflow renders before the BashOperator executes the bash command
  • pull an XCom from an upstream task.

Below are three tasks: determine_date_task works together with echo_date_xcom_task using XComs, and echo_date_jinja_task stands alone using Jinja.

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

def determine_date(**kwargs):
    dag_run = kwargs.get('dag_run')
    if dag_run.run_type == "manual":
        return kwargs['ds']  # Return the execution date (ds) if manual
    else:
        return "2025-02-24"  # Return a fixed date otherwise

with DAG(
    dag_id="combined_xcom_jinja_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False
) as dag:
    
    determine_date_task = PythonOperator(
        task_id="determine_date",
        python_callable=determine_date  # Airflow 2+ passes the context automatically; provide_context is deprecated
    )
    
    echo_date_xcom_task = BashOperator(
        task_id="echo_date_xcom",
        bash_command="echo 'Date from XCom: {{ ti.xcom_pull(task_ids='determine_date') }}'"
    )
    
    echo_date_jinja_task = BashOperator(
        task_id="echo_date_jinja",
        bash_command="""
        {% if dag_run.run_type == "manual" %}
        echo "Execution Date (Jinja): {{ ds }}"
        {% else %}
        echo "Fixed Date (Jinja): 2025-02-24"
        {% endif %}
        """
    )
    
    determine_date_task >> echo_date_xcom_task >> echo_date_jinja_task
