服务端
public class TestServer { public static String delimiterStr = "$_$"; public static void main(String[] args) throws Exception { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap() .group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { //设置channel中的多个handler @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline channelPipeline = socketChannel.pipeline(); ByteBuf delimiter = Unpooled.copiedBuffer("$_$".getBytes()); channelPipeline.addLast("delimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(4096, delimiter)); channelPipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8)); channelPipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8)); channelPipeline.addLast("testServerChannelHandler", new TestServerChannelHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(8889).sync();//同步监听管道 channelFuture.channel().closeFuture().sync();//同步关闭管道 } finally { boss.shutdownGracefully().sync(); worker.shutdownGracefully().sync(); } } }
public class TestServerChannelHandler extends SimpleChannelInboundHandler<String> { public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { Channel channel = channelHandlerContext.channel(); channelGroup.forEach(currChannel -> { if (channel == currChannel) { currChannel.writeAndFlush("[自己]:" + s + TestServer.delimiterStr); } else { currChannel.writeAndFlush("[" + currChannel.remoteAddress().toString() + "]:" + s + TestServer.delimiterStr); } }); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { String clientAddress = ctx.channel().remoteAddress().toString(); System.out.println("通信管道注册成功 :" + clientAddress); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { String clientAddress = ctx.channel().remoteAddress().toString(); System.out.println("*通信管道注销成功 :" + clientAddress); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {//通信通道活跃状态 String clientAddress = ctx.channel().remoteAddress().toString(); System.out.println("通信管道活跃 :" + clientAddress); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { String clientAddress = ctx.channel().remoteAddress().toString(); System.out.println("*通信管道关闭" + clientAddress); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {//客户端连接建立 System.out.println("客户端连接建立"); //1 通知其它客户端的channel,并发消息 channelGroup.writeAndFlush("新客户端进入 " + ctx.channel().remoteAddress() + TestServer.delimiterStr); //2 建立连接后将通信使用的管道放入channelGroup channelGroup.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {//客户端失去连接 System.out.println("客户端失去连接"); //1 通知其它客户端的channel,并发消息 channelGroup.writeAndFlush("客户端 " + ctx.channel().remoteAddress() + " 已经离开了 " + TestServer.delimiterStr); //2 会自动从channelGroup移除 } }
客户端
public class TestClient { public static void main(String[] args) throws Exception { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); ChannelFuture channelFuture = bootstrap .group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { //设置channel中的多个handler @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline channelPipeline = socketChannel.pipeline(); ByteBuf delimiter = Unpooled.copiedBuffer("$_$".getBytes()); channelPipeline.addLast("delimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(4096, delimiter)); channelPipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8)); channelPipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8)); channelPipeline.addLast("testClientChannelHandler", new TestClientChannelHandler()); } }) .connect("localhost", 8889).sync(); Channel channel = channelFuture.channel(); BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); for (; ; ) { channel.writeAndFlush(br.readLine() + "$_$"); } } finally { eventLoopGroup.shutdownGracefully(); } } }
public class TestClientChannelHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { System.out.println(s); } }