python - conflict when using multiprocessing's shared memory
I am using multiprocessing's shared memory to share a numpy array between tasks. While each task should originally just read the array, I was curious whether writing is also possible. I wrote the following example to test it in a situation similar to how I actually use it. In this toy example, each process "opens" the array, adds 1 to the first element (which is initialized as 0.0) and returns it. The returned values of the first index should therefore be [1, 2, 3, ...], which is mostly the case, but if I run it a few times, every now and then two of the values come out the same.

Is there a way to avoid these conflicts? I know that in this toy example locking makes little sense (and brings no speedup if other processes have to wait), but I found no way to control access at all, so any pointers would be appreciated for fixing my actual, somewhat different problem.
import numpy as np
from multiprocessing import shared_memory, Pool
from itertools import repeat
import time


def test_shm(N=500, n_proc=8, name='example'):
    # create a shared memory array
    a = np.random.rand(N, N, N).astype(np.float64)
    a[0, 0, 0] = 0.0
    shm = shared_memory.SharedMemory(
        name=name, create=True, size=a.nbytes)
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    b[:] = a[:]
    shm.close()
    with Pool(n_proc) as p:
        res = p.starmap(
            work, zip(range(n_proc), repeat(name), repeat(a.dtype), repeat(N)))
    for r in res:
        print(f'{r[0]}\t{r[1]}')
    res = np.array([r[0] for r in res])
    print('not ' * int(~np.all(np.sort(res) == 1 + np.arange(n_proc))) + 'all good')
    shm.unlink()


def work(i, name, dtype, N=500):
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray((N, N, N), dtype=dtype, buffer=shm.buf)
    # now do some work
    time.sleep(2)
    val = arr[0, 0, 0:2].copy()
    val[0] += 1.0
    arr[0, 0, 0] = val[0]
    shm.close()
    return val


if __name__ == '__main__':
    test_shm()
asked Nov 21, 2024 at 16:18 by John Smith

Comment: Lock() from multiprocessing to synchronize access to the shared memory may fix the issue... – Bhargav, Nov 21, 2024 at 16:22
2 Answers

You have a data race on this array: you are doing concurrent reads and writes without any synchronization. This is a bug even in multithreaded code working on plain lists; it is not limited to shared memory. In this case you need a multiprocessing.Lock to protect the reads and writes.
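As a minimal illustration (my sketch, not part of the answer), the same unsynchronized read-modify-write pattern loses updates with plain threads and a list:

import threading
import time

counter = [0]  # shared mutable state, like arr[0, 0, 0] above


def bump():
    v = counter[0]      # read
    time.sleep(0.01)    # plenty of time for the other threads to interleave
    counter[0] = v + 1  # write back a now-stale value


threads = [threading.Thread(target=bump) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter[0])  # usually prints 1, not 8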
Passing the lock to the workers is slightly complicated: a multiprocessing.Lock cannot be pickled as a starmap task argument (the pool raises "Lock objects should only be shared between processes through inheritance"), so it has to be handed over through the pool's initializer.
# global scope
lock_var = None


def set_lock(value):
    global lock_var
    lock_var = value

...

with Pool(n_proc, initializer=set_lock, initargs=(lock,)) as p:
    ...
def work(i, name, dtype, N=500):
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray((N, N, N), dtype=dtype, buffer=shm.buf)
    # now do some work
    time.sleep(2)
    with lock_var:
        val = arr[0, 0, 0:2].copy()
        val[0] += 1.0
        arr[0, 0, 0] = val[0]
    shm.close()
    return val
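Note that the simulated work (time.sleep) stays outside the locked region, so only the short read-modify-write is serialized; if the lock were held around the whole task, the pool would effectively run one worker at a time. Putting it all together: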
import numpy as np
from multiprocessing import shared_memory, Pool, Lock
from itertools import repeat
import time

# global scope; filled in by the pool initializer in every worker
lock_var = None


def set_lock(value):
    global lock_var
    lock_var = value


def test_shm(N=500, n_proc=8, name='example'):
    # create a shared memory array
    a = np.random.rand(N, N, N).astype(np.float64)
    a[0, 0, 0] = 0.0
    shm = shared_memory.SharedMemory(
        name=name, create=True, size=a.nbytes)
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    b[:] = a[:]
    shm.close()
    lock = Lock()  # create a lock for synchronization
    # the lock is handed to each worker via the initializer, not via starmap
    with Pool(n_proc, initializer=set_lock, initargs=(lock,)) as p:
        res = p.starmap(
            work, zip(range(n_proc), repeat(name), repeat(a.dtype), repeat(N)))
    for r in res:
        print(f'{r[0]}\t{r[1]}')
    res = np.array([r[0] for r in res])
    print('not ' * int(~np.all(np.sort(res) == 1 + np.arange(n_proc))) + 'all good')
    shm.unlink()


def work(i, name, dtype, N=500):
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray((N, N, N), dtype=dtype, buffer=shm.buf)
    # now do some work
    time.sleep(2)
    with lock_var:  # serialize only the read-modify-write
        val = arr[0, 0, 0:2].copy()
        val[0] += 1.0
        arr[0, 0, 0] = val[0]
    shm.close()
    return val


if __name__ == '__main__':
    test_shm()
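If you would rather pass the lock as an ordinary argument, one alternative (my addition, not part of the answer) is a manager lock: multiprocessing.Manager().Lock() returns a picklable proxy that can travel through starmap, at the price of every acquire/release going through the manager process. A minimal sketch:

from multiprocessing import Manager, Pool
from itertools import repeat


def work(i, lock):
    # the proxy forwards acquire()/release() to the manager process
    with lock:
        return i + 1  # the protected read-modify-write would go here


if __name__ == '__main__':
    with Manager() as mgr:
        lock = mgr.Lock()  # a picklable proxy, unlike multiprocessing.Lock()
        with Pool(4) as p:
            print(p.starmap(work, zip(range(4), repeat(lock))))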