admin管理员组

文章数量:1316382

I have a function check_fun where the user inputs one table and additional argumments of column names. This function later on evaluates some checks depending on which variables has been inputed in the function call. This works fine but each evaluation and append takes a lot of time. How can I rewrite it in one call?

Data

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, when, count
import pandas as pd


# Sample data
data = [(1, 12, 1, 5), (6, 8, 1, 6), (7, 15, 1, 7), (4, 9, 1, 12), (10, 11, 1, 9)]
columns = ["a", "b", "c", "d"]
df = spark.createDataFrame(data, columns)

The original function

def check_fun(df,
          a_input: str = None, 
          b_input: str = None,
          c_input: str = None,   
          d_input: str = None):
    
    columns = ['check', 'description', 'count']
    all_res = pd.DataFrame(columns = columns)
    
    # check1

    if a_input is not None and b_input is not None:
        check1_count = df.filter(col(a_input) > col(b_input)).count()
        check_1_res   = pd.DataFrame([['check1', 'a > b', check1_count]],
                                       columns = columns)
        all_res = all_res.append(check_1_res)

    # check2

    if a_input is not None and c_input is not None:
        check2_count = df.filter(col(a_input) > col(c_input)).count()
        check_2_res   = pd.DataFrame([['check2', 'a > c', check2_count]],
                                       columns = columns)
        all_res = all_res.append(check_2_res)
    # check3

    if a_input is not None and d_input is not None:
        check3_count = df.filter(col(a_input) > col(d_input)).count()
        check_3_res   = pd.DataFrame([['check3', 'a > d', check3_count]],
                                     columns=columns)
        all_res = all_res.append(check_3_res)
    # check4

    if b_input is not None and c_input is not None:
        check4_count = df.filter(col(a_input) <  col(d_input)).count()
        check_4_res   = pd.DataFrame([['check4', 'a <  d', check4_count]],
                                     columns=columns)
        all_res = all_res.append(check_4_res)

    return(all_res)

How I tried solving it:

 a = "a"
 b = "b"
 c = "c"
 d = "d"    



 df.agg(
    when(a is not None and b is not None, sum(when(col(a) > col(b), 1).otherwise(0)).otherwise(None).alias('check1')
)).show()

But this returns an error...

I have a function check_fun where the user inputs one table and additional argumments of column names. This function later on evaluates some checks depending on which variables has been inputed in the function call. This works fine but each evaluation and append takes a lot of time. How can I rewrite it in one call?

Data

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, when, count
import pandas as pd


# Sample data
data = [(1, 12, 1, 5), (6, 8, 1, 6), (7, 15, 1, 7), (4, 9, 1, 12), (10, 11, 1, 9)]
columns = ["a", "b", "c", "d"]
df = spark.createDataFrame(data, columns)

The original function

def check_fun(df,
          a_input: str = None, 
          b_input: str = None,
          c_input: str = None,   
          d_input: str = None):
    
    columns = ['check', 'description', 'count']
    all_res = pd.DataFrame(columns = columns)
    
    # check1

    if a_input is not None and b_input is not None:
        check1_count = df.filter(col(a_input) > col(b_input)).count()
        check_1_res   = pd.DataFrame([['check1', 'a > b', check1_count]],
                                       columns = columns)
        all_res = all_res.append(check_1_res)

    # check2

    if a_input is not None and c_input is not None:
        check2_count = df.filter(col(a_input) > col(c_input)).count()
        check_2_res   = pd.DataFrame([['check2', 'a > c', check2_count]],
                                       columns = columns)
        all_res = all_res.append(check_2_res)
    # check3

    if a_input is not None and d_input is not None:
        check3_count = df.filter(col(a_input) > col(d_input)).count()
        check_3_res   = pd.DataFrame([['check3', 'a > d', check3_count]],
                                     columns=columns)
        all_res = all_res.append(check_3_res)
    # check4

    if b_input is not None and c_input is not None:
        check4_count = df.filter(col(a_input) <  col(d_input)).count()
        check_4_res   = pd.DataFrame([['check4', 'a <  d', check4_count]],
                                     columns=columns)
        all_res = all_res.append(check_4_res)

    return(all_res)

How I tried solving it:

 a = "a"
 b = "b"
 c = "c"
 d = "d"    



 df.agg(
    when(a is not None and b is not None, sum(when(col(a) > col(b), 1).otherwise(0)).otherwise(None).alias('check1')
)).show()

But this returns an error...

Share Improve this question asked Jan 29 at 10:24 MLENMLEN 2,5613 gold badges22 silver badges41 bronze badges 3
  • Apart from the solution to your problem, it appears there is a mistake in your final if statement: if b_input is not None and c_input is not None: - however, you're comparing a < d in the logic that follows? – Grismar Commented Jan 29 at 10:32
  • @Grismar Indeed that is incorrect. – MLEN Commented Jan 29 at 12:11
  • I'd suggest editing the question, so that there's no error there - which is correct, the if statement, or the logic that follows? You can edit the question to fix it, and people won't need to refer to the comments to make sense of it. – Grismar Commented Jan 29 at 12:53
Add a comment  | 

1 Answer 1

Reset to default 0

You're probably after something like this:

def check_fun(df, a=None, b=None, c=None, d=None):
    columns = ['check', 'description', 'count']

    results = [
        [check_name, description, df.filter(col(x) > col(y)).count()]
        for x, y, check_name, description in [
            (a, b, "check1", "a > b"),
            (a, c, "check2", "a > c"),
            (a, d, "check3", "a > d"),
            (d, a, "check4", "a < d")
        ]
        if x is not None and y is not None
    ]

    return pd.DataFrame(results, columns=columns) if results else pd.DataFrame(columns=columns)

Depending on the correct interpretation of 'check4', which had an inconsistency in the code you shared.

本文标签: pythonChecking for variable when evaluating When inside aggStack Overflow