netty

服务端

public class TestServer {
    public static String delimiterStr = "$_$";

    public static void main(String[] args) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        //设置channel中的多个handler
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline channelPipeline = socketChannel.pipeline();
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_$".getBytes());
                            channelPipeline.addLast("delimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(4096, delimiter));
                            channelPipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
                            channelPipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));

                            channelPipeline.addLast("testServerChannelHandler", new TestServerChannelHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(8889).sync();//同步监听管道
            channelFuture.channel().closeFuture().sync();//同步关闭管道
        } finally {
            boss.shutdownGracefully().sync();
            worker.shutdownGracefully().sync();
        }
    }
}
public class TestServerChannelHandler extends SimpleChannelInboundHandler<String> {
    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        Channel channel = channelHandlerContext.channel();
        channelGroup.forEach(currChannel -> {
            if (channel == currChannel) {
                currChannel.writeAndFlush("[自己]:" + s + TestServer.delimiterStr);
            } else {
                currChannel.writeAndFlush("[" + currChannel.remoteAddress().toString() + "]:" + s + TestServer.delimiterStr);
            }
        });
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        String clientAddress = ctx.channel().remoteAddress().toString();
        System.out.println("通信管道注册成功 :" + clientAddress);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        String clientAddress = ctx.channel().remoteAddress().toString();
        System.out.println("*通信管道注销成功 :" + clientAddress);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {//通信通道活跃状态
        String clientAddress = ctx.channel().remoteAddress().toString();
        System.out.println("通信管道活跃 :" + clientAddress);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String clientAddress = ctx.channel().remoteAddress().toString();
        System.out.println("*通信管道关闭" + clientAddress);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {//客户端连接建立
        System.out.println("客户端连接建立");
        //1 通知其它客户端的channel,并发消息
        channelGroup.writeAndFlush("新客户端进入   " + ctx.channel().remoteAddress() + TestServer.delimiterStr);
        //2 建立连接后将通信使用的管道放入channelGroup
        channelGroup.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {//客户端失去连接
        System.out.println("客户端失去连接");
        //1 通知其它客户端的channel,并发消息
        channelGroup.writeAndFlush("客户端   " + ctx.channel().remoteAddress() + " 已经离开了 " + TestServer.delimiterStr);
        //2 会自动从channelGroup移除
    }
}

客户端

public class TestClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            ChannelFuture channelFuture = bootstrap
                    .group(eventLoopGroup)
                    .channel(NioSocketChannel.class)

                    .handler(new ChannelInitializer<SocketChannel>() {
                        //设置channel中的多个handler
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline channelPipeline = socketChannel.pipeline();
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_$".getBytes());
                            channelPipeline.addLast("delimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(4096, delimiter));
                            channelPipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
                            channelPipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
                            channelPipeline.addLast("testClientChannelHandler", new TestClientChannelHandler());
                        }
                    })
                    .connect("localhost", 8889).sync();
            Channel channel = channelFuture.channel();
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            for (; ; ) {
                channel.writeAndFlush(br.readLine() + "$_$");
            }

        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
public class TestClientChannelHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s);
    }
}
原文地址:https://www.cnblogs.com/zzq-include/p/11048306.html