I work in a corporate environment and use gRPC to implement workflow automation. Some of the RPCs use one-directional streaming, typically for uploading or downloading files. File uploads from my personal computer to a server in the corporate datacenter fail sporadically: some attempts succeed, but far more of them fail.

  • There is no issue when the client and the server run on two different datacenter servers.
  • There is no issue when both the client and the server run on my personal computer.
  • There is no issue when the client runs on a datacenter server and the server runs on my personal computer.
  • The issue only appears when the client runs on my personal computer and the server runs on a datacenter server, that is, when streaming a file up from the PC to the datacenter.

This is the exception I get on the client side, on failure:

Traceback (most recent call last):
  File "<some-path>/client.py", line 39, in <module>
    upload_file(stub, file_loc, file_rem=file_rem)
  File "<some-path>/client.py", line 24, in upload_file
    resp = stub.upload_file(read_iterfile(file_loc, file_rem))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<some-path>/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 1536, in __call__
    return _end_unary_response_blocking(state, call, False, None)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<some-path>/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 1006, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "sendmsg: Result too large (34)"
    debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"sendmsg: Result too large (34)", grpc_status:14, created_time:"2025-01-10T17:26:25.646875+01:00"}"

I have tried repeatedly to work out which layer raises "sendmsg: Result too large (34)".
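The number in parentheses looks like a POSIX errno value, so one quick check is to decode it with Python's standard library on the machine that runs the client. This is only a sketch: `describe_errno` is a hypothetical helper, not part of gRPC, and the message text it prints depends on the platform's C library.

```python
import errno
import os


def describe_errno(code: int) -> str:
    """Return 'NAME: message' for a numeric errno, using this platform's tables."""
    # errno.errorcode maps numbers to symbolic names such as 'ERANGE'.
    name = errno.errorcode.get(code, "UNKNOWN")
    # os.strerror returns the local C library's wording for that number.
    return "{}: {}".format(name, os.strerror(code))


if __name__ == "__main__":
    # 34 is the number shown in "sendmsg: Result too large (34)".
    print(describe_errno(34))
```

On macOS and Linux, errno 34 is `ERANGE`. The macOS text for it is "Result too large", whereas glibc on Linux words it as "Numerical result out of range", so the wording in the traceback may hint that the string was produced by a macOS or BSD C library rather than a Linux one. That is an inference, not something the traceback confirms.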

Any ideas are welcome.

Below is a simplified client and server implementation that reproduces the issue on my setup. I assume that reproducing it elsewhere would be very difficult. I spent a lot of time tweaking gRPC options, but nothing helped. The samples below use the default options.

Client:

import os
import sys
import grpc

import test_pb2
import test_pb2_grpc

def read_iterfile(file_loc, file_rem, chunk_size=16*1024):

    yield test_pb2.upstream(file=file_rem)

    with open(file_loc, mode="rb") as fh:
        i = 0
        while chunk := fh.read(chunk_size):
            i += 1
            yield test_pb2.upstream(data_chunk=chunk)
            print("chunk {}, size {}".format(i, len(chunk)))
        print("total chunks: {}.".format(i))


def upload_file(stub, file_loc, file_rem):

    resp = stub.upload_file(read_iterfile(file_loc, file_rem))
    print("file '{}' upload {}".format(file_loc, resp.message))


if __name__ == '__main__':

    ipaddr = sys.argv[1]
    port = sys.argv[2]
    file_loc = sys.argv[3]
    file_rem = 'file_rem.test'

    s_addr = '{}:{}'.format(ipaddr, port)
    channel = grpc.insecure_channel(s_addr)
    stub = test_pb2_grpc.debugStub(channel)

    upload_file(stub, file_loc, file_rem=file_rem)
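
Because the failure is sporadic, a stopgap while the root cause is unknown could be to retry the whole upload. The following is a minimal sketch, not a fix: `call_with_retry`, `make_call`, and `is_retryable` are hypothetical names introduced here, and the attempt count and delay are arbitrary. The request generator is consumed by each attempt, so it has to be recreated inside `make_call` rather than built once and reused.

```python
import time


def call_with_retry(make_call, is_retryable, attempts=3, delay_s=1.0):
    """Call make_call() until it succeeds or the attempts run out.

    make_call    -- zero-argument callable that performs one full attempt
    is_retryable -- predicate deciding whether an exception is worth retrying
    """
    for attempt in range(1, attempts + 1):
        try:
            return make_call()
        except Exception as exc:
            # Give up on the last attempt or on errors we do not recognise.
            if attempt == attempts or not is_retryable(exc):
                raise
            # Simple linear backoff between attempts.
            time.sleep(delay_s * attempt)


# Possible use with the client above (assumes grpc, stub, read_iterfile,
# file_loc and file_rem from client.py are in scope):
#
# resp = call_with_retry(
#     lambda: stub.upload_file(read_iterfile(file_loc, file_rem)),
#     lambda exc: isinstance(exc, grpc.RpcError)
#     and exc.code() == grpc.StatusCode.UNAVAILABLE,
# )
```

This restarts the upload from the first chunk on every attempt, which is wasteful for large files and does nothing to explain where the error comes from.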

Server:

import sys
from concurrent import futures
import multiprocessing
import grpc

import test_pb2
import test_pb2_grpc


class debug_server(test_pb2_grpc.debugServicer):

    def upload_file(self, req_iterator, context):

        fh = None
        file_loc = None
        i = 0
        for req in req_iterator:
            if req.file:
                file_loc = req.file
                print("file_loc: {}".format(file_loc))
                fh = open(file_loc, 'wb')
            else:
                i += 1
                print("chunk {}, size {}".format(i, len(req.data_chunk)))
                fh.write(req.data_chunk)

        print("total chunks: {}".format(i))

        if fh is None:
            status = 'failed'
        else:
            fh.close()
            status = 'success'

        print("file '{}' upload {}".format(file_loc, status))

        return test_pb2.generic_msg(message=status)


def start_server(port):

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=3))
    o_debug_server = debug_server()
    test_pb2_grpc.add_debugServicer_to_server(o_debug_server, server)
    bind_addr = "[::]:{}".format(port)
    server.add_insecure_port(bind_addr)

    server.start()
    print("gRPC server started: port {}".format(port))
    server.wait_for_termination()
    print("gRPC server terminated: port {}".format(port))


if __name__ == '__main__':

    port = sys.argv[1]

    mp_server = multiprocessing.Process(target=start_server, args=(port,))
    mp_server.start()
    print("gRPC server process started: port {}".format(port))
    mp_server.join()
    print("gRPC server process terminated: port {}".format(port))

Protobuf message definition, file: test.proto:

syntax = "proto3";

package debugging;

service debug {
    rpc upload_file(stream upstream) returns (generic_msg) {}
}

message generic_msg {
  string message = 1;
}

message upstream {
  oneof request {
    string file = 1;
    bytes data_chunk = 2;
  }
}

Generate gRPC code:

python -m grpc_tools.protoc --python_out=${PWD} --grpc_python_out=${PWD} -I${PWD} test.proto

Start server:

python server.py <server-port>

Start client:

python client.py <server-ipaddr> <server-port> <file-to-upload>
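
To see more of what happens below the Python API while reproducing the failure, the client can be started with gRPC core logging switched on. The sketch below assumes that the installed gRPC build honors the `GRPC_VERBOSITY` and `GRPC_TRACE` environment variables and that `tcp` and `http` are available tracer names; `grpc_trace_env` and `run_client_with_tracing` are hypothetical helpers, and `client.py` is the client listed above.

```python
import os
import subprocess
import sys


def grpc_trace_env(base_env=None, tracers="tcp,http"):
    """Copy an environment and switch on gRPC core debug logging in the copy."""
    env = dict(os.environ if base_env is None else base_env)
    env["GRPC_VERBOSITY"] = "DEBUG"   # assumed to be read by gRPC core at startup
    env["GRPC_TRACE"] = tracers       # assumed comma-separated tracer names
    return env


def run_client_with_tracing(client_args):
    """Run client.py in a child process with tracing enabled; return its exit code."""
    cmd = [sys.executable, "client.py", *client_args]
    return subprocess.run(cmd, env=grpc_trace_env()).returncode


# Example call, with the same arguments as the plain client:
# run_client_with_tracing(["<server-ipaddr>", "<server-port>", "<file-to-upload>"])
```

The variables are set in a child process so that they are in place before the gRPC library is loaded. The output can be very verbose, so redirecting it to a file may be more practical.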

Python version: 3.12.4

gRPC version: 1.65.4

macOS 15.2

Tags: gRPC file streaming from client to server breaks down (Stack Overflow)