Java中的BIO、NIO、AIO-3

Java中的BIO、NIO、AIO-3

这一篇是代码篇,敲代码有助于理解记忆这些抽象的东西:

参考资料:

目录

Java BIO代码

服务器

  1. package sock; 
  2.  
  3. import java.io.BufferedReader; 
  4. import java.io.IOException; 
  5. import java.io.InputStreamReader; 
  6. import java.io.PrintWriter; 
  7. import java.net.ServerSocket; 
  8. import java.net.Socket; 
  9. import java.util.ArrayList; 
  10. import java.util.LinkedList; 
  11. import java.util.List; 
  12.  
  13.  
  14.  
  15. /* 
  16. 存在多线程对数据结构并发操作不安全的问题 
  17. */ 
  18. public class socketServerT extends ServerSocket
  19. private static final int port = 2018
  20. private static boolean isPrint = false
  21. private static List<String> user_list = new ArrayList(); 
  22. private static List<ServerThread> threadlist = new ArrayList<>();//应该使用线程安全的集合 
  23. private static LinkedList<String> message = new LinkedList<>(); 
  24.  
  25.  
  26. socketServerT() throws IOException{ 
  27. super(port); 
  28. new PrintOutThread(); 
  29. System.out.println( " server is created"); 
  30. try
  31. while(true){ 
  32. Socket sock = this.accept(); 
  33. new ServerThread(sock); 

  34. }catch( Exception e){ 
  35. e.printStackTrace(); 


  36. class PrintOutThread extends Thread
  37. PrintOutThread(){ 
  38. System.out.println(getName() + "!!!!!"); 
  39. start(); 

  40. public void run()
  41. while(true){ 
  42. if(isPrint){ 
  43. String m = message.getFirst(); 
  44. for(ServerThread i : threadlist){ 
  45. sendMessage(i,m); 

  46. message.removeFirst(); 
  47. isPrint = message.size()>0 ? true:false




  48. class ServerThread extends Thread
  49. private BufferedReader rec; 
  50. private PrintWriter send; 
  51. private Socket client; 
  52. private String name; 
  53.  
  54. ServerThread(Socket sock) throws IOException{ 
  55. client = sock; 
  56. rec = new BufferedReader(new InputStreamReader(client.getInputStream())); 
  57. send = new PrintWriter(client.getOutputStream(),true); 
  58. //rec.readLine(); 
  59. System.out.println(getName() + "is created"); 
  60. send.println("connected to chat room, please input your name!!"); 
  61. start(); 

  62. public PrintWriter getSend()
  63. return send; 

  64. public void run()
  65. try
  66. int flag = 0
  67. String line = ""
  68. while(!line.contains("bye")){ 
  69. line = rec.readLine(); 
  70. if("showuser".equals(line)){ 
  71. send.println(listOneUsers()); 
  72. //line = rec.readLine(); 
  73. continue

  74. if(flag == 0){ 
  75. flag ++; 
  76. name = line; 
  77. user_list.add(name); 
  78. threadlist.add(this); 
  79. send.println(name + " begin to chat"); 
  80. pushMessage("client <" + name+"> enter chat room"); 
  81. }else
  82. pushMessage("client <" + name+"> say :" + line); 

  83. //line = rec.readLine(); 

  84.  
  85. }catch (Exception e){ 
  86. e.printStackTrace(); 
  87. }finally
  88. try
  89. client.close(); 
  90. rec.close(); 
  91. send.close(); 
  92. }catch (IOException e){ 
  93. e.printStackTrace(); 

  94. threadlist.remove(this); 
  95. user_list.remove(name); 
  96. pushMessage("client <" + name+"> exit"); 

  97.  


  98. public void pushMessage(String mess)
  99. message.add(mess); 
  100. isPrint = true

  101. public String listOneUsers()
  102. StringBuffer s = new StringBuffer(); 
  103. s.append("---online users--- "); 
  104. for( String i:user_list){ 
  105. s.append(i + " "); 

  106. s.append("---end--- "); 
  107. return s.toString(); 

  108. public void sendMessage(ServerThread s,String m)
  109. //System.out.println("test"); 
  110. PrintWriter p = s.getSend(); 
  111. p.println(m); 
  112. //p.flush(); 

  113. public static void main(String args[])
  114. try
  115. socketServerT s = new socketServerT(); 
  116. }catch(Exception e){ 
  117. e.printStackTrace(); 



  118.  

客户端

  1. package sock; 
  2.  
  3. import java.io.BufferedReader; 
  4. import java.io.IOException; 
  5. import java.io.InputStreamReader; 
  6. import java.io.PrintWriter; 
  7. import java.net.Socket; 
  8.  
  9. public class socketClientT extends Socket
  10. private static final String server = "127.0.0.1"
  11. private static final int port = 2018
  12.  
  13. private Socket sock; 
  14. private PrintWriter send; 
  15. private BufferedReader rec; 
  16. socketClientT() throws IOException{ 
  17. super(server,port); 
  18. sock = this
  19. send = new PrintWriter(sock.getOutputStream(),true); 
  20. rec = new BufferedReader(new InputStreamReader(sock.getInputStream())); 
  21. Thread t = new recvThread(); 
  22. BufferedReader sysBuff = new BufferedReader(new InputStreamReader(System.in)); 
  23. String line = ""
  24. while(! line.contains("bye")){ 
  25. line = sysBuff.readLine(); 
  26. send.println(line); 

  27. send.close(); 
  28. rec.close(); 
  29. sysBuff.close(); 
  30. this.close(); 

  31. class recvThread extends Thread
  32. private BufferedReader buff; 
  33. recvThread(){ 
  34. try
  35. buff = new BufferedReader(new InputStreamReader(sock.getInputStream())); 
  36. start(); 
  37. } catch (Exception e){ 
  38. e.printStackTrace(); 


  39. public void run()
  40. String res = ""
  41. try
  42. while(true){ 
  43. res = buff.readLine(); 
  44. if(res.contains("bye")) 
  45. break
  46. System.out.println(res); 

  47. send.close(); 
  48. buff.close(); 
  49. sock.close(); 
  50. }catch (Exception e){ 
  51. e.printStackTrace(); 



  52.  
  53. public static void main(String args[])
  54. try
  55. socketClientT s = new socketClientT(); 
  56. }catch (Exception e){ 
  57. e.printStackTrace(); 



  58.  

Java NIO

JDK 1.4的java.util.*;包中引入了新的Java I/O库,其目的是提高IO操作的速度。

简介

NIO我们一般认为是New I/O(也是官方的叫法),因为它是相对于老的I/O类库新增的(其实在JDK 1.4中就已经被引入了,但这个名词还会继续用很久,即使它们在现在看来已经是“旧”的了,所以也提示我们在命名时,需要好好考虑),做了很大的改变。但民间跟多人称之为Non-block I/O,即非阻塞I/O,因为这样叫,更能体现它的特点。而下文中的NIO,不是指整个新的I/O库,而是非阻塞I/O。
    NIO提供了与传统BIO模型中的Socket和ServerSocket相对应的SocketChannel和ServerSocketChannel两种不同的套接字通道实现。
    新增的着两种通道都支持阻塞和非阻塞两种模式。
阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。
    对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用NIO的非阻塞模式来开发。
    下面会先对基础知识进行介绍。

缓冲区Buffer

Buffer是一个对象,包含一些要写入或者读出的数据。
在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,也是写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。
缓冲区实际上是一个数组,并提供了对数据结构化访问以及维护读写位置等信息。
具体的缓存区有这些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他们实现了相同的接口:Buffer。

通道Channel

我们对数据的读取和写入要通过Channel,它就像水管一样,是一个通道。通道不同于流的地方就是通道是双向的,可以用于读、写和同时读写操作。
底层的操作系统的通道一般都是全双工的,所以全双工的Channel比流能更好的映射底层操作系统的API。
Channel主要分两大类:

  • SelectableChannel:用户网络读写
  • FileChannel:用于文件操作

后面代码会涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子类。

多路复用器

Selector是Java  NIO 编程的基础。
Selector提供选择已经就绪的任务的能力:Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。
一个Selector可以同时轮询多个Channel,因为JDK使用了epoll()代替传统的select实现,所以没有最大连接句柄1024/2048的限制。所以,只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。

服务器端代码

server:

  1. /** 
  2. * server 
  3. */ 
  4. public class server
  5.  
  6. private static int port = 8000
  7. private static serverHandle sHandle; 
  8.  
  9. public static void start()
  10. start(port); 

  11. private static synchronized void start(int port)
  12. if (sHandle != null) { 
  13. sHandle.setStarted(false); 

  14. sHandle = new serverHandle(port); 
  15. new Thread(sHandle,"server").start(); 
  16.  

  17. public static void main(String[] args)
  18. start(); 


serverHandle

  1. import java.io.IOException; 
  2. import java.net.InetSocketAddress; 
  3. import java.net.ServerSocket; 
  4. import java.nio.ByteBuffer; 
  5. import java.nio.channels.SelectionKey; 
  6. import java.nio.channels.Selector; 
  7. import java.nio.channels.ServerSocketChannel; 
  8. import java.nio.channels.SocketChannel; 
  9. import java.util.Iterator; 
  10. import java.util.Set; 
  11.  
  12. //import jdk.internal.org.objectweb.asm.Handle; 
  13.  
  14. /** 
  15. * serverHandle 
  16. */ 
  17. public class serverHandle implements Runnable
  18.  
  19. private ServerSocketChannel serverChannel; 
  20. private Selector selector; 
  21. private volatile boolean started;//各个线程都能看到状态 
  22.  
  23. public serverHandle(int port)
  24. try
  25. //创建选择器 
  26. selector = Selector.open(); 
  27. //创建serverSocketChannel 
  28. serverChannel = ServerSocketChannel.open(); 
  29. //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式 
  30. serverChannel.configureBlocking(false); 
  31. //创建InetSocketAddress 
  32. InetSocketAddress socketAddress = new InetSocketAddress(port); 
  33. //得到ServerSocket 
  34. ServerSocket serverSocket = serverChannel.socket(); 
  35. //绑定ServetSocket到一个具体的端口,并设置backlog 
  36. serverSocket.bind(socketAddress, 1024); 
  37. //向selector注册ServerSocketChannel,设置为监听客户端的连接请求 
  38. serverChannel.register(selector, SelectionKey.OP_ACCEPT); 
  39. //标记服务器状态 
  40. started = true
  41. System.out.println("服务器已经启动,端口号:" + port); 
  42.  
  43. } catch (IOException e) { 
  44. //TODO: handle exception 
  45. e.printStackTrace(); 
  46. System.exit(1); 
  47. }  

  48.  
  49. public void setStarted(boolean flag)
  50. this.started = flag; 

  51. @Override 
  52. public void run()
  53. //循环遍历selector 
  54. while (started) { 
  55. try
  56. //无论是否有读写事件,selector每个1s唤醒一次 
  57.  
  58. try
  59. selector.select(1000); 
  60. } catch (Exception e) { 
  61. e.printStackTrace(); 

  62. //获得状态为ready的selectorkey 
  63. Set<SelectionKey> keys = selector.selectedKeys(); 
  64. Iterator<SelectionKey> iter = keys.iterator(); 
  65. SelectionKey key = null
  66. while (iter.hasNext()) { 
  67. key = iter.next(); 
  68. iter.remove(); 
  69. try
  70. handle(key); 
  71. } catch (Exception e) { 
  72.  
  73. if (key != null) { 
  74. key.cancel(); 
  75. if (key.channel() != null) { 
  76. key.channel().close(); 




  77. } catch (Throwable t) { 
  78.  
  79. t.printStackTrace(); 


  80. //关闭selector 
  81. if (selector != null) { 
  82. try
  83. selector.close(); 
  84. } catch (Exception e) { 
  85.  
  86. e.printStackTrace(); 



  87. private void handle(SelectionKey key) throws IOException
  88. //判断kei是否是有效的 
  89. if (key.isValid()) { 
  90.  
  91. //处理新接入的请求消息, 
  92. if (key.isAcceptable()) { 
  93. //通过selectionkey得到ServerSocketChannel,注意转型 
  94. ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); 
  95. //通过serversocketchannel的accept方法创建SocketChannel实例 
  96. SocketChannel client = serverSocketChannel.accept(); 
  97. //设置client为非阻塞模式 
  98. client.configureBlocking(false); 
  99. //把client注册到selector,注册事件为读 
  100. client.register(selector, SelectionKey.OP_READ); 

  101.  
  102. //读消息 
  103. if (key.isReadable()) { 
  104. SocketChannel sc = (SocketChannel) key.channel(); 
  105. //创建byteBuffer,大小为1M 
  106. ByteBuffer buffer = ByteBuffer.allocate(1024); 
  107. //读取请求码流,返回读取到的字节数 
  108. int readBytes = sc.read(buffer); 
  109. //读取到字节,对字节进行编码 
  110. if (readBytes > 0) { 
  111. //将缓冲区buffer的position设为0,用于后续对缓冲区的读操作 
  112. buffer.flip(); 
  113. //根据缓冲区可读字节数创建字节数组 
  114. byte[] bytes = new byte[buffer.remaining()]; 
  115. //将缓冲区可读字节数组复制到新建的数组中 
  116. buffer.get(bytes); 
  117. String expresString = new String(bytes, "UTF-8"); 
  118. System.out.println("服务器收到的消息:" + expresString); 
  119.  
  120. //处理数据 
  121. String res = null
  122. res = new StringBuffer(expresString).reverse().toString(); 
  123. //写入返回消息 
  124. dowrite(sc,res); 
  125. } else if (readBytes < 0) { 
  126. //链路关闭释放资源 
  127. key.cancel(); 
  128. sc.close(); 




  129. private void dowrite(SocketChannel sc, String res) throws IOException
  130. //把字符串编码为字节数组 
  131. byte[] bytes = res.getBytes(); 
  132. //根据数组容量创建ByteBuffer 
  133. ByteBuffer wBuffer = ByteBuffer.allocate(bytes.length); 
  134. //把字节数组复制到buffer中 
  135. wBuffer.put(bytes); 
  136. //flip操作,更改position为0,方便后续的写操作从头开始 
  137. wBuffer.flip(); 
  138. //发送缓冲区的数据 
  139. sc.write(wBuffer); 


客户端代码

client

  1. import java.util.Scanner; 
  2.  
  3. /** 
  4. * clientChannel 
  5. */ 
  6. public class clientChannel
  7. private static String host = "127.0.0.1"
  8. private static int port = 8000
  9. private static clientHandle cHandle; 
  10.  
  11. public static void start()
  12. start(host,port); 

  13. public static synchronized void start(String host, int port)
  14. if (cHandle != null) { 
  15. cHandle.stop(); 

  16. cHandle = new clientHandle(host, port); 
  17. new Thread(cHandle, "client").start();; 

  18.  
  19. public static Boolean sendMsg(String msg) throws Exception
  20. if (msg.contains("q")) { 
  21. return false

  22. cHandle.sendMsg(msg); 
  23. return true

  24. public static void main(String[] args)
  25. try
  26. start(); 
  27. Scanner s = new Scanner(System.in); 
  28. String tmp; 
  29. while ((tmp = s.nextLine())!= null) { 
  30. sendMsg(tmp); 

  31. } catch (Exception e) { 
  32. //TODO: handle exception 
  33. e.printStackTrace(); 

  34. //start(); 


clientHandle

  1. import java.net.InetSocketAddress; 
  2. import java.nio.ByteBuffer; 
  3. import java.nio.channels.SelectionKey; 
  4. import java.nio.channels.Selector; 
  5. import java.nio.channels.SocketChannel; 
  6. import java.util.Iterator; 
  7. import java.util.Set; 
  8. import java.io.IOException; 
  9. /** 
  10. * clientHandle 
  11. */ 
  12. public class clientHandle implements Runnable
  13.  
  14. private String host; 
  15. private int port; 
  16. private Selector selector; 
  17. private SocketChannel socketChannel; 
  18. private volatile boolean started; 
  19.  
  20. public clientHandle(String ip, int port)
  21. this.host = ip; 
  22. this.port = port; 
  23.  
  24. try
  25. //创建选择器 
  26. this.selector = Selector.open(); 
  27. //创建socketchannel 
  28. this.socketChannel = SocketChannel.open(); 
  29. //配置socketChannel为非阻塞模式 
  30. this.socketChannel.configureBlocking(false); 
  31. this.started = true
  32.  
  33. } catch (Exception e) { 
  34. //TODO: handle exception 
  35. e.printStackTrace(); 
  36. System.exit(1); 

  37.  

  38.  
  39. public void stop()
  40. this.started = false

  41.  
  42. public void doConnection() throws Exception
  43. InetSocketAddress address = new InetSocketAddress(this.host, this.port); 
  44. if (socketChannel.connect(address)) { 
  45. System.out.println("连接服务器成功!!!"); 
  46. } else
  47. System.out.println("未连接成功,下一轮继续!!!"); 
  48. //向selector注册socketChannel的连接操作 
  49. socketChannel.register(selector, SelectionKey.OP_CONNECT); 


  50.  
  51. public void handleInput(SelectionKey key) throws Exception
  52. if (key.isValid()) { 
  53. //获取SeclectionKey对应的socketChannel 
  54. SocketChannel sc = (SocketChannel) key.channel(); 
  55. //测试key对应的socketChannel通道是否已完成或未能完成其套接连接操作 
  56. if (key.isConnectable()) { 
  57. //完成连接过程,返回true表示channel已经建立了连接,false表示建立连接失败 
  58. //当使用SocketChannel的connect()函数进行连接的时候,当处于非阻塞模式的情况下,可能连接不是立刻完成的,需要使用 
  59. //finidhConnect()来检查连接是否建立 
  60. if (sc.finishConnect()) { 
  61. } else
  62. System.exit(1); 
  63. }  

  64. //读消息,判断key对应的channel是否可读 
  65. if (key.isReadable()) { 
  66. //创建一个缓冲区,用来存读取的数据, 
  67. ByteBuffer buffer = ByteBuffer.allocate(1024); 
  68. //读取数据,返回读取的字节数 
  69. int readSize = sc.read(buffer); 
  70. //读取到字节,对字节进行编码 
  71. if (readSize > 0) { 
  72. //设置缓冲区的limit和position,方面后面读取数据 
  73. buffer.flip(); 
  74. //根据缓冲区的可读字节数创建字节数组 
  75. byte[] bytes = new byte[buffer.remaining()]; 
  76. //把缓冲区的内容复制到字节数组中去 
  77. buffer.get(bytes); 
  78. String res = new String(bytes, "UTF-8"); 
  79. System.out.println("客户端收到的数据为:" + res); 
  80. } else if (readSize < 0) { 
  81. //这种情况说明链路已经关闭,释放资源 
  82. key.cancel(); 
  83. sc.close(); 




  84.  
  85. public void doWrite(SocketChannel sc, String request) throws Exception
  86. //将数据转换为字节数组 
  87. byte[] bytes = request.getBytes(); 
  88. //创建字节缓冲区 
  89. ByteBuffer buffer = ByteBuffer.allocate(bytes.length); 
  90. //将字节数组放入字节缓冲区 
  91. buffer.put(bytes); 
  92. //flip操作,调整limit和position 
  93. buffer.flip(); 
  94. //将数据写入到channel中 
  95. sc.write(buffer); 

  96. public void sendMsg(String msg) throws Exception
  97. //这里还没明白为什么要先注册读操作?需要注册读操作才能,知道读状态是否就绪,方便handelInput函数处理!! 
  98. //但是还有一个疑问,什么时候使用OP_WRITE 
  99. socketChannel.register(selector, SelectionKey.OP_READ); 
  100. doWrite(socketChannel, msg); 

  101. @Override  
  102. public void run() {  
  103. try{  
  104. doConnection();  
  105. }catch(Exception e){  
  106. e.printStackTrace();  
  107. System.exit(1);  
  108. }  
  109. //循环遍历selector  
  110. while(started){  
  111. try{  
  112. //无论是否有读写事件发生,selector每隔1s被唤醒一次  
  113. selector.select(1000);  
  114. //阻塞,只有当至少一个注册的事件发生的时候才会继续.  
  115. // selector.select();  
  116. Set<SelectionKey> keys = selector.selectedKeys();  
  117. Iterator<SelectionKey> it = keys.iterator();  
  118. SelectionKey key = null;  
  119. while(it.hasNext()){  
  120. key = it.next();  
  121. it.remove();  
  122. try{  
  123. handleInput(key);  
  124. }catch(Exception e){  
  125. if(key != null){  
  126. key.cancel();  
  127. if(key.channel() != null){  
  128. key.channel().close();  
  129. }  
  130. }  
  131. }  
  132. }  
  133. }catch(Exception e){  
  134. e.printStackTrace();  
  135. System.exit(1);  
  136. }  
  137. }  
  138. //selector关闭后会自动释放里面管理的资源  
  139. if(selector != null)  
  140. try{  
  141. selector.close();  
  142. }catch (Exception e) {  
  143. e.printStackTrace();  
  144. }  


##测试及解析
测试代码:

  1. import java.util.Scanner; 
  2.  
  3. /** 
  4. * Test 
  5. */ 
  6. public class Test
  7.  
  8. public static void main(String[] args)
  9. server.start(); 
  10. try
  11. Thread.sleep(3000); 
  12. } catch (Exception e) { 
  13. //TODO: handle exception 
  14. e.printStackTrace(); 

  15. clientChannel.start(); 
  16. Scanner s = new Scanner(System.in); 
  17. String tmp; 
  18. try
  19. while ((tmp = s.nextLine()) != null) { 
  20. clientChannel.sendMsg(tmp); 

  21. } catch (Exception e) { 
  22. //TODO: handle exception 
  23. e.printStackTrace(); 

  24.  

  25.  

可以看到,创建NIO服务端的主要步骤如下:

  1. 打开ServerSocketChannel,监听客户端连接
  2. 绑定监听端口,设置连接为非阻塞模式
  3. 创建Reactor线程,创建多路复用器并启动线程
  4. 将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件
  5. Selector轮询准备就绪的key
  6. Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,简历物理链路
  7. 设置客户端链路为非阻塞模式
  8. 将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息
  9. 异步读取客户端消息到缓冲区
  10. 对Buffer编解码,处理半包消息,将解码成功的消息封装成Task
  11. 将应答消息编码为Buffer,调用SocketChannel的write将消息异步发送给客户端

Java AIO

Java nio 2.0的主要改进就是引入了异步IO(包括文件和网络),这里主要介绍下异步网络IO API的使用以及框架的设计,以TCP服务端为例。首先看下为了支持AIO引入的新的类和接口:

** java.nio.channels.AsynchronousChannel**
       标记一个channel支持异步IO操作。

** java.nio.channels.AsynchronousServerSocketChannel**
       ServerSocket的aio版本,创建TCP服务端,绑定地址,监听端口等。

** java.nio.channels.AsynchronousSocketChannel**
       面向流的异步socket channel,表示一个连接。

** java.nio.channels.AsynchronousChannelGroup**
       异步channel的分组管理,目的是为了资源共享。一个AsynchronousChannelGroup绑定一个线程池,这个线程池执行两个任务:处理IO事件和派发CompletionHandlerAsynchronousServerSocketChannel创建的时候可以传入一个AsynchronousChannelGroup,那么通过AsynchronousServerSocketChannel创建的AsynchronousSocketChannel将同属于一个组,共享资。

** java.nio.channels.CompletionHandler**
       异步IO操作结果的回调接口,用于定义在IO操作完成后所作的回调工作。AIO的API允许两种方式来处理异步操作的结果:返回的Future模式或者注册CompletionHandler,推荐用CompletionHandler的方式,这些handler的调用是由AsynchronousChannelGroup的线程池派发的。显然,线程池的大小是性能的关键因素AsynchronousChannelGroup允许绑定不同的线程池,通过三个静态方法来创建:

 public static AsynchronousChannelGroup withFixedThreadPool(int nThreads, ThreadFactory threadFactory) throws IOException

 public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor, int initialSize)

 public static AsynchronousChannelGroup withThreadPool(ExecutorService executor) throws IOException

需要根据具体应用相应调整,从框架角度出发,需要暴露这样的配置选项给用户。

在介绍完了aio引入的TCP的主要接口和类之后,我们来设想下一个aio框架应该怎么设计。参考非阻塞nio框架的设计,一般都是采用Reactor模式,Reactor负责事件的注册、select、事件的派发;相应地,异步IO有个Proactor模式,Proactor负责CompletionHandler的派发,查看一个典型的IO写操作的流程来看两者的区别:

Reactor:  send(msg) -> 消息队列是否为空,如果为空  -> 向Reactor注册OP_WRITE,然后返回 -> Reactor select -> 触发Writable,通知用户线程去处理 ->先注销Writable(很多人遇到的cpu 100%的问题就在于没有注销),处理Writeable,如果没有完全写入,继续注册OP_WRITE。注意到,写入的工作还是用户线程在处理。

Proactor: send(msg) -> 消息队列是否为空,如果为空,发起read异步调用,并注册CompletionHandler,然后返回。 -> 操作系统负责将你的消息写入,并返回结果(写入的字节数)给Proactor -> Proactor派发CompletionHandler。可见,写入的工作是操作系统在处理,无需用户线程参与。事实上在aio的API中,AsynchronousChannelGroup就扮演了Proactor的角色。

CompletionHandler有三个方法,分别对应于处理成功、失败、被取消(通过返回的Future)情况下的回调处理:

public interface CompletionHandler<V,A> {

     void completed(V result, A attachment);

    void failed(Throwable exc, A attachment);

    void cancelled(A attachment);
}

其中的泛型参数V表示IO调用的结果,而A是发起调用时传入的attchment。

server端代码

server

  1. //package aio; 
  2.  
  3. public class Server
  4. public static int clientCount = 0
  5. public static int port = 8000
  6. public static String hoString = "127.0.0.1"
  7.  
  8. public static void start()
  9. start(Server.port); 

  10. public static void start(int port)
  11. AsyncServerHandler serverHandler = new AsyncServerHandler(port); 
  12. Thread t1 = new Thread(serverHandler); 
  13. t1.start(); 

  14. public static void main(String[] args)
  15. start(); 


  16.  

AsyncServerHandler

  1. //package aio; 
  2.  
  3. import java.io.IOException; 
  4. import java.net.InetSocketAddress; 
  5. import java.nio.channels.AsynchronousServerSocketChannel; 
  6. import java.nio.channels.CompletionHandler; 
  7. import java.util.concurrent.CountDownLatch; 
  8.  
  9. public class AsyncServerHandler implements Runnable
  10. private AsynchronousServerSocketChannel serverSocketChannel; 
  11. private CountDownLatch latch; 
  12. public AsyncServerHandler(int port)
  13. // TODO Auto-generated constructor stub 
  14. InetSocketAddress address = new InetSocketAddress(port); 
  15. try
  16. serverSocketChannel = AsynchronousServerSocketChannel.open(); 
  17. serverSocketChannel.bind(address); 
  18. } catch (IOException e) { 
  19. // TODO Auto-generated catch block 
  20. e.printStackTrace(); 

  21. System.out.println("服务器已经启动,端口号:" + port); 

  22.  
  23. @Override 
  24. public void run()
  25. // TODO Auto-generated method stub 
  26. //CountDownLatch初始化  
  27. //它的作用:在完成一组正在执行的操作之前,允许当前的现场一直阻塞  
  28. //此处,让现场在此阻塞,防止服务端执行完成后退出  
  29. //也可以使用while(true)+sleep  
  30. //生成环境就不需要担心这个问题,以为服务端是不会退出的  
  31. this.latch = new CountDownLatch(1); 
  32. serverSocketChannel.accept(this, new AcceptHandler(this.latch)); 
  33.  
  34. try
  35. latch.await(); 
  36. } catch (InterruptedException e) { 
  37. // TODO Auto-generated catch block 
  38. e.printStackTrace(); 


  39.  
  40. public AsynchronousServerSocketChannel getServerSocketChannel()
  41. return serverSocketChannel; 

  42.  
  43. public void setServerSocketChannel(AsynchronousServerSocketChannel serverSocketChannel)
  44. this.serverSocketChannel = serverSocketChannel; 

  45.  
  46. public CountDownLatch getLatch()
  47. return latch; 

  48.  
  49. public void setLatch(CountDownLatch latch)
  50. this.latch = latch; 

  51.  

  52.  

AcceptHandler

  1. //package aio; 
  2.  
  3. import java.nio.ByteBuffer; 
  4. import java.nio.channels.AsynchronousServerSocketChannel; 
  5. import java.nio.channels.AsynchronousSocketChannel; 
  6. import java.nio.channels.CompletionHandler; 
  7. import java.util.concurrent.CountDownLatch; 
  8.  
  9. public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncServerHandler>
  10.  
  11. private CountDownLatch latch; 
  12. public AcceptHandler(CountDownLatch latch)
  13. // TODO Auto-generated constructor stub 
  14. this.latch = latch; 

  15. @Override 
  16. public void completed(AsynchronousSocketChannel socketChannel, AsyncServerHandler serverHandler)
  17. // TODO Auto-generated method stub 
  18. //进入这个函数说明说明事件处理成功,已经成功的拿到socketChanne 
  19. Server.clientCount ++; 
  20. System.out.println("当前连接的客户数:" + Server.clientCount); 
  21. //继续接受其他客户机的连接, 
  22. AsynchronousServerSocketChannel channel = serverHandler.getServerSocketChannel(); 
  23. channel.accept(serverHandler, this); 
  24. //创建新的buffer,为读取数据做准备 
  25. ByteBuffer buffer = ByteBuffer.allocate(1024); 
  26. socketChannel.read(buffer, buffer, new serverReadHandler(socketChannel)); 
  27.  

  28.  
  29. @Override 
  30. public void failed(Throwable exc, AsyncServerHandler attachment)
  31. // TODO Auto-generated method stub 
  32. exc.printStackTrace(); 
  33. this.latch.countDown(); 

  34.  
  35.  

  36.  

serverReadHandler

  1. //package aio; 
  2.  
  3. import java.io.IOException; 
  4. import java.io.UnsupportedEncodingException; 
  5. import java.nio.ByteBuffer; 
  6. import java.nio.channels.AsynchronousSocketChannel; 
  7. import java.nio.channels.CompletionHandler; 
  8.  
  9. public class serverReadHandler implements CompletionHandler<Integer,ByteBuffer>
  10. //用于读取半包消息和应答消息 
  11. private AsynchronousSocketChannel serverChannel; 
  12. public serverReadHandler(AsynchronousSocketChannel channel)
  13. // TODO Auto-generated constructor stub 
  14. this.serverChannel = channel; 

  15.  
  16.  
  17. @Override 
  18. public void completed(Integer result, ByteBuffer buffer)
  19. // TODO Auto-generated method stub 
  20. //操作系统读取IO就绪之后,进入这个函数 
  21. //调整limit和position关系,方便读取 
  22. //buffer.flip(); 
  23. if (buffer.hasRemaining()) { 
  24. byte[] bytes = new byte[buffer.remaining()]; 
  25. buffer.get(bytes); 
  26. String msg = null
  27. try
  28. msg = new String(bytes, "UTF-8"); 
  29. } catch (UnsupportedEncodingException e) { 
  30. // TODO Auto-generated catch block 
  31. e.printStackTrace(); 

  32. System.out.println("服务器收到消息:" + msg); 
  33.  
  34. String calResult = null
  35. StringBuffer stringBuffer = new StringBuffer(msg); 
  36. calResult = stringBuffer.reverse().toString(); 
  37. //向客户端发送结果 
  38. byte[] resultBytes = calResult.getBytes(); 
  39. ByteBuffer rBuffer = ByteBuffer.allocate(resultBytes.length); 
  40. rBuffer.put(resultBytes); 
  41. this.serverChannel.write(rBuffer, rBuffer, new ServerWriteHandler(this.serverChannel)); 
  42. }else
  43. System.out.println("服务器没有读取到数据"); 


  44.  
  45. @Override 
  46. public void failed(Throwable exc, ByteBuffer buffer)
  47. // TODO Auto-generated method stub 
  48. try
  49. this.serverChannel.close(); 
  50. System.out.println("服务器socket关闭~~~"); 
  51. } catch (IOException e) { 
  52. // TODO Auto-generated catch block 
  53. e.printStackTrace(); 


  54.  

  55.  

serverWriteHandler

  1. //package aio; 
  2.  
  3. import java.io.IOException; 
  4. import java.nio.ByteBuffer; 
  5. import java.nio.channels.AsynchronousSocketChannel; 
  6. import java.nio.channels.CompletionHandler; 
  7.  
  8. public class ServerWriteHandler implements CompletionHandler<Integer,ByteBuffer>
  9. private AsynchronousSocketChannel serverChannel; 
  10. public ServerWriteHandler(AsynchronousSocketChannel channel)
  11. // TODO Auto-generated constructor stub  
  12. this.serverChannel = channel; 

  13. @Override 
  14. public void completed(Integer result, ByteBuffer buffer)
  15. // TODO Auto-generated method stub 
  16. //调整limit和position的位置,方便下面输出使用 
  17. //buffer.flip(); 
  18. if (buffer.hasRemaining()) { 
  19. System.out.println("服务器输出数据~~~"); 
  20. buffer.clear(); 
  21. //向客户端写入数据 
  22. this.serverChannel.write(buffer, buffer, this); 
  23. } else
  24. //读取数据 
  25. ByteBuffer readBuffer = ByteBuffer.allocate(1024); 
  26. this.serverChannel.read(readBuffer, readBuffer, new serverReadHandler(this.serverChannel)); 


  27. @Override 
  28. public void failed(Throwable exc, ByteBuffer buffer)
  29. // TODO Auto-generated method stub 
  30. //出现异常关闭socketchannel 
  31. try
  32. this.serverChannel.close(); 
  33. } catch (IOException e) { 
  34. // TODO Auto-generated catch block 
  35. e.printStackTrace(); 


  36.  

  37.  

客户端

client

  1. //package aio; 
  2.  
  3. import java.util.Scanner; 
  4.  
  5. public class client
  6. private static String DEFAULT_HOST = "127.0.0.1";  
  7. private static int DEFAULT_PORT = 8000;  
  8. private static AsyncClientHandler clientHandle;  
  9. public static void start(){  
  10. start(DEFAULT_HOST,DEFAULT_PORT);  
  11. }  
  12. public static synchronized void start(String ip,int port){  
  13. if(clientHandle!=null)  
  14. return;  
  15. clientHandle = new AsyncClientHandler(ip,port);  
  16. new Thread(clientHandle,"Client").start();  
  17. }  
  18. //向服务器发送消息  
  19. public static boolean sendMsg(String msg) throws Exception{  
  20. if(msg.equals("q")) return false;  
  21. clientHandle.sendMsg(msg);  
  22. return true;  
  23. }  
  24. //@SuppressWarnings("resource")  
  25. public static void main(String[] args) throws Exception{  
  26. //System.out.println("请输入请求消息:");  
  27. start();  
  28. System.out.println("请输入请求消息:");  
  29. Scanner scanner = new Scanner(System.in);  
  30. String tmp = null
  31. for (int i = 0; i < 10; i++) { 
  32. tmp = scanner.nextLine(); 
  33. clientHandle.sendMsg(tmp); 

  34.  
  35. }  

  36.  

AsyncClientHandler

  1. //package aio; 
  2.  
  3. import java.io.IOException; 
  4. import java.net.InetSocketAddress; 
  5. import java.nio.ByteBuffer; 
  6. import java.nio.channels.AsynchronousSocketChannel; 
  7. import java.nio.channels.CompletionHandler; 
  8. import java.util.concurrent.CountDownLatch; 
  9.  
  10. public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable
  11. private AsynchronousSocketChannel clientChannel; 
  12. private String host; 
  13. private int port; 
  14. private CountDownLatch latch; 
  15.  
  16. public AsyncClientHandler(String host, int port)
  17. // TODO Auto-generated constructor stub 
  18. this.host = host; 
  19. this.port = port; 
  20. try
  21. //创建异步客户端通道 
  22. clientChannel = AsynchronousSocketChannel.open(); 
  23. } catch (Exception e) { 
  24. // TODO: handle exception 
  25. e.printStackTrace(); 


  26.  
  27.  
  28. @Override 
  29. public void run()
  30. // TODO Auto-generated method stub 
  31. latch = new CountDownLatch(1); 
  32. //创建InetScoketAddress 
  33. InetSocketAddress address = new InetSocketAddress(this.host, this.port); 
  34. //发起异步连接操作,回调参数是这个类本身,如果连接成功会回调completed方法 
  35. clientChannel.connect(address, this, this); 
  36. try
  37. latch.await(); 
  38. } catch (InterruptedException e) { 
  39. // TODO Auto-generated catch block 
  40. e.printStackTrace(); 

  41. try
  42. clientChannel.close(); 
  43. } catch (IOException e) { 
  44. // TODO Auto-generated catch block 
  45. e.printStackTrace(); 


  46.  
  47. @Override 
  48. public void completed(Void result, AsyncClientHandler attachment)
  49. // TODO Auto-generated method stub 
  50. //连接服务器成功,就会调用这个函数 
  51. System.out.println("连接服务器成功!!!!"); 

  52.  
  53. @Override 
  54. public void failed(Throwable exc, AsyncClientHandler attachment)
  55. // TODO Auto-generated method stub 
  56. //连接服务器失败,会调用这个函数 
  57. System.out.println("连接服务器失败!!!"); 
  58. exc.printStackTrace(); 
  59.  
  60. try
  61. clientChannel.close(); 
  62. latch.countDown(); 
  63. } catch (IOException e) { 
  64. // TODO Auto-generated catch block 
  65. e.printStackTrace(); 


  66.  
  67. //向服务器发送消息 
  68. public void sendMsg(String msg)
  69. System.out.println(msg); 
  70. byte[] bytes = msg.getBytes(); 
  71. ByteBuffer buffer = ByteBuffer.allocate(bytes.length); 
  72. buffer.put(bytes); 
  73. //进行flip操作 
  74. buffer.flip(); 
  75. //异步写 
  76. clientChannel.write(buffer, buffer, new WriteHandler(clientChannel,latch)); 
  77.  
  78.  

  79.  
  80.  

  81.  
  82.  

writeHandler

  1. //package aio; 
  2.  
  3. import java.io.IOException; 
  4. import java.nio.ByteBuffer; 
  5. import java.nio.channels.AsynchronousSocketChannel; 
  6. import java.nio.channels.CompletionHandler; 
  7. import java.util.concurrent.CountDownLatch; 
  8.  
  9. public class WriteHandler implements CompletionHandler<Integer, ByteBuffer>
  10.  
  11. private AsynchronousSocketChannel clientChannel; 
  12. private CountDownLatch latch; 
  13. public WriteHandler(AsynchronousSocketChannel channel, CountDownLatch latch)
  14. // TODO Auto-generated constructor stub 
  15. this.clientChannel = channel; 
  16. this.latch = latch; 

  17. @Override 
  18. public void completed(Integer result, ByteBuffer buffer)
  19. // TODO Auto-generated method stub 
  20. System.out.println("发送数据成功!~~~"); 
  21. //进行flip操作 
  22. //buffer.flip(); 
  23. //判断buffer中是否有需要发送的数据,如果有数据就进行发送,如果没有就进行读取操作 
  24. if (buffer.hasRemaining()) { 
  25. System.out.println("进入写数据!!!"); 
  26. clientChannel.write(buffer, buffer, this); 
  27. System.out.println("发送数据成功~~~"); 

  28. //读取数据 
  29. System.out.println("进入读取数据"); 
  30. ByteBuffer readBuffer = ByteBuffer.allocate(1024); 
  31. clientChannel.read(readBuffer, readBuffer, new ReadHandle(clientChannel,latch)); 
  32.  

  33.  
  34. @Override 
  35. public void failed(Throwable exc, ByteBuffer buffer)
  36. // TODO Auto-generated method stub 
  37. System.err.println("发送数据失败~~~"); 
  38. try
  39. clientChannel.close(); 
  40. latch.countDown(); 
  41. } catch (IOException e) { 
  42. // TODO Auto-generated catch block 
  43. e.printStackTrace(); 

  44.  

  45.  

  46.  

ReadHandle

  1. //package aio; 
  2.  
  3. import java.io.IOException; 
  4. import java.io.UnsupportedEncodingException; 
  5. import java.nio.ByteBuffer; 
  6. import java.nio.channels.AsynchronousSocketChannel; 
  7. import java.nio.channels.CompletionHandler; 
  8. import java.util.concurrent.CountDownLatch; 
  9.  
  10. public class ReadHandle implements CompletionHandler<Integer,ByteBuffer>
  11. private AsynchronousSocketChannel clientChannel; 
  12. private CountDownLatch latch; 
  13. public ReadHandle(AsynchronousSocketChannel channel, CountDownLatch latch)
  14. // TODO Auto-generated constructor stub 
  15. this.latch = latch; 
  16. this.clientChannel = channel; 

  17. @Override 
  18. public void completed(Integer result, ByteBuffer buffer)
  19. // TODO Auto-generated method stub 
  20. //调整buffer的limit和position方便获取收到的数据 
  21. //buffer.flip(); 
  22. System.out.println("读取数据成功!!!"); 
  23. byte[] bytes = new byte[buffer.remaining()]; 
  24. buffer.get(bytes); 
  25. String res; 
  26. try
  27. res = new String(bytes, "UTF-8"); 
  28. System.out.println("收到的数据: " + res); 
  29. } catch (UnsupportedEncodingException e) { 
  30. // TODO Auto-generated catch block 
  31. e.printStackTrace(); 

  32.  
  33.  
  34.  
  35.  

  36. @Override 
  37. public void failed(Throwable exc, ByteBuffer attachment)
  38. // TODO Auto-generated method stub 
  39. System.err.println("读取数据失败~~~"); 
  40.  
  41. try
  42. clientChannel.close(); 
  43. this.latch.countDown(); 
  44. } catch (IOException e) { 
  45. // TODO Auto-generated catch block 
  46. e.printStackTrace(); 


  47.  
  48.  

  49.  
原文地址:https://www.cnblogs.com/chailinbo/p/9226651.html