I'm currently doing calculations in PySpark and trying to match data from multiple dataframes on specific conditions.

I'm new to PySpark and decided to ask for help.

My first dataframe contains general information about loans:

ID     ContractDate MaturityDate Bank
ID1    2024-06-01   2024-06-18   A       
ID2    2024-06-05   2024-06-18   B       
ID3    2024-06-10   2024-06-17   C       
ID4    2024-06-15                D

ID5    2024-08-01   2024-08-22   A       
ID6    2024-08-08   2024-08-23   B            
ID7    2024-08-20                D

My second dataframe contains information on how payments were made.

For each loan I have one or more payments done.

ID_loan PaymentDate PaymentSum
ID1     2024-06-02  10
ID1     2024-06-08  40
ID1     2024-06-10  50
ID2     2024-06-06  30
ID2     2024-06-07  90
ID2     2024-06-08  20
ID3     2024-06-11  20
ID3     2024-06-12  30
ID3     2024-06-13  50

ID5     2024-08-10  15
ID5     2024-08-13  35
ID5     2024-08-15  30
ID6     2024-08-15  20
ID6     2024-08-16  20
ID6     2024-08-20  70

My goal is to add a 'PaymentSum' column to the first dataframe. For each loan, it should return the amount of the payment made on the date closest to the 'ContractDate' of the loan issued by bank 'D'.

In other words I have to get the following table:

ID     ContractDate MaturityDate Bank PaymentSum
ID1    2024-06-01   2024-06-18   A    50 
ID2    2024-06-05   2024-06-18   B    20   
ID3    2024-06-10   2024-06-17   C    50   
ID4    2024-06-15                D

ID5    2024-08-01   2024-08-22   A    30   
ID6    2024-08-08   2024-08-23   B    70       
ID7    2024-08-20                D

I understand that joins alone are not enough here. Any help is highly appreciated!

asked Feb 3 at 20:31 by lenpyspanacb

2 Answers

You need to use a Window function in this scenario.

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as func

df_1_ref = df_1.withColumn(
    "closest_contract_date_from_D", func.first(
        func.when(func.col("Bank")=="D", func.col("ContractDate")).otherwise(None), ignorenulls=True
    ).over(
        Window.orderBy(func.asc("ContractDate")).rowsBetween(Window.currentRow, Window.unboundedFollowing)
    )
).select(
    "ID", "closest_contract_date_from_D"
)

First, create a reference table that holds, for each loan, the closest ContractDate of a loan issued by bank D. Here I use a Window function with first(..., ignorenulls=True) rather than a ranking: for each row, ordered by ContractDate, it searches only from the current row to the unbounded following rows and takes the first bank D date it finds.
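To make the forward-looking window easier to picture, here is a minimal plain-Python sketch (no Spark involved) of what the first-non-null lookup computes on the sample loans from the question. The function name next_bank_d_date is made up for illustration, and the sketch assumes ContractDate values are unique, as they are in the sample.

```python
from datetime import date

# Sample loans from the question: (ID, ContractDate, Bank)
loans = [
    ("ID1", date(2024, 6, 1), "A"),
    ("ID2", date(2024, 6, 5), "B"),
    ("ID3", date(2024, 6, 10), "C"),
    ("ID4", date(2024, 6, 15), "D"),
    ("ID5", date(2024, 8, 1), "A"),
    ("ID6", date(2024, 8, 8), "B"),
    ("ID7", date(2024, 8, 20), "D"),
]

def next_bank_d_date(loans):
    """Hypothetical helper: for each loan, the first bank 'D' ContractDate
    found from its own row onwards (rows ordered by ContractDate)."""
    result = {}
    next_d = None
    # Walk from the latest loan to the earliest, so next_d always holds
    # the nearest bank 'D' date at or after the current row.
    for loan_id, contract_date, bank in sorted(loans, key=lambda r: r[1], reverse=True):
        if bank == "D":
            next_d = contract_date
        result[loan_id] = next_d
    return result

reference = next_bank_d_date(loans)
```

On the sample data, ID1 to ID4 map to 2024-06-15 and ID5 to ID7 map to 2024-08-20. The bank D loans map to their own ContractDate because the window starts at the current row.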

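df_2 = df_2.join(
    df_1_ref,
    # Inner join: a left join would keep payments dated after the reference date
    # (with null reference columns), and they would still win the ranking below.
    on=[func.col("ID")==func.col("ID_loan"), func.col("PaymentDate")<=func.col("closest_contract_date_from_D")], how="inner"
).withColumn(
    # Rank payments per loan, latest first
    "rank", func.rank().over(Window.partitionBy("ID_loan").orderBy(func.desc("PaymentDate")))
).filter(
    func.col("rank") == 1
).selectExpr(
    "ID_loan AS ID", "PaymentSum"
)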

Then join the reference table to df_2. Adding <= to the join condition keeps only the payments dated on or before the reference date for each loan. A rank over PaymentDate in descending order then picks the closest one.
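The join-and-rank step can be checked against the expected table with a small plain-Python sketch (again no Spark). The reference dictionary hard-codes the closest_contract_date_from_D values that df_1_ref would hold for the sample data, and latest_payment_before is a hypothetical name used only for this illustration.

```python
from datetime import date

# closest_contract_date_from_D per loan, as df_1_ref would hold it for the sample data
reference = {
    "ID1": date(2024, 6, 15), "ID2": date(2024, 6, 15), "ID3": date(2024, 6, 15),
    "ID4": date(2024, 6, 15), "ID5": date(2024, 8, 20), "ID6": date(2024, 8, 20),
    "ID7": date(2024, 8, 20),
}

# Sample payments from the question: (ID_loan, PaymentDate, PaymentSum)
payments = [
    ("ID1", date(2024, 6, 2), 10), ("ID1", date(2024, 6, 8), 40), ("ID1", date(2024, 6, 10), 50),
    ("ID2", date(2024, 6, 6), 30), ("ID2", date(2024, 6, 7), 90), ("ID2", date(2024, 6, 8), 20),
    ("ID3", date(2024, 6, 11), 20), ("ID3", date(2024, 6, 12), 30), ("ID3", date(2024, 6, 13), 50),
    ("ID5", date(2024, 8, 10), 15), ("ID5", date(2024, 8, 13), 35), ("ID5", date(2024, 8, 15), 30),
    ("ID6", date(2024, 8, 15), 20), ("ID6", date(2024, 8, 16), 20), ("ID6", date(2024, 8, 20), 70),
]

def latest_payment_before(payments, reference):
    """Hypothetical helper: per loan, the amount of the latest payment
    dated on or before that loan's reference date."""
    best = {}  # loan_id -> (payment_date, amount)
    for loan_id, pay_date, amount in payments:
        ref = reference.get(loan_id)
        # Mirrors the join condition PaymentDate <= closest_contract_date_from_D
        if ref is None or pay_date > ref:
            continue
        # Mirrors rank() == 1 over PaymentDate in descending order
        if loan_id not in best or pay_date > best[loan_id][0]:
            best[loan_id] = (pay_date, amount)
    return {loan_id: amount for loan_id, (_, amount) in best.items()}

payment_sum = latest_payment_before(payments, reference)
```

For the sample data this gives 50, 20, 50, 30 and 70 for ID1, ID2, ID3, ID5 and ID6, which matches the table in the question. ID4 and ID7 have no payments, so they stay empty after the final left join.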

df_1 = df_1.join(df_2, on=["ID"], how="left")

Finally, join the result back to the main dataframe.

To resolve your issue, please follow the code below. As sample data, I am using loans_data1 and pay_data1 to build the dataframes.

Code:

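from pyspark.sql import SparkSession
from pyspark.sql.functions import col, datediff, abs, row_number
from pyspark.sql.window import Window

# Reuse the active session (e.g. in a notebook) or create one if none exists
spark = SparkSession.builder.getOrCreate()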

loans_data1 = [
    ("ID1", "2024-06-01", "2024-06-18", "A"),
    ("ID2", "2024-06-05", "2024-06-18", "B"),
    ("ID3", "2024-06-10", "2024-06-17", "C"),
    ("ID4", "2024-06-15", None, "D"),
    ("ID5", "2024-08-01", "2024-08-22", "A"),
    ("ID6", "2024-08-08", "2024-08-23", "B"),
    ("ID7", "2024-08-20", None, "D")
]

pay_data1 = [
    ("ID1", "2024-06-02", 10),
    ("ID1", "2024-06-08", 40),
    ("ID1", "2024-06-10", 50),
    ("ID2", "2024-06-06", 30),
    ("ID2", "2024-06-07", 90),
    ("ID2", "2024-06-08", 20),
    ("ID3", "2024-06-11", 20),
    ("ID3", "2024-06-12", 30),
    ("ID3", "2024-06-13", 50),
    ("ID5", "2024-08-10", 15),
    ("ID5", "2024-08-13", 35),
    ("ID5", "2024-08-15", 30),
    ("ID6", "2024-08-15", 20),
    ("ID6", "2024-08-16", 20),
    ("ID6", "2024-08-20", 70)
]

loans_df1 = spark.createDataFrame(loans_data1, ["ID", "ContractDate", "MaturityDate", "Bank"])
payments_df = spark.createDataFrame(pay_data1, ["ID_loan", "PaymentDate", "PaymentSum"])

# Step 1: keep only the bank 'D' loans, expose their ContractDate as BankD_ContractDate,
# and cross join it to every loan
bank_d_loans_df1 = loans_df1.filter(col("Bank") == "D").select(col("ContractDate").alias("BankD_ContractDate"))

cross_join_df = loans_df1.crossJoin(bank_d_loans_df1)

# Step 2: left join the payments onto the cross join result and compute
# the absolute gap in days between each bank 'D' date and each payment
joined_df = cross_join_df.join(
    payments_df,
    col("ID") == col("ID_loan"),
    "left"
).withColumn("DateDiff", abs(datediff(col("BankD_ContractDate"), col("PaymentDate"))))

# Step 3: per loan, keep the row with the smallest day gap
window_spec = Window.partitionBy("ID").orderBy("DateDiff")
closest_payment_df = joined_df.withColumn("RowNum", row_number().over(window_spec)).filter(col("RowNum") == 1).drop("RowNum", "DateDiff")

# Step 4: select the output columns
result_df = closest_payment_df.select(
    col("ID"), 
    col("ContractDate"), 
    col("MaturityDate"), 
    col("Bank"), 
    col("PaymentSum")
)

# display() is available in Databricks notebooks; elsewhere use result_df.show()
display(result_df)
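As a cross-check of the logic above, here is a minimal plain-Python sketch (no Spark) of the "smallest absolute day gap to any bank D date" rule applied to the sample payments. The name closest_payment is hypothetical. Where two payments tie on the gap, this sketch keeps the first one seen, whereas row_number() in Spark would pick one arbitrarily.

```python
from datetime import date

# ContractDate values of the bank 'D' loans in the sample data
bank_d_dates = [date(2024, 6, 15), date(2024, 8, 20)]

# Sample payments from the question: (ID_loan, PaymentDate, PaymentSum)
payments = [
    ("ID1", date(2024, 6, 2), 10), ("ID1", date(2024, 6, 8), 40), ("ID1", date(2024, 6, 10), 50),
    ("ID2", date(2024, 6, 6), 30), ("ID2", date(2024, 6, 7), 90), ("ID2", date(2024, 6, 8), 20),
    ("ID3", date(2024, 6, 11), 20), ("ID3", date(2024, 6, 12), 30), ("ID3", date(2024, 6, 13), 50),
    ("ID5", date(2024, 8, 10), 15), ("ID5", date(2024, 8, 13), 35), ("ID5", date(2024, 8, 15), 30),
    ("ID6", date(2024, 8, 15), 20), ("ID6", date(2024, 8, 16), 20), ("ID6", date(2024, 8, 20), 70),
]

def closest_payment(payments, bank_d_dates):
    """Hypothetical helper: per loan, the amount of the payment with the
    smallest absolute day gap to any bank 'D' ContractDate."""
    best = {}  # loan_id -> (gap_in_days, amount)
    for loan_id, pay_date, amount in payments:
        # Equivalent of abs(datediff(BankD_ContractDate, PaymentDate)),
        # minimised over all rows produced by the cross join
        gap = min(abs((d - pay_date).days) for d in bank_d_dates)
        if loan_id not in best or gap < best[loan_id][0]:
            best[loan_id] = (gap, amount)
    return {loan_id: amount for loan_id, (_, amount) in best.items()}

closest = closest_payment(payments, bank_d_dates)
```

On the sample data this yields 50, 20, 50, 30 and 70 for ID1, ID2, ID3, ID5 and ID6, the same values as the table in the question. Note that this rule measures the gap in both directions, so on other data a payment made after a bank D date could be selected.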

Tags: Merge dataframes with conditions using PySpark - Stack Overflow