五、NioEventLoop启动

NioEventLoop执行概述:

1、何时被启动

当AbstractChannel.register()方法被执行时,该方法下有这样一段代码:

 if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
      ...

其中的eventLoop.inEventLoop()为判断当前线程是否为eventLoop线程,此处是main线程,并且eventLoop线程在此处为null,所以返回false,并进入eventLoop.execute()方法,此方法在NioEventLoop没有实现,调用的是SingleThreadEventExecutor类中的execute()方法。

boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

inEventLoop返回为false,进入startThread()方法,继续进入doStartThread()方法,

 private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                ....
                updateLastExecutionTime();
                ....
                SingleThreadEventExecutor.this.run();

其中

  1. thread = Thread.currentThread():将当前线程与nioEventLoop绑定。
  2. 更新上次执行的时长。
  3. SingleThreadEventExecutor.this.run()代码,其中SingleThreadEventExecutor.this即为NioEventLoop对 象。在它的run方法封装了启动逻辑。

即当 EventLoop.execute 第一次被调用时, 就会触发 startThread() 的调用, 进而导致了 EventLoop 所对应的 Java 线程的启动。

2、NioEventLoop.run做了什么

主要做了下面三件事:

  1. 检查是否有IO事件。
  2. 处理IO事件。
  3. 处理异步任务队列。
protected void run() {
        for (;;) {
        ...
       select(wakenUp.getAndSet(false));
        ...
       processSelectedKeys();
       ...
       runAllTasks();

select()方法执行逻辑

  • selectDeadLineNanos及任务穿插逻辑处理
   long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                        }
                        break;
                        }
    ...
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
  1. 计算当前定时任务队列第一个定时任务的截止时间,当前时间 + 截止时间(NioEventLoop底层维持了一个定时任务队列,按照任务的截止时间正序排序)。
  2. 超时,判断有没有进行过select,如果没有则进行一次非阻塞的select操作,并退出当前循环。
  3. 没有超时,则检查当前任务队列(tailTasks和taskQueue)是否有任务,并将wakebUp通过CAS操作设为true。如果有则进行一侧非阻塞的select操作,并退出当前循环。
  • 阻塞式select
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
   // - Selected something,轮询到事件
   // - waken up by user, or用户线程唤醒
   // - the task queue has a pending task.任务队列有任务
   // - a scheduled task is ready for processing 定时任务队列有任务准备好执行
       break;//跳出当前循环
 }
  • 避免JDK空轮询的Bug
              long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
  1. 判断是否进行了一个空轮询(当前时间-阻塞时长>=开始时间,说明进行了阻塞),如果小于,则说明执行了一次空轮询。
  2. 判断空轮询次数是否大于512次(SELECTOR_AUTO_REBUILD_THRESHOLD在静态代码块中初始为512),如果大于512次则执行rebuildSelector()方法。
  3. 新建一个selector,并把之前oldSelector上的属性注册到新的selector上。
      ...
            newSelector = openSelector();
      ...

        // Register all channels to the new Selector.
        int nChannels = 0;
        for (;;) {
            try {
                for (SelectionKey key: oldSelector.keys()) {
                    Object a = key.attachment();
                    try {
                        if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                            continue;
                        }
                        int interestOps = key.interestOps();
                        key.cancel();
                        SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                        if (a instanceof AbstractNioChannel) {
                            // Update SelectionKey
                            ((AbstractNioChannel) a).selectionKey = newKey;
                        }
                        nChannels ++;
                    } 

3、处理IO事件

  • 先介绍下Netty对Selector的selectedKeys的优化
    SelectedSelectionKeySet继承自AbstractSet,内部维护了一个数组,并重写了其方法
  1. 先判断是否禁用优化,默认为false()。
  2. 创建优化后的SelectedSelectionKeySet,校验后通过反射替换select的selectedKeysField和publicSelectedKeysField。
  private Selector openSelector() {
        final Selector selector;
       ...
            selector = provider.openSelector();
      ...
        if (DISABLE_KEYSET_OPTIMIZATION) {
            return selector;
        }
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
        Object maybeSelectorImplClass=Class.forName("sun.nio.ch.SelectorImpl",false,PlatformDependent.getSystemClassLoader());
               ...
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                    selectedKeysField.setAccessible(true);
                    publicSelectedKeysField.setAccessible(true);
                    selectedKeysField.set(selector, selectedKeySet);
                    publicSelectedKeysField.set(selector, selectedKeySet);
           ...       
            selectedKeys = selectedKeySet;
    ...
        return selector;
    }
  • processSelectedKeys()处理IO事件
  private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
            ...
            // 遍历selectedKeys,取得key之后将数组原位置置为空
             selectedKeys[i] = null;
             //取得绑定的Channel
            final Object a = k.attachment();
            ...
            }
            //调用处理方法
  private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        //验证key是否合法
        if (!k.isValid()) {
            final EventLoop eventLoop;
              ...
                eventLoop = ch.eventLoop();
              ...
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }
        //通过验证的进入处理逻辑
        try {
        //获取key事件类型并处理
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                unsafe.finishConnect();
            }
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

4、runAllTasks()执行逻辑

Netty的任务队列包括taskQueue、tailQueue和scheduledTaskQueue。

  • scheduledTaskQueue任务的添加。
    scheduledTaskQueue的任务是在当调用NioEventLoop(AbstractScheduledEventExecutor)的schedule方法时添加的。代码如下
  <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
  //判断是否是当前线程执行添加操作,如果是直接添加,不是的话另起一个线程去执行添加任务
  //把添加定时任务也作为一个普通的task去执行,所有的任务都是在当前的NioEventLoop中执行,保证线程安全。
        if (inEventLoop()) {
            scheduledTaskQueue().add(task);
        } else {
            execute(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task);
    ...
        return task;
    }

tips:scheduledTaskQueue是一个优先级队列,排序逻辑是按照截止日期排序,如果截止日期相同,则按照ID排序。

 public int compareTo(Delayed o) {
        if (this == o) {
            return 0;
        }
        ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
        long d = deadlineNanos() - that.deadlineNanos();
        if (d < 0) {
            return -1;
        } else if (d > 0) {
            return 1;
        } else if (id < that.id) {
            return -1;
        } else if (id == that.id) {
            throw new Error();
        } else {
            return 1;
        }
    }
  • 任务的聚合
    ioRatio默认是50,则runAllTask里时间是当前时间-处理IO开始时间
final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
         } finally {
         // Ensure we always run tasks.
           final long ioTime = System.nanoTime() - ioStartTime;
           runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }

fetchFromScheduledTaskQueue()-- 循环从定时任务中取出截止日期到期(截止日期<=现在时间-系统启动时间)的task,并添加到taskQueue中,然后循环执行任务。

 protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
        //如果任务都为空,则去执行tailQueue中的任务
            afterRunningAllTasks();
            return false;
        }
        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);
            runTasks ++;
           ...
            //当执行任务到64个的时候,计算下执行事件是否超过deadline ,如果是就中止循环
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
        //执行收尾任务
        afterRunningAllTasks();
        //将lastExecutionTime 设置为系统启动时间
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
原文地址:https://www.cnblogs.com/demo-alen/p/13547222.html