How to retrieve the output of a SQL query executed with SQLExecuteQueryOperator in Airflow
I'm writing a DAG on Airflow with the following structure:
- Execute a simple SELECT COUNT(*) on a Starburst database with SQLExecuteQueryOperator;
- Retrieve the result (which should be 0 or 1) and use it to check whether a certain record is in the table or not.
This is the DAG:
import pendulum, os, logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import TaskInstance
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime

def catch_output(ti, **kwargs):
    query_result = ti.xcom_pull(task_ids="query_task")
    logging.info(f"{query_result}")

with DAG(
    'dag_name',
    schedule=None,
    start_date=pendulum.datetime(2025, 2, 25, tz='UTC'),
    max_active_runs=1,
    catchup=False,
    default_args={...}
) as dag:

    query_task = SQLExecuteQueryOperator(
        task_id='query_task',
        conn_id='xyz',
        show_return_value_in_logs=True,
        split_statements=True,
        sql='SELECT COUNT(*) FROM TABLE',
        return_last=True,
        do_xcom_push=True
    )

    catch_output = PythonOperator(
        task_id='catch_output',
        python_callable=catch_output,
        dag=dag
    )

    query_task >> catch_output
The first task works correctly and its output is logged in Airflow. But the second task, despite finishing successfully, doesn't log anything: the variable query_result is None. I think the reason is that the task running the query on Starburst doesn't push anything to XCom, but I don't know how to solve this.
Important information: I cannot connect to the database except with SQLExecuteQueryOperator (at least at the moment).
Comment: Have you tried using the SqlTableCheckOperator? -> registry.astronomer.io/providers/… – Simon P
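If the goal is only to verify that the record exists, the check suggested in the comment can be done entirely server-side. Below is a minimal sketch, not a tested solution: it assumes the common SQL provider's SQLTableCheckOperator and reuses the placeholder connection and table names from the question.

    from airflow.providers.common.sql.operators.sql import SQLTableCheckOperator

    # Fails the task when the aggregate check does not hold for the table.
    # 'xyz' and 'TABLE' are the placeholders used in the question.
    record_check = SQLTableCheckOperator(
        task_id="record_check",
        conn_id="xyz",
        table="TABLE",
        checks={
            # Each entry maps a check name to a SQL aggregate expression.
            "record_exists": {"check_statement": "COUNT(*) = 1"},
        },
    )

This replaces the pull-and-compare step with a single task, but it only tells you pass/fail; if you need the count itself downstream, the XCom approach from the question is still required.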
1 Answer
I think there are a couple of problems. First, catch_output is used both as the function name and as the variable referencing the task. One of them probably needs to change so that the two are unique.

The other thing is that you're not explicitly passing op_kwargs or op_args to the callable in the second task. I remember there being a parameter, provide_context=True, that you can set to make sure ti is passed into the function. Although, if this were the real issue, Python should have raised an exception about a missing positional argument, so I'm inclined to say the problem is the duplicate catch_output name.
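For completeness, here is a minimal sketch of the second task with the name collision removed. It assumes the default behaviour of SQLExecuteQueryOperator with return_last=True, which typically pushes the fetched rows of the last statement (e.g. [(0,)] or [(1,)] for this COUNT(*)) as the return_value XCom, and it relies on Airflow 2.x injecting ti into the callable automatically, so provide_context is not needed.

    import logging
    from airflow.operators.python import PythonOperator

    def catch_output(ti, **kwargs):
        # Pull whatever query_task pushed; with return_last=True this is usually a
        # list of rows, e.g. [(0,)] (assumption based on the provider's default
        # fetch-all handler).
        rows = ti.xcom_pull(task_ids="query_task")
        count = rows[0][0] if rows else None
        logging.info("COUNT(*) returned %s", count)
        return count

    # Use a different variable name for the task so the operator instance does not
    # shadow the function it wraps.
    catch_output_task = PythonOperator(
        task_id="catch_output",
        python_callable=catch_output,
    )

    # query_task >> catch_output_task

If rows is still None after this change, check query_task's XCom tab in the Airflow UI to confirm whether a return_value entry was actually pushed.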