netty4.1.32学习(持续更新)

netty4.1.32学习

官方api:https://netty.io/4.1/api/index.html

Netty 实战(精髓):https://waylau.gitbooks.io/essential-netty-in-action/

 

一、简单介绍

本文是根据李林峰书籍《Netty权威指南》(第2版)总结而来。

二、Netty简介

Netty是一个高性能、异步事件驱动的NIO框架,提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于Netty构建,比如RPC框架、zookeeper等。

三、Netty学习路线(与书本顺序基本一致)

1、NIO入门

2、Netty入门应用介绍(helloworld:时间服务器)

3、TCP粘包/拆包问题解决方法:

  (1)LineBasedFromeDecoder+StringDecoder:按行切换的文本解码器
  (2)DelimiterBasedFrameDecoder+StringDecoder:自动完成以分隔符做结束标志的消息的解码器
  (3)FixedLengthFrameDecoder+StringDecoder:自动完成对定长消息的解码器

4、Netty序列化问题解决方法:

  (1)介绍Java序列化的缺点

  (2)MessagePack

  (3)Google的ProtoBuf

    ①ProtoBuf入门与使用

    ②ProtoBuf在netty中使用

  (4)Facebook的Thrift

  (5)JBoss Marshalling

5、Netty多协议开发和应用

  (1)HTTP协议介绍

  (2)Netty HTTP+XML协议栈开发

四、Netty实例(源码)

1、解决粘包/拆包问题(LineBasedFromeDecoder实现)

 1 package com.rs.test.timeserver;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 import io.netty.handler.codec.LineBasedFrameDecoder;
12 import io.netty.handler.codec.string.StringDecoder;
13 
14 public class TimeServer {
15 
16     public void bind(int port) throws Exception {
17         // 配置服务端的NIO线程组
18         EventLoopGroup bossGroup = new NioEventLoopGroup();
19         EventLoopGroup workerGroup = new NioEventLoopGroup();
20 
21         try {
22             ServerBootstrap b = new ServerBootstrap();
23             b.group(bossGroup, workerGroup)
24                 .channel(NioServerSocketChannel.class)
25                 .option(ChannelOption.SO_BACKLOG, 1024)
26                 .childHandler(new ChildChannelHandler());
27 
28             // 绑定端口,同步等待成功
29             ChannelFuture f = b.bind(port).sync();
30 
31             // 等待服务端监听端口关闭
32             f.channel().closeFuture().sync();
33         } finally {
34             // 优雅退出,释放线程池资源
35             bossGroup.shutdownGracefully();
36             workerGroup.shutdownGracefully();
37         }
38     }
39 
40     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
41 
42         @Override
43         protected void initChannel(SocketChannel ch) throws Exception {
44             // 核心在下面两行,加入了LineBasedFrameDecoder和StringDecoder两个解码器
45             // 所以当消息到达我们的业务处理handler即TimerServerHandler,所看到的消息
46             // 都是前面两个解码器经过处理之后的结果
47             ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
48             ch.pipeline().addLast(new StringDecoder());
49             ch.pipeline().addLast(new TimeServerHandler());
50         }
51 
52     }
53 
54     public static void main(String[] args) throws Exception {
55         int port = 8080;
56         if(args != null && args.length > 0) {
57             try {
58                 port = Integer.valueOf(port);
59             } catch (NumberFormatException e) {
60                 // TODO: handle exception
61             }
62         }
63         new TimeServer().bind(port);
64     }
65 
66 }
TimeServer
 1 package com.rs.test.timeserver;
 2 
 3 import java.sql.Date;
 4 
 5 import io.netty.buffer.ByteBuf;
 6 import io.netty.buffer.Unpooled;
 7 import io.netty.channel.ChannelHandlerAdapter;
 8 import io.netty.channel.ChannelHandlerContext;
 9 import io.netty.channel.ChannelInboundHandlerAdapter;
10 
11 public class TimeServerHandler extends ChannelInboundHandlerAdapter {
12 
13     private int counter = 0;
14 
15     @Override
16     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
17         String body = (String) msg;
18         // counter的作用是标记这是第几次收到客户端的请求
19         System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter);
20         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? 
21                 new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
22         currentTime = currentTime + System.getProperty("line.separator");
23         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
24         ctx.write(resp);
25     }
26 
27     @Override
28     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
29         ctx.flush();
30     }
31 
32     @Override
33     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
34         ctx.close();
35     }
36 }
TimeServerHandler
 1 package com.rs.test.timeserver;
 2 import io.netty.bootstrap.Bootstrap;
 3 import io.netty.channel.ChannelFuture;
 4 import io.netty.channel.ChannelInitializer;
 5 import io.netty.channel.ChannelOption;
 6 import io.netty.channel.EventLoopGroup;
 7 import io.netty.channel.nio.NioEventLoopGroup;
 8 import io.netty.channel.socket.SocketChannel;
 9 import io.netty.channel.socket.nio.NioSocketChannel;
10 import io.netty.handler.codec.LineBasedFrameDecoder;
11 import io.netty.handler.codec.string.StringDecoder;
12 
13 public class TimeClient {
14 
15     public void connect(int port, String host) throws Exception {
16         // 配置客户端NIO线程组
17         EventLoopGroup group = new NioEventLoopGroup();
18         try {
19             Bootstrap b = new Bootstrap();
20             b.group(group).channel(NioSocketChannel.class)
21                 .option(ChannelOption.TCP_NODELAY, true)
22                 .handler(new ChannelInitializer<SocketChannel>() {
23 
24                     @Override
25                     protected void initChannel(SocketChannel ch) throws Exception {
26                         // 核心在下面两行,加入了LineBasedFrameDecoder和StringDecoder两个解码器
27                         // 所以当消息到达我们的业务处理handler即TimerServerHandler,所看到的消息
28                         // 都是前面两个解码器经过处理之后的结果
29                         ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
30                         ch.pipeline().addLast(new StringDecoder());
31                         ch.pipeline().addLast(new TimeClientHandler());
32                     }
33                 });
34             // 发起异步连接操作
35             ChannelFuture f = b.connect(host, port).sync();
36 
37             // 等待客户端链路关闭
38             f.channel().closeFuture().sync();
39         } finally {
40             // 优雅退出,释放NIO线程组
41             group.shutdownGracefully();
42         }
43     }
44 
45     public static void main(String[] args) throws Exception {
46         int port = 8080;
47         if(args != null && args.length > 0) {
48             try {
49                 port = Integer.valueOf(port);
50             } catch (NumberFormatException e) {
51                 // 采用默认值
52             }
53         }
54         new TimeClient().connect(port, "localhost");
55     }
56 }
TimeClient
 1 package com.rs.test.timeserver;
 2 import java.util.logging.Logger;
 3 
 4 import io.netty.buffer.ByteBuf;
 5 import io.netty.buffer.Unpooled;
 6 import io.netty.channel.ChannelHandlerAdapter;
 7 import io.netty.channel.ChannelHandlerContext;
 8 import io.netty.channel.ChannelInboundHandlerAdapter;
 9 
10 public class TimeClientHandler extends ChannelInboundHandlerAdapter {
11 
12     private static final Logger logger = Logger.getLogger(TimeServerHandler.class.getName());
13 
14     private int counter;
15 
16     private byte[] req;
17 
18     public TimeClientHandler() {
19         req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
20     }
21 
22     @Override
23     public void channelActive(ChannelHandlerContext ctx) {
24         ByteBuf message = null;
25         for(int i = 0; i < 100; i++) {
26             message = Unpooled.buffer(req.length);
27             message.writeBytes(req);
28             ctx.writeAndFlush(message);
29         }
30     }
31 
32     @Override
33     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
34         String body = (String) msg;
35         // counter的作用是标记这是第几次收到客户端的请求
36         System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
37     }
38 
39     @Override
40     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
41         logger.warning("Unexpected exception from downstream : ");
42         ctx.close();
43     }
44 
45 }
TimeClientHandler

2.使用protobuf作为编解码技术(包括半包处理及序列化),心跳机制动态将客户端关闭

https://github.com/carsonWuu/Netty/tree/master/src/com/rs/test/protobuf

五、相关技术学习

1、Netty断线重连解决方案(client端):①使用心跳机制去检测②启动时连接重试③运行中连接断开时重试

https://www.jianshu.com/p/c78b37a2ca47

原文地址:https://www.cnblogs.com/carsonwuu/p/10170030.html