Learning gRPC

Setting up the environment

  • Requirements
    • Python 2.7, or Python 3.4 or higher
    • pip version 9.0.1 or higher
  • Install the grpcio module
python -m pip install grpcio
  • Install grpcio-tools
python -m pip install grpcio-tools

Download the examples

Using grpc-tools

python -m grpc_tools.protoc -I=[directory containing the .proto file] --python_out=[output directory] --grpc_python_out=[output directory] [.proto file]
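The command above can also be assembled from a script, which helps when the same paths are reused across builds. The following is a minimal sketch: the helper name build_protoc_command, the file name protos/sever.proto, and the output directory generated are assumptions made for illustration (the proto file name is only inferred from the sever_pb2 module used later in this post).

```python
import sys
from pathlib import Path


def build_protoc_command(proto_file, out_dir):
    """Build the argument list for `python -m grpc_tools.protoc`.

    proto_file and out_dir are illustrative; adjust them to your project.
    """
    proto_path = Path(proto_file)
    return [
        sys.executable, "-m", "grpc_tools.protoc",
        f"-I{proto_path.parent}",          # directory containing the .proto file
        f"--python_out={out_dir}",         # where the *_pb2.py file is written
        f"--grpc_python_out={out_dir}",    # where the *_pb2_grpc.py file is written
        str(proto_path),
    ]


if __name__ == "__main__":
    # Only prints the command; nothing is executed here.
    print(" ".join(build_protoc_command("protos/sever.proto", "generated")))
```

To actually run the generator, the returned list can be passed to subprocess.run(cmd, check=True), provided grpcio-tools is installed and the output directory exists.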

Using gRPC with Python

  • Define the .proto file
// proto3 is recommended
syntax = "proto3";
// Define a service
service RouteGuide {
    // Declare a method
    rpc GetFeature(MyMsg) returns (Ret) {}
}
message MyMsg {
  string name = 1;
  int32 id = 2;
}
message Ret{
  string name = 1;
}
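  • Generate the _pb2 and _pb2_grpc modules
python -m grpc_tools.protoc -I=[directory containing the .proto file] --python_out=[output directory] --grpc_python_out=[output directory] [.proto file]
  • Create the service
# Implement the RouteGuideServicer defined in the proto file
class RouteGuideServicer(sever_pb2_grpc.RouteGuideServicer):
    # Implement the rpc call defined in the proto file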
    def GetFeature(self, request, context):
        return sever_pb2.Ret(name="sssss")

The methods you need to implement can be found in the generated _pb2_grpc file.
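Instead of reading the generated file by eye, the method names can also be listed with the standard inspect module. This is a rough sketch: RouteGuideServicerStandIn is a hypothetical stand-in so the snippet runs without generated code, and rpc_method_names is a helper name invented here. With real generated code you would pass sever_pb2_grpc.RouteGuideServicer instead.

```python
import inspect


class RouteGuideServicerStandIn:
    """Hypothetical stand-in for the generated servicer base class."""

    def GetFeature(self, request, context):
        raise NotImplementedError("Method not implemented!")


def rpc_method_names(servicer_base):
    """Return the public method names declared on a servicer base class."""
    return sorted(
        name
        for name, member in inspect.getmembers(servicer_base, inspect.isfunction)
        if not name.startswith("_")
    )


print(rpc_method_names(RouteGuideServicerStandIn))  # prints ['GetFeature']
```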

Starting the server

from concurrent import futures
import time

import grpc
import sever_pb2
import sever_pb2_grpc

def serve():
    # Start the RPC server
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    sever_pb2_grpc.add_RouteGuideServicer_to_server(RouteGuideServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    try:
        while True:
            time.sleep(60*60*24) # one day in seconds
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    serve()
  • Create the client
    First, create a stub:
    channel = grpc.insecure_channel('localhost:50051')
    stub = sever_pb2_grpc.RouteGuideStub(channel)

Call the method through the stub:

    response = stub.GetFeature(sever_pb2.MyMsg(name='czl', id=1))

Full client code

import grpc
import sever_pb2
import sever_pb2_grpc

def run():
    # Open a channel to the server and call GetFeature through the stub
    channel = grpc.insecure_channel('localhost:50051')
    stub = sever_pb2_grpc.RouteGuideStub(channel)
    response = stub.GetFeature(sever_pb2.MyMsg(name='czl', id=1))
    print(response)

if __name__ == '__main__':
    run()

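Keeping a gRPC channel alive

You can register a callback function on the channel:
channel.subscribe(self.callback)
Below is the official description of this function. In short, you pass in a callback, and it is invoked whenever the channel's state changes. The state is a ChannelConnectivity value.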
subscribe

abstract subscribe(callback, try_to_connect=False)
Subscribe to this Channel’s connectivity state machine.

A Channel may be in any of the states described by ChannelConnectivity. This method allows application to monitor the state transitions. The typical use case is to debug or gain better visibility into gRPC runtime’s state.

Parameters
callback – A callable to be invoked with ChannelConnectivity argument. ChannelConnectivity describes current state of the channel. The callable will be invoked immediately upon subscription and again for every change to ChannelConnectivity until it is unsubscribed or this Channel object goes out of scope.

try_to_connect – A boolean indicating whether or not this Channel should attempt to connect immediately. If set to False, gRPC runtime decides when to connect.

Channel Connectivity

class grpc.ChannelConnectivity
Mirrors grpc_connectivity_state in the gRPC Core.

IDLE
The channel is idle.

CONNECTING
The channel is connecting.

READY
The channel is ready to conduct RPCs.

TRANSIENT_FAILURE
The channel has seen a failure from which it expects to recover.

SHUTDOWN
The channel has seen a failure from which it cannot recover.
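As a small illustration of subscribe and the states listed above, the sketch below blocks until a channel reports READY. The helper name wait_until_ready and its timeout default are assumptions made here, not part of gRPC. The function only relies on the subscribe/unsubscribe methods of grpc.Channel and on ChannelConnectivity being an Enum whose member names match the list above.

```python
import threading


def wait_until_ready(channel, timeout=5.0):
    """Return True if the channel reaches READY within `timeout` seconds."""
    ready = threading.Event()

    def on_state_change(state):
        # `state` is a ChannelConnectivity member; compare by name so this
        # helper does not need to import grpc itself.
        if state.name == "READY":
            ready.set()

    # try_to_connect=True asks the channel to start connecting right away.
    channel.subscribe(on_state_change, try_to_connect=True)
    try:
        return ready.wait(timeout)
    finally:
        # Stop receiving notifications once we have an answer.
        channel.unsubscribe(on_state_change)
```

With a real channel this would be called as wait_until_ready(grpc.insecure_channel('localhost:50051')). gRPC also ships grpc.channel_ready_future(channel), which covers the same use case and is probably the better choice in production code.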

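Example callback

    # Excerpt from a channel wrapper class; callback_handler and lock are
    # defined elsewhere and are not shown here.
    channel.subscribe(callback)
    def callback(self, *args, **kwargs):
        """
        Callback invoked on every connectivity change.
        :return: the result of the handler's dispatch, if a handler is set
        """
        state = args[0]  # the new ChannelConnectivity value
        if self.callback_handler:
            with lock:
                return self.callback_handler.dispatch(self, state)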
from grpc import ChannelConnectivity
class DefaultCallBackHandler(object):
    def __init__(self, channel):
        self.channel = channel

    def dispatch(self, channel, state):
        if state == ChannelConnectivity.TRANSIENT_FAILURE:
            return self.transient_failure(channel)
        elif state == ChannelConnectivity.SHUTDOWN:
            return self.shut_down(channel)
        elif state == ChannelConnectivity.CONNECTING:
            return self.connecting(channel)
        elif state == ChannelConnectivity.READY:
            return self.ready(channel)
        elif state == ChannelConnectivity.IDLE:
            return self.idle(channel)

    def shut_down(self, channel):
        channel.state = "SHUTDOWN"
        channel.reconnect()
        print("[%s] server error with shutdown" % channel.connect_id)

    def connecting(self, channel):
        channel.state = "CONNECTING"
        print("[%s] I am trying to connect server" % channel.connect_id)

    def ready(self, channel):
        channel.state = "READY"
        print("[%s] I am ready to send a request" % channel.connect_id)

    def transient_failure(self, channel):
        channel.state = "TRANSIENT_FAILURE"
        channel.reconnect()
        print("[%s] something wrong with this channel" % channel.connect_id)

    def idle(self, channel):
        channel.state = "IDLE"
        print("[%s] waiting" % channel.connect_id)
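The handler above expects a channel object with state, connect_id and reconnect(), none of which exist on a plain grpc.Channel, so they presumably belong to a custom wrapper. The sketch below shows one possible shape for that wrapper together with a more compact, set-driven dispatch. Everything in it is hypothetical: ConnState is a stand-in that mirrors the member names of grpc.ChannelConnectivity so the code runs without gRPC installed, and ManagedChannel, TableCallBackHandler and reconnect_count are names invented for illustration.

```python
import enum


class ConnState(enum.Enum):
    """Stand-in mirroring the member names of grpc.ChannelConnectivity."""
    IDLE = 0
    CONNECTING = 1
    READY = 2
    TRANSIENT_FAILURE = 3
    SHUTDOWN = 4


class ManagedChannel:
    """Hypothetical wrapper exposing the attributes the handler uses."""

    def __init__(self, connect_id):
        self.connect_id = connect_id
        self.state = None
        self.reconnect_count = 0

    def reconnect(self):
        # A real wrapper would close the old channel and open a new one here.
        self.reconnect_count += 1


class TableCallBackHandler:
    """Records the state name and reconnects on failure states."""

    # States after which the wrapper tries to reconnect.
    RECONNECT_ON = {"TRANSIENT_FAILURE", "SHUTDOWN"}

    def dispatch(self, channel, state):
        channel.state = state.name
        if state.name in self.RECONNECT_ON:
            channel.reconnect()
        return channel.state


channel = ManagedChannel("demo")
handler = TableCallBackHandler()
for new_state in (ConnState.CONNECTING, ConnState.TRANSIENT_FAILURE, ConnState.READY):
    handler.dispatch(channel, new_state)
print(channel.state, channel.reconnect_count)  # prints: READY 1
```

One design note: a gRPC channel in TRANSIENT_FAILURE normally retries the connection on its own with backoff, so an explicit reconnect() in that state may be redundant depending on what the wrapper does.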
Original article: https://www.cnblogs.com/fangaojun/p/13100537.html