Netty——发送消息流程&高低水位

相关概念

SO_SEND_BUF和SO_REC_BUFF

  • SO_SEND_BUF是操作系统内核的写缓冲区,所有应用程序需要发送到对端的信息,都会放到该缓冲区中,等待发往对端
  • SO_REC_BUFF是操作系统内核的读缓冲区,所有对端发过来的数据都会放到该缓冲区中,等待应用程序取走

ChannelOutboundBuffer

  • 该buffer是Netty等待写入系统内核缓冲区的消息队列。

Channel的高低水位线

  • Netty 中提供一种水位线的标志,提用户当前通道的消息堆积情况;
  • Netty 中的 Channel 都有一个写缓冲区(ChannelOutboundBuffer),这是个 Netty 发数据时的仓库,要发送的数据以数据结构 Entry 的形式存在仓库中,Entry 是个链表中的节点;
  • Netty 中的高低水位线,对应的就是这个链表中节点的数量范围,用于限制程序的写操作,自己在写程序的时候,需要用相应的代码给予配合,从而避免 OOM,增强写数据时的安全性。
  • 设置高低水位线参数(默认 32 * 1024 ~ 64 * 1024);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
  • 设置好了高低水位参数,如果自己在写代码的时候,没有做判断 channel.isWritable() 的,就跟没设置一样!!! 示例代码片段:
// 这是对设置的高低水位线参数的尊重,如果设置了高低水位线,这里却不做判断,直接写,就有可能 OOM;
if (ctx.channel().isActive() && ctx.channel().isWritable()) {
    ctx.writeAndFlush(responseMessage);
} else {
    log.error("message dropped");
}

Netty发送消息的流程

1、调用Channelwrite方法,该方法会将消息加入ChannelOutboundBuffer,此时并没有实际发送,netty会增加该连接发送队列的水位线。
以下是AbstractChannel.java中的代码片段:

public final void write(Object msg, ChannelPromise promise) {
       assertEventLoop();
       ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
       if (outboundBuffer == null) {
           // If the outboundBuffer is null we know the channel was closed and so
           // need to fail the future right away. If it is not null the handling of the rest
           // will be done in flush0()
           // See https://github.com/netty/netty/issues/2362
           safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
           // release message now to prevent resource-leak
           ReferenceCountUtil.release(msg);
           return;
       }
       int size;
       try {
           msg = filterOutboundMessage(msg);
           size = pipeline.estimatorHandle().size(msg);
           if (size < 0) {
               size = 0;
           }
       } catch (Throwable t) {
           safeSetFailure(promise, t);
           ReferenceCountUtil.release(msg);
           return;
       }
       outboundBuffer.addMessage(msg, size, promise);
   }

2、调用Channelflush方法,该方法将ChannelOutboundBuffer中的消息写入内核缓冲区。AbstractChannelHandlerContext.flush方法,准备写入。

public ChannelHandlerContext flush() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeFlush();
    } else {
        Runnable task = next.invokeFlushTask;
        if (task == null) {
            next.invokeFlushTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeFlush();
                }
            };
        }
        safeExecute(executor, task, channel().voidPromise(), null);
    }
    return this;
}

3、NioSocketChannel.doWrite方法,该方法调用java原生nio将数据写入内核缓冲区。写入完毕,将消息从ChannelOutboundBuffer移除并且减少ChannelOutboundBuffer的水位线。

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // All written so clear OP_WRITE
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }
            // Ensure the pending writes are made of ByteBufs only.
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            int nioBufferCnt = in.nioBufferCount();
            // Always us nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
                case 0:
                    // We have something else beside ByteBuffers to write so fallback to normal writes.
                    writeSpinCount -= doWrite0(in);
                    break;
                case 1: {
                    // Only one ByteBuf so use non-gathering write
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    ByteBuffer buffer = nioBuffers[0];
                    int attemptedBytes = buffer.remaining();
                    final int localWrittenBytes = ch.write(buffer);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
                default: {
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    // We limit the max amount to int above so cast is safe
                    long attemptedBytes = in.nioBufferSize();
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                    adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                            maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
            }
        } while (writeSpinCount > 0);
        incompleteWrite(writeSpinCount < 0);
    }

引用:

原文地址:https://www.cnblogs.com/caoweixiong/p/14676840.html