
In PySpark, how can I find the size of a DataFrame (approx. row count: 300 million records) using any method available in PySpark? My production system runs on a Spark version < 3.0. The idea is that, based on the DataFrame size, I need to calculate the shuffle partition number before joining.

asked Nov 21, 2024 at 11:30 by Learn Hadoop (edited Nov 21, 2024 at 13:55)
  • Please refer to the discussion at stackoverflow.com/questions/49492463/… – user11768336 Commented Nov 21, 2024 at 13:50
  • Thanks for your response, but my df has 300 million records and caching is not recommended. The mentioned URL suggests caching first and then taking the approximate size. – Learn Hadoop Commented Nov 21, 2024 at 13:54

3 Answers


You can do this programmatically: calculate the size of each partition and sum them to get the DataFrame's total size. To do that we write a small helper function. Here is sample code that I have tested; this way you can estimate the size of the DataFrame.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("MemSize").getOrCreate()

schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Marks", IntegerType(), True)
])

num_records = 1500
data = [(f"Name{i}", i % 100) for i in range(1, num_records + 1)]

df = spark.createDataFrame(data, schema=schema)


def estimate_dataframe_size_via_rdd(df):
    """
    Convert the DataFrame into an RDD and use mapPartitions.
    `it` iterates over the rows present in each partition.
    sum() computes the total size of all rows in a partition, in bytes.
    """
    partition_sizes = df.rdd.mapPartitions(
        lambda it: [sum(len(str(row).encode("utf-8")) for row in it)]
    ).collect()
    total_size = sum(partition_sizes)
    return total_size


estimated_size_bytes = estimate_dataframe_size_via_rdd(df)
print(f"DataFrame size: {estimated_size_bytes / (1024**2):.2f} MB")

Use the .count() function. See the documentation for this function; it has existed since Spark 1.3.0. If an approximate count is enough, this page explains that you can use the countApprox() function, but you first have to convert to an RDD. That may be more efficient, since an exact count is usually not necessary for choosing a shuffle partition number.
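
A minimal sketch of that RDD-based approximate count (the timeout, in milliseconds, and the confidence value below are arbitrary assumptions):

# Sketch: approximate row count via RDD.countApprox(); timeout is in
# milliseconds and both values here are arbitrary assumptions.
approx_rows = df.rdd.countApprox(timeout=60000, confidence=0.95)
print(f"Approximate row count: {approx_rows}")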

To find the approximate size of a DataFrame in PySpark, especially when dealing with a large number of records (around 300 million), you can use the count() method to get the row count. Since your production system is running on Spark version < 3.0, you can use the following approach:

# Assuming 'df' is your DataFrame
row_count = df.count()
print(f"Approximate row count: {row_count}")

This will give you the total number of rows in the DataFrame. Based on this count, you can then calculate the number of shuffle partitions before performing a join operation. A common heuristic is to set the number of shuffle partitions to a multiple of the number of cores available in your cluster. For example:

# Set the number of shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", desired_number_of_partitions)

You can determine desired_number_of_partitions based on your cluster's resources and the size of your data. A typical starting point might be 200 partitions, but you may need to adjust this based on performance observations.
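
As a rough sketch of how a size estimate could feed into that choice (the 128 MB target partition size, the num_cores value, and the reuse of estimated_size_bytes from the first answer are all assumptions for illustration, not values from this answer):

import math

# Sketch: derive a shuffle partition count from an estimated DataFrame size.
# target_partition_bytes (128 MB) and num_cores are assumed values.
target_partition_bytes = 128 * 1024 * 1024
num_cores = 16  # total executor cores available in the cluster (assumed)

partitions_by_size = math.ceil(estimated_size_bytes / target_partition_bytes)
# Round up to a multiple of the core count so every core gets work.
desired_number_of_partitions = max(
    num_cores, math.ceil(partitions_by_size / num_cores) * num_cores
)

spark.conf.set("spark.sql.shuffle.partitions", str(desired_number_of_partitions))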
