Netty之读取消息

一 引言

  如果我们不使用Netty而是直接使用JDK的话,可以使用socketChannel.read(ByteBuffer)的方式进行读取。而Netty牛逼的地方就是替我们把读取繁琐操作给做的,还把半包粘包给解决了。本文就来看看Netty是怎么做的

二  NioSocketChannelUnsafe.read

  AbstractNioByteChannel.NioByteUnsafe

public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    // 用来处理内存的分配:池化或者非池化 UnpooledByteBufAllocator
    final ByteBufAllocator allocator = config.getAllocator();
    // 用来计算此次读循环应该分配多少内存 AdaptiveRecvByteBufAllocator 自适应计算缓冲分配
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);// 重置为0

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator);//这里会根据上一次的读取情况进行自适应的调整大小
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {// 如果上一次读到的字节数小于等于0,清理引用和跳出循环
                // nothing was read. release the buffer.
                byteBuf.release();// 引用 -1
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;// 如果远程已经关闭连接
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);//  totalMessages += amt;
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

三  ByteBufAllocator

  该接口的实现很多,用户可以配置实现类型。如果不配默认是  PooledByteBufAllocator,也就是池化的同时是直接内存

buffer() // 返回一个 ByteBuf 对象,默认直接内存。如果平台不支持,返回堆内存。
heapBuffer()// 返回堆内存缓存区
directBuffer()// 返回直接内存缓冲区
compositeBuffer() // 返回一个复合缓冲区。可能同时包含堆内存和直接内存。
ioBuffer() // 当当支持 Unsafe 时,返回直接内存的 Bytebuf,否则返回返回基于堆内存,当使用 PreferHeapByteBufAllocator 时返回堆内存

四  RecvByteBufAllocator.Handle

   

  它的作用根据代码里注释

/**
     * Creates a new handle.  The handle provides the actual operations and keeps the internal information which is
     * required for predicting an optimal buffer capacity.
     */

  

ByteBuf allocate(ByteBufAllocator alloc);//创建一个新的接收缓冲区,其容量可能大到足以读取所有入站数据和小到数据足够不浪费它的空间。
int guess();// 猜测所需的缓冲区大小,不进行实际的分配
void reset(ChannelConfig config);// 每次开始读循环之前,重置相关属性
void incMessagesRead(int numMessages);// 增加本地读循环的次数
void lastBytesRead(int bytes); // 设置最后一次读到的字节数
int lastBytesRead(); // 最后一次读到的字节数
void attemptedBytesRead(int bytes); // 设置读操作尝试读取的字节数
void attemptedBytesRead(); // 获取尝试读取的字节数
boolean continueReading(); // 判断是否需要继续读
void readComplete(); // 读结束后调用

  该接口的主要作用就是计算字节数,如同 RecvByteBufAllocator 的文档说的那样,根据预测和计算最佳大小的缓存区,确保不浪费。

RecvByteBufAllocator 的实现类 AdaptiveRecvByteBufAllocator 

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    static final int DEFAULT_MINIMUM = 64;
    static final int DEFAULT_INITIAL = 1024;
    static final int DEFAULT_MAXIMUM = 65536;

    private static final int INDEX_INCREMENT = 4;
    private static final int INDEX_DECREMENT = 1;

    private static final int[] SIZE_TABLE;

    static {
        List<Integer> sizeTable = new ArrayList<Integer>();
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);
        }

        for (int i = 512; i > 0; i <<= 1) {
            sizeTable.add(i);
        }

        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i ++) {
            SIZE_TABLE[i] = sizeTable.get(i);
        }
    }

  sizeTable最后的大小是53,大概张这个样子

  

五 doReadBytes

  AbstractNioByteChannel.doReadBytes

protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }

  这里就是调用JDK的原生的api完成的,只是里面有很多Netty的技巧。比如读取缓冲区不够会自动扩容,还有读取循环如果读到的int是0 最多循环16次

  

原文地址:https://www.cnblogs.com/juniorMa/p/14312488.html