My goal is to dynamically create SFTPToS3Operator tasks to retrieve files from a server to Amazon S3.

For instance, my filenames look like xxx_date_hours_minutes_seconds.csv, so I want to fetch the filename from the server and use it in the SFTPToS3Operator.

The main problem is that {{ ds }} is not interpreted.

with DAG(...) as dag:
    init_task = DummyOperator(task_id='dag_start')
    alertPerfOperatorsTask = list_files_to_transfer()
    init_task >> alertPerfOperatorsTask

The goal of list_files_to_transfer is to create a list of tasks, one task for each file:

def list_files_to_transfer():
    logger = logging.getLogger('airflow.task')
    yesterday_date = '{{ macros.ds_add(ds, -1) }}'
    logger.info("yesterday_date: " + yesterday_date)
    sftp_hook = SFTPHook(ssh_conn_id='SFTP__gas__PointBreakExporter')
    with sftp_hook.get_conn() as sftp_client:
        remote_path = 'out/IQUAL_DEV/'
        ...
    return alertPerfOperatorsTask  # list of SFTPToS3Operator

If I look at my log, I get:

INFO - yesterday_date: `{{ macros.ds_add(ds, -1) }}` instead of yesterday_date: 2025-03-19

and of course I am using this date to look for files, but obviously it doesn't find any file named xxx_{{ macros.ds_add(ds, -1) }}.csv.

I don't understand why {{ macros.ds_add(ds, -1) }} isn't interpreted


  • I believe you need to create a parameter on the @task and pass in the jinja – Simon P Commented Mar 20 at 12:20
  • I already tried that but it doesn't work. I defined yesterday_date = '{{ macros.ds_add(ds, -1) }}' under the as dag: block, then passed it to list_files_to_transfer(yesterday_date), but it is still not recognized – maxime G Commented Mar 20 at 12:50
  • airflow.apache.org/docs/apache-airflow/stable/howto/operator/… – Simon P Commented Mar 20 at 13:03
  • How is that supposed to help me? – maxime G Commented Mar 20 at 14:53

1 Answer

Jinja expressions like {{ ds }} are only rendered when a task actually runs and Airflow templates the operator's template_fields (for a @task-decorated function that means its op_args/op_kwargs); a string built in plain Python while the DAG file is parsed never goes through Jinja, which is why your log shows the literal template. I would use a task decorator to get the list of files, then pass that list into another task that prepares the arguments for the SFTPToS3Operator, using dynamic task mapping: https://www.astronomer.io/docs/learn/dynamic-tasks/

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.transfers.sftp_to_s3 import SFTPToS3Operator
from airflow.providers.sftp.hooks.sftp import SFTPHook


@dag()
def taskflow():
    sftp_conn_id = "<sftp_conn_id>"
    remote_directory = "<remote_directory>"

    @task
    def list_files_in_directory(sftp_conn_id: str, remote_directory: str, ds=None):
        print(f"Execution date is: {ds}")
        sftp_hook = SFTPHook(ssh_conn_id=sftp_conn_id)
        with sftp_hook.get_conn() as sftp_client:
            return sftp_client.listdir(remote_directory)

    @task
    def prepare_download_tasks(file_names: list[str]) -> list[dict]:
        return [{"sftp_path": file_name, "s3_key": f"s3/prefix/{file_name}"} for file_name in file_names]

    # "{{ ds }}" is rendered at run time because op_kwargs of a @task is a templated field
    files_to_download = list_files_in_directory(
        ds="{{ ds }}",
        sftp_conn_id=sftp_conn_id,
        remote_directory=remote_directory,
    )

    download_tasks = prepare_download_tasks(files_to_download)

    # one mapped SFTPToS3Operator instance per dict produced by prepare_download_tasks
    SFTPToS3Operator.partial(
        task_id="sftp_to_s3",
        sftp_conn_id=sftp_conn_id,
        s3_conn_id="s3_conn",
        s3_bucket="s3_bucket",
    ).expand_kwargs(download_tasks)


taskflow()
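
If you only want the previous day's files (the xxx_date_hours_minutes_seconds.csv pattern from the question), the filtering can live inside the listing task, because ds arrives there as a plain string. A minimal sketch under that assumption; the task name, prefix, and date arithmetic are illustrative, not part of the original code:

from datetime import datetime, timedelta

from airflow.decorators import task
from airflow.providers.sftp.hooks.sftp import SFTPHook


@task
def list_yesterdays_files(sftp_conn_id: str, remote_directory: str, ds=None):
    # ds is the rendered execution date string, e.g. "2025-03-20"
    yesterday = (datetime.strptime(ds, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d")
    prefix = f"xxx_{yesterday}"  # hypothetical prefix matching xxx_<date>_... filenames
    sftp_hook = SFTPHook(ssh_conn_id=sftp_conn_id)
    with sftp_hook.get_conn() as sftp_client:
        return [name for name in sftp_client.listdir(remote_directory) if name.startswith(prefix)]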
