admin管理员组

文章数量:1122832

I am quite deperate because I simply cannot find a solution for my problem.

Scenario

My python application acts as a gRPC server. The request that is made is a server side stream so the client asks something and the server keeps a stream open sending messages. The list of messages to be returned is not predefined but is given from another part of the program the InfoReceiver. So once the InfoReceiver receives information this information should be passed to my grPC server which then sends the message out to all clients that are registered.

The streams need to be kept open even though the messages are not created within the service method. Furthermore when I hit CTRL+C I need to call an registered atexit method that cleanly closes all streams to the clients and then continues to close the application.

What is happening

When I try to implement it I cannot close the streams when the server is shut down. I can see that the shutdown signal is received but my atexit method is not called and thus the streams are not closed. I suppose that I am somehow blocking the executing Thread with my method on the server side but I couldn't get a hang on it.

What have I tried

I looked through every python grpc example in the grpc repository here but did not find my use case or at least was not able to adapt the examples to my use case.

I tried to implement a separate class to handle streams (it keeps a list of all ServicerContexts created by the client calls and adds messages to a message queue once the InfoReceiver send a message. But while doing that I still had a while loop inside service servicer method which still blocks I think?

Last but not least I looked in another project I did in C# where I had a similar problem. There I was able to create a list where I save the stream. Inside my service method I just added the stream to that list, did await context.CancellationToken and after that removed the stream from the list. Whenever I wanted to send a message I just went through all streams in the list and called WriteAsync. I did not manage to copy that over to python.

Small code examples

Servicer stop method

def _stop(self): # Method is registered as first action in __init__
    self._stream_manager.cloase_all_streams()

Servicer method

stream = Stream(context)
self._stream_manager.add_stream(stream)

try:
    while stream.is_active:
        try:
            # I need to keep the stream open here but I think the while is blocking the correct shutdown?
            message = stream.get_message()
            yield message
        except queue.Empty:
            pass
except grpc.RpcError:
    print(f"Stream closed with error: {e}")
finally:
    self._stream_manager.remove_stream(stream)

StreamManager

class StreamManager:
    def __init__(self):
        self._lock = threading.Lock()
        self._streams = []
    
    def add_stream(self, stream: Stream):
        with self._lock:
            self._streams.append(stream)

    def remove_stream(self, stream: Stream):
        with self._lock:
            stream.close()
            self._streams.remove(stream)
   
    def send_message(self, message: Message): # this method is then called from the InfoReceiver to distribute the message to all clients.
        with self._lock:
            for stream in self._streams:
                stream.send_message(message)

    def close_all_streams(self):
        with self._lock:
            for stream in self._streams:
                stream.close()
                self._streams.remove(stream)

Stream

class Stream:
    def __init__(self, context: grpc.ServicerContext):
        self._context = context
        self.is_active = True
        self._queue = queue.Queue()

    def send_message(self, message: Message):
        self._queue.put(message)

    def close(self):
        self.is_active = False
        self._context.cancel()

    def get_message(self):
        return self.queue.get(block=False)

I am quite deperate because I simply cannot find a solution for my problem.

Scenario

My python application acts as a gRPC server. The request that is made is a server side stream so the client asks something and the server keeps a stream open sending messages. The list of messages to be returned is not predefined but is given from another part of the program the InfoReceiver. So once the InfoReceiver receives information this information should be passed to my grPC server which then sends the message out to all clients that are registered.

The streams need to be kept open even though the messages are not created within the service method. Furthermore when I hit CTRL+C I need to call an registered atexit method that cleanly closes all streams to the clients and then continues to close the application.

What is happening

When I try to implement it I cannot close the streams when the server is shut down. I can see that the shutdown signal is received but my atexit method is not called and thus the streams are not closed. I suppose that I am somehow blocking the executing Thread with my method on the server side but I couldn't get a hang on it.

What have I tried

I looked through every python grpc example in the grpc repository here but did not find my use case or at least was not able to adapt the examples to my use case.

I tried to implement a separate class to handle streams (it keeps a list of all ServicerContexts created by the client calls and adds messages to a message queue once the InfoReceiver send a message. But while doing that I still had a while loop inside service servicer method which still blocks I think?

Last but not least I looked in another project I did in C# where I had a similar problem. There I was able to create a list where I save the stream. Inside my service method I just added the stream to that list, did await context.CancellationToken and after that removed the stream from the list. Whenever I wanted to send a message I just went through all streams in the list and called WriteAsync. I did not manage to copy that over to python.

Small code examples

Servicer stop method

def _stop(self): # Method is registered as first action in __init__
    self._stream_manager.cloase_all_streams()

Servicer method

stream = Stream(context)
self._stream_manager.add_stream(stream)

try:
    while stream.is_active:
        try:
            # I need to keep the stream open here but I think the while is blocking the correct shutdown?
            message = stream.get_message()
            yield message
        except queue.Empty:
            pass
except grpc.RpcError:
    print(f"Stream closed with error: {e}")
finally:
    self._stream_manager.remove_stream(stream)

StreamManager

class StreamManager:
    def __init__(self):
        self._lock = threading.Lock()
        self._streams = []
    
    def add_stream(self, stream: Stream):
        with self._lock:
            self._streams.append(stream)

    def remove_stream(self, stream: Stream):
        with self._lock:
            stream.close()
            self._streams.remove(stream)
   
    def send_message(self, message: Message): # this method is then called from the InfoReceiver to distribute the message to all clients.
        with self._lock:
            for stream in self._streams:
                stream.send_message(message)

    def close_all_streams(self):
        with self._lock:
            for stream in self._streams:
                stream.close()
                self._streams.remove(stream)

Stream

class Stream:
    def __init__(self, context: grpc.ServicerContext):
        self._context = context
        self.is_active = True
        self._queue = queue.Queue()

    def send_message(self, message: Message):
        self._queue.put(message)

    def close(self):
        self.is_active = False
        self._context.cancel()

    def get_message(self):
        return self.queue.get(block=False)
Share Improve this question asked Nov 21, 2024 at 9:01 tikotiko 179 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 0

is there a reason why you aren't using the built-in functionality of gRPC to have make/receive streaming calls instead of creating your own Stream and StreamManager?

The official RouteGuide example has information on how to use all the different types of supported RPC calls including request-side streaming, response-side streaming and bi-di streaming. This gRPC guide also explains it in detail.

本文标签: Handle multiple gRPC server side streams in PythonStack Overflow