Netty——Protobuf编解码

什么是protocol buffers?

Protocol buffers是谷歌的语言中立,平台中立的,可扩展机制的序列化数据结构框架,可以看作是xml,但是体积更小,传输速率更快,使用更加简单。

一旦你定义了你的数据格式,你可以使用生成源代码去轻松地从各种数据流读和写你的结构化数据并且使用不同的语言。protobuf有2.0版本和3.0版本,3.0版本十grpc框架的基础。

Protocol buffers目前支持Java, Python, Objective-C, 和C++生成代码。新的proto3语言版本,你可以使用Go, JavaNano, Ruby, 和 C#。

使用Protocol buffers,您可以编写一个.proto描述您希望存储的数据结构。 Protocol buffers编译器创建一个实现自动编码和解析协议缓冲区数据的类,并使用高效的二进制格式。

生成的类为组成Protocol buffers的字段提供getter和setter。


使用Protobuf编写一个编码解码最简单程序

  • 在 .proto结尾的文件中定义消息格式。
  • 使用protocol buffers编译器将 .proto结尾的文件生成对应语言的源代码(本demo使用java编译器)。
  • 使用Java protocol buffer API 去读写消息。

定义一个Student.proto文件

syntax ="proto2";

package com.zhihao.miao.protobuf;

//optimize_for 加快解析的速度
option optimize_for = SPEED;
option java_package = "com.zhihao.miao.protobuf";
option java_outer_classname="DataInfo";

message Student{
    required string name = 1;
    optional int32 age = 2;
    optional string address = 3;
}

在Java项目中,除非你已经明确指定了java_package,否则package 用作Java的包名。即使您提供java_package,您仍然应该定义一个package,以避免在Protocol Buffers名称空间和非Java语言中的名称冲突。

package的定义之后,我们可以看到两个定义的java选项:java_packagejava_outer_classnamejava_package指定您生成的类应该存放的Java包名称。

如果没有明确指定它,将会使用package定义的name作为包名,但这些名称通常不是适合的Java包名称(因为它们通常不以域名开头)。 java_outer_classname选项定义应该包含此文件中所有类的类名。

如果你不明确地给出一个java_outer_classname,它将通过将文件名转换为驼峰的方式来生成。 例如,默认情况下,“my_proto.proto”将使用“MyProto”作为外部类名称。

每个元素上的“= 1”,“= 2”标记标识字段在二进制编码中使用的唯一“标签”。你可以将经常使用或者重复的字段标注成1-15,因为在进行编码的时候因为少一个字节进行编码,所以效率更高。

  • required:必须提供该字段的值,否则被认为没有初始化。尝试构建一个未初始化的值被会抛出RuntimeException。解析一个为初始化的消息会抛出IOException。除此之外与optional一样。
  • optional:可以设置或不设置该字段。 如果未设置可选字段值,则使用默认值。
  • repeated:字段可能重复任意次数(包括零)。 重复值的顺序将保留在protocol buffer中。 将重复的字段视为动态大小的数组。

(本列子中没有字段定义成repeated类型,定义成repeated类型其实就是java中List类型的字段)。

慎重使用required类型,将required类型的字段更改为optional会有一些问题,而将optional类型的字段更改为required类型,则没有问题。


编译

使用protocol buffers编译器将对应的.proto文件编译成对应的类。

1、编译器的安装,下载地址

2、修改环境变量

vim .bash_profile
export PATH=/Users/naeshihiroshi/software/work/protoc-3.3.0-osx-x86_64/bin
source .bash_profile
which protoc
/Users/naeshihiroshi/software/work/protoc-3.3.0-osx-x86_64/bin/protoc

3、进入项目目录,执行编译语句如下:

netty_lecture git:(master) ✗ protoc --java_out=src/main/java  src/protobuf/Student.proto   

--java_out后面第一个参数指定代码的路径,具体的包名在.proto文件中的java_package指定了,第二个指定要编译的proto文件。

自动生成的类名是DataInfo(在java_outer_classname中指定了),自动生成的类太长,这边就不列出来了。

编写序列化反序列化测试类

package com.zhihao.miao.protobuf;

//实际使用protobuf序列化框架客户端将对象转译成字节数组,然后通过协议传输到服务器端,服务器端可以是其他的语言框架(比如说python)将
//字节对象反编译成java对象
public class ProtobuffTest {
    public static void main(String[] args) throws Exception{
        DataInfo.Student student = DataInfo.Student.newBuilder().setName("张三").setAge(20).setAddress("北京").build();

        //将对象转译成字节数组,序列化
        byte[] student2ByteArray = student.toByteArray();

        //将字节数组转译成对象,反序列化
        DataInfo.Student student2 = DataInfo.Student.parseFrom(student2ByteArray);

        System.out.println(student2.getName());
        System.out.println(student2.getAge());
        System.out.println(student2.getAddress());
    }
}

执行测试类,控制台打印:

张三
20
北京

Protobuf与netty结合

在 Netty 数据传输过程中可以有很多选择,比如;字符串、json、xml、java 对象,但为了保证传输的数据具备;良好的通用性、方便的操作性和传输的高性能,我们可以选择 protobuf 作为我们的数据传输格式。

Netty为protobuf提供了两个编码器(ProtobufVarint32LengthFieldPrepender、ProtobufEncoder),两个解码器(ProtobufVarint32FrameDecoder、ProtobufDecoder)。

写一个使用Protobuf作为序列化框架,Netty作为传输层的最简单的demo,需求描述:

  • 客户端传递一个User对象给服务端(User对象包括姓名,年龄,密码)
  • 客户端接收客户端的User对象并且将其相应的银行账户等信息反馈给客户端

代码示例

  • DataInfo.proto
syntax ="proto2";

package com.zhihao.miao.netty.sixthexample;

option optimize_for = SPEED;
option java_package = "com.zhihao.miao.test.day06";
option java_outer_classname="DataInfo";

message RequestUser{
    optional string user_name = 1;
    optional int32 age = 2;
    optional string password = 3;
}

message ResponseBank{
    optional string bank_no = 1;
    optional double money = 2;
    optional string bank_name=3;
}

使用Protobuf编译器进行编译,生成DataInfo对象。

  • NettyServer.java
public class NettyServer {

    public static void main(String[] args) {
        new NettyServer().bing(7397);
    }

    private void bing(int port) {
        //配置服务端NIO线程组
        EventLoopGroup parentGroup = new NioEventLoopGroup(); 
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)    //非阻塞模式
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ServerChannelInitializer());
            ChannelFuture f = b.bind(port).sync();
            System.out.println("netty server start done.");
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            childGroup.shutdownGracefully();
            parentGroup.shutdownGracefully();
        }
    }
}
  • ServerChannelInitializer.java
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) {
        // 添加ProtobufVarint32FrameDecoder解码器,主要用于Protobuf的半包处理
        channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
     // 添加ProtobufDecoder解码器,它的参数是com.google.protobuf.MessageLite,实际上就是告诉ProtobufDecoder需要解码的目标类是什么,
// 否则仅仅从字节数组中是无法判断出要解码的目标类型信息的(服务端需要解析的是客户端请求,所以是Request) channel.pipeline().addLast(
new ProtobufDecoder(DataInfo.RequestUser.getDefaultInstance()));
     // 添加Protobuf...FieldPrepender编码器,主要用于Protobuf的半包处理 channel.pipeline().addLast(
new ProtobufVarint32LengthFieldPrepender());
// 添加ProtobufEncoder编码器 channel.pipeline().addLast(
new ProtobufEncoder()); // 在管道中添加我们自己的业务处理Handler channel.pipeline().addLast(new ServerHandler()); } }
  • ServerHandler.java
public class ServerHandler extends ChannelInboundHandlerAdapter<DataInfo.RequestUser> {

    /**
     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel channel = (SocketChannel) ctx.channel();
        System.out.println("客户端链接到本服务端。channelId:" + channel.id());
    }

    /**
     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());
    }

     /**
      * 由于ProtobufDecoder已经对消息进行了自动解码,因此接收到的订购请求消息可以直接使用RequestUser
      * 然后直接使用ResponseBank构造应答消息返回给客户端,由于使用了ProtobufEncoder,
      * 所以不需要对DataInfo.ResponseBank进行手工编码
      */ @Override
public void channelRead(ChannelHandlerContext ctx, DataInfo.RequestUser msg) throws Exception {
     System.out.println(msg.getUserName());
     System.out.println(msg.getAge());
     System.out.println(msg.getPassword());

     DataInfo.ResponseBank bank = DataInfo.ResponseBank.newBuilder().setBankName("中国工商银行").setBankNo("6222222200000000000").setMoney(560000.23).build();
     ctx.channel().writeAndFlush(bank);
}
   /**
     * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        System.out.println("异常信息:
" + cause.getMessage());
    }

}
  • NettyClient.java
public class NettyClient {

    public static void main(String[] args) {
        new NettyClient().connect("127.0.0.1", 7397);
    }

    private void connect(String inetHost, int inetPort) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup).channel(NioSocketChannel.class).handler(new ClientChannelInitializer());
ChannelFuture f
= b.connect(inetHost, inetPort).sync(); System.out.println("itstack-demo-netty client start done."); f.channel().writeAndFlush(MsgUtil.buildMsg(f.channel().id().toString(),"你好,使用protobuf通信格式的服务端")); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); } } }
  • ClientChannelInitializer.java
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // 添加ProtobufVarint32FrameDecoder解码器,主要用于Protobuf的半包处理
        channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
     // 添加ProtobufDecoder解码器,它的参数是com.google.protobuf.MessageLite,实际上就是告诉ProtobufDecoder需要解码的目标类是什么,
// 否则仅仅从字节数组中是无法判断出要解码的目标类型信息的(客户端需要解析的是服务端请求,所以是Response) channel.pipeline().addLast(new ProtobufDecoder(DataInfo.ResponseBank.getDefaultInstance()));
     // 添加Protobuf...FieldPrepender编码器,主要用于Protobuf的半包处理 channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
// 添加ProtobufEncoder编码器 channel.pipeline().addLast(new ProtobufEncoder()); // 在管道中添加我们自己的业务处理Handler channel.pipeline().addLast(new ClientHandler());
    } 
}
  • ClientHandler.java
public class ClientHandler extends ChannelInboundHandlerAdapter<DataInfo.ResponseBank> {

    /**
     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel channel = (SocketChannel) ctx.channel();
        System.out.println("本客户端链接到服务端。channelId:" + channel.id());

     DataInfo.RequestUser user = DataInfo.RequestUser.newBuilder()
     .setUserName("zhihao.miao").setAge(27).setPassword("123456").build();
ctx.channel().writeAndFlush(user);
}
   /**
     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("断开链接" + ctx.channel().localAddress().toString());
    }

     /**
      * 由于ProtobufDecoder已经对消息进行了自动解码,因此接收到的响应消息可以直接使用ResponseBank
      */
    @Override
    public void channelRead(ChannelHandlerContext ctx, DataInfo.ResponseBank msg) throws Exception {
     System.out.println(msg.getBankNo());
     System.out.println(msg.getBankName());
     System.out.println(msg.getMoney());
}
   /**
     * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        System.out.println("异常信息:
" + cause.getMessage());
    }

}

运行服务器端和客户端,服务器控制台打印:

七月 03, 2017 11:12:03 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xa1a63b58, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0x08c534f3, L:/127.0.0.1:8899 - R:/127.0.0.1:65448]
七月 03, 2017 11:12:03 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xa1a63b58, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
zhihao.miao
27
123456

客户端控制台打印:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
6222222200000000000
中国工商银行
560000.23

引用:

原文地址:https://www.cnblogs.com/caoweixiong/p/14684453.html