I am writing a program for the following task.
The task is to write a program that creates two processes:
a) The producer process reads a video frame image directly into a shared memory ring buffer (i.e., a shared memory buffer that can accommodate N frame images for a configurable N; this can be a command-line parameter).
b) The consumer process reads the frame images from the ring buffer and displays each frame for a configurable amount of time (e.g., 500 ms; again, a command-line parameter).
c) The producer and consumer must synchronize so that:
i) The consumer displays an image only after it has been fully produced (i.e., the entire frame has been generated).
ii) The consumer waits for the producer if the next image to be consumed has not yet been fully produced.
iii) The producer waits for the consumer if the ring buffer doesn't have space for the next image to be produced (i.e., the consumer has not fully consumed any of the current images in the ring buffer).
Below is my code:
from multiprocessing import Process, Lock, Value
from multiprocessing.shared_memory import SharedMemory
import sys
import numpy as np
import cv2
import time

class RingBufferFullException(Exception):
    pass

class RingBufferEmptyException(Exception):
    pass

class RingBuffer:
    def __init__(self, max_size, single_frame):
        self.max_size = Value('i', max_size)
        self.shm = SharedMemory(create=True, size=max_size*single_frame.nbytes)
        self.queue = np.ndarray(shape=(max_size, *single_frame.shape), dtype=single_frame.dtype, buffer=self.shm.buf)
        self.tail = Value('i', -1)
        self.head = Value('i', 0)
        self.size = Value('i', 0)
        self.lock = Lock()

    def enqueue(self, item):
        with self.lock:
            if self.size.value == self.max_size.value:
                raise RingBufferFullException('Error: Queue is full')
            else:
                self.tail.value = (self.tail.value + 1) % self.max_size.value
                self.queue[self.tail.value] = item
                self.size.value += 1

    def dequeue(self):
        with self.lock:
            if self.size.value == 0:
                raise RingBufferEmptyException('Error: Queue is empty')
            tmp = self.queue[self.head.value]
            self.head.value = (self.head.value + 1) % self.max_size.value
            self.size.value -= 1
            return tmp

    def isFull(self):
        return self.size.value == self.max_size.value

    def isEmpty(self):
        return self.size.value == 0

    def display(self):
        if self.size.value == 0:
            print('Queue is empty')
        else:
            idx = self.head.value
            print('---------------------------------------')
            for i in range(self.size.value):
                print(self.queue[idx])
                idx = (idx + 1) % self.max_size.value
            print('---------------------------------------')

    def clean(self):
        self.shm.close()
        self.shm.unlink()

def producer(buf):
    cap = cv2.VideoCapture(0)
    print(cap.isOpened())
    while True:
        buf.display()
        success, frame = cap.read()
        if not success:
            print('Failed to capture frame')
            continue
        try:
            buf.enqueue(frame)
        except RingBufferFullException:
            time.sleep(0.5)

def consumer(buf):
    while True:
        try:
            frame = buf.dequeue()
            cv2.imshow('Frame', frame)
            cv2.waitKey(100)
        except RingBufferEmptyException:
            time.sleep(0.2)
    cv2.destroyAllWindows()

if __name__ == '__main__':
    test = cv2.VideoCapture(0)
    if not test.isOpened():
        print('Error: Could not open camera.')
        sys.exit(1)
    success, single_frame = test.read()
    if not success:
        print('Error: Could not capture initial frame.')
        sys.exit(1)
    test.release()
    buf = RingBuffer(10, single_frame)
    test.release()
    produce = Process(target=producer, args=(buf,))
    consume = Process(target=consumer, args=(buf,))
    try:
        produce.start()
        consume.start()
        produce.join()
        consume.join()
    finally:
        buf.clean()
At first I thought I could put the ring buffer instance itself in shared memory. I searched the web and couldn't find anything. ChatGPT said that placing a complex custom object in shared memory is not possible in Python. Then I thought I could put the instance inside a shared memory block, and ChatGPT said that is not possible either.
So I put the metadata of the ring buffer in shared memory instead. Now it enqueues and dequeues frames fine, but it shows only black images.
The main problem is that the cv2 window only shows black. The images are all black. When I print the dequeued frame, it is just a matrix of all zeroes. If I print the buffer inside the producer function or the consumer function, it prints a NumPy matrix with different values. But in the cv2 window there are only black images.
What am I doing wrong? Any help is appreciated. Thank you in advance.
Asked Jan 9 at 6:26 by Venkatesh B; edited 2 days ago by Christoph Rackwitz.
1 Answer
When you create a child process and pass it an argument buf of type RingBuffer, buf is serialized with pickle in the parent process and de-serialized in the child process (this is what happens under the spawn start method, the default on Windows and macOS). The problem is that although in your main process you arranged for your numpy.ndarray instance to use a buffer in shared memory, only the contents of that array are pickled, and when the array is de-serialized in the child process its buffer is just local, non-shared storage. So your producer and consumer are not really sharing the same data.
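To see this effect in isolation, here is a minimal single-process sketch. It pickles an ndarray that is backed by shared memory and then writes to the original. The function name pickled_copy_is_detached is made up for this illustration, and an explicit pickle round trip stands in for what happens when arguments are sent to a spawned child:

```python
import pickle

import numpy as np
from multiprocessing.shared_memory import SharedMemory


def pickled_copy_is_detached():
    """Return (value seen by the shared array, value seen by the unpickled copy)."""
    shm = SharedMemory(create=True, size=16)
    try:
        # An ndarray view on top of the shared memory block.
        shared = np.ndarray((16,), dtype=np.uint8, buffer=shm.buf)
        shared[:] = 0
        # Pickling stores the array's contents, not a reference to the block.
        clone = pickle.loads(pickle.dumps(shared))
        # Write through the shared-memory view only.
        shared[0] = 255
        result = (int(shared[0]), int(clone[0]))
        # Drop the view before closing, otherwise close() raises BufferError.
        del shared
        return result
    finally:
        shm.close()
        shm.unlink()


if __name__ == '__main__':
    print(pickled_copy_is_detached())
```

The unpickled copy never sees the write, which matches the all-zero frames the consumer displays.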
The solution is to override how a RingBuffer is serialized and de-serialized so that you do not attempt to serialize the ndarray instance at all, while still serializing the shared memory. Then, when you de-serialize the instance, you rebuild the ndarray instance on top of the shared memory. For this we need to define the methods __getstate__ and __setstate__. We must also create some additional instance attributes so that we have enough information to re-create the ndarray, i.e. the array's shape and dtype:
...
class RingBuffer:
    def __init__(self, max_size, single_frame):
        self.max_size = Value('i', max_size)
        self.shape = single_frame.shape  # Save shape
        self.dtype = single_frame.dtype  # Save dtype
        self.shm = SharedMemory(create=True, size=max_size*single_frame.nbytes)
        self.queue = np.ndarray(shape=(max_size, *self.shape), dtype=self.dtype, buffer=self.shm.buf)
        self.tail = Value('i', -1)
        self.head = Value('i', 0)
        self.size = Value('i', 0)
        self.lock = Lock()

    def __getstate__(self):
        # Do not save self.queue:
        state = self.__dict__.copy()
        del state['queue']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Re-create the ndarray:
        self.queue = np.ndarray(shape=(self.max_size.value, *self.shape), dtype=self.dtype, buffer=self.shm.buf)
...
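The same pattern can be checked without a camera or a second process. The sketch below uses a hypothetical, stripped-down class SharedFrames in place of RingBuffer. It leaves out the Value and Lock attributes, because those can only be pickled while a child process is being spawned, and uses a plain pickle round trip to imitate the hand-over to the child:

```python
import pickle

import numpy as np
from multiprocessing.shared_memory import SharedMemory


class SharedFrames:
    """Hypothetical stand-in for RingBuffer: only the shared frame storage."""

    def __init__(self, n_frames, frame_shape, dtype):
        self.shape = (n_frames, *frame_shape)
        self.dtype = np.dtype(dtype)
        size = int(np.prod(self.shape)) * self.dtype.itemsize
        self.shm = SharedMemory(create=True, size=size)
        self.frames = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shm.buf)

    def __getstate__(self):
        # Leave the ndarray out; SharedMemory itself pickles by name.
        state = self.__dict__.copy()
        del state['frames']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Rebuild the ndarray on top of the re-attached shared memory.
        self.frames = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shm.buf)

    def release(self, unlink=False):
        del self.frames  # drop the view before closing the block
        self.shm.close()
        if unlink:
            self.shm.unlink()


def round_trip_shares_data():
    owner = SharedFrames(2, (4, 4), np.uint8)
    owner.frames[:] = 0
    attached = pickle.loads(pickle.dumps(owner))  # imitates the child side
    owner.frames[1, 2, 3] = 7                     # "producer" writes
    seen = int(attached.frames[1, 2, 3])          # "consumer" reads
    attached.release()
    owner.release(unlink=True)
    return seen


if __name__ == '__main__':
    print(round_trip_shares_data())
```

Here the write made through the original object is visible through the unpickled one, because both ndarrays now sit on the same shared memory block.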
Some Simplifications
First and foremost, for this particular code you posted, all you really need is multithreading, which will greatly simplify the code.
Second, instead of raising exceptions on buffer empty and full conditions and then sleeping for a half second and trying again, we could/should use threading.Condition instances. I have made a few other simplifications, so look closely:
from threading import Thread, Lock, Condition
import sys
import numpy as np
import cv2
import time

class RingBuffer:
    def __init__(self, max_size):
        self.max_size = max_size
        self.buffer = [None] * max_size
        self.size = 0
        # Both conditions share one lock, so they guard the same state:
        lock = Lock()
        self.not_empty = Condition(lock)
        self.not_full = Condition(lock)
        # These can both be 0 initially:
        self.tail = 0
        self.head = 0

    def enqueue(self, frame):
        with self.not_full:
            while self.size == self.max_size:
                self.not_full.wait()
            self.buffer[self.tail] = frame
            self.tail = (self.tail + 1) % self.max_size
            self.size += 1
            # If the consumer was waiting for data to be enqueued,
            # let it know that data is now available:
            self.not_empty.notify()

    def dequeue(self):
        with self.not_empty:
            while self.size == 0:
                self.not_empty.wait()
            frame = self.buffer[self.head]
            self.head = (self.head + 1) % self.max_size
            self.size -= 1
            # If the producer was waiting because the ring
            # was full, let it know that it can now enqueue:
            self.not_full.notify()
            return frame

    def display(self):
        if self.size == 0:
            print('Queue is empty')
        else:
            idx = self.head
            print('---------------------------------------')
            for i in range(self.size):
                print(self.buffer[idx])
                idx = (idx + 1) % self.max_size
            print('---------------------------------------')

def producer(buf):
    cap = cv2.VideoCapture(0)
    print(cap.isOpened())
    while True:
        # Comment out for improved performance:
        #buf.display()
        success, frame = cap.read()
        if not success:
            print('Failed to capture frame')
            continue
        buf.enqueue(frame)

def consumer(buf):
    while True:
        frame = buf.dequeue()
        cv2.imshow('Frame', frame)
        # Quit if escape key is hit:
        if (cv2.waitKey(1) & 0xFF) == 27:
            break
    cv2.destroyAllWindows()

if __name__ == '__main__':
    test = cv2.VideoCapture(0)
    if not test.isOpened():
        print('Error: Could not open camera.')
        sys.exit(1)
    # Release the camera so that the producer can open it:
    test.release()
    buf = RingBuffer(10)
    consumer_thread = Thread(target=consumer, args=(buf,))
    consumer_thread.start()
    # Make this a daemon thread
    Thread(target=producer, args=(buf,), daemon=True).start()
    # Wait for escape hit on window:
    consumer_thread.join()
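To exercise the ring buffer without a camera or a window, the producer can be fed synthetic frames. The following is only a sketch under assumptions: the helper run_synthetic, the None sentinel used to stop the consumer, and the sequence number stamped into pixel [0, 0, 0] are all made up for this test and are not part of the code above. The RingBuffer is a condensed copy of the threaded version.

```python
from threading import Thread, Lock, Condition

import numpy as np


class RingBuffer:
    """Condensed copy of the threaded ring buffer shown above."""

    def __init__(self, max_size):
        self.max_size = max_size
        self.buffer = [None] * max_size
        self.size = self.head = self.tail = 0
        lock = Lock()
        self.not_empty = Condition(lock)
        self.not_full = Condition(lock)

    def enqueue(self, frame):
        with self.not_full:
            while self.size == self.max_size:
                self.not_full.wait()
            self.buffer[self.tail] = frame
            self.tail = (self.tail + 1) % self.max_size
            self.size += 1
            self.not_empty.notify()

    def dequeue(self):
        with self.not_empty:
            while self.size == 0:
                self.not_empty.wait()
            frame = self.buffer[self.head]
            self.head = (self.head + 1) % self.max_size
            self.size -= 1
            self.not_full.notify()
            return frame


def run_synthetic(n_frames=50, ring_size=4, shape=(8, 8, 3)):
    """Push n_frames random frames through the ring; return the order received."""
    buf = RingBuffer(ring_size)
    rng = np.random.default_rng(0)
    received = []

    def producer():
        for i in range(n_frames):
            frame = rng.integers(0, 256, size=shape, dtype=np.uint8)
            frame[0, 0, 0] = i % 256  # stamp a sequence number (assumption)
            buf.enqueue(frame)
        buf.enqueue(None)  # hypothetical end-of-stream sentinel

    def consumer():
        while True:
            frame = buf.dequeue()
            if frame is None:
                break
            received.append(int(frame[0, 0, 0]))

    threads = [Thread(target=producer), Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return received


if __name__ == '__main__':
    print(run_synthetic(10, 4))
```

With a ring much smaller than the number of frames, the producer has to block on not_full repeatedly, so a run that returns the frames in order exercises both wait conditions.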
cv.VideoCapture, replace with synthetic data (e.g. numpy random data) – Christoph Rackwitz, commented 2 days ago