admin管理员组文章数量:1355589
I want to run my dag based on the value of dag_run which means if the dag is triggered manually then it should pass a particular value in the dag task otherwise if it is scheduled it should pass some other value. I am trying the below code :-
def get_the_date(**context):
run_type = context['dag_run'].run_type
if run_type=="manual":
return " {{ ds }}"
else:
return "2025-02-24"
with DAG(
dag_id="mydag"
start_date=datetime(2024, 9, 11),
schedule_interval=SCHEDULE
)
my_task = BashOperator(
task_id = 'my_task',
bash_command = ' echo " # Use the value of the funciton get_the_date here
)
Although when I try this, I am getting the below error :-
File "/opt/airflow/dags//dag_mydag.py", line 62, in get_the_date
run_type = context['dag_run'].run_type
Not sure how to proceed. KeyError: 'dag_run'
I want to run my dag based on the value of dag_run which means if the dag is triggered manually then it should pass a particular value in the dag task otherwise if it is scheduled it should pass some other value. I am trying the below code :-
def get_the_date(**context):
run_type = context['dag_run'].run_type
if run_type=="manual":
return " {{ ds }}"
else:
return "2025-02-24"
with DAG(
dag_id="mydag"
start_date=datetime(2024, 9, 11),
schedule_interval=SCHEDULE
)
my_task = BashOperator(
task_id = 'my_task',
bash_command = ' echo " # Use the value of the funciton get_the_date here
)
Although when I try this, I am getting the below error :-
File "/opt/airflow/dags//dag_mydag.py", line 62, in get_the_date
run_type = context['dag_run'].run_type
Not sure how to proceed. KeyError: 'dag_run'
Share Improve this question asked Mar 31 at 11:23 AviatorAviator 7401 gold badge9 silver badges19 bronze badges1 Answer
Reset to default 0Your code isn't complete since you're not using the construct...
with DAG() as dag:
you missed the
as dag
part. But I don't know how you got KeyError: 'dag_run'
Anyway...
the code looks fine in general you are correctly accessing context['dag_run'].run_type
.
I'm not sure if you can directly plug a python function into a BashOperator but you can do either of the following:
- use jinja templates that are resolved by airflow to execute bash directly in the BashOperator
- pull an xcom from an upstream task.
Below are three tasks determine_date_task
works with echo_date_xcom
using xcoms. and the echo_date_jinja_task
stands alone using jinja.
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
def determine_date(**kwargs):
dag_run = kwargs.get('dag_run')
if dag_run.run_type == "manual":
return kwargs['ds'] # Return the execution date (ds) if manual
else:
return "2025-02-24" # Return a fixed date otherwise
with DAG(
dag_id="combined_xcom_jinja_dag",
start_date=datetime(2025, 4, 1),
schedule_interval="@daily",
catchup=False
) as dag:
determine_date_task = PythonOperator(
task_id="determine_date",
python_callable=determine_date,
provide_context=True
)
`= BashOperator(
task_id="echo_date_xcom",
bash_command="echo 'Date from XCom: {{ ti.xcom_pull(task_ids='determine_date') }}'"
)
echo_date_jinja_task = BashOperator(
task_id="echo_date_jinja",
bash_command="""
{% if dag_run.run_type == "manual" %}
echo "Execution Date (Jinja): {{ ds }}"
{% else %}
echo "Fixed Date (Jinja): 2025-02-24"
{% endif %}
"""
)
determine_date_task >> echo_date_xcom_task >> echo_date_jinja_task
本文标签: airflowprocess the dag based on the value of dagrunStack Overflow
版权声明:本文标题:airflow - process the dag based on the value of dag_run - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1743951434a2567362.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论