Golang的序列化-RPC和GRPC
作者:尹正杰
版权声明:原创作品,谢绝转载!否则将追究法律责任。
一.RPC概述
1>.什么是RPC
RPC(Remote Procedure Call Protocol),是远程过程调用的缩写,通俗的说就是调用远处(一般指不同的主机)的一个函数。
2>.为什么微服务需要RPC
我们使用微服务化的一个好处就是,不限定服务的提供方使用什么技术选型,能够实现公司跨团队的技术解耦。
这样的话,如果没有统一的服务框架,RPC框架,各个团队的服务提供方就需要各自实现一套序列化、反序列化、网络框架、连接池、收发线程、超时处理、状态机等“业务之外”的重复技术劳动,造成整体的低效。
所以,统一RPC框架把上述“业务之外”的技术劳动统一处理,是服务化首要解决的问题。
二.RPC入门案例
在互联网时代,RPC已经和IPC(进程间通信)一样成为一个不可或缺的基础构件。因此Go语言的标准库也提供了一个简单的RPC实现,我们将以此为入口学习RPC的常见用法。
1>.RPC的服务端
package main import ( "fmt" "net" "net/rpc" ) type Zabbix struct{} /** 定义成员方法: 第一个参数是传入参数. 第二个参数必须是传出参数(引用类型). Go语言的RPC规则: 方法只能有两个可序列化的参数,其中第二个参数是指针类型,并且返回一个error类型,同时必须是公开的方法。 温馨提示: 当调用远程函数之后,如果返回的错误不为空,那么传出参数为空。 */ func (Zabbix) MonitorHosts(name string, response *string) error { *response = name + "主机监控中..." return nil } func main() { /** 进程间交互有很多种,比如基于信号,共享内存,管道,套接字等方式. 1>.rpc基于是TCP的,因此我们需要先开启监听端口 */ listener, err := net.Listen("tcp", ":8888") if err != nil { fmt.Println("开启监听器失败,错误原因: ", err) return } defer listener.Close() fmt.Println("服务启动成功...") /** 2>.接受链接,即接受传输的数据 */ conn, err := listener.Accept() if err != nil { fmt.Println("建立链接失败...") return } defer conn.Close() fmt.Println("建立连接: ", conn.RemoteAddr()) /** 3>.注册rpc服务,维护一个hash表,key值是服务名称,value值是服务的地址。服务器有很多函数,希望被调用的函数需要注册到RPC上。 以下是RegisterName的函数签名: func RegisterName(name string, rcvr interface{}) error 以下是对函数签名相关参数的说明: name: 指的是服务名称。 rcvr: 指的是结构体对象(这个结构体必须含有成员方法)。 */ rpc.RegisterName("zabbix", new(Zabbix)) /** 4>.链接的处理交给RCP框架处理,即rpc调用,并返回执行后的数据,其工作原理大致分为以下3个步骤: (1)read,获取服务名称和方法名,获取请求数据; (2)调用对应服务里面的方法,获取传出数据; (3)write,把数据返回给client; */ rpc.ServeConn(conn) }
2>.RPC的客户端
package main import ( "fmt" "net" "net/rpc" ) func main() { /** 1>.首选是通过rpc.Dial拨号RPC服务 温馨提示: 默认数据传输过程中编码方式是gob,可以选择json */ conn, err := net.Dial("tcp", "127.0.0.1:8888") if err != nil { fmt.Println("链接服务器失败") return } defer conn.Close() /** 2>.把conn和rpc进行绑定 */ client := rpc.NewClient(conn) /** 3>.然后通过client.Call调用具体的RPC方法。其中Call函数的签名如下所示: func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error 以下是对函数签名的相关参数进行补充说明: serviceMethod: 用点号(.)链接的RPC服务名字和方法名字 args: 指定输入参数 reply: 指定输出参数接收的 */ var data string err = client.Call("zabbix.MonitorHosts", "Nginx", &data) if err != nil { fmt.Println("远程接口调用失败,错误原因: ", err) return } fmt.Println(data) }
三.跨语言的RPC
标准库的RPC默认采用Go语言特有的gob编码,因此从其它语言调用Go语言实现的RPC服务将比较困难。跨语言是互联网时代RPC的一个首要条件,这里我们再来实现一个跨语言的RPC。
得益于RPC的框架设计,Go语言的RPC其实也是很容易实现跨语言支持的。这里我们将尝试通过官方自带的net/rpc/jsonrpc扩展实现一个跨语言RPC。
1>.RPC的服务端
package main import ( "fmt" "net" "net/rpc" "net/rpc/jsonrpc" ) type OpenFalcon struct{} /** 定义成员方法: 第一个参数是传入参数. 第二个参数必须是传出参数(引用类型). Go语言的RPC规则: 方法只能有两个可序列化的参数,其中第二个参数是指针类型,并且返回一个error类型,同时必须是公开的方法。 温馨提示: 当调用远程函数之后,如果返回的错误不为空,那么传出参数为空。 */ func (OpenFalcon) MonitorHosts(name string, response *string) error { *response = name + "主机监控中..." return nil } func main() { /** 进程间交互有很多种,比如基于信号,共享内存,管道,套接字等方式. 1>.rpc基于是TCP的,因此我们需要先开启监听端口 */ listener, err := net.Listen("tcp", ":8888") if err != nil { fmt.Println("开启监听器失败,错误原因: ", err) return } defer listener.Close() fmt.Println("服务启动成功...") /** 2>.接受链接,即接受传输的数据 */ conn, err := listener.Accept() if err != nil { fmt.Println("建立链接失败...") return } defer conn.Close() fmt.Println("建立连接: ", conn.RemoteAddr()) /** 3>.注册rpc服务,维护一个hash表,key值是服务名称,value值是服务的地址。服务器有很多函数,希望被调用的函数需要注册到RPC上。 以下是RegisterName的函数签名: func RegisterName(name string, rcvr interface{}) error 以下是对函数签名相关参数的说明: name: 指的是服务名称。 rcvr: 指的是结构体对象(这个结构体必须含有成员方法)。 */ rpc.RegisterName("open_falcon", new(OpenFalcon)) /** 4>.链接的处理交给RCP框架处理,即rpc调用,并返回执行后的数据,其工作原理大致分为以下3个步骤: (1)read,获取服务名称和方法名,获取请求数据; (2)调用对应服务里面的方法,获取传出数据; (3)write,把数据返回给client; */ jsonrpc.ServeConn(conn) }
2>.RPC的客户端
package main import ( "fmt" "net/rpc/jsonrpc" ) func main() { /** 首选是通过rpc.Dial拨号RPC服务 温馨提示: 默认数据传输过程中编码方式是gob,可以选择json,需要导入"net/rpc/jsonrpc"包。 */ conn, err := jsonrpc.Dial("tcp", "127.0.0.1:8888") if err != nil { fmt.Println("链接服务器失败") return } defer conn.Close() var data string /** 其中Call函数的签名如下所示: func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error 以下是对函数签名的相关参数进行补充说明: serviceMethod: 用点号(.)链接的RPC服务名字和方法名字 args: 指定输入参数 reply: 指定输出参数接收的 */ err = conn.Call("open_falcon.MonitorHosts", "Httpd", &data) if err != nil { fmt.Println("远程接口调用失败,错误原因: ", err) return } fmt.Println(data) }
四.ProtoBuf
博主推荐阅读: https://www.cnblogs.com/yinzhengjie2020/p/12741943.html
五.GRPC框架
1>.什么是GRPC
GRPC是Google公司基于Protobuf开发的跨语言的开源RPC框架。 GRPC是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。目前提供C,Java和Go语言版本,分别是:grpc, grpc-java, grpc-go. 其中C版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHP 和 C# 支持. GRPC基于HTTP/2标准设计,带来诸如双向流、流控、头部压缩、单 TCP 连接上的多复用请求等特。这些特性使得其在移动设备上表现更好,更省电和节省空间占用。 博主推荐阅读: GRPC 官方文档中文版: http://doc.oschina.net/grpc?t=60133 GRPC官网: https://grpc.io
2>.安装grpc环境
C:Usersyinzhengjie>go get -u -v google.golang.org/grpc
3>.基于protobuf编写Grpc服务
////protobuf默认支持的版本是2.0,现在一般使用3.0版本,所以需要手动指定版本号 //c语言的编程风格 syntax = "proto3"; //指定包名 package pb; //定义传输数据的格式 message People{ string name = 1; //1表示表示字段是1 数据库中表的主键id等于1,主键不能重复,标示位数据不能重复 //标示位不能使用19000 -19999 系统预留位 int32 age = 2; //结构体嵌套 student s = 3; //使用数组/切片 repeated string phone = 4; //oneof的作用是多选一 oneof data{ int32 score = 5; string city = 6; bool good = 7; } } //oneof c语言中的联合体 message student{ string name = 1; int32 age = 6; } //通过定义服务,然后借助框架,帮助实现部分的rpc代码 service Hello{ rpc World(student)returns(student); }
////protobuf默认支持的版本是2.0,现在一般使用3.0版本,所以需要手动指定版本号 //c语言的编程风格 // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.21.0 // protoc v3.11.4 // source: grpc.proto //指定包名 package pb import ( context "context" proto "github.com/golang/protobuf/proto" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" ) const ( // Verify that this generated code is sufficiently up-to-date. _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) // Verify that runtime/protoimpl is sufficiently up-to-date. _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) // This is a compile-time assertion that a sufficiently up-to-date version // of the legacy proto package is being used. const _ = proto.ProtoPackageIsVersion4 //定义传输数据的格式 type People struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` //1表示表示字段是1 数据库中表的主键id等于1,主键不能重复,标示位数据不能重复 //标示位不能使用19000 -19999 系统预留位 Age int32 `protobuf:"varint,2,opt,name=age,proto3" json:"age,omitempty"` //结构体嵌套 S *Student `protobuf:"bytes,3,opt,name=s,proto3" json:"s,omitempty"` //使用数组/切片 Phone []string `protobuf:"bytes,4,rep,name=phone,proto3" json:"phone,omitempty"` //oneof的作用是多选一 // // Types that are assignable to Data: // *People_Score // *People_City // *People_Good Data isPeople_Data `protobuf_oneof:"data"` } func (x *People) Reset() { *x = People{} if protoimpl.UnsafeEnabled { mi := &file_grpc_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *People) String() string { return protoimpl.X.MessageStringOf(x) } func (*People) ProtoMessage() {} func (x *People) ProtoReflect() protoreflect.Message { mi := &file_grpc_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use People.ProtoReflect.Descriptor instead. func (*People) Descriptor() ([]byte, []int) { return file_grpc_proto_rawDescGZIP(), []int{0} } func (x *People) GetName() string { if x != nil { return x.Name } return "" } func (x *People) GetAge() int32 { if x != nil { return x.Age } return 0 } func (x *People) GetS() *Student { if x != nil { return x.S } return nil } func (x *People) GetPhone() []string { if x != nil { return x.Phone } return nil } func (m *People) GetData() isPeople_Data { if m != nil { return m.Data } return nil } func (x *People) GetScore() int32 { if x, ok := x.GetData().(*People_Score); ok { return x.Score } return 0 } func (x *People) GetCity() string { if x, ok := x.GetData().(*People_City); ok { return x.City } return "" } func (x *People) GetGood() bool { if x, ok := x.GetData().(*People_Good); ok { return x.Good } return false } type isPeople_Data interface { isPeople_Data() } type People_Score struct { Score int32 `protobuf:"varint,5,opt,name=score,proto3,oneof"` } type People_City struct { City string `protobuf:"bytes,6,opt,name=city,proto3,oneof"` } type People_Good struct { Good bool `protobuf:"varint,7,opt,name=good,proto3,oneof"` } func (*People_Score) isPeople_Data() {} func (*People_City) isPeople_Data() {} func (*People_Good) isPeople_Data() {} type Student struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Age int32 `protobuf:"varint,6,opt,name=age,proto3" json:"age,omitempty"` } func (x *Student) Reset() { *x = Student{} if protoimpl.UnsafeEnabled { mi := &file_grpc_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *Student) String() string { return protoimpl.X.MessageStringOf(x) } func (*Student) ProtoMessage() {} func (x *Student) ProtoReflect() protoreflect.Message { mi := &file_grpc_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use Student.ProtoReflect.Descriptor instead. func (*Student) Descriptor() ([]byte, []int) { return file_grpc_proto_rawDescGZIP(), []int{1} } func (x *Student) GetName() string { if x != nil { return x.Name } return "" } func (x *Student) GetAge() int32 { if x != nil { return x.Age } return 0 } var File_grpc_proto protoreflect.FileDescriptor var file_grpc_proto_rawDesc = []byte{ 0x0a, 0x0a, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0xab, 0x01, 0x0a, 0x06, 0x50, 0x65, 0x6f, 0x70, 0x6c, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x61, 0x67, 0x65, 0x12, 0x19, 0x0a, 0x01, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x70, 0x62, 0x2e, 0x73, 0x74, 0x75, 0x64, 0x65, 0x6e, 0x74, 0x52, 0x01, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x12, 0x16, 0x0a, 0x05, 0x73, 0x63, 0x6f, 0x72, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x48, 0x00, 0x52, 0x05, 0x73, 0x63, 0x6f, 0x72, 0x65, 0x12, 0x14, 0x0a, 0x04, 0x63, 0x69, 0x74, 0x79, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, 0x63, 0x69, 0x74, 0x79, 0x12, 0x14, 0x0a, 0x04, 0x67, 0x6f, 0x6f, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x04, 0x67, 0x6f, 0x6f, 0x64, 0x42, 0x06, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x2f, 0x0a, 0x07, 0x73, 0x74, 0x75, 0x64, 0x65, 0x6e, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x67, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x61, 0x67, 0x65, 0x32, 0x2a, 0x0a, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x12, 0x21, 0x0a, 0x05, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x12, 0x0b, 0x2e, 0x70, 0x62, 0x2e, 0x73, 0x74, 0x75, 0x64, 0x65, 0x6e, 0x74, 0x1a, 0x0b, 0x2e, 0x70, 0x62, 0x2e, 0x73, 0x74, 0x75, 0x64, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( file_grpc_proto_rawDescOnce sync.Once file_grpc_proto_rawDescData = file_grpc_proto_rawDesc ) func file_grpc_proto_rawDescGZIP() []byte { file_grpc_proto_rawDescOnce.Do(func() { file_grpc_proto_rawDescData = protoimpl.X.CompressGZIP(file_grpc_proto_rawDescData) }) return file_grpc_proto_rawDescData } var file_grpc_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_grpc_proto_goTypes = []interface{}{ (*People)(nil), // 0: pb.People (*Student)(nil), // 1: pb.student } var file_grpc_proto_depIdxs = []int32{ 1, // 0: pb.People.s:type_name -> pb.student 1, // 1: pb.Hello.World:input_type -> pb.student 1, // 2: pb.Hello.World:output_type -> pb.student 2, // [2:3] is the sub-list for method output_type 1, // [1:2] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name 1, // [1:1] is the sub-list for extension extendee 0, // [0:1] is the sub-list for field type_name } func init() { file_grpc_proto_init() } func file_grpc_proto_init() { if File_grpc_proto != nil { return } if !protoimpl.UnsafeEnabled { file_grpc_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*People); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_grpc_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Student); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } } file_grpc_proto_msgTypes[0].OneofWrappers = []interface{}{ (*People_Score)(nil), (*People_City)(nil), (*People_Good)(nil), } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_grpc_proto_rawDesc, NumEnums: 0, NumMessages: 2, NumExtensions: 0, NumServices: 1, }, GoTypes: file_grpc_proto_goTypes, DependencyIndexes: file_grpc_proto_depIdxs, MessageInfos: file_grpc_proto_msgTypes, }.Build() File_grpc_proto = out.File file_grpc_proto_rawDesc = nil file_grpc_proto_goTypes = nil file_grpc_proto_depIdxs = nil } // Reference imports to suppress errors if they are not otherwise used. var _ context.Context var _ grpc.ClientConnInterface // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion6 // HelloClient is the client API for Hello service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type HelloClient interface { World(ctx context.Context, in *Student, opts ...grpc.CallOption) (*Student, error) } type helloClient struct { cc grpc.ClientConnInterface } func NewHelloClient(cc grpc.ClientConnInterface) HelloClient { return &helloClient{cc} } func (c *helloClient) World(ctx context.Context, in *Student, opts ...grpc.CallOption) (*Student, error) { out := new(Student) err := c.cc.Invoke(ctx, "/pb.Hello/World", in, out, opts...) if err != nil { return nil, err } return out, nil } // HelloServer is the server API for Hello service. type HelloServer interface { World(context.Context, *Student) (*Student, error) } // UnimplementedHelloServer can be embedded to have forward compatible implementations. type UnimplementedHelloServer struct { } func (*UnimplementedHelloServer) World(context.Context, *Student) (*Student, error) { return nil, status.Errorf(codes.Unimplemented, "method World not implemented") } func RegisterHelloServer(s *grpc.Server, srv HelloServer) { s.RegisterService(&_Hello_serviceDesc, srv) } func _Hello_World_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(Student) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(HelloServer).World(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/pb.Hello/World", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(HelloServer).World(ctx, req.(*Student)) } return interceptor(ctx, in, info, handler) } var _Hello_serviceDesc = grpc.ServiceDesc{ ServiceName: "pb.Hello", HandlerType: (*HelloServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "World", Handler: _Hello_World_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "grpc.proto", }
4>.grpcServer.go
package main import ( "context" "google.golang.org/grpc" "net" "yinzhengjie/pb" ) //定义一个结构体,继承自HelloServer接口(该接口是我们通过protobuf代码生成的) type HelloService struct {} func (HelloService)World(ctx context.Context, req*pb.Student) (*pb.Student, error){ req.Name += " nihao" req.Age += 10 return req,nil } func main() { //先获取grpc对象 grpcServer := grpc.NewServer() //注册服务 pb.RegisterHelloServer(grpcServer,new(HelloService)) //开启监听 lis,err := net.Listen("tcp",":8888") if err != nil { return } defer lis.Close() //先获取grpc服务端对象 grpcServer.Serve(lis) }
5>.grpcClient.go
package main import ( "google.golang.org/grpc" "context" "fmt" "yinzhengjie/pb" ) func main() { //和grpc服务端建立连接 grpcCnn ,err := grpc.Dial("127.0.0.1:8888",grpc.WithInsecure()) if err != nil { fmt.Println(err) return } defer grpcCnn.Close() //得到一个客户端对象 client :=pb.NewHelloClient(grpcCnn) var s pb.Student s.Name = "Jason Yin" s.Age = 20 resp,err := client.World(context.TODO(),&s) fmt.Println(resp,err) }