How can I make my incremental dbt models handle the deleted records of the source?
I have attached the dbt code that creates a temp table, emp__dbt_tmp, with the new records. Let's say on 20th March a record arrives in my raw table and makes its way to my transient table. On 22nd March, the same record gets deleted in the source raw table with cdc_operation = 'D' and is_deleted = true, but it does not get deleted in my transient target table. How can we make sure the record gets deleted in the transient table as well? Should we use a post-hook or some other incremental strategy?

DBT code:
  {{ config(
      materialized='incremental',
      unique_key=['mandt', 'loc'],
      on_schema_change='append_new_columns',
      partition_by={
          "field": "gcp_updated_ts",
          "data_type": "timestamp",
          "granularity": "day"
      }
  ) }}

  WITH source_data AS (
      SELECT
          COALESCE(SAFE_CAST(mandt AS STRING), '') AS mandt,
          COALESCE(SAFE_CAST(loc AS STRING), '') AS loc,
          SAFE_CAST(recordstamp AS TIMESTAMP) AS gcp_updated_ts,
          SAFE_CAST(operation_flag AS STRING) AS cdc_operation,
          SAFE_CAST(is_deleted AS BOOLEAN) AS is_deleted,
          ROW_NUMBER() OVER (PARTITION BY mandt, loc ORDER BY recordstamp DESC) AS row_num
      FROM {{ source('raw', 'emp') }}
      {% if is_incremental() %}
      WHERE
          recordstamp > TIMESTAMP('2025-03-22 08:28:48.783725+00:00')
          AND is_deleted = false
      {% endif %}
  )

  SELECT * EXCEPT (row_num) FROM source_data WHERE row_num = 1

 Backend dbt merge statement:
merge into `vk`.`transient`.`emp` as DBT_INTERNAL_DEST
        using (
        select
        * from `vk`.`transient`.`emp__dbt_tmp`
        ) as DBT_INTERNAL_SOURCE
        on (
                    DBT_INTERNAL_SOURCE.mandt = DBT_INTERNAL_DEST.mandt
                ) and (
                    DBT_INTERNAL_SOURCE.loc = DBT_INTERNAL_DEST.loc
                ) 

    
    when matched then update set
        `mandt` = DBT_INTERNAL_SOURCE.`mandt`,`loc` = DBT_INTERNAL_SOURCE.`loc`,`posn` = DBT_INTERNAL_SOURCE.`posn`,`gcp_updated_ts` = DBT_INTERNAL_SOURCE.`gcp_updated_ts`,
        `is_deleted`=DBT_INTERNAL_SOURCE.`is_deleted` , cdc_operation=DBT_INTERNAL_SOURCE.`cdc_operation`

    when not matched then insert
        (`mandt`, `loc`, `posn`,`gcp_updated_ts`,`is_deleted`,`cdc_operation`)    values
        (`mandt`, `loc`, `posn`,`gcp_updated_ts`,`is_deleted`,`cdc_operation`)


1 Answer


dbt's merge incremental strategy does not generate a DELETE clause, so it never removes rows from the target table. You can try one of these alternatives:

1. Let dbt write the deleted rows into the target table and use a post_hook to delete them afterwards:

{{ config(
    ...
    post_hook = ["DELETE FROM {{ this }} WHERE is_deleted"]
) }}
-- ...
SELECT ..., is_deleted
FROM ...
{% if is_incremental() %}
WHERE
    recordstamp > TIMESTAMP('2025-03-22 08:28:48.783725+00:00')
    -- AND is_deleted = false -- let the rows be updated in the target table with is_deleted = true
{% endif %}
-- ...
The drawback of this approach is that deleted rows are briefly visible in the target table, between the merge and the post-hook delete.

2. Customize the get_merge_sql macro and define a new incremental strategy that supports deleting rows in the target table:

Add a file: macros/custom_get_merge_sql.sql

{# Original macro: https://github.com/dbt-labs/dbt-bigquery/blob/0995665e490cdee9c408d26aac8e1c19fefaebe0/dbt/include/bigquery/macros/materializations/incremental.sql#L1 #}
{% macro dbt_bigquery_validate_get_incremental_strategy(config) %}
 
  {%- set strategy = config.get("incremental_strategy") or 'merge' -%}

  {% set invalid_strategy_msg -%}
    Invalid incremental strategy provided: {{ strategy }}
    Expected one of: 'merge', 'insert_overwrite','merge_with_delete'
  {%- endset %}
  {% if strategy not in ['merge', 'insert_overwrite','merge_with_delete'] %}
    {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
  {% endif %}

  {% do return(strategy) %}
{% endmacro %}

{# Original macro: https://github.com/dbt-labs/dbt-adapters/blob/af33935b119347cc021554ea854884bce986ef8d/dbt-adapters/src/dbt/include/global_project/macros/materializations/models/incremental/merge.sql#L7 #}
{% macro default__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) -%}
    {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
    {%- set merge_update_columns = config.get('merge_update_columns') -%}
    {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
    {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
    {%- set sql_header = config.get('sql_header', none) -%}
    
    {% if unique_key %}
        {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
            {% for key in unique_key %}
                {% set this_key_match %}
                    DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
                {% endset %}
                {% do predicates.append(this_key_match) %}
            {% endfor %}
        {% else %}
            {% set unique_key_match %}
                DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
            {% endset %}
            {% do predicates.append(unique_key_match) %}
        {% endif %}
    {% else %}
        {% do predicates.append('FALSE') %}
    {% endif %}

    {{ sql_header if sql_header is not none }}

    merge into {{ target }} as DBT_INTERNAL_DEST
        using {{ source }} as DBT_INTERNAL_SOURCE
        on {{"(" ~ predicates | join(") and (") ~ ")"}}
    
    {# Add the support of delete rows in the target table #}
    {% if config.get('incremental_strategy') == 'merge_with_delete' %}
    when matched and DBT_INTERNAL_SOURCE.is_deleted = true then
        delete
    {% endif %}
    
    {% if unique_key %}
    when matched then update set
        {% for column_name in update_columns -%}
            {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
            {%- if not loop.last %}, {%- endif %}
        {%- endfor %}
    {% endif %}
    
    {% if config.get('incremental_strategy') == 'merge_with_delete' %}
    when not matched and DBT_INTERNAL_SOURCE.is_deleted = false then insert
    {% else %}
    when not matched then insert
    {% endif %}
        ({{ dest_cols_csv }})
    values
        ({{ dest_cols_csv }})

{% endmacro %}

Now you can use the new incremental_strategy='merge_with_delete'. It requires the model to have a column named is_deleted, but you can drop that column afterwards with a post_hook:

{{ config(
    ...
    materialized = 'incremental',
    incremental_strategy = 'merge_with_delete',
    post_hook = ["ALTER TABLE {{ this }} DROP COLUMN IF EXISTS is_deleted"]
) }}
-- ...
SELECT ..., is_deleted
FROM ...
{% if is_incremental() %}
WHERE
    recordstamp > TIMESTAMP('2025-03-22 08:28:48.783725+00:00')
    -- AND is_deleted = false -- let the rows be updated in the target table with is_deleted = true
{% endif %}
-- ...
The drawback of this approach is that you have to keep the custom macro compatible with the original one whenever you upgrade dbt.
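One way to reduce that risk (my suggestion, not part of the macro itself) is to pin the dbt packages your project was tested with, for example in requirements.txt:

# Pin dbt so the built-in macros this override shadows don't change underneath you.
# Version numbers are illustrative; use whatever your project is tested against.
dbt-core==1.9.*
dbt-bigquery==1.9.*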
