admin管理员组

文章数量:1125551

I am writing a program for this following task

The task is to write a program that creates two processes:

a) The 'producer process reads a video frame image directly into a shared memory ring buffer (i.e., a shared memory buffer that can accommodate N frame images for a configurable N - this can be kept as a command line parameter).

b) The 'consumer process reads the frame images from the ring buffer an displays each frame for a configurable amount of time (e.g., 500 ms - aga a program command line parameter)

c) The producer and consumer must synchronize so that

i) The consumer displays an image only after it's been fully produced (i.e., the entire frame has been generated)

ii) The consumer waits for the producer if the next image to be consumed has not yet been fully produced.\

iii) The producer waits for the consumer the ring buffer doesn't have space for the next image to be produced (i.e., consumer has not fully consumed any of the current images in the ring buffer)

below is my code:

from multiprocessing import Process, Lock, Value
from multiprocessing.shared_memory import SharedMemory

import sys
import numpy as np
import cv2
import time

class RingBufferFullException(Exception):
    pass

class RingBufferEmptyException(Exception):
    pass

class RingBuffer:

    def __init__(self, max_size, single_frame):
        self.max_size = Value('i', max_size)
        self.shm = SharedMemory(create=True, size=max_size*single_frame.nbytes)
        self.queue = np.ndarray(shape=(max_size, *single_frame.shape), dtype=single_frame.dtype, buffer=self.shm.buf)
        self.tail = Value('i', -1)
        self.head = Value('i', 0)
        self.size = Value('i', 0)
        self.lock = Lock()

    def enqueue(self, item):
        with self.lock:
            if self.size.value == self.max_size.value:
                raise RingBufferFullException('Error: Queue is full')

            else:
                self.tail.value = (self.tail.value + 1) % self.max_size.value
                self.queue[self.tail.value] = item
                self.size.value += 1

    def dequeue(self):
        with self.lock:
            if self.size.value == 0:
                raise RingBufferEmptyException('Error: Queue is empty')
            
            tmp = self.queue[self.head.value]
            self.head.value = (self.head.value + 1) % self.max_size.value
            self.size.value -= 1

            return tmp
    
    def isFull(self):
        return self.size.value == self.max_size.value
    
    def isEmpty(self):
        return self.size.value == 0
    
    def display(self):
        if self.size.value == 0:
            print('Queue is empty')

        else:
            idx = self.head.value
            print('---------------------------------------')
            for i in range(self.size.value):
                print(self.queue[idx])
                idx = (idx + 1) % self.max_size.value
            print('---------------------------------------')

    def clean(self):
        self.shm.close()
        self.shm.unlink()

def producer(buf):
    cap = cv2.VideoCapture(0)
    print(cap.isOpened())

    while True:
        buf.display()
        success, frame = cap.read()
        
        if not success:
            print('Failed to capture frame')
            continue

        try:
            buf.enqueue(frame)
        
        except RingBufferFullException:
            time.sleep(0.5)

def consumer(buf):
    while True:
        try:
            frame = buf.dequeue()
            cv2.imshow('Frame', frame)
            cv2.waitKey(100)
        
        except RingBufferEmptyException:
            time.sleep(0.2)
    
    cv2.destroyAllWindows()

if __name__ == '__main__':
    test = cv2.VideoCapture(0)
    if not test.isOpened():
        print('Error: Could not open camera.')
        sys.exit(1)

    success, single_frame = test.read()
    if not success:
        print('Error: Could not capture initial frame.')
        sys.exit(1)

    test.release()
    buf = RingBuffer(10, single_frame)
    test.release()
    
    produce = Process(target=producer, args=(buf,))
    consume = Process(target=consumer, args=(buf,))

    try:
        produce.start()
        consume.start()

        produce.join()
        consume.join()
        
    finally:
        buf.clean()

At first I thought I could make the ring buffer instance itself a shared memory. I searched for the web and I couldn't find anything. ChatGPT said making a complex custom object a shared memory is not possible in python. And then I thought I could put the instance inside a shared memory and ChatGPT said it is also not possible

Then I made the metadata of the ring buffer a shared memory. now it enqueues and dequeues frames fine but it shows only black images.

The main problem is that the cv2 window only shows black. The images are all black. when I print the dequeued frame it is just matrix with all zeroes. If I print the buffer inside the producer function or consumer function it prints np matrix with different values. But in the cv2 window it is just black images.

What am I doing wrong? Any help is appreciated. Thank you guys in advance.

I am writing a program for this following task

The task is to write a program that creates two processes:

a) The 'producer process reads a video frame image directly into a shared memory ring buffer (i.e., a shared memory buffer that can accommodate N frame images for a configurable N - this can be kept as a command line parameter).

b) The 'consumer process reads the frame images from the ring buffer an displays each frame for a configurable amount of time (e.g., 500 ms - aga a program command line parameter)

c) The producer and consumer must synchronize so that

i) The consumer displays an image only after it's been fully produced (i.e., the entire frame has been generated)

ii) The consumer waits for the producer if the next image to be consumed has not yet been fully produced.\

iii) The producer waits for the consumer the ring buffer doesn't have space for the next image to be produced (i.e., consumer has not fully consumed any of the current images in the ring buffer)

below is my code:

from multiprocessing import Process, Lock, Value
from multiprocessing.shared_memory import SharedMemory

import sys
import numpy as np
import cv2
import time

class RingBufferFullException(Exception):
    pass

class RingBufferEmptyException(Exception):
    pass

class RingBuffer:

    def __init__(self, max_size, single_frame):
        self.max_size = Value('i', max_size)
        self.shm = SharedMemory(create=True, size=max_size*single_frame.nbytes)
        self.queue = np.ndarray(shape=(max_size, *single_frame.shape), dtype=single_frame.dtype, buffer=self.shm.buf)
        self.tail = Value('i', -1)
        self.head = Value('i', 0)
        self.size = Value('i', 0)
        self.lock = Lock()

    def enqueue(self, item):
        with self.lock:
            if self.size.value == self.max_size.value:
                raise RingBufferFullException('Error: Queue is full')

            else:
                self.tail.value = (self.tail.value + 1) % self.max_size.value
                self.queue[self.tail.value] = item
                self.size.value += 1

    def dequeue(self):
        with self.lock:
            if self.size.value == 0:
                raise RingBufferEmptyException('Error: Queue is empty')
            
            tmp = self.queue[self.head.value]
            self.head.value = (self.head.value + 1) % self.max_size.value
            self.size.value -= 1

            return tmp
    
    def isFull(self):
        return self.size.value == self.max_size.value
    
    def isEmpty(self):
        return self.size.value == 0
    
    def display(self):
        if self.size.value == 0:
            print('Queue is empty')

        else:
            idx = self.head.value
            print('---------------------------------------')
            for i in range(self.size.value):
                print(self.queue[idx])
                idx = (idx + 1) % self.max_size.value
            print('---------------------------------------')

    def clean(self):
        self.shm.close()
        self.shm.unlink()

def producer(buf):
    cap = cv2.VideoCapture(0)
    print(cap.isOpened())

    while True:
        buf.display()
        success, frame = cap.read()
        
        if not success:
            print('Failed to capture frame')
            continue

        try:
            buf.enqueue(frame)
        
        except RingBufferFullException:
            time.sleep(0.5)

def consumer(buf):
    while True:
        try:
            frame = buf.dequeue()
            cv2.imshow('Frame', frame)
            cv2.waitKey(100)
        
        except RingBufferEmptyException:
            time.sleep(0.2)
    
    cv2.destroyAllWindows()

if __name__ == '__main__':
    test = cv2.VideoCapture(0)
    if not test.isOpened():
        print('Error: Could not open camera.')
        sys.exit(1)

    success, single_frame = test.read()
    if not success:
        print('Error: Could not capture initial frame.')
        sys.exit(1)

    test.release()
    buf = RingBuffer(10, single_frame)
    test.release()
    
    produce = Process(target=producer, args=(buf,))
    consume = Process(target=consumer, args=(buf,))

    try:
        produce.start()
        consume.start()

        produce.join()
        consume.join()
        
    finally:
        buf.clean()

At first I thought I could make the ring buffer instance itself a shared memory. I searched for the web and I couldn't find anything. ChatGPT said making a complex custom object a shared memory is not possible in python. And then I thought I could put the instance inside a shared memory and ChatGPT said it is also not possible

Then I made the metadata of the ring buffer a shared memory. now it enqueues and dequeues frames fine but it shows only black images.

The main problem is that the cv2 window only shows black. The images are all black. when I print the dequeued frame it is just matrix with all zeroes. If I print the buffer inside the producer function or consumer function it prints np matrix with different values. But in the cv2 window it is just black images.

What am I doing wrong? Any help is appreciated. Thank you guys in advance.

Share Improve this question edited 2 days ago Christoph Rackwitz 15.2k5 gold badges38 silver badges49 bronze badges asked Jan 9 at 6:26 Venkatesh BVenkatesh B 113 bronze badges 1
  • debugging required (minimal reproducible example). remove the cv.VideoCapture, replace with synthetic data (e.g. numpy random data) – Christoph Rackwitz Commented 2 days ago
Add a comment  | 

1 Answer 1

Reset to default 0

When you create a child process and pass to it an argument buf of type SingleBuffer, buf will be serialized using pickle in the parent process and de-serialized in the child process. The problem is that although in your main process you arranged for your numpy.ndarray instance to use a buffer in shared memory, its only the contents of that data that is being saved and when it is de-serialized in the child process the buffer used is just local (non-shared storage). So your producer and consumer are not really sharing the same data.

The solution is to override how a StringBuffer is serialized and de-serialized so that you do not even attempt to serialize the ndarray instance while still serializing shared memory. Then when you de-serialize the instance, you rebuild the ndarray instance to use the shared memory. For this we need to define methods __getstate__ and __setstate__. We must also create some additional instance attributes so that we have enough information to re-create the ndarray, i.e. the array's shape and type:

...
class RingBuffer:

    def __init__(self, max_size, single_frame):
        self.max_size = Value('i', max_size)
        self.shape = single_frame.shape  # Save shape
        self.dtype = single_frame.dtype  # Save dtype
        self.shm = SharedMemory(create=True, size=max_size*single_frame.nbytes)
        self.queue = np.ndarray(shape=(max_size, *self.shape), dtype=self.dtype, buffer=self.shm.buf)
        self.tail = Value('i', -1)
        self.head = Value('i', 0)
        self.size = Value('i', 0)
        self.lock = Lock()

    def __getstate__(self):
        # Do not save self.queue:
        state = self.__dict__.copy()
        del state['queue']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Re-create the ndarray:
        self.queue = np.ndarray(shape=(self.max_size.value, *self.shape), dtype=self.dtype, buffer=self.shm.buf)
...

Some Simplifications

First and foremost, for this particular code you posted, all you really need is multithreading, which will greatly simplify the code.

Second, instead of raising exceptions on buffer empty and full conditions and then sleeping for a half second and trying again, we could/should use threading.Condition instances. I have made a few other simplifications, so look closely:

from threading import Thread, Lock, Condition

import sys
import numpy as np
import cv2
import time

class RingBuffer:

    def __init__(self, max_size):
        self.max_size = max_size
        self.buffer = [None] * max_size
        self.size = 0
        lock = Lock()
        self.not_empty = Condition(lock)
        self.not_full = Condition(lock)
        # These can both be 0 initially:
        self.tail = 0
        self.head = 0

    def enqueue(self, frame):
        with self.not_full:
            while self.size == self.max_size:
                self.not_full.wait()

            self.buffer[self.tail] = frame
            self.tail = (self.tail + 1) % self.max_size
            self.size += 1
            # If the consumer was waiting for data to be enqueued,
            # let it know that data is now available:
            self.not_empty.notify()

    def dequeue(self):
        with self.not_empty:
            while self.size == 0:
                self.not_empty.wait()

            frame = self.buffer[self.head]
            self.head = (self.head + 1) % self.max_size
            self.size -= 1
            # If the the producer was waiting becuase the ring
            # was full, let it know that it can now enqueue:
            self.not_full.notify()

            return frame

    def display(self):
        if self.size == 0:
            print('Queue is empty')
        else:
            idx = self.head
            print('---------------------------------------')
            for i in range(self.size):
                print(self.buffer[idx])
                idx = (idx + 1) % self.max_size
            print('---------------------------------------')

def producer(buf):
    cap = cv2.VideoCapture(0)
    print(cap.isOpened())

    while True:
        # Comment out for improved performance:
        #buf.display()

        success, frame = cap.read()

        if not success:
            print('Failed to capture frame')
            continue

        buf.enqueue(frame)

def consumer(buf):
    while True:
        frame = buf.dequeue()
        cv2.imshow('Frame', frame)
        # Quit if escape key is hit:
        if cv2.waitKey(1 & 0xFF) == 27:
            break

    cv2.destroyAllWindows()

if __name__ == '__main__':
    test = cv2.VideoCapture(0)
    if not test.isOpened():
        print('Error: Could not open camera.')
        sys.exit(1)

    buf = RingBuffer(10)

    consumer = Thread(target=consumer, args=(buf,))
    consumer.start()

    # Make this a daemon thread
    Thread(target=producer, args=(buf,), daemon=True).start()

    # Wait for escape hit on window:
    consumer.join()

本文标签: pythonHow can I manage a shared memory in a custom circular buffer data typeStack Overflow