
I'm writing a DAG on Airflow with the following structure:

  1. Execute a simple SELECT COUNT(*) on a Starburst database with SQLExecuteQueryOperator;
  2. Retrieve the result (which should be 0 or 1) and use it to check whether a certain record is in the table or not.

This is the DAG:

import pendulum, os, logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import TaskInstance
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime

def catch_output(ti, **kwargs):
    query_result=ti.xcom_pull(task_ids="query_task")
    logging.info(f"{query_result}")

with DAG(
   'dag_name',
   schedule=None,
   start_date=pendulum.datetime(2025,2,25, tz='UTC'),
   max_active_runs=1,
   catchup=False,
   default_args={...}
) as dag:

    query_task = SQLExecuteQueryOperator(
        task_id='query_task',
        conn_id='xyz',
        show_return_value_in_logs=True,
        split_statements=True,
        sql='SELECT COUNT(*) FROM TABLE',
        return_last=True,
        do_xcom_push=True
    )

    catch_output = PythonOperator(
        task_id='catch_output',
        python_callable=catch_output,
        dag=dag
    )

    query_task >> catch_output

The first task works correctly and its output is logged in Airflow. The second task, however, finishes successfully but doesn't log anything: the variable query_result is None. I suspect that nothing is pushed to XCom by the task that runs the query on Starburst, but I don't know how to solve this problem.

Important information: I cannot connect to the database except with SQLExecuteQueryOperator (at least at the moment).

  • Have you tried using the SqlTableCheckOperator? -> registry.astronomer.io/providers/… – Simon P (commented Mar 11)
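
For reference, a minimal sketch of what that suggestion might look like, assuming the conn_id and table from the question (the task_id and the check name record_exists are illustrative):

from airflow.providers.common.sql.operators.sql import SQLTableCheckOperator

# Fails the task if the boolean check_statement does not hold for the table.
record_check = SQLTableCheckOperator(
    task_id='record_check',
    conn_id='xyz',
    table='TABLE',
    checks={
        'record_exists': {'check_statement': 'COUNT(*) = 1'},
    },
)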

1 Answer

I think there are a couple of problems. First, catch_output is used both as a function name and as a reference to a task. One of them probably needs to change so that the names are unique.
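
A minimal sketch of that rename (the function name log_query_result is just an illustrative choice; the task itself is otherwise unchanged):

# Rename the callable so it no longer collides with the task variable.
def log_query_result(ti, **kwargs):
    query_result = ti.xcom_pull(task_ids="query_task")
    logging.info(f"{query_result}")

catch_output = PythonOperator(
    task_id='catch_output',
    python_callable=log_query_result,
    dag=dag
)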

The other thing is that you're not explicitly passing op_kwargs or op_args to the callable in the second task. I remember there being a parameter, provide_context=True, that you could set to make sure ti is passed into the function. Although, if that were the real issue, Python should have raised an exception about a missing positional argument, so I'm inclined to say the problem is the duplicate catch_output name.
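
For what it's worth, in Airflow 2.x the context (including ti) is passed into the callable automatically whenever the function accepts it, so provide_context is no longer needed. If you'd rather pass the value explicitly instead of pulling it inside the callable, here is a minimal sketch using a templated op_kwargs; note that the rendered value arrives as a string unless render_template_as_native_obj=True is set on the DAG, and the parameter name query_result is illustrative:

def log_query_result(query_result, **kwargs):
    # query_result is rendered from the Jinja template below (a string by default)
    logging.info(f"{query_result}")

catch_output = PythonOperator(
    task_id='catch_output',
    python_callable=log_query_result,
    op_kwargs={"query_result": "{{ ti.xcom_pull(task_ids='query_task') }}"},
    dag=dag
)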
