
I want to create a new column that contains an array of values for the column names listed in the lookup column.

Sample Input

from pyspark.sql import Row

input_df = spark.createDataFrame([
    Row(id=123, alert=1, operation=1, lookup=[]),
    Row(id=234, alert=0, operation=0, lookup=['alert']),
    Row(id=345, alert=1, operation=0, lookup=['operation']),
    Row(id=456, alert=0, operation=1, lookup=['alert', 'operation']),
])

Expected Output

id  alert operation lookup             lookup_values
123 1     1         []                 []
234 0     0         [alert]            [0]
345 1     0         [operation]        [0]
456 0     1         [alert, operation] [0, 1]

What I have tried

input_df.withColumn("lookup_values", F.transform(F.col("lookup"), lambda x: input_df[f'{x}'])).show()

Fails with the error:

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with the name Column<'x_1'> cannot be resolved. Did you mean one of the following? [id, alert, operation, lookup].

This error is surprising because the following code does not produce an error, although it does not yield the intended result:

input_df.withColumn("lookup_values", F.transform(F.col("lookup"), lambda x: input_df['alert'])).show()
id alert operation lookup lookup_values
123 1 1 [] []
234 0 0 [alert] [0]
345 1 0 [operation] [1]
456 0 1 [alert, operation] [0, 0]
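The likely reason for the error is that the lambda argument in F.transform is a Column expression for the current array element, not a Python string, so f'{x}' evaluates to the Column's repr (hence the Column<'x_1'> in the error message) rather than an actual column name. A minimal check of this, assuming the same input_df (the new column name is only for illustration, and the exact repr may vary by Spark version):

from pyspark.sql import functions as F

# The lambda receives a Column object; formatting it as a string yields its repr,
# not the element's value, so it cannot be used to index the DataFrame by name.
input_df.withColumn(
    "lambda_arg_as_string",
    F.transform(F.col("lookup"), lambda x: F.lit(f"{x}"))
).show(truncate=False)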

2 Answers

Here is an answer without a UDF, using built-in functions. It should be faster on large volumes of data:

from pyspark.sql import functions as F

# First build a map from column name to column value, then replace each name in
# `lookup` with its value taken from that map.
input_df.withColumn(
    "lookup_values",
    F.create_map(
        [F.lit("alert"), F.col("alert"), F.lit("operation"), F.col("operation")]
    ),
).withColumn(
    "lookup_values",
    F.transform(F.col("lookup"), lambda x: F.col("lookup_values")[x])
).show()

+---+-----+---------+------------------+-------------+
| id|alert|operation|            lookup|lookup_values|
+---+-----+---------+------------------+-------------+
|123|    1|        1|                []|           []|
|234|    0|        0|           [alert]|          [0]|
|345|    1|        0|       [operation]|          [0]|
|456|    0|        1|[alert, operation]|       [0, 1]|
+---+-----+---------+------------------+-------------+
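
If more columns can appear in lookup, the same idea generalizes by building the map dynamically instead of hard-coding each name/value pair. A minimal sketch, assuming every column except id and lookup is a candidate (column names taken from the example input):

from itertools import chain
from pyspark.sql import functions as F

# Candidate columns whose values may be looked up by name.
candidate_cols = [c for c in input_df.columns if c not in ("id", "lookup")]

input_df.withColumn(
    "lookup_map",
    F.create_map(*chain.from_iterable((F.lit(c), F.col(c)) for c in candidate_cols)),
).withColumn(
    "lookup_values",
    F.transform(F.col("lookup"), lambda x: F.col("lookup_map")[x]),
).drop("lookup_map").show()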

One way to do this is to pass the whole row into a UDF and build the list of values from the column names listed in the lookup column:

from pyspark.sql import functions as func
from pyspark.sql.types import ArrayType, IntegerType


@func.udf(returnType=ArrayType(IntegerType()))
def lookup_values_udf(row):
    # For each column name listed in `lookup`, pick that field's value from the row.
    return [row[field] for field in row["lookup"]]


input_df.withColumn(
    "lookup_values",
    lookup_values_udf(func.struct([func.col(col) for col in input_df.columns]))
).show(
    10, False
)

+---+-----+---------+------------------+-------------+
|id |alert|operation|lookup            |lookup_values|
+---+-----+---------+------------------+-------------+
|123|1    |1        |[]                |[]           |
|234|0    |0        |[alert]           |[0]          |
|345|1    |0        |[operation]       |[0]          |
|456|0    |1        |[alert, operation]|[0, 1]       |
+---+-----+---------+------------------+-------------+
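
If the DataFrame has many columns, the struct passed to the UDF can be limited to the columns that may appear in lookup (plus lookup itself), which keeps the serialized row small. A sketch under the same assumptions, with a hypothetical lookup_candidates list based on the example schema:

# Hypothetical list of columns that may appear in `lookup`.
lookup_candidates = ["alert", "operation"]

input_df.withColumn(
    "lookup_values",
    lookup_values_udf(func.struct(*[func.col(c) for c in lookup_candidates + ["lookup"]])),
).show(10, False)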
