python - ProcessPoolExecutor() with asyncio hangs randomly
I have an async service that processes data. My current approach is to process, in parallel, the different folds created with TimeSeriesSplit. Since this is a CPU-heavy task, I decided to use concurrent.futures.process.ProcessPoolExecutor, where each worker loads its own fold (using the list of corresponding indices that is passed as an argument to the blocking function process_single_fold) and then performs CPU-heavy operations on that data. Whenever a fold is processed, I store it in the database, iterating over the results with asyncio.as_completed(futures).
However, I noticed that when I increase max_workers in ProcessPoolExecutor (e.g. to 5), my code sometimes hangs, I guess depending on the workload on my machine (and in that case, even though I see log messages saying the processing finished, I still see that some workers are 'running').
with ProcessPoolExecutor(max_workers=min(config.n_splits, os.cpu_count())) as pool:
    loop = asyncio.get_running_loop()
    table_created_flag = asyncio.Event()
    futures = [
        loop.run_in_executor(
            pool,
            process_single_fold,
            ctx._settings.SRCDB_URL,
            fold,
            train_index,
            test_index,
            config,
        )
        for fold, (train_index, test_index) in enumerate(splits)
    ]
    table_cache = {}
    for fold, future in enumerate(asyncio.as_completed(futures)):
        await store_dataframes_incrementally(
            ctx.dst_db.engine,
            future,
            config,
            fold,
            table_created_flag,
            table_cache,
        )
process_single_fold performs I/O to connect to the database (it creates its own SQLAlchemy engine using NullPool) and loads the data for a specific fold. The table flag in the code indicates whether the destination table has been created yet, which happens when the first fold is processed and ready.
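For context, a minimal sketch of what process_single_fold might look like under these assumptions (the table name, config attributes, and the placeholder computation are hypothetical; only the per-process NullPool engine and the index-based fold selection come from the description above):

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

def process_single_fold(src_db_url, fold, train_index, test_index, config):
    # Runs inside a worker process, so it builds its own engine;
    # NullPool avoids carrying pooled connections across the process boundary.
    engine = create_engine(src_db_url, poolclass=NullPool)
    try:
        # config.source_table is a hypothetical stand-in for however the query is built.
        data = pd.read_sql_table(config.source_table, engine)
    finally:
        engine.dispose()
    train_df = data.iloc[train_index]
    test_df = data.iloc[test_index]
    # Placeholder for the CPU-heavy per-fold work.
    features = train_df.select_dtypes("number").rolling(10, min_periods=1).mean()
    return fold, features, test_df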
My questions: first, is there a flaw in the design of this solution, and why? How do I handle a process getting randomly terminated (possibly by the OS) or hanging (can I kill the process and retry that fold, for example)? And second, if this is an expected outcome, is the main thing I need to do simply to set max_workers wisely?
I am using WSL.
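For the retry part, what I had in mind is roughly something like the sketch below (the timeout value and helper name are made up, and as far as I understand ProcessPoolExecutor cannot forcibly kill a task that has already started, so a timed-out worker keeps running until the pool itself shuts down):

import asyncio

async def run_fold_with_timeout(pool, args, timeout=600, retries=1):
    # Hypothetical helper: submit one fold, give it a deadline, retry once on timeout.
    loop = asyncio.get_running_loop()
    for attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(
                loop.run_in_executor(pool, process_single_fold, *args),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            # The old worker is not killed here; it only frees its slot
            # once the executor is shut down.
            if attempt == retries:
                raise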
Comments:
- It can be tricky to mix multiprocessing and asyncio. Do you really need asyncio here? Because of your ProcessPoolExecutor, each member of the pool is going to be in a different process with its own memory space, so they won't impede each other. – Tim Roberts (Feb 4 at 19:33)
- My service listens to a RabbitMQ queue, consumes task messages as they come, and should preferably stay available for ping requests; that is the reason I am making it async. – Ahmed Troudi (Feb 4 at 19:37)
- I am open to other suggestions, though, and if making it fully synchronous also makes sense with RabbitMQ in a production setting, then I would like to know more about that. – Ahmed Troudi (Feb 4 at 20:52)
- Where is the RabbitMQ listener? Is that in the main process? I don't think you can share a listener across multiple processes, and if you do, they don't share state. That's what I mean about being "tricky". – Tim Roberts (Feb 4 at 22:40)
- It is in the main process: when a message is received, the main process creates workers and assigns independent tasks to them. I don't want the workers to share state; that's why I'm using the ProcessPoolExecutor. – Ahmed Troudi (Feb 5 at 7:54)
1 Answer
I had a silly bug in that the ProcessPoolExecutor was getting recreated for each request, and I believe that was causing the hanging problem, as a new request could disrupt one that was already running. Make sure the pool is created only once and reused by multiple requests.
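A minimal sketch of that fix, assuming a module-level executor created once at service startup and reused by every message handler (the handler name and gather-based collection are just for illustration, not the original code):

import asyncio
import os
from concurrent.futures import ProcessPoolExecutor

# Created once when the service starts and reused for every incoming message,
# instead of being rebuilt (and torn down) inside each request handler.
POOL = ProcessPoolExecutor(max_workers=os.cpu_count())

async def handle_task_message(splits, config, ctx):
    loop = asyncio.get_running_loop()
    futures = [
        loop.run_in_executor(
            POOL,
            process_single_fold,
            ctx._settings.SRCDB_URL,
            fold,
            train_index,
            test_index,
            config,
        )
        for fold, (train_index, test_index) in enumerate(splits)
    ]
    # Collect results; the pool stays alive for the next message.
    return await asyncio.gather(*futures)

Call POOL.shutdown() only once, when the service itself stops.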