关于netty的多个handler链式模式

1. 老规矩, 引入我们喜闻乐见的maven依赖

1 <dependency>
2     <groupId>io.netty</groupId>
3     <artifactId>netty-all</artifactId>
4     <version>4.1.6.Final</version>
5 </dependency>

2. 服务端

  2.1: 服务端引导类:

 1 public class EchoServer {
 2 
 3     private int port;
 4 
 5     private EchoServer(int port) {
 6         this.port = port;
 7     }
 8 
 9     private void start() throws Exception {
10         System.out.println("Echo Server Start");
11         EventLoopGroup group = new NioEventLoopGroup();
12         try {
13             ServerBootstrap b = new ServerBootstrap();
14             b.group(group)
15                     .channel(NioServerSocketChannel.class)
16                     .localAddress(new InetSocketAddress(port))
17                     .childHandler(new ChannelInitializer<SocketChannel>() {
18                         @Override
19                         public void initChannel(SocketChannel ch) throws Exception {
20                             ch.pipeline().addLast(new EchoOutboundHandler1());
21                             ch.pipeline().addLast(new EchoOutboundHandler2());
22                             
23                             ch.pipeline().addLast(new EchoInboundHandler1());
24                             ch.pipeline().addLast(new EchoInboundHandler2());
25                         }
26                     });
27             ChannelFuture f = b.bind().sync();
28             System.out.println("Server Start Listen At: " + port);
29             f.channel().closeFuture().sync();
30         } finally {
31             group.shutdownGracefully();
32         }
33     }
34 
35     public static void main(String[] args) throws Exception {
36         int port;
37         if (args.length > 0) {
38             port = Integer.parseInt(args[0]);
39         } else {
40             port = 8080;
41         }
42         new EchoServer(port).start();
43     }
44 }

  2.2 EchoOutboundHandler1

 1 public class EchoInboundHandler1 extends ChannelInboundHandlerAdapter {
 2 
 3     @Override
 4     public void channelRead(ChannelHandlerContext ctx, Object msg) {
 5         // 读取msg中的数据
 6         ByteBuf result = (ByteBuf) msg;
 7         byte[] bytes = new byte[result.readableBytes()];
 8         result.readBytes(bytes);
 9         String resultStr = new String(bytes);
10         System.out.println("Server Received: " + resultStr + ": Inbound 1 Is OK");
11         
12         // 处理完msg中的数据后往msg中重新存放新的数据继续传递
13         result.writeBytes(resultStr.getBytes());
14         ctx.fireChannelRead(result);
15     }
16     
17 }

  2.3 EchoInboundHandler2

 1 public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {
 2 
 3     @Override
 4     public void channelRead(ChannelHandlerContext ctx, Object msg) {
 5         // 读取msg中的数据
 6         ByteBuf result = (ByteBuf) msg;
 7         byte[] bytes = new byte[result.readableBytes()];
 8         result.readBytes(bytes);
 9         String resultStr = new String(bytes);
10         System.out.println("Server Received: " + resultStr + ": Inbound 2 Is OK");
11 
12         // 处理完msg中的数据后往msg中重新存放新的数据继续传递
13         result.writeBytes(resultStr.getBytes());
14         ctx.write(result);
15     }
16 
17 }

  2.4 EchoOutboundHandler2 

 1 public class EchoOutboundHandler2 extends ChannelOutboundHandlerAdapter {
 2 
 3     @Override
 4     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
 5         // 读取msg中的数据
 6         ByteBuf result = (ByteBuf) msg;
 7         byte[] bytes = new byte[result.readableBytes()];
 8         result.readBytes(bytes);
 9         String resultStr = new String(bytes);
10         System.out.println("Server Received: " + resultStr + ": Outbound 2 Is OK");
11         
12         // 处理完msg中的数据后往msg中重新存放新的数据继续传递
13         result.writeBytes(resultStr.getBytes());
14         ctx.write(result);
15     }
16 }

  2.5 EchoOutboundHandler1 

 1 public class EchoOutboundHandler1 extends ChannelOutboundHandlerAdapter {
 2 
 3     @Override
 4     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
 5         // 读取msg中的数据
 6         ByteBuf result = (ByteBuf) msg;
 7         byte[] bytes = new byte[result.readableBytes()];
 8         result.readBytes(bytes);
 9         String resultStr = new String(bytes);
10         System.out.println("Server Received: " + resultStr + ": Outbound 1 Is OK");
11 
12         // 处理完msg中的数据后往msg中重新存放新的数据继续传递
13         result.writeBytes(resultStr.getBytes());
14         ctx.write(result);
15         ctx.flush();
16     }
17 }

3. 客户端

 1 public class EchoClient {
 2     
 3     private String host;
 4     private int port;
 5 
 6     private EchoClient(String host, int port) {
 7         this.host = host;
 8         this.port = port;
 9     }
10 
11     private void start() throws Exception {
12         System.out.println("Echo Client Start");
13         EventLoopGroup group = new NioEventLoopGroup();
14         try {
15             Bootstrap b = new Bootstrap();
16             b.group(group)
17                     .channel(NioSocketChannel.class)
18                     .remoteAddress(new InetSocketAddress(host, port))
19                     .handler(new ChannelInitializer<SocketChannel>() {
20                         @Override
21                         public void initChannel(SocketChannel ch) throws Exception {
22                             ch.pipeline().addLast(new EchoClientHandler());
23                         }
24                     });
25             ChannelFuture f = b.connect().sync();
26             System.out.println("Server Client Listen IP: [" + host + ":" + port + "]");
27             f.channel().closeFuture().sync();
28         } finally {
29             group.shutdownGracefully();
30         }
31     }
32 
33     public static void main(String[] args) throws Exception {
34         String host = "127.0.0.1";
35         int port = 8080;
36         int len = 2;
37         if (args.length == len) {
38             host = args[0];
39             port = Integer.parseInt(args[1]);
40         }
41         new EchoClient(host, port).start();
42     }
43 
44 }
 1 public class EchoClientHandler extends ChannelInboundHandlerAdapter {
 2 
 3     @Override
 4     public void channelActive(ChannelHandlerContext ctx) {
 5         ctx.writeAndFlush(Unpooled.copiedBuffer("Netty Rocks!", CharsetUtil.UTF_8));
 6     }
 7 
 8     @Override
 9     public void channelRead(ChannelHandlerContext ctx, Object msg) {
10         // 读取msg中的数据
11         ByteBuf result = (ByteBuf) msg;
12         byte[] bytes = new byte[result.readableBytes()];
13         result.readBytes(bytes);
14         String resultStr = new String(bytes);
15         System.out.println("Echo Client Received Is OK: " + resultStr);
16         ctx.close();
17     }
18 
19     @Override
20     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
21         cause.printStackTrace();
22         ctx.close();
23     }
24 }

4. 结果:

  服务端:

  客户端:

5. 注意事项:

5.1. ChannelInboundHandler之间的传递, 通过调用 ctx.fireChannelRead(msg) 实现; 调用ctx.write(msg) 将传递到ChannelOutboundHandler

5.2. ctx.write()方法执行后, 需要调用flush()方法才能令它立即执行

5.3. ChannelOutboundHandler 在注册的时候需要放在最后一个ChannelInboundHandler之前, 否则将无法传递到ChannelOutboundHandler

原文地址:https://www.cnblogs.com/yanwu0527/p/9105560.html