说Java网络编程

基础IO模型

网络编程的目的在于远程发送数据,发送接收数据就涉及到I/O的操作,这里因为涉及到比较底层字节和字符的操作,所以不可以使用java.nio.file.Files 操作文件。那就先说说I/O吧,I/O流分为字节流和字符流。字节即Byte,包含8位二进制数,一个二进制数就是1bit,中文名称叫位。字符即一个字母或者一个汉字。一个字母由一个字节组成,而汉字根据编码不同由2个或者3个组成。

 Java I/O类的实现原理是装饰者模式,装饰者模式动态地将责任附加到对象上。若要扩展功能,装饰者提供了比继承更有弹性的替代方案。

1.将被装饰者(Concrete Component)当做类中一个成员变量。
2.利用构造将被装饰者注入

然后引入I/O模型:

阻塞和非阻塞,描述的是结果的请求

阻塞:在得到结果之前就一直呆在那,啥也不干,此时线程挂起,就如其名,线程被阻塞了。

非阻塞:如果没得到结果就返回,等一会再去请求,直到得到结果为止。

异步和同步,描述的是结果的发出,当调用方的请求进来后。

同步:在没获取到结果前就不返回给调用方,如果调用方是阻塞的,那么调用方就会一直等着。如果调用方是非阻塞的,调用方就会先回去,等一会再来问问得到结果没。

异步:调用方一来,会直接返回,等执行完实际的逻辑后在通过回调函数把结果返回给调用方。

异步非阻塞

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

事实上就是,用户提交IO请求,然后直接返回,并且内核自动完成将数据从内核缓冲区复制到用户缓冲区,完成后再通知用户。

当然,内核通知我们以后我们还需要执行剩余的操作,但是我们的代码已经继续往下运行了,所以AIO采用了回调的机制,为每个socket注册一个回调事件或者是回调处理器,在处理器中完成数据的操作,也就是内核通知到用户的时候,会自动触发回调函数,完成剩余操作。 这样的方式就是异步的网络编程。

但是,想要让操作系统支持这样的功能并非易事,windows的IOCP可以支持AIO方式,但是Linux的AIO支持并不是很好。(所以Netty后来也取消了对AIO的支持)

IO多路复用 :使用IO多路复用器管理socket,由于每个socket是一个文件描述符,操作系统可以维护socket和它的连接状态,一般分为可连接,可读和可写等状态。

每当用户程序接受到socket请求,将请求托管给多路复用器进行监控,当程序对请求感兴趣的事件发生时,多路复用器以某种方式通知或是用户程序自己轮询请求,以便获取就绪的socket,然后只需使用一个线程进行轮询,多个线程处理就绪请求即可。

IO多路复用避免了每个socket请求都需要一个线程去处理,而是使用事件驱动的方式,让少数的线程去处理多数socket的IO请求。

Linux操作系统对IO多路复用提供了较好的支持,select,poll,epoll是Linux提供的支持IO多路复用的API。一般用户程序基于这个API去开发自己的IO复用模型。比如NIO的非阻塞模型,就是采用了IO多路复用的方式,是基于epoll实现的。

select方式主要是使用数组来存储socket描述符,系统将发生事件的描述符做标记,然后IO复用器在轮询描述符数组的时候,就可以知道哪些请求是就绪了的。缺点是数组的长度只能到1024,并且需要不断地在内核空间和用户空间之间拷贝数组。

poll方式不采用数组存储描述符,而是使用独立的自定义数据结构pollfd来描述,并且使用id来表示描述符,能支持更多的请求数量,缺点和select方式有点类似,就是轮询的效率很低,并且需要拷贝数据。

 

当然,上述两种方法适合在请求总数较少,并且活跃请求数较多的情况,这种场景下他们的性能还是不错的。

 

epoll,epoll函数会在内核空间开辟一个特殊的数据结构,红黑树,树节点中存放的是一个socket描述符以及用户程序感兴趣的事件类型。同时epoll还会维护一个链表。用于存储已经就绪的socket描述符节点。由Linux内核完成对红黑树的维护,当事件到达时,内核将就绪的socket节点加入链表中,用户程序可以直接访问这个链表以便获取就绪的socket。

有了直接与文件交互的I/O类,那怎么样与网络交互呢?这里就引入Socket

socket是操作系统提供的网络编程接口,他封装了对于TCP/IP协议栈的支持,用于进程间的通信,当有连接接入主机以后,操作系统自动为其分配一个socket套接字,套接字绑定着一个IP与端口号。通过socket接口,可以获取tcp连接的输入流和输出流,并且通过他们进行读取和写入此操作。

Java提供了net包用于socket编程,同时支持像Inetaddress,URL等工具类,使用socket绑定一个endpoint(ip+端口号),可以用于客户端的请求处理和发送,使用serversocket绑定本地ip和端口号,可以用于服务端接收TCP请求。

BIO编程模型

所谓BIO,就是Block IO,阻塞式的IO。这个阻塞主要发生在:ServerSocket接收请求时(accept()方法)、InputStream、OutputStream(输入输出流的读和写)都是阻塞的。这个可以在下面代码的调试中发现,比如在客户端接收服务器消息的输入流处打上断点,除非服务器发来消息,不然断点是一直停在这个地方的。也就是说这个线程在这时间是被阻塞的。

这里放一个BIO模型实现的聊天室Demo,对于Socket编程基础请移步:https://www.cnblogs.com/yiwangzhibujian/p/7107785.html

服务端ChatServer:

 1 /*
 2 1. 功能实现:这个类的作用就像Acceptor。它有两个比较关键的全局变量,一个就是存储在线用户信息的Map,一个就是线程池。
 3 这个类会监听端口,接收客户端的请求,然后为客户端分配工作线程。
 4 还会提供一些常用的工具方法给每个工作线程调用,比如:发送消息、添加在线用户等。
 5 */
 6 
 7 import java.io.*;
 8 import java.net.*;
 9 import java.util.Map;
10 import java.util.concurrent.*;
11 
12 public class ChatServer {
13     private int DEFAULT_PORT = 8888;
14     /**
15      * 创建一个Map存储在线用户的信息。这个map可以统计在线用户、针对这些用户可以转发其他用户发送的消息
16      * 因为会有多个线程操作这个map,所以为了安全起见用ConcurrentHashMap
17      * 在这里key就是客户端的端口号,但在实际中肯定不会用端口号区分用户,如果是web的话一般用session。
18      * value是IO的Writer,用以存储客户端发送的消息
19      */
20     private Map<Integer, Writer> map = new ConcurrentHashMap<>();
21     /**
22      * 创建线程池,线程上限为10个,如果第11个客户端请求进来,服务器会接收但是不会去分配线程处理它。
23      * 前10个客户端的聊天记录,它看不见。当有一个客户端下线时,这第11个客户端就会被分配线程,服务器显示在线
24      * 大家可以把10再设置小一点,测试看看
25      * */
26     private ExecutorService executorService = Executors.newFixedThreadPool(10);
27     //客户端连接时往map添加客户端
28     public void addClient(Socket socket) throws IOException {
29         if (socket != null) {
30             BufferedWriter writer = new BufferedWriter(
31                     new OutputStreamWriter(socket.getOutputStream())
32             );
33             map.put(socket.getPort(), writer);
34             System.out.println("Client["+socket.getPort()+"]:Online");
35         }
36     }
37 
38     //断开连接时map里移除客户端
39     public void removeClient(Socket socket) throws Exception {
40         if (socket != null) {
41             if (map.containsKey(socket.getPort())) {
42                 map.get(socket.getPort()).close();
43                 map.remove(socket.getPort());
44             }
45             System.out.println("Client[" + socket.getPort() + "]Offline");
46         }
47     }
48 
49     //转发客户端消息,这个方法就是把消息发送给在线的其他的所有客户端
50     public void sendMessage(Socket socket, String msg) throws IOException {
51         //遍历在线客户端
52         for (Integer port : map.keySet()) {
53             //发送给在线的其他客户端
54             if (port != socket.getPort()) {
55                 Writer writer = map.get(port);
56                 writer.write(msg);
57                 writer.flush();
58             }
59         }
60     }
61 
62     //接收客户端请求,并分配Handler去处理请求
63     public void start() {
64         try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
65             System.out.println("Server Start,The Port is:"+DEFAULT_PORT);
66             while (true){
67                 //等待客户端连接
68                 Socket socket=serverSocket.accept();
69                 //为客户端分配一个ChatHandler线程
70                 executorService.execute(new ChatHandler(this, socket));
71             }
72         } catch (IOException e) {
73             e.printStackTrace();
74         }
75     }
76 
77     public static void main(String[] args) {
78         ChatServer server=new ChatServer();
79         server.start();
80     }
81 }
View Code

服务端的ChatHandler:

 1 /*
 2 1. 功能实现:这个类就是工作线程的类。在这个项目中,它的工作很简单:
 3 把接收到的消息转发给其他客户端,当然还有一些小功能,比如添加移除在线用户。
 4 */
 5 
 6 import java.io.*;
 7 import java.net.*;
 8 
 9 public class ChatHandler implements Runnable {
10     private ChatServer server;
11     private Socket socket;
12 
13     //构造函数,ChatServer通过这个分配Handler线程
14     public ChatHandler(ChatServer server, Socket socket) {
15         this.server = server;
16         this.socket = socket;
17     }
18 
19     @Override
20     public void run() {
21         try {
22             //往map里添加这个客户端
23             server.addClient(socket);
24             //读取这个客户端发送的消息
25             BufferedReader reader = new BufferedReader(
26                     new InputStreamReader(socket.getInputStream())
27             );
28             String msg = null;
29             while ((msg = reader.readLine()) != null) {
30                 //这样拼接是为了让其他客户端也能看清是谁发送的消息
31                 String sendmsg = "Client[" + socket.getPort() + "]:" + msg;
32                 //服务器打印这个消息
33                 System.out.println(sendmsg);
34                 //将收到的消息转发给其他在线客户端
35                 server.sendMessage(socket, sendmsg + "
");
36                 if (msg.equals("quit")) {
37                     break;
38                 }
39             }
40         } catch (IOException e) {
41             e.printStackTrace();
42         } finally {
43             //如果用户退出或者发生异常,就在map中移除该客户端
44             try {
45                 server.removeClient(socket);
46             } catch (Exception e) {
47                 e.printStackTrace();
48             }
49         }
50     }
51 }
View Code

客户端ChatClient:

 1 /*
 2 1. 功能实现:客户端启动类,也就是主线程,会通过Socket和服务器连接。也提供了两个工具方法:发送消息和接收消息。
 3 */
 4 
 5 import java.io.*;
 6 import java.net.*;
 7 
 8 public class ChatClient {
 9     private BufferedReader reader;
10     private BufferedWriter writer;
11     private Socket socket;
12     //发送消息给服务器
13     public void sendToServer(String msg) throws IOException {
14         //发送之前,判断socket的输出流是否关闭
15         if (!socket.isOutputShutdown()) {
16             //如果没有关闭就把用户键入的消息放到writer里面
17             writer.write(msg + "
");
18             writer.flush();
19         }
20     }
21     //从服务器接收消息
22     public String receive() throws IOException {
23         String msg = null;
24         //判断socket的输入流是否关闭
25         if (!socket.isInputShutdown()) {
26             //没有关闭的话就可以通过reader读取服务器发送来的消息。注意:如果没有读取到消息线程会阻塞在这里
27             msg = reader.readLine();
28         }
29         return msg;
30     }
31 
32     public void start() {
33         //和服务创建连接
34         try {
35             socket = new Socket("127.0.0.1", 8888);
36             reader=new BufferedReader(
37                     new InputStreamReader(socket.getInputStream())
38             );
39             writer=new BufferedWriter(
40                     new OutputStreamWriter(socket.getOutputStream())
41             );
42             //新建一个线程去监听用户输入的消息
43             new Thread(new UserInputHandler(this)).start();
44             /**
45              * 不停的读取服务器转发的其他客户端的信息
46              * 记录一下之前踩过的小坑:
47              * 这里一定要创建一个msg接收信息,如果直接用receive()方法判断和输出receive()的话会造成有的消息不会显示
48              * 因为receive()获取时,在返回之前是阻塞的,一旦接收到消息才会返回,也就是while这里是阻塞的,一旦有消息就会进入到while里面
49              * 这时候如果输出的是receive(),那么上次获取的信息就会丢失,然后阻塞在System.out.println
50              * */
51             String msg=null;
52             while ((msg=receive())!=null){
53                 System.out.println(msg);
54             }
55         } catch (IOException e) {
56             e.printStackTrace();
57         }finally {
58             try {
59                 if(writer!=null){
60                     writer.close();
61                 }
62             } catch (IOException e) {
63                 e.printStackTrace();
64             }
65         }
66     }
67 
68     public static void main(String[] args) {
69         new ChatClient().start();
70     }
71 }
View Code

客户端UserInputHandler:

 1 /*
 2 1. 功能实现:专门负责等待用户输入信息的线程,一旦有信息键入,就马上发送给服务器。
 3 */
 4 
 5 import java.io.*;
 6 
 7 public class UserInputHandler implements Runnable {
 8     private ChatClient client;
 9 
10     public UserInputHandler(ChatClient client) {
11         this.client = client;
12     }
13 
14     @Override
15     public void run() {
16         try {
17             //接收用户输入的消息
18             BufferedReader reader = new BufferedReader(
19                     new InputStreamReader(System.in)
20             );
21             //不停的获取reader中的System.in,实现了等待用户输入的效果
22             while (true) {
23                 String input = reader.readLine();
24                 //向服务器发送消息
25                 client.sendToServer(input);
26                 if (input.equals("quit"))
27                     break;
28             }
29         } catch (IOException e) {
30             e.printStackTrace();
31         }
32     }
33 }
View Code

这里有一个比较难解决的坑填一下:当Socket关闭的时候,服务端就会收到响应的关闭信号,那么服务端也就知道流已经关闭了,这个时候读取操作完成,就可以继续后续工作。比如利用BufferedReader.readerLine():

  • 误以为readLine()是读取到没有数据时就返回null(因为其它read方法当读到没有数据时返回-1),而实际上readLine()是一个阻塞函数,当没有数据读取时,就一直会阻塞在那,而不是返回null;因为readLine()阻塞后,System.out.println(message)这句根本就不会执行到,所以在接收端就不会有东西输出。要想执行到System.out.println(message),一个办法是发送完数据后就关掉流,这样readLine()结束阻塞状态,而能够得到正确的结果,但显然不能传一行就关一次数据流;另外一个办法是把System.out.println(message)放到while循环体内就可以。
  • readLine()只有在数据流发生异常或者另一端被close()掉时,才会返回null值。
  • 如果不指定buffer大小,则readLine()使用的buffer有8192个字符。在达到buffer大小之前,只有遇到"/r"、"/n"、"/r/n"才会返回。

告知对方已发送完命令:
1. 通过socket关闭,socket.close()。
2. 通过socket关闭输出流,socket.shutdownOutput()。
3. 这里也可以采用约定符号告知对方已发送完命令。
4. 根据长度界定:送一次消息变成了两个步骤:  1. 发送消息的长度   2. 发送消息

如果你了解一点class文件的结构(后续会写,敬请期待),那么你就会佩服这么设计方式,也就是说我们可以在此找灵感,就是我们可以先指定后续命令的长度,然后读取指定长度的内容做为客户端发送的消息。

  现在首要的问题就是用几个字节指定长度呢,我们可以算一算:

  • 1个字节:最大256,表示256B
  • 2个字节:最大65536,表示64K
  • 3个字节:最大16777216,表示16M
  • 4个字节:最大4294967296,表示4G
  • 依次类推

  这个时候是不是很纠结,最大的当然是最保险的,但是真的有必要选择最大的吗,其实如果你稍微了解一点UTF-8的编码方式(字符编码后续会写,敬请期待),那么你就应该能想到为什么一定要固定表示长度字节的长度呢,我们可以使用变长方式来表示长度的表示,比如:

  • 第一个字节首位为0:即0XXXXXXX,表示长度就一个字节,最大128,表示128B
  • 第一个字节首位为110,那么附带后面一个字节表示长度:即110XXXXX 10XXXXXX,最大2048,表示2K
  • 第一个字节首位为1110,那么附带后面二个字节表示长度:即110XXXXX 10XXXXXX 10XXXXXX,最大131072,表示128K
  • 依次类推

  上面提到的这种用法适合高富帅的程序员使用,一般呢,如果用作命名发送,两个字节就够了,如果还不放心4个字节基本就能满足你的所有要求,下面的例子我们将采用2个字节表示长度,目的只是给你一种思路,让你知道有这种方式来获取消息的结尾:

服务端SocketServer:

 1 import java.io.InputStream;
 2 import java.net.ServerSocket;
 3 import java.net.Socket;
 4 
 5 public class SocketServer {
 6   public static void main(String[] args) throws Exception {
 7     // 监听指定的端口
 8     int port = 55533;
 9     ServerSocket server = new ServerSocket(port);
10 
11     // server将一直等待连接的到来
12     System.out.println("server将一直等待连接的到来");
13     Socket socket = server.accept();
14     // 建立好连接后,从socket中获取输入流,并建立缓冲区进行读取
15     InputStream inputStream = socket.getInputStream();
16     byte[] bytes;
17     // 因为可以复用Socket且能判断长度,所以可以一个Socket用到底
18     while (true) {
19       // 首先读取两个字节表示的长度
20       int first = inputStream.read();
21       //如果读取的值为-1 说明到了流的末尾,Socket已经被关闭了,此时将不能再去读取
22       if(first==-1){
23         break;
24       }
25       int second = inputStream.read();
26       int length = (first << 8) + second;
27       // 然后构造一个指定长的byte数组
28       bytes = new byte[length];
29       // 然后读取指定长度的消息即可
30       inputStream.read(bytes);
31       System.out.println("get message from client: " + new String(bytes, "UTF-8"));
32     }
33     inputStream.close();
34     socket.close();
35     server.close();
36   }
37 }

客户端SocketClient:

 1 import java.io.OutputStream;
 2 import java.net.Socket;
 3 
 4 public class SocketClient {
 5   public static void main(String args[]) throws Exception {
 6     // 要连接的服务端IP地址和端口
 7     String host = "127.0.0.1";
 8     int port = 55533;
 9     // 与服务端建立连接
10     Socket socket = new Socket(host, port);
11     // 建立连接后获得输出流
12     OutputStream outputStream = socket.getOutputStream();
13     String message = "你好";
14     //首先需要计算得知消息的长度
15     byte[] sendBytes = message.getBytes("UTF-8");
16     //然后将消息的长度优先发送出去
17     outputStream.write(sendBytes.length >>8);
18     outputStream.write(sendBytes.length);
19     //然后将消息再次发送出去
20     outputStream.write(sendBytes);
21     outputStream.flush();
22     //==========此处重复发送一次,实际项目中为多个命名,此处只为展示用法
23     message = "第二条消息";
24     sendBytes = message.getBytes("UTF-8");
25     outputStream.write(sendBytes.length >>8);
26     outputStream.write(sendBytes.length);
27     outputStream.write(sendBytes);
28     outputStream.flush();
29     //==========此处重复发送一次,实际项目中为多个命名,此处只为展示用法
30     message = "the third message!";
31     sendBytes = message.getBytes("UTF-8");
32     outputStream.write(sendBytes.length >>8);
33     outputStream.write(sendBytes.length);
34     outputStream.write(sendBytes);    
35     
36     outputStream.close();
37     socket.close();
38   }
39 }

NIO编程模型

NIO包含下面几个核心的组件:

  • Channels

  • Buffers

  • Selectors

整个NIO体系包含的类远远不止这三个,只能说这三个是NIO体系的“核心API”。

 

通道

在Java NIO中,主要使用的通道如下(涵盖了UDP 和 TCP 网络IO,以及文件IO):

  • DatagramChannel

  • SocketChannel

  • FileChannel

  • ServerSocketChannel

缓冲区

在Java NIO中使用的核心缓冲区如下(覆盖了通过I/O发送的基本数据类型:byte, char、short, int, long, float, double ,long):

  • ByteBuffer

  • CharBuffer

  • ShortBuffer

  • IntBuffer

  • FloatBuffer

  • DoubleBuffer

  • LongBuffer

选择器

Java NIO提供了“选择器”的概念。这是一个可以用于监视多个通道的对象,如数据到达,连接打开等。因此,单线程可以监视多个通道中的数据。

如果应用程序有多个通道(连接)打开,但每个连接的流量都很低,则可考虑使用它。 例如:在聊天服务器中。

要使用Selector的话,我们必须把Channel注册到Selector上,然后就可以调用Selector的select()方法。这个方法会进入阻塞,直到有一个channel的状态符合条件。当方法返回后,线程可以处理这些事件。

 

Buffer(缓冲区)介绍

Java NIO Buffers用于和NIO Channel交互。 我们从Channel中读取数据到buffers里,从Buffer把数据写入到Channels.

Buffer本质上就是一块内存区,可以用来写入数据,并在稍后读取出来。这块内存被NIO Buffer包裹起来,对外提供一系列的读写方便开发的接口。

在Java NIO中使用的核心缓冲区如下(覆盖了通过I/O发送的基本数据类型:byte,char,short,int,long,float,double ,long):

利用Buffer读写数据,通常遵循四个步骤:

  1. 把数据写入buffer;

  2. 调用flip;

  3. 从Buffer中读取数据;

  4. 调用buffer.clear()或者buffer.compact()。

当写入数据到buffer中时,buffer会记录已经写入的数据大小。当需要读数据时,通过 flip() 方法把buffer从写模式调整为读模式;在读模式下,可以读取所有已经写入的数据。

当读取完数据后,需要清空buffer,以满足后续写入操作。清空buffer有两种方式:调用 clear()compact() 方法。clear会清空整个buffer,compact则只清空已读取的数据,未被读取的数据会被移动到buffer的开始位置,写入位置则近跟着未读数据之后

 

Buffer的容量,位置,上限(Buffer Capacity,Position and Limit)

Buffer缓冲区实质上就是一块内存,用于写入数据,也供后续再次读取数据。这块内存被NIO Buffer管理,并提供一系列的方法用于更简单的操作这块内存。

一个Buffer有三个属性是必须掌握的,分别是:

  • capacity容量

  • position位置

  • limit限制

position和limit的具体含义取决于当前buffer的模式。capacity在两种模式下都表示容量。

读写模式下position和limit的含义:

 

容量(Capacity)

作为一块内存,buffer有一个固定的大小,叫做capacit(容量)。也就是最多只能写入容量值得字节,整形等数据。一旦buffer写满了就需要清空已读数据以便下次继续写入新的数据。

 

位置(Position)

当写入数据到Buffer的时候需要从一个确定的位置开始,默认初始化时这个位置position为0,一旦写入了数据比如一个字节,整形数据,那么position的值就会指向数据之后的一个单元,position最大可以到capacity-1.

当从Buffer读取数据时,也需要从一个确定的位置开始。buffer从写入模式变为读取模式时,position会归零,每次读取后,position向后移动。

 

上限(Limit)

在写模式,limit的含义是我们所能写入的最大数据量,它等同于buffer的容量。

一旦切换到读模式,limit则代表我们所能读取的最大数据量,他的值等同于写模式下position的位置。换句话说,您可以读取与写入数量相同的字节数(限制设置为写入的字节数,由位置标记)。

 

Buffer的常见方法

方法介绍
abstract Object array() 返回支持此缓冲区的数组 (可选操作)
abstract int arrayOffset() 返回该缓冲区的缓冲区的第一个元素的在数组中的偏移量 (可选操作)
int capacity() 返回此缓冲区的容量
Buffer clear() 清除此缓存区。将position = 0;limit = capacity;mark = -1;
Buffer flip() flip()方法可以吧Buffer从写模式切换到读模式。调用flip方法会把position归零,并设置limit为之前的position的值。 也就是说,现在position代表的是读取位置,limit标示的是已写入的数据位置。
abstract boolean hasArray() 告诉这个缓冲区是否由可访问的数组支持
boolean hasRemaining() return position < limit,返回是否还有未读内容
abstract boolean isDirect() 判断个缓冲区是否为 direct
abstract boolean isReadOnly() 判断告知这个缓冲区是否是只读的
int limit() 返回此缓冲区的限制
Buffer position(int newPosition) 设置这个缓冲区的位置
int remaining() return limit - position; 返回limit和position之间相对位置差
Buffer rewind() 把position设为0,mark设为-1,不改变limit的值
Buffer mark() 将此缓冲区的标记设置在其位置

 

Buffer的使用方式/方法介绍

分配缓冲区(Allocating a Buffer)

为了获得缓冲区对象,我们必须首先分配一个缓冲区。在每个Buffer类中,allocate()方法用于分配缓冲区。

下面来看看一个示例:CharBuffer分配空间大小为2048个字符。

  1. CharBuffer buf = CharBuffer.allocate(2048);

写入数据到缓冲区(Writing Data to a Buffer)

写数据到Buffer有两种方法:

  • 从Channel中写数据到Buffer

  • 手动写数据到Buffer,调用put方法

下面是一个实例,演示从Channel写数据到Buffer:

  1. int bytesRead = inChannel.read(buf); //read into buffer.

通过put写数据:

  1. buf.put(127);

put方法有很多不同版本,对应不同的写数据方法。例如把数据写到特定的位置,或者把一个字节数据写入buffer。看考JavaDoc文档可以查阅的更多数据。

 

翻转(flip())

flip()方法可以吧Buffer从写模式切换到读模式。调用flip方法会把position归零,并设置limit为之前的position的值。 也就是说,现在position代表的是读取位置,limit标示的是已写入的数据位置。

 

从Buffer读取数据(Reading Data from a Buffer)

从Buffer读数据也有两种方式。

  • 从buffer读数据到channel

  • 从buffer直接读取数据,调用get方法

读取数据到channel的例子:

  1. int bytesWritten = inChannel.write(buf);

调用get读取数据的例子:

  1. byte aByte = buf.get();

get也有诸多版本,对应了不同的读取方式。

 

rewind()

Buffer.rewind()方法将position置为0,这样我们可以重复读取buffer中的数据。limit保持不变。

 

clear() and compact()

一旦我们从buffer中读取完数据,需要复用buffer为下次写数据做准备。只需要调用clear()或compact()方法。

如果调用的是clear()方法,position将被设回0,limit被设置成 capacity的值。换句话说,Buffer 被清空了。Buffer中的数据并未清除,只是这些标记告诉我们可以从哪里开始往Buffer里写数据。

如果Buffer还有一些数据没有读取完,调用clear就会导致这部分数据被“遗忘”,因为我们没有标记这部分数据未读。

针对这种情况,如果需要保留未读数据,那么可以使用compact。 因此 compact()clear() 的区别就在于: 对未读数据的处理,是保留这部分数据还是一起清空

 

mark()与reset()方法

通过调用Buffer.mark()方法,可以标记Buffer中的一个特定position。之后可以通过调用Buffer.reset()方法恢复到这个position。例如:

  1. buffer.mark();

  2. //call buffer.get() a couple of times, e.g. during parsing.

  3. buffer.reset();  //set position back to mark.

equals() and compareTo()

可以用eqauls和compareTo比较两个buffer

equals():

判断两个buffer相对,需满足:

  • 类型相同

  • buffer中剩余字节数相同

  • 所有剩余字节相等

从上面的三个条件可以看出,equals只比较buffer中的部分内容,并不会去比较每一个元素。

compareTo():

compareTo也是比较buffer中的剩余元素,只不过这个方法适用于比较排序的。

  

Channel

Channel(通道)主要用于传输数据,然后从Buffer中写入或读取。它们两个结合起来虽然和流有些相似,但主要有以下几点区别:
  1.流是单向的,可以发现Stream的输入流和输出流是独立的,它们只能输入或输出。而通道既可以读也可以写。
  2.通道本身不能存放数据,只能借助Buffer。
  3.Channel支持异步。

Channel有如下三个常用的类:FileChannel、SocketChannel、ServerSocketChannel。从名字也可以看出区别,第一个是对文件数据的读写,后面两个则是针对Socket和ServerSocket,这里我们只是用后面两个。

Selector

多个Channel可以注册到Selector,就可以直接通过一个Selector管理多个通道。Channel在不同的时间或者不同的事件下有不同的状态,Selector会通过轮询来达到监视的效果,如果查到Channel的状态正好是我们注册时声明的所要监视的状态,我们就可以查出这些通道,然后做相应的处理。这些状态如下:
  1.客户端的SocketChannel和服务器端建立连接,SocketChannel状态就是Connect
  2.服务器端的ServerSocketChannel接收了客户端的请求,ServerSocketChannel状态就是Accept
  3.当SocketChannel有数据可读,那么它们的状态就是Read
  4.当我们需要向Channel中写数据时,那么它们的状态就是Write

下面继续介绍一些基本的接口操作。

  1. 首先,我们开启一个 Selector。你们爱翻译成选择器也好,多路复用器也好。

    Selector selector = Selector.open();
    
  2. 将 Channel 注册到 Selector 上。前面我们说了,Selector 建立在非阻塞模式之上,所以注册到 Selector 的 Channel 必须要支持非阻塞模式,FileChannel 不支持非阻塞,我们这里讨论最常见的 SocketChannel 和 ServerSocketChannel。

    // 将通道设置为非阻塞模式,因为默认都是阻塞模式的
    channel.configureBlocking(false);
    // 注册
    SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
    

    register 方法的第二个 int 型参数(使用二进制的标记位)用于表明需要监听哪些感兴趣的事件,共以下四种事件:

    • SelectionKey.OP_READ

      对应 00000001,通道中有数据可以进行读取

    • SelectionKey.OP_WRITE

      对应 00000100,可以往通道中写入数据

    • SelectionKey.OP_CONNECT

      对应 00001000,成功建立 TCP 连接

    • SelectionKey.OP_ACCEPT

      对应 00010000,接受 TCP 连接

  3. 我们可以同时监听一个 Channel 中的发生的多个事件,比如我们要监听 ACCEPT 和 READ 事件,那么指定参数为二进制的 00010001 即十进制数值 17 即可。

    注册方法返回值是 SelectionKey 实例,它包含了 Channel 和 Selector 信息,也包括了一个叫做 Interest Set 的信息,即我们设置的我们感兴趣的正在监听的事件集合。

  4. 调用 select() 方法获取通道信息。用于判断是否有我们感兴趣的事件已经发生了。

操作类型描述所属对象
OP_READ 1 << 0 读操作 SocketChannel
OP_WRITE 1 << 2 写操作 SocketChannel
OP_CONNECT 1 << 3 连接socket操作 SocketChannel
OP_ACCEPT 1 << 4 接受socket操作 ServerSocketChannel

Selector 的操作就是以上 3 步,这里来一个简单的示例:

Selector selector = Selector.open();

channel.configureBlocking(false);

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

while(true) {
  // 判断是否有事件准备好
  int readyChannels = selector.select();
  if(readyChannels == 0) continue;

  // 遍历
  Set<SelectionKey> selectedKeys = selector.selectedKeys();
  Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();

    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.

    } else if (key.isConnectable()) {
        // a connection was established with a remote server.

    } else if (key.isReadable()) {
        // a channel is ready for reading

    } else if (key.isWritable()) {
        // a channel is ready for writing
    }

    keyIterator.remove();
  }
}

对于 Selector,我们还需要非常熟悉以下几个方法:

  1. select()

    调用此方法,会将上次 select 之后的准备好的 channel 对应的 SelectionKey 复制到 selected set 中。如果没有任何通道准备好,这个方法会阻塞,直到至少有一个通道准备好。

  2. selectNow()

    功能和 select 一样,区别在于如果没有准备好的通道,那么此方法会立即返回 0。

  3. select(long timeout)

    看了前面两个,这个应该很好理解了,如果没有通道准备好,此方法会等待一会

  4. wakeup()

    这个方法是用来唤醒等待在 select() 和 select(timeout) 上的线程的。如果 wakeup() 先被调用,此时没有线程在 select 上阻塞,那么之后的一个 select() 或 select(timeout) 会立即返回,而不会阻塞,当然,它只会作用一次。

 这里放一个NIO模型实现的聊天室Demo:

服务端ChatServer:

  1 /**
  2  * 1. 功能描述:功能基本上和上面讲的工作流程差不多,还会有一些工具方法,都比较简单,就不多说了,如:转发消息,客户端下线后从在线列表移除客户端等。
  3  */
  4 
  5 import java.io.IOException;
  6 import java.net.*;
  7 import java.nio.*;
  8 import java.nio.channels.*;
  9 import java.nio.charset.Charset;
 10 import java.util.Set;
 11 
 12 public class ChatServer {
 13     //设置缓冲区的大小,这里设置为1024个字节
 14     private static final int BUFFER = 1024;
 15 
 16     //Channel都要配合缓冲区进行读写,所以这里创建一个读缓冲区和一个写缓冲区
 17     //allocate()静态方法就是设置缓存区大小的方法
 18     private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER);
 19     private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER);
 20 
 21     //为了监听端口更灵活,再不写死了,用一个构造函数设置需要监听的端口号
 22     private int port;
 23 
 24     public ChatServer(int port) {
 25         this.port = port;
 26     }
 27 
 28     private void start() {
 29         //创建ServerSocketChannel和Selector并打开
 30         try (ServerSocketChannel server = ServerSocketChannel.open();
 31              Selector selector = Selector.open()) {
 32             //【重点,实现NIO编程模型的关键】configureBlocking设置ServerSocketChannel为非阻塞式调用,Channel默认的是阻塞的调用方式
 33             server.configureBlocking(false);
 34             //绑定监听端口,这里不是给ServerSocketChannel绑定,而是给ServerSocket绑定,socket()就是获取通道原生的ServerSocket或Socket
 35             server.socket().bind(new InetSocketAddress(port));
 36 
 37             //把server注册到Selector并监听Accept事件
 38             server.register(selector, SelectionKey.OP_ACCEPT);
 39             System.out.println("启动服务器,监听端口:" + port);
 40 
 41 
 42             while (true) {
 43                 //select()会返回此时触发了多少个Selector监听的事件
 44                 if(selector.select()>0) {
 45                     //获取这些已经触发的事件,selectedKeys()返回的是触发事件的所有信息
 46                     Set<SelectionKey> selectionKeys = selector.selectedKeys();
 47                     //循环处理这些事件
 48                     for (SelectionKey key : selectionKeys) {
 49                         handles(key, selector);
 50                     }
 51                     //处理完后清空selectedKeys,避免重复处理
 52                     selectionKeys.clear();
 53                 }
 54             }
 55         } catch (IOException e) {
 56             e.printStackTrace();
 57         }
 58     }
 59 
 60     //处理事件的方法
 61     private void handles(SelectionKey key, Selector selector) throws IOException {
 62         //当触发了Accept事件,也就是有客户端请求进来
 63         if (key.isAcceptable()) {
 64             //获取ServerSocketChannel
 65             ServerSocketChannel server = (ServerSocketChannel) key.channel();
 66             //然后通过accept()方法接收客户端的请求,这个方法会返回客户端的SocketChannel,这一步和原生的ServerSocket类似
 67             SocketChannel client = server.accept();
 68             client.configureBlocking(false);
 69 
 70             //把客户端的SocketChannel注册到Selector,并监听Read事件
 71             client.register(selector, SelectionKey.OP_READ);
 72             System.out.println("客户端[" + client.socket().getPort() + "]上线啦!");
 73         }
 74 
 75         //当触发了Read事件,也就是客户端发来了消息
 76         if (key.isReadable()) {
 77             SocketChannel client = (SocketChannel) key.channel();
 78             //获取消息
 79             String msg = receive(client);
 80             System.out.println("客户端[" + client.socket().getPort() + "]:" + msg);
 81             //把消息转发给其他客户端
 82             sendMessage(client, msg, selector);
 83             //判断用户是否退出
 84             if (msg.equals("quit")) {
 85                 //解除该事件的监听
 86                 key.cancel();
 87                 //更新Selector
 88                 selector.wakeup();
 89                 System.out.println("客户端[" + client.socket().getPort() + "]下线了!");
 90             }
 91         }
 92     }
 93 
 94     //编码方式设置为utf-8,下面字符和字符串互转时用得到
 95     private Charset charset = Charset.forName("UTF-8");
 96 
 97     //接收消息的方法
 98     private String receive(SocketChannel client) throws IOException {
 99         //用缓冲区之前先清空一下,避免之前的信息残留
100         read_buffer.clear();
101         //把通道里的信息读取到缓冲区,用while循环一直读取,直到读完所有消息。因为没有明确的类似
这样的结尾,所以要一直读到没有字节为止
102         while (client.read(read_buffer) > 0) ;
103         //把消息读取到缓冲区后,需要转换buffer的读写状态,不明白的看看前面的Buffer的讲解
104         read_buffer.flip();
105         return String.valueOf(charset.decode(read_buffer));
106     }
107 
108     //转发消息的方法
109     private void sendMessage(SocketChannel client, String msg, Selector selector) throws IOException {
110         msg = "客户端[" + client.socket().getPort() + "]:" + msg;
111         //获取所有客户端,keys()与前面的selectedKeys不同,这个是获取所有已经注册的信息,而selectedKeys获取的是触发了的事件的信息
112         for (SelectionKey key : selector.keys()) {
113             //排除服务器和本客户端并且保证key是有效的,isValid()会判断Selector监听是否正常、对应的通道是保持连接的状态等
114             if (!(key.channel() instanceof ServerSocketChannel) && !client.equals(key.channel()) && key.isValid()) {
115                 SocketChannel otherClient = (SocketChannel) key.channel();
116                 write_buffer.clear();
117                 write_buffer.put(charset.encode(msg));
118                 write_buffer.flip();
119                 //把消息写入到缓冲区后,再把缓冲区的内容写到客户端对应的通道中
120                 while (write_buffer.hasRemaining()) {
121                     otherClient.write(write_buffer);
122                 }
123             }
124         }
125     }
126 
127     public static void main(String[] args) {
128         new ChatServer(8888).start();
129     }
130 }
View Code

客户端ChatClient:

  1 /**
  2  * 1. 功能描述:基本和前两章的BIO、NIO没什么区别,一个线程监听用户输入信息并发送,主线程异步的读取服务器信息。
  3  */
  4 
  5 import java.io.IOException;
  6 import java.net.InetSocketAddress;
  7 import java.nio.ByteBuffer;
  8 import java.nio.channels.ClosedSelectorException;
  9 import java.nio.channels.SelectionKey;
 10 import java.nio.channels.Selector;
 11 import java.nio.channels.SocketChannel;
 12 import java.nio.charset.Charset;
 13 import java.util.Set;
 14 
 15 public class ChatClient {
 16     private static final int BUFFER = 1024;
 17     private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER);
 18     private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER);
 19     //声明成全局变量是为了方便下面一些工具方法的调用,就不用try with resource了
 20     private SocketChannel client;
 21     private Selector selector;
 22 
 23     private Charset charset = Charset.forName("UTF-8");
 24 
 25     private void start() {
 26         try  {
 27             client=SocketChannel.open();
 28             selector= Selector.open();
 29             client.configureBlocking(false);
 30             //注册channel,并监听SocketChannel的Connect事件
 31             client.register(selector, SelectionKey.OP_CONNECT);
 32             //请求服务器建立连接
 33             client.connect(new InetSocketAddress("127.0.0.1", 8888));
 34             //和服务器一样,不停的获取触发事件,并做相应的处理
 35             while (true) {
 36                 selector.select();
 37                 Set<SelectionKey> selectionKeys = selector.selectedKeys();
 38                 for (SelectionKey key : selectionKeys) {
 39                     handle(key);
 40                 }
 41                 selectionKeys.clear();
 42             }
 43         } catch (IOException e) {
 44             e.printStackTrace();
 45         }catch (ClosedSelectorException e){
 46             //当用户输入quit时,在send()方法中,selector会被关闭,而在上面的无限while循环中,可能会使用到已经关闭了的selector。
 47             //所以这里捕捉一下异常,做正常退出处理就行了。不会对服务器造成影响
 48         }
 49     }
 50 
 51     private void handle(SelectionKey key) throws IOException {
 52         //当触发connect事件,也就是服务器和客户端建立连接
 53         if (key.isConnectable()) {
 54             SocketChannel client = (SocketChannel) key.channel();
 55             //finishConnect()返回true,说明和服务器已经建立连接。如果是false,说明还在连接中,还没完全连接完成
 56             if(client.finishConnect()){
 57                 //新建一个新线程去等待用户输入
 58                 new Thread(new UserInputHandler(this)).start();
 59             }
 60             //连接建立完成后,注册read事件,开始监听服务器转发的消息
 61             client.register(selector,SelectionKey.OP_READ);
 62         }
 63         //当触发read事件,也就是获取到服务器的转发消息
 64         if(key.isReadable()){
 65             SocketChannel client = (SocketChannel) key.channel();
 66             //获取消息
 67             String msg = receive(client);
 68             System.out.println(msg);
 69             //判断用户是否退出
 70             if (msg.equals("quit")) {
 71                 //解除该事件的监听
 72                 key.cancel();
 73                 //更新Selector
 74                 selector.wakeup();
 75             }
 76         }
 77     }
 78 
 79     //获取消息
 80     private String receive(SocketChannel client) throws IOException {
 81         read_buffer.clear();
 82         while (client.read(read_buffer)>0);
 83         read_buffer.flip();
 84         return String.valueOf(charset.decode(read_buffer));
 85     }
 86 
 87     //发送消息
 88     public void send(String msg) throws IOException{
 89         if(!msg.isEmpty()){
 90             write_buffer.clear();
 91             write_buffer.put(charset.encode(msg));
 92             write_buffer.flip();
 93             while (write_buffer.hasRemaining()){
 94                 client.write(write_buffer);
 95             }
 96             if(msg.equals("quit")){
 97                 selector.close();
 98             }
 99         }
100     }
101 
102     public static void main(String[] args) {
103         new ChatClient().start();
104     }
105 }
View Code

客户端UserInputHandler:

 1 /**
 2  * 1. 功能描述:监听用户输入信息的线程。
 3  */
 4 
 5 import java.io.BufferedReader;
 6 import java.io.IOException;
 7 import java.io.InputStreamReader;
 8 
 9 public class UserInputHandler implements Runnable {
10     ChatClient client;
11     public UserInputHandler(ChatClient chatClient) {
12         this.client=chatClient;
13     }
14     @Override
15     public void run() {
16         BufferedReader read=new BufferedReader(
17                 new InputStreamReader(System.in)
18         );
19         while (true){
20             try {
21                 String input=read.readLine();
22                 client.send(input);
23                 if(input.equals("quit"))
24                     break;
25             } catch (IOException e) {
26                 e.printStackTrace();
27             }
28         }
29     }
30 }
View Code

 

AIO编程模型

AIO中的异步操作

CompletionHandler

在AIO编程模型中,常用的API,如connect、accept、read、write都是支持异步操作的。当调用这些方法时,可以携带一个CompletionHandler参数,它会提供一些回调函数。这些回调函数包括:

1.当这些操作成功时你需要怎么做;

2.如果这些操作失败了你要这么做。关于这个CompletionHandler参数,你只需要写一个类实现CompletionHandler口,并实现里面两个方法就行了。

那如何在调用connect、accept、read、write这四个方法时,传入CompletionHandler参数从而实现异步呢?下面分别举例这四个方法的使用。

先说说Socket和ServerSocket,在NIO中,它们变成了通道,配合缓冲区,从而实现了非阻塞。而在AIO中它们变成了异步通道。也就是AsynchronousServerSocketChannel和AsynchronousSocketChannel,下面例子中对象名分别是serverSocket和socket.

accept:serverSocket.accept(attachment,handler)。handler就是实现了CompletionHandler接口并实现两个回调函数的类,它具体怎么写可以看下面的实战代码。attachment为handler里面可能需要用到的辅助数据,如果没有就填null。

read:socket.read(buffer,attachment,handler)。buffer是缓冲区,用以存放读取到的信息。后面两个参数和accept一样。

write:socket.write(buffer,attachment,handler)。和read参数一样。

connect:socket.connect(address,attachment,handler)。address为服务器的IP和端口,后面两个参数与前几个一样。

Future

既然说到了异步操作,除了使用实现CompletionHandler接口的方式,不得不想到Future。客户端逻辑较为简单,如果使用CompletionHandler的话代码反而更复杂,所以下面的实战客户端代码就会使用Future的方式。简单来说,Future表示的是异步操作未来的结果,怎么理解未来。比如,客户端调用read方法获取服务器发来得消息:

Future<Integer> readResult=clientChannel.read(buffer)

Integer是read()的返回类型,此时变量readResult实际上并不一定有数据,而是表示read()方法未来的结果,这时候readResult有两个方法,isDone():返回boolean,查看程序是否完成处理,如果返回true,有结果了,这时候可以通过get()获取结果。如果你不事先判断isDone()直接调用get()也行,只不过它是阻塞的。如果你不想阻塞,想在这期间做点什么,就用isDone()。

还有一个问题:这些handler的方法是在哪个线程执行的?serverSocket.accept这个方法肯定是在主线程里面调用的,而传入的这些回调方法其实是在其他线程执行的。在AIO中,会有一个AsynchronousChannelGroup,它和AsynchronousServerSocketChannel是绑定在一起的,它会为这些异步通道提供系统资源,线程就算其中一种系统资源,所以为了方便理解,我们暂时可以把他看作一个线程池,它会为这些handler分配线程,而不是在主线程中去执行。

AIO编程模型

上面只说了些零碎的概念,为了更好的理解,下面讲一讲大概的工作流程(主要针对服务器,客户端逻辑较为简单):

1.首先做准备工作。跟NIO一样,先要创建好通道,只不过AIO是异步通道。然后创建好AsyncChannelGroup,可以选择自定义线程池。最后把AsyncServerSocket和AsyncChannelGroup绑定在一起,这样处于同一个AsyncChannelGroup里的通道就可以共享系统资源。

2.最后一步准备工作,创建好handler类,并实现接口和里面两个回调方法。(如图:客户端1对应的handler,里面的回调方法会实现读取消息和转发消息的功能;serverSocket的handler里的回调方法会实现accept功能。)

3.准备工作完成,当客户端1连接请求进来,客户端会马上回去,ServerSocket的异步方法会在连接成功后把客户端的SocketChannel存进在线用户列表,并利用客户端1的handler开始异步监听客户端1发送的消息。

4.当客户端1发送消息时,如果上一步中的handler成功监听到,就会回调成功后的回调方法,这个方法里会把这个消息转发给其他客户端。转发完成后,接着利用handler监听客户端1发送的消息。

epoll原理

select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

epoll是Linux下的一种IO多路复用技术,可以非常高效的处理数以百万计的socket句柄。

在 select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一 个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait() 时便得到通知。(此处去掉了遍历文件描述符,而是通过监听回调的的机制。这正是epoll的魅力所在。) 如果没有大量的idle -connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当遇到大量的idle- connection,就会发现epoll的效率大大高于select/poll。

注意:linux下Selector底层是通过epoll来实现的,当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

先看看使用c封装的3个epoll系统调用:

  • int epoll_create(int size) epoll_create建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。

  • *int epoll_ctl(int epfd, int op, int fd, struct epoll_event event) epoll_ctl可以操作epoll_create创建的epoll,如将socket句柄加入到epoll中让其监控,或把epoll正在监控的某个socket句柄移出epoll。

  • *int epoll_wait(int epfd, struct epoll_event events,int maxevents, int timeout) epoll_wait在调用时,在给定的timeout时间内,所监控的句柄中有事件发生时,就返回用户态的进程。

    大概看看epoll内部是怎么实现的:

  1. epoll初始化时,会向内核注册一个文件系统,用于存储被监控的句柄文件,调用epoll_create时,会在这个文件系统中创建一个file节点。同时epoll会开辟自己的内核高速缓存区,以红黑树的结构保存句柄,以支持快速的查找、插入、删除。还会再建立一个list链表,用于存储准备就绪的事件。

  2. 当执行epoll_ctl时,除了把socket句柄放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后,就把socket插入到就绪链表里。

  3. 当epoll_wait调用时,仅仅观察就绪链表里有没有数据,如果有数据就返回,否则就sleep,超时时立刻返回。

    epoll的两种工作模式:

  • LT:level-trigger,水平触发模式,只要某个socket处于readable/writable状态,无论什么时候进行epoll_wait都会返回该socket。
  • ET:edge-trigger,边缘触发模式,只有某个socket从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该socket。

socket读数据

socket写数据

最后顺便说下在Linux系统中JDK NIO使用的是 LT ,而Netty epoll使用的是 ET。

参考

https://www.cnblogs.com/lbhym/p/12698309.html

https://github.com/h2pl/Java-Tutorial/tree/master/docs/java/network-programming

https://www.zhihu.com/question/21383903/answer/64103663

https://blog.csdn.net/czx2018/article/details/89502699

https://www.cnblogs.com/snailclimb/p/9086334.html

原文地址:https://www.cnblogs.com/RQfreefly/p/13544850.html