Merge dataframes with conditions using PySpark
Currently I'm doing calculations in PySpark and trying to match data from multiple dataframes on specific conditions.
I'm new to PySpark and decided to ask for help.
My first dataframe contains general information about loans:
ID   ContractDate  MaturityDate  Bank
ID1  2024-06-01    2024-06-18    A
ID2  2024-06-05    2024-06-18    B
ID3  2024-06-10    2024-06-17    C
ID4  2024-06-15                  D
ID5  2024-08-01    2024-08-22    A
ID6  2024-08-08    2024-08-23    B
ID7  2024-08-20                  D
My second dataframe contains information on how payments were made.
For each loan I have one or more payments.
ID_loan  PaymentDate  PaymentSum
ID1      2024-06-02   10
ID1      2024-06-08   40
ID1      2024-06-10   50
ID2      2024-06-06   30
ID2      2024-06-07   90
ID2      2024-06-08   20
ID3      2024-06-11   20
ID3      2024-06-12   30
ID3      2024-06-13   50
ID5      2024-08-10   15
ID5      2024-08-13   35
ID5      2024-08-15   30
ID6      2024-08-15   20
ID6      2024-08-16   20
ID6      2024-08-20   70
My goal is to add to the first dataframe a column 'PaymentSum' which, for each loan, contains the amount of the payment made on the date closest to the 'ContractDate' of a loan issued by bank 'D'.
In other words I have to get the following table:
ID   ContractDate  MaturityDate  Bank  PaymentSum
ID1  2024-06-01    2024-06-18    A     50
ID2  2024-06-05    2024-06-18    B     20
ID3  2024-06-10    2024-06-17    C     50
ID4  2024-06-15                  D
ID5  2024-08-01    2024-08-22    A     30
ID6  2024-08-08    2024-08-23    B     70
ID7  2024-08-20                  D
I do understand that joins alone are not enough here. Any help is highly appreciated!
2 Answers
You need to use a Window function in this scenario.
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.window import Window

# For each loan, find the ContractDate of the next bank D loan on or after its own ContractDate.
# Note: the window has no partitionBy, so all rows go through one partition (fine for small data).
df_1_ref = df_1.withColumn(
    "closest_contract_date_from_D",
    func.first(
        func.when(func.col("Bank") == "D", func.col("ContractDate")).otherwise(None),
        ignorenulls=True
    ).over(
        Window.orderBy(func.asc("ContractDate"))
        .rowsBetween(Window.currentRow, Window.unboundedFollowing)
    )
).select("ID", "closest_contract_date_from_D")
First you create a reference table that collects, for each loan, the closest ContractDate of a loan issued by bank D. The method used here is a window function ordered by ContractDate: for each row it only searches from the current row to the unbounded following rows and takes the first bank D date it finds.
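For the sample data above, df_1_ref should look roughly like this (each loan mapped to the next bank D ContractDate on or after its own ContractDate):
ID   closest_contract_date_from_D
ID1  2024-06-15
ID2  2024-06-15
ID3  2024-06-15
ID4  2024-06-15
ID5  2024-08-20
ID6  2024-08-20
ID7  2024-08-20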
# Keep only payments made on or before the reference date, then rank each loan's payments
# by descending PaymentDate so that rank 1 is the latest (i.e. closest) qualifying payment.
df_2 = df_2.join(
    df_1_ref,
    on=[
        func.col("ID") == func.col("ID_loan"),
        func.col("PaymentDate") <= func.col("closest_contract_date_from_D")
    ],
    how="inner"  # inner (not left) so payments made after the reference date are dropped
).withColumn(
    "rank",
    func.rank().over(Window.partitionBy("ID_loan").orderBy(func.desc("PaymentDate")))
).filter(
    func.col("rank") == 1
).selectExpr("ID_loan AS ID", "PaymentSum")
Then you join the reference table to df_2. In the join condition, the <= makes sure that only payments made on or before the reference date are kept for each loan (hence the inner join, which drops everything after it). A rank function over descending PaymentDate then finds the closest one.
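With the sample data, the filtered df_2 should then contain one row per loan that has a qualifying payment:
ID   PaymentSum
ID1  50
ID2  20
ID3  50
ID5  30
ID6  70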
df_1 = df_1.join(df_2, on=["ID"], how="left")
Then you join it back to the main dataframe; loans without a qualifying payment (here the bank D loans, which have no payments at all) keep a null PaymentSum.
To resolve your issue, please follow the code below. For the sample I am using loans_data1 and pay_data1 as the input data.
Code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, datediff, abs, row_number
from pyspark.sql.window import Window

# Create (or reuse) a Spark session; in a Databricks notebook `spark` already exists
spark = SparkSession.builder.getOrCreate()
loans_data1 = [
("ID1", "2024-06-01", "2024-06-18", "A"),
("ID2", "2024-06-05", "2024-06-18", "B"),
("ID3", "2024-06-10", "2024-06-17", "C"),
("ID4", "2024-06-15", None, "D"),
("ID5", "2024-08-01", "2024-08-22", "A"),
("ID6", "2024-08-08", "2024-08-23", "B"),
("ID7", "2024-08-20", None, "D")
]
pay_data1 = [
("ID1", "2024-06-02", 10),
("ID1", "2024-06-08", 40),
("ID1", "2024-06-10", 50),
("ID2", "2024-06-06", 30),
("ID2", "2024-06-07", 90),
("ID2", "2024-06-08", 20),
("ID3", "2024-06-11", 20),
("ID3", "2024-06-12", 30),
("ID3", "2024-06-13", 50),
("ID5", "2024-08-10", 15),
("ID5", "2024-08-13", 35),
("ID5", "2024-08-15", 30),
("ID6", "2024-08-15", 20),
("ID6", "2024-08-16", 20),
("ID6", "2024-08-20", 70)
]
loans_df1 = spark.createDataFrame(loans_data1, ["ID", "ContractDate", "MaturityDate", "Bank"])
payments_df = spark.createDataFrame(pay_data1, ["ID_loan", "PaymentDate", "PaymentSum"])
# Step 1: filter the bank D loans, keep their ContractDate, and cross join them with all loans
bank_d_loans_df1 = loans_df1.filter(col("Bank") == "D").select(col("ContractDate").alias("BankD_ContractDate"))
cross_join_df = loans_df1.crossJoin(bank_d_loans_df1)
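# For the sample data this pairs every loan with both bank D ContractDates
# (2024-06-15 and 2024-08-20), i.e. 7 loans x 2 reference dates = 14 rows.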
# Step 2: left join the payments onto the cross join result and compute the absolute day difference
joined_df = cross_join_df.join(
payments_df,
col("ID") == col("ID_loan"),
"left"
).withColumn("DateDiff", abs(datediff(col("BankD_ContractDate"), col("PaymentDate"))))
# Step 3: per loan, keep the payment row with the smallest DateDiff
window_spec = Window.partitionBy("ID").orderBy("DateDiff")
closest_payment_df = joined_df.withColumn("RowNum", row_number().over(window_spec)).filter(col("RowNum") == 1).drop("RowNum", "DateDiff")
# Step 4: select the output columns
result_df = closest_payment_df.select(
col("ID"),
col("ContractDate"),
col("MaturityDate"),
col("Bank"),
col("PaymentSum")
)
display(result_df)  # Databricks; use result_df.show() elsewhere
Output:
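Running the code above should reproduce the expected result from the question (PaymentSum stays null for the bank D loans ID4 and ID7, which have no payments):
ID   ContractDate  MaturityDate  Bank  PaymentSum
ID1  2024-06-01    2024-06-18    A     50
ID2  2024-06-05    2024-06-18    B     20
ID3  2024-06-10    2024-06-17    C     50
ID4  2024-06-15    null          D     null
ID5  2024-08-01    2024-08-22    A     30
ID6  2024-08-08    2024-08-23    B     70
ID7  2024-08-20    null          D     null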