I am using Apache Airflow to convert old partitions to parquet and detach them using the PostgresOperator.
This works fine, but at the moment I am hardcoding the day, as my queries run relative to now(). I am wondering how I could use backfilling to make this work, so the queries would use the date I pass as a parameter in Airflow.
This is my relevant code right now:
# Task to convert partition to Parquet
convert_to_parquet = PostgresOperator(
    task_id='convert_to_parquet',
    postgres_conn_id='questdb',
    sql="""
        alter table ecommerce_stats convert partition to parquet where ts = dateadd('d', -7, systimestamp())
    """,
    dag=dag,
)

# Task to detach the partition
detach_partition = PostgresOperator(
    task_id='detach_partition',
    postgres_conn_id='questdb',
    sql="ALTER TABLE ecommerce_stats DETACH PARTITION WHERE ts = dateadd('d', -7, systimestamp())",
    dag=dag,
)
1 Answer
In Airflow we can use template variables to access the execution/logical date and, if needed, the start and end of the interval.
The code above can be replaced by
# Task to convert partition to Parquet
convert_to_parquet = PostgresOperator(
    task_id='convert_to_parquet',
    postgres_conn_id='questdb',
    sql="""
        alter table ecommerce_stats convert partition to parquet where ts = '{{ (execution_date - macros.timedelta(days=7)) | ds }}'
    """,
    dag=dag,
)

# Task to detach the partition
detach_partition = PostgresOperator(
    task_id='detach_partition',
    postgres_conn_id='questdb',
    sql="ALTER TABLE ecommerce_stats DETACH PARTITION WHERE ts = '{{ (execution_date - macros.timedelta(days=7)) | ds }}'",
    dag=dag,
)
I am combining a variable, a macro, and the ds formatter, so the end result has the "YYYY-MM-DD" format QuestDB expects. If you need time resolution, check the template variables reference, as that can be done as well.
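For example, a minimal sketch of what a timestamp-resolution version could look like; the strftime format string here is an assumption, so adjust it to whatever your ts column actually needs:

# Sketch only: same task, but keeping time resolution by formatting the
# template datetime explicitly instead of using the ds filter.
convert_to_parquet = PostgresOperator(
    task_id='convert_to_parquet',
    postgres_conn_id='questdb',
    sql="""
        alter table ecommerce_stats convert partition to parquet
        where ts = '{{ (execution_date - macros.timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%S") }}'
    """,
    dag=dag,
)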
We need to be careful, as variables are not replaced everywhere in the operator, only in its templated fields. I spent a few minutes debugging this a while ago before I realised.
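If in doubt, you can check which fields of an operator get Jinja-rendered; a quick sketch (the attribute below is standard on Airflow operators):

# List the templated fields for this operator; anything not in this tuple
# is passed through literally, without Jinja rendering.
from airflow.providers.postgres.operators.postgres import PostgresOperator

print(PostgresOperator.template_fields)  # typically includes 'sql'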
Also, in Airflow a DAG run is only triggered after its data interval has passed, and execution_date points to the start of that interval, so depending on what we need we might have to go back 8 days rather than 7 when doing the date arithmetic. Relevant info in this Stack Overflow post.
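Backfilling then works as you asked: each backfilled run gets its own logical date, so something like airflow dags backfill -s 2025-01-01 -e 2025-01-10 <dag_id> (dates are placeholders) makes every run target its own partition. And if you would rather anchor on when the run fires instead of on the interval start, here is a sketch using data_interval_end (available as a template variable since Airflow 2.2) in place of execution_date:

# Sketch: use the end of the data interval (roughly "when the run fires")
# so the -7 day offset matches the original systimestamp()-based query.
detach_partition = PostgresOperator(
    task_id='detach_partition',
    postgres_conn_id='questdb',
    sql="""
        ALTER TABLE ecommerce_stats DETACH PARTITION
        WHERE ts = '{{ (data_interval_end - macros.timedelta(days=7)) | ds }}'
    """,
    dag=dag,
)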