3、nio中的selector使用

通过编写一个客户端和服务器端的例子来熟悉selector的使用

服务端逻辑:

1. 绑定一个端口号
2. channel注册到selector中
3. 用死循环来监听如果有时间发生,遍历selectionKey set
4. 判断发生的事件类型,前面会注册accept事件,如果发生accept事件,那么注册读事件,同时清除selectionKey set 中的当前元素。、
5. 接收事件时,将channel保存下来。
6. 发生读事件时,说明有信息,发过来了,那么将消息,转发给所有的客户端。然后清除自身的事件。

 1 import java.io.IOException;
 2 import java.net.InetSocketAddress;
 3 import java.net.ServerSocket;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.SelectionKey;
 6 import java.nio.channels.Selector;
 7 import java.nio.channels.ServerSocketChannel;
 8 import java.nio.channels.SocketChannel;
 9 import java.nio.charset.Charset;
10 import java.util.*;
11 
12 public class NioServer {
13 
14     private static HashMap<String, SocketChannel> clientMap = new HashMap<String, SocketChannel>();
15 
16     public static void main(String[] args) throws IOException {
17         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
18         serverSocketChannel.configureBlocking(false);
19 
20         ServerSocket serverSocket = serverSocketChannel.socket();
21         serverSocket.bind(new InetSocketAddress(8899));
22 
23         Selector selector = Selector.open();
24 
25         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
26 
27         while(true) {
28             int number = selector.select();
29 //            System.out.println("number:" + number);
30             Set<SelectionKey> selectionKeySet = selector.selectedKeys();
31 
32             Iterator<SelectionKey> iterable = selectionKeySet.iterator();
33 
34             if(number > 0 ) {
35                 while(iterable.hasNext()) {
36                     SelectionKey selectionKey = iterable.next();
37 
38                     if(selectionKey.isAcceptable()) {//如果是可接收连接的
39                         ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
40                         SocketChannel socketChannel = ssc.accept();
41                         socketChannel.configureBlocking(false);
42 
43                         socketChannel.register(selector, SelectionKey.OP_READ);//注册读事件
44 
45                         clientMap.put(UUID.randomUUID() + "", socketChannel);//保存下channel
46 
47                         iterable.remove();
48                     } else if(selectionKey.isReadable()){//可读的
49                         SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
50                         ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
51 
52                         int readCount = socketChannel.read(byteBuffer);
53 
54                         //这里本该用while
55                         if(readCount > 0 ) {//读取到数据,就写回到其他客户端
56                             byteBuffer.flip();
57 
58                             Charset charset = Charset.forName("UTF-8");
59                             String receiveStr = new String(charset.decode(byteBuffer).array());
60 
61                             System.out.println(socketChannel + " receive msg :" + receiveStr);
62 
63                             String sendKey = "";
64 
65                             for(Map.Entry<String, SocketChannel> entry : clientMap.entrySet()) {//第一遍遍历找到发送者
66                                 if(socketChannel == entry.getValue()) {
67                                     sendKey = entry.getKey();
68                                     break;
69                                 }
70                             }
71 
72                             for (Map.Entry<String, SocketChannel> entry: clientMap.entrySet()  ) {//给每个保存的连接,都发送消息
73                                 ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
74                                 writeBuffer.put((sendKey + ":" +  receiveStr).getBytes());
75 
76                                 writeBuffer.flip();
77                                 entry.getValue().write(writeBuffer);
78                             }
79                         }
80                         iterable.remove();//这个 删除很关键  每次循环完selectionKeySet ,一定要清楚事件,不然肯定会影响下一次的事件触发,或者直接不触发下次的事件
81                     }
82                 }
83             }
84 
85         }
86     }
87 }

客户端逻辑

1. 建立socketChannel 连接到对应的端口
2. 新建selector对象,然后把socketChannel注册到selector上
3. 建立死循环 ,监听是否有事件发生,若有,则遍历seletionKey set ,
4. 判断发生的事件是什么,
5. 如果是连接事件 ,做对应的连接处理,注册读事件

判断是否在等待连接 ,在进程中
if (channel.isConnectionPending()) { 
channel.finishConnect(); 完成连接,这里是阻塞的

6. 如果发生了读事件,读取数据

 1 import java.io.BufferedReader;
 2 import java.io.InputStream;
 3 import java.io.InputStreamReader;
 4 import java.net.InetSocketAddress;
 5 import java.nio.ByteBuffer;
 6 import java.nio.channels.SelectionKey;
 7 import java.nio.channels.Selector;
 8 import java.nio.channels.SocketChannel;
 9 import java.time.LocalDateTime;
10 import java.util.Iterator;
11 import java.util.Set;
12 import java.util.concurrent.Executor;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.ThreadFactory;
16 
17 public class NioClient {
18 
19     public static void main(String[] args) {
20         try{
21             SocketChannel socketChannel = SocketChannel.open();
22             socketChannel.configureBlocking(false);
23             socketChannel.connect(new InetSocketAddress(8899));//服务端就是bind  然后accept  serverSocketChannel
24 
25             Selector selector = Selector.open();
26 
27             socketChannel.register(selector, SelectionKey.OP_CONNECT);//注册连接事件
28 
29             while(true) {
30                 int number = selector.select();
31 
32                 if(number > 0) {
33                     Set<SelectionKey> selectionKeySet =  selector.selectedKeys();
34 
35                     Iterator<SelectionKey> iterable = selectionKeySet.iterator();
36                     while(iterable.hasNext()) {//有事件发生
37                         SelectionKey selectionKey = iterable.next();
38 
39                         SocketChannel client = (SocketChannel) selectionKey.channel();
40                         if(selectionKey.isConnectable()) {//判断 selectionkey 状态  可连接的
41                             if(client.isConnectionPending()) {//是否在准备连接的进程中
42                                 client.finishConnect();//这里会阻塞,如果连接未建立,抛异常 ,
43 
44                                 ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
45 
46                                 byteBuffer.put((LocalDateTime.now() + ",连接成功").getBytes());
47                                 byteBuffer.flip();
48                                 client.write(byteBuffer);
49 
50                                 ExecutorService executorService = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());
51 
52                                 executorService.submit(() -> {//起一个新的线程,去接收控制台的输入 ,不影响其他线程
53                                     while(true) {
54                                         try{
55                                             byteBuffer.clear();
56                                             InputStreamReader inputStreamReader = new InputStreamReader(System.in);
57                                             BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
58 
59                                             byteBuffer.put(bufferedReader.readLine().getBytes());
60                                             byteBuffer.flip();
61                                             client.write(byteBuffer);
62 
63                                         }catch (Exception e) {
64                                             e.printStackTrace();
65                                         }
66                                     }
67                                 });
68                             }
69 
70                             iterable.remove();//这个事件清楚,很关键
71                             client.register(selector, SelectionKey.OP_READ);//注册读事件
72                         } else if(selectionKey.isReadable()){//可读取
73                             SocketChannel socketChannel1 = (SocketChannel) selectionKey.channel();
74                             ByteBuffer readBuffer = ByteBuffer.allocate(1024);
75 
76                             int readCount = socketChannel.read(readBuffer);
77                             if(readCount > 0) {
78                                 String receiveMsg = new String(readBuffer.array());
79                                 System.out.println("receiveMsg : " + receiveMsg);
80                             }
81 
82                             iterable.remove();
83                         }
84 
85                     }
86                 }
87             }
88 
89 
90 
91 
92         }catch (Exception e ) {
93             e.printStackTrace();
94         }
95 
96 
97     }
98 }
原文地址:https://www.cnblogs.com/amibandoufu/p/11441560.html