基于NIO的同步非阻塞编程完整案例,客户端发送请求,服务端获取数据并返回给客户端数据,客户端获取返回数据

本文转载自:https://www.cnblogs.com/houzheng/p/9460450.html

这块还是挺复杂的,挺难理解,但是多练几遍,多看看研究研究其实也就那样,就是一个Selector轮询的过程,这里想要双向通信,客户端和服务端都需要一个Selector,并一直轮询,

直接贴代码:

Server:服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package cn.hou.socket01._03nio01;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
 
//nio 服务端
public class Server implements Runnable {
    //1 多路复用器
    private Selector selector;
    //2 建立缓冲区
    private ByteBuffer readBuf=ByteBuffer.allocate(1024);
    private ByteBuffer writeBuf=ByteBuffer.allocate(1024);
    //构造函数
    public Server(int port){
        try {
            //1 打开多路复用器
            this.selector=Selector.open();
            //2 打开服务器通道
            ServerSocketChannel ssc = ServerSocketChannel.open();
            //3 设置服务器通道为非阻塞方式
            ssc.configureBlocking(false);
            //4 绑定ip
            ssc.bind(new InetSocketAddress(port));
            //5 把服务器通道注册到多路复用器上,只有非阻塞信道才可以注册选择器.并在注册过程中指出该信道可以进行Accept操作
            ssc.register(this.selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务器已经启动.....");
        catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        while(true){//一直循环
            try {
                this.selector.select();//多路复用器开始监听
                //获取已经注册在多了复用器上的key通道集
                Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
                //遍历
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();//获取key
                    //如果是有效的
                    if(key.isValid()){
                        // 如果为阻塞状态,一般是服务端通道
                        if(key.isAcceptable()){
                            this.accept(key);
                        }
                        // 如果为可读状态,一般是客户端通道
                        if(key.isReadable()){
                            this.read(key);
                        }
                    }
                    //从容器中移除处理过的key
                    keys.remove();
                }
            catch (IOException e) {
                e.printStackTrace();
            }
        }
         
    }
    //从客户端通道获取数据并进行处理
    private void read(SelectionKey key) {
        try {
            //1 清空缓冲区旧的数据
            this.readBuf.clear();
            //2 获取之前注册的socket通道对象
            SocketChannel sc = (SocketChannel) key.channel();
            //3 读取数据
            int count = sc.read(this.readBuf);
            //4 如果没有数据
            if(count == -1){
                key.channel().close();
                key.cancel();
                return;
            }
            //5 有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
            this.readBuf.flip();
            //6 根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
            byte[] bytes = new byte[this.readBuf.remaining()];
            //7 接收缓冲区数据
            this.readBuf.get(bytes);
            //8 打印结果
            String body = new String(bytes).trim();
            System.out.println("服务端接受到客户端请求的数据: " + body);
            //9 告诉客户端已收到数据
            writeBuf.put("你好,客户端,我已收到数据".getBytes());
            //对缓冲区进行复位
            writeBuf.flip();
            //写出数据到服务端
            sc.write(writeBuf);
            //清空缓冲区数据
            writeBuf.clear();
        catch (IOException e) {
            e.printStackTrace();
        }
    }
    //接受一个客户端socket进行处理
    private void accept(SelectionKey key) {
        try {
            //1 获取服务通道
            ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
            //2 执行阻塞方法,当有客户端请求时,返回客户端通信通道
            SocketChannel sc = ssc.accept();
            //3 设置阻塞模式
            sc.configureBlocking(false);
            //4 注册到多路复用器上,并设置可读标识
            sc.register(this.selector, SelectionKey.OP_READ);
        catch (IOException e) {
            e.printStackTrace();
        }
         
    }
     
    public static void main(String[] args) {
        //启动服务器
        new Thread(new Server(9527)).start();
    }
     
 
}

  Client客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package cn.hou.socket01._03nio01;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
 
//nio 客户端
public class Client{
    //客户端信道选择器,轮询读取服务端返回数据
    private Selector selector;
    //连接信道
    private SocketChannel sc;
    public Client(){
        try {
            this.sc=SocketChannel.open();//打开信道
            sc.connect(new InetSocketAddress("127.0.0.1",9527));////连接服务端
            sc.configureBlocking(false);//设置非阻塞
            selector = Selector.open();//必须打开
            //将当前客户端注册到多路复用器上,并设置为可读状态
            sc.register(this.selector, SelectionKey.OP_READ);
            //开启线程,一直轮询
            new Thread(()->{
                while(true){//一直循环
                    try {
                        this.selector.select();//多路复用器开始监听
                        //获取已经注册在多了复用器上的key通道集
                        Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
                        //遍历
                        while (keys.hasNext()) {
                            SelectionKey key = keys.next();//获取key
                            //如果是有效的
                            if(key.isValid()){
                                // 如果为可读状态,读取服务端返回的数据
                                if(key.isReadable()){
                                    this.read(key);
                                }
                            }
                            //从容器中移除处理过的key
                            keys.remove();
                        }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        catch (IOException e) {
            e.printStackTrace();
        }
    }
     
    //客户端获取服务端返回的数据
    private void read(SelectionKey key) {
        try {
            //建立写缓冲区
            ByteBuffer readBuf = ByteBuffer.allocate(1024);
            //2 获取之前注册的socket通道对象
            SocketChannel sc = (SocketChannel) key.channel();
            //3 读取数据
            int count = sc.read(readBuf);
            //4 如果没有数据
            if(count == -1){
                key.channel().close();
                key.cancel();
                return;
            }
            //5 有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
            readBuf.flip();
            //6 根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
            byte[] bytes = new byte[readBuf.remaining()];
            //7 接收缓冲区数据
            readBuf.get(bytes);
            //8 打印结果
            String body = new String(bytes).trim();
            System.out.println("客户端已接受到服务端返回的数据: " + body);
        catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) {
        //建立写缓冲区
        ByteBuffer writebuf = ByteBuffer.allocate(1024);
        Client client = new Client();
        try {
            while(true){
                //定义一个字节数组,然后使用系统录入功能:
                byte[] bytes = new byte[1024];
                System.in.read(bytes);
                //把数据放到缓冲区中
                writebuf.put(bytes);
                //对缓冲区进行复位
                writebuf.flip();
                //写出数据到服务端
                client.sc.write(writebuf);
                //清空缓冲区数据
                writebuf.clear();
            }
        catch (IOException e) {
            e.printStackTrace();
        finally {
            if(client.sc != null){
                try {
                    client.sc.close();
                catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  ,先启动服务端,然后再启动客户端:

效果如下:

Server:

Client:

原文地址:https://www.cnblogs.com/xuqing0422/p/12866316.html