Netty源码分析-ServerBootstrap启动过程

ServerBootStrap启动流程

Tips:代码行后面的数字对应下面中文解释的数字

通过一个简单的示例演示netty server端启动

public static void main(String[] args) throws Exception {
   EventLoopGroup bossGroup = new NioEventLoopGroup(1);// 1
   EventLoopGroup workerGroup = new NioEventLoopGroup();// 2
   final EchoServerHandler serverHandler = new EchoServerHandler();
   try {
       ServerBootstrap b = new ServerBootstrap();// 3
       b.group(bossGroup, workerGroup) //4
          .channel(NioServerSocketChannel.class)//5
          .option(ChannelOption.SO_BACKLOG, 100)//6
          .handler(new LoggingHandler(LogLevel.INFO)) //7
          .childHandler(new ChannelInitializer<SocketChannel>() {//8
               @Override
               public void initChannel(SocketChannel ch) throws Exception {
                   ChannelPipeline p = ch.pipeline();
                   p.addLast(serverHandler);//9
              }
          });
       ChannelFuture f = b.bind(PORT).sync();//10
       // Wait until the server socket is closed.
       f.channel().closeFuture().sync();
  } finally {
       // Shut down all event loops to terminate all threads.
       bossGroup.shutdownGracefully();
       workerGroup.shutdownGracefully();
  }
}

1:定义接收客户端连接的线程池

2:定义处理客户端请求的线程池

3:实例化一个server端启动引导类

4:将boss线程与work线程添加到ServerBootstrap中

5:定义server端的channel类型为NioServerSocketChannel

6:定义Socket为非阻塞

7:NioServerSocketChannel的ChannelPipeline成员添加LoggingHandler处理器

8:NioSocketChannel的ChannelPipeline成员添加ChannelInitializer处理器,当Server端接收到Client连接时,会初始化一个SocketChannel,然后就会回调ChannelInitializer的initChannel方法

9:往NioSocketChannel的ChannelPipeline成员添加业务Handler处理器

10:server绑定端口启动

接下来详细分析一下server端是如何启动的,启动过程中会初始化哪些信息。先看一下AbstractBootStrap#doBind(final SocketAddress localAddress)过程

private ChannelFuture doBind(final SocketAddress localAddress) {
   final ChannelFuture regFuture = initAndRegister();//1
   final Channel channel = regFuture.channel();
   if (regFuture.cause() != null) {
       return regFuture;
  }

   if (regFuture.isDone()) {//2
       ChannelPromise promise = channel.newPromise();
       doBind0(regFuture, channel, localAddress, promise);//3
       return promise;
  } else {
       // Registration future is almost always fulfilled already, but just in case it's not.
       final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
       regFuture.addListener(new ChannelFutureListener() {
           @Override
           public void operationComplete(ChannelFuture future) throws Exception {
               Throwable cause = future.cause();
               if (cause != null) {
                   promise.setFailure(cause);
              } else {
                   promise.registered();

                   doBind0(regFuture, channel, localAddress, promise);
              }
          }
      });
       return promise;
  }
}

1:实例化与初始化Channel信息,并将ServerSocketChannel注册到Selector选择器中

2:判断channel是否注册完成

3:调用ServerSocket.bind()底层方法绑定

注册与初始化channel:AbstractBootstrap#initAndRegister()

final ChannelFuture initAndRegister() {
   Channel channel = null;
   try {
       channel = channelFactory.newChannel(); //1
       init(channel);//2
  } catch (Throwable t) {
       if (channel != null) {
           channel.unsafe().closeForcibly();
           return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
      }
       return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
  }

   ChannelFuture regFuture = config().group().register(channel);//15
   //...省略
   return regFuture;
}

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();//3

public NioServerSocketChannel() {
   this(newSocket(DEFAULT_SELECTOR_PROVIDER));//4
}

private static ServerSocketChannel newSocket(SelectorProvider provider) {
   try {
       return provider.openServerSocketChannel();//5
  } catch (IOException e) {
       throw new ChannelException(
           "Failed to open a server socket.", e);
  }
}

public NioServerSocketChannel(ServerSocketChannel channel) {
   super(null, channel, SelectionKey.OP_ACCEPT);//6
   config = new NioServerSocketChannelConfig(this, javaChannel().socket());//7
}

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
   super(parent, ch, readInterestOp);
}

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
   super(parent);
   this.ch = ch;//8
   this.readInterestOp = readInterestOp;//9
   try {
       ch.configureBlocking(false);//10
  } catch (IOException e) {
       try {
           ch.close();
      } catch (IOException e2) {
           logger.warn(
               "Failed to close a partially initialized socket.", e2);
      }

       throw new ChannelException("Failed to enter non-blocking mode.", e);
  }
}

protected AbstractChannel(Channel parent) {
   this.parent = parent;//11
   id = newId();//12
   unsafe = newUnsafe();//13
   pipeline = newChannelPipeline();//14
}

1:实例化一个Channel实例,这里的Channel类型就是ServerBootstrap启动时配置的Channel类型,对于ServerBootstrap来说应为NioServerSocketChannel

2:初始化channel一些信息

3:通过java底层的APISelectorProvider.provider()获取一个SelectorProvider实例

5:利用JDK NIO底层api SelectorProvider.openServerSocketChannel()获取ServerSocketChannel实例

8:NioServerSocketChannel对象的成员变量为java底层ServerSocketChannel对象

9:NioServerSocketChannel对象只对SelectionKey.OP_ACCEPT事件感兴趣

10:配置ServerSocketChannel为非阻塞

11:设置当前Channel的父channel,这里父channel默认为空

12:设置一个channel的id

13:设置一个Unsafe对象,用来读写来自客户端的数据

14:设置NioServerSocketChannel接收请求的处理链,这里默认为DefaultChannelPipeline

15:注册当前ServerSocketChannel到Selector选择器中,并且绑定boss线程来进行事件轮询

初始化channel信息ServerBootstrap#init(Channel channel)

void init(Channel channel) {
//1
   setChannelOptions(channel, newOptionsArray(), logger);
   setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

   ChannelPipeline p = channel.pipeline();

   final EventLoopGroup currentChildGroup = childGroup;
   final ChannelHandler currentChildHandler = childHandler;
   final Entry<ChannelOption<?>, Object>[] currentChildOptions;
   synchronized (childOptions) {
       currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
  }
   final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
//2
   p.addLast(new ChannelInitializer<Channel>() {
       @Override
       public void initChannel(final Channel ch) {
           final ChannelPipeline pipeline = ch.pipeline();
           ChannelHandler handler = config.handler();
           if (handler != null) {
               pipeline.addLast(handler);
          }
           ch.eventLoop().execute(new Runnable() {
               @Override
               public void run() {
                   pipeline.addLast(new ServerBootstrapAcceptor(
                       ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); //3
              }
          });
      }
  });
}

1:还记得在ServerBootstrap启动示例吗,我们会给ServerBootstrap配置一些启动参数,比如handler(),childHandler(),那些参数就是在这里使用的

2:NioServerSocketChannel中的ChannelPipeline添加一个特殊的处理器ChannelInitializer,此时ChannelPipeline链路结构如图所示

3:在接下来的部分会讲,当ServerSockerChannel注册到Selector中完成之后,会触发ChannelPipeline的handlerAdded事件,接着就会调用到2步骤中ChannelInitializer的initChannel方法,并且向ChannelPipeline中添加两个处理器,第一是如果ServerBootstrap配置的handler不为空,则添加。另一个则会添加一个ServerBootstrapAcceptor处理器,当server接收到新的client连接时,则会交给ServerBootstrapAcceptor处理。其实这里就是Doug Lea大师的Nio PPT分享中的Acceptor模块,ServerBootstrapAcceptor中具体细节在之后的《服务端怎么处理客户端连接》章节中分析。此时ChannelPipeline中的结构如下图所示。

当initChannel方法调用完了过后,会把ChannelInitializer从ChannelPipeline中移除掉,此时ChannelPipeline中的结构又变成了如下图所示。

接下来具体看一下Channel是怎么注册的,我们直接定位到AbstractChannel#register0(ChannelPromise promise)方法中,至于是怎么调用到这一步的,可以从上面的initAndRegister()方法的第15个步骤出发

private void register0(ChannelPromise promise) {
   try {
       if (!promise.setUncancellable() || !ensureOpen(promise)) {
           return;
      }
       boolean firstRegistration = neverRegistered;
       doRegister(); //1
       neverRegistered = false;
       registered = true;
       pipeline.invokeHandlerAddedIfNeeded(); //2

       safeSetSuccess(promise);
       pipeline.fireChannelRegistered(); //3
       if (isActive()) { //4
           if (firstRegistration) {
               pipeline.fireChannelActive();//5
          } else if (config().isAutoRead()) {
               beginRead();//6
          }
      }
  } catch (Throwable t) {
       closeForcibly();
       closeFuture.setClosed();
       safeSetFailure(promise, t);
  }
}

1:最终会调用JDK底层的ServerSocketChannel#register()注册方法,将channel注册到Selector选择器中,在NioEventLoop进行事件轮询时,就可以监听到ServerSocketChannel感兴趣的事件。

2:在ChannelPipeline中传递HandlerAdded事件,会调用到ChannelInitializer#initChannel方法进行handler的添加操作

3:在ChannelPipeline中传递Registered事件,首先会传递到HeadContext中,但是HeadContext中基本没做什么特殊处理,然后传递到下一个Handler

4:判断当前是否处于激活状态

5:如果是第一次注册,并且处于激活状态,则传递Active事件

6:如果ServerBootstrap配置的自动读取,则直接开始读取

最后下面是一个ServerBootstrap启动的时序图

 
 
原文地址:https://www.cnblogs.com/jhbbd/p/14279520.html