I have a data set on a pretty small Microsoft Fabric capacity (which, in case you don't know, is basically Azure Synapse, which is basically Apache Spark).
Due to limitations with the data source, I basically get a full dump every day. So I need to update a "last seen" timestamp on the rows I already know (i.e. identical data) and append the changed ones.
This is not hard to do, but I am looking for the most efficient option.
Loading the new data into df_update and the existing data into df_existing, I have tried two ways of doing this:
-- 1 -- Using pyspark data frames:
I can solve the task with an outer join like
from pyspark.sql.functions import coalesce

# Outer join on all data columns; matched rows carry both timestamps.
df_new = df_existing \
    .withColumnRenamed('ts', 'ts_old') \
    .join(df_update, on=all_columns_but_the_timestamp, how='outer')
# Keep the new timestamp where one exists, otherwise fall back to the old one.
return df_new \
    .withColumn('ts', coalesce(df_new['ts'], df_new['ts_old'])) \
    .drop('ts_old')
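Writing the result back then means a full overwrite of the Delta table; a minimal sketch (the table name is just a placeholder):
# Rewrites every file of the table even though only timestamps changed.
df_new.write \
    .format('delta') \
    .mode('overwrite') \
    .saveAsTable('my_table')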
Unfortunately, this requires me to replace the whole table on disk. That's slow and seems to upset OneLake a bit (seeing the updated data in a query takes additional time). Therefore I tried:
-- 2 -- Using Delta Lake update
By using
df_new = df_update.exceptAll(df_existing.select(all_columns_but_the_timestamp))
df_duplicates = df_update.exceptAll(df_new)
I can get the new and the revisited data, and then update the revisited rows one by one:
from pyspark.sql.functions import lit
from pyspark.sql.types import TimestampType

for row in df_duplicates.collect():
    # Build a predicate matching this exact row, then run one update per row.
    table.update(
        ' AND '.join([f'{k} = "{v}"' for k, v in row.asDict().items()]),
        {'ts': lit(new_timestamp).cast(TimestampType())})
This, however, is a woefully slow way to do the updates. df_new can simply be appended to the table afterwards.
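The append itself is cheap; a minimal sketch (again with a placeholder table name):
# Append only the genuinely new rows; existing files stay untouched.
df_new.write \
    .format('delta') \
    .mode('append') \
    .saveAsTable('my_table')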
What I have been looking for is
-- 3 -- Delta Lake update in bulk
i.e. somehow selecting all affected rows in one go and updating the value:
table.update(
    some_very_neat_condition,
    {'ts': lit(new_timestamp).cast(TimestampType())})
Since I don't have reliable IDs, however, I don't know how to do that.
Or is there another option I'm missing?
asked Mar 28 at 15:38 – Jörg Neulist

2 Answers
If I understand correctly, you are trying to merge, i.e. insert or update.
Use MERGE INTO whenever possible; even traditional databases have an equivalent SQL MERGE statement.
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "your table here...")

# Match on every column except the timestamp: refresh ts on matches,
# insert brand-new rows with a fresh ts.
delta_table.alias("existing").merge(
    df_update.alias("updates"),
    " AND ".join([f"existing.{col} = updates.{col}" for col in all_columns_but_the_timestamp])
).whenMatchedUpdate(set={
    "ts": "current_timestamp()"
}).whenNotMatchedInsert(values={
    **{col: f"updates.{col}" for col in all_columns_but_the_timestamp},
    "ts": "current_timestamp()"
}).execute()
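For reference, roughly the same merge can be expressed through Spark SQL. This is only a sketch: the table name my_table and the temp view staged_updates are placeholders, and it assumes all_columns_but_the_timestamp is a plain Python list of column names:
# Expose the incoming frame to SQL via a temp view (name is a placeholder).
df_update.createOrReplaceTempView("staged_updates")

on_clause = " AND ".join(
    f"existing.{c} = updates.{c}" for c in all_columns_but_the_timestamp
)
insert_cols = ", ".join(all_columns_but_the_timestamp + ["ts"])
insert_vals = ", ".join(
    [f"updates.{c}" for c in all_columns_but_the_timestamp] + ["current_timestamp()"]
)

spark.sql(f"""
    MERGE INTO my_table AS existing
    USING staged_updates AS updates
    ON {on_clause}
    WHEN MATCHED THEN UPDATE SET ts = current_timestamp()
    WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})
""")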
- This is the bread-and-butter use case for DeltaTable.merge(), as Ram mentioned, so something like what Ram suggested in his answer. More docs here.
- IMO you should add a key column to your table and use that in the condition param of your merge() call. Historically, every time someone says there is no unique key, they either don't know the data or they haven't tried hard enough. In any case, assuming your row is uniquely identified by a composite key of all_columns_but_the_timestamp, you could (a more defensive sketch of the key generation follows at the end of this answer):
from pyspark.sql import functions as F

df_update = df_update.withColumn(
    'all_columns_str',
    # in practice this would be more complex, as you'll have to convert
    # all columns to str, handle NULLs, ...
    F.concat(*all_columns_but_the_timestamp)
).withColumn(
    'generated_key',
    F.conv(F.sha2('all_columns_str', 256), 16, 10)
).drop('all_columns_str')
and then:
delta_table.alias("existing").merge(
    source=df_update.alias("updates"),
    condition='existing.generated_key = updates.generated_key'
).whenMatchedUpdate(set={
    "ts": "current_timestamp()"
}).whenNotMatchedInsert(values={
    **{col: f"updates.{col}" for col in all_columns_but_the_timestamp},
    "generated_key": "updates.generated_key",  # carry the key on inserted rows too
    "ts": "current_timestamp()"
}).execute()
- Not sure how you're partitioning your table. If you add a hash as a key, then partitioning would be easy. Let's say you decide that 128 is your sweet spot for the number of partitions; then:
df_update = df_update.withColumn('partition_id', F.col('generated_key') % 128)
and use partition_id as the partitioning column while creating your delta table (a sketch of that write follows below).
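A minimal sketch of that table creation, where df_full stands in for whatever frame you first materialise the table from (it must already carry generated_key and partition_id) and my_table is a placeholder name:
# Create the Delta table partitioned by the derived partition_id.
(df_full
    .write
    .format('delta')
    .partitionBy('partition_id')
    .saveAsTable('my_table'))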
Also, none of the options listed in the OP is a good fit for this use case, so do not use them.
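As for the "convert all columns to str, handle NULLs" caveat in the key-generation snippet above, here is a minimal defensive sketch; the helper name with_generated_key is made up for illustration:
from pyspark.sql import functions as F

def with_generated_key(df, key_columns, sentinel='<NULL>'):
    # Cast every key column to string and replace NULLs with a sentinel so
    # that a NULL still contributes to the hash instead of being dropped.
    parts = [F.coalesce(F.col(c).cast('string'), F.lit(sentinel)) for c in key_columns]
    # The separator guards against collisions like ('ab', 'c') vs ('a', 'bc').
    hashed = F.sha2(F.concat_ws('||', *parts), 256)
    return (df
            .withColumn('generated_key', hashed)
            # Numeric variant for the modulo-based partition_id: the first
            # 15 hex digits fit comfortably into a 64-bit integer.
            .withColumn('generated_key_num',
                        F.conv(F.substring(hashed, 1, 15), 16, 10).cast('long')))

df_update = with_generated_key(df_update, all_columns_but_the_timestamp)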
"...basically Azure Synapse, which is basically Apache Spark"
AFAIK this is not true. You have an option to choose Spark or "SQL pool" or "Data Explorer pool" as your compute/engine. And how you interact depends on your engine. See learn.microsoft/en-us/azure/synapse-analytics/… – Kashyap Commented Mar 28 at 17:04