Netty:EventLoopGroup

Netty:EventLoopGroup

http://blog.csdn.net/bdmh/article/details/49945765

Group:群组,Loop:循环,Event:事件,这几个东西联在一起,相比大家也大概明白它的用途了。

Netty内部都是通过线程在处理各种数据,EventLoopGroup就是用来管理调度他们的,注册Channel,管理他们的生命周期,下面就来看看EventLoopGroup是怎样工作的。

Netty框架初探中,当我们启动客户端或者服务端时,都要声明一个Group对象

[java] view plain copy
 
  1. EventLoopGroup group = new NioEventLoopGroup();    

这里我们就以NioEventLoopGroup来说明。先看一下它的继承关系

[java] view plain copy
 
  1. NioEventLoopGroup extends MultithreadEventLoopGroup extends MultithreadEventExecutorGroup  

看看NioEventLoopGroup的构造函数

[java] view plain copy
 
  1. public NioEventLoopGroup() {  
  2.     this(0);  
  3. }  
  4. //他会连续调用内部的构造函数,直到用下面的构造去执行父类的构造  
  5. //nThreads此时为0,马上就会提到这个参数的用处  
  6. public NioEventLoopGroup(  
  7.         int nThreads, Executor executor, final SelectorProvider selectorProvider) {  
  8.     super(nThreads, executor, selectorProvider);  
  9. }  

基类MultithreadEventLoopGroup的构造

[java] view plain copy
 
  1. protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {  
  2.     super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);  
  3. }  
  4. //nThreads:内部线程数,如果为0,就取默认值,通常我们会设置为处理器个数*2  
  5. DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(  
  6.         "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));  

继续调用再上一级的MultithreadEventExecutorGroup的构造

[java] view plain copy
 
  1. //这里会根据nThreads创建执行者数组  
  2. private final EventExecutor[] children;  
  3.   
  4. protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {  
  5.     if (nThreads <= 0) {  
  6.         throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));  
  7.     }  
  8.   
  9.     if (executor == null) {  
  10.         executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());  
  11.     }  
  12. //这里创建EventExecutor数组对象  
  13.     children = new EventExecutor[nThreads];  
  14.     if (isPowerOfTwo(children.length)) {  
  15.         chooser = new PowerOfTwoEventExecutorChooser();  
  16.     } else {  
  17.         chooser = new GenericEventExecutorChooser();  
  18.     }  
  19. //此处循环children数组,来创建内部的NioEventLoop对象  
  20.     for (int i = 0; i < nThreads; i ++) {  
  21.         boolean success = false;  
  22.         try {  
  23.             //newChild是abstract方法,运行期会执行具体的实例对象的重载  
  24.             children[i] = newChild(executor, args);  
  25.             success = true;  
  26.         } catch (Exception e) {  
  27.             // TODO: Think about if this is a good exception type  
  28.             throw new IllegalStateException("failed to create a child event loop", e);  
  29.         } finally {  
  30.             //如果没有成功,做关闭处理  
  31.             if (!success) {  
  32.                 for (int j = 0; j < i; j ++) {  
  33.                     children[j].shutdownGracefully();  
  34.                 }  
  35.   
  36.                 for (int j = 0; j < i; j ++) {  
  37.                     EventExecutor e = children[j];  
  38.                     try {  
  39.                         while (!e.isTerminated()) {  
  40.                             e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);  
  41.                         }  
  42.                     } catch (InterruptedException interrupted) {  
  43.                         // Let the caller handle the interruption.  
  44.                         Thread.currentThread().interrupt();  
  45.                         break;  
  46.                     }  
  47.                 }  
  48.             }  
  49.         }  
  50.     }  
  51.   
  52.     ......  
  53. }  

因为我们最初创建的是NioEventLoopGroup对象,所以newChild会执行NioEventLoopGroup的newChild方法,创建NioEventLoop对象。

[java] view plain copy
 
  1. @Override  
  2. protected EventLoop newChild(Executor executor, Object... args) throws Exception {  
  3.     return new NioEventLoop(this, executor, (SelectorProvider) args[0]);  
  4. }  

看看NioEventLoop的继承关系

[java] view plain copy
 
  1. NioEventLoop extends SingleThreadEventLoop extends SingleThreadEventExecutor  

NioEventLoop通过自己的构造行数,一直调用到SingleThreadEventExecutor的构造

[java] view plain copy
 
  1. protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {  
  2.     super(parent);  
  3.   
  4.     if (executor == null) {  
  5.         throw new NullPointerException("executor");  
  6.     }  
  7.   
  8.     this.addTaskWakesUp = addTaskWakesUp;  
  9.     this.executor = executor;  
  10.     taskQueue = newTaskQueue();  
  11. }  

至此,Group和内部的Loop对象以及Executor就创建完毕,那么他们是什么时候被调用的呢?就是在服务端bind和客户端connect时。

服务端的bind方法执行的是AbstractBootstrap的bind方法,bind方法中先会调用validate()方法检查是否有group

[java] view plain copy
 
  1. @SuppressWarnings("unchecked")  
  2. public B validate() {  
  3.     if (group == null) {  
  4.         throw new IllegalStateException("group not set");  
  5.     }  
  6.     if (channelFactory == null) {  
  7.         throw new IllegalStateException("channel or channelFactory not set");  
  8.     }  
  9.     return (B) this;  
  10. }  

所以,假如初始时,没有设置Bootstrap的group的话,就会报错。

最终bind调用doBind,然后调用doBind0,启动一个Runnable线程

[java] view plain copy
 
  1. private static void doBind0(  
  2.         final ChannelFuture regFuture, final Channel channel,  
  3.         final SocketAddress localAddress, final ChannelPromise promise) {  
  4.   
  5.     // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up  
  6.     // the pipeline in its channelRegistered() implementation.  
  7.     channel.eventLoop().execute(new Runnable() {  
  8.         @Override  
  9.         public void run() {  
  10.             if (regFuture.isSuccess()) {  
  11.                 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);  
  12.             } else {  
  13.                 promise.setFailure(regFuture.cause());  
  14.             }  
  15.         }  
  16.     });  
  17. }  

我们这里,channel.eventLoop()得到的是NioEventLoop对象,所以执行NioEventLoop的run方法,开始线程运行。

我们在创建Bootstrap初期,会调用它的group方法,绑定一个group,这样,一个循环就开始运行了。

原文地址:https://www.cnblogs.com/handsome1013/p/7762518.html