admin管理员组文章数量:1122832
I am quite deperate because I simply cannot find a solution for my problem.
Scenario
My python application acts as a gRPC server
. The request that is made is a server side stream so the client asks something and the server
keeps a stream open sending messages.
The list of messages to be returned is not predefined but is given from another part of the program the InfoReceiver
.
So once the InfoReceiver
receives information this information should be passed to my grPC server
which then sends the message out to all clients that are registered.
The streams need to be kept open even though the messages are not created within the service method.
Furthermore when I hit CTRL+C I need to call an registered atexit
method that cleanly closes all streams to the clients and then continues to close the application.
What is happening
When I try to implement it I cannot close the streams when the server
is shut down. I can see that the shutdown signal is received but my atexit method is not called and thus the streams are not closed.
I suppose that I am somehow blocking the executing Thread with my method on the server side but I couldn't get a hang on it.
What have I tried
I looked through every python grpc example in the grpc repository here but did not find my use case or at least was not able to adapt the examples to my use case.
I tried to implement a separate class to handle streams (it keeps a list of all ServicerContexts created by the client calls and adds messages to a message queue once the InfoReceiver
send a message. But while doing that I still had a while loop inside service
servicer method which still blocks I think?
Last but not least I looked in another project I did in C# where I had a similar problem.
There I was able to create a list where I save the stream. Inside my service method I just added the stream to that list, did await context.CancellationToken
and after that removed the stream from the list.
Whenever I wanted to send a message I just went through all streams in the list and called WriteAsync
. I did not manage to copy that over to python.
Small code examples
Servicer stop method
def _stop(self): # Method is registered as first action in __init__
self._stream_manager.cloase_all_streams()
Servicer method
stream = Stream(context)
self._stream_manager.add_stream(stream)
try:
while stream.is_active:
try:
# I need to keep the stream open here but I think the while is blocking the correct shutdown?
message = stream.get_message()
yield message
except queue.Empty:
pass
except grpc.RpcError:
print(f"Stream closed with error: {e}")
finally:
self._stream_manager.remove_stream(stream)
StreamManager
class StreamManager:
def __init__(self):
self._lock = threading.Lock()
self._streams = []
def add_stream(self, stream: Stream):
with self._lock:
self._streams.append(stream)
def remove_stream(self, stream: Stream):
with self._lock:
stream.close()
self._streams.remove(stream)
def send_message(self, message: Message): # this method is then called from the InfoReceiver to distribute the message to all clients.
with self._lock:
for stream in self._streams:
stream.send_message(message)
def close_all_streams(self):
with self._lock:
for stream in self._streams:
stream.close()
self._streams.remove(stream)
Stream
class Stream:
def __init__(self, context: grpc.ServicerContext):
self._context = context
self.is_active = True
self._queue = queue.Queue()
def send_message(self, message: Message):
self._queue.put(message)
def close(self):
self.is_active = False
self._context.cancel()
def get_message(self):
return self.queue.get(block=False)
I am quite deperate because I simply cannot find a solution for my problem.
Scenario
My python application acts as a gRPC server
. The request that is made is a server side stream so the client asks something and the server
keeps a stream open sending messages.
The list of messages to be returned is not predefined but is given from another part of the program the InfoReceiver
.
So once the InfoReceiver
receives information this information should be passed to my grPC server
which then sends the message out to all clients that are registered.
The streams need to be kept open even though the messages are not created within the service method.
Furthermore when I hit CTRL+C I need to call an registered atexit
method that cleanly closes all streams to the clients and then continues to close the application.
What is happening
When I try to implement it I cannot close the streams when the server
is shut down. I can see that the shutdown signal is received but my atexit method is not called and thus the streams are not closed.
I suppose that I am somehow blocking the executing Thread with my method on the server side but I couldn't get a hang on it.
What have I tried
I looked through every python grpc example in the grpc repository here but did not find my use case or at least was not able to adapt the examples to my use case.
I tried to implement a separate class to handle streams (it keeps a list of all ServicerContexts created by the client calls and adds messages to a message queue once the InfoReceiver
send a message. But while doing that I still had a while loop inside service
servicer method which still blocks I think?
Last but not least I looked in another project I did in C# where I had a similar problem.
There I was able to create a list where I save the stream. Inside my service method I just added the stream to that list, did await context.CancellationToken
and after that removed the stream from the list.
Whenever I wanted to send a message I just went through all streams in the list and called WriteAsync
. I did not manage to copy that over to python.
Small code examples
Servicer stop method
def _stop(self): # Method is registered as first action in __init__
self._stream_manager.cloase_all_streams()
Servicer method
stream = Stream(context)
self._stream_manager.add_stream(stream)
try:
while stream.is_active:
try:
# I need to keep the stream open here but I think the while is blocking the correct shutdown?
message = stream.get_message()
yield message
except queue.Empty:
pass
except grpc.RpcError:
print(f"Stream closed with error: {e}")
finally:
self._stream_manager.remove_stream(stream)
StreamManager
class StreamManager:
def __init__(self):
self._lock = threading.Lock()
self._streams = []
def add_stream(self, stream: Stream):
with self._lock:
self._streams.append(stream)
def remove_stream(self, stream: Stream):
with self._lock:
stream.close()
self._streams.remove(stream)
def send_message(self, message: Message): # this method is then called from the InfoReceiver to distribute the message to all clients.
with self._lock:
for stream in self._streams:
stream.send_message(message)
def close_all_streams(self):
with self._lock:
for stream in self._streams:
stream.close()
self._streams.remove(stream)
Stream
class Stream:
def __init__(self, context: grpc.ServicerContext):
self._context = context
self.is_active = True
self._queue = queue.Queue()
def send_message(self, message: Message):
self._queue.put(message)
def close(self):
self.is_active = False
self._context.cancel()
def get_message(self):
return self.queue.get(block=False)
Share
Improve this question
asked Nov 21, 2024 at 9:01
tikotiko
179 bronze badges
1 Answer
Reset to default 0is there a reason why you aren't using the built-in functionality of gRPC to have make/receive streaming calls instead of creating your own Stream and StreamManager?
The official RouteGuide example has information on how to use all the different types of supported RPC calls including request-side streaming, response-side streaming and bi-di streaming. This gRPC guide also explains it in detail.
本文标签: Handle multiple gRPC server side streams in PythonStack Overflow
版权声明:本文标题:Handle multiple gRPC server side streams in Python - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1736312261a1935036.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论