bossGroup

向bossGroup里注册通道

流程图

 书接上文

ServerBootstrap.bind(hostnameport)

 1 final ChannelFuture initAndRegister() {
 2         Channel channel = null;
 3         try {
 4             channel = channelFactory.newChannel();
 5             init(channel);
 6         } catch (Throwable t) {
 7             if (channel != null) {
 8                 // channel can be null if newChannel crashed (eg SocketException("too many open files"))
 9                 channel.unsafe().closeForcibly();
10                 // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
11                 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
12             }
13             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
14             return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
15         }
16 
17         ChannelFuture regFuture = config().group().register(channel);
18         if (regFuture.cause() != null) {
19             if (channel.isRegistered()) {
20                 channel.close();
21             } else {
22                 channel.unsafe().closeForcibly();
23             }
24         }
25 
26         return regFuture;
27     }

上篇文章说了init方法,接下去就是注册通道

config().group().register(channel)方法

首先调用的是MultithreadEventLoopGroup.register(Channel channel)方法

1 public ChannelFuture register(Channel channel) {
2         return next().register(channel);
3     }
4 
5 public EventLoop next() {
6         return (EventLoop) super.next();
7     }

调用MultithreadEventExecutorGroup的next()方法

1 public EventExecutor next() {
2     return chooser.next();
3 }

chooser选择器。MultithreadEventExecutorGroup构造方法中实例化。

 1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
 2                                             EventExecutorChooserFactory chooserFactory, Object... args) {
 3         if (nThreads <= 0) {
 4             throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
 5         }
 6 
 7         if (executor == null) {
 8             executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
 9         }
10 
11         children = new EventExecutor[nThreads];
12 
13         for (int i = 0; i < nThreads; i ++) {
14             boolean success = false;
15             try {
16                 children[i] = newChild(executor, args);
17                 success = true;
18             } catch (Exception e) {
19                 // TODO: Think about if this is a good exception type
20                 throw new IllegalStateException("failed to create a child event loop", e);
21             } finally {
22                 if (!success) {
23                     for (int j = 0; j < i; j ++) {
24                         children[j].shutdownGracefully();
25                     }
26 
27                     for (int j = 0; j < i; j ++) {
28                         EventExecutor e = children[j];
29                         try {
30                             while (!e.isTerminated()) {
31                                 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
32                             }
33                         } catch (InterruptedException interrupted) {
34                             // Let the caller handle the interruption.
35                             Thread.currentThread().interrupt();
36                             break;
37                         }
38                     }
39                 }
40             }
41         }
42 
43         chooser = chooserFactory.newChooser(children);//实例化选择器
44 
45         final FutureListener<Object> terminationListener = new FutureListener<Object>() {
46             @Override
47             public void operationComplete(Future<Object> future) throws Exception {
48                 if (terminatedChildren.incrementAndGet() == children.length) {
49                     terminationFuture.setSuccess(null);
50                 }
51             }
52         };
53 
54         for (EventExecutor e: children) {
55             e.terminationFuture().addListener(terminationListener);
56         }
57 
58         Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
59         Collections.addAll(childrenSet, children);
60         readonlyChildren = Collections.unmodifiableSet(childrenSet);
61     }

chooserFactory.newChooser(children);

其实就前面讲过的选择器工厂DefaultEventExecutorChooserFactory,如何选择一个执行器。

1 public EventExecutorChooser newChooser(EventExecutor[] executors) {
2         if (isPowerOfTwo(executors.length)) {
3             return new PowerOfTwoEventExecutorChooser(executors);
4         } else {
5             return new GenericEventExecutorChooser(executors);
6         }
7     }

chooser.next();

 1 private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
 2         private final AtomicInteger idx = new AtomicInteger();
 3         private final EventExecutor[] executors;
 4 
 5         PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
 6             this.executors = executors;
 7         }
 8 
 9         @Override
10         public EventExecutor next() {
11             return executors[idx.getAndIncrement() & executors.length - 1];
12         }
13     }
14 
15     private static final class GenericEventExecutorChooser implements EventExecutorChooser {
16         // Use a 'long' counter to avoid non-round-robin behaviour at the 32-bit overflow boundary.
17         // The 64-bit long solves this by placing the overflow so far into the future, that no system
18         // will encounter this in practice.
19         private final AtomicLong idx = new AtomicLong();
20         private final EventExecutor[] executors;
21 
22         GenericEventExecutorChooser(EventExecutor[] executors) {
23             this.executors = executors;
24         }
25 
26         @Override
27         public EventExecutor next() {
28             return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
29         }
30     }

 选到一个NioEventLoop,执行注册,调用的是NioEventLoopregister。子类没有这个方法,由父类SingleThreadEventLoop实现。

1 public ChannelFuture register(Channel channel) {
2         return register(new DefaultChannelPromise(channel, this));
3     }

创建了一个DefaultChannelPromise把通道和NioEventLoop都封装进去,DefaultChannelPromise是对Future做了加强,也是异步回调,可以设置很多监听,也可以将结果写入。

1 public ChannelFuture register(final ChannelPromise promise) {
2         ObjectUtil.checkNotNull(promise, "promise");
3         promise.channel().unsafe().register(this, promise);
4         return promise;
5     }

Channel.Unsafe

这里的Unsafe不是JDK的Unsafe,是netty封装的Channel的内部类,因为他是一些操作NIO底层的方法,不建议外部用的,所有也叫Unsafe。我们所说的把通道注册进bossGroup实际上就是调用他的实现类。promise.channel()方法返回NioServerSocketChannel,他的unsafe方法由它的父类AbstractNioChannel实现。

1 public NioUnsafe unsafe() {
2 
3     return (NioUnsafe) super.unsafe();
4 }
5 //父类unsafe方法
6 public Unsafe unsafe() {
7 
8     return unsafe;
9 }

返回值 在初始化AbstractChannel时 指定

1 protected AbstractChannel(Channel parent) {
2 
3     this.parent = parent;
4    id = newId();
5    unsafe = newUnsafe();
6    pipeline = newChannelPipeline();
7 }

newUnsafe(),又由子类AbstractNioMessageChannel实现

1 protected AbstractNioUnsafe newUnsafe() {
2 
3     return new NioMessageUnsafe();
4 }

所以config().group().register(channel)调用的是NioMessageUnsafe.register方法,但是自己没有这个方法,所以最终调用父类AbstractUnsafe的register方法,AbstractUnsafe为AbstractChannel的内部类.

这个就是真正的注册方法:

 1 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 2             ObjectUtil.checkNotNull(eventLoop, "eventLoop");
 3             if (isRegistered()) {
 4                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
 5                 return;
 6             }
 7             if (!isCompatible(eventLoop)) {
 8                 promise.setFailure(
 9                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
10                 return;
11             }
12 
13             AbstractChannel.this.eventLoop = eventLoop;//设置通道的事件循环,1对1,只设置一次
14        //只能当前线程是eventLoop的线程才可以注册,防止多线程并发问题,所以即使多线程来操作,也是安全的,会按照一定顺序提交到任务队列里
15             if (eventLoop.inEventLoop()) {
16                 register0(promise);
17             } else {//否则就当做任务提交给eventLoop的任务队列
18                 try {
19                     eventLoop.execute(new Runnable() {
20                         @Override
21                         public void run() {
22                             register0(promise);
23                         }
24                     });
25                 } catch (Throwable t) {
26                     logger.warn(
27                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
28                             AbstractChannel.this, t);
29                     closeForcibly();
30                     closeFuture.setClosed();
31                     safeSetFailure(promise, t);
32                 }
33             }
34         }

这里有个很巧妙的点,就是eventLoop.inEventLoop()这个的判断,就是判断调用这个方法的是不是eventLoop的线程,如果是,那就是同一个线程调用,直接就注册,否则属于多线程调用,可能会有问题,所以还是提交一个任务给eventLoop的线程去执行,这样就是单线程,不会有线程安全问题。

此时当前线程是main肯定不是事件循环里的线程,事件循环里的线程还没创建呢,所以会提交到队列。eventLoop为NioEventLoop,执行父类SingleThreadEventExecutor的execute方法。

1 public void execute(Runnable task) {
2 
3     ObjectUtil.checkNotNull(task, "task");
4    execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
5 }
 1 private void execute(Runnable task, boolean immediate) {
 2         boolean inEventLoop = inEventLoop();
 3         addTask(task);//添加任务到taskQueue
 4         if (!inEventLoop) {//当前线程不是事件循环线程
 5             startThread();//让线程工厂开启线程
 6             if (isShutdown()) {
 7                 boolean reject = false;
 8                 try {
 9                     if (removeTask(task)) {
10                         reject = true;
11                     }
12                 } catch (UnsupportedOperationException e) {
13                     // The task queue does not support removal so the best thing we can do is to just move on and
14                     // hope we will be able to pick-up the task before its completely terminated.
15                     // In worst case we will log on termination.
16                 }
17                 if (reject) {
18                     reject();
19                 }
20             }
21         }
22 
23         if (!addTaskWakesUp && immediate) {
24             wakeup(inEventLoop);//唤醒线程,添加了一个空的任务去唤醒
25         }
26     }

task为register0(promise);方法,addTask是给taskqueue添加task,

startThread();

 1 private static final int ST_NOT_STARTED = 1;//没启动
 2     private static final int ST_STARTED = 2;//启动了
 3     private static final int ST_SHUTTING_DOWN = 3;//正在关闭中
 4     private static final int ST_SHUTDOWN = 4;//关闭了
 5     private static final int ST_TERMINATED = 5;//终止了
 6 
 7 private void startThread() {
 8         if (state == ST_NOT_STARTED) {
 9             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
10                 boolean success = false;
11                 try {
12                     doStartThread();//具体的开启线程
13                     success = true;
14                 } finally {
15                     if (!success) {
16                         STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
17                     }
18                 }
19             }
20         }
21     }

doStartThread()

其实里面就是调用SingleThreadEventExecutor.this.run();,这里其实executor.execute已经启动了新的线程来执行new Runnable()里的任务,也就是执行NioEventLoop. run()。

 1 private void doStartThread() {
 2         assert thread == null;
 3         executor.execute(new Runnable() {
 4             @Override
 5             public void run() {
 6                 thread = Thread.currentThread();
 7                 if (interrupted) {
 8                     thread.interrupt();
 9                 }
10 
11                 boolean success = false;
12                 updateLastExecutionTime();
13                 try {
14                     SingleThreadEventExecutor.this.run();
15                     success = true;
16                 } catch (Throwable t) {
17                     logger.warn("Unexpected exception from an event executor: ", t);
18                 } finally {
19                     for (;;) {
20                         int oldState = state;
21                         if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
22                                 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
23                             break;
24                         }
25                     }
26 
27                     // Check if confirmShutdown() was called at the end of the loop.
28                     if (success && gracefulShutdownStartTime == 0) {
29                         if (logger.isErrorEnabled()) {
30                             logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
31                                     SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
32                                     "be called before run() implementation terminates.");
33                         }
34                     }
35 
36                     try {
37                         // Run all remaining tasks and shutdown hooks. At this point the event loop
38                         // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
39                         // graceful shutdown with quietPeriod.
40                         for (;;) {
41                             if (confirmShutdown()) {
42                                 break;
43                             }
44                         }
45 
46                         // Now we want to make sure no more tasks can be added from this point. This is
47                         // achieved by switching the state. Any new tasks beyond this point will be rejected.
48                         for (;;) {
49                             int oldState = state;
50                             if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
51                                     SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
52                                 break;
53                             }
54                         }
55 
56                         // We have the final set of tasks in the queue now, no more can be added, run all remaining.
57                         // No need to loop here, this is the final pass.
58                         confirmShutdown();
59                     } finally {
60                         try {
61                             cleanup();
62                         } finally {
63                             // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
64                             // the future. The user may block on the future and once it unblocks the JVM may terminate
65                             // and start unloading classes.
66                             // See https://github.com/netty/netty/issues/6596.
67                             FastThreadLocal.removeAll();
68 
69                             STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
70                             threadLock.countDown();
71                             int numUserTasks = drainTasks();
72                             if (numUserTasks > 0 && logger.isWarnEnabled()) {
73                                 logger.warn("An event executor terminated with " +
74                                         "non-empty task queue (" + numUserTasks + ')');
75                             }
76                             terminationFuture.setSuccess(null);
77                         }
78                     }
79                 }
80             }
81         });
82     }

executor在SingleThreadEventExecutor构造方法中实例化,this.executor = ThreadExecutorMap.apply(executor, this);

 1 public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
 2         ObjectUtil.checkNotNull(executor, "executor");
 3         ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
 4         return new Executor() {
 5             @Override
 6             public void execute(final Runnable command) {
 7                 executor.execute(apply(command, eventExecutor));
 8             }
 9         };
10     }

apply方法中的executor为SingleThreadEventExecutor构造方法的入参参数,根据继承关系可知SingleThreadEventExecutor构造方法在NioEventLoop构造方法中调用了,而NioEventLoop构造方法在之前已说明,在

MultithreadEventExecutorGroup的构造方法中调用所以apply方法中的executor为ThreadPerTaskExecutor

ThreadPerTaskExecutor该类的execute比较简单,利用DefaultThreadFactory实话为thread实现异步执行,但是thread为FastThreadLocalThread而不是原生的thread.

apply方法中的eventExecutor为SingleThreadEventExecutor对象eventExecutor

apply方法内部的apply

 1 public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
 2         ObjectUtil.checkNotNull(command, "command");
 3         ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
 4         return new Runnable() {
 5             @Override
 6             public void run() {
 7                 setCurrentEventExecutor(eventExecutor);
 8                 try {
 9                     command.run();
10                 } finally {
11                     setCurrentEventExecutor(null);
12                 }
13             }
14         };
15     }

setCurrentEventExecutor(eventExecutor);设置本地线程变量,利用了FastThreadLocal

command.run()方法调用的为doStartThread()方法内部的异步执行方法体SingleThreadEventExecutor.this.run();下文详解

 

sync()

main线程如果完成了注册流程后,就会调用sync()尝试阻塞,调用的就是:

ChannelFuture channelFuture = b.bind(hostname, port).sync();

最终到了DefaultPromise的await()方法:

 1 public Promise<V> await() throws InterruptedException {
 2         if (isDone()) {//完成了就返回
 3             return this;
 4         }
 5 
 6         if (Thread.interrupted()) {
 7             throw new InterruptedException(toString());
 8         }
 9 
10         checkDeadLock();
11 
12         synchronized (this) {
13             while (!isDone()) {//判断是否完成
14                 incWaiters();
15                 try {
16                     wait();//阻塞
17                 } finally {
18                     decWaiters();
19                 }
20             }
21         }
22         return this;
23     }

如果这个时候注册和绑定端口完成了,就会返回,否则就会wait();阻塞。

safeSetSuccess(promise);

绑定是事件循环的线程去做的,完成绑定后会调用safeSetSuccess(promise);方法,设置成功的结果,唤醒阻塞的主线程。最终是调用DefaultPromise的checkNotifyWaiters:

1 protected final void safeSetSuccess(ChannelPromise promise) {
2             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
3                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
4             }
5         }

调用DefaultChannelPromise的trySuccess()

 1 private boolean setValue0(Object objResult) {
 2         if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
 3             RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
 4             if (checkNotifyWaiters()) {
 5                 notifyListeners();
 6             }
 7             return true;
 8         }
 9         return false;
10     }

方法checkNotifyWaiters()

1 private synchronized boolean checkNotifyWaiters() {
2         if (waiters > 0) {
3             notifyAll();
4         }
5         return listeners != null;
6     }

方法notifyListeners()

 1 private void notifyListeners() {
 2         EventExecutor executor = executor();
 3         if (executor.inEventLoop()) {
 4             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
 5             final int stackDepth = threadLocals.futureListenerStackDepth();
 6             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
 7                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
 8                 try {
 9                     notifyListenersNow();
10                 } finally {
11                     threadLocals.setFutureListenerStackDepth(stackDepth);
12                 }
13                 return;
14             }
15         }
16 
17         safeExecute(executor, new Runnable() {
18             @Override
19             public void run() {
20                 notifyListenersNow();
21             }
22         });
23     }

注册绑定监听并阻塞

阻塞被唤醒后,就注册一个监听的回调,然后阻塞关闭事件,其实就意味着不关闭就永远运行着了:

 1 ChannelFuture cf = bootstrap.bind(8888).sync();
 2             cf.addListener((ChannelFutureListener) future -> {
 3                 if (cf.isSuccess()) {
 4                     System.out.println("监听端口 8888 成功");
 5                 } else {
 6                     System.out.println("监听端口 8888 失败");
 7                 }
 8             });
 9             System.out.println("服务器开始提供服务");
10             cf.channel().closeFuture().sync();

当然也可以这样写,先添加监听,然后阻塞,反正结果都是一样的:

 1 ChannelFuture cf = bootstrap.bind(8888);
 2             cf.addListener((ChannelFutureListener) future -> {
 3                 if (cf.isSuccess()) {
 4                     System.out.println("监听端口 8888 成功");
 5                 } else {
 6                     System.out.println("监听端口 8888 失败");
 7                 }
 8             });
 9             System.out.println("服务器开始提供服务");
10             cf.sync();

如果addListener是添加在同步后面的,此时可能事件循环线程已经完成了注册和绑定,addListener里面就会提交一个任务,然后事件循环会去执行这个任务。

控制台打印结果

服务器开始提供服务
监听端口 8888 成功

至此注册流程基本结束,后续就看事件循环中的新线程如何运行了。

 



原文地址:https://www.cnblogs.com/xiaojiesir/p/15321127.html