aio-java

 1 //创建一个AsynchronousSocketChannel实例,
 2 //AsynchronousChannelGroup在linux中是EPollPort,在windows中是Iopc,
 3 //AsynchronousChannelGroup内持有fileDescriptor到channel的映射,从epoll返回的事件可以间接的找到fileDescriptor,通过映射找到channel,从而完成io;
 4 //AsynchronousChannelGroup还持有线程池,自动开启,用于处理io,执行CompletionHandler。
 5 //线程池默认是一个new ThreadPool(Executors.newCachedThreadPool(threadFactory), isFixed, poolSize),
 6 //并启动poolSize个线程,poolSize的大小为Runtime.getRuntime().availableProcessors();
 7 //线程池由EPollPort(AsynchronousChannelGroupImpl的实现类)管理,会详细说EPollPort
 8 public static AsynchronousSocketChannel open(AsynchronousChannelGroup group)
 9     throws IOException
10 {
11     AsynchronousChannelProvider provider = (group == null) ?
12       AsynchronousChannelProvider.provider() : group.provider();
13     return provider.openAsynchronousSocketChannel(group);
14 }
 1 public AsynchronousSocketChannel openAsynchronousSocketChannel(AsynchronousChannelGroup group)
 2   throws IOException
 3 {
 4   return new UnixAsynchronousSocketChannelImpl(toPort(group));
 5 }
 6 
 7 private Port toPort(AsynchronousChannelGroup group) throws IOException {
 8   if (group == null) {
 9     return defaultEventPort();
10   } else {
11     if (!(group instanceof EPollPort))
12       throw new IllegalChannelGroupException();
13     return (Port)group;
14   }
15 }
16     
17 private EPollPort defaultEventPort() throws IOException {
18   if (defaultPort == null) {
19     synchronized (LinuxAsynchronousChannelProvider.class) {
20       if (defaultPort == null) {
21             //默认会创建一个Executors.newCachedThreadPool(threadFactory),并启动poolSize个线程,poolSize在创建ThreadPool时会设置
22         defaultPort = new EPollPort(this, ThreadPool.getDefault()).start();
23       }
24     }
25   }
26   return defaultPort;
27 }
28     
29 static ThreadPool createDefault() {
30   int poolSize = getDefaultThreadPoolInitialSize();
31   if (poolSize < 0) {
32     poolSize = Runtime.getRuntime().availableProcessors();//与核心数相同,双核四线程,availableProcessors返回4
33   }
34 
35   ThreadFactory threadFactory = getDefaultThreadPoolThreadFactory();
36   if (threadFactory == null) {
37     threadFactory = defaultThreadFactory();
38   }
39 
40   ExecutorService executorService = Executors.newCachedThreadPool(threadFactory);
41   boolean isFixed = false;
42   return new ThreadPool(executorService, isFixed, poolSize);
43 }
44     
45 //Executors.newCachedThreadPool(threadFactory)中的threadFactory
46 static ThreadFactory defaultThreadFactory() {
47   return System.getSecurityManager() == null ? (var0) -> {//该ThreadFactory每次newThread都会返回一个Thread
48     Thread var1 = new Thread(var0);
49     var1.setDaemon(true);
50       return var1;
51   } : (var0) -> {
52     PrivilegedAction var1 = () -> {
53       InnocuousThread var1 = new InnocuousThread(var0);
54       var1.setDaemon(true);
55       return var1;
56     };
57     return (Thread)AccessController.doPrivileged(var1);
58   };
59 }
60 
61 //close时会关闭线程池
62 void implClose() throws IOException {
63   // remove the mapping
64   port.unregister(fdVal);
65 
66   // close file descriptor
67   nd.close(fd);
68 
69   // All outstanding I/O operations are required to fail
70   finish(false, true, true);
71 }
72 final void unregister(int fd) {
73   boolean checkForShutdown = false;
74 
75   fdToChannelLock.writeLock().lock();
76   try {
77     fdToChannel.remove(Integer.valueOf(fd));
78     // last key to be removed so check if group is shutdown
79     if (fdToChannel.isEmpty())
80       checkForShutdown = true;
81   } finally {
82     fdToChannelLock.writeLock().unlock();
83   }
84 
85   // continue shutdown
86   if (checkForShutdown && isShutdown()) {
87     try {
88           //关闭线程池
89       shutdownNow();
90     } catch (IOException ignore) { }
91   }
92 }
View Code
 1 //重点类,EPollPort
 2 EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
 3 throws IOException
 4 {
 5   super(provider, pool);
 6 
 7   // open epoll
 8   //重点,创建epoll,之后所有的socket都会注册在该epoll上
 9   this.epfd = epollCreate();
10 
11   // create socket pair for wakeup mechanism
12   int[] sv = new int[2];
13   try {
14     socketpair(sv);
15     // register one end with epoll
16     //重点,将sv[0]注册在epfd上,当sv[0]有事件发生时,epollWait就会wakeup,这是为了充分利用epollWait线程,也是为了。
17     epollCtl(epfd, EPOLL_CTL_ADD, sv[0], POLLIN);
18   } catch (IOException x) {
19     close0(epfd);
20     throw x;
21   }
22   this.sp = sv;
23 
24   // allocate the poll array
25   this.address = allocatePollArray(MAX_EPOLL_EVENTS);
26 
27   // create the queue and offer the special event to ensure that the first threads polls
28   // epoll返回的事件都会放在该队列上
29   this.queue = new ArrayBlockingQueue<Event>(MAX_EPOLL_EVENTS);
30   //首个线程需要waitPoll,其余线程在queue上等待处理事件(event)
31   this.queue.offer(NEED_TO_POLL);
32 }
  1  /*
  2  * Task to process events from epoll and dispatch to the channel's
  3  * onEvent handler.
  4  *
  5  * Events are retreived from epoll in batch and offered to a BlockingQueue
  6  * where they are consumed by handler threads. A special "NEED_TO_POLL"
  7  * event is used to signal one consumer to re-poll when all events have
  8  * been consumed.
  9  */
 10 //线程池中运行的就是EventHandlerTask
 11 private class EventHandlerTask implements Runnable {
 12   private Event poll() throws IOException {
 13     try {
 14       for (;;) {
 15             //等待事件就绪
 16         int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);
 17         /*
 18          * 'n' events have been read. Here we map them to their
 19          * corresponding channel in batch and queue n-1 so that
 20          * they can be handled by other handler threads. The last
 21          * event is handled by this thread (and so is not queued).
 22          */
 23         fdToChannelLock.readLock().lock();
 24         try {
 25           while (n-- > 0) {
 26             long eventAddress = getEvent(address, n);
 27             int fd = getDescriptor(eventAddress);
 28 
 29             // wakeup
 30             //用于wakeup,sp[0]是fileDescriptor,sp[1]相当于一个socket,当需要唤醒epollWait时就向sp[1]写入数据。
 31             //唤醒有两个作用,1为了关闭socket时,shutdown线程池,
 32             //2为了充分利用正在epollWait的线程,唤醒它去执行任务,也避免了单线程下没有可执行线程的问题
 33             if (fd == sp[0]) {
 34               if (wakeupCount.decrementAndGet() == 0) {
 35                 // no more wakeups so drain pipe
 36                 drain1(sp[0]);
 37               }
 38 
 39               // queue special event if there are more events
 40               // to handle.
 41               //如果不是单纯的wakeup,而是因为有事件发生,则入队EXECUTE_TASK_OR_SHUTDOWN,
 42               //这样可以让阻塞在queue.take处的线程尽快执行handlerTask
 43               if (n > 0) {
 44                 queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
 45                 continue;
 46               }
 47               //如果只是单纯的被wakeup,则由该线程执行handlerTask,并将NEED_TO_POLL入队,表示需要有线程来epoll
 48               return EXECUTE_TASK_OR_SHUTDOWN;
 49           }
 50 
 51           PollableChannel channel = fdToChannel.get(fd);
 52           if (channel != null) {
 53               int events = getEvents(eventAddress);
 54               Event ev = new Event(channel, events);
 55 
 56               // n-1 events are queued; This thread handles
 57               // the last one except for the wakeup
 58               if (n > 0) {
 59                   queue.offer(ev);
 60               } else {//由当前线程处理最后一个事件,避免单线程下没有可执行线程,多线程下也可充分利用线程
 61                   return ev;
 62               }
 63             }
 64           }//while
 65         } finally {
 66           fdToChannelLock.readLock().unlock();
 67         }
 68       }
 69     } finally {
 70       // to ensure that some thread will poll when all events have
 71       // been consumed
 72       queue.offer(NEED_TO_POLL);
 73     }
 74   }
 75 
 76     public void run() {
 77       Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
 78           Invoker.getGroupAndInvokeCount();
 79       final boolean isPooledThread = (myGroupAndInvokeCount != null);
 80       boolean replaceMe = false;
 81       Event ev;
 82       try {
 83         for (;;) {
 84           // reset invoke count
 85           if (isPooledThread)
 86             myGroupAndInvokeCount.resetInvokeCount();
 87 
 88           try {
 89               replaceMe = false;
 90               //线程池中多数进程会阻塞在这里,等待事件的到来
 91               ev = queue.take();
 92 
 93               // no events and this thread has been "selected" to
 94               // poll for more.
 95               //从队列中得到NEED_TO_POLL元素的线程,会去epoll
 96               if (ev == NEED_TO_POLL) {
 97               try {
 98                 ev = poll();
 99               } catch (IOException x) {
100                 x.printStackTrace();
101                 return;
102               }
103               }
104           } catch (InterruptedException x) {
105             continue;
106           }
107 
108           // handle wakeup to execute task or shutdown
109           //从队列中(也可能是直接返回的)获取到EXECUTE_TASK_OR_SHUTDOWN元素的线程去执行handlerTask,
110           //handlerTask为null意味着关闭socket,要shutdown线程池
111           if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
112             Runnable task = pollTask();
113             if (task == null) {
114                 // shutdown request
115                 return;
116             }
117             // run task (may throw error/exception)
118             replaceMe = true;
119             task.run();
120             continue;
121           }
122 
123           // process event
124           try {
125                 //多数情况会执行这里,即epoll到事件(非wakeup)需要处理,
126                 //以读事件为例,处理逻辑大致就是读数据到ByteBuffer,然后回调CompletionHandler,
127                 //我们在CompletionHandler中只需要处理已经读到数据的ByteBuffer即可。
128             ev.channel().onEvent(ev.events(), isPooledThread);
129           } catch (Error x) {
130             replaceMe = true; throw x;
131           } catch (RuntimeException x) {
132             replaceMe = true; throw x;
133           }
134         }
135       } finally {
136         // last handler to exit when shutdown releases resources
137         int remaining = threadExit(this, replaceMe);
138         if (remaining == 0 && isShutdown()) {
139           implClose();
140         }
141       }
142     }
143 }

是不是注册一次读事件,之后每当可读时CompletionHandler都会被回调呢?不是的,java的aio框架中限定了注册一次事件,也只监听一次事件,这是通过EPOLLONESHOT限制的。

 1 // invoke by clients to register a file descriptor
 2 @Override
 3 void startPoll(int fd, int events) {
 4   
 5   //EPOLLONESHOT只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把事件加入到EPOLL里。
 6   //EPOLLIN触发该事件,表示对应的文件描述符上有可读数据。(包括对端SOCKET正常关闭);
 7   //EPOLLOUT触发该事件,表示对应的文件描述符上可以写数据;
 8   // update events (or add to epoll on first usage)
 9   int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
10   if (err == ENOENT)
11     err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
12   if (err != 0)
13     throw new AssertionError();     // should not happen
14 }

其实nio也同样使用了epoll,只不过nio不自带线程池框架,并且select的返回只意味着事件的就绪,而aio框架中CompletionHandler的回调意味着事件的完成(比如read中得到的ByteBuffer已经被填好了数据,这也是线程池的任务)。个人感觉,aio适合简单处理,nio更适合复杂处理。

原文地址:https://www.cnblogs.com/holoyong/p/7353716.html