Registering NioSocketChannel with workerGroup


Continuing from the previous post: ServerBootstrapAcceptor's channelRead(ChannelHandlerContext ctx, Object msg) method registers the NioSocketChannel with the workerGroup.

public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

The channel registered here is of type NioSocketChannel, whereas the one registered with bossGroup was an NioServerSocketChannel.

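This argument (the msg that is cast to child above) is created in NioServerSocketChannel's doReadMessages(List<Object> buf) method, which wraps each accepted connection in a new NioSocketChannel.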

Internally, childGroup.register(child) works the same way as it did for bossGroup: it picks one NioEventLoop from workerGroup and registers the channel with it.
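The "pick one NioEventLoop" step can be pictured as a round-robin choice over the loops in the group. The sketch below uses only the JDK and is a simplified stand-in, not Netty's actual chooser code; the class name `RoundRobinChooser` and the loop names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for how a group hands out its event loops.
final class RoundRobinChooser<T> {
    private final List<T> loops;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinChooser(List<T> loops) {
        if (loops.isEmpty()) {
            throw new IllegalArgumentException("no event loops");
        }
        this.loops = List.copyOf(loops);
    }

    // Returns the loops in turn; floorMod keeps the index valid even after int overflow.
    T next() {
        return loops.get(Math.floorMod(counter.getAndIncrement(), loops.size()));
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(List.of("worker-1", "worker-2", "worker-3"));
        for (int i = 0; i < 4; i++) {
            System.out.println("channel " + i + " -> " + chooser.next());
        }
    }
}
```

Each newly accepted channel would be bound to whichever loop `next()` returns, and it stays on that loop's thread for the rest of its life.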

You can see that the workerGroup thread has now been started: 2-1 is the bossGroup thread and 3-1 is the workerGroup thread.

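The workerGroup thread then runs the registration task, and what follows mirrors bossGroup. The channel is registered with the selector, then pipeline.invokeHandlerAddedIfNeeded() runs: it calls handlerAdded on our childHandler, which in turn runs the ChannelInitializer's initChannel method and adds our custom handlers to the pipeline. After that, the corresponding handlerAdded, channelRegistered, and channelActive methods are invoked. Once these tasks are finished, the event loop goes back to blocking in select.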
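At the JDK level, "registering with the selector" comes down to a SelectableChannel.register call, after which the thread blocks in select until a key is ready. Here is a minimal sketch of that sequence, using a `Pipe` instead of a socket so that it runs without a network. Registering with an empty interest set and enabling OP_READ afterwards is an assumption made here to mirror the register-then-activate order described above. The class and method names are hypothetical, and none of this is Netty's own code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

final class RegisterThenReadSketch {

    // Registers a channel, enables read interest, and returns what the selector delivers.
    // Assumes the payload fits in 64 bytes.
    static String registerAndRead(String payload) {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);

                // Step 1: register with no interest ops; the attachment stands in for the channel object.
                SelectionKey key = source.register(selector, 0, "child-channel");

                // Step 2: once the channel counts as active, ask to be notified about readable data.
                key.interestOps(SelectionKey.OP_READ);

                // Simulate the remote peer sending data.
                sink.write(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));

                // Step 3: an event loop thread would block here until a key is ready.
                if (selector.select(1000) > 0 && key.isReadable()) {
                    ByteBuffer buf = ByteBuffer.allocate(64);
                    source.read(buf);
                    buf.flip();
                    return StandardCharsets.UTF_8.decode(buf).toString();
                }
                return "";
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(registerAndRead("hello"));
    }
}
```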

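 Reading messages

As before, when data arrives the event loop calls processSelectedKeys and the methods that follow it, eventually reaching read():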

public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator(); // byte buffer allocator
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf); // propagate the read event through the pipeline
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
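The shape of this loop can be reduced to a small JDK-only sketch: allocate a buffer, read into it, stop when the read returns 0 or -1, and otherwise hand the bytes on and try again. The fixed buffer size and the cap on reads per pass are stand-ins for what allocHandle decides in Netty. Both of them, along with the names `ReadLoopSketch`, `readOnce`, and `Result`, are assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

final class ReadLoopSketch {

    // Outcome of one read pass: the chunks handed downstream and whether EOF was seen.
    static final class Result {
        final List<byte[]> chunks = new ArrayList<>();
        boolean close;
    }

    static Result readOnce(ReadableByteChannel ch, int bufferSize, int maxReads) {
        Result result = new Result();
        try {
            int reads = 0;
            do {
                ByteBuffer buf = ByteBuffer.allocate(bufferSize); // plays the role of allocate(...)
                int n = ch.read(buf);                              // plays the role of doReadBytes(...)
                if (n <= 0) {
                    // Nothing was read; a negative count means the stream has ended.
                    result.close = n < 0;
                    break;
                }
                buf.flip();
                byte[] chunk = new byte[n];
                buf.get(chunk);
                result.chunks.add(chunk);                          // plays the role of fireChannelRead(...)
                reads++;
            } while (reads < maxReads);                            // plays the role of continueReading()
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(new byte[10]));
        Result r = readOnce(ch, 4, 16);
        System.out.println(r.chunks.size() + " chunks, close=" + r.close);
    }
}
```

As in the original method, the close flag is only set when a read returns a negative count, so a pass that stops because it hit the read cap leaves the channel open for the next pass.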

doReadBytes(byteBuf)

protected int doReadBytes(ByteBuf byteBuf) throws Exception {
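        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); // get the receive-buffer allocator handle
        allocHandle.attemptedBytesRead(byteBuf.writableBytes()); // record how many bytes we will attempt to read (the writable bytes)
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); // read data from the channel and write it into the byte buffer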
    }

writeBytes(ScatteringByteChannel in, int length): writes data into the buffer

public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
        ensureWritable(length);
        int writtenBytes = setBytes(writerIndex, in, length);
        if (writtenBytes > 0) {
            writerIndex += writtenBytes;
        }
        return writtenBytes;
    }

PooledByteBuf.setBytes(int index, ScatteringByteChannel in, int length)

in.read: the channel reads data from the underlying socket buffer into the byte buffer:

public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
        try {
            return in.read(internalNioBuffer(index, length));
        } catch (ClosedChannelException ignored) {
            return -1;
        }
    }

PooledByteBuf.internalNioBuffer(int index, int length): returns a ByteBuffer; the pooled buffer wraps a ByteBuffer internally

public final ByteBuffer internalNioBuffer(int index, int length) {
        checkIndex(index, length);
        return _internalNioBuffer(index, length, false);
    }

_internalNioBuffer(int index, int length, boolean duplicate)

final ByteBuffer _internalNioBuffer(int index, int length, boolean duplicate) {
        index = idx(index);
        ByteBuffer buffer = duplicate ? newInternalNioBuffer(memory) : internalNioBuffer();
        buffer.limit(index + length).position(index);
        return buffer;
    }

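The buffer here is a direct buffer (DirectByteBuffer). Reading into it avoids the extra copy from native memory into the JVM heap that a heap buffer would need.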
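The position/limit window set up in _internalNioBuffer, together with the in.read call in setBytes, can be tried out with plain JDK classes. This is a minimal sketch, assuming a `Pipe` as the ScatteringByteChannel and a direct ByteBuffer as the backing memory; `DirectWindowReadSketch`, `readInto`, and `demo` are hypothetical names, not part of Netty.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Pipe;
import java.nio.channels.ScatteringByteChannel;

final class DirectWindowReadSketch {

    // Reads up to `length` bytes from `in` into memory[index, index + length).
    static int readInto(ByteBuffer memory, int index, int length, ScatteringByteChannel in) {
        ByteBuffer view = memory.duplicate();        // shares the content, has its own position/limit
        view.limit(index + length).position(index);  // same window as in _internalNioBuffer
        try {
            return in.read(view);
        } catch (ClosedChannelException ignored) {
            return -1;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes three bytes into a pipe and reads them back into a window of a direct buffer.
    static byte[] demo() {
        ByteBuffer memory = ByteBuffer.allocateDirect(16);
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                sink.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));
                int n = readInto(memory, 4, 8, source);
                byte[] out = new byte[Math.max(n, 0)];
                for (int i = 0; i < out.length; i++) {
                    out[i] = memory.get(4 + i);      // absolute get, starting at the window offset
                }
                return out;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

The return value of `readInto` is the number of bytes actually read, which is what writeBytes adds to writerIndex in the code above.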

 pipeline.fireChannelRead(byteBuf);

This passes the buffer's data into the pipeline for the handlers to process. It reaches my custom handler, which reads it out:

 Client:

Original article: https://www.cnblogs.com/xiaojiesir/p/15387085.html