I am optimizing a model run in PySpark that uses sparkdf.groupBy().applyInPandas() together with cache(). However, cache() only appears to improve performance the second time I run my code, not the first. Does anyone know why? How do I get the performance boost from caching on the first run?
I'm okay "paying" for caching in an earlier step because the boost is so large. For this performance test, the only thing being measured is the time it takes to score the dataset and save the scored result.
- First Run
Processing partition 1 at 2025-03-06 11:30:20.263913-05:00
Partition 1 processed in 255.919597864151 seconds ??? Assuming this is some kind of load into memory?
Processing partition 2 at 2025-03-06 11:34:36.183714-05:00
Partition 2 processed in 28.266658306121826 seconds
Processing partition 3 at 2025-03-06 11:35:04.450641-05:00
Partition 3 processed in 19.673797845840454 seconds
...
...
Processing partition 12 at 2025-03-06 11:38:41.491604-05:00
Partition 12 processed in 18.98810386657715 seconds
All partitions processed in 520.2162759304047 seconds - at an average of 43.35135632753372 seconds per partition.
- Second Run
Processing partition 1 at 2025-03-06 11:45:15.390380-05:00
Partition 1 processed in 2.5390563011169434 seconds
Processing partition 2 at 2025-03-06 11:45:17.929636-05:00
Partition 2 processed in 2.3451147079467773 seconds
Processing partition 3 at 2025-03-06 11:45:20.274882-05:00
Partition 3 processed in 2.273413896560669 seconds
...
...
Processing partition 12 at 2025-03-06 11:45:41.702098-05:00
Partition 12 processed in 2.234912157058716 seconds
All partitions processed in 28.547115802764893 seconds - at an average of 2.3789263168970742 seconds per partition.
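The two totals above can be compared more closely with a little arithmetic. The sketch below uses only the numbers printed in the logs; the variable and function names are made up for illustration. It removes the slow first partition from the first run and compares what is left with the second run's average.

```python
# Figures copied from the logs above (all in seconds).
first_run_total = 520.2162759304047
first_run_partition_1 = 255.919597864151
second_run_total = 28.547115802764893
n_partitions = 12


def mean_excluding_first(total, first, n):
    """Average time per partition once the first partition is left out."""
    return (total - first) / (n - 1)


# First run, partitions 2..12 only.
first_run_rest_mean = mean_excluding_first(
    first_run_total, first_run_partition_1, n_partitions
)
# Second run, all 12 partitions.
second_run_mean = second_run_total / n_partitions

print(f"First run, partitions 2-12: {first_run_rest_mean:.2f} s per partition")
print(f"Second run, all partitions: {second_run_mean:.2f} s per partition")
print(f"Ratio: {first_run_rest_mean / second_run_mean:.1f}x")
```

With these figures, partitions 2 to 12 of the first run average roughly 24 seconds each, about ten times the second run's per-partition average, so the second run's speed-up is not confined to partition 1.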
Code sample:
# Imports the snippet relies on (assumed from the names it uses).
import datetime
import time

import pytz
import pyspark.sql.functions as f

# Create an empty output table with the result schema.
results_df = spark.createDataFrame(data=[], schema=results_schema)
results_df.write.partitionBy("scenario_name").mode("overwrite").option("overwriteSchema", "true").saveAsTable("credit_risk.test_schema.perf_output_table1")

partition_df.cache()
beg_time = time.time()
for n_part in range(1, max_partitions + 1):
    print(f"Processing partition {n_part} at {datetime.datetime.fromtimestamp(time.time(), pytz.timezone('US/Eastern'))}")
    loop_beg_time = time.time()
    # Score one partition at a time, grouped by scenario.
    partition_data = partition_df.filter((f.col("partition_num") == n_part)).drop('partition_num')
    ecl_result = partition_data.groupBy('scenario_name').applyInPandas(cashflow_loop, results_schema)
    ecl_result.cache()
    ecl_result.write.partitionBy("scenario_name").mode("append").saveAsTable("credit_risk.test_schema.perf_output_table1")
    print(f"Partition {n_part} processed in {time.time() - loop_beg_time} seconds")
end_time = time.time()
print(f"All partitions processed in {end_time - beg_time} seconds - at an average of {(end_time - beg_time) / max_partitions} seconds per partition.")
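One thing worth noting about the sample above: in PySpark, cache() is lazy. It only marks the DataFrame for caching, and the data is stored when the first action runs. In the loop, that first action is the write for partition 1, so it is plausible (though nothing in the post confirms it) that part of the 255 seconds is the one-time cost of computing partition_df. A minimal sketch of paying that cost before the timer starts is shown below. The helper names materialize_cache and write_and_release are hypothetical, and the functions only assume an object with the DataFrame methods cache(), count(), and unpersist().

```python
import time


def materialize_cache(df):
    """Mark df for caching, then run an action so the cache is filled now.

    Returns the cached DataFrame, its row count, and the seconds spent.
    """
    cached = df.cache()        # lazy: nothing is computed yet
    start = time.time()
    n_rows = cached.count()    # action: computes df and populates the cache
    return cached, n_rows, time.time() - start


def write_and_release(df, write_fn):
    """Call write_fn(df), then drop df from the cache even if the write fails."""
    try:
        write_fn(df)
    finally:
        df.unpersist()         # harmless if df was never cached
```

In the sample, this would mean calling materialize_cache(partition_df) before beg_time = time.time(), and passing each ecl_result to write_and_release so that a result written only once does not stay in memory for the rest of the loop.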
Node Performance Screenshot:
Other Thoughts & Ideas:
- My hypothesis is that I'm I/O constrained, so I am trying to shove everything into memory to get faster performance. But looking at the performance metrics, that doesn't seem to be the case. We've run this on a 1-node and a 2-node cluster with no major change in runtimes, so it doesn't appear to be a cores/parallelization issue either... I'm stumped!
- For anyone wondering why I am not using a direct applyInPandas on partition_num, we are working with smaller nodes in DEV and have run into all sorts of extraneous errors (python workers crashing, OOM errors, etc...) on the client site that we do not run into in our internal test site. All we have at this point is a hunch that the memory consumed by this process clashes with other jobs and our job dies.
- FWIW it does complete in our test site but the times are extremely variable. 120s - 600s+
- For this problem - I am curious exactly what is happening with cached datasets and if checkpoints are applicable here (if at all)
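On the checkpoint question: cache() keeps the DataFrame's full lineage, so lost or evicted blocks are recomputed from the source, whereas checkpoint() and localCheckpoint() save the data and truncate the lineage. Whether that helps here is not something the post establishes. A minimal sketch of the two variants follows; cut_lineage and its parameters are hypothetical names, and checkpoint_dir stands for a path you would choose yourself.

```python
def cut_lineage(df, spark=None, checkpoint_dir=None):
    """Return a version of df with its data saved and its lineage truncated.

    If checkpoint_dir is given, a reliable checkpoint is written there
    (spark must then be the active SparkSession). Otherwise a local
    checkpoint on executor storage is used.
    """
    if checkpoint_dir is not None:
        # Reliable checkpoint: requires a checkpoint directory on the SparkContext.
        spark.sparkContext.setCheckpointDir(checkpoint_dir)
        return df.checkpoint(eager=True)
    # Local checkpoint: no directory needed, but the data is lost with the executor.
    return df.localCheckpoint(eager=True)
```

With eager=True both calls compute the DataFrame immediately, so, like an explicit count() after cache(), the cost is paid at the call rather than inside the first timed iteration.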