I am using multiprocessing's shared memory to share a numpy array between tasks. While each task originally only needs to read the array, I was curious whether writing is also possible, so I wrote the following example to test it in a situation similar to my actual use case. In this toy example, each process "opens" the array, adds 1 to the first element (which is initialized to 0.0), and returns it. The returned values of the first index should therefore be [1, 2, 3, ...], which is mostly the case, but if I run it a few times, every now and then two of the returned values are the same.

Is there a way to avoid these conflicts? I know that in this toy example locking makes little sense (and would give no speedup if the other processes have to wait), but I found no way to control access to the shared array, so any pointers would be appreciated; the actual problem I want to fix is a different one.

import numpy as np
from multiprocessing import shared_memory, Pool
from itertools import repeat
import time


def test_shm(N=500, n_proc=8, name='example'):
    # create a shared memory array
    a = np.random.rand(N, N, N).astype(np.float64)
    a[0, 0, 0] = 0.0

    shm = shared_memory.SharedMemory(
        name=name, create=True, size=a.nbytes)
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    b[:] = a[:]
    shm.close()

    with Pool(n_proc) as p:
        res = p.starmap(
            work, zip(range(n_proc), repeat(name), repeat(a.dtype), repeat(N)))

    for r in res:
        print(f'{r[0]}\t{r[1]}')

    res = np.array([r[0] for r in res])
    print('not ' * int(~np.all(np.sort(res) == 1 + np.arange(n_proc))) + 'all good')

    shm.unlink()


def work(i, name, dtype, N=500):
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray((N, N, N), dtype=dtype, buffer=shm.buf)
    # now do some work
    time.sleep(2)
    val = arr[0, 0, 0:2].copy()
    val[0] += 1.0
    arr[0, 0, 0] = val[0]
    shm.close()

    return val


if __name__ == '__main__':
    test_shm()

asked Nov 21, 2024 at 16:18 by John Smith
  • This is due to a race condition caused by concurrent access to the shared memory without proper synchronization. Using Lock() from multiprocessing to synchronize access to the shared memory may fix the issue... – Bhargav, Nov 21, 2024 at 16:22

2 Answers

You have a data race on this array: the workers perform concurrent reads and writes without synchronization. This is a bug even in multithreaded code operating on ordinary lists; it is not specific to shared memory. The unsynchronized sequence in work() (read arr[0, 0, 0], add 1, write it back) is a classic read-modify-write race: two workers can both read the same old value before either writes back, and one increment is lost. You need a multiprocessing.Lock to protect the reads and writes.

On Windows this is slightly more complicated: worker processes are spawned rather than forked, so the lock is not inherited as a module-level global, and it cannot be pickled into the starmap arguments either. It has to be handed to each worker through the pool's initializer:

# global scope
lock_var = None

def set_lock(value):
    # pool initializer: runs once in each worker and stores the lock globally
    global lock_var
    lock_var = value

...
# in test_shm: create the lock and hand it to every worker at pool startup
with Pool(n_proc, initializer=set_lock, initargs=(lock,)) as p:
...

def work(i, name, dtype, N=500):
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray((N, N, N), dtype=dtype, buffer=shm.buf)
    # now do some work
    time.sleep(2)
    with lock_var:  # serialize the read-modify-write on the shared array
        val = arr[0, 0, 0:2].copy()
        val[0] += 1.0
        arr[0, 0, 0] = val[0]
    shm.close()

    return val
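
For reference, the fragments above could be assembled into a complete script roughly like this. This is a sketch based on the question's code, with two minor deviations: the parent's shm.close() is moved after the pool finishes, since on Windows the shared memory block is destroyed once the last handle to it closes, and the numpy view is deleted before closing, since some Python/numpy versions raise a BufferError when the mapping is closed while a view is still alive.

import numpy as np
from multiprocessing import shared_memory, Pool, Lock
from itertools import repeat
import time

lock_var = None

def set_lock(value):
    # pool initializer: runs once in each worker and stores the lock globally
    global lock_var
    lock_var = value

def work(i, name, dtype, N=500):
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray((N, N, N), dtype=dtype, buffer=shm.buf)
    time.sleep(2)  # simulate some unrelated work
    with lock_var:  # only one worker at a time updates the shared array
        val = arr[0, 0, 0:2].copy()
        val[0] += 1.0
        arr[0, 0, 0] = val[0]
    shm.close()
    return val

def test_shm(N=500, n_proc=8, name='example'):
    a = np.random.rand(N, N, N).astype(np.float64)
    a[0, 0, 0] = 0.0

    shm = shared_memory.SharedMemory(name=name, create=True, size=a.nbytes)
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    b[:] = a[:]

    lock = Lock()
    with Pool(n_proc, initializer=set_lock, initargs=(lock,)) as p:
        res = p.starmap(
            work, zip(range(n_proc), repeat(name), repeat(a.dtype), repeat(N)))

    for r in res:
        print(f'{r[0]}\t{r[1]}')

    firsts = np.array([r[0] for r in res])
    print('not ' * int(not np.all(np.sort(firsts) == 1 + np.arange(n_proc))) + 'all good')

    del b  # drop the numpy view before closing the mapping
    shm.close()
    shm.unlink()

if __name__ == '__main__':
    test_shm()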
Alternatively, the lock can be passed to each worker together with the task arguments. An ordinary multiprocessing.Lock cannot be pickled into Pool task arguments (doing so raises a RuntimeError), so a Manager lock, whose proxy is picklable, is used here instead:

import numpy as np
from multiprocessing import shared_memory, Pool, Manager
from itertools import repeat
import time
from itertools import repeat
import time


def test_shm(N=500, n_proc=8, name='example'):
    # create a shared memory array
    a = np.random.rand(N, N, N).astype(np.float64)
    a[0, 0, 0] = 0.0

    shm = shared_memory.SharedMemory(
        name=name, create=True, size=a.nbytes)
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    b[:] = a[:]
    shm.close()

    manager = Manager()
    lock = manager.Lock()  # a Manager lock proxy can be pickled into the task arguments

    with Pool(n_proc) as p:
        res = p.starmap(
            work, zip(range(n_proc), repeat(name), repeat(a.dtype), repeat(N), repeat(lock)))

    for r in res:
        print(f'{r[0]}\t{r[1]}')

    res = np.array([r[0] for r in res])
    print('not ' * int(~np.all(np.sort(res) == 1 + np.arange(n_proc))) + 'all good')

    shm.unlink()


def work(i, name, dtype, N=500, lock=None):
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray((N, N, N), dtype=dtype, buffer=shm.buf)
    
    with lock:  # acquire the lock (held across the sleep, so the workers run one at a time)
        time.sleep(2)
        val = arr[0, 0, 0:2].copy()
        val[0] += 1.0
        arr[0, 0, 0] = val[0]
    
    shm.close()

    return val


if __name__ == '__main__':
    test_shm()
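
Either way, the lock makes the read-modify-write on arr[0, 0, 0] atomic with respect to the other workers, so the sorted first elements come out as [1, 2, ..., n_proc] and the check prints "all good" on every run. The difference is where the lock is held: the first variant locks only the update, while this version also holds it across the time.sleep(2), so the workers effectively run one after another.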
