
I'm having some trouble trying to improve the performance of some Python code that reads a huge amount of data (really big) from Databricks.

I filter it with something like this:

    data_filtered = (
        data
        .filter(
            (data["id"] == event['id']) &
            (data["Topic"].isin(topics))
        )
    )

but the filtered data is still quite big. For some instances I need to configure my SparkSession with something like:

    {
        "spark.kryoserializer.buffer.max": "2047m",
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": "1",
        "spark.dynamicAllocation.maxExecutors": "100",
        "spark.dynamicAllocation.initialExecutors": "50",
        "spark.executor.memory": "16g",
        "spark.driver.memory": "16g",
        "spark.executor.cores": "4",
        "spark.driver.cores": "4",
        "spark.sql.shuffle.partitions": "750",
        "spark.sql.catalogImplementation": "hive",
        "spark.driver.maxResultSize": "4g"
    }
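For reference, these settings end up on the session roughly like this (a simplified sketch; on Databricks most of them can also be set in the cluster's Spark config):

    from pyspark.sql import SparkSession

    # Simplified sketch: apply the settings above when creating the session.
    # (On Databricks an active session usually already exists; this only shows
    # where the key/value pairs from the dictionary above would go.)
    spark_conf = {
        "spark.kryoserializer.buffer.max": "2047m",
        "spark.executor.memory": "16g",
        "spark.driver.maxResultSize": "4g",
        # ... remaining settings from the dictionary above ...
    }

    builder = SparkSession.builder.appName("filter-and-collect")  # illustrative app name
    for key, value in spark_conf.items():
        builder = builder.config(key, value)
    spark = builder.getOrCreate()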

Sometimes even that has not been enough, and the problem always comes from the .collect() I'm doing on this filtered data:

data_collected = data_filtered.rdd.map(lambda row: row.asDict()).collect()

I need the data in this form (or I don't know another way) because I loop over it, building a dictionary from every row. I have tried other approaches, for example foreach/toLocalIterator (maybe incorrectly), but I ran into a problem, I think because one column has to be converted with json.loads() when I .append() it to my final dictionary.
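For illustration, the loop I'm describing is roughly this (simplified; "json_col" is a placeholder for the real column name):

    import json

    # Simplified sketch of the current approach; "json_col" stands in for the
    # real column that holds a JSON string with a varying structure.
    data_collected = data_filtered.rdd.map(lambda row: row.asDict()).collect()

    final_rows = []
    for row in data_collected:
        row["json_col"] = json.loads(row["json_col"])  # structure is not always the same
        final_rows.append(row)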

Thanks!

  • collect will attempt to bring the entire data set to the driver, so yes, running out of memory on the driver is entirely possible. – Andrew Commented Feb 20 at 17:29
  • Yeah, I know, but how could I handle that differently with this JSON, which does not always have the same structure? – diego Commented Feb 20 at 17:30
  • You should find a way to avoid moving all your data onto the driver. What would be the next step in your processing logic for the dictionary? data_filtered.foreach or using a UDF might be a good way to go. – werner Commented Feb 21 at 17:38
  • (data["id"] == event['id']) looks like a join condition being used within filter() - why not use an inner join instead? Joins can be internally optimized even with huge data – samkart Commented 18 hours ago

2 Answers


Stop using collect(). collect() is not a big-data tool; it's a "fits on my computer" tool.

If you need to call a function on every row, the big-data tool for that is a user-defined function (UDF). The performance won't be great, but at least you get the job done. (It doesn't perform well because it ships the data back and forth to the Python interpreter one row at a time.)
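For example, something along these lines (a sketch, not your actual code; "json_col" and "some_field" are placeholder names):

    import json
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    # Sketch: parse the JSON column on the executors with a Python UDF and pull
    # out one field, instead of collecting whole rows to the driver.
    @F.udf(returnType=StringType())
    def extract_field(raw_json):
        if raw_json is None:
            return None
        return json.loads(raw_json).get("some_field")  # placeholder field name

    result = data_filtered.withColumn("some_field", extract_field(F.col("json_col")))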

I'm not sure why you are creating a dictionary for each row. You can access each row via a select statement, so why create a dictionary? Perhaps consider exploring other options using Spark/PySpark tools instead of plain Python tools.

Using collect() is not always a smart choice. collect() gathers all the rows from the different nodes and loads them into the driver's memory. This is especially dangerous when dealing with large volumes of data (you are fitting a large amount of it in the driver's main memory), which leads to memory overload.

If you just need a subset, use take(n); or aggregate on the workers to reduce the volume of data, e.g. with reduceByKey().
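For instance (a sketch using the DataFrame from the question):

    # Sketch: alternatives that keep most of the work on the executors.

    # Only a sample is needed on the driver:
    preview = data_filtered.take(100)            # list of at most 100 Rows

    # A summary is enough: aggregate on the workers first, e.g. rows per Topic.
    counts = data_filtered.groupBy("Topic").count().collect()

    # The RDD-style equivalent mentioned above:
    topic_counts = (
        data_filtered.rdd
        .map(lambda row: (row["Topic"], 1))
        .reduceByKey(lambda a, b: a + b)
        .collect()
    )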
