分布式架构基础 一、分布式架构的基石之远程通信协议

  分布式架构的特点是:服务的分布式(分布在不同的计算机节点)、以及远程通信实现数据的交互。
那么就意味着,原本在一个war包下的:用户、订单、商品、库存等业务模块,按照业务维度拆分成用户服务、订单服务、商品服务、库存服务等独立的进程,部署在不同的计算机节点上。而拆分之后的服务,必然会涉及到远程通信的需要,比如用户服务,需要查询订单列表,则会调用订单服务的远程接口,获得订单信息。
在这个过程中,会涉及到远程通信,所以远程通信的技术是整个分布式架构的一个基础,如果没有远程通信,那么分布式架构也就不存在了。

对于内部服务通信来说,我们一般会采用RPC通信,而在RPC通信的实现中,必然会涉及到TCP传输协议、三次握手、四次挥手、TCP的四层网络模型、BIO、NIO、AIO、序列化、反序列化等。

一、一次http请求通信的完整过程

域名解析 -->
发起TCP的3次握手 -->
TCP连接建立后 发起http请求 -->
服务器响应http请求,浏览器得到html代码 -->
浏览器解析html代码,并请求html代码中的资源(如js、css、图片等) -->
浏览器对页面进行渲染呈现给用户

7层网络协议

TCP三次握手、四次挥手

为什么四次挥手
TCP协议是一种面向连接的、可靠的、基于字节流的运输层通信协议。TCP是全双工模式,这就意味着,当主机1发出FIN报文段时,只是表示主机1已经没有数据要发送了,主机1告诉主机2,它的数据已经全部发送完毕了;但是,这个时候主机1还是可以接受来自主机2的数据;
当主机2返回ACK报文段时,表示它已经知道主机1没有数据发送了,但是主机2还是可以发送数据到主机1的;
当主机2也发送了FIN报文段时,这个时候就表示主机2也没有数据要发送了,就会告诉主机1,我也没有数据要发送了,之后彼此就会愉快的中断这次TCP连接。

二、TCP/IP分层管理

三、Java中使用TCP协议进行通信

四、BIO、NIO机制

BIO:同步阻塞式IO,服务器实现模式为一个连接一个线程,即客户端有socket连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善。
NIO:同步非阻塞式IO,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。

1. BIO

a. 一个简单的通信样例

先以一个简单的例子来演示进程或服务之间的通信:
使用ServerSocket和Socket来演示一次简单的通信,端口号为8081,首先 server启动; client --发送数据至--> server; server接受; server --发送数据至--> client; client接收。
ServerDemo

import java.io.*;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class ServerDemo {

    public static void main(String[] args) {

        ServerSocket serverSocket = null;
        try {

            serverSocket = new ServerSocket(8081);

            Socket socket = serverSocket.accept();  // 阻塞
            InetAddress client = socket.getInetAddress();
            System.out.println(String.format("Server: 接收到来自 [%s] 的建立连接请求", client));

            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String receiveMsg = reader.readLine();  // 阻塞
            System.out.println(String.format("Server: 接收到来自 [%s] 的消息 [%s]", client, receiveMsg));

            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            String sendMsg = "Hi this's server, has accept your's msg[" + receiveMsg + "]";
            writer.write(sendMsg + "
"); // 末尾加 
 来告诉通信对手本次消息已发送完成
            writer.flush();
            System.out.println(String.format("ServerDemo: 发送回执至 [%s], 消息为 [%s]", client, sendMsg));

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != serverSocket) { serverSocket.close(); }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

}

ClientDemo

import java.io.*;
import java.net.Socket;

public class ClientADemo {

    public static void main(String[] args) {

        Socket socket = null;
        try {
            socket = new Socket("127.0.0.1", 8081);

            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            String sendMsg = "Hello this's clientA";
            bw.write(sendMsg + "
");
            bw.flush();
            System.out.println("ClientA: 发送消息 " + sendMsg);

            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String receiveMsg = reader.readLine();  // 阻塞
            System.out.println("ClientA: 接收到回执 " + receiveMsg);

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != socket) { socket.close(); }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

}

执行结果:

server端:
Server: 接收到来自 [/127.0.0.1] 的建立连接请求
Server: 接收到来自 [/127.0.0.1] 的消息 [Hello this's clientA]
ServerDemo: 发送回执至 [/127.0.0.1], 消息为 [Hi this's server, has accept your's msg[Hello this's clientA]]

client端:
ClientA: 发送消息 Hello this's clientA
ClientA: 接收到回执 Hi this's server, has accept your's msg[Hello this's clientA]

b. socket的阻塞

其实我们以debug方式启动上例中的server和client时会发现,当没有client建立连接,server在serverSocket.accept();处是处于阻塞状态的; 当client没有发消息,server在reader.readLine(); 读取消息时,也是处于阻塞状态的。
下面以两个client连接server方式来演示通信的阻塞,模拟场景:
创建两个客户端ClientA,ClientB,由ClientA建立与server的连接,clientA建立连接后等待10s再发送消息至客户端; 再此期间clientB与server通信;
为了保证服务端Server在一次通信完成后继续通信,Server改为自旋。

Server:

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Date;

public class ServerDemo {

    public static void main(String[] args) {

        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(8081);

            while (!Thread.currentThread().isInterrupted()) { // 自旋

                Socket socket = serverSocket.accept();  // A01. 第一次while循环内,ClientA先建立连接; A03. 当与clientA通信完毕,进入第二次while循环,才能收到ClientB建立的连接请求,开始与B的通信
                SocketAddress client = socket.getRemoteSocketAddress();
                System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的建立连接请求 at[%s]", client, new Date()));

                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String receiveMsg = reader.readLine();  // A02. 之后由于ClientA等待了10s才发消息,所以server读取不到ClientA的消息,阻塞了10s,之后才能继续执行
                System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的消息 [%s] at[%s]", client, receiveMsg, new Date()));

                BufferedWriter writer = new BufferedWriter((new OutputStreamWriter(socket.getOutputStream())));
                String sendMsg = "Hi this's server, has accept your's msg[" + receiveMsg + "]";
                writer.write(sendMsg + " 
");  // 末尾加 
 来告诉通信对手本次消息已发送完成
                writer.flush();
                System.out.println(String.format("ServerDemo: 发送回执至 [%s], 消息为 [%s] at[%s]", client, sendMsg, new Date()));
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != serverSocket) { serverSocket.close(); }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }


}

ClientA:

import java.io.*;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
import java.util.Date;

public class ClientADemo {

    public static void main(String[] args) {

        Socket socket = null;
        try {
            socket = new Socket("127.0.0.1", 8081);

            TimeUnit.SECONDS.sleep(10);

            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            String sendMsg = "Hello this's clientA";
            bw.write(sendMsg + "
");
            bw.flush();
            System.out.println("ClientA: 发送消息 " + sendMsg + " " + new Date());

            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String receiveMsg = reader.readLine();
            System.out.println("ClientA: 接收到回执 " + receiveMsg + " " + new Date());

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != socket) { socket.close(); }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

}

ClientB:

import java.io.*;
import java.net.Socket;
import java.util.Date;

public class ClientBDemo {

    public static void main(String[] args) {

        Socket socket = null;
        try {
            socket = new Socket("127.0.0.1", 8081);
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            String sendMsg = "Hello this's clientB";
            bw.write(sendMsg + "
");
            bw.flush();
            System.out.println("ClientB: 发送消息 " + sendMsg + " " + new Date());

            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String receiveMsg = reader.readLine();
            System.out.println("ClientB: 接收到回执 " + receiveMsg + " " + new Date());

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != socket) { socket.close(); }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

}

执行结果:

服务端:
ServerDemo: 接收到来自 [/127.0.0.1:9892] 的建立连接请求 at[Sun Jun 14 10:57:13 CST 2019]
ServerDemo: 接收到来自 [/127.0.0.1:9892] 的消息 [Hello this's clientA] at[Sun Jun 14 10:57:23 CST 2019]
ServerDemo: 发送回执至 [/127.0.0.1:9892], 消息为 [Hi this's server, has accept your's msg[Hello this's clientA]] at[Sun Jun 14 10:57:23 CST 2019]
ServerDemo: 接收到来自 [/127.0.0.1:9898] 的建立连接请求 at[Sun Jun 14 10:57:23 CST 2019]
ServerDemo: 接收到来自 [/127.0.0.1:9898] 的消息 [Hello this's clientB] at[Sun Jun 14 10:57:23 CST 2019]
ServerDemo: 发送回执至 [/127.0.0.1:9898], 消息为 [Hi this's server, has accept your's msg[Hello this's clientB]] at[Sun Jun 14 10:57:23 CST 2019]

客户端A: 
ClientA: 发送消息 Hello this's clientA Sun Jun 14 10:57:23 CST 2019
ClientA: 接收到回执 Hi this's server, has accept your's msg[Hello this's clientA]  Sun Jun 14 10:57:23 CST 2019

客户端B:
ClientB: 发送消息 Hello this's clientB Sun Jun 14 10:57:14 CST 2019
ClientB: 接收到回执 Hi this's server, has accept your's msg[Hello this's clientB]  Sun Jun 14 10:57:23 CST 2019

通过Server的运行结果可以看到,在57分13秒时收到A建立连接的请求,之后等待了10s到57分23时才收到A发出的请求之后完成与A的通信;在与A通信完成后,才继续执行了与B的通信;
而B其实在57分14秒就已经把信息发送给了Server,等到57分23秒才完成这次通信。可见ServerSocket在处理请求时时阻塞并且串行的,这也就是所谓的BIO。
而之所以服务端会等待10s才给B响应,是因为Server是在一个线程内执行的,Server在执行到String receiveMsg = reader.readLine(); 时会一直等待A发来的消息才会唤醒,所以不会accept B的建立连接请求。
此时我们就可以进行些许优化,为每个socket单独建立一个线程,异步响应每个请求。

c. 每个socket建立一个线程来执行

一般我们使用线程池来对BIO进行优化。
ServerDemo:


import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ServerDemo {

    static ExecutorService pool = Executors.newFixedThreadPool(20);

    public static void main(String[] args) {

        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(8081);

            while (!Thread.currentThread().isInterrupted()) { // 自旋

                Socket socket = serverSocket.accept();  // 阻塞

                pool.execute(new SocketThread(socket));

            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != serverSocket) { serverSocket.close(); }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

}

class SocketThread implements Runnable {

    private Socket socket;

    public SocketThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {

        try {
            SocketAddress client = socket.getRemoteSocketAddress();
            System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的建立连接请求 at[%s]", client, new Date()));

            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String receiveMsg = reader.readLine();  // 阻塞
            System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的消息 [%s] at[%s]", client, receiveMsg, new Date()));

            BufferedWriter writer = new BufferedWriter((new OutputStreamWriter(socket.getOutputStream())));
            String sendMsg = "Hi this's server, has accept your's msg[" + receiveMsg + "]";
            writer.write(sendMsg + " 
");  // 末尾加 
 来告诉通信对手本次消息已发送完成
            writer.flush();
            System.out.println(String.format("ServerDemo: 发送回执至 [%s], 消息为 [%s] at[%s]", client, sendMsg, new Date()));
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }


    }
}

我们为每个socket建立单独的线程来响应,这样每个socket的交互是异步的,也就不存在之前建立连接时的阻塞问题了。
仍然执行上例子中的A、B客户端

Server:
ServerDemo: 接收到来自 [/127.0.0.1:10363] 的建立连接请求 at[Sun Jun 14 11:34:20 CST 2019]
ServerDemo: 接收到来自 [/127.0.0.1:10369] 的建立连接请求 at[Sun Jun 14 11:34:22 CST 2019]
ServerDemo: 接收到来自 [/127.0.0.1:10369] 的消息 [Hello this's clientB] at[Sun Jun 14 11:34:22 CST 2019]
ServerDemo: 发送回执至 [/127.0.0.1:10369], 消息为 [Hi this's server, has accept your's msg[Hello this's clientB]] at[Sun Jun 14 11:34:22 CST 2019]
ServerDemo: 接收到来自 [/127.0.0.1:10363] 的消息 [Hello this's clientA] at[Sun Jun 14 11:34:30 CST 2019]
ServerDemo: 发送回执至 [/127.0.0.1:10363], 消息为 [Hi this's server, has accept your's msg[Hello this's clientA]] at[Sun Jun 14 11:34:30 CST 2019]

ClientA:
ClientA: 发送消息 Hello this's clientA Sun Jun 14 11:34:30 CST 2019
ClientA: 接收到回执 Hi this's server, has accept your's msg[Hello this's clientA]  Sun Jun 14 11:34:30 CST 2019

ClientB:
ClientB: 发送消息 Hello this's clientB Sun Jun 14 11:34:22 CST 2019
ClientB: 接收到回执 Hi this's server, has accept your's msg[Hello this's clientB]  Sun Jun 14 11:34:22 CST 2019

可见ClientB无需等待Server与A通信完成后再与Server通信。

d. 服务端每次接收到数据异步执行

通过加入线程池的改造,我们使得每个socket可以单独与服务端进行通信,但是实际场景中可能Server在接收到client的消息后,需要进行相应的业务逻辑处理,也就是String receiveMsg = reader.readLine(); 之后的逻辑,如果这段逻辑处理起来耗时较长,那么下次client发送的数据就不能及时接受。此时也可以借助阻塞队列BlockIngQueue来帮助异步执行。

模拟场景:
ClientA每次3秒往Server发送一次消息;
Server处理收到的消息需要10秒;每次server收到消息后直接放入阻塞队列总,由另一个线程异步处理,不影响下次消息的接受。

ServerDemo :

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ServerDemo {

    private int port = 8081;
    private ExecutorService pool = Executors.newFixedThreadPool(20);
    private LinkedBlockingQueue<String> receiveMsgQueue = new LinkedBlockingQueue();

    public static void main(String[] args) {

        new ServerDemo();

    }

    public ServerDemo() {

        try {
            ServerSocket serverSocket = new ServerSocket(port);

            // 为每个客户端创建线程
            pool.execute(new ConnectorThread(serverSocket));

            // 客户端发来的消息,单独线程处理
            pool.execute(new ProcessMsgThread(receiveMsgQueue));


        } catch (IOException e) {
            e.printStackTrace();
        }


    }

    class ConnectorThread implements Runnable {

        private ServerSocket serverSocket;

        public ConnectorThread(ServerSocket serverSocket) {
            this.serverSocket = serverSocket;
        }

        @Override
        public void run() {

            while (!Thread.currentThread().isInterrupted()) {

                try {
                    Socket socket = serverSocket.accept();
                    pool.execute(new ReceiveMsgThread(socket));
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }

        }

    }

    class ReceiveMsgThread implements Runnable {

        private Socket socket;

        public ReceiveMsgThread(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {

            try {
                SocketAddress client = socket.getRemoteSocketAddress();
                System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的建立连接请求 at[%s]", client, new Date()));

                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

                while (!Thread.currentThread().isInterrupted()) {
                    String receiveMsg = reader.readLine();  // 阻塞
                    System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的消息 [%s] at[%s]", client, receiveMsg, new Date()));

                    // 服务端处理消息需要20s,直接放入阻塞队列异步执行,然后继续接受下次消息
                    //TimeUnit.SECONDS.sleep(20);
                    receiveMsgQueue.put(receiveMsg);

                }

            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }
    }

    class ProcessMsgThread implements Runnable {

        private LinkedBlockingQueue<String> blockingQueue;

        public ProcessMsgThread(LinkedBlockingQueue receiveMsgQueue) {
            this.blockingQueue = receiveMsgQueue;
        }

        @Override
        public void run() {

            while (!Thread.currentThread().isInterrupted()) {

                try {
                    String msg = blockingQueue.take();
                    // TODO do something, process msg
                    TimeUnit.SECONDS.sleep(10);
                    System.out.println("消息处理完成");

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        }
    }

}


ClientDemo:


import java.io.*;
import java.net.Socket;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class ClientADemo {

    public static void main(String[] args) {

        Socket socket = null;
        try {
            socket = new Socket("127.0.0.1", 8081);

            while (true) {
                BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                String sendMsg = "Hello this's clientA";
                bw.write(sendMsg + "
");
                bw.flush();
                System.out.println("ClientA: 发送消息 " + sendMsg + " " + new Date());

                TimeUnit.SECONDS.sleep(3);

            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != socket) { socket.close(); }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

}

运行结果:

Server:
ServerDemo: 接收到来自 [/127.0.0.1:11049] 的建立连接请求 at[Sun Jun 14 12:23:54 CST 2019]
ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:23:54 CST 2019]
ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:23:57 CST 2019]
ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:00 CST 2019]
ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:03 CST 2019]
消息处理完成
ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:06 CST 2019]
ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:09 CST 2019]
ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:12 CST 2019]
消息处理完成
ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:15 CST 2019]
ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:18 CST 2019]
ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:21 CST 2019]
ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:24 CST 2019]
消息处理完成
ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:27 CST 2019]

ClientA:
ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:23:54 CST 2019
ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:23:57 CST 2019
ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:00 CST 2019
ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:03 CST 2019
ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:06 CST 2019
ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:09 CST 2019
ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:12 CST 2019
ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:15 CST 2019
ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:18 CST 2019
ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:21 CST 2019
ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:24 CST 2019
ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:27 CST 2019

可见,通过阻塞队列来辅助处理消息,不会影响server端接受消息。

2. NIO

在使用BIO通信的过程中,在活动连接数不是特别高(小于单机1000)的情况下,这种模型是比较不错的,可以让每一个连接专注于自己的 I/O 并且编程模型简单,也不用过多考虑系统的过载、限流等问题。线程池本身也会避免频繁创建销毁线程的开销。但是,当面对十万甚至百万级连接的时候,传统的 BIO 模型是无能为力的。因此,我们需要一种更高效的 I/O 处理模型来应对更高的并发量。

NIO是一种同步非阻塞的I/O模型,在Java 1.4 中引入了NIO框架,对应 java.nio 包,提供了 Channel , Selector,Buffer等抽象。NIO中的N可以理解为Non-blocking,不单纯是New。它支持面向缓冲的,基于通道的I/O操作方法。 NIO提供了与传统BIO模型中的 Socket 和 ServerSocket 相对应的 SocketChannel 和 ServerSocketChannel 两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。

NIOServer

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOServer {

    static Selector selector;

    public static void main(String[] args) {

        try {

            selector = Selector.open();

            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false); // 设为非阻塞
            serverSocketChannel.bind(new InetSocketAddress(8085));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //把连接事件注册到多路复用器上

            while (true) {
                selector.select(); // 阻塞机制
                Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeySet.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isAcceptable()) { // 连接事件
                        handleAccept(key);
                    } else if (key.isReadable()) {  // 读事件
                        handleRead(key);
                    }
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    private static void handleAccept(SelectionKey key) {

        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            String sendMsg = "Hi this is NIO server.";
            socketChannel.write(ByteBuffer.wrap(sendMsg.getBytes()));
            socketChannel.register(selector, SelectionKey.OP_READ); // 注册读事件
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    private static void handleRead(SelectionKey key) {

        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer =ByteBuffer.allocate(1024);
        try {
            socketChannel.read(byteBuffer);
            System.out.println("Server: receive mgs[" + new String(byteBuffer.array()) + "]");
        } catch (IOException e) {
            e.printStackTrace();
        }

    }


}

NIOClient


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOClient {

    static Selector selector;

    public static void main(String[] args) {

        try {
            selector = Selector.open();
            SocketChannel socketChannel =SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8085));
            socketChannel.register(selector, SelectionKey.OP_CONNECT); // 注册连接事件
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeySet.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isConnectable()) { // 连接事件
                        handleConnect(key);
                    } else if (key.isReadable()){// 读取事件
                        handleRead(key);
                    }
                }

            }

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    private static void handleConnect(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        if (socketChannel.isConnectionPending()) {
            socketChannel.finishConnect();
        }
        socketChannel.configureBlocking(false);
        String sendMsg = "Hi this's client.";
        socketChannel.write(ByteBuffer.wrap(sendMsg.getBytes()));
        socketChannel.register(selector, SelectionKey.OP_READ);// 注册读事件
    }

    private static void handleRead(SelectionKey key) throws IOException {

        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        socketChannel.read(byteBuffer);
        System.out.println("Client: receive msg[" + new String(byteBuffer.array()) + "]");

    }

}

多路复用器: Selector
多路复用机制:多路指的时多个网络连接,复用指复用同一个线程。
多路复用技术类似于使用多个鱼竿钓鱼,只有有鱼的时候鱼竿响动,我们才提起对应的鱼竿。

原文地址:https://www.cnblogs.com/Qkxh320/p/distributed_basic01_NIO.html