Mina源码阅读笔记(四)—Mina的连接IoConnector2

接着Mina源码阅读笔记(四)—Mina的连接IoConnector1,,我们继续:

AbstractIoAcceptor:

001 package org.apache.mina.core.rewrite.service;
002  
003 import java.io.IOException;
004 import java.net.SocketAddress;
005 import java.util.ArrayList;
006 import java.util.Collections;
007 import java.util.HashSet;
008 import java.util.List;
009 import java.util.Set;
010 import java.util.concurrent.Executor;
011  
012 public abstract class AbstractIoAcceptor extends AbstractIoService implements
013         IoAcceptor {
014  
015     private final List<SocketAddress> defaultLocalAddresses = new ArrayList<SocketAddress>();
016  
017     private final List<SocketAddress> unmodifiableDeffaultLocalAddresses = Collections
018             .unmodifiableList(defaultLocalAddresses);
019  
020     private final Set<SocketAddress> boundAddresses = new HashSet<SocketAddress>();
021  
022     private boolean disconnectOnUnbind = true;
023  
024     /** 这里不是很明白,为什么要用protected 而 不是private */
025     protected final Object bindLock = new Object();
026  
027     /**
028      * 注意这个构造方法是一定要写的,否则编译不通过:抽象类继承时候,构造方法都要写,而且必须包含super
029      *
030      * @param param
031      *            sessionConfig
032      * @param executor
033      */
034     protected AbstractIoAcceptor(Object param, Executor executor) {
035         super(param, executor);
036         defaultLocalAddresses.add(null);
037     }
038  
039     @Override
040     public SocketAddress getLocalAddress() {
041  
042         Set<SocketAddress> localAddresses = getLocalAddresses();
043         if (localAddresses.isEmpty()) {
044             return null;
045         }
046         return localAddresses.iterator().next();
047     }
048  
049     @Override
050     public final Set<SocketAddress> getLocalAddresses() {
051         Set<SocketAddress> localAddresses = new HashSet<SocketAddress>();
052         synchronized (boundAddresses) {
053             localAddresses.addAll(boundAddresses);
054         }
055         return localAddresses;
056     }
057  
058     @Override
059     public void bind(SocketAddress localAddress) throws IOException {
060         // TODO Auto-generated method stub
061  
062     }
063  
064     @Override
065     public void bind(Iterable<? extends SocketAddress> localAddresses)
066             throws IOException {
067         // TODO isDisposing()
068  
069         if (localAddresses == null) {
070             throw new IllegalArgumentException("localAddresses");
071         }
072  
073         List<SocketAddress> localAddressesCopy = new ArrayList<SocketAddress>();
074  
075         for (SocketAddress a : localAddresses) {
076             // TODO check address type
077             localAddressesCopy.add(a);
078         }
079  
080         if (localAddressesCopy.isEmpty()) {
081             throw new IllegalArgumentException("localAddresses is empty");
082         }
083  
084         boolean active = false;
085  
086         synchronized (bindLock) {
087             synchronized (boundAddresses) {
088                 if (boundAddresses.isEmpty()) {
089                     active = true;
090                 }
091             }
092         }
093         /** implement in abstractIoService */
094         if (getHandler() == null) {
095             throw new IllegalArgumentException("handler is not set");
096         }
097  
098         try {
099             Set<SocketAddress> addresses = bindInternal(localAddressesCopy);
100  
101             synchronized (boundAddresses) {
102                 boundAddresses.addAll(addresses);
103             }
104         catch (IOException e) {
105             throw e;
106         catch (RuntimeException e) {
107             throw e;
108         catch (Throwable e) {
109             throw new RuntimeException("Filed ti bind");
110         }
111          
112         if(active){
113             //do sth
114         }
115     }
116  
117     protected abstract Set<SocketAddress> bindInternal(
118             List<? extends SocketAddress> localAddress) throws Exception;
119  
120     @Override
121     public void unbind(SocketAddress localAddress) {
122         // TODO Auto-generated method stub
123          
124     }
125 }
polling:
01 package org.apache.mina.core.rewrite.polling;
02  
03 import java.net.SocketAddress;
04 import java.nio.channels.ServerSocketChannel;
05 import java.util.List;
06 import java.util.Set;
07 import java.util.concurrent.Executor;
08 import java.util.concurrent.Semaphore;
09 import java.util.concurrent.atomic.AtomicReference;
10  
11 import org.apache.mina.core.rewrite.service.AbstractIoAcceptor;
12  
13 public abstract class AbstractPollingIoAcceptor extends AbstractIoAcceptor {
14  
15     private final Semaphore lock = new Semaphore(1);
16  
17     private volatile boolean selectable;
18  
19     private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>();
20  
21     /**
22      * define the num of sockets that can wait to be accepted.
23      */
24     protected int backlog = 50;
25  
26     /**
27      * 一样的,这个构造方法也要写
28      *
29      * @param param
30      * @param executor
31      */
32     protected AbstractPollingIoAcceptor(Object param, Executor executor) {
33         super(param, executor);
34         // TODO Auto-generated constructor stub
35     }
36  
37     /**
38      * init the polling system. will be called at construction time
39      *
40      * @throws Exception
41      */
42     protected abstract void init() throws Exception;
43  
44     protected abstract void destory() throws Exception;
45  
46     protected abstract int select() throws Exception;
47     /**这里有点儿变动*/
48     protected abstract ServerSocketChannel open(SocketAddress localAddress) throws Exception;
49  
50     @Override
51     protected Set<SocketAddress> bindInternal(
52             List<? extends SocketAddress> localAddress) throws Exception {
53         // ...
54         try {
55             lock.acquire();
56             Thread.sleep(10);
57         finally {
58             lock.release();
59         }
60         // ...
61         return null;
62     }
63  
64     /**
65      * this class is called by startupAcceptor() method it's a thread accepting
66      * incoming connections from client
67      *
68      * @author ChenHui
69      *
70      */
71     private class Acceptor implements Runnable {
72         @Override
73         public void run() {
74             assert (acceptorRef.get() == this);
75  
76             int nHandles = 0;
77  
78             lock.release();
79  
80             while (selectable) {
81                 try {
82                     int selected = select();
83  
84                     // nHandles+=registerHandles();
85  
86                     if (nHandles == 0) {
87                         acceptorRef.set(null);
88                         // ...
89                     }
90                 catch (Exception e) {
91  
92                 }
93             }
94         }
95     }
96 }
好了最后看NioSoeketAcceptor:
001 package org.apache.mina.rewrite.transport.socket.nio;
002  
003 import java.net.InetSocketAddress;
004 import java.net.ServerSocket;
005 import java.net.SocketAddress;
006 import java.nio.channels.SelectionKey;
007 import java.nio.channels.Selector;
008 import java.nio.channels.ServerSocketChannel;
009 import java.util.concurrent.Executor;
010  
011 import org.apache.mina.core.rewrite.polling.AbstractPollingIoAcceptor;
012 import org.apache.mina.rewrite.transport.socket.SocketAcceptor;
013  
014 public final class NioSocketAcceptor extends AbstractPollingIoAcceptor
015         implements SocketAcceptor {
016  
017     private volatile Selector selector;
018  
019     protected NioSocketAcceptor(Object param, Executor executor) {
020         super(param, executor);
021         // TODO Auto-generated constructor stub
022     }
023  
024     @Override
025     public int getManagedSessionCount() {
026         // TODO Auto-generated method stub
027         return 0;
028     }
029  
030     /**
031      * 这个方法继承自AbstractIoAcceptor
032      *
033      * The type NioSocketAcceptor must implement the inherited abstract method
034      * SocketAcceptor.getLocalAddress() to override
035      * AbstractIoAcceptor.getLocalAddress()
036      */
037     @Override
038     public InetSocketAddress getLocalAddress() {
039         // TODO Auto-generated method stub
040         return null;
041     }
042  
043     @Override
044     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
045         // TODO Auto-generated method stub
046  
047     }
048  
049     @Override
050     public boolean isReuseAddress() {
051         // TODO Auto-generated method stub
052         return false;
053     }
054  
055     @Override
056     protected void init() throws Exception {
057         selector = Selector.open();
058     }
059  
060     @Override
061     protected void destory() throws Exception {
062         if (selector != null) {
063             selector.close();
064         }
065     }
066  
067     @Override
068     protected int select() throws Exception {
069         return selector.select();
070     }
071  
072     @Override
073     protected void dispose0() throws Exception {
074         // TODO Auto-generated method stub
075  
076     }
077  
078     protected ServerSocketChannel open(SocketAddress localAddress)
079             throws Exception {
080         ServerSocketChannel channel =ServerSocketChannel.open();
081          
082         boolean success=false;
083          
084         try{
085             channel.configureBlocking(false);
086              
087             ServerSocket socket=channel.socket();
088              
089             socket.setReuseAddress(isReuseAddress());
090              
091             socket.bind(localAddress);
092              
093             channel.register(selector, SelectionKey.OP_ACCEPT);
094              
095             success=true;
096         }finally{
097             if(!success){
098                 //close(channel);
099             }
100         }
101         return channel;
102     }
103  
104     @Override
105     public boolean isActive() {
106         // TODO Auto-generated method stub
107         return false;
108     }
109  
110 }
------------------------------------------------------

到此为止将连接部分都写完了,在连接部分还有些零碎的东西,比如handlerpolling,这些都只是稍稍提了一下,具体后面会在介绍其他部分是肯定还会碰上,我还是想把重心放在最主要的部分去写,下一篇应该要写到session了。

原文地址:https://www.cnblogs.com/wuyida/p/6300943.html