JAVA中的BIO,NIO与多路复用(select,poll,epoll)

1.Socket链接的建立

   java程序(server) 在应用空间中运行,当建立一个socket链接时,会向内核空间中的内核程序(sc)发送指令,内核程序中一定会执行Socket(AF_UNIX,SOCK_STAM,0) -> fd(文件标识符)6  --传递-> bind(6,9999) 绑定端口和文件标识符 --监听-->listen(6)

2.BIO (Blocking I/O)

  概念:同步阻塞型IO,对于客户端的每个链接都将开启一个新的线程处理.

  流程:在BIO中apccet的阻塞是在内核程序中阻塞,在使用strace 监控线程时可以看到 accept(6, 后停止打印,直到有链接建立然后才会继续剩余代码的执行. IO的阻塞在strace中可以看到是recv(5 停止打印直到有数据传输  

  优点:可以与多个客户端建立链接

  缺点:一链接一线程,导致内存浪费.链接建立的越多,CPU线程切换更加频繁.

  代码:

 1 2 
 3 import java.io.*;
 4 import java.net.ServerSocket;
 5 import java.net.Socket;
 6 import java.util.concurrent.*;
 7 
 8 /**
 9  * @author baiyang
10  * @version 1.0
11  * @date 2020/6/9 11:10 下午
12  */
13 public class Server {
14 
15     public static void main(String[] args) throws Exception {
16 
17         ExecutorService executorService = Executors.newFixedThreadPool(50);
18         final ServerSocket serverSocket = new ServerSocket(9999);
19         while (true){
20             Socket socket = serverSocket.accept(); // 阻塞
21             new Thread(() ->{
22                 BufferedReader bufferedReader = null;
23                 int port = socket.getPort();
24                 System.out.println(port);
25                 try {
26                     bufferedReader = new BufferedReader(new InputStreamReader(new BufferedInputStream(socket.getInputStream()))); // IO阻塞
27                     String clientMessage = bufferedReader.readLine();
28                     System.out.println(clientMessage);
29 
30                     BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new BufferedOutputStream(socket.getOutputStream())));
31                     bufferedWriter.write("this is service
");
32                     bufferedWriter.flush();
33                     bufferedWriter.close();
34                     bufferedReader.close();
35                 } catch (IOException e) {
36                     e.printStackTrace();
37                 }
38             }).start();
39 
40         }
41     }
42 }
 1 package com.diandian.client.bio;
 2 
 3 import java.io.*;
 4 import java.net.InetSocketAddress;
 5 import java.net.Socket;
 6 
 7 /**
 8  * @author baiyang
 9  * @version 1.0
10  * @date 2020/6/9 11:18 下午
11  */
12 public class Client {
13 
14     public static void main(String[] args) throws Exception {
15         Socket socket = new Socket();
16         socket.connect(new InetSocketAddress(9090));
17         BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new BufferedOutputStream(socket.getOutputStream())));
18         bufferedWriter.write("this is client
");
19         bufferedWriter.flush();
20         BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new BufferedInputStream(socket.getInputStream())));
21         String clientMessage = bufferedReader.readLine();
22         bufferedReader.close();
23         bufferedWriter.close();
24         System.out.println(clientMessage);
25 
26 
27     }
28 }
View Code

3.NIO(java -> New I/O  操作系统 -> NONBLOCKING)

概念:在java中NIO 指的是new IO 1.4版本之后新的一个包出现

  在操作系统中的体现是NONBLOCKING的支持

  同步非阻塞

流程:在NIO中,ServerSocketChannel类中调用方法configureBlocking(false),accept将不在阻塞.当没有链接时accept() = -1,当客户端没有发送信息时recv() = -1.BIO中的阻塞就解决了

优点: 解决了BIO多线程的问题,解决了C10K问题

缺点: 循环遍历已链接的客户端,实现监控是否有数据写入. ---> 每一次的循环将会向内核程序发送一条指令,假设有1W个链接建立且只有1个客户端向服务器发送指令,那么需要向内核发送1W次指令,无效指令数9999次.

代码:

 1 package com.diandian.server.nio;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.ServerSocketChannel;
 6 import java.nio.channels.SocketChannel;
 7 import java.util.ArrayList;
 8 
 9 /**
10  * @author baiyang
11  * @version 1.0
12  * @date 2020/6/11 11:32 下午
13  */
14 public class NioServer {
15 
16     public static void main(String[] args) {
17         //  保存已链接的客户端
18         ArrayList<SocketChannel> clients = new ArrayList<>();
19         try {
20             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
21             serverSocketChannel.bind(new InetSocketAddress(9999));
22             // 设置false不阻塞 调用 OS  NONBLOCKING
23             serverSocketChannel.configureBlocking(false);
24             while (true) {
25                 // 可有可无方便测试
26                 Thread.sleep(2000);
27                 SocketChannel client = serverSocketChannel.accept();
28                 if (null == client) {
29                     System.out.println("没有客户端建立链接");
30                 } else {
31                     client.configureBlocking(false);
32                     clients.add(client);
33                     System.out.println(client.socket().getPort());
34                 }
35                 ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
36                 for (SocketChannel s : clients){
37                     int read = s.read(byteBuffer);
38                     if(read > 0){
39                         byteBuffer.flip();
40                         byte[] bytes = new byte[byteBuffer.limit()];
41                         byteBuffer.get(bytes);
42                         System.out.println("收到客户端信息:"+new String(bytes));
43                         byteBuffer.clear();
44                     }
45                 }
46 
47             }
48         } catch (Exception e) {
49             e.printStackTrace();
50         }
51     }
52 }

4.多路复用

 本质上select poll epoll 都是IO同步的,读写操作在就绪后都将由自己进行读写,所以读写操作是阻塞的

 1 package com.diandian.service.nio;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.SelectionKey;
 6 import java.nio.channels.Selector;
 7 import java.nio.channels.ServerSocketChannel;
 8 import java.nio.channels.SocketChannel;
 9 import java.util.Iterator;
10 import java.util.Set;
11 
12 /**
13  * @author baiyang
14  * @version 1.0
15  * @date 2020/6/12 9:46 上午
16  */
17 public class NioSelect {
18 
19     public static void main(String[] args) {
20         try {
21             ServerSocketChannel server = ServerSocketChannel.open();
22             server.configureBlocking(false);
23             server.bind(new InetSocketAddress(9999));
24             // 开启selector
25             Selector selector = Selector.open();
26             // 将accept注册到selector
27             server.register(selector,SelectionKey.OP_ACCEPT);
28             while (true){
29                 System.out.println( " key Sizes:" + selector.selectedKeys().size());
30                 while (selector.select(1000) > 0){
31                     Set<SelectionKey> keys = selector.selectedKeys();
32                     Iterator<SelectionKey> iterator = keys.iterator();
33                     while (iterator.hasNext()){
34                         SelectionKey next = iterator.next();
35                         // 移除防止重复遍历
36                         iterator.remove();
37                         // 是否有新的链接进来
38                         if(next.isAcceptable()){
39                             ServerSocketChannel channel = (ServerSocketChannel) next.channel();
40                             SocketChannel cline = channel.accept();
41                             // 设置非阻塞
42                             cline.configureBlocking(false);
43                             System.out.println(cline.socket().getPort());
44                             ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
45                             // 将read事件注册到selector中
46                             cline.register(selector,SelectionKey.OP_READ,buffer);
47                         }else if (next.isReadable()){
48                             SocketChannel client = (SocketChannel) next.channel();
49                             ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
50                             buffer.clear();
51                             int read;
52                             while (true) {
53                                 read = client.read(buffer);
54                                 if (read > 0) {
55                                     buffer.flip();
56                                     while (buffer.hasRemaining()) {
57                                         client.write(buffer);
58                                     }
59                                     buffer.clear();
60                                 } else if (read == 0) {
61                                     break;
62                                 } else {
63                                     client.close();
64                                     break;
65                                 }
66                             }
67                         }
68 
69                     }
70                 }
71             }
72 
73         } catch (Exception e) {
74             e.printStackTrace();
75         }
76     }
77 }

 

  4.1 select poll

  概念:NIO中解决多次向内核程序发送无效指令问题

  流程: select(fd) --全量遍历--> 用户空间

  优点:减少了向内核程序发送指令次数,一次将所有链接的fd发送给内核,由内核遍历

  缺点:每次都将全量的fd进行发送,内核将进行全量遍历,只有一个selector来来进行监控accept() IO操作

  select和poll的区别: 本质上是一致的, poll用链表存储无链接限制. select 监听fd有链接数量限制,LINUX32位默认 1024 64位默认 2048 

  4.2 epoll (even poll)

概念:会将有发生IO操作的fd通知到应用程序
优点:不再每次进行全量的遍历 复杂度降低至O(1)可使用多个selector进行监控
流程: 建立链接后 epoll_create(256) -> 7 ----> epoll_crl(7,ADD,6(scoket建立时返回的文件标识符),accept) 将6的accept注册到7空间(红黑树保存fd)中 ----> epoll_wait() 阻塞的等待客户端的链接 O(1) -accept(6)->8 与客户端建立链接8 -> epoll_crl(7,ADD,8,read)将读事件注册到7空间 ----> epoll_wait(6,8)

原文地址:https://www.cnblogs.com/diandiandian/p/13096977.html