Netty 零拷贝(一)NIO 对零拷贝的支持

Netty 零拷贝(二)NIO 对零拷贝的支持

Netty 之美系列目录 (https://www.cnblogs.com/binarylei/p/10117436.html)

相关文章:

Buffer类结构

  • 非直接缓冲区(HeapByteBuffer):在 JVM 内存上分配一个字节数组 byte[] hb
  • 直接缓冲区(DirectByteBuffer):保存一个指向系统内核的地址 long address

一、非直接缓冲区和直接缓冲区

(1) Buffer 分配

// 分配非直接缓冲区
public static ByteBuffer allocate(int capacity) {
    if (capacity < 0)
        throw new IllegalArgumentException();
    return new HeapByteBuffer(capacity, capacity);
}

// 分配直接缓冲区
public static ByteBuffer allocateDirect(int capacity) {
    return new DirectByteBuffer(capacity);
}

(2) ByteBuffer 内存存储

public abstract class Buffer {
    // Invariants: mark <= position <= limit <= capacity
    private int mark = -1;
    private int position = 0;
    private int limit;
    private int capacity;

    // 直接缓冲区指向系统内核的一个地址,之所以放到父类中是为了加快 JNI 的访问速度
    long address;
}

public abstract class ByteBuffer extends Buffer {
    // 非直接缓冲区在 JVM 内存上分配一个字节数组
    final byte[] hb;
}

(3) DirectByteBuffer

DirectByteBuffer(int cap) {
    super(-1, 0, cap, cap);
    boolean pa = VM.isDirectMemoryPageAligned();
    int ps = Bits.pageSize();
    long size = Math.max(1L, (long)cap + (pa ? ps : 0));
    Bits.reserveMemory(size, cap);

    long base = 0;
    try {
        base = unsafe.allocateMemory(size);
    } catch (OutOfMemoryError x) {
        Bits.unreserveMemory(size, cap);
        throw x;
    }
    unsafe.setMemory(base, size, (byte) 0);
    if (pa && (base % ps != 0)) {
        address = base + ps - (base & (ps - 1));
    } else {
        address = base;
    }
    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    att = null
}

public byte get() {
    return ((unsafe.getByte(ix(nextGetIndex()))));
}

public ByteBuffer put(byte x) {
    unsafe.putByte(ix(nextPutIndex()), ((x)));
    return this;
}

DirectByteBuffer 对直接缓冲区的操作都委托给了类 sun.misc.Unsafe,Unsafe 都是一些本地方法 native。

public final class Unsafe {
    public native long allocateMemory(long bytes);
    public native byte    getByte(long address);
    public native void    putByte(long address, byte x);
}

二、直接缓冲区应用

使用直接缓冲区可以避免用户空间和系统空间之间的拷贝过程,即零拷贝。

FileChannel inChannel = FileChannel.open(Paths.get("1.png"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("3.png"),
        StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);

// 方式一:内存映射文件,直接缓冲区
MappedByteBuffer inMappedBuf = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
//只有 READ_WRITE,没有 WRITE,因此 outChannel 也要加上 READ
MappedByteBuffer outMappedBuf = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());

byte[] bytes = new byte[inMappedBuf.limit()];
inMappedBuf.get(bytes);
outMappedBuf.put(bytes);

// 方式二:transferTo 也是使用直接缓冲区
//inChannel.transferTo(0, inChannel.size(), outChannel);
//outChannel.transferFrom(inChannel, 0, inChannel.size());

直接缓冲区

三、DirectByteBuffer

Java NIO中的 direct buffer(主要是 DirectByteBuffer)其实是分两部分的:

       Java        |      native
                   |
 DirectByteBuffer  |     malloc'd
 [    address   ] -+-> [   data    ]

其中 DirectByteBuffer 自身是一个 Java 对象,在 Java 堆中;而这个对象中有个 long 类型字段 address,记录着一块调用 malloc() 申请到的 native memory。

FileChannel 的 read(ByteBuffer dst) 函数,write(ByteBuffer src) 函数中,如果传入的参数是 HeapBuffer 类型,则会临时申请一块 DirectBuffer,进行数据拷贝,而不是直接进行数据传输,这是出于什么原因?

// OpenJDK 的 sun.nio.ch.IOUtil
static int write(FileDescriptor fd, ByteBuffer src, long position,
                     NativeDispatcher nd) throws IOException {
    if (src instanceof DirectBuffer)
        return writeFromNativeBuffer(fd, src, position, nd);

    // Substitute a native buffer
    int pos = src.position();
    int lim = src.limit();
    assert (pos <= lim);
    int rem = (pos <= lim ? lim - pos : 0);
    ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
    try {
        bb.put(src);
        bb.flip();
        // Do not update src until we see how many bytes were written
        src.position(pos);

        int n = writeFromNativeBuffer(fd, bb, position, nd);
        if (n > 0) {
            // now update src
            src.position(pos + n);
        }
        return n;
    } finally {
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}

这里其实是在迁就 OpenJDK 里的 HotSpot VM 的一点实现细节。

HotSpot VM 里的 GC 除了 CMS 之外都是要移动对象的,是所谓 “compacting GC”。

如果要把一个 Java 里的 byte[] 对象的引用传给 native 代码,让 native 代码直接访问数组的内容的话,就必须要保证 native 代码在访问的时候这个 byte[] 对象不能被移动,也就是要被“pin”(钉)住。

可惜 HotSpot VM 出于一些取舍而决定不实现单个对象层面的 object pinning,要 pin 的话就得暂时禁用 GC ——也就等于把整个 Java 堆都给 pin 住。HotSpot VM 对 JNI 的 Critical 系 API 就是这样实现的。这用起来就不那么顺手。

所以 Oracle/Sun JDK / OpenJDK 的这个地方就用了点绕弯的做法。 它假设把 HeapByteBuffer 背后的 byte[] 里的内容拷贝一次是一个时间开销可以接受的操作,同时假设真正的 I/O 可能是一个很慢的操作。

于是它就先把 HeapByteBuffer 背后的 byte[] 的内容拷贝到一个 DirectByteBuffer 背后的 native memory 去,这个拷贝会涉及 sun.misc.Unsafe.copyMemory() 的调用,背后是类似 memcpy() 的实现。这个操作本质上是会在整个拷贝过程中暂时不允许发生 GC 的,虽然实现方式跟 JNI 的 Critical 系 API 不太一样。(具体来说是 Unsafe.copyMemory() 是 HotSpot VM 的一个 intrinsic 方法,中间没有 safepoint 所以 GC 无法发生)。

然后数据被拷贝到 native memory 之后就好办了,就去做真正的 I/O,把 DirectByteBuffer 背后的 native memory 地址传给真正做 I/O 的函数。这边就不需要再去访问 Java 对象去读写要做 I/O 的数据了。

参考:

  1. 《Java NIO中,关于DirectBuffer,HeapBuffer的疑问?》:https://www.zhihu.com/question/57374068/answer/152691891

每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/10053349.html