Netty源码分析-NioEventLoop事件轮询

NioEventLoop是Netty中用来接收客户端获取服务端请求的唯一入口,我们先来看一下NioEventLoop的构造函数

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                EventLoopTaskQueueFactory queueFactory) {
   super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
         rejectedExecutionHandler);
   // 获取Selector对象的SelectorProvider对象
   this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
   // 作为Select选择策略以及一次select的个数
   this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
   // Selector的包装
   final SelectorTuple selectorTuple = openSelector();
   // Selector选择器,用来轮询IO事件
   this.selector = selectorTuple.selector;
   // 没有包装的原始Selector选择器
   this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                   boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
                                   RejectedExecutionHandler rejectedExecutionHandler) {
   super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
   // 尾部任务队列
   tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                       boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                       RejectedExecutionHandler rejectedHandler) {
   super(parent);
   // 新增任务时是否唤醒线程,默认为false
   this.addTaskWakesUp = addTaskWakesUp;
   // 最大任务等待数
   this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
   // 当前执行任务的线程池
   this.executor = ThreadExecutorMap.apply(executor, this);
   // 任务队列
   this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
   // 任务队列满时的异常处理器
   this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

Netty是怎么实现一个Channel绑定一个线程,一个线程绑定多个Channel的呢?

从上面的代码段可以看出,一个AbstractChannel会持有一个NioEventLoop对象,而一个NioEventLoop持有一个Selector对象。当使用SelectableChannel#register()API将Channel注册到Selector选择器中时,由于使用的是NioEventLoop中的Selector对象,所以当NioEventLoop进行轮询时,就可以只轮询当前线程绑定的所有Channel对象。

接下来详细分析一下NioEventLoop#run()里的流程

protected void run() {
   int selectCnt = 0;
   for (;;) {
       try {
           int strategy;
           try {
               // 获取selector策略
               strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
               switch (strategy) {
                   case SelectStrategy.CONTINUE:
                       continue;
                   case SelectStrategy.BUSY_WAIT:
                   case SelectStrategy.SELECT:
                       // 获取最近的定时任务开始时间,如果不存在定时任务,则为-1
                       long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                       if (curDeadlineNanos == -1L) {
                           curDeadlineNanos = NONE; // nothing on the calendar
                      }
                       nextWakeupNanos.set(curDeadlineNanos);
                       try {
                           if (!hasTasks()) {
                               // 如果不存在异步任务,直接执行I/O事件轮询
                               strategy = select(curDeadlineNanos);
                          }
                      } finally {
                           nextWakeupNanos.lazySet(AWAKE);
                      }
                   default:
              }
          } catch (IOException e) {
               // 当发生未知异常时,需重新生成一个新的Selector,替换老的Selector,并把注册到老的Selector上面的channel注册到新的Selector上去
               rebuildSelector0();
               selectCnt = 0;
               handleLoopException(e);
               continue;
          }
           // 省略部分代码
           if (ioRatio == 100) {
               try {
                   if (strategy > 0) {
                       // 处理轮询到的I/O事件
                       processSelectedKeys();
                  }
              } finally {
                   // 执行异步任务
                   ranTasks = runAllTasks();
              }
          }
           // 省略部分代码
      } catch (CancelledKeyException e) {
           // 省略部分代码
      } catch (Throwable t) {
           handleLoopException(t);
      }
       // 省略部分代码
  }
}

获取SELECT策略时,需要用到一个hasTasks()方法,用来判断是否存在异步任务。如果存在异步任务,则会执行selector.selectNow()方法,该方法不会阻塞,接下来的switch就会跳到default分支。netty通过这种方式可以保证当存在异步任务时,优先执行异步任务。当不存在异步任务时,就会执行到selector.select()方法,在执行该方法之前,会通过nextScheduledTaskDeadlineNanos()方法获取最近的定时任务的开始时间curDeadlineNanos,如果不存在定时任务,则为Long类型的最大值。接下来会使用curDeadlineNanos作为select的超时时间,如果不存在定时任务,也就是curDeadlineNanos为Long的最大值,就会执行selector.select()方法,也就是不设置超时时间,直到有I/O事件为止才会返回。如果存在定时任务,则会通过定时任务的开始时间计算出select操作的超时时间。

当有I/O事件响应时,则会通过processSelectedKeys()方法处理I/O事件,下面是代码段

private void processSelectedKeys() {
   if (selectedKeys != null) {
       processSelectedKeysOptimized();
  } else {
       processSelectedKeysPlain(selector.selectedKeys());
  }
}

从代码可以看出,这里处理有两个分支,一个是处理优化过的SelectedKey,另一个分支是处理正常的SelectedKey,什么是优化过的key?这里就需要看一下selectedKeys变量的数据结构。跟踪代码可以看出,selectedKeys的数据类型是SelectedSelectionKeySetSelectedSelectionKeySet底层存储结构是用的数组,所以它的遍历效率比较高,原生的存储结构是HashSet,其实也就是HashMap,它的遍历效率没有数组高。当通过SelectorProvider#openSelector()方法获取一个selector时,这里有个特殊操作,就是一个优化开关,如果开关关闭,则使用原生的selectedKeys存储结构,也就是HashSet。如果开关打开,就会使用反射,将SelectorImpl类中的publicKeys与publicSelectedKeys字段替换成SelectedSelectionKeySet的存储类型。可以看下面的代码片段

private SelectorTuple openSelector() {
   final Selector unwrappedSelector;
   try {
       // 获取一个Selector实例
       unwrappedSelector = provider.openSelector();
  } catch (IOException e) {
       throw new ChannelException("failed to open a new selector", e);
  }

   if (DISABLE_KEY_SET_OPTIMIZATION) {
       return new SelectorTuple(unwrappedSelector);
  }
// 获取SelectorImpl的class
   Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
       @Override
       public Object run() {
           try {
               return Class.forName(
                   "sun.nio.ch.SelectorImpl",
                   false,
                   PlatformDependent.getSystemClassLoader());
          } catch (Throwable cause) {
               return cause;
          }
      }
  });
// 省略部分代码
   final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
   final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
   Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
       @Override
       public Object run() {
           try {
               // 反射获取selectedKey字段
               Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
               Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
// 省略部分代码
               Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
               if (cause != null) {
                   return cause;
              }
               cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
               if (cause != null) {
                   return cause;
              }
               // 反射方式设置selectedKeys为SelectedSelectionKeySet类型
               selectedKeysField.set(unwrappedSelector, selectedKeySet);
               // 反射方式设置publicSelectedKeys为SelectedSelectionKeySet类型
               publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
               return null;
          } catch (NoSuchFieldException e) {
               return e;
          } catch (IllegalAccessException e) {
               return e;
          }
      }
  });
// 省略部分代码
}

接下来再看一下processSelectedKeysOptimized()方法的具体逻辑

private void processSelectedKeysOptimized() {
   for (int i = 0; i < selectedKeys.size; ++i) {
       final SelectionKey k = selectedKeys.keys[i];
       selectedKeys.keys[i] = null;
       // 获取I/O事件类型
       final Object a = k.attachment();
       if (a instanceof AbstractNioChannel) {
           // 处理I/O事件
           processSelectedKey(k, (AbstractNioChannel) a);
      } else {
           @SuppressWarnings("unchecked")
           NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
           // 处理异步任务
           processSelectedKey(k, task);
      }
// 是否需要重新轮询
       if (needsToSelectAgain) {
           // 将i+1之前的I/O事件都置为空
           selectedKeys.reset(i + 1);
           //重新轮询
           selectAgain();
           // 将i置为-1,接下来会执行一次i++,然后i又会从0开始执行I/O事件
           i = -1;
      }
  }
}

这里开始处理I/O事件,大家还记得将channel注册到selector时,可以填写一个attach参数。在这就可以通过attachment()方法获取事件类型,如果是AbstractNioChannel类型,则处理I/O事件,否则处理异步任务。接下来有一个needsToSelectAgain标识,表示是否需要重新轮询一次I/O事件,为什么会有这个标识呢?我们先跟踪一下这个标识是在哪里写入的,最终发现,当channel从Selector取消注册超过256次时,就会将这个标志位置为true。

接下来继续看处理I/O事件的详细逻辑processSelectedKey()

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
       final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
       // 省略部分代码
       try {
           int readyOps = k.readyOps();
           // 处理连接事件
           if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
               int ops = k.interestOps();
               ops &= ~SelectionKey.OP_CONNECT;
               k.interestOps(ops);
               // 在ChinnelPipeline上传递Connect事件
               unsafe.finishConnect();
          }

           // 处理可写事件
           if ((readyOps & SelectionKey.OP_WRITE) != 0) {
               ch.unsafe().forceFlush();
          }

           // 处理客户端的读事件与接收事件
           if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
               // 在ChinnelPipeline上传递Read事件
               unsafe.read();
          }
      } catch (CancelledKeyException ignored) {
           unsafe.close(unsafe.voidPromise());
      }
  }

Connect事件的传递比较简单,感兴趣的同学可以自己去看一下源码,这里我们主要看一下读写事件。

首先看一下写事件,从下面的代码可以看出,主要是把buffer里的数据刷新到socket缓冲区,然后发送到客户端

protected void flush0() {
   if (inFlush0) {
       // Avoid re-entrance
       return;
  }
   final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
   if (outboundBuffer == null || outboundBuffer.isEmpty()) {
       return;
  }
   inFlush0 = true;
// 省略部分代码
   try {
       // 真正的将数据写到客户端,底层会调用Nio的SocketChannel#write()方法
       doWrite(outboundBuffer);
  } catch (Throwable t) {
      // 省略部分代码
  } finally {
       inFlush0 = false;
  }
}

下面我们再来看一下Netty是怎么处理读事件的

public void read() {
   assert eventLoop().inEventLoop();
   final ChannelConfig config = config();
   final ChannelPipeline pipeline = pipeline();
   // 获取一个netty自己的内存分配器
   final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
   allocHandle.reset(config);
   boolean closed = false;
   Throwable exception = null;
   try {
       try {
           do {
               // 将数据读取到readBuf中
               int localRead = doReadMessages(readBuf);
               if (localRead == 0) {
                   break;
              }
               if (localRead < 0) {
                   closed = true;
                   break;
              }
// 增加读的次数
               allocHandle.incMessagesRead(localRead);
          } while (allocHandle.continueReading());
      } catch (Throwable t) {
           exception = t;
      }

       int size = readBuf.size();
       for (int i = 0; i < size; i ++) {
           readPending = false;
           // 在ChannelPipeline上传播read事件
           pipeline.fireChannelRead(readBuf.get(i));
      }
       // 清空读的缓冲区
       readBuf.clear();
       // 读取完成,记录读取总的字节数
       allocHandle.readComplete();
       // 传播一次读取完成的事件,一次读取完成可能包含多个SocketChannel
       pipeline.fireChannelReadComplete();

       if (exception != null) {
           closed = closeOnReadError(exception);

           pipeline.fireExceptionCaught(exception);
      }

       if (closed) {
           inputShutdown = true;
           if (isOpen()) {
               close(voidPromise());
          }
      }
  } finally {
       if (!readPending && !config.isAutoRead()) {
           // 移除read 事件
           removeReadOp();
      }
  }
}

当Netty处理读事件时,首先会获取一个内存分配处理器,读取消息到分配的内存里(Netty内存分配器后面分析),然后在ChannelPipeline上传播读事件,最终会在channel上移除read事件。

最后我们来看一下Netty是怎么处理异步任务的

protected boolean runAllTasks(long timeoutNanos) {
   // 从定时任务队列中取出任务添加到普通任务队列中
   fetchFromScheduledTaskQueue();
   // 从任务列表中取出一个任务
   Runnable task = pollTask();
   if (task == null) {
       // 如果任务为空,表示没有普通任务可执行,直接执行tailTask任务队列中的任务
       afterRunningAllTasks();
       return false;
  }
// 计算执行任务的截止时间(相对时间,当前时间减去服务启动时间再加上超时时间)
   final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
   long runTasks = 0;
   long lastExecutionTime;
   for (;;) {
       // 安全的执行任务
       safeExecute(task);
       runTasks ++;
       // 执行任务个数和0x3f(0011 1111)进行与运算,这里指的是当任务个数为64个时,进行一次截止时间判断
       if ((runTasks & 0x3F) == 0) {
           lastExecutionTime = ScheduledFutureTask.nanoTime();
           // 如果当前时间大于了截止时间,则中断执行任务,这是为了把时间留出一部分来执行I/O事件
           if (lastExecutionTime >= deadline) {
               break;
          }
      }
       task = pollTask();
       // 如果任务为空,终止循环
       if (task == null) {
           lastExecutionTime = ScheduledFutureTask.nanoTime();
           break;
      }
  }
   // 最后执行尾部任务队列中的任务
   afterRunningAllTasks();
   this.lastExecutionTime = lastExecutionTime;
   return true;
}

首选执行异步任务有一个超时,通过这个超时时间来计算任务可以执行多久。首先netty会把定时任务合并到普通任务队列中,然后判断这个任务队列中是否有任务,如果没有任务,则先执行尾部任务队列,然后提前返回。如果有任务,则会计算任务的执行截止时间,每当任务执行了64个时,都会判断一下当前时间是否大于截止时间,如果大于了截止时间,则停止执行普通任务,然后再执行尾部任务队列。

原文地址:https://www.cnblogs.com/jhbbd/p/14310324.html