grpc python 源码分析(1):server 的创建和启动

from concurrent import futures
import time
import grpc
from example import helloworld_pb2_grpc, helloworld_pb2


# 实现 proto 文件中定义的 GreeterServicer
class Greeter(helloworld_pb2_grpc.GreeterServicer):
    """Servicer that implements the RPC methods of the Greeter service."""

    def SayHello(self, request, context):
        """Return a HelloReply whose message greets ``request.name``."""
        greeting = 'hello {msg}'.format(msg=request.name)
        return helloworld_pb2.HelloReply(message=greeting)

    def SayHelloAgain(self, request, context):
        """Return the same greeting reply as ``SayHello``."""
        greeting = 'hello {msg}'.format(msg=request.name)
        return helloworld_pb2.HelloReply(message=greeting)


def serve():
    """Start the Greeter gRPC server and block until it terminates.

    Builds a server backed by a 10-worker thread pool, registers the
    Greeter servicer, listens on the insecure port ``[::]:50051`` and
    starts serving. On Ctrl+C (KeyboardInterrupt) the server is stopped
    immediately (grace period of 0).
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    try:
        # start() returns right away (serving happens on a daemon thread),
        # so the main thread must block here. wait_for_termination() does
        # that without a hand-rolled sleep loop and also returns if the
        # server is stopped by other means.
        server.wait_for_termination()
    except KeyboardInterrupt:
        server.stop(0)


# Run the server only when this file is executed as a script, not on import.
if __name__ == '__main__':
    serve()

1️⃣ 创建 server
这里我们传了一个线程池给 grpc 的 server ,这个线程池用来处理请求。
经过重重调用,最后我们得到的 server 是 _Server 的实例

class _Server(grpc.Server):
    """``grpc.Server`` implementation that keeps its data in a ``_ServerState``.

    Most methods simply forward to a module-level helper function, passing
    ``self._state`` along with the caller's arguments.
    """

    # pylint: disable=too-many-arguments
    def __init__(self, thread_pool, generic_handlers, interceptors, options,
                 maximum_concurrent_rpcs, compression):
        """Create the cygrpc server and completion queue and store the state.

        The completion queue is registered on the cygrpc server, and both are
        bundled with the handlers, the interceptor pipeline built from
        ``interceptors``, the thread pool and the RPC limit into
        ``self._state``.
        """
        completion_queue = cygrpc.CompletionQueue()
        server = cygrpc.Server(_augment_options(options, compression))
        server.register_completion_queue(completion_queue)
        self._state = _ServerState(completion_queue, server, generic_handlers,
                                   _interceptor.service_pipeline(interceptors),
                                   thread_pool, maximum_concurrent_rpcs)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        """Validate ``generic_rpc_handlers`` and add them to the server state."""
        _validate_generic_rpc_handlers(generic_rpc_handlers)
        _add_generic_handlers(self._state, generic_rpc_handlers)

    def add_insecure_port(self, address):
        """Bind ``address`` without credentials and return the validated result.

        The address is encoded with ``_common.encode`` before binding; the
        binding result is passed through
        ``_common.validate_port_binding_result``.
        """
        return _common.validate_port_binding_result(
            address, _add_insecure_port(self._state, _common.encode(address)))

    def add_secure_port(self, address, server_credentials):
        """Bind ``address`` with ``server_credentials``; return the validated result."""
        return _common.validate_port_binding_result(
            address,
            _add_secure_port(self._state, _common.encode(address),
                             server_credentials))

    def start(self):
        """Start the server by handing this instance's state to ``_start``."""
        _start(self._state)

    def wait_for_termination(self, timeout=None):
        """Wait on the state's termination event, optionally up to ``timeout``.

        Returns whatever ``_common.wait`` returns.
        """
        # NOTE(https://bugs.python.org/issue35935)
        # Remove this workaround once threading.Event.wait() is working with
        # CTRL+C across platforms.
        return _common.wait(self._state.termination_event.wait,
                            self._state.termination_event.is_set,
                            timeout=timeout)

    def stop(self, grace):
        """Forward to ``_stop`` with this state and ``grace``; return its result."""
        return _stop(self._state, grace)

    def __del__(self):
        """Flag the state as deallocated when the server object is collected."""
        # _state may be missing if __init__ raised before assigning it.
        if hasattr(self, '_state'):
            # We can not grab a lock in __del__(), so set a flag to signal the
            # serving daemon thread (if it exists) to initiate shutdown.
            self._state.server_deallocated = True

  

cygrpc.CompletionQueue 和 cygrpc.Server 都是调用底层的 c++ core ,我们不去管它。
再来看看这个 _ServerState 的代码

class _ServerState(object):
    """Plain container for the data a server instance works with."""

    # pylint: disable=too-many-arguments
    def __init__(self, completion_queue, server, generic_handlers,
                 interceptor_pipeline, thread_pool, maximum_concurrent_rpcs):
        """Store the given collaborators and initialise the bookkeeping fields."""
        # Re-entrant lock; the startup code holds it while changing `stage`.
        self.lock = threading.RLock()

        # Collaborators supplied by the caller.
        self.completion_queue = completion_queue
        self.server = server
        # Copied into a new list rather than kept by reference.
        self.generic_handlers = list(generic_handlers)
        self.interceptor_pipeline = interceptor_pipeline
        self.thread_pool = thread_pool

        # Lifecycle: a new server begins in the STOPPED stage.
        self.stage = _ServerStage.STOPPED
        terminated = threading.Event()
        self.termination_event = terminated
        # The termination event is the first entry of the shutdown events.
        self.shutdown_events = [terminated]

        # RPC accounting.
        self.maximum_concurrent_rpcs = maximum_concurrent_rpcs
        self.active_rpc_count = 0

        # TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields.
        self.rpc_states = set()
        self.due = set()

        # A "volatile" flag to interrupt the daemon serving thread
        self.server_deallocated = False

  

从这里我们可以看到,python 的 server 只是对底层的简单封装,关于网络IO的处理完全是底层的 c++ core 负责,python 主要负责调用开发者的接口处理请求。

2️⃣ 注册接口方法
这步负责将我们开发好的接口注册到服务器上,调用的是编译 proto 文件生成的 _pb2_grpc 后缀文件的函数。

 
def add_GreeterServicer_to_server(servicer, server):
    """Register the Greeter RPC methods of ``servicer`` on ``server``.

    Builds a dict mapping each RPC method name to a unary-unary method
    handler, wraps it in a generic handler for the 'Greeter' service and
    adds that handler to the server.
    """

    def _unary_handler(behavior):
        # Both RPCs take a HelloRequest and return a HelloReply, so they
        # share the same request deserializer and response serializer.
        return grpc.unary_unary_rpc_method_handler(
            behavior,  # the servicer method invoked for the call
            request_deserializer=helloworld__pb2.HelloRequest.FromString,
            response_serializer=helloworld__pb2.HelloReply.SerializeToString,
        )

    rpc_method_handlers = {}
    rpc_method_handlers['SayHello'] = _unary_handler(servicer.SayHello)
    rpc_method_handlers['SayHelloAgain'] = _unary_handler(
        servicer.SayHelloAgain)

    generic_handler = grpc.method_handlers_generic_handler(
        'Greeter', rpc_method_handlers)
    handlers = (generic_handler,)
    server.add_generic_rpc_handlers(handlers)

  

请求的路由分发使用的是字典,key 是我们定义的接口名,value 则是一个命名元组,里面保存的是我们的接口方法、序列化方法和反序列化方法。

3️⃣ 绑定监听端口
这个最后是调用 c++ core 的代码,直接忽略

4️⃣ 服务启动
server 的 start 方法只是调用 _start 函数

    def start(self):
        """Start the server by handing this instance's state to ``_start``."""
        _start(self._state)

  

def _start(state):
    """Start the underlying server and launch the serving thread.

    Everything runs while holding ``state.lock``.

    Raises:
        ValueError: if ``state.stage`` is not ``_ServerStage.STOPPED``,
            i.e. the server has already been started.
    """
    with state.lock:
        if state.stage is not _ServerStage.STOPPED:
            raise ValueError('Cannot start already-started server!')
        state.server.start()
        state.stage = _ServerStage.STARTED
        # NOTE(review): presumably asks the core for the first incoming
        # call; _request_call is defined elsewhere -- confirm.
        _request_call(state)

        # Serving runs on a daemon thread, so this function returns right
        # away and the thread does not keep the interpreter alive on exit.
        thread = threading.Thread(target=_serve, args=(state,))
        thread.daemon = True
        thread.start()

  



原文地址:https://www.cnblogs.com/a00ium/p/14169408.html