java netty之ServerBootstrap的启动

通过前面的几篇文章,对整个netty部分的架构已经运行原理都有了一定的了解,那么这篇文章来分析一个经常用到的类:ServerBootstrap,一般对于服务器端的编程它用到的都还算是比较的多。。看一看它的初始化,以及它的运行原理。。。

首先我们还是引入一段代码,通过分析这段代码来分析ServerBootstrap的运行。。。

[java] view plain copy
 
  1. EventLoopGroup bossGroup = new NioEventLoopGroup();   //这个是用于serversocketchannel的eventloop  
  2.         EventLoopGroup workerGroup = new NioEventLoopGroup();    //这个是用于处理accept到的channel  
  3.         try {  
  4.             ServerBootstrap b = new ServerBootstrap();    //构建serverbootstrap对象  
  5.             b.group(bossGroup, workerGroup);   //设置时间循环对象,前者用来处理accept事件,后者用于处理已经建立的连接的io  
  6.             b.channel(NioServerSocketChannel.class);   //用它来建立新accept的连接,用于构造serversocketchannel的工厂类  
  7.             b.childHandler(new ChannelInitializer<SocketChannel>(){      //为accept channel的pipeline预添加的inboundhandler  
  8.                 @Override     //当新连接accept的时候,这个方法会调用  
  9.                 protected void initChannel(SocketChannel ch) throws Exception {  
  10.                     // TODO Auto-generated method stub  
  11.                     ch.pipeline().addLast(new MyChannelHandler());   //为当前的channel的pipeline添加自定义的处理函数  
  12.                 }  
  13.                   
  14.             });  
  15.             //bind方法会创建一个serverchannel,并且会将当前的channel注册到eventloop上面,  
  16.             //会为其绑定本地端口,并对其进行初始化,为其的pipeline加一些默认的handler  
  17.             ChannelFuture f = b.bind(80).sync();      
  18.             f.channel().closeFuture().sync();  //相当于在这里阻塞,直到serverchannel关闭  
  19.         } finally {  
  20.             bossGroup.shutdownGracefully();  
  21.             workerGroup.shutdownGracefully();  
  22.         }  

这段代码在前面的文章也有用到,基本上其意思也都在上面的注释中说的比较清楚了,那么我们接下来具体的分析其中的方法调用,首先是ServerBootstrap的group方法:

[java] view plain copy
 
  1. //这里parent用于执行server的accept时间事件,child才是用于执行获取的channel连接的事件  
  2. public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {  
  3.     super.group(parentGroup);  
  4.     if (childGroup == null) {  
  5.         throw new NullPointerException("childGroup");  
  6.     }  
  7.     if (this.childGroup != null) {  
  8.         throw new IllegalStateException("childGroup set already");  
  9.     }  
  10.     this.childGroup = childGroup;  
  11.     return this;  
  12. }  

这个方法是用来设置eventloopgroup,首先调用了父类的group方法(abstractbootstrap),就不将父类的方法列出来了,其实意思都差不多,eventloopgroup属性的值。。。

好了,接下来我们再来看一下channel方法:

[java] view plain copy
 
  1. //构造serversocketchannel factory  
  2. public B channel(Class<? extends C> channelClass) {  
  3.     if (channelClass == null) {  
  4.         throw new NullPointerException("channelClass");  
  5.     }  
  6.     return channelFactory(new BootstrapChannelFactory<C>(channelClass));  //构造工厂类  
  7. }  
  8.   
  9. /** 
  10.  * {@link ChannelFactory} which is used to create {@link Channel} instances from 
  11.  * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)} 
  12.  * is not working for you because of some more complex needs. If your {@link Channel} implementation 
  13.  * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for 
  14.  * simplify your code. 
  15.  */  
  16. @SuppressWarnings("unchecked")  
  17. public B channelFactory(ChannelFactory<? extends C> channelFactory) {  
  18.     if (channelFactory == null) {  
  19.         throw new NullPointerException("channelFactory");  
  20.     }  
  21.     if (this.channelFactory != null) {  
  22.         throw new IllegalStateException("channelFactory set already");  
  23.     }  
  24.   
  25.     this.channelFactory = channelFactory;   //设置  
  26.     return (B) this;  
  27. }  

该方法主要是用于构造用于产生channel的工厂类,在我们这段代码说白了就是用于实例化serversocketchannel的工厂类。。。

接下来我们再来看一下childHandler方法:

[java] view plain copy
 
  1. //设置childHandler,这个是当有channel accept之后为其添加的handler  
  2. public ServerBootstrap childHandler(ChannelHandler childHandler) {  
  3.     if (childHandler == null) {  
  4.         throw new NullPointerException("childHandler");  
  5.     }  
  6.     this.childHandler = childHandler;  
  7.     return this;  
  8. }  

这个很简单吧,就是一个赋值,具体说他有什么用,前面的注释有说明,不过以后的分析会说明它有什么用的。。。

接下来我们来看一下bind方法,这个比较重要吧:

[java] view plain copy
 
  1. //最终将会创建serverchannel,然后会将其绑定到这个地址,然后对其进行初始化  
  2. public ChannelFuture bind(int inetPort) {  
  3.     return bind(new InetSocketAddress(inetPort));  
  4. }  

好吧,接下来再来看bind方法:

[java] view plain copy
 
  1. public ChannelFuture bind(SocketAddress localAddress) {  
  2.     validate();  
  3.     if (localAddress == null) {  
  4.         throw new NullPointerException("localAddress");  
  5.     }  
  6.     return doBind(localAddress);  
  7. }  

好吧,再来看看doBind方法:

[java] view plain copy
 
  1. private ChannelFuture doBind(final SocketAddress localAddress) {  
  2.     final ChannelFuture regPromise = initAndRegister();   //在这里创建serverchanel,并对其进行初始化,并将其注册到eventloop当中去  
  3.     final Channel channel = regPromise.channel();  
  4.     final ChannelPromise promise = channel.newPromise();  
  5.     if (regPromise.isDone()) {  
  6.         doBind0(regPromise, channel, localAddress, promise);   //将当前的serverchannel绑定地址  
  7.     } else {  
  8.         regPromise.addListener(new ChannelFutureListener() {  
  9.             @Override  
  10.             public void operationComplete(ChannelFuture future) throws Exception {  
  11.                 doBind0(future, channel, localAddress, promise);  
  12.             }  
  13.         });  
  14.     }  
  15.   
  16.     return promise;  
  17. }  

这里调用了一个比较重要的方法:initAndRegister,我们来看看它的定义:

[java] view plain copy
 
  1. //创建初始化以及注册serverchanel  
  2.     final ChannelFuture initAndRegister() {  
  3.         //利用工厂类创建channel  
  4.         final Channel channel = channelFactory().newChannel();  
  5.         try {  
  6.             init(channel);  //init函数留给了后面来实现,用于初始化channel,例如为其的pipeline加上handler  
  7.         } catch (Throwable t) {  
  8.             channel.unsafe().closeForcibly();  
  9.             return channel.newFailedFuture(t);  
  10.         }  
  11.   
  12.         ChannelPromise regPromise = channel.newPromise();  
  13.         group().register(channel, regPromise);  //将当前创建的serverchannel注册到eventloop上面去  
  14.         if (regPromise.cause() != null) {  
  15.             if (channel.isRegistered()) {  
  16.                 channel.close();  
  17.             } else {  
  18.                 channel.unsafe().closeForcibly();  
  19.             }  
  20.         }  
  21.   
  22.         // If we are here and the promise is not failed, it's one of the following cases:  
  23.         // 1) If we attempted registration from the event loop, the registration has been completed at this point.  
  24.         //    i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.  
  25.         // 2) If we attempted registration from the other thread, the registration request has been successfully  
  26.         //    added to the event loop's task queue for later execution.  
  27.         //    i.e. It's safe to attempt bind() or connect() now:  
  28.         //         because bind() or connect() will be executed *after* the scheduled registration task is executed  
  29.         //         because register(), bind(), and connect() are all bound to the same thread.  
  30.   
  31.         return regPromise;  
  32.     }  

代码还是很简单,而且也相对比较好理解,无非就是利用前面说到过的channel工厂类来创建一个serversocketchannel,然后调用init方法对这个刚刚生成的channel进行一些初始化的操作,然后在调用eventloopgroup的register方法,将当前这个channel的注册到group上,那么以后这个channel的事件都在这个group上面执行,说白了也就是一些accept。、。。

好,我们先来看看这个init方法吧:

[java] view plain copy
 
  1. @Override  
  2. //初始化chanel,当用channel factory构造channel以后,会调用这个函数来初始化,说白了就是为当前的channel的pipeline加入一些handler  
  3. void init(Channel channel) throws Exception {  
  4.     //先初始化一些配置  
  5.     final Map<ChannelOption<?>, Object> options = options();  
  6.     synchronized (options) {  
  7.         channel.config().setOptions(options);  
  8.     }  
  9.    //初始化一些属性  
  10.     final Map<AttributeKey<?>, Object> attrs = attrs();  
  11.     synchronized (attrs) {  
  12.         for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {  
  13.             @SuppressWarnings("unchecked")  
  14.             AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();  
  15.             channel.attr(key).set(e.getValue());  
  16.         }  
  17.     }  
  18.   
  19.     //获取当前channel的pipeline  
  20.     ChannelPipeline p = channel.pipeline();  
  21.     if (handler() != null) {  
  22.         p.addLast(handler());  
  23.     }  
  24.   
  25.     final EventLoopGroup currentChildGroup = childGroup;  
  26.     final ChannelHandler currentChildHandler = childHandler;  
  27.     final Entry<ChannelOption<?>, Object>[] currentChildOptions;  
  28.     final Entry<AttributeKey<?>, Object>[] currentChildAttrs;  
  29.     synchronized (childOptions) {  
  30.         currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));  
  31.     }  
  32.     synchronized (childAttrs) {  
  33.         currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));  
  34.     }  
  35.   
  36.     p.addLast(new ChannelInitializer<Channel>() {  
  37.         @Override  
  38.         public void initChannel(Channel ch) throws Exception {  
  39.             //这是一个inboundher,将其加入到serverchannel的pipeline上面去  
  40.             ch.pipeline().addLast(new ServerBootstrapAcceptor(  
  41.                     currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));  
  42.         }  
  43.     });  
  44. }  

代码还是相对很简单,首先初始化一些配置参数,然后初始化属性,最后还要为当前的channel的pipeline添加一个handler,这个handler用来当channel注册到eventloop上面之后对其进行一些初始化,我们还是来看看channelInitalizer的定义吧:

[java] view plain copy
 
  1. public abstract class ChannelInitializer<C extends Channel> extends ChannelStateHandlerAdapter {  
  2.   
  3.     private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);  
  4.   
  5.     /** 
  6.      * This method will be called once the {@link Channel} was registered. After the method returns this instance 
  7.      * will be removed from the {@link ChannelPipeline} of the {@link Channel}. 
  8.      * 
  9.      * @param ch            the {@link Channel} which was registered. 
  10.      * @throws Exception    is thrown if an error occours. In that case the {@link Channel} will be closed. 
  11.      */  
  12.     protected abstract void initChannel(C ch) throws Exception;  
  13.   
  14.     @SuppressWarnings("unchecked")  
  15.     @Override  
  16.     public final void channelRegistered(ChannelHandlerContext ctx)  
  17.             throws Exception {  
  18.         boolean removed = false;  
  19.         boolean success = false;  
  20.         try {  
  21.             //调用用户定义的init函数对当前的channel进行初始化  
  22.             initChannel((C) ctx.channel());  
  23.             ctx.pipeline().remove(this);  
  24.             removed = true;  
  25.             ctx.fireChannelRegistered();  
  26.             success = true;  
  27.         } catch (Throwable t) {  
  28.             logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);  
  29.         } finally {  
  30.             if (!removed) {  
  31.                 ctx.pipeline().remove(this);  
  32.             }  
  33.             if (!success) {  
  34.                 ctx.close();  
  35.             }  
  36.         }  
  37.     }  
  38.   
  39.     @Override  
  40.     public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {  
  41.         ctx.fireInboundBufferUpdated();  
  42.     }  
  43. }  

它有一个channelRegistered方法,这个方法是在当前pipeline所属的channel注册到eventloop上面之后会激活的方法,它则是调用了用户自定义的函数来初始化channel,然后在将当前handler移除。。。也就是执行

[java] view plain copy
 
  1. ch.pipeline().addLast(new ServerBootstrapAcceptor(  
  2.                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));  

这里又为当前的serversocketchannel添加了另外一个handler,来看看该类型的定义吧:

[java] view plain copy
 
  1. private static class ServerBootstrapAcceptor  
  2.         extends ChannelStateHandlerAdapter implements ChannelInboundMessageHandler<Channel> {  
  3.   
  4.     private final EventLoopGroup childGroup;  
  5.     private final ChannelHandler childHandler;  
  6.     private final Entry<ChannelOption<?>, Object>[] childOptions;  
  7.     private final Entry<AttributeKey<?>, Object>[] childAttrs;  
  8.   
  9.     @SuppressWarnings("unchecked")  
  10.     ServerBootstrapAcceptor(  
  11.             EventLoopGroup childGroup, ChannelHandler childHandler,  
  12.             Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {  
  13.         this.childGroup = childGroup;  //这个是用于管理accept的channel的eventloop  
  14.         this.childHandler = childHandler;  
  15.         this.childOptions = childOptions;  
  16.         this.childAttrs = childAttrs;  
  17.     }  
  18.   
  19.     @Override  
  20.     public MessageBuf<Channel> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {  
  21.         return Unpooled.messageBuffer();  
  22.     }  
  23.   
  24.     @Override  
  25.     @SuppressWarnings("unchecked")  
  26.     //当有数据进来的时候,会调用这个方法来处理数据,这里进来的数据就是accept的channel  
  27.     public void inboundBufferUpdated(ChannelHandlerContext ctx) {  
  28.         MessageBuf<Channel> in = ctx.inboundMessageBuffer(); //获取buf  
  29.         for (;;) {  
  30.             Channel child = in.poll();  
  31.             if (child == null) {  
  32.                 break;  
  33.             }  
  34.   
  35.             child.pipeline().addLast(childHandler);   //为accept的channel的pipeline加入用户定义的初始化handler  
  36.   
  37.             for (Entry<ChannelOption<?>, Object> e: childOptions) {  
  38.                 try {  
  39.                     if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {  
  40.                         logger.warn("Unknown channel option: " + e);  
  41.                     }  
  42.                 } catch (Throwable t) {  
  43.                     logger.warn("Failed to set a channel option: " + child, t);  
  44.                 }  
  45.             }  
  46.   
  47.             for (Entry<AttributeKey<?>, Object> e: childAttrs) {  
  48.                 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());  
  49.             }  
  50.   
  51.             try {  
  52.                 childGroup.register(child);   //将当前accept的channel注册到eventloop  
  53.             } catch (Throwable t) {  
  54.                 child.unsafe().closeForcibly();  
  55.                 logger.warn("Failed to register an accepted channel: " + child, t);  
  56.             }  
  57.         }  
  58.     }  
  59.   
  60.     @Override  
  61.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
  62.         final ChannelConfig config = ctx.channel().config();  
  63.         if (config.isAutoRead()) {  
  64.             // stop accept new connections for 1 second to allow the channel to recover  
  65.             // See https://github.com/netty/netty/issues/1328  
  66.             config.setAutoRead(false);  
  67.             ctx.channel().eventLoop().schedule(new Runnable() {  
  68.                 @Override  
  69.                 public void run() {  
  70.                    config.setAutoRead(true);  
  71.                 }  
  72.             }, 1, TimeUnit.SECONDS);  
  73.         }  
  74.         // still let the exceptionCaught event flow through the pipeline to give the user  
  75.         // a chance to do something with it  
  76.         ctx.fireExceptionCaught(cause);  
  77.     }  
  78. }  

主要是有一个比较重要的方法,inboundBufferUpdated,这个方法是在有数据进来的时候会调用的,用于处理进来的数据,也就是accept到的channel,这里就知道我们定义的chidHandler的用处了吧,netty会将这个handler直接加入到刚刚accept到的channel的pipeline上面去。。。最后还要讲当前accept到的channel注册到child eventloop上面去,这里也就完完全全的明白了最开始定义的两个eventloopgroup的作用了。。。

好了,serversocketchannel的init以及register差不多了,然后会调用doBind0方法,将当前的serversocketchannel绑定到一个本地端口,

[java] view plain copy
 
  1. //将chanel绑定到一个本地地址  
  2.     private static void doBind0(  
  3.             final ChannelFuture regFuture, final Channel channel,  
  4.             final SocketAddress localAddress, final ChannelPromise promise) {  
  5.   
  6.         // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up  
  7.         // the pipeline in its channelRegistered() implementation.  
  8.   
  9.         channel.eventLoop().execute(new Runnable() {  
  10.             @Override  
  11.             //匿名内部类想要访问外面的参数,那么外面的参数必须是要final的才行  
  12.             public void run() {  
  13.                 if (regFuture.isSuccess()) {  
  14.                     //调用channel的bind方法,将当前的channl绑定到一个本地地址,其实是调用的是pipeline的bind方法,但是最终又是调用的当前  
  15.                     //channel的unsafe对象的bind方法  
  16.                     channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);  
  17.                 } else {  
  18.                     promise.setFailure(regFuture.cause());  
  19.                 }  
  20.             }  
  21.         });  
  22.     }  

其实这里调用bind方法最终还是调用serversocketchannel的unsafe对象的bind方法。。。。

到这里,整个serverbootstrap 就算初始化完成了,而且也可以开始运行了。。。

[java] view plain copy
 
  1. b.childHandler(new ChannelInitializer<SocketChannel>(){      //为accept channel的pipeline预添加的inboundhandler  
  2.                 @Override     //当新连接accept的时候,这个方法会调用  
  3.                 protected void initChannel(SocketChannel ch) throws Exception {  
  4.                     // TODO Auto-generated method stub  
  5.                     ch.pipeline().addLast(new MyChannelHandler());   //为当前的channel的pipeline添加自定义的处理函数  
  6.                 }  
  7.                   
  8.             });  

这段代码的意思是对于刚刚accept到的channel,将会在它的pipeline上面添加handler,这个handler的用处主要是就是用户自定义的initChannel方法,就是初始化这个channel,说白了就是为它的pipeline上面添加自己定义的handler。。。

这样整个serverbootstrap是怎么运行的也就差不多了。。。

刚开始接触到netty的时候觉得这里一头雾水,通过这段时间对其代码的阅读,总算搞懂了其整个运行的原理,而且觉得其设计还是很漂亮的,虽然有的时候会觉得有那么一点点的繁琐。。。。

整个运行过程总结为一下几个步骤:

(1)创建用于两个eventloopgroup对象,一个用于管理serversocketchannel,一个用于管理accept到的channel

(2)创建serverbootstrap对象,

(3)设置eventloopgroup

(4)创建用于构建用到的channel的工厂类

(5)设置childhandler,它的主要功能主要是用户定义代码来初始化accept到的channel

(6)创建serversocketchannel,并对它进行初始化,绑定端口,以及register,并为serversocketchannel的pipeline设置默认的handler

通过这几个步骤,整个serverbootstrap也就算是运行起来了。。。

原文地址:https://www.cnblogs.com/405845829qq/p/5195966.html