NIO之路4--MINA框架源码解析

MINA框架是基于NIO的异步IO框架,上一文已经对MINA的理论及实践做了分析,本文将对于MINA的整体源码实现进行分析。

通过MINA的实际案例可以发现,MINA的IO实现相比于NIO的使用要简单很多,因为不需要关心IO的具体实现,只需要关心具体的IO数据即可。MINA服务端整体步骤一共就四步:

1、创建IoService:初始化IoService,服务端就是创建IoAcceptor对象,客户端就是创建IoConnector对象

2、添加IoFilter:添加IO过滤器,每个IoService内部都有一个IO过滤器链IoFIlterChain,调用addBefore或addLast等方法将IO过滤器添加到过滤器链上

3、设置IoHandler:给IoService设置IO数据处理器IoHandler对象,用来处理器具体的IO业务数据

4、绑定监听端口号:调用IoService的bind方法监听服务端需要监听的端口号,并开启Selector监听客户端连接

一、初始化IoService

服务器创建IoService是直接创建IoService的子接口IoAcceptor,实现类为NioSocketAcceptor,实例化代码如下:

1     /** 创建默认实例*/
2     IoAcceptor acceptor1 = new NioSocketAcceptor();
3 
4     /** 创建指定数量IoProcessor的实例*/
5     IoAcceptor acceptor2 = new NioSocketAcceptor(2);
6 
7     /** 创建指定线程池的实例*/
8     Executor executor = Executors.newFixedThreadPool(4);
9     IoAcceptor acceptor3 = new NioSocketAcceptor(executor, new SimpleIoProcessorPool<>(NioProcessor.class));

虽然NioSocketAcceptor是实现了IoAcceptor接口,但是并不是直接实现的,而是继承了抽象的实现了IoAcceptor接口的父类AbstractPollingIoAcceptor,按默认的构造方法为例,初始化代码如下:

  1 /** 无参构造函数*/
  2     public NioSocketAcceptor() {
  3         /** 调用父类构造函数*/
  4         super(new DefaultSocketSessionConfig(), NioProcessor.class);
  5         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
  6     }
  7 
  8     /** AbstractPollingIoAcceptor构造函数
  9      * @param sessionConfig:IoSession的全局配置
 10      * @param processorClass:IoProcessor的Class
 11      * */
 12     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
 13         /** 调用重载构造函数*/
 14         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
 15     }
 16 
 17     /** AbstractPollingIoAcceptor构造函数
 18      * @param sessionConfig:IoSession的全局配置
 19      * @param executor:线程池
 20      * @param processor:IoProcessor对象
 21      * @param createdProcessor : 是否创建Processor
 22      * @param selectorProvider : SelectorProvider对象,用于创建Selector
 23      * */
 24     private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
 25                                       boolean createdProcessor, SelectorProvider selectorProvider) {
 26         /** 1.调用父亲AbstractIoAcceptor的构造函数*/
 27         super(sessionConfig, executor);
 28 
 29         if (processor == null) {
 30             throw new IllegalArgumentException("processor");
 31         }
 32         /** 2.设置属性processor、createdProcessor值*/
 33         this.processor = processor;
 34         this.createdProcessor = createdProcessor;
 35 
 36         try {
 37             /** 3.根据SelectorProvider对象初始化Selector,和NIO的根据SelectorProvider获取Selector对象逻辑一样 */
 38             init(selectorProvider);
 39 
 40             //标记当前IoAcceptor的Selector已经创建完成,可以接收客户端的连接请求了
 41             selectable = true;
 42         } catch (RuntimeException e) {
 43             throw e;
 44         } catch (Exception e) {
 45             throw new RuntimeIoException("Failed to initialize.", e);
 46         } finally {
 47             if (!selectable) {
 48                 try {
 49                     destroy();
 50                 } catch (Exception e) {
 51                     ExceptionMonitor.getInstance().exceptionCaught(e);
 52                 }
 53             }
 54         }
 55     }
 56 
 57     /** AbstractIoAcceptor构造函数*/
 58     protected AbstractIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
 59         /** 调用父类AbstractIoService构造函数 */
 60         super(sessionConfig, executor);
 61         defaultLocalAddresses.add(null);
 62     }
 63 
 64     /** AbstractIoService构造函数 */
 65     protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
 66         /** 参数校验*/
 67         if (sessionConfig == null) {
 68             throw new IllegalArgumentException("sessionConfig");
 69         }
 70 
 71         if (getTransportMetadata() == null) {
 72             throw new IllegalArgumentException("TransportMetadata");
 73         }
 74 
 75         if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) {
 76             throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: "
 77                     + getTransportMetadata().getSessionConfigType() + ")");
 78         }
 79 
 80         /** 创建IoServiceListenerSupport对象,IoServiceListenerSupport内部有一个IoServiceListener的列表
 81          *  将当前的IoServerListener对象添加到IoServiceListenerSupport的列表中
 82          *  */
 83         listeners = new IoServiceListenerSupport(this);
 84         listeners.add(serviceActivationListener);
 85 
 86         /** 设置属性 sessionConfig*/
 87         this.sessionConfig = sessionConfig;
 88 
 89         //加载异常监听器ExceptionMonitor对象,提前加载防止在使用的时候还没有初始化
 90         ExceptionMonitor.getInstance();
 91         /** 设置线程池,如果没有自定义,就采用默认的newCachedThreadPool*/
 92         if (executor == null) {
 93             this.executor = Executors.newCachedThreadPool();
 94             createdExecutor = true;
 95         } else {
 96             this.executor = executor;
 97             createdExecutor = false;
 98         }
 99 
100         threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
101     }

整个的初始化过程都是在给IoAcceptor的一些属性进行初始化,核心属性包括sessionConfig、executor、IoProcessor等

其中SessionConfig表示IoSession的全局属性配置,主要配置如下:

  1 public interface IoSessionConfig {
  2 
  3         /**
  4          * 获取IoProcessor读取数据的缓冲区大小
  5          */
  6         int getReadBufferSize();
  7 
  8         /**
  9          * 设置IoProcessor读取数据的缓冲区大小,通常会由IoProcessor自动动态调整
 10          */
 11         void setReadBufferSize(int readBufferSize);
 12 
 13         /**
 14          * 获取IoProcessor读取属性的缓冲区大小的最小值
 15          */
 16         int getMinReadBufferSize();
 17 
 18         /**
 19          * 设置IoProcessor读取属性的缓冲区大小的最小值
 20          */
 21         void setMinReadBufferSize(int minReadBufferSize);
 22 
 23         /**
 24          * 获取IoProcessor读取属性的缓冲区大小的最大值
 25          */
 26         int getMaxReadBufferSize();
 27 
 28         /**
 29          * 设置IoProcessor读取属性的缓冲区大小的最大值
 30          */
 31         void setMaxReadBufferSize(int maxReadBufferSize);
 32 
 33         /**
 34          * 获取吞吐量(TPS)的统计间隔,单位为秒,默认是3秒
 35          */
 36         int getThroughputCalculationInterval();
 37 
 38         /**
 39          * 获取吞吐量(TPS)的统计间隔,单位为毫秒
 40          */
 41         long getThroughputCalculationIntervalInMillis();
 42 
 43         /**
 44          * 设置吞吐量的统计间隔时间
 45          */
 46         void setThroughputCalculationInterval(int throughputCalculationInterval);
 47 
 48         /**
 49          * 获取指定状态的空闲时间,不同状态时间可能设置的不一样, 类型如下:
 50          * READER_IDLE : 读数据空闲
 51          * WRITER_IDLE : 写数据空闲
 52          * BOTH_IDLE : 读写都空闲
 53          */
 54         int getIdleTime(IdleStatus status);
 55 
 56         /**
 57          * 获取指定状态的空闲时间,单位为毫秒
 58          */
 59         long getIdleTimeInMillis(IdleStatus status);
 60 
 61         /**
 62          * 设置指定状态的空闲时间
 63          */
 64         void setIdleTime(IdleStatus status, int idleTime);
 65 
 66         /**
 67          * 获取读空闲时间,单位秒
 68          */
 69         int getReaderIdleTime();
 70 
 71         /**
 72          * 获取读空闲时间,单位毫秒
 73          */
 74         long getReaderIdleTimeInMillis();
 75 
 76         /**
 77          * 设置读空闲时间
 78          */
 79         void setReaderIdleTime(int idleTime);
 80 
 81         /**
 82          * 获取写空闲时间,单位秒
 83          */
 84         int getWriterIdleTime();
 85 
 86         /**
 87          * 获取写空闲时间,单位毫秒
 88          */
 89         long getWriterIdleTimeInMillis();
 90 
 91         /**
 92          * 设置写空闲时间
 93          */
 94         void setWriterIdleTime(int idleTime);
 95 
 96         /**
 97          * 获取读写都空闲时间,单位秒
 98          */
 99         int getBothIdleTime();
100 
101         /**
102          * 获取读写都空闲时间,单位毫秒
103          */
104         long getBothIdleTimeInMillis();
105 
106         /**
107          * 设置读写都空闲时间
108          */
109         void setBothIdleTime(int idleTime);
110 
111         /**
112          * 获取写超时时间,单位为秒
113          */
114         int getWriteTimeout();
115 
116         /**
117          * 获取写超时时间,单位毫秒
118          */
119         long getWriteTimeoutInMillis();
120 
121         /**
122          * 设置写超时时间
123          */
124         void setWriteTimeout(int writeTimeout);
125 
126         /**
127          * 获取会话读操作是否开启
128          */
129         boolean isUseReadOperation();
130 
131         /**
132          * 开启或关闭会话读操作,如果开启所有接收到的消息会存储在内存的BlockingQueue中,使客户端应用可以更方便读取接收的消息
133          * 开启这个选项对服务器应用无效,并可能会导致内存泄漏,默认为关闭状态
134          */
135         void setUseReadOperation(boolean useReadOperation);
136 
137         /** 全量设置为IoSessionConfig */
138         void setAll(IoSessionConfig config);
139     }

另外一个属性是线程池对象executor,如果没有自定义线程池传入的话,那么默认会调用Executors.newCachedThreadPool()创建无线程数上限的线程池,所以推荐使用自定义的线程池,避免默认的线程池没有线程数量限制导致线程过多的问题。

还有一个属性是IoProcessor池,默认类为SimpleIoProcessorPool,SimpleIoProcessorPool内部有一个IoProcessor[] pool和一个Executor executor属性,pool是IoProcessor数组,存储所有的IoProcessor,数量可以自定义通过构造函数传入,默认的个数为当前服务器机器的CPU个数+1,比如4核CPU那么就会创建5个IoProcessor,另外每一个IoProcessor初始化的时候都会设置线程池executor属性,如果executor没有自定义同样也会使用Executors.newCachedThreadPool创建。

总结:

IoService初始化的时候涉及到了两个线程池,首先是IoService本身使用的线程池,IoService本身用于接收客户端的连接请求,而连接请求的处理就交给了线程池处理;

另外IoService初始化时会创建多个IoProcessor用于处理客户端具体的IO操作,每一个IoProcessor内部有一个Selector用于监听客户端IO事件,然后将IO事件交给内部的线程池来处理

二、添加IoFilter

IoService内部都一个过滤器链IoFilterChainBuilder对象,默认实现类为DefaultIoFilterChainBuilder类,添加IoFilter时主要有四个方法,分别是在过滤器链的不同位置插入指定过滤器,用法分别如下:

 1         IoAcceptor acceptor = new NioSocketAcceptor();
 2         /** 获取过滤器链 */
 3         DefaultIoFilterChainBuilder ioFilterChain = acceptor.getFilterChain();
 4 
 5         /** 1.在过滤器链的首部加入过滤器 */
 6         ioFilterChain.addFirst("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
 7         /** 2.在过滤器链的指定过滤器前面加入过滤器 */
 8         ioFilterChain.addBefore("logFilter","codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
 9         /** 3.在过滤器链的指定过滤器后面加入过滤器 */
10         ioFilterChain.addAfter("logFilter","codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
11         /** 4.在过滤器链的尾部加入过滤器 */
12         ioFilterChain.addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));

以addFirst为例,实现逻辑如下:

 1 /** 在过滤器链首部插入过滤器
 2      * @param name:过滤器的名称
 3      * @param filter: 过滤器
 4      * */
 5     public synchronized void addFirst(String name, IoFilter filter) {
 6         /** 构造过滤器链路节点对象EntryImpl,并调用register方法加入到列表中 */
 7         register(0, new EntryImpl(name, filter));
 8     }
 9 
10 
11     private final List<Entry> entries;
12 
13     public DefaultIoFilterChainBuilder() {
14         entries = new CopyOnWriteArrayList<Entry>();
15     }
16 
17     /**
18      * 在指定位置插入Entry节点
19      * */
20     private void register(int index, Entry e) {
21         if (contains(e.getName())) {
22             throw new IllegalArgumentException("Other filter is using the same name: " + e.getName());
23         }
24         //调用List的add方法在指定为止插入节点
25         entries.add(index, e);
26     }

从源码可以看出添加IoFilter的逻辑比较简单,过滤器链IoFilterChainBuilder对象内部有一个列表,用于存放所有的IoFilter,添加过滤器就是将IoFilter和名称封装成Entry对象添加到列表中,不同的add方法实际就是调用List的add(int index, Entry e)方法在指定的位置插入元素。addFirst就是在list头部插入元素,addLast就是在list尾部插入元素,addBefore和addAfter先通过遍历查找指定过滤器在List中的位置,然后再将新插入的过滤器插入到指定位置

三、设置IoHandler

 1 public final void setHandler(IoHandler handler) {
 2         if (handler == null) {
 3             throw new IllegalArgumentException("handler cannot be null");
 4         }
 5         /** 判断IoService是否活跃,主要是判断IoServiceListenerSupport是否有活跃的IoServiceListener */
 6         if (isActive()) {
 7             throw new IllegalStateException("handler cannot be set while the service is active.");
 8         }
 9         /** 设置IoHandler属性*/
10         this.handler = handler;
11     }

设置IoHandler的逻辑比较简单,就是给IoService内部的handler属性赋值

四、绑定主机

前面三个步骤都是在初始化服务器并设置各种属性,接下来就是绑定监听的逻辑,源码如下:

 1 public final void bind(SocketAddress localAddress) throws IOException {
 2         if (localAddress == null) {
 3             throw new IllegalArgumentException("localAddress");
 4         }
 5         /** 创建地址列表,将传入的SocketAddress添加到列表中*/
 6         List<SocketAddress> localAddresses = new ArrayList<>(1);
 7         localAddresses.add(localAddress);
 8         /** 内部方法执行绑定逻辑*/
 9         bind(localAddresses);
10     }

调用内部重载的bind方法,源码如下:

 1 public final void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException {
 2         /** 参数校验*/
 3         if (isDisposing()) {
 4             throw new IllegalStateException("The Accpetor disposed is being disposed.");
 5         }
 6 
 7         if (localAddresses == null) {
 8             throw new IllegalArgumentException("localAddresses");
 9         }
10 
11         List<SocketAddress> localAddressesCopy = new ArrayList<>();
12 
13         for (SocketAddress a : localAddresses) {
14             checkAddressType(a);
15             localAddressesCopy.add(a);
16         }
17 
18         if (localAddressesCopy.isEmpty()) {
19             throw new IllegalArgumentException("localAddresses is empty.");
20         }
21 
22         boolean activate = false;
23         synchronized (bindLock) {
24             synchronized (boundAddresses) {
25                 if (boundAddresses.isEmpty()) {
26                     activate = true;
27                 }
28             }
29             /** 判断IoHandler是否为空*/
30             if (getHandler() == null) {
31                 throw new IllegalStateException("handler is not set.");
32             }
33 
34             try {
35                 /**  绑定主机地址 */
36                 Set<SocketAddress> addresses = bindInternal(localAddressesCopy);
37                 synchronized (boundAddresses) {
38                     boundAddresses.addAll(addresses);
39                 }
40             } catch (IOException e) {
41                 throw e;
42             } catch (RuntimeException e) {
43                 throw e;
44             } catch (Exception e) {
45                 throw new RuntimeIoException("Failed to bind to: " + getLocalAddresses(), e);
46             }
47         }
48         /** 激活IoServiceListener */
49         if (activate) {
50             getListeners().fireServiceActivated();
51         }
52     }

虽然代码比较多,但是核心代码就只有 bindInternal这一行, bindInternal是真正的绑定主机的方法,代码如下:

 1 /** 绑定主机*/
 2     protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
 3         /** 创建一个Future对象,当IoAcceptor的Selector处理了该请求后会给Future对象发送一个信号 */
 4         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
 5 
 6         /** 添加到AcceptorOperationFuture队列中*/
 7         registerQueue.add(request);
 8 
 9         /** 开启Acceptor*/
10         startupAcceptor();
11 
12         try {
13             /** 信号量设置为1,相当于加锁处理*/
14             lock.acquire();
15 
16             wakeup();
17         } finally {
18             /** 信号量释放, 相当于解锁处理*/
19             lock.release();
20         }
21 
22         // Now, we wait until this request is completed.
23         request.awaitUninterruptibly();
24 
25         if (request.getException() != null) {
26             throw request.getException();
27         }
28         Set<SocketAddress> newLocalAddresses = new HashSet<>();
29 
30         for (H handle : boundHandles.values()) {
31             newLocalAddresses.add(localAddress(handle));
32         }
33 
34         return newLocalAddresses;
35     }

核心逻辑是调用了startupAcceptor方法,代码如下:

 1 private void startupAcceptor() throws InterruptedException {
 2         if (!selectable) {
 3             registerQueue.clear();
 4             cancelQueue.clear();
 5         }
 6 
 7         /** 创建Acceptor对象, Acceptor实现了Runnable接口*/
 8         Acceptor acceptor = acceptorRef.get();
 9 
10         if (acceptor == null) {
11             lock.acquire();
12             acceptor = new Acceptor();
13 
14             if (acceptorRef.compareAndSet(null, acceptor)) {
15                 /** 将实现了Runnable接口的acceptor对象放入线程池中执行*/
16                 executeWorker(acceptor);
17             } else {
18                 lock.release();
19             }
20         }
21     }
 1 protected final void executeWorker(Runnable worker) {
 2         executeWorker(worker, null);
 3     }
 4 
 5     protected final void executeWorker(Runnable worker, String suffix) {
 6         String actualThreadName = threadName;
 7         if (suffix != null) {
 8             actualThreadName = actualThreadName + '-' + suffix;
 9         }
10         executor.execute(new NamePreservingRunnable(worker, actualThreadName));
11     }

该方法的核心逻辑是创建一个Acceptor对象,Acceptor对象是实现了Runnable接口的,所以是一个可执行逻辑。然后调用executeWorker方法将Acceptor对象交给线程池执行,而线程池就是初始化IoService时初始化的线程池,默认是Executors.newCachedThreadPool

既然Acceptor实现了Runnable接口,那么就再看下Acceptor的具体实现逻辑:

  1 private class Acceptor implements Runnable {
  2         public void run() {
  3             assert (acceptorRef.get() == this);
  4 
  5             int nHandles = 0;
  6 
  7             /** 释放锁*/
  8             lock.release();
  9 
 10             while (selectable) {
 11                 try {
 12                     /** 获取注册的监听端口的数量*/
 13                     nHandles += registerHandles();
 14 
 15                     /** 调用Selector的select()方法,会阻塞当前线程 */
 16                     int selected = select();
 17 
 18                     // Now, if the number of registred handles is 0, we can
 19                     // quit the loop: we don't have any socket listening
 20                     // for incoming connection.
 21                     if (nHandles == 0) {
 22                         acceptorRef.set(null);
 23 
 24                         if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
 25                             assert (acceptorRef.get() != this);
 26                             break;
 27                         }
 28 
 29                         if (!acceptorRef.compareAndSet(null, this)) {
 30                             assert (acceptorRef.get() != this);
 31                             break;
 32                         }
 33 
 34                         assert (acceptorRef.get() == this);
 35                     }
 36 
 37                     if (selected > 0) {
 38                         /** 处理Selector返回的所有的SelectionKey */
 39                         processHandles(selectedHandles());
 40                     }
 41 
 42                     // check to see if any cancellation request has been made.
 43                     nHandles -= unregisterHandles();
 44                 } catch (ClosedSelectorException cse) {
 45                     // If the selector has been closed, we can exit the loop
 46                     ExceptionMonitor.getInstance().exceptionCaught(cse);
 47                     break;
 48                 } catch (Exception e) {
 49                     ExceptionMonitor.getInstance().exceptionCaught(e);
 50 
 51                     try {
 52                         Thread.sleep(1000);
 53                     } catch (InterruptedException e1) {
 54                         ExceptionMonitor.getInstance().exceptionCaught(e1);
 55                     }
 56                 }
 57             }
 58 
 59             /** 当销毁IoService时, 销毁所有的processor*/
 60             if (selectable && isDisposing()) {
 61                 selectable = false;
 62                 try {
 63                     if (createdProcessor) {
 64                         processor.dispose();
 65                     }
 66                 } finally {
 67                     try {
 68                         synchronized (disposalLock) {
 69                             if (isDisposing()) {
 70                                 destroy();
 71                             }
 72                         }
 73                     } catch (Exception e) {
 74                         ExceptionMonitor.getInstance().exceptionCaught(e);
 75                     } finally {
 76                         disposalFuture.setDone();
 77                     }
 78                 }
 79             }
 80         }
 81 
 82         /**
 83          * This method will process new sessions for the Worker class.  All
 84          * keys that have had their status updates as per the Selector.selectedKeys()
 85          * method will be processed here.  Only keys that are ready to accept
 86          * connections are handled here.
 87          * <p/>
 88          * Session objects are created by making new instances of SocketSessionImpl
 89          * and passing the session object to the SocketIoProcessor class.
 90          */
 91         @SuppressWarnings("unchecked")
 92         private void processHandles(Iterator<H> handles) throws Exception {
 93             /** 遍历所有SelectionKey*/
 94             while (handles.hasNext()) {
 95                 H handle = handles.next();
 96                 handles.remove();
 97 
 98                 /** 处理客户端连接请求,封装成NioSocketSession对象*/
 99                 S session = accept(processor, handle);
100 
101                 if (session == null) {
102                     continue;
103                 }
104                 /** 初始化客户端Session对象, 设置上一次读写时间 */
105                 initSession(session, null, null);
106 
107                 /** 将客户端Session添加到IoProcessor线程池,并分配一个IoProcessor和Session进行绑定 */
108                 session.getProcessor().add(session);
109             }
110         }
111     }
112 
113     protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
114 
115         SelectionKey key = null;
116 
117         if (handle != null) {
118             key = handle.keyFor(selector);
119         }
120         /** key有效且必须触发了OP_ACCEPT事件,其他事件不处理*/
121         if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
122             return null;
123         }
124 
125         /** 获取客户端连接的Channel*/
126         SocketChannel ch = handle.accept();
127 
128         if (ch == null) {
129             return null;
130         }
131         /** 封装客户端连接为NioSocketSession对象 */
132         return new NioSocketSession(this, processor, ch);
133     }
134 
135     public NioSocketSession(IoService service, IoProcessor<NioSession> processor, SocketChannel channel) {
136         /** 调用父类构造函数 设置IoProcessor、IoService、SocketChannel属性*/
137         super(processor, service, channel);
138         config = new SessionConfigImpl();
139         /** 将IoService的全局SessionConfig复制给当前session的配置*/
140         config.setAll(service.getSessionConfig());
141     }

从代码中可以看出IoAcceptor的run方法基本上和NIO的服务器启动方法差不多,首先了调用Selector的select()方法获取所有客户端的请求连接accept事件,然后遍历所有的SelectionKey,将客户端的连接封装成NioSocketSession,最后再将NioSocketSession添加到IoProcessor线程池中,而绑定到具体的IoProcessor的逻辑是在IoProcessor的add方法中实现的,这里IoProcessor线程池是通过SimpleIoProcessorPool类实现的,源码如下:

 1 /** 添加SocketSession*/
 2     public final void add(S session) {
 3         /** 根据session获取具体的IoProcessor,并将session添加到IoProcessor中*/
 4         getProcessor(session).add(session);
 5     }
 6 
 7     private IoProcessor<S> getProcessor(S session) {
 8         IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
 9 
10         if (processor == null) {
11             if (disposed || disposing) {
12                 throw new IllegalStateException("A disposed processor cannot be accessed.");
13             }
14             /** 根据session的ID通过取模算法从数组中获取对应的IoProcessor*/
15             processor = pool[Math.abs((int) session.getId()) % pool.length];
16 
17             if (processor == null) {
18                 throw new IllegalStateException("A disposed processor cannot be accessed.");
19             }
20             /** 给session设置IoProcessor属性*/
21             session.setAttributeIfAbsent(PROCESSOR, processor);
22         }
23 
24         return processor;
25     }

每一个SocketSession会绑定一个IoProcessor,并会缓存在session的attribute中,而获取IoProcessor的方式则是通过取模算法从IoProcessor数组中获取。

然后调用IoProcessor的add方法将session添加到IoProcessor中,处理IO数据的IoProcessor实现类为NioProcessor,实现的add方法源码如下:

 1 /** 新创建的session队列*/
 2     private final Queue<S> newSessions = new ConcurrentLinkedQueue<>();
 3 
 4     /** 需要移除的session队列 */
 5     private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();
 6 
 7     /** 需要刷新的session队列*/
 8     private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>();
 9 
10     public final void add(S session) {
11         if (disposed || disposing) {
12             throw new IllegalStateException("Already disposed.");
13         }
14 
15         /** 将新的session加入到队列中*/
16         newSessions.add(session);
17         /** 开启IoProcessor*/
18         startupProcessor();
19     }
20 
21     private void startupProcessor() {
22         /** 原子引用类型,如果不存在就新建Processor*/
23         Processor processor = processorRef.get();
24 
25         if (processor == null) {
26             processor = new Processor();
27 
28             if (processorRef.compareAndSet(null, processor)) {
29                 /** 将可执行的Processor添加到线程池中执行 */
30                 executor.execute(new NamePreservingRunnable(processor, threadName));
31             }
32         }
33 
34         /** 唤醒Selector, 防止一直被阻塞*/
35         wakeup();
36     }

这里将session添加到队列中,然后开启IoProcessor线程,过程和IoSession类型,都是封装成一个NamePreservingRunnable对象交给线程池去执行,而这里的线程池和IoService的线程池是独立的,没给IoProcessor都有一个自己的线程池

Processor具体的执行逻辑如下:

  1 /** Processor线程,用于处理IO读写操作*/
  2     private class Processor implements Runnable {
  3         public void run() {
  4             assert (processorRef.get() == this);
  5             //会话数量
  6             int nSessions = 0;
  7             lastIdleCheckTime = System.currentTimeMillis();
  8             /** 重试次数*/
  9             int nbTries = 10;
 10 
 11             for (;;) {
 12                 try {
 13                     long t0 = System.currentTimeMillis();
 14                     int selected = select(SELECT_TIMEOUT);
 15                     long t1 = System.currentTimeMillis();
 16                     /** 计算select()方法执行的时间*/
 17                     long delta = t1 - t0;
 18 
 19                     if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
 20                         // Last chance : the select() may have been
 21                         // interrupted because we have had an closed channel.
 22                         if (isBrokenConnection()) {
 23                             LOG.warn("Broken connection");
 24                         } else {
 25                             // Ok, we are hit by the nasty epoll
 26                             // spinning.
 27                             // Basically, there is a race condition
 28                             // which causes a closing file descriptor not to be
 29                             // considered as available as a selected channel,
 30                             // but
 31                             // it stopped the select. The next time we will
 32                             // call select(), it will exit immediately for the
 33                             // same
 34                             // reason, and do so forever, consuming 100%
 35                             // CPU.
 36                             // We have to destroy the selector, and
 37                             // register all the socket on a new one.
 38                             if (nbTries == 0) {
 39                                 LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
 40                                 registerNewSelector();
 41                                 nbTries = 10;
 42                             } else {
 43                                 nbTries--;
 44                             }
 45                         }
 46                     } else {
 47                         nbTries = 10;
 48                     }
 49 
 50                     /** 处理新添加的Session
 51                      *  将session对应的channel注册到当前IoProcessor的Selector上,并监听OP_READ事件
 52                      * */
 53                     nSessions += handleNewSessions();
 54 
 55                     updateTrafficMask();
 56 
 57                     // Now, if we have had some incoming or outgoing events,
 58                     // deal with them
 59                     if (selected > 0) {
 60                         /** 当select()返回值大于0表示当前已经有IO事件需要处理,则执行process方法进行处理*/
 61                         process();
 62                     }
 63 
 64                     // Write the pending requests
 65                     long currentTime = System.currentTimeMillis();
 66                     flush(currentTime);
 67 
 68                     /**
 69                      * 移除所有需要移除的session,移除之前会将所有需要发送的数据发送给客户端
 70                      * */
 71                     nSessions -= removeSessions();
 72 
 73                     /** 更新所有session的读空闲、写空闲和读写空闲的时间值 */
 74                     notifyIdleSessions(currentTime);
 75 
 76                     // Get a chance to exit the infinite loop if there are no
 77                     // more sessions on this Processor
 78                     if (nSessions == 0) {
 79                         processorRef.set(null);
 80 
 81                         if (newSessions.isEmpty() && isSelectorEmpty()) {
 82                             // newSessions.add() precedes startupProcessor
 83                             assert (processorRef.get() != this);
 84                             break;
 85                         }
 86 
 87                         assert (processorRef.get() != this);
 88 
 89                         if (!processorRef.compareAndSet(null, this)) {
 90                             // startupProcessor won race, so must exit processor
 91                             assert (processorRef.get() != this);
 92                             break;
 93                         }
 94 
 95                         assert (processorRef.get() == this);
 96                     }
 97 
 98                     /** 当IoProcessor销毁了,则移除所有的客户端的SocketSession*/
 99                     if (isDisposing()) {
100                         boolean hasKeys = false;
101 
102                         for (Iterator<S> i = allSessions(); i.hasNext();) {
103                             IoSession session = i.next();
104 
105                             if (session.isActive()) {
106                                 scheduleRemove((S)session);
107                                 hasKeys = true;
108                             }
109                         }
110 
111                         if (hasKeys) {
112                             wakeup();
113                         }
114                     }
115                 } catch (ClosedSelectorException cse) {
116                     // If the selector has been closed, we can exit the loop
117                     // But first, dump a stack trace
118                     ExceptionMonitor.getInstance().exceptionCaught(cse);
119                     break;
120                 } catch (Exception e) {
121                     ExceptionMonitor.getInstance().exceptionCaught(e);
122 
123                     try {
124                         Thread.sleep(1000);
125                     } catch (InterruptedException e1) {
126                         ExceptionMonitor.getInstance().exceptionCaught(e1);
127                     }
128                 }
129             }
130 
131             try {
132                 synchronized (disposalLock) {
133                     if (disposing) {
134                         doDispose();
135                     }
136                 }
137             } catch (Exception e) {
138                 ExceptionMonitor.getInstance().exceptionCaught(e);
139             } finally {
140                 disposalFuture.setValue(true);
141             }
142         }
143     }

这里代码比较多,但是核心的不多,主要功能如下:

1、处理所有新加入的Session,将session对应的channel注册到Selector中,并监听OP_READ事件

2、处理所有的需要移除的session,将session从Selector中移除监听

3、调用Selector的select()方法获取IO事件,如果存在IO事件,则调用process()方法进行处理

4、更新所有session的读写空闲时间

而处理IO事件的逻辑全部在process方法中处理,源码如下:

 1 /** 处理IoProcessor的Selector监听返回的所有IO事件 */
 2     private void process() throws Exception {
 3         /** 遍历所有的SelectionKey*/
 4         for (Iterator<S> i = selectedSessions(); i.hasNext();) {
 5             S session = i.next();
 6             /** 处理单个客户端session的IO事件*/
 7             process(session);
 8             i.remove();
 9         }
10     }
11     /** 获取所有的SelectionKey*/
12     protected Iterator<NioSession> selectedSessions() {
13         return new IoSessionIterator(selector.selectedKeys());
14     }

process方法的逻辑不多,先是获取selector所有的IO事件SelectionKey集合,然后进行遍历所有的SelectionKey集合,调用process(session)方法处理每个Session的IO事件,源码如下:

 1 /***/
 2     private void process(S session) {
 3         /** 处理读事件 */
 4         if (isReadable(session) && !session.isReadSuspended()) {
 5             /** 处理session的读事件*/
 6             read(session);
 7         }
 8 
 9         /** 处理写事件 */
10         if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
11             /** 刷新session */
12             flushingSessions.add(session);
13         }
14     }
15 
16     private void read(S session) {
17         /** 获取配置的读缓冲区大小*/
18         IoSessionConfig config = session.getConfig();
19         int bufferSize = config.getReadBufferSize();
20         /** 分配指定大小的缓冲区*/
21         IoBuffer buf = IoBuffer.allocate(bufferSize);
22 
23         final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
24 
25         try {
26             int readBytes = 0;
27             int ret;
28 
29             try {
30                 if (hasFragmentation) {
31                     /** 调用read方法将session中的IO数据读取到缓冲区*/
32                     while ((ret = read(session, buf)) > 0) {
33                         readBytes += ret;
34 
35                         if (!buf.hasRemaining()) {
36                             break;
37                         }
38                     }
39                 } else {
40                     /** 调用read方法将session中的IO数据读取到缓冲区*/
41                     ret = read(session, buf);
42                     if (ret > 0) {
43                         readBytes = ret;
44                     }
45                 }
46             } finally {
47                 /** 从读模式切换到写模式 */
48                 buf.flip();
49             }
50 
51             if (readBytes > 0) {
52                 IoFilterChain filterChain = session.getFilterChain();
53                 /** 将读到的数据交给过滤器链进行过滤处理 */
54                 filterChain.fireMessageReceived(buf);
55                 buf = null;
56 
57                 if (hasFragmentation) {
58                     /** 判断读取到的数据是否小于缓冲区总大小的一半*/
59                     if (readBytes << 1 < config.getReadBufferSize()) {
60                         /** 缩小缓冲区大小 */
61                         session.decreaseReadBufferSize();
62                     } else if (readBytes == config.getReadBufferSize()) {
63                         /** 扩大缓冲区大小 */
64                         session.increaseReadBufferSize();
65                     }
66                 }
67             }
68 
69             if (ret < 0) {
70                 /** 过滤器链关闭 */
71                 IoFilterChain filterChain = session.getFilterChain();
72                 filterChain.fireInputClosed();
73             }
74         } catch (Exception e) {
75             if (e instanceof IOException) {
76                 if (!(e instanceof PortUnreachableException)
77                         || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
78                         || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
79                     scheduleRemove(session);
80                 }
81             }
82             /** 如果抛异常,则通知过滤器链执行异常事件*/
83             IoFilterChain filterChain = session.getFilterChain();
84             filterChain.fireExceptionCaught(e);
85         }
86     }

这里逻辑比较多,但是整体逻辑不复杂,主要逻辑如下:

1、如果是可读事件,那么就执行read方法进行IO数据的读取

2、根据配置的读缓冲区大小创建IoBuffer对象进行内存分配

3、调用read方法从session中读取数据存到IoBuffer中,读取完成将IoBuffer从读模式切换到写模式

4、将读取到的IoBuffer数据交给过滤器链进行所有IO过滤器进行过滤处理

5、将缓冲区大小进行扩容或降容处理

6、将连接关闭或连接异常的事件交给过滤器链进行对应的处理

 接下来就针对每一个步骤分别进行源码分析

1、分配缓冲区

 1 /** 内存分配工具 */
 2     private static IoBufferAllocator allocator = new SimpleBufferAllocator();
 3 
 4     /** 是否使用直接内存,默认为false表示使用堆内内存,值为true表示使用直接内存 */
 5     private static boolean useDirectBuffer = false;
 6 
 7     public static IoBuffer allocate(int capacity) {
 8         return allocate(capacity, useDirectBuffer);
 9     }
10 
11     public static IoBuffer allocate(int capacity, boolean useDirectBuffer) {
12         if (capacity < 0) {
13             throw new IllegalArgumentException("capacity: " + capacity);
14         }
15         /** 调用内存分配工具的allocate方法进行内存分配 */
16         return allocator.allocate(capacity, useDirectBuffer);
17     }

分配缓冲区一共有两个参数,分别是缓冲区的大小和是否使用直接内存,最终调用内存分配器的allocate方法进行内存分配,源码如下:

 1 /** 创建IoBuffer对象,分配内存 */
 2     public IoBuffer allocate(int capacity, boolean direct) {
 3         return wrap(allocateNioBuffer(capacity, direct));
 4     }
 5 
 6     /** 创建NIO的ByteBuffer对象*/
 7     public ByteBuffer allocateNioBuffer(int capacity, boolean direct) {
 8         ByteBuffer nioBuffer;
 9         if (direct) {
10             /** 分配直接内存*/
11             nioBuffer = ByteBuffer.allocateDirect(capacity);
12         } else {
13             /** 分配堆内内存*/
14             nioBuffer = ByteBuffer.allocate(capacity);
15         }
16         return nioBuffer;
17     }
18 
19     /** 将ByteBuffer对象包装成IoBuffer对象 */
20     public IoBuffer wrap(ByteBuffer nioBuffer) {
21         return new SimpleBuffer(nioBuffer);
22     }

这里就回到了Java的NIO分配内存的逻辑了,MINA分配缓冲区的逻辑实际底层就是调用了Java NIO的ByteBuffer的分配逻辑,根据直接内存还是堆内内存进行内存分配,最终分配了之后再封装成了IoBuffer对象。

2、IO数据读取

1 @Override
2     protected int read(NioSession session, IoBuffer buf) throws Exception {
3         /** 从session中获取channel*/
4         ByteChannel channel = session.getChannel();
5         /** 调用Java NIO的channel.read(ByteBuffer buffer)方法读取数据 */
6         return channel.read(buf.buf());
7     }

读取数据的逻辑也不复杂,同样是直接调用了Java NIO的channel读取Buffer数据的方式进行读取,具体的实现逻辑可以参考Java NIO的channel读取缓冲区数据的逻辑

3、过滤器链对IO数据进行过滤处理

 1 public void fireMessageReceived(Object message) {
 2         if (message instanceof IoBuffer) {
 3             session.increaseReadBytes(((IoBuffer) message).remaining(), System.currentTimeMillis());
 4         }
 5 
 6         callNextMessageReceived(head, session, message);
 7     }
 8 
 9     public final void increaseReadBytes(long increment, long currentTime) {
10         if (increment <= 0) {
11             return;
12         }
13         //统计已读字节数
14         readBytes += increment;
15         //设置上一次读操作时间
16         lastReadTime = currentTime;
17         //设置空闲为0
18         idleCountForBoth.set(0);
19         idleCountForRead.set(0);
20 
21         if (getService() instanceof AbstractIoService) {
22             ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
23         }
24     }
25 
26     /** 通知下一个节点处理接收消息事件 */
27     private void callNextMessageReceived(Entry entry, IoSession session, Object message) {
28         try {
29             IoFilter filter = entry.getFilter();
30             NextFilter nextFilter = entry.getNextFilter();
31             /** 依次执行每一个IoFilter的messageReceived方法 */
32             filter.messageReceived(nextFilter, session, message);
33         } catch (Exception e) {
34             fireExceptionCaught(e);
35         } catch (Error e) {
36             fireExceptionCaught(e);
37             throw e;
38         }
39     }

过滤器链路处理消息时,会从过滤器链路的头节点开始依次执行messageReceived方法,直到执行到最后一个过滤器,过滤器链路头节点和尾节点是固定不变的,头节点实现为HeadFilter,尾节点实现为TailFilter。

当处理读事件时,会从HeadFilter开始处理,然后按自定义的过滤器依次执行,最后执行TailFilter的处理;而处理写事件时,顺序完全相反,会从TailFilter开始到HeadFilter结束。所以读数据时最后会执行TailFilter的messageReceived方法,源码如下:

 1  @Override
 2     public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
 3         AbstractIoSession s = (AbstractIoSession) session;
 4 
 5         if (!(message instanceof IoBuffer)) {
 6             s.increaseReadMessages(System.currentTimeMillis());
 7         } else if (!((IoBuffer) message).hasRemaining()) {
 8             s.increaseReadMessages(System.currentTimeMillis());
 9         }
10 
11         // Update the statistics
12         if (session.getService() instanceof AbstractIoService) {
13             ((AbstractIoService) session.getService()).getStatistics().updateThroughput(System.currentTimeMillis());
14         }
15 
16         try {
17             /** 尾部过滤器将消息交给业务处理器IoHandler处理 */
18             session.getHandler().messageReceived(s, message);
19         } finally {
20             if (s.getConfig().isUseReadOperation()) {
21                 s.offerReadFuture(message);
22             }
23         }
24     }

可以看出TailFilter最重要的一个步骤是需要将发送的IO数据交给业务层处理,通过从IoSession中获取IoHandler对象,并调用IoHandler的messageReceived方法交给业务层处理IO数据

4、缓冲区扩容或降容处理

 1 /** 扩容读缓冲区大小为原先的两倍 */
 2     public final void increaseReadBufferSize() {
 3         /** 扩大两倍*/
 4         int newReadBufferSize = getConfig().getReadBufferSize() << 1;
 5         if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
 6             getConfig().setReadBufferSize(newReadBufferSize);
 7         } else {
 8             getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
 9         }
10 
11         deferDecreaseReadBuffer = true;
12     }
13 
14     /**
15      * 降容读缓冲区大小为原先的一半
16      */
17     public final void decreaseReadBufferSize() {
18         if (deferDecreaseReadBuffer) {
19             deferDecreaseReadBuffer = false;
20             return;
21         }
22 
23         if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
24             /** 缩小一半*/
25             getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
26         }
27 
28         deferDecreaseReadBuffer = true;
29     }

配置的读缓冲区大小和实际的IO数据大小可能会存在偏差,所以需要动态的调整读缓冲区的大小,避免缓冲区内存不足或内存浪费,每次扩容都会扩容到原大小的两倍,降容也会缩小到原先的一半

5、连接关闭或连接异常事件处理

 1 public void fireInputClosed() {
 2         /** 获取过滤器链头节点*/
 3         Entry head = this.head;
 4         callNextInputClosed(head, session);
 5     }
 6 
 7     /** 通知下一个节点处理IO输入关闭事件*/
 8     private void callNextInputClosed(Entry entry, IoSession session) {
 9         try {
10             IoFilter filter = entry.getFilter();
11             /** 获取下一个过滤器*/
12             NextFilter nextFilter = entry.getNextFilter();
13             /** 处理IO输入关闭事件*/
14             filter.inputClosed(nextFilter, session);
15         } catch (Throwable e) {
16             fireExceptionCaught(e);
17         }
18     }
19 
20     public void fireExceptionCaught(Throwable cause) {
21         callNextExceptionCaught(head, session, cause);
22     }
23 
24     private void callNextExceptionCaught(Entry entry, IoSession session, Throwable cause) {
25         /** 唤醒关联的Future*/
26         ConnectFuture future = (ConnectFuture) session.removeAttribute(SESSION_CREATED_FUTURE);
27         if (future == null) {
28             try {
29                 IoFilter filter = entry.getFilter();
30                 NextFilter nextFilter = entry.getNextFilter();
31                 /** 执行过滤的异常处理事件*/
32                 filter.exceptionCaught(nextFilter, session, cause);
33             } catch (Throwable e) {
34                 LOGGER.warn("Unexpected exception from exceptionCaught handler.", e);
35             }
36         } else {
37             /** 关闭session*/
38             if (!session.isClosing()) {
39                 // Call the closeNow method only if needed
40                 session.closeNow();
41             }
42 
43             future.setException(cause);
44         }
45     }

 处理逻辑基本上一样,都是先获取过滤器链的头节点,开始处理对应的事件,然后再依次调用下一个节点的过滤器的方法。另外最后一个过滤器TailFilter会将对应的事件会最终交给业务层处理,调用IoHandler的对应方法,让业务层处理事件。

总结MINA服务端的工作机制:

1、创建IoService,每个IoService内部有一个线程池用于处理客户端的连接请求,并且有一个IoProcessor池,默认数量为CPU个数+1,用于处理客户端的IO操作

2、IoService内部有一个Selector用于监听客户端的连接请求,连接成功之后会创建IoSession并交给IoProcessor池处理

3、IoProcessor池根据IoSession的ID进行取模算法选取一个IoProcessor和IoSession进行绑定,一个IoProcessor可以绑定多个IoSession,一个IoSession只可以绑定一个IoProcessor

4、IoProcessor内部也有一个Selector用来监控所有关联的IoSession的IO操作状态,并且有一个线程池用来处理IO操作

5、IoProcessor将处理的IO事件交给IoService绑定的过滤器链,过滤器链上的所有过滤器依次处理IO事件

7、过滤器链的最后一个过滤器为TailFilter,处理完IO事件之后将IO事件交给业务层处理器IoHandler

8、IoHandler处理业务数据,而写数据的话顺序完全相反,才业务层到过滤器链路,通过HeadFilter再由IoProcessor发送出去

原文地址:https://www.cnblogs.com/jackion5/p/13569501.html