import multiprocessing
import time


def task_function(x):
    print(f"Processing {x} in process {multiprocessing.current_process().name}")
    time.sleep(1)

def main():
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    with multiprocessing.Pool(processes=4) as pool:
        pool.map_async(task_function, data)

        print("Waiting for tasks to complete...")
        pool.close()
        pool.join()
        print("All tasks completed.")

if __name__ == "__main__":
    main()

I don't know why, but while join() in the code above correctly blocks until the tasks finish, in one of my more complex use cases join() doesn't block at all.

Below is a part of my complex code.

    with multiprocessing.Pool(processes=4) as read_pool:
        # read_pool = multiprocessing.Pool(4)
        read_jobs = read_pool.map_async(
            func=read_image_task,
            iterable=[(filename, image_directory, queue) for filename in images_to_ocr]
        )
        read_pool.close()
        read_pool.join()
        print(123) # prints 123 before read_image_task ever runs

And strangely, when I add read_jobs.get(), it successfully blocks and doesn't print 123.

        read_pool.join()
        read_jobs.get()
        print(123)
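A minimal sketch that reproduces this behavior (using a hypothetical two-argument task standing in for read_image_task): map_async delivers each tuple as a single argument, so every worker call fails immediately; join() still returns normally, and only get() re-raises the hidden exception.

```python
import multiprocessing


def two_arg_task(filename, directory):
    # Hypothetical stand-in for read_image_task: it needs TWO arguments.
    return f"{directory}/{filename}"


def main():
    args = [("a.png", "/tmp"), ("b.png", "/tmp")]
    with multiprocessing.Pool(processes=2) as pool:
        jobs = pool.map_async(two_arg_task, args)  # each tuple arrives as ONE argument
        pool.close()
        pool.join()   # returns normally: the workers "finished" (by failing instantly)
        print(123)    # reached, exactly as in the question
        try:
            jobs.get()  # only now does the hidden TypeError surface
        except TypeError as exc:
            print(f"hidden failure: {exc}")


if __name__ == "__main__":
    main()
```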
  • map_async is non-blocking: it returns immediately with an AsyncResult object, and the tasks run asynchronously in the background. join() ensures that the worker processes have completed, but in your case you need get() to make the main program wait for the results. – frisko Commented 2 days ago
  • But why does my first code work correctly? – hrdom Commented 2 days ago
  • It could be that the first code doesn't raise any exception, but the second one (read_image_task) might. – frisko Commented 2 days ago
  • @frisko Yeah, you're right, thank you. I just found this reason. – hrdom Commented 2 days ago
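As a sketch of frisko's point: an exception raised inside a worker is stored on the AsyncResult and only re-raised by get(). An error_callback on map_async is another way to make such failures visible without ever calling get() (failing_task here is a hypothetical stand-in for the broken read_image_task):

```python
import multiprocessing


def failing_task(x):
    # Hypothetical task that always fails, like the broken read_image_task.
    raise ValueError(f"task {x} failed")


def main():
    errors = []
    with multiprocessing.Pool(processes=2) as pool:
        # error_callback receives the first exception raised by the map job.
        pool.map_async(failing_task, [1, 2], error_callback=errors.append)
        pool.close()
        pool.join()   # still returns normally
    print(errors)     # non-empty: the ValueError was reported via the callback


if __name__ == "__main__":
    main()
```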

2 Answers


Very strange: I just solved this problem by replacing map_async with starmap_async, though at first I didn't know why.
I now believe the reason is that my error TypeError: read_image_task() missing 2 required positional arguments: 'image_directory' and 'queue' was never displayed: map_async passes each tuple as a single argument, so read_pool's 4 worker calls all failed immediately with this TypeError, and print(123) then ran as if the tasks had completed normally.

If I don't call read_jobs.get(), this error is never printed, which is strange. The error isn't shown, and PyCharm doesn't break on it either.
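The fix can be sketched like this (again with a hypothetical two-argument task in place of read_image_task): starmap_async unpacks each tuple into separate positional arguments, so the calls succeed and join() now actually has work to wait for.

```python
import multiprocessing


def two_arg_task(filename, directory):
    # Hypothetical stand-in for read_image_task.
    return f"{directory}/{filename}"


def main():
    args = [("a.png", "/imgs"), ("b.png", "/imgs")]
    with multiprocessing.Pool(processes=2) as pool:
        jobs = pool.starmap_async(two_arg_task, args)  # each tuple is unpacked per call
        pool.close()
        pool.join()
        print(jobs.get())  # ['/imgs/a.png', '/imgs/b.png']; any error would re-raise here


if __name__ == "__main__":
    main()
```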

I guess you are creating the pool twice (read_pool = multiprocessing.Pool(4)), which may be causing issues with the join() method.

Remember, when you call map_async(), it does not block until you call get(), which is why the code continues without waiting for the tasks to finish.

To fix this, remove the redundant pool creation and call read_jobs.get(). This ensures proper blocking. Here's an updated example:

with multiprocessing.Pool(processes=4) as read_pool:
    read_jobs = read_pool.map_async(func=read_image_task, iterable=[(filename, image_directory, queue) for filename in images_to_ocr])
    read_pool.close()
    read_pool.join()
    read_jobs.get()
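As a usage note, get() blocks on its own and re-raises any worker exception, so it both waits for the results and surfaces errors; a minimal sketch:

```python
import multiprocessing


def square(x):
    return x * x


def main():
    with multiprocessing.Pool(processes=2) as pool:
        jobs = pool.map_async(square, [1, 2, 3])
        print(jobs.get())  # blocks until all tasks finish; prints [1, 4, 9]
        pool.close()
        pool.join()


if __name__ == "__main__":
    main()
```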

Tags: multiprocessing, python ignores pool.join() and continues to run the following code, Stack Overflow