server 服务端
入口
package com.sxmd.gateway.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * Description: 服务端 * * @author cy * @date 2019年09月03日 8:51 * Version 1.0 */ public class ServerMain { public static void main(String[] args){ ServerMain.bind(8754); } public static void bind(int port) { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap boot = new ServerBootstrap(); boot.group(boss,worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ServerPipeLine()); ChannelFuture f = null; f = boot.bind(port).sync(); System.out.println("服务端开始监听,等待客户端连接"); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
package com.sxmd.gateway.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; /** * Description: * * @author cy * @date 2019年09月03日 9:10 * Version 1.0 */ public class ServerPipeLine extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel channel) throws Exception { //以$为分隔符 ByteBuf buf = Unpooled.copiedBuffer("$".getBytes()); // 解决粘包的问题 ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new DelimiterBasedFrameDecoder(2018,buf)); // pipeline.addLast(new StringDecoder()); pipeline.addLast(new ServerInHandler()); } }
package com.sxmd.gateway.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.net.SocketAddress; /** * Description: * * @author cy * @date 2019年09月03日 9:14 * Version 1.0 */ public class ServerInHandler extends SimpleChannelInboundHandler<Object> { @Override protected void channelRead0(ChannelHandlerContext cxf, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] reg = new byte[buf.readableBytes()]; buf.readBytes(reg); String body = new String(reg, "UTF-8"); System.out.println(Thread.currentThread().getName() + "服务端收到的消息:" + body); // 回复消息 String respMsg = "你好," + body + ",我收到了你的消息$"; ByteBuf byteBuf = Unpooled.copiedBuffer(respMsg.getBytes()); cxf.writeAndFlush(byteBuf); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { /**flush:将消息发送队列中的消息写入到 SocketChannel 中发送给对方,为了频繁的唤醒 Selector 进行消息发送 * Netty 的 write 方法并不直接将消息写如 SocketChannel 中,调用 write 只是把待发送的消息放到发送缓存数组中,再通过调用 flush * 方法,将发送缓冲区的消息全部写入到 SocketChannel 中 * */ ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { /**当发生异常时,关闭 ChannelHandlerContext,释放和它相关联的句柄等资源 */ cause.printStackTrace(); ctx.close(); } }
客户端 client
package com.sxmd.gateway.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoop; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * Description: 客户端 * * @author cy * @date 2019年09月03日 8:51 * Version 1.0 */ public class ClientMain { public static void main(String[] args) { for (int i = 0; i < 1; i++) { new Thread(new ClientThread()).start(); } } static class ClientThread implements Runnable{ @Override public void run() { connect("192.168.141.124",8754); } public void connect(String host,int port){ EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ClientPipeLine()); ChannelFuture sync = b.connect(host, port).sync(); System.out.println(Thread.currentThread().getName() + ",客户端发起异步连接.........."); /**等待客户端链路关闭*/ sync.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { group.shutdownGracefully(); } } } }
package com.sxmd.gateway.client; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; /** * Description: * * @author cy * @date 2019年09月03日 9:35 * Version 1.0 */ public class ClientPipeLine extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel channel) throws Exception { //以$为分隔符 ByteBuf buf = Unpooled.copiedBuffer("$".getBytes()); ChannelPipeline pipeline = channel.pipeline(); // 防止粘包的问题 pipeline.addLast(new DelimiterBasedFrameDecoder(2018,buf)); // pipeline.addLast(new StringDecoder()); pipeline.addLast(new ClientInHandler()); } }
package com.sxmd.gateway.client; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * Description: * * @author cy * @date 2019年09月03日 9:39 * Version 1.0 */ public class ClientInHandler extends SimpleChannelInboundHandler<Object> { /** * 当客户端和服务端 TCP 链路建立成功之后,Netty 的 NIO 线程会调用 channelActive 方法 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户连接服务端成功"); for (int i = 0; i < 10; i++) { String reqMsg = "我是客户端"+i+"$"; ByteBuf reqByteBuf = Unpooled.copiedBuffer(reqMsg.getBytes()); /** * writeBytes:将指定的源数组的数据传输到缓冲区 * 调用 ChannelHandlerContext 的 writeAndFlush 方法将消息发送给服务器 */ ctx.writeAndFlush(reqByteBuf); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println(Thread.currentThread().getName() + "服务端返回:" + body); } /** * 当发生异常时,打印异常 日志,释放客户端资源 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { /**释放资源*/ cause.printStackTrace(); ctx.close(); } }