Reading the sofa-bolt source (1): server startup

The core class of the Bolt server is RpcServer. Starting it calls the startup method inherited from its parent class, AbstractRemotingServer.

com.alipay.remoting.AbstractRemotingServer#startup    
	@Override
    public void startup() throws LifeCycleException {
        super.startup();

        try {
            doInit();

            logger.warn("Prepare to start server on port {} ", port);
            if (doStart()) {
                logger.warn("Server started on port {}", port);
            } else {
                logger.warn("Failed starting server on port {}", port);
                throw new LifeCycleException("Failed starting server on port: " + port);
            }
        } catch (Throwable t) {
            this.shutdown();// do stop to ensure close resources created during doInit()
            throw new IllegalStateException("ERROR: Failed to start the Server!", t);
        }
    }

This method does three main things:

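  1. Call the parent class's startup() method to mark the component as started. The state is flipped with a compare-and-set, so a second call throws LifeCycleException.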

     com.alipay.remoting.AbstractLifeCycle#startup  
    	@Override
        public void startup() throws LifeCycleException {
            if (isStarted.compareAndSet(false, true)) {
                return;
            }
            throw new LifeCycleException("this component has started");
        }
    
  2. Call the subclass's doInit() to do the actual initialization

    	com.alipay.remoting.rpc.RpcServer#doInit
            
    	@Override
        protected void doInit() {
            if (this.addressParser == null) {
                this.addressParser = new RpcAddressParser();
            }
            if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                // in server side, do not care the connection service state, so use null instead of global switch
                ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(null);
                this.connectionManager = new DefaultServerConnectionManager(connectionSelectStrategy);
                this.connectionManager.startup();
    
                this.connectionEventHandler = new RpcConnectionEventHandler(switches());
                this.connectionEventHandler.setConnectionManager(this.connectionManager);
                this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
            } else {
                this.connectionEventHandler = new ConnectionEventHandler(switches());
                this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
            }
            initRpcRemoting();
            this.bootstrap = new ServerBootstrap();
            this.bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopUtil.getServerSocketChannelClass())
                .option(ChannelOption.SO_BACKLOG, ConfigManager.tcp_so_backlog())
                .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())
                .childOption(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())
                .childOption(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());
    
            // set write buffer water mark
            initWriteBufferWaterMark();
    
            // init byte buf allocator
            if (ConfigManager.netty_buffer_pooled()) {
                this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            } else {
                this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
            }
    
            // enable trigger mode for epoll if need
            NettyEventLoopUtil.enableTriggeredMode(bootstrap);
    
            final boolean idleSwitch = ConfigManager.tcp_idle_switch();
            final int idleTime = ConfigManager.tcp_server_idle();
            final ChannelHandler serverIdleHandler = new ServerIdleHandler();
            final RpcHandler rpcHandler = new RpcHandler(true, this.userProcessors);
            this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
                @Override
                protected void initChannel(SocketChannel channel) {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast("decoder", codec.newDecoder());
                    pipeline.addLast("encoder", codec.newEncoder());
                    if (idleSwitch) {
                        pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime,
                            TimeUnit.MILLISECONDS));
                        pipeline.addLast("serverIdleHandler", serverIdleHandler);
                    }
                    pipeline.addLast("connectionEventHandler", connectionEventHandler);
                    pipeline.addLast("handler", rpcHandler);
                    createConnection(channel);
                }
    
                /**
                 * create connection operation<br>
                 * <ul>
                 * <li>If flag manageConnection be true, use {@link DefaultConnectionManager} to add a new connection, meanwhile bind it with the channel.</li>
                 * <li>If flag manageConnection be false, just create a new connection and bind it with the channel.</li>
                 * </ul>
                 */
                private void createConnection(SocketChannel channel) {
                    Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel));
                    if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                        connectionManager.add(new Connection(channel, url), url.getUniqueKey());
                    } else {
                        new Connection(channel, url);
                    }
                    channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
                }
            });
        }
    

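    The code looks complicated, but it mostly just configures the Netty server. The configurable Netty options are read through ConfigManager, and the request handling lives in the childHandler.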

    protected void initChannel(SocketChannel channel) {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast("decoder", codec.newDecoder());
                    pipeline.addLast("encoder", codec.newEncoder());
                    if (idleSwitch) {
                        pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime,
                            TimeUnit.MILLISECONDS));
                        pipeline.addLast("serverIdleHandler", serverIdleHandler);
                    }
                    pipeline.addLast("connectionEventHandler", connectionEventHandler);
                    pipeline.addLast("handler", rpcHandler);
                    createConnection(channel);
                }
    

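    The channel initialization method installs up to six ChannelHandlers (the two idle-related handlers are added only when idleSwitch is on):

    1. decoder: the protocol decoder

    2. encoder: the protocol encoder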

    3. idleStateHandler

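      Netty's built-in idle handler, used to fire an IdleStateEvent. Here only the all-idle time idleTime is configured (default 90000 milliseconds): if the channel sees neither a read nor a write within idleTime, an IdleStateEvent is fired.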

    4. serverIdleHandler

      Works together with idleStateHandler to handle the IdleStateEvent. When the event fires, it closes the client connection.

          @Override
          public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
              if (evt instanceof IdleStateEvent) {
                  try {
                      logger.warn("Connection idle, close it from server side: {}",
                          RemotingUtil.parseRemoteAddress(ctx.channel()));
                      ctx.close();
                  } catch (Exception e) {
                      logger.warn("Exception caught when closing connection in ServerIdleHandler.", e);
                  }
              } else {
                  super.userEventTriggered(ctx, evt);
              }
          }
      
    5. connectionEventHandler

      The connection event handler. The event types are defined by the enum ConnectionEventType

      public enum ConnectionEventType {
          CONNECT, CLOSE, EXCEPTION;
      }
      
      • CONNECT

        The connect event is fired once per new channel, in the createConnection() method defined inside RpcServer#doInit

      • CLOSE

        The close event is fired when the connection is closed (the channel becomes inactive)

        com.alipay.remoting.ConnectionEventHandler
        
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            String remoteAddress = RemotingUtil.parseRemoteAddress(ctx.channel());
            infoLog("Connection channel inactive: {}", remoteAddress);
            super.channelInactive(ctx);
            Attribute attr = ctx.channel().attr(Connection.CONNECTION);
            if (null != attr) {
                // add reconnect task
                if (this.globalSwitch != null
                    && this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
                    Connection conn = (Connection) attr.get();
                    if (reconnectManager != null) {
                        reconnectManager.reconnect(conn.getUrl());
                    }
                }
                // trigger close connection event
                onEvent((Connection) attr.get(), remoteAddress, ConnectionEventType.CLOSE);
            }
        }
        
      • EXCEPTION

        The exception event does not appear to be fired anywhere in the source code; it is probably reserved for future use.

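      Whatever the event type, the handler ends up calling onEvent, which forwards the event to the ConnectionEventListener. The user-defined ConnectionEventProcessor then handles the actual logic.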

      com.alipay.remoting.ConnectionEventHandler#onEvent
          
      private void onEvent(final Connection conn, final String remoteAddress,
                           final ConnectionEventType type) {
          if (this.eventListener != null) {
              this.eventExecutor.onEvent(new Runnable() {
                  @Override
                  public void run() {
                      ConnectionEventHandler.this.eventListener.onEvent(type, remoteAddress, conn);
                  }
              });
          }
      }
      
      
      com.alipay.remoting.ConnectionEventListener#onEvent
      public void onEvent(ConnectionEventType type, String remoteAddress, Connection connection) {
              List<ConnectionEventProcessor> processorList = this.processors.get(type);
              if (processorList != null) {
                  for (ConnectionEventProcessor processor : processorList) {
                      processor.onEvent(remoteAddress, connection);
                  }
              }
          }
      
    6. handler

      The actual business handler. The class registered here is RpcHandler. (Figure: RpcHandler call flow)

      Client requests are handed to a UserProcessor. Custom business logic can be registered by calling registerUserProcessor on RpcServer.
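To make the UserProcessor hand-off described for the "handler" entry more concrete, here is a minimal, library-free sketch. It assumes that processors are looked up by the class name of the request they declare interest in; that routing rule and every name below (ProcessorSketch, ProcessorRegistrySketch, RegistryDemo) are hypothetical stand-ins for illustration, not Bolt's actual classes or API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for a user processor: declares which request class it handles. */
interface ProcessorSketch {
    String interest();              // class name of the request type this processor handles
    Object handle(Object request);  // business logic, returns the response
}

/** Hypothetical registry: one processor per request class name (an assumption, see lead-in). */
class ProcessorRegistrySketch {
    private final ConcurrentMap<String, ProcessorSketch> processors = new ConcurrentHashMap<>();

    void register(ProcessorSketch p) {
        // putIfAbsent returns the previous value, so non-null means a duplicate registration
        if (processors.putIfAbsent(p.interest(), p) != null) {
            throw new IllegalStateException("Processor already registered for " + p.interest());
        }
    }

    Object dispatch(Object request) {
        String key = request.getClass().getName();
        ProcessorSketch p = processors.get(key);
        if (p == null) {
            throw new IllegalArgumentException("No processor registered for " + key);
        }
        return p.handle(request);
    }
}

class RegistryDemo {
    /** Builds a registry with a single echo processor for String requests. */
    static ProcessorRegistrySketch newRegistry() {
        ProcessorRegistrySketch registry = new ProcessorRegistrySketch();
        registry.register(new ProcessorSketch() {
            @Override
            public String interest() {
                return String.class.getName();
            }

            @Override
            public Object handle(Object request) {
                return "echo:" + request;
            }
        });
        return registry;
    }

    public static void main(String[] args) {
        System.out.println(newRegistry().dispatch("hello"));
    }
}
```

In this sketch a request whose class has no registered processor is rejected with an exception; how the real RpcHandler reacts in that case is not covered by the code shown in this article.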

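The connection event dispatch described for connectionEventHandler (event type, then listener, then the list of user processors registered for that type) can be sketched without Netty as follows. This is a simplified illustration: all type names are hypothetical, the map is not thread-safe, and the dispatch is synchronous, whereas the handler shown above submits the callback to an executor.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Mirrors the three event types listed above (hypothetical copy, not the Bolt enum). */
enum EventTypeSketch { CONNECT, CLOSE, EXCEPTION }

/** Hypothetical stand-in for a user-defined connection event processor. */
interface EventProcessorSketch {
    void onEvent(String remoteAddress);
}

/** Hypothetical listener: keeps a list of processors per event type and fans events out. */
class EventListenerSketch {
    private final Map<EventTypeSketch, List<EventProcessorSketch>> processors =
        new EnumMap<>(EventTypeSketch.class);

    void addProcessor(EventTypeSketch type, EventProcessorSketch processor) {
        processors.computeIfAbsent(type, k -> new ArrayList<>()).add(processor);
    }

    void onEvent(EventTypeSketch type, String remoteAddress) {
        List<EventProcessorSketch> list = processors.get(type);
        if (list != null) { // events without a registered processor are silently ignored
            for (EventProcessorSketch processor : list) {
                processor.onEvent(remoteAddress);
            }
        }
    }
}

class EventDispatchDemo {
    /** Registers CONNECT and CLOSE processors, fires all three events, returns what was seen. */
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        EventListenerSketch listener = new EventListenerSketch();
        listener.addProcessor(EventTypeSketch.CONNECT, addr -> seen.add("connect " + addr));
        listener.addProcessor(EventTypeSketch.CLOSE, addr -> seen.add("close " + addr));

        listener.onEvent(EventTypeSketch.CONNECT, "127.0.0.1:50000");
        listener.onEvent(EventTypeSketch.CLOSE, "127.0.0.1:50000");
        listener.onEvent(EventTypeSketch.EXCEPTION, "127.0.0.1:50000"); // nothing registered
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Keeping a list per event type lets several independent processors react to the same event, which matches the loop over processorList in ConnectionEventListener#onEvent.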
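  3. Call doStart() to start the server. It binds the ServerBootstrap to the configured ip and port and waits for the bind to complete.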

    com.alipay.remoting.rpc.RpcServer#doStart
        
    @Override
    protected boolean doStart() throws InterruptedException {
        this.channelFuture = this.bootstrap.bind(new InetSocketAddress(ip(), port())).sync();
        return this.channelFuture.isSuccess();
    }
    
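Taken together, the three steps above follow a template-method pattern: flip a started flag with compare-and-set, run doInit(), run doStart(), and shut down again if anything fails. The sketch below reproduces that control flow with plain JDK classes. The class names are hypothetical, the shutdown logic is simplified compared with the real lifecycle class, and IllegalStateException stands in for LifeCycleException.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the lifecycle base class: a CAS flag rejects double startup. */
abstract class LifeCycleSketch {
    private final AtomicBoolean started = new AtomicBoolean(false);

    public void startup() {
        // Only the first caller flips false -> true; any later caller is rejected.
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("this component has started");
        }
    }

    public void shutdown() {
        started.set(false); // simplified: no check that the component was running
    }

    public boolean isStarted() {
        return started.get();
    }
}

/** Hypothetical stand-in for the server base class: template method around the hooks. */
abstract class ServerSketch extends LifeCycleSketch {
    protected abstract void doInit();
    protected abstract boolean doStart() throws Exception;
    protected abstract void doStop();

    @Override
    public void startup() {
        super.startup(); // step 1: mark as started
        try {
            doInit();    // step 2: build everything needed to serve
            if (!doStart()) { // step 3: actually start listening
                throw new IllegalStateException("Failed starting server");
            }
        } catch (Throwable t) {
            shutdown();  // release whatever doInit() created
            throw new IllegalStateException("ERROR: Failed to start the Server!", t);
        }
    }

    @Override
    public void shutdown() {
        super.shutdown();
        doStop();
    }
}

class StartupDemo {
    /** Builds a server whose doStart() returns the given result and records each hook call. */
    static ServerSketch server(final boolean startResult, final StringBuilder log) {
        return new ServerSketch() {
            @Override protected void doInit() { log.append("init;"); }
            @Override protected boolean doStart() { log.append("start;"); return startResult; }
            @Override protected void doStop() { log.append("stop;"); }
        };
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        ServerSketch ok = server(true, log);
        ok.startup();
        System.out.println("hooks=" + log + " started=" + ok.isStarted());

        StringBuilder failLog = new StringBuilder();
        ServerSketch bad = server(false, failLog);
        try {
            bad.startup();
        } catch (IllegalStateException e) {
            System.out.println("hooks=" + failLog + " started=" + bad.isStarted());
        }
    }
}
```

Calling shutdown() from the catch block is what keeps a failed startup from leaving the component half-initialized, which is the purpose of the comment on this.shutdown() in AbstractRemotingServer#startup.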
Original article: https://www.cnblogs.com/huiyao/p/12396122.html