java-----NIO总结(三)

前两篇博客我们通过将accept和read设置成非阻塞式的方式实现了同步非阻塞模式,但是缺点在于在服务端需要为每个客户端都要创建一个线程来处理每个客户端的请求,这点相对来说比较耗费服务端资源,比如我们通常用到的Socket长连接用于心跳检测,其实客户端根本就没有数据要发送,只是想要告诉服务端我还活着而已,这时候为客户端单独创建一个线程的话,未免就有点小题大做了,因此迫切的需要出现一种可以减少在服务端创建线程个数的技术,也就是我们NIO的产生了,下面我们讲讲NIO的实现原理;

        在NIO中最主要是有三种角色的,Buffer缓冲区,Channel通道,Selector选择器,他们三者的作用主要是:

        Buffer缓存区:主要用于存储数据,其实说白了是一个容器,更直白点就是个数组,所有的数据都是在缓冲区处理的,我们应用程序都是从缓存中读取数据的,这些读取的数据是由通道传送过来的;应用程序在写数据的时候,数据是写在缓冲区的,随后由通道发送出去;在NIO中,所有的缓冲区类型都继承自抽象类Buffer,最常用的就是我们的ByteBuffer,当然其他基本类型同样也存在着对应的Buffer;

        Channel通道:通道呢,也是一个对象,通过他我们可以传递数据,注意是传递数据,数据的处理还是由Buffer来处理的,我们永远都不可能直接将字节数据直接写到通道中,同样也不可能从通道中直接读到数据,也就是应用程序是和Buffer打交道的,Buffer是和Channel打交道的,所有被Selector注册的通道只能是继承了SelectableChannel子类的通道;可以用一个比较形象的必须来看看Buffer和Channel的关系,通道可以理解为是火车轨道,Buffer缓存区可以认为是火车站,火车在火车站载上乘客后是通过轨道传输的,你不可能直接在轨道上接人吧;

        Selector选择器:Selector可以认为是统领性的作用,如果还是以上面火车的例子来看的话,Selector可以认为是全国火车调度系统,用来管理到底哪个火车到站了,哪个火车出站了;应用程序可以向Selector注册他所关心的Channel通道,以及具体某个通道所关心什么事件,Selector内部是维护着一个所有已经注册通道的列表的,那么Selector和Channel到底是怎么联系起来的呢?实际上是通过SelectionKey来实现的,这个SelectionKey标志了当前Channel的状态,Selector中维护着三个SelectionKey集合,分别是已注册的SelectionKey集合,已选择的SelectionKey集合,已取消的SelectionKey集合,我们只关心已选择的SelectionKey集合,Selector的内部实现原理是:他会不断的轮询已经注册的Channel,当检测到某个已经注册的Channel上面发生了其感兴趣的事件(这里的事件可以是连接事件、读事件、写事件),那么就会将当前通道对应的SelectionKey添加到已选择的SelectionKey集合中,随后在调用Selector的select方法的时候就可以获取到这个以选择的SelectionKey集合,有了这个集合之后我们便可以利用集合中的SelectionKey获取其对应的Channel,随后便可以将Channel中的内容写到Buffer中,或者将Buffer中的内容写到Channel中了;这里有一点需要注意就是我们调用的select方法可以是包含参数的select(timeout),也可以是不包含参数的select()具体要看应用场景,他两最大的区别在于使用有参数的select的话会每个参数时间被唤醒一次,不管注册的Channel有没有感兴趣的事件到来,使用没有参数的select的话只要注册的Channel没有感兴趣的事件到来就会一直阻塞下去,正是因为用到了select,所以造成了NIO的同步模式,因为在操作系统层面上,实际上还是是会一直阻塞着,等着我们注册到Selector上面的Channel有感兴趣的事件到来,但是在应用程序层面上,却解决了阻塞问题,具体来讲就是我们把Selector放到一个线程中运行,只要Selector没有关闭就一直在那检测,因为会调用Selector的select方法,所以会导致Selector的阻塞,但是并不能导致Channel的阻塞,我们依然可以向Buffer中写入数据接着把数据从Buffer写到Channel中,也可以从Channel写数据到Buffer,从Buffer中读取数据;

        这就是NIO中三个主要角色了,大致NIO的原理其实就是围绕Selector进行的,我们只需要创建一个线程一直检测Selector选择器上面是否有注册的Channel对应的感兴趣的事件到来就可以了,有的话获取对应Channel,创建Buffer进行数据的写入和写出就可以了;

        下面我们以一个实例看看NIO的具体用法:

        服务端:

        (1):创建一个ServerSocketChannel对象,并且设置该通道为非阻塞式;

        (2):利用ServerSockeChannel创建一个服务端Socket对象,也就是ServerSocket对象;

        (3):为当前ServerSocket对象绑定IP地址以及端口号;

        (4):创建多路复用器Selector对象;

        (5):将当前通道注册到Selector上面,同时注册OP_ACCEPT事件;注意,对于服务端的ServerSocketChannel来说只能注册一种事件,就是OP_ACCEPT事件;

        (6):接着便是开启Selector监控线程,调用Selector的无参或者有参的select方法,获得那些发生感兴趣事件的已注册通道对应的SelectionKey集合;

        (7):有了SelectionKey集合就可以获得到他们对应的Channel,随后利用Buffer进行具体的读写操作了;

        具体代码:

[java] view plain copy
 
  1. public class NIOServerSelectorThread implements Runnable{  
  2.   
  3.     public Selector selector;//用于轮询的多路复用器  
  4.     public ServerSocketChannel serverSocketChannel;  
  5.     public ServerSocket serverSocket;  
  6.       
  7.     public NIOServerSelectorThread(int port)  
  8.     {  
  9.         try {  
  10.             //打开ServerSocketChannel,用于监听客户端的连接,他是所有客户端连接的父管道  
  11.             serverSocketChannel = ServerSocketChannel.open();  
  12.             //将管道设置为非阻塞模式  
  13.             serverSocketChannel.configureBlocking(false);  
  14.             //利用ServerSocketChannel创建一个服务端Socket对象,即ServerSocket  
  15.             serverSocket = serverSocketChannel.socket();  
  16.             //为服务端Socket绑定监听端口  
  17.             serverSocket.bind(new InetSocketAddress(port));  
  18.             //创建多路复用器  
  19.             selector = Selector.open();  
  20.             //将ServerSocketChannel注册到Selector多路复用器上,并且监听ACCEPT事件  
  21.             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  
  22.             System.out.println("The server is start in port: "+port);  
  23.         } catch (IOException e) {  
  24.             e.printStackTrace();  
  25.         }  
  26.     }  
  27.     @Override  
  28.     public void run() {  
  29.         while(selector.isOpen())  
  30.         {  
  31.             //多路复用器没有关闭  
  32.             try {  
  33.                 //无论是否有读写事件发生,selector每间隔1s都被唤醒一次  
  34.                 selector.select(1000);  
  35.                 //获取到处于就绪状态的Channel  
  36.                 Iterator<SelectionKey> it = selector.selectedKeys().iterator();  
  37.                 SelectionKey key = null;  
  38.                 while(it.hasNext())  
  39.                 {  
  40.                     key = it.next();  
  41.                     it.remove();//避免重复处理  
  42.                     handleInput(key);  
  43.                 }  
  44.             } catch (IOException e) {  
  45.                 e.printStackTrace();  
  46.             }  
  47.         }  
  48.     }  
  49.       
  50.     /** 
  51.      * 处理已就绪通道的请求 
  52.      * @param key 
  53.      */  
  54.     public void handleInput(SelectionKey key)  
  55.     {  
  56.         if(key.isValid())  
  57.         {  
  58.             if(key.isAcceptable())  
  59.             {  
  60.                 ServerSocketChannel ssc = null;  
  61.                 SocketChannel sc = null;  
  62.                 try {  
  63.                     //处理新接入的请求  
  64.                     ssc = (ServerSocketChannel) key.channel();  
  65.                     sc = ssc.accept();  
  66.                     if(sc != null)  
  67.                     {  
  68.                         sc.configureBlocking(false);  
  69.                         sc.register(selector, SelectionKey.OP_READ);  
  70.                     }  
  71.                 } catch (IOException e) {  
  72.                     e.printStackTrace();  
  73.                 }  
  74.             }else if(key.isReadable())  
  75.             {  
  76.                 handleRead(key);  
  77.             }  
  78.         }  
  79.     }  
  80.       
  81.     /** 
  82.      * 处理通道中的读事件 
  83.      * @param key 
  84.      */  
  85.     public void handleRead(SelectionKey key)  
  86.     {  
  87.         SocketChannel sc = (SocketChannel) key.channel();//获得通道  
  88.         ByteBuffer buffer = ByteBuffer.allocate(1024);//创建缓存区  
  89.         try {  
  90.             //将通道中的数据写入到缓存区中,由于之前我们已经将ServerSocket设置成了非阻塞模式,因此read操作是非阻塞的  
  91.             //它的返回值有三种,返回值大于0表示读到字节,返回值等于0表示没有读到字节,返回值等于-1表示链路已经关闭,我们需  
  92.             //要关闭SocketChannel释放掉资源  
  93.             int readBytes = sc.read(buffer);  
  94.             if(readBytes > 0)  
  95.             {  
  96.                 //表示从缓存区中读到了字节  
  97.                 //将limit设置成position,将position设置成0,为了便于我们读取到已经从通道中存储到缓存中的数据  
  98.                 buffer.flip();  
  99.                 byte[] data = new byte[buffer.remaining()];  
  100.                 //将缓存区中的内容存储到字节数组中  
  101.                 buffer.get(data);  
  102.                 //创建对应于字节数组的字符串  
  103.                 String content = new String(data);  
  104.                 System.out.println("Server receiver message: "+content);  
  105.                 //如果我们需要向客户端回送应答消息,比如告诉客户端服务端已经收到他发的消息了  
  106.                 responseWrite(sc, "Server has responsed message");  
  107.             }else if(readBytes < 0)  
  108.             {  
  109.                 //表示链路已经关闭,我们应该释放掉资源  
  110.                 //关闭对端资源  
  111.                 key.cancel();  
  112.                 sc.close();  
  113.                 buffer.clear();  
  114.             }  
  115.         } catch (IOException e) {  
  116.             e.printStackTrace();  
  117.         }  
  118.     }  
  119.       
  120.     /** 
  121.      * 服务端回送客户端内容 
  122.      * @param sc 
  123.      * @param responseContent 
  124.      */  
  125.     public void responseWrite(SocketChannel sc,String responseContent)  
  126.     {  
  127.         ByteBuffer buffer = null;  
  128.         if(responseContent != null && responseContent.length() != 0)  
  129.         {  
  130.             //首先将要发送的内容转换为字节数组  
  131.             byte[] data = responseContent.getBytes();  
  132.             try {  
  133.                 buffer = ByteBuffer.allocate(data.length);  
  134.                 //将字节数组写入Buffer缓存区中  
  135.                 buffer.put(data);  
  136.                 //调用flip方法,将limit的值设置成position,将position的值设置成0,便于我们将写入缓存中的数据写入到通道中  
  137.                 buffer.flip();  
  138.                 //将缓存区中的内容写入通道中  
  139.                 //需要注意的一点就是这里的write也是异步操作的,因为我们把SocketChannel设置成了非阻塞式,他并不能够保证一次能够  
  140.                 //把需要发送的字节数组全部发送完,这就会出现"写半包"的问题,一般情况下解决这个问题的方法是注册写事件,不断轮询Selector  
  141.                 //将没有发送完的ByteBuffer发送完毕,可以通过ByteBuffer的hasRemaining()方法判断消息有没有发送完成  
  142.                 sc.write(buffer);  
  143.             } catch (IOException e) {  
  144.                 e.printStackTrace();  
  145.             }  
  146.         }  
  147.     }  
  148. }  
[java] view plain copy
 
  1. public class NIOServer {  
  2.     public static void main(String[] args) {  
  3.         NIOServerSelectorThread server = new NIOServerSelectorThread(9898);  
  4.         new Thread(server).start();  
  5.     }  
  6. }   

        客户端:

        (1):创建一个SocketChannel对象,并且设置该Channel为非阻塞类型;

        (2):创建一个Selector对象;

        (3):为SocketChannel创建一个客户端Socket对象;

        (4):将当前通道注册到Selector选择器上面,同时注册OP_CONNECT事件;

        客户端具体代码:

[java] view plain copy
 
  1. public class NIOClientSelectorThread implements Runnable {  
  2.   
  3.     public String host = null;  
  4.     public int port = -1;  
  5.     public Selector selector = null;//客户端选择器  
  6.     public SocketChannel socketChannel = null;  
  7.       
  8.     public NIOClientSelectorThread(String host,int port) {  
  9.         this.host = host;  
  10.         this.port = port;  
  11.         try {  
  12.             //创建SocketChannel通道  
  13.             socketChannel = SocketChannel.open();  
  14.             //设置通道为非阻塞状态  
  15.             socketChannel.configureBlocking(false);  
  16.             selector = Selector.open();  
  17.         } catch (IOException e) {  
  18.             e.printStackTrace();  
  19.         }  
  20.     }  
  21.     @Override  
  22.     public void run() {  
  23.         doConnect();  
  24.         while(selector.isOpen())  
  25.         {  
  26.             try {  
  27.                 //无论是否有读写事件发生,selector每间隔1s都被唤醒一次  
  28.                 selector.select(1000);  
  29.                 //获得所有已就绪的通道  
  30.                 Iterator<SelectionKey> it = selector.selectedKeys().iterator();  
  31.                 SelectionKey key = null;  
  32.                 while(it.hasNext())  
  33.                 {  
  34.                     key = it.next();  
  35.                     it.remove();  
  36.                     handleInput(key);  
  37.                 }  
  38.             } catch (IOException e) {  
  39.                 e.printStackTrace();  
  40.             }  
  41.         }  
  42.     }  
  43.       
  44.     /** 
  45.      * 判断客户端异步连接服务器有没有成功 
  46.      */  
  47.     public void doConnect()  
  48.     {  
  49.         try {  
  50.             //异步连接服务器  
  51.             boolean connected = socketChannel.connect(new InetSocketAddress(host, port));  
  52.             //判断是否连接成功,如果连接成功则直接注册读状态位到多路复用器  
  53.             //如果没有连接成功,表示客户端已经发送了SYN包,服务端没有返回ACK包,物理连接没有建立起来  
  54.             if(connected)  
  55.             {  
  56.                 //连接成功,直接注册读事件  
  57.                 socketChannel.register(selector, SelectionKey.OP_READ);  
  58.                 //接着向服务端发送消息  
  59.                 sendWrite(socketChannel, "Client send message");  
  60.             }else   
  61.             {  
  62.                 socketChannel.register(selector, SelectionKey.OP_CONNECT);  
  63.             }  
  64.         } catch (Exception e) {  
  65.             e.printStackTrace();  
  66.         }  
  67.     }  
  68.       
  69.     /** 
  70.      * 处理已就绪通道的请求 
  71.      * @param key 
  72.      */  
  73.     public void handleInput(SelectionKey key)  
  74.     {  
  75.         SocketChannel sc = null;  
  76.         if(key.isValid())  
  77.         {  
  78.             if(key.isConnectable())  
  79.             {  
  80.                 try {  
  81.                     //判断是否连接成功  
  82.                     sc = (SocketChannel) key.channel();  
  83.                     if(sc.finishConnect())  
  84.                     {  
  85.                         //判断有没有连接结束  
  86.                         //注册读事件  
  87.                         sc.register(selector, SelectionKey.OP_READ);  
  88.                         sendWrite(sc, "Client send message");  
  89.                     }  
  90.                 } catch (IOException e) {  
  91.                     e.printStackTrace();  
  92.                 }  
  93.             }else if(key.isReadable())  
  94.             {  
  95.                 handleRead(key);  
  96.             }  
  97.         }  
  98.     }  
  99.       
  100.     /** 
  101.      * 处理读事件 
  102.      * @param key 
  103.      */  
  104.     public void handleRead(SelectionKey key)  
  105.     {  
  106.         SocketChannel sc = (SocketChannel) key.channel();  
  107.         //得到Buffer缓存区对象  
  108.         ByteBuffer buffer = ByteBuffer.allocate(1024);  
  109.         try {  
  110.             //将SocketChannel里面的数据写到Buffer里面  
  111.             int readBytes = sc.read(buffer);  
  112.             if(readBytes > 0)  
  113.             {  
  114.                 //表示读到了数据  
  115.                 //将limit设置为position的值,将position设置为0,为了读取出已经从通道中写入缓存中的值  
  116.                 buffer.flip();  
  117.                 byte[] data = new byte[buffer.remaining()];  
  118.                 //将数据固化到byte数组中  
  119.                 buffer.get(data);  
  120.                 //将字符数组转换为字符串  
  121.                 String content = new String(data);  
  122.                 System.out.println(content);  
  123.             }else if(readBytes < 0)  
  124.             {  
  125.                 //表示读取数据失败,需要释放资源  
  126.                 key.cancel();  
  127.                 sc.close();  
  128.                 buffer.clear();  
  129.             }  
  130.         } catch (IOException e) {  
  131.             e.printStackTrace();  
  132.         }  
  133.     }  
  134.       
  135.     /** 
  136.      * 向服务端发送消息 
  137.      * @param sc 
  138.      * @param content 
  139.      */  
  140.     public void sendWrite(SocketChannel sc,String content)  
  141.     {  
  142.         ByteBuffer buffer = ByteBuffer.allocate(1024);  
  143.         if(content != null && content.length() != 0)  
  144.         {  
  145.             try {  
  146.                 //将字符串转换为字符数组  
  147.                 byte[] data = content.getBytes();  
  148.                 //将字符数组的内容写入缓存中  
  149.                 buffer.put(data);  
  150.                 //调用flip,将limit的值设置成position,将position的值设置为0以便于将从IO写入缓存中的内容写入到通道中  
  151.                 buffer.flip();  
  152.                 sc.write(buffer);  
  153.             } catch (IOException e) {  
  154.                 e.printStackTrace();  
  155.             }  
  156.         }  
  157.     }  
  158. }  
[java] view plain copy
 
  1. public class NIOClient {  
  2.   
  3.     public static void main(String[] args) {  
  4.         NIOClientSelectorThread client = new NIOClientSelectorThread("127.0.0.1", 9898);  
  5.         new Thread(client).start();  
  6.     }  
  7. }  

        随后我们运行服务端,接着连续运行三次客户端代码,在服务端看到了下面输出信息:

    

        可以看到我们分别启动了三个客户端,并且三个客户端都能向服务端发送消息,接着我们查看服务端启动了几个线程:

                                          

        只有两个,一个是主线程,一个是Selector所在的线程,这相对于我们前面对于每个客户端开启一个线程的话将是一个很大的改进啊!

        这就是我们传统NIO的使用方法了,相对来说还是比较复杂的,这里有个问题出现了,那就是我们是使用Selector来进行判断到底哪个通道上面有感兴趣的事件发生,如果有的话则进行对应的操作就可以,这对于我们的OP_ACCEPT和OP_CONNECT来说是没什么影响的,但是对于通道发生OP_READ或者OP_WRITE事件的话,很容易出现等待的情况,所以一般情况下我们可以在读或者写操作的时候引入线程池来实现,具体实现是:我们的Selector只负责监控到底哪个通道有什么事情发生,具体发生的这个事情是要做什么就由线程池来完成就好了,比如服务端收到客户端的请求之后,如果Selector发现某一通道发生了读事件,那么会将这个通道交给读线程池来进行处理,由读线程池完成对客户端数据的读取操作,当读线程完成读操作之后,就该数据还给我们的控制线程,也就是Selecrot所在的线程,由控制线程完成后续的业务处理,在业务处理结束之后,如果服务端需要回送数据给客户端的话,则会将要回送的数据和协同到交给写线程池,由写线程池分配线程完成向客户端发送数据的操作

原文地址:https://www.cnblogs.com/songjy2116/p/7662456.html