I have a dataset with 3M+ records and several columns. Here is a sample of my dataset:
item | item_base | date | quantity_1 | quantity_2 |
---|---|---|---|---|
1 | 20 | 202410 | 600 | 7493 |
1 | 20 | 202411 | 17000 | 16431 |
Each item-item_base-date combination makes a unique key. I need to calculate a new column actual_value based on the following logic. Assume a rank operation on the table, partitioned by item-item_base and ordered by date; then:

- For rank = 1, actual_value = quantity_1
- For rank = 2, actual_value = quantity_1 - quantity_2
- For rank > 2, actual_value = quantity_1 - sum(all prev quantity_1) - sum(all prev quantity_2) - sum(all prev actual_value)
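For example, using the sample rows above (and the 202412 row that appears in the output table further down): rank 1 gives actual_value = 600; rank 2 gives 17000 - 16431 = 569; rank 3 gives 785 - (600 + 17000) - (7493 + 16431) - (600 + 569) = -41908, which is negative, so my code floors it at 0 (see the loop below).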
Here is my approach to solving this:

First, I create two additional columns, cumulative_1 and cumulative_2, which are the running sums of quantity_1 and quantity_2 over the preceding rows, using SQL window functions:

```sql
SUM(quantity_1) OVER (PARTITION BY item, item_base ORDER BY date
                      ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) AS cumulative_1
```

and similarly for cumulative_2. I am also creating a rank column as a row-number identifier.
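For reference, a sketch of how all three helper columns can be produced in a single Spark SQL statement; this assumes the DataFrame is registered as a temp view, and the view name my_items is just a placeholder:

```python
# Sketch: rank + cumulative sums in one Spark SQL query (view name "my_items" is an example)
df.createOrReplaceTempView("my_items")

df_with_windows = spark.sql("""
    SELECT
        item, item_base, date, quantity_1, quantity_2,
        ROW_NUMBER() OVER (PARTITION BY item, item_base ORDER BY date) AS rank,
        SUM(quantity_1) OVER (PARTITION BY item, item_base ORDER BY date
                              ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) AS cumulative_1,
        SUM(quantity_2) OVER (PARTITION BY item, item_base ORDER BY date
                              ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) AS cumulative_2
    FROM my_items
""")
```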
Spark does not support recursive CTEs, so implementing sum(all prev actual_value) is tedious. I had to switch to a pandas DataFrame to complete the calculation. Here is my code:
```python
my_df = df.toPandas()
my_df['actual_value'] = 0.0

for i in range(len(my_df)):
    if my_df.at[i, 'rank'] == 1:
        my_df.at[i, 'actual_value'] = my_df.at[i, 'quantity_1']
    elif my_df.at[i, 'rank'] == 2:
        my_df.at[i, 'actual_value'] = my_df.at[i, 'quantity_1'] - my_df.at[i, 'quantity_2']
    else:
        # sum of actual_value over all previous rows of the same item / item_base
        previous_actual_values = my_df.loc[(my_df['item'] == my_df.at[i, 'item']) &
                                           (my_df['item_base'] == my_df.at[i, 'item_base']) &
                                           (my_df['date'] < my_df.at[i, 'date']), 'actual_value'].sum()
        my_df.at[i, 'actual_value'] = (my_df.at[i, 'quantity_1'] - my_df.at[i, 'cumulative_2']
                                       - my_df.at[i, 'cumulative_1'] - previous_actual_values)
        if my_df.at[i, 'actual_value'] < 0:
            my_df.at[i, 'actual_value'] = 0
```
The code does the job and gives me the proper output.
item | item_base | date | quantity_1 | quantity_2 | cumulative_1 | cumulative_2 | rank | actual_value |
---|---|---|---|---|---|---|---|---|
1 | 20 | 202410 | 600 | 7493 | | | 1 | 600 |
1 | 20 | 202411 | 17000 | 16431 | 600 | 7493 | 2 | 569 |
1 | 20 | 202412 | 785 | 24456 | 17600 | 23924 | 3 | 0 |
1 | 20 | 202501 | 0 | 25775 | 18385 | 48380 | 4 | 0 |
1 | 20 | 202502 | | 26131 | 18385 | 74155 | 5 | |
1 | 20 | 202503 | 0 | 39452 | 18385 | 100286 | 6 | 0 |
1 | 20 | 202504 | | 38087 | 18385 | 139738 | 7 | |
1 | 20 | 202505 | 2856 | 28916 | 18385 | 177825 | 8 | 0 |
1 | 20 | 202506 | 500000 | 42254 | 21241 | 206741 | 9 | 270849 |
1 | 20 | 202507 | | 36776 | 521241 | 248995 | 10 | |
1 | 20 | 202508 | 660 | 23523 | 521241 | 285771 | 11 | 0 |
1 | 20 | 202509 | 1316000 | 25543 | 521901 | 309294 | 12 | 212787 |
1 | 20 | 202510 | 265220 | 30589 | 1837901 | 334837 | 13 | 0 |
1 | 20 | 202511 | 47580 | | 1864421 | 365426 | 14 | 0 |
Now, the problem: because I have to use pandas, the code takes forever on larger datasets. I need to either find a way to do this in Spark itself or improve the efficiency of the code above. I have considered vectorizing the calculations, but I'm struggling to find an efficient way to calculate the cumulative actual_value for rows where rank > 2.
- Rana, have you tried using a UDF approach? – Dileep Raj Narayan Thumula, Nov 22, 2024 at 3:52
- @DileepRajNarayanThumula Sadly, I've just not been able to implement the proper way to get sum(prev actual_value) with a UDF – Rana, Nov 22, 2024 at 6:32
- i.imgur.com/YomlHdB.png – Dileep Raj Narayan Thumula, Nov 22, 2024 at 6:51
- Rana, I will post the solution that I have tried; let me know if you have any questions regarding the output – Dileep Raj Narayan Thumula, Nov 22, 2024 at 8:38
- @DileepRajNarayanThumula Just to clarify, the actual_value for rank 9 = 500000 - 21241 - 206741 - 1169 = 270849. Basically q1 - sum(prev q1) - sum(prev q2) - sum(prev actual_value). Similarly, for rank 12 the actual_value should be 212787. I'm not sure how you are getting 465372 and 1291157 – Rana, Nov 22, 2024 at 9:08
1 Answer
I have tried the below approach:
```python
from pyspark.sql import Window
from pyspark.sql.functions import row_number, sum as _sum

# rank and cumulative sums over the preceding rows, per item / item_base
window_spec = Window.partitionBy("item", "item_base").orderBy("date")
df = df.withColumn("rank", row_number().over(window_spec))

cumulative_window = window_spec.rowsBetween(Window.unboundedPreceding, -1)
df = (
    df.withColumn("cumulative_1", _sum("quantity_1").over(cumulative_window))
      .withColumn("cumulative_2", _sum("quantity_2").over(cumulative_window))
)
df = df.fillna({"cumulative_1": 0, "cumulative_2": 0, "quantity_1": 0, "quantity_2": 0})

# switch to pandas for the recursive part
pandas_df = df.toPandas()
pandas_df['actual_value'] = 0.0

for i in range(len(pandas_df)):
    if pandas_df.at[i, 'rank'] == 1:
        pandas_df.at[i, 'actual_value'] = pandas_df.at[i, 'quantity_1']
    elif pandas_df.at[i, 'rank'] == 2:
        pandas_df.at[i, 'actual_value'] = pandas_df.at[i, 'quantity_1'] - pandas_df.at[i, 'quantity_2']
    else:
        previous_actual_values = pandas_df.loc[
            (pandas_df['item'] == pandas_df.at[i, 'item']) &
            (pandas_df['item_base'] == pandas_df.at[i, 'item_base']) &
            (pandas_df['date'] < pandas_df.at[i, 'date']), 'actual_value'
        ].sum()
        pandas_df.at[i, 'actual_value'] = (
            pandas_df.at[i, 'quantity_1'] -
            pandas_df.at[i, 'cumulative_1'] -
            pandas_df.at[i, 'cumulative_2'] -
            previous_actual_values
        )
        if pandas_df.at[i, 'actual_value'] < 0:
            pandas_df.at[i, 'actual_value'] = 0

print(pandas_df)
```
In the code above I add a rank column partitioned by item, item_base and ordered by date, calculate cumulative_1 and cumulative_2, fill nulls with 0 for the cumulative and quantity columns, and then convert to a pandas DataFrame for the recursive logic, initializing the actual_value column and calculating it iteratively.
Results:
item | item_base | date | quantity_1 | quantity_2 | rank | cumulative_1 | cumulative_2 | actual_value |
---|---|---|---|---|---|---|---|---|
1 | 20 | 202410 | 600 | 7493 | 1 | 0 | 0 | 600.0 |
1 | 20 | 202411 | 17000 | 16431 | 2 | 600 | 7493 | 569.0 |
1 | 20 | 202412 | 785 | 24456 | 3 | 17600 | 23924 | 0.0 |
1 | 20 | 202501 | 0 | 25775 | 4 | 18385 | 48380 | 0.0 |
1 | 20 | 202502 | 0 | 26131 | 5 | 18385 | 74155 | 0.0 |
1 | 20 | 202503 | 0 | 39452 | 6 | 18385 | 100286 | 0.0 |
1 | 20 | 202504 | 0 | 38087 | 7 | 18385 | 139738 | 0.0 |
1 | 20 | 202505 | 2856 | 28916 | 8 | 18385 | 177825 | 0.0 |
1 | 20 | 202506 | 500000 | 42254 | 9 | 21241 | 206741 | 270849.0 |
1 | 20 | 202507 | 0 | 36776 | 10 | 521241 | 248995 | 0.0 |
1 | 20 | 202508 | 660 | 23523 | 11 | 521241 | 285771 | 0.0 |
1 | 20 | 202509 | 1316000 | 25543 | 12 | 521901 | 309294 | 212787.0 |
1 | 20 | 202510 | 265220 | 30589 | 13 | 1837901 | 334837 | 0.0 |
1 | 20 | 202511 | 47580 | 0 | 14 | 2103121 | 365426 | 0.0 |
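Note that this still collects everything to the driver with toPandas(), so it will hit the same wall on 3M+ rows. One way to keep the recursion inside Spark is a grouped-map pandas UDF via groupBy(...).applyInPandas(...) (Spark 3.x with pyarrow installed): each (item, item_base) group is processed in date order with running totals, so the per-row scan for previous actual_value disappears, and Spark parallelizes across groups. The sketch below is my own assumption rather than the poster's code; the function name compute_actual and the schema handling are illustrative.

```python
from pyspark.sql.types import StructType, StructField, DoubleType

def compute_actual(pdf):
    # pdf is one (item, item_base) group as a pandas DataFrame
    pdf = pdf.sort_values("date").reset_index(drop=True)
    q1 = pdf["quantity_1"].fillna(0).to_numpy()
    q2 = pdf["quantity_2"].fillna(0).to_numpy()

    actual = [0.0] * len(pdf)
    cum1 = cum2 = cum_actual = 0.0          # running sums over previous rows of the group
    for i in range(len(pdf)):
        if i == 0:                          # rank = 1
            val = float(q1[i])
        elif i == 1:                        # rank = 2
            val = float(q1[i] - q2[i])
        else:                               # rank > 2
            val = float(q1[i] - cum1 - cum2 - cum_actual)
            val = max(val, 0.0)             # floor negatives at 0, as in the loop above
        actual[i] = val
        cum1 += float(q1[i])
        cum2 += float(q2[i])
        cum_actual += val
    pdf["actual_value"] = actual
    return pdf

# output schema = input columns + the new actual_value column
out_schema = StructType(df.schema.fields + [StructField("actual_value", DoubleType())])

result = df.groupBy("item", "item_base").applyInPandas(compute_actual, schema=out_schema)
```

With the running totals carried inside each group, the recursive sum costs O(n) per group and nothing has to be brought back to the driver.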