启动服务(上)服务端:NioServerSocketChannel 是什么时候激活的

启动服务(上)服务端:NioServerSocketChannel 是什么时候激活的

Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)

本文会从请求处理的角度分析 Netty 源码,包含以下 7 个过程:启动服务、构建连接、接收数据、业务处理、发送数据、断开连接、关闭服务。

Netty 服务端启动,最主要的工作就是绑定端口、构建连接。

1. 主线分析

我们先回顾一下 Server 启动的代码:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)         // 配置参数Server
  .channel(NioServerSocketChannel.class)
  .childHandler(new ChannelInitializer<SocketChannel>() { ... });

ChannelFuture f = b.bind(8888).sync();  // 启动服务,绑定端口
f.channel().closeFuture().sync();       // 关闭服务

1.1 主线

ServerBootstrap 启动的核心方法是 bind。服务器在启动时主要工作如下:

  1. 初始化:最主要的是初始化并配置 ServerSocketChannel,包括 TCP 参数、Handler 的配置。
  2. 注册:包括注册到 NioEventLoop 和将 Channel 注册到 Selector 两部分工作。需要注意,ServerSocketChannel 注册完成后,不会立刻注册感兴趣的事件来启动任务。
  3. 绑定端口:只有注册完成后才会启动端口绑定,并在端口绑定完成后注册 OP_ACCEPT 启动任务。
  4. 启动任务:在 Selector 上注册 OP_ACCEPT 事件,开始构建连接。

ServerBootstrap#bind 主要的执行过程如下:

ServerBootstrap#bind
    -> initAndRegister
        -> ServerBootstrap#init           # 初始化
        -> AbstractUnsafe#register        # 注册
    -> doBind0
        -> AbstractUnsafe#bind            # 绑定端口
        -> AbstractUnsafe#beginRead       # 启动任务,通过pipeline.fireChannelActive()

以上的初始化启动工作,分别是在 main thread 和 boss thread 两个线程上执行。

  1. main thread 线程

    • 创建 Selector。

    • 创建 ServerSocketChannel,并初始化。

    • 将 Channel 注册到 EventLoopGroup。

  2. boss thread 线程

    • 将 Channel 注册到 EventLoopGroup 对应的 Selector。
    • 绑定地址启动。
    • Channel 将 OP_ACCEPT 事件注册到 Selector。

注意:Netty 为了避免上下文切换,采用了一种局部串行化的执行方式。也就是将任务提交到 Channel 注册的 EventLoopGroup 上执行。对于 ServerSocketChannel 而言就是 EventLoop。

1.2 知识点

(1)启动服务的本质

  • Selector:Selector selector = SelectorProviderImpl.openSelector();
  • ServerSocketChannel:ServerSocketChannel serverSocketChannel = provider.openServerSocketChannel();
  • 注册 Channel 到 Selector:selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
  • 绑定端口:javaChannel().bind(localAddress, config.getBacklog());
  • 注册感兴趣事件到 Selector:javaChannel().bind(localAddress, config.getBacklog());

(2)Selector 创建

每个 NioEventLoop 对应一个自己的 Selector,在创建 NioEventLoop 时会创建 Selector。一旦有 Channel 注册到 NioEventLoop 上,就会通过调用 startThread 方法启动该线程。

(3)感兴趣事件注册时机

  • Channel 注册时都是 0:selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this) 。
  • 感兴趣事件注册:当 pipeline.fireChannelActive() 时,调用 beginRead 注册感兴趣的事件。对于 NioServerSocketChannel 而言,就是注册 OP_ACCEPT 事件,NioSocketChannel 则是 OP_READ 事件。

2. 源码分析

图1:服务端启动流程
  • 初始化:init() 方法,对应第 1~6 步。先通过反射创建 ServerSocketChannel,再通过 init 进行初始化。初始化主要是配置 ServerSocketChannel 的 TCP 参数、附加属性、自定义 Handler、以及构建连接的 ServerBootstrapAcceptor。
  • 注册:register() 方法,对应第 7~10 步。包含到 EventLoop 和 Selector 的注册。其中 doRegister 调用 javaChannal.register() 注册到 Selector。
  • 绑定端口:doBind0() 方法,对应第 11~15 步。绑定端口后,触发 pipeline#fireChannelActive 开始接收连接。其中 doBind 调用 javaChannal.bind() 绑定端口。
  • 启动任务:beginRead() 方法。通过 pipeline#fireChannelActive 方法调用 beginRead,将 OP_ACCEPT 注册到 Selector 上,此时服务端已经启动成功,可以接收客户端的连接。其中 beginRead 调用 selectionKey.interestOps 注册感兴趣的事件。
private ChannelFuture doBind(final SocketAddress localAddress) {
    // 1. 初始化和注册。initAndRegister是异步执行
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    // 2. 绑定端口:注意doBind0必须在initAndRegister执行完成后
    //    doBind0是同步执行,也就是只有javaChannel.bind之后,才会注册OP_ACCEPT事件
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

2.1 初始化

Server 看需要配置的参数有:

  1. 线程池:服务端通常会构建两个线程池。bossGroup 负责接收连接,childGroup 负责处理网络 IO 以及系统 Task。
  2. TCP 参数和其它附加配置。
  3. 自定义 Handler。如日志输出等。
ServerBootstrap#initAndRegister
    -> ChannelFactory#newChannel                            # 通过反射创建连接
    -> init
        -> setChannelOptions                                # TCP参数设置
        -> setAttributes                                    # Channel附加属性设置
        -> ChannelPipeline#addLast(handler)                 # ServerChannel配置的Handler
        -> ChannelPipeline#addLast(ServerBootstrapAcceptor) # 接收连接的Handler

说明: initAndRegister 先通过反射创建 ServerSocketChannel,再通过 init 进行初始化。初始化主要是配置 ServerSocketChannel 的 TCP 参数、附加属性、自定义 Handler、以及接收连接的 Handler。

void init(Channel channel) throws Exception {
    // 1. 配置 TCP 参数和附加属性
    setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
    setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
    ...
 
    // 2. 配置Handler,其中ServerBootstrapAcceptor为接收客户端的Handler
    pipeline.addLast(config.handler());
    pipeline.addLast(new ServerBootstrapAcceptor(
        channel, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}

说明:ServerBootstrapAcceptor 是专门用于接收客户端请求的 Handler。pipeline.addLast 添加 ChannelInitializer 时,ChannelInitializer 执行 initChannel 方法后,会将 ChannelInitializer 自身从 pipeline 中移除。Hander 执行过程如下:

思考1:ServerSocketChannel 创建为什么要用 provider.openServerSocketChannel()?

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    // ServerSocketChannel.open() 每秒创建 5000 个连接时性能会下将 1%
    // https://github.com/netty/netty/issues/2308
    return provider.openServerSocketChannel();
}

2.2 注册

ServerBootstrap#initAndRegister
    -> EventLoopGroup#register
        -> SingleThreadEventLoop#register
           -> AbstractUnsafe#register       √

说明: 其实 Channel 注册包含以下几件事:

  1. 绑定线程:将 Channel 注册到 EventLoop。

  2. 绑定 Selector:将 Channel 注册到 EventLoop 对应的 Selector。

  3. 激活 Channel:如果 Channel 已经是 Active 状态,注册感兴趣的事件到 Selector 上,启动任务。需要注意的是,对于 ServerSocket 而言,此时不可能是 Active 状态,只有 bind 成功后才会注册 OP_ACCEPT 来接收客户端连接。

config().group().register(channel)

注意:这里的 config().group() 是 bossGroup。Channel 是如何注册到 NioEventLoopGroup 是的详见:https://www.cnblogs.com/binarylei/p/10135712.html

2.3 绑定端口

ServerBootstrap#doBind0
    -> AbstractChannel#bind
        -> AbstractUnsafe#bind                          √
            -> NioServerSocketChannel#doBind
            -> DefaultChannelPipeline#fireChannelActive
                -> AbstractUnsafe#beginRead             √

说明: Channel 绑定端口成功后会通过 pipeline#fireChannelActive 方法调用 beginRead,将 OP_ACCEPT 注册到 Selector 上,此时服务端已经启动成功,可以接收客户端的连接。

// AbstractUnsafe#bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    doBind(localAddress);          // javaChannel.bind()
    pipeline.fireChannelActive();  // beginRead()
}

NioServerSocketChannel#doBind 则会调用最底层的 javaChannel.bind() 绑定端口。

2.4 激活 Channel

pipeline#fireChannelActive[tail...head] -> head#readIfIsAutoRead
    -> channel#read -> pipeline#read[head...tail] -> head#read
        -> AbstractUnsafe#beginRead
            -> AbstractNioChannel#doBeginRead

说明:pipeline#fireChannelActive 最终会调用 doBeginRead 方法,将 OP_ACCEPT 注册到 Selector 上,从而启动服务。

// HeadContext#readIfIsAutoRead
private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

说明: DefaultChannelConfig 中所有的 Channel 的 autoRead 属性都默认是 1,也就是 true。channel.read 通过 pipeline 从 tail -> head 传播,调用 head.read 方法时最终调用 unsafe.beginRead()。

unsafe.beginRead 直接调用 doBeginRead 方法激活 Channel。事实上,也就是将感兴趣的事件注册到 Selector 上。对于 NioServerSocketChannel 而言,就是注册 OP_ACCEPT 事件,NioSocketChannel 则是 OP_READ 事件。

@Override
protected void doBeginRead() throws Exception {
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

思考1:channel.read()、pipeline.read()、ctx.read()、unsafe.read() 区别?

  • channel.read():直接调用 pipeline.read()。
  • pipeline.read():调用 tail.read(),从 head 或 tail 经历全部的 Handler。实际上,最后的 head.read() 调用 unsafe.beginRead(),这个方法会注册 OP_ACCEPT 或 OP_READ 事件从而激活 Channel。
  • ctx.read():从当前 ctx 开始之后的全部的 Handler。如果发送数据,需要使用 ctx.write 而不是 ctx.channel().write。
  • unsafe.read():最底层的 API。和 unsafe.beginRead() 不同,unsafe#read 会真正从 socket revbuf 读取数据。

每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/10040765.html