

I have a dataset with 3M+ records and several columns. Here is a sample of my dataset:

| item | item_base | date | quantity_1 | quantity_2 |
|------|-----------|------|------------|------------|
| 1 | 20 | 202410 | 600 | 7493 |
| 1 | 20 | 202411 | 17000 | 16431 |

Each item-item_base-date makes a unique key. I need to calculate a new column 'actual_value' based on the following logic:

Assume the rows are ranked within each item-item_base partition, ordered by date. Then:

- For rank = 1, actual_value = quantity_1.
- For rank = 2, actual_value = quantity_1 - quantity_2.
- For rank > 2, actual_value = quantity_1 - sum(all prev quantity_1) - sum(all prev quantity_2) - sum(all prev actual_value).
- In every case, a negative actual_value is set to 0.
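Written as a recurrence, the rule only needs one running total per partition. The notation below is an assumption for illustration: for the k-th row (by date) of one partition, q^{(1)}_k and q^{(2)}_k are quantity_1 and quantity_2, a_k is actual_value, and S_k is everything accumulated before row k (NULL quantities counted as 0):

```latex
S_1 = 0, \qquad S_{k+1} = S_k + q^{(1)}_k + q^{(2)}_k + a_k

a_1 = \max\bigl(0,\, q^{(1)}_1\bigr), \qquad
a_2 = \max\bigl(0,\, q^{(1)}_2 - q^{(2)}_2\bigr), \qquad
a_k = \max\bigl(0,\, q^{(1)}_k - S_k\bigr) \quad (k \ge 3)
```

For k ≥ 3, S_k is exactly cumulative_1 + cumulative_2 + sum(prev actual_value). For rank 9 in the table below: S_9 = 21241 + 206741 + 1169 = 229151, so a_9 = 500000 - 229151 = 270849. Because S_{k+1} depends only on S_k and row k, the computation is a sequential scan with constant state rather than a true recursion.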

Here is my approach of solving this:

First, I create two additional columns, cumulative_1 and cumulative_2, which hold the sums of quantity_1 and quantity_2 over all previous rows, using SQL window functions:
`SUM(quantity_1) OVER (PARTITION BY item, item_base ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) AS cumulative_1`, and likewise for cumulative_2. I also create a rank column (a row number within each partition) as a row identifier.
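The frame `ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING` excludes the current row, which is why the first row of each partition has an empty cumulative_1 and cumulative_2 in the output table. A small stdlib sketch of that frame for one partition, with `None` standing in for SQL NULL (the helper name `exclusive_running_sum` is hypothetical):

```python
from typing import List, Optional


def exclusive_running_sum(values: List[Optional[int]]) -> List[Optional[int]]:
    """Sum of all previous values in one partition, excluding the current row.

    Mirrors SUM(x) OVER (... ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING):
    NULLs (None) are skipped, and an empty or all-NULL frame yields NULL.
    """
    result: List[Optional[int]] = []
    total = 0
    seen_non_null = False
    for v in values:
        # The frame for this row covers only the rows before it
        result.append(total if seen_non_null else None)
        if v is not None:
            total += v
            seen_non_null = True
    return result


# First four quantity_2 values from the question's table
print(exclusive_running_sum([7493, 16431, 24456, 25775]))  # [None, 7493, 23924, 48380]
```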

Spark does not support recursive CTEs, so implementing sum(all prev actual_value) there is tedious. I had to switch to a pandas DataFrame to complete the calculation. Here is my code:

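```python
# The loop reads earlier rows' actual_value, so rows must be ordered by item, item_base, date
my_df = df.orderBy("item", "item_base", "date").toPandas()
my_df['actual_value'] = 0.0

for i in range(len(my_df)):
    if my_df.at[i, 'rank'] == 1:
        my_df.at[i, 'actual_value'] = my_df.at[i, 'quantity_1']
    elif my_df.at[i, 'rank'] == 2:
        my_df.at[i, 'actual_value'] = my_df.at[i, 'quantity_1'] - my_df.at[i, 'quantity_2']
    else:
        # Sum of actual_value over all earlier dates of the same item/item_base
        previous_actual_values = my_df.loc[(my_df['item'] == my_df.at[i, 'item']) &
                                           (my_df['item_base'] == my_df.at[i, 'item_base']) &
                                           (my_df['date'] < my_df.at[i, 'date']), 'actual_value'].sum()
        my_df.at[i, 'actual_value'] = my_df.at[i, 'quantity_1'] - my_df.at[i, 'cumulative_2'] - my_df.at[i, 'cumulative_1'] - previous_actual_values

    # Clip negative results to 0
    if my_df.at[i, 'actual_value'] < 0:
        my_df.at[i, 'actual_value'] = 0
```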
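The loop above is quadratic per partition: for every row, the `.loc` filter scans the whole DataFrame to re-add earlier actual_value entries. A minimal sketch of the alternative carries a single running total instead. It assumes one partition's rows are already sorted by date and passed as hypothetical `(quantity_1, quantity_2)` tuples with `None` for NULL; the function name `actual_values` is illustrative. One edge case differs: if every earlier quantity_1 or quantity_2 in a partition is NULL, the pandas version yields NaN through the NULL cumulative column, whereas this sketch treats the missing sum as 0.

```python
from typing import List, Optional, Sequence, Tuple

# (quantity_1, quantity_2) for one row; None stands for NULL
Row = Tuple[Optional[int], Optional[int]]


def actual_values(rows: Sequence[Row]) -> List[Optional[int]]:
    """actual_value for one (item, item_base) partition whose rows are sorted by date."""
    out: List[Optional[int]] = []
    # Sum of all previous quantity_1 + quantity_2 + actual_value (NULLs add nothing)
    running = 0
    for rank, (q1, q2) in enumerate(rows, start=1):
        if rank == 1:
            value = q1
        elif rank == 2:
            value = None if q1 is None or q2 is None else q1 - q2
        else:
            value = None if q1 is None else q1 - running
        if value is not None and value < 0:
            value = 0  # clip negatives, as the original loop does
        out.append(value)
        running += (q1 or 0) + (q2 or 0) + (value or 0)
    return out


# Quantities from the question's table (item 1, item_base 20)
sample = [(600, 7493), (17000, 16431), (785, 24456), (0, 25775), (None, 26131),
          (0, 39452), (None, 38087), (2856, 28916), (500000, 42254), (None, 36776),
          (660, 23523), (1316000, 25543), (265220, 30589), (47580, None)]
print(actual_values(sample))
# [600, 569, 0, 0, None, 0, None, 0, 270849, None, 0, 212787, 0, 0]
```

Each partition is visited once, so the cost grows linearly with the number of rows instead of quadratically.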

The code does the job and gives me the proper output.

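item        | item_base| date    | quantity_1 | quantity_2 | cumulative_1 | cumulative_2 | rank | actual_value
------------|----------|---------|------------|------------|--------------|--------------|------|-------------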
1           | 20       | 202410  | 600        | 7493       |              |              | 1    | 600
1           | 20       | 202411  | 17000      | 16431      | 600          | 7493         | 2    | 569
1           | 20       | 202412  | 785        | 24456      | 17600        | 23924        | 3    | 0
1           | 20       | 202501  | 0          | 25775      | 18385        | 48380        | 4    | 0
1           | 20       | 202502  |            | 26131      | 18385        | 74155        | 5    | 
1           | 20       | 202503  | 0          | 39452      | 18385        | 100286       | 6    | 0
1           | 20       | 202504  |            | 38087      | 18385        | 139738       | 7    | 
1           | 20       | 202505  | 2856       | 28916      | 18385        | 177825       | 8    | 0
1           | 20       | 202506  | 500000     | 42254      | 21241        | 206741       | 9    | 270849
1           | 20       | 202507  |            | 36776      | 521241       | 248995       | 10   | 
1           | 20       | 202508  | 660        | 23523      | 521241       | 285771       | 11   | 0
1           | 20       | 202509  | 1316000    | 25543      | 521901       | 309294       | 12   | 212787
1           | 20       | 202510  | 265220     | 30589      | 1837901      | 334837       | 13   | 0
1           | 20       | 202511  | 47580      |            | 1864421      | 365426       | 14   | 0

Now, the problem. Because I have to use pandas, the code takes forever on larger datasets. I need to either find a way to do this in Spark itself or improve the efficiency of the above code. I have considered vectorizing the calculations, but I’m struggling to find an efficient way to calculate the cumulative actual_value for rows where rank > 2.
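Since the clip to 0 makes each step depend on the previous one, the rank > 2 rows do not vectorize cleanly, but a single pass over all 3M+ rows is enough if a running total is kept per (item, item_base) key. This is a sketch with hypothetical tuple records; it assumes each key's rows arrive in ascending date order (keys may be interleaved), and the item 2 rows in the usage example are made up. In Spark, one option (not tested here) is to run an equivalent per-group loop inside `df.groupBy("item", "item_base").applyInPandas(func, schema)`, which hands each group to a Python function as a pandas DataFrame, so the work stays distributed instead of going through `toPandas()`.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

# (item, item_base, date, quantity_1, quantity_2); None stands for NULL
Record = Tuple[int, int, int, Optional[int], Optional[int]]


def stream_actual_values(records: Iterable[Record]) -> Iterator[Tuple[Record, Optional[int]]]:
    """Yield (record, actual_value) keeping O(1) state per (item, item_base) key."""
    # key -> (rows seen so far, running sum of quantity_1 + quantity_2 + actual_value)
    state: Dict[Tuple[int, int], Tuple[int, int]] = {}
    for rec in records:
        item, item_base, _date, q1, q2 = rec
        rank, running = state.get((item, item_base), (0, 0))
        rank += 1
        if rank == 1:
            value = q1
        elif rank == 2:
            value = None if q1 is None or q2 is None else q1 - q2
        else:
            value = None if q1 is None else q1 - running
        if value is not None and value < 0:
            value = 0  # clip negatives to 0
        running += (q1 or 0) + (q2 or 0) + (value or 0)
        state[(item, item_base)] = (rank, running)
        yield rec, value


# Item 1 rows come from the question; item 2 rows are hypothetical
mixed_rows = [
    (1, 20, 202410, 600, 7493),
    (2, 20, 202410, 50, 10),
    (1, 20, 202411, 17000, 16431),
    (2, 20, 202411, 300, 100),
    (1, 20, 202412, 785, 24456),
]
print([v for _, v in stream_actual_values(mixed_rows)])  # [600, 50, 569, 200, 0]
```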


Asked Nov 21, 2024 by Rana; edited Nov 21, 2024 by werner
  • Rana, have you tried a UDF approach? – Dileep Raj Narayan Thumula Commented Nov 22, 2024 at 3:52
  • @DileepRajNarayanThumula Sadly, I've just not been able to implement the proper way to get sum(prev actual values) with the UDF – Rana Commented Nov 22, 2024 at 6:32
  • – Dileep Raj Narayan Thumula Commented Nov 22, 2024 at 6:51
  • Rana, I will post the solution that I have tried; let me know if you have any questions regarding the output. – Dileep Raj Narayan Thumula Commented Nov 22, 2024 at 8:38
  • @DileepRajNarayanThumula Just to clarify, the actual_value for rank 9 = 500000 - 21241 - 206741 - 1169 = 270849. Basically (q1 - sum(prev q1) - sum(prev q2) - sum(prev actual_value)). Similarly, for rank 12, actual_value should be 212787. I'm not sure how you are getting 465372 and 1291157. – Rana Commented Nov 22, 2024 at 9:08
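The two values quoted in the last comment can be checked directly against the cumulative columns of the question's table. Here 1169 is the sum of the rank 1 and rank 2 actual values (600 + 569), since ranks 3 to 8 are either clipped to 0 or NULL:

```python
# Rank 9: q1 - sum(prev q1) - sum(prev q2) - sum(prev actual_value)
prev_actual_before_9 = 600 + 569  # ranks 1 and 2; ranks 3 to 8 contribute nothing
rank_9 = 500000 - 21241 - 206741 - prev_actual_before_9

# Rank 12: the earlier actual values now also include rank 9
prev_actual_before_12 = prev_actual_before_9 + rank_9
rank_12 = 1316000 - 521901 - 309294 - prev_actual_before_12

print(rank_9, rank_12)  # 270849 212787
```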

1 Answer

I have tried the below approach:

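```python
from pyspark.sql import Window
from pyspark.sql.functions import row_number, sum as _sum

# Rank rows within each item/item_base partition by date
window_spec = Window.partitionBy("item", "item_base").orderBy("date")
df = df.withColumn("rank", row_number().over(window_spec))

# Running sums over all previous rows (current row excluded)
cumulative_window = window_spec.rowsBetween(Window.unboundedPreceding, -1)
df = (
    df.withColumn("cumulative_1", _sum("quantity_1").over(cumulative_window))
      .withColumn("cumulative_2", _sum("quantity_2").over(cumulative_window))
)
df = df.fillna({"cumulative_1": 0, "cumulative_2": 0, "quantity_1": 0, "quantity_2": 0})

# Sort before collecting so that earlier dates are processed first
pandas_df = df.orderBy("item", "item_base", "date").toPandas()
pandas_df['actual_value'] = 0.0
for i in range(len(pandas_df)):
    if pandas_df.at[i, 'rank'] == 1:
        pandas_df.at[i, 'actual_value'] = pandas_df.at[i, 'quantity_1']
    elif pandas_df.at[i, 'rank'] == 2:
        pandas_df.at[i, 'actual_value'] = pandas_df.at[i, 'quantity_1'] - pandas_df.at[i, 'quantity_2']
    else:
        previous_actual_values = pandas_df.loc[
            (pandas_df['item'] == pandas_df.at[i, 'item']) &
            (pandas_df['item_base'] == pandas_df.at[i, 'item_base']) &
            (pandas_df['date'] < pandas_df.at[i, 'date']), 'actual_value'
        ].sum()
        pandas_df.at[i, 'actual_value'] = (
            pandas_df.at[i, 'quantity_1'] -
            pandas_df.at[i, 'cumulative_1'] -
            pandas_df.at[i, 'cumulative_2'] -
            previous_actual_values
        )
    if pandas_df.at[i, 'actual_value'] < 0:
        pandas_df.at[i, 'actual_value'] = 0
```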

The code above adds a rank column partitioned by item and item_base and ordered by date, then calculates cumulative_1 and cumulative_2. It fills nulls in the cumulative and quantity columns with 0 and converts the result to a pandas DataFrame for the recursive part. Finally, it initializes the actual_value column and calculates it row by row.


```
item  item_base    date  quantity_1  quantity_2  rank  cumulative_1  
0      1         20  202410         600        7493     1             0   
1      1         20  202411       17000       16431     2           600   
2      1         20  202412         785       24456     3         17600   
3      1         20  202501           0       25775     4         18385   
4      1         20  202502           0       26131     5         18385   
5      1         20  202503           0       39452     6         18385   
6      1         20  202504           0       38087     7         18385   
7      1         20  202505        2856       28916     8         18385   
8      1         20  202506      500000       42254     9         21241   
9      1         20  202507           0       36776    10        521241   
10     1         20  202508         660       23523    11        521241   
11     1         20  202509     1316000       25543    12        521901   
12     1         20  202510      265220       30589    13       1837901   
13     1         20  202511       47580           0    14       2103121   

    cumulative_2  actual_value  
0              0         600.0  
1           7493         569.0  
2          23924           0.0  
3          48380           0.0  
4          74155           0.0  
5         100286           0.0  
6         139738           0.0  
7         177825           0.0  
8         206741      270849.0  
9         248995           0.0  
10        285771           0.0  
11        309294      212787.0  
12        334837           0.0  
13        365426           0.0  
```
