Netty系列(四)TCP拆包和粘包

Netty系列(四)TCP拆包和粘包

一、拆包和粘包问题

(1) 一个小的Socket Buffer问题

在基于流的传输里比如 TCP/IP,接收到的数据会先被存储到一个 socket 接收缓冲里。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。即使你发送了 2 个独立的数据包,操作系统也不会作为 2 个消息处理而仅仅是作为一连串的字节而言。因此这是不能保证你远程写入的数据就会准确地读取。举个例子,让我们假设操作系统的 TCP/TP 协议栈已经接收了 3 个数据包:

netty5_1.png

由于基于流传输的协议的这种普通的性质,在你的应用程序里读取数据的时候会有很高的可能性被分成下面的片段。

netty5_2.png

因此,一个接收方不管他是客户端还是服务端,都应该把接收到的数据整理成一个或者多个更有意思并且能够让程序的业务逻辑更好理解的数据。在上面的例子中,接收到的数据应该被构造成下面的格式:

netty5_3.png

测试:

  1. 在 client 端向 server 端发送三次数据

    //向服务器发送数据 buf
    f.channel().writeAndFlush(Unpooled.copiedBuffer("ABC".getBytes()));
    f.channel().writeAndFlush(Unpooled.copiedBuffer("DEF".getBytes()));
    f.channel().writeAndFlush(Unpooled.copiedBuffer("GHI".getBytes()));
    
  2. server 端可能将三次传输的数据当成一次请求,服务器收到的结果如下

    ABCDEFGHI
    

(2) 解决方案

拆包和粘包问题的解决方案,根据业界主流协议,在有三种方案,前三种 Netty 已经实现:

  1. 消息定长,例如每个报文的大小固定为200个字节,如果不够,空位补空格。

  2. 在包尾部增加特殊字符进行分割,例如加回车等。

  3. 将消息分为定长消息头和消息体,在消息头中包含表示消息总长度的字段,然后进行业务逻辑的处理。通常设计思烙为消息头的第一个字段使用 int32 来表示消息的总长度。

二、定长方案 - FixedLengthFrameDecoder

FixedLengthFrameDecoder 是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑 TCP 的粘包/拆包问题,非常实用。注意:长度不够的忽略。

StringDecoder 的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的 Handler。 FixedLengthFrameDecoder + StringDecoder 组合就是按固定长度的文本解码。

  1. 在 Server 中添加如下配制:

    childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel sc) throws Exception {
            //定长拆包:5个字符,不足5位则忽略
            sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
            //设置字符串形式的解码
            sc.pipeline().addLast(new StringDecoder());
            sc.pipeline().addLast(new ServerHandler());
        }
    })
    
  2. ServerHandler 中接收请求的数据:

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println((String)msg);
    
        //写给客户端
        ChannelFuture f = ctx.writeAndFlush(Unpooled.copiedBuffer(((String)msg).getBytes()));
        //写完成后会自动关闭客户端
        //f.addListener(ChannelFutureListener.CLOSE);
    }
    
  3. Client 发送的数据:

    //向服务器发送数据 buf
    f.channel().writeAndFlush(Unpooled.copiedBuffer("aaaaabbbbb".getBytes()));
    f.channel().writeAndFlush(Unpooled.copiedBuffer("cccccddd".getBytes()));
    
  4. 结果如下,可以看出5个字符作为一个请求处理,不足5位的忽略:

    aaaaa
    bbbbb
    ccccc
    

三、固定分隔符方案 - DelimiterBasedFrameDecoder

LineBasedFrameDecoder 的工作原理是它依次遍历 Bytebuf 中的可读字节,判判断看是否有“ ”或者“ ”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

DelimiterBasedFrameDecoder 自动完成以分隔符作为码流结束标识的消息的解码。

  1. 在 Server 中添加如下配制:

    childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
            //ch.pipeline().addLast(new LineBasedFrameDecoder(1024, buf));
            //设置字符串形式的解码
            ch.pipeline().addLast(new StringDecoder());
            ch.pipeline().addLast(new ServerHandler());
        }
    })
    
  2. ServerHandler 中接收请求的数据:

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println((String)msg);
    
        //写给客户端
        ChannelFuture f = ctx.writeAndFlush(Unpooled.copiedBuffer("netty$_".getBytes()));
        //写完成后会自动关闭客户端
        f.addListener(ChannelFutureListener.CLOSE);
    
    }
    

    结果如下,可以看出请求是分三次处理的:

    ABC
    DEF
    GHI
    

四、自定义协议

Netty自定义协议请参考 这篇文章

五、Netty 内部解决拆包与粘包的方案

如果不考虑拆包与粘包的问题,ServerHandler 的处理如下:

public class ServerHandler extends ChannelHandlerAdapter {

    //ctx.write()后自动释放msg
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "utf-8");
        System.out.println("The time server receive order : " + body);

        //写给客户端
        ChannelFuture cf = ctx.write(Unpooled.copiedBuffer(new Date().toLocaleString().getBytes()));
        //写完成后会自动关闭客户端
        //cf.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

(1) 第一个解决方案

最简单的方案是构造一个内部的可积累的缓冲,直到4个字节全部接收到了内部缓冲。下面的代码修改了 ServerHandler 的实现类修复了这个问题。

public class MyHandler extends ChannelHandlerAdapter {

    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // 用于缓存每次所有接收的数据
        buf = ctx.alloc().buffer(4); // (1)
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object in) {
        // 读数据前 buf 中保存的还未解析的数据
        String oldData = getBufferString(buf);
        // 本次要读 in 中的数据
        String newData = getBufferString((ByteBuf) in);

        ByteBuf m = (ByteBuf) in;
        buf.writeBytes(m); // (2)
        m.release();

        // 读取 in 中的数据后 buf 中保存的数据
        String totalData = getBufferString(buf);
        int size = buf.readableBytes();

        while (buf.readableBytes() >= 4) {
            byte[] data = new byte[4];
            buf.readBytes(data);
            System.out.println(new String(data));
            ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString().getBytes()));
            //ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    public String getBufferString(ByteBuf buf) {
        buf.markReaderIndex();
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        String s = new String(bytes);
        buf.resetReaderIndex();
        return s;
    }
}
  1. ChannelHandler 有 2 个生命周期的监听方法:handlerAdded() 和 handlerRemoved()。你可以完成任意初始化任务只要他不会被阻塞很长的时间。

  2. 首先,所有接收的数据都应该被累积在 buf 变量里。

  3. 然后,处理器必须检查 buf 变量是否有足够的数据,在这个例子中是 4 个字节,然后处理实际的业务逻辑。否则,Netty 会重复调用 channelRead() 当有更多数据到达直到 4 个字节的数据被积累。

(2) 第二个解决方案

尽管第一个解决方案已经解决了拆包与粘包的问题了,但是修改后的处理器看起来不那么的简洁,想象一下如果由多个字段比如可变长度的字段组成的更为复杂的协议时,你的 ChannelHandler 的实现将很快地变得难以维护。

正如你所知的,你可以增加多个 ChannelHandler 到 ChannelPipeline ,因此你可以把一整个 ChannelHandler 拆分成多个模块以减少应用的复杂程度,比如你可以把 ServerHandler 拆分成 2 个处理器:

  • MyDecoder 处理数据拆分的问题
  • ServerHandler 原始版本的实现

幸运地是,Netty 提供了一个可扩展的类,帮你完成 MyDecoder 的开发。

public class MyDecoder extends ByteToMessageDecoder { // (1)

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoder 是 ChannelHandler 的一个实现类,他可以在处理数据拆分的问题上变得很简单。

  2. 每当有新数据接收的时候,ByteToMessageDecoder 都会调用 decode() 方法来处理内部的那个累积缓冲。

  3. Decode() 方法可以决定当累积缓冲里没有足够数据时可以往 out 对象里放任意数据。当有更多的数据被接收了 ByteToMessageDecoder 会再一次调用 decode() 方法。

  4. 如果在 decode() 方法里增加了一个对象到 out 对象里,这意味着解码器解码消息成功。ByteToMessageDecoder 将会丢弃在累积缓冲里已经被读过的数据。请记得你不需要对多条消息调用 decode(),ByteToMessageDecoder 会持续调用 decode() 直到不放任何数据到 out 里。

下面可以来测试一把了:

private static class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        //1. 不考虑解码的问题,会出现拆包与粘包
        //ch.pipeline().addLast(new ServerHandler());

        //2. netty 原始的方式解决拆包与粘包
        //ch.pipeline().addLast(new MyHandler());

        //3. netty 提供 ByteToMessageDecoder 类解决拆包与粘包
        ch.pipeline().addLast(new MyDecoder());
        ch.pipeline().addLast(new ServerHandler());
    }
}

参考:

  1. 《Netty 解决 TCP 拆包粘包问题》: http://ifeve.com/netty5-user-guide/#流数据的传输处理

每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/8947167.html