
As described, I can't pass a large DataFrame into a subprocess when using the Python standard library's multiprocessing module or the concurrent.futures library.

Python version: 3.11.8; System: Windows 11 23H2

Errors: assert left > 0 (AssertionError) and OSError: [WinError 87] The parameter is incorrect.

Here is some example code:

import pandas as pd
import numpy as np
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm

def calc_sth(big_df):
    result = [1]
    print("start subprocess")
    return result


def multi_call():
    big_df = pd.DataFrame(np.random.rand(1000000,1000))
    result_list = []
    with ProcessPoolExecutor() as executor:
        with tqdm(total=10) as pbar:
            for i in range(5):
                future = executor.submit(calc_sth, big_df)
                future.add_done_callback(lambda p: pbar.update(1))
                result_list.append(future)

            res = []
            for r in result_list:
                res.append(r.result())
    return res

if __name__ == '__main__':
    res = multi_call()
    print(res)

and here is the error and traceback:

Traceback (most recent call last):
  File "d:\python\Lib\multiprocessing\queues.py", line 246, in _feed
    send_bytes(obj)
  File "d:\python\Lib\multiprocessing\connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "d:\python\Lib\multiprocessing\connection.py", line 289, in _send_bytes
    ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
OSError: [WinError 87] The parameter is incorrect.

Traceback (most recent call last):
  File "d:\python\Lib\multiprocessing\process.py", line 314, in _bootstrap
    self.run()
  File "d:\python\Lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "d:\python\Lib\concurrent\futures\process.py", line 249, in _process_worker
    call_item = call_queue.get(block=True)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "d:\python\Lib\multiprocessing\queues.py", line 103, in get
    res = self._recv_bytes()
          ^^^^^^^^^^^^^^^^^^
  File "d:\python\Lib\multiprocessing\connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "d:\python\Lib\multiprocessing\connection.py", line 334, in _recv_bytes
    return self._get_more_data(ov, maxsize)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "d:\python\Lib\multiprocessing\connection.py", line 353, in _get_more_data
    assert left > 0
           ^^^^^^^^
AssertionError
Traceback (most recent call last):
  File "d:\python\Lib\multiprocessing\queues.py", line 246, in _feed
    send_bytes(obj)
  File "d:\python\Lib\multiprocessing\connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "d:\python\Lib\multiprocessing\connection.py", line 289, in _send_bytes
    ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
OSError: [WinError 87] The parameter is incorrect.
Is this a bug, or have I made a mistake? How can I pass large data into a subprocess?


asked Nov 21, 2024 at 5:44 by Frank Wang
  • There are dedicated frameworks for working with huge tables across multiple processes, such as pyspark, ray, or dask; they load data lazily from disk, so each worker holds only a part of the data. – Ahmed AEK, Nov 21, 2024 at 9:18
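
For reference, here is a minimal sketch of the chunked, lazy approach that comment suggests, using dask.array (this assumes dask is installed; the chunk size is arbitrary):

import dask.array as da

# dask never serialises the full 8 GB array in one piece; the computation
# is split into chunks and each chunk is processed independently
x = da.random.random((1_000_000, 1_000), chunks=(100_000, 1_000))
print(x.mean().compute())  # each 100,000-row chunk is ~800 MB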

2 Answers

This is not the way you should be using multiprocessing. Passing large arguments is, as you can see, either very slow or fails outright.

You should do one of the following:

  1. Use shared memory for your NumPy array (a sketch follows this list),
  2. Have each process generate its own data, if possible, or
  3. Have the main process pass each subprocess only the data it needs for its calculation.
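
For option 1, here is a minimal sketch (the function and variable names are illustrative, not from the question): the parent copies the array into a SharedMemory block once, and each worker attaches to that block by name and wraps its buffer in an ndarray without copying.

import numpy as np
from multiprocessing import shared_memory
from concurrent.futures import ProcessPoolExecutor

def worker(shm_name: str, shape: tuple, dtype: str) -> float:
    # Attach to the existing shared-memory block and view it as an
    # ndarray; no data is copied and nothing large is pickled.
    shm = shared_memory.SharedMemory(name=shm_name)
    try:
        arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
        return float(arr[0, 0])
    finally:
        shm.close()

def main() -> None:
    data = np.random.rand(100_000, 100)
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    try:
        view = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
        view[:] = data  # one copy into shared memory
        with ProcessPoolExecutor() as executor:
            futures = [executor.submit(worker, shm.name, data.shape, str(data.dtype))
                       for _ in range(5)]
            print([f.result() for f in futures])
    finally:
        shm.close()
        shm.unlink()

if __name__ == '__main__':
    main()

Only the block name (a short string) crosses the process boundary, so the call-queue payload stays tiny regardless of the array's size.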

The parameters passed to and returned from subprocesses have to be serialised. If the object(s) being passed to a subprocess are very large, the serialisation can have a significant negative impact on performance.
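
To get a feel for the scale involved: the 1,000,000 x 1,000 float64 frame in the question holds roughly 8 GB, and every executor.submit() pickles the entire frame onto the call queue. A rough way to gauge the payload, measured here on a smaller frame to keep memory use sane (the sizes are illustrative):

import pickle
import numpy as np
import pandas as pd

# 10,000 rows x 1,000 cols of float64 is roughly 80 MB of raw data
small = pd.DataFrame(np.random.rand(10_000, 1_000))
payload = len(pickle.dumps(small, protocol=pickle.HIGHEST_PROTOCOL))
print(f"{payload:,} bytes")  # pickle size scales roughly linearly with row count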

If you have a large object that you want to pass to multiple subprocesses (and that object's contents are constant), you could instead pickle the object once and pass the subprocesses just a reference to it, such as a file name or a shared-memory block name.

Here's an example that demonstrates the timing difference between "manual" and "implied" serialisation as well as using shared memory.

import pandas as pd 
import numpy as np 
from tempfile import NamedTemporaryFile
import multiprocessing as mp
from typing import Any
import time
import os
from multiprocessing.shared_memory import SharedMemory
import pickle

LOOP = 10

def sub_1(pfile: str) -> Any:
    """Explicit pickle"""
    return pd.read_pickle(pfile).loc[0][0]

def sub_2(df: pd.DataFrame) -> Any:
    """Standard serialisation"""
    return df.loc[0][0]

def sub_3(name: str) -> Any:
    """Shared memory"""
    shm = SharedMemory(name)  # attach to the existing block by name
    try:
        return pickle.loads(shm.buf).loc[0][0]
    finally:
        shm.close()

def procs() -> int:
    ncpus = os.cpu_count() or 2
    return max(ncpus - 2, 2)

def main() -> None:
    try:
        df = pd.DataFrame(np.random.rand(10_000, 10_000))
        smb = pickle.dumps(df)
        with NamedTemporaryFile(delete=False) as temp:
            temp.write(smb)
        size = len(smb)
        shm = SharedMemory(create=True, size=size)
        shm.buf[:size] = smb
        del smb
        print(f"Pickle file size = {size:,} bytes")
        with mp.Pool(procs()) as pool:
            for func, arg in (sub_1, temp.name), (sub_2, df), (sub_3, shm.name):
                start = time.time()
                for result in [pool.apply_async(func=func, args=(arg,)) for _ in range(LOOP)]:
                    assert result.get() == df.loc[0][0]
                duration = time.time() - start
                print(f"'{func.__doc__}' {duration=:.4f}")
    finally:
        os.unlink(temp.name)
        shm.close()
        shm.unlink()
 
if __name__ == '__main__':
    main()

Output:

Pickle file size = 800,000,596 bytes
'Explicit pickle' duration=2.3946
'Standard serialisation' duration=11.7310
'Shared memory' duration=3.7102

Note:

Performance will vary considerably depending on your platform, but one would expect sub_1 ("Explicit pickle") to always be faster than sub_2 ("Standard serialisation"). Interestingly, on macOS (M2), sub_3 ("Shared memory") is slower than sub_1. However, if you experiment with the dataframe dimensions, you will see results that differ greatly from those shown above; for example, with dimensions of 10_000 x 5_000, sub_3 outperforms the other two techniques.
