【网络编程】NIO,Netty实现群聊Code

笔记回顾

  • Server
  • Client
  • Protocol
  • Decode/Encode

Server

package club.interview.io.netty.chat.ok;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * fixme channel组管理
 * - 自定义协议处理粘包半包问题 {@link MyProtocol}
 * - 需要对应解码处理 {@linkplain MyDecoder }
 * - 需要对应编码处理 {@linkplain MyEncoder }
 *
 * @author QuCheng on 2020/8/17.
 */
public class NettyChatServerStudy2 {

    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new MyDecoder());
                            pipeline.addLast(new MyEncoder());
                            pipeline.addLast(new MyChatHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(888).sync()
                    .addListener((ChannelFutureListener) future -> {
                        if (future.isSuccess()) {
                            System.out.println("Server is done !");
                        }
                    });
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    static class MyChatHandler extends SimpleChannelInboundHandler<MyProtocol> {

        static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            channelGroup.add(channel);
            String msg = channel.remoteAddress() + " 加入群聊!";
            System.out.println("【Log】" + msg);
            channelGroup.writeAndFlush(new MyProtocol("【系统提醒】" + msg));
        }

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            String msg = channel.remoteAddress() + " 退出群聊!";
            System.out.println("【Log】" + msg);
            channelGroup.writeAndFlush(new MyProtocol("【系统提醒】" + msg));
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) {
            // 消息转发
            Channel channel = ctx.channel();
            String msg1 = new String(msg.getContent());
            System.out.println("【Log】" + channel.remoteAddress() + " " + msg1);
            channelGroup.forEach(c -> {
                String msg2;
                if (c == channel) {
                    msg2 = "【帅气的我】" + "
	" + msg1;
                } else {
                    msg2 = "【路人】" + channel.remoteAddress() + " 
	" + msg1;
                }
                c.writeAndFlush(new MyProtocol(msg2));
            });
        }
    }
}

Client

package club.interview.io.netty.chat.ok;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.Scanner;

/**
 * @author QuCheng on 2020/8/17.
 */
public class NettyChatClientStudy2 {

    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();

            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new MyDecoder())
                                    .addLast(new MyEncoder())
                                    .addLast(new MyHandler());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect("localhost", 888).sync()
                    .addListener(future -> {
                        if (future.isSuccess()) {
                            System.out.println("Client is ok !");
                        }
                    });

            // 发消息
            Scanner scanner = new Scanner(System.in);
            Channel channel = channelFuture.channel();
            while (scanner.hasNextLine()) {
                String s = scanner.nextLine();
                // 测试粘包半包问题
//                for (int i = 0; i < 20; i++) {
//                    channel.writeAndFlush(new MyProtocol(s));
//                }
                channel.writeAndFlush(new MyProtocol(s));
            }
            channel.closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    static class MyHandler extends SimpleChannelInboundHandler<MyProtocol> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception {
            System.out.println(new String(msg.getContent()));
        }
    }
}

Protocol

package club.interview.io.netty.chat.ok;

/**
 * @author QuCheng on 2020/8/17.
 */
public class MyProtocol {

    private int len;

    private byte[] content;

    public MyProtocol() {
    }

    public MyProtocol(String s) {
        byte[] bytes = s.getBytes();
        this.len = bytes.length;
        this.content = bytes;
    }

    public int getLen() {
        return len;
    }

    public void setLen(int len) {
        this.len = len;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}

Decoder/Encoder

package club.interview.io.netty.chat.ok;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

/**
 * @author QuCheng on 2020/8/17.
 */
public class MyDecoder extends ReplayingDecoder<Void> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        int i = in.readInt();
        byte[] bytes = new byte[i];

        in.readBytes(bytes);

        MyProtocol protocol = new MyProtocol();

        protocol.setLen(i);
        protocol.setContent(bytes);

        out.add(protocol);
    }
}
package club.interview.io.netty.chat.ok;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * @author QuCheng on 2020/8/17.
 */
public class MyEncoder extends MessageToByteEncoder<MyProtocol> {

    @Override
    protected void encode(ChannelHandlerContext ctx, MyProtocol msg, ByteBuf out) throws Exception {
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}
原文地址:https://www.cnblogs.com/nightOfStreet/p/13516879.html