1. 新建两个maven项目, 分别为
netty-server: netty的服务端: 消息的消费者
netty-client: netty的客户端: 消息的生产者
2. 分别引入netty的maven依赖
1 <dependency> 2 <groupId>io.netty</groupId> 3 <artifactId>netty-all</artifactId> 4 <version>4.1.6.Final</version> 5 </dependency>
3. netty-server服务端
3.1. 服务端引导类
1 public class EchoServer { 2 3 private int port; 4 5 private EchoServer(int port) { 6 this.port = port; 7 } 8 9 private void start() throws Exception { 10 System.out.println("Echo Server Start"); 11 EventLoopGroup group = new NioEventLoopGroup(); 12 try { 13 ServerBootstrap b = new ServerBootstrap(); 14 b.group(group) 15 .channel(NioServerSocketChannel.class) 16 .localAddress(new InetSocketAddress(port)) 17 .childHandler(new ChannelInitializer<SocketChannel>() { 18 @Override 19 public void initChannel(SocketChannel ch) throws Exception { 20 ch.pipeline().addLast(new EchoServerHandler()); 21 } 22 }); 23 ChannelFuture f = b.bind().sync(); 24 System.out.println("Server Start Listen At: " + port); 25 f.channel().closeFuture().sync(); 26 } finally { 27 group.shutdownGracefully(); 28 } 29 } 30 31 public static void main(String[] args) throws Exception { 32 int port; 33 if (args.length > 0) { 34 port = Integer.parseInt(args[0]); 35 } else { 36 port = 8080; 37 } 38 new EchoServer(port).start(); 39 } 40 }
3.2. 服务端消息消费业务
1 public class EchoServerHandler extends ChannelInboundHandlerAdapter { 2 3 @Override 4 public void channelRead(ChannelHandlerContext ctx, Object msg) { 5 ByteBuf bb = (ByteBuf) msg; 6 bb.markReaderIndex(); 7 System.out.println("Server Received: " + ByteBufUtil.hexDump(bb.readBytes(bb.readableBytes()))); 8 bb.resetReaderIndex(); 9 ctx.write(msg); 10 } 11 12 @Override 13 public void channelReadComplete(ChannelHandlerContext ctx) { 14 ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); 15 } 16 17 @Override 18 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 19 cause.printStackTrace(); 20 ctx.close(); 21 } 22 }
4. netty-client客户端
4.1. 客户端引导类
1 public class EchoClient { 2 private String host; 3 private int port; 4 5 private EchoClient(String host, int port) { 6 this.host = host; 7 this.port = port; 8 } 9 10 private void start() throws Exception { 11 System.out.println("Echo Client Start"); 12 EventLoopGroup group = new NioEventLoopGroup(); 13 try { 14 Bootstrap b = new Bootstrap(); 15 b.group(group) 16 .channel(NioSocketChannel.class) 17 .remoteAddress(new InetSocketAddress(host, port)) 18 .handler(new ChannelInitializer<SocketChannel>() { 19 @Override 20 public void initChannel(SocketChannel ch) throws Exception { 21 ch.pipeline().addLast(new EchoClientHandler()); 22 } 23 }); 24 25 ChannelFuture f = b.connect().sync(); 26 System.out.println("Server Client Listen Host: [" + host + "] And Port: [" + port + "]"); 27 f.channel().closeFuture().sync(); 28 } finally { 29 group.shutdownGracefully(); 30 } 31 } 32 33 public static void main(String[] args) throws Exception { 34 String host = "127.0.0.1"; 35 int port = 8080; 36 int len = 2; 37 if (args.length == len) { 38 host = args[0]; 39 port = Integer.parseInt(args[1]); 40 } 41 42 new EchoClient(host, port).start(); 43 } 44 45 }
4.2 客户端消息生产业务
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty Rocks!", CharsetUtil.UTF_8)); } @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println("Client Received: " + ByteBufUtil.hexDump(in.readBytes(in.readableBytes()))); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
5. 测试:
5.1. 启动服务端
5.2. 启动客户端
5.3. 再看服务端