Netty入门实例

通过代码和代码中的详细注释,可以对Netty有个入门的了解

TimeServer(初始化服务,绑定端口,指定IO handler)

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioServerSocketChannel;
 9 
10 /**
11  * 服务端
12  *
13  * @author zhya
14  * @date 2018/9/14
15  **/
16 public class TimeServer {
17 
18     public static void main(String[] args) {
19         try {
20             // 启动服务端,绑定8080端口
21             new TimeServer().bind(8080);
22         } catch (InterruptedException e) {
23             e.printStackTrace();
24         }
25     }
26 
27     /**
28      * 初始化服务端
29      *
30      * @param port
31      * @throws InterruptedException
32      */
33     public void bind(int port) throws InterruptedException {
34         // group 是NIO线程组,用于网络事件,实际就是Reactor线程组
35         // bossGroup 用于处理连接
36         EventLoopGroup bossGroup = new NioEventLoopGroup();
37         // bossGroup 用于处理Socket读写操作
38         EventLoopGroup workerGroup = new NioEventLoopGroup();
39 
40         try {
41             // 服务初始化工具,封装初始化服务的复杂代码
42             ServerBootstrap serverBootstrap = new ServerBootstrap();
43             // 指定group
44             serverBootstrap.group(bossGroup, workerGroup)
45                     // 设置channel类型
46                     .channel(NioServerSocketChannel.class)
47                     // 设置缓存
48                     .option(ChannelOption.SO_BACKLOG, 1024)
49                     // 在创建NioSocketChannel成功后,将他的ChannelHandler(TimeClientHandler)设置到ChannelPipeline中,用于处理网络IO事件
50                     .childHandler(new ChildChannelHandler());
51 
52             // 绑定端口(同步,阻塞,等待绑定完成)
53             ChannelFuture future = serverBootstrap.bind(port).sync();
54 
55             // future接收异步通知回调,阻塞,等待链路关闭后退出
56             future.channel().closeFuture().sync();
57         } finally {
58             // 优雅的关闭线程组
59             bossGroup.shutdownGracefully();
60             workerGroup.shutdownGracefully();
61         }
62     }
63 
64     /**
65      * IO事件处理器
66      *
67      * @author zhya
68      * @date 2018/9/14
69      **/
70     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
71         @Override
72         protected void initChannel(SocketChannel socketChannel) throws Exception {
73             // 设置处理IO事件的处理器
74             socketChannel.pipeline().addLast(new TimeServerHandler());
75         }
76     }
77 }

TimeServerHandler(处理服务端Channel中的读写事件)

 1 import io.netty.buffer.ByteBuf;
 2 import io.netty.buffer.Unpooled;
 3 import io.netty.channel.ChannelHandlerAdapter;
 4 import io.netty.channel.ChannelHandlerContext;
 5 
 6 import java.time.LocalDateTime;
 7 
 8 /**
 9  * 对服务端网络事件进行读写操作的处理器
10  *
11  * @author zhya
12  * @date 2018/9/14
13  **/
14 public class TimeServerHandler extends ChannelHandlerAdapter {
15 
16     /**
17      * 读到部分客户端数据后回调
18      *
19      * @param ctx
20      * @param msg
21      * @throws Exception
22      */
23     @Override
24     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
25         // 将客户端数据转成ByteBuf
26         ByteBuf byteBuf = (ByteBuf) msg;
27         byte[] req = new byte[byteBuf.readableBytes()];
28 
29         // 从ByteBuf中读取客户端发送的数据
30         byteBuf.readBytes(req);
31 
32         // 展示客户端发送来的数据
33         String body = new String(req, "UTF-8");
34         System.out.println("request body : " + body);
35 
36         String currentTime = LocalDateTime.now().toString();
37         // 回写数据给客户端(write只是把待发送消息放到发送缓存数组中)
38         ctx.write(Unpooled.copiedBuffer(currentTime.getBytes()));
39     }
40 
41     /**
42      * 完成读客户端数据后回调
43      *
44      * @param ctx
45      * @throws Exception
46      */
47     @Override
48     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
49         System.out.println("channel read complete");
50         // 刷新,将消息队列中的消息发送给SocketChannel
51         ctx.flush();
52     }
53 
54     /**
55      * 遇到错误后回调
56      *
57      * @param ctx
58      * @param cause
59      * @throws Exception
60      */
61     @Override
62     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
63         System.out.println("encounter exception");
64         System.out.println(cause.getMessage());
65         // 发送异常关闭ChannelHandlerContext,释放资源
66         ctx.close();
67     }
68 }

TimeClient(初始化客户端SocketChannel,连接ServerSocket,指定IO handler)

 1 import io.netty.bootstrap.Bootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioSocketChannel;
 9 
10 /**
11  * 客户端
12  *
13  * @author zhya
14  * @date 2018/9/14
15  **/
16 public class TimeClient {
17     public static void main(String[] args) {
18         try {
19             // 构造100个线程,开启客户端连接服务器
20             for (int i = 0; i < 100; i++) {
21                 new Thread(() -> {
22                     try {
23                         // 开启客户端连接服务器
24                         new TimeClient().connect("127.0.0.1", 8080);
25                     } catch (Exception e) {
26                         e.printStackTrace();
27                     }
28                 }).start();
29             }
30         } catch (Exception e) {
31             e.printStackTrace();
32         }
33     }
34 
35     /**
36      * 初始化客户端
37      *
38      * @param host
39      * @param port
40      * @throws Exception
41      */
42     public void connect(String host, int port) throws Exception {
43         // 负责处理客户端IO事件的NIO线程组
44         EventLoopGroup group = new NioEventLoopGroup();
45         try {
46             // 客户端初始化工具类,封装初始化客户端的复杂代码
47             Bootstrap bootstrap = new Bootstrap();
48             // 指定group
49             bootstrap.group(group)
50                     // 设置channel类型为NioSocketChannel(socket client)
51                     .channel(NioSocketChannel.class)
52                     // 设置TCP参数,无延时
53                     .option(ChannelOption.TCP_NODELAY, true)
54                     // 在创建NioSocketChannel成功后,将他的ChannelHandler(TimeClientHandler)设置到ChannelPipeline中,用于处理网络IO事件
55                     .handler(new ChannelInitializer<SocketChannel>() {
56                         @Override
57                         protected void initChannel(SocketChannel socketChannel) throws Exception {
58                             // 设置处理IO事件的处理器
59                             socketChannel.pipeline().addLast(new TimeClientHandler());
60                         }
61                     });
62             // 阻塞,指导成功建立连接
63             ChannelFuture future = bootstrap.connect(host, port).sync();
64 
65             // 阻塞,等待异步回调通知
66             future.channel().closeFuture().sync();
67         } finally {
68             // 优雅的关闭线程组,释放资源
69             group.shutdownGracefully();
70         }
71     }
72 }

TimeClientHandler(处理客户端channel连接、读写事件)

 1 import io.netty.buffer.ByteBuf;
 2 import io.netty.buffer.Unpooled;
 3 import io.netty.channel.ChannelHandlerAdapter;
 4 import io.netty.channel.ChannelHandlerContext;
 5 
 6 /**
 7  * 处理客户端IO事件的处理器
 8  *
 9  * @author zhya
10  * @date 2018/9/14
11  **/
12 public class TimeClientHandler extends ChannelHandlerAdapter {
13 
14     /**
15      * 定义发送数据的载体/容器
16      */
17     private final ByteBuf firstMsg;
18 
19     /**
20      * 初始化要发送的数据和ByteBuf
21      */
22     public TimeClientHandler() {
23         byte[] req = "get time".getBytes();
24         firstMsg = Unpooled.buffer(req.length);
25         firstMsg.writeBytes(req);
26     }
27 
28     /**
29      * 当channel可用的时候回调
30      *
31      * @param ctx
32      * @throws Exception
33      */
34     @Override
35     public void channelActive(ChannelHandlerContext ctx) throws Exception {
36         System.out.println("channel active");
37         // 向服务端写入并刷新数据
38         ctx.writeAndFlush(firstMsg);
39     }
40 
41     /**
42      * 当读取到数据时回调(并非读取到所有的数据)
43      *
44      * @param ctx
45      * @param msg
46      * @throws Exception
47      */
48     @Override
49     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
50         // 转换
51         ByteBuf byteBuf = (ByteBuf) msg;
52         byte[] rep = new byte[byteBuf.readableBytes()];
53         byteBuf.readBytes(rep);
54 
55         // 输出
56         String body = new String(rep, "UTF-8");
57         System.out.println("reponse body : " + body);
58     }
59 
60     /**
61      * 当遇到异常时回调
62      *
63      * @param ctx
64      * @param cause
65      * @throws Exception
66      */
67     @Override
68     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
69         System.out.println("encounter exception");
70         System.out.println(cause.getMessage());
71 
72         // 停止ChannelHandlerContext,释放资源
73         ctx.close();
74     }
75 }
原文地址:https://www.cnblogs.com/zhya/p/9645862.html