03基于NIO的聊天室案例

服务端代码实现

  1 package server;
  2 
  3 import java.io.Closeable;
  4 import java.io.IOException;
  5 import java.net.InetSocketAddress;
  6 import java.nio.ByteBuffer;
  7 import java.nio.channels.*;
  8 import java.nio.charset.Charset;
  9 import java.util.Set;
 10 
 11 public class ChatServer {
 12 
 13     private static final int DEFAULT_PORT = 8888;
 14     private static final String QUIT = "quit";
 15     private static final int BUFFER = 1024;
 16 
 17     private ServerSocketChannel server;
 18     private Selector selector;
 19     private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
 20     private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);
 21     private Charset charset = Charset.forName("UTF-8");
 22     private int port;
 23 
 24     public ChatServer() {
 25         this(DEFAULT_PORT);
 26     }
 27 
 28     public ChatServer(int port) {
 29         this.port = port;
 30     }
 31 
 32     private void start() {
 33         try {
 34             server = ServerSocketChannel.open();
 35             server.configureBlocking(false);
 36             server.socket().bind(new InetSocketAddress(port));
 37 
 38             selector = Selector.open();
 39             server.register(selector, SelectionKey.OP_ACCEPT);
 40             System.out.println("启动服务器, 监听端口:" + port + "...");
 41 
 42             while (true) {
 43                 selector.select();
 44                 Set<SelectionKey> selectionKeys = selector.selectedKeys();
 45                 for (SelectionKey key : selectionKeys) {
 46                     // 处理被触发的事件
 47                     handles(key);
 48                 }
 49                 selectionKeys.clear();
 50             }
 51 
 52         } catch (IOException e) {
 53             e.printStackTrace();
 54         } finally {
 55             close(selector);
 56         }
 57 
 58     }
 59 
 60     private void handles(SelectionKey key) throws IOException {
 61         // ACCEPT事件 - 和客户端建立了连接
 62         if (key.isAcceptable()) {
 63             ServerSocketChannel server = (ServerSocketChannel) key.channel();
 64             SocketChannel client = server.accept();
 65             client.configureBlocking(false);
 66             client.register(selector, SelectionKey.OP_READ);
 67             System.out.println(getClientName(client) + "已连接");
 68         }
 69         // READ事件 - 客户端发送了消息
 70         else if (key.isReadable()) {
 71             SocketChannel client = (SocketChannel) key.channel();
 72             String fwdMsg = receive(client);
 73             if (fwdMsg.isEmpty()) {
 74                 // 客户端异常
 75                 key.cancel();
 76                 selector.wakeup();
 77             } else {
 78                 System.out.println(getClientName(client) + ":" + fwdMsg);
 79                 forwardMessage(client, fwdMsg);
 80 
 81                 // 检查用户是否退出
 82                 if (readyToQuit(fwdMsg)) {
 83                     key.cancel();
 84                     selector.wakeup();
 85                     System.out.println(getClientName(client) + "已断开");
 86                 }
 87             }
 88 
 89         }
 90     }
 91 
 92     private void forwardMessage(SocketChannel client, String fwdMsg) throws IOException {
 93         for (SelectionKey key: selector.keys()) {
 94             Channel connectedClient = key.channel();
 95             if (connectedClient instanceof ServerSocketChannel) {
 96                 continue;
 97             }
 98 
 99             if (key.isValid() && !client.equals(connectedClient)) {
100                 wBuffer.clear();
101                 wBuffer.put(charset.encode(getClientName(client) + ":" + fwdMsg));
102                 wBuffer.flip();
103                 while (wBuffer.hasRemaining()) {
104                     ((SocketChannel)connectedClient).write(wBuffer);
105                 }
106             }
107         }
108     }
109 
110     private String receive(SocketChannel client) throws IOException {
111         rBuffer.clear();
112         while(client.read(rBuffer) > 0);
113         rBuffer.flip();
114         return String.valueOf(charset.decode(rBuffer));
115     }
116 
117     private String getClientName(SocketChannel client) {
118         return "客户端[" + client.socket().getPort() + "]";
119     }
120 
121     private boolean readyToQuit(String msg) {
122         return QUIT.equals(msg);
123     }
124 
125     private void close(Closeable closable) {
126         if (closable != null) {
127             try {
128                 closable.close();
129             } catch (IOException e) {
130                 e.printStackTrace();
131             }
132         }
133     }
134 
135     public static void main(String[] args) {
136         ChatServer chatServer = new ChatServer(7777);
137         chatServer.start();
138     }
139 }

客户端实现

与服务端交互

  1 package client;
  2 
  3 import java.io.Closeable;
  4 import java.io.IOException;
  5 import java.net.InetSocketAddress;
  6 import java.nio.ByteBuffer;
  7 import java.nio.channels.*;
  8 import java.nio.charset.Charset;
  9 import java.util.Set;
 10 import java.util.concurrent.atomic.AtomicBoolean;
 11 
 12 public class ChatClient {
 13 
 14     private static final String DEFAULT_SERVER_HOST = "127.0.0.1";
 15     private static final int DEFAULT_SERVER_PORT = 8888;
 16     private static final String QUIT = "quit";
 17     private static final int BUFFER = 1024;
 18 
 19     private String host;
 20     private int port;
 21     private SocketChannel client;
 22     private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
 23     private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);
 24     private Selector selector;
 25     private Charset charset = Charset.forName("UTF-8");
 26 
 27     public ChatClient() {
 28         this(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);
 29     }
 30 
 31     public ChatClient(String host, int port) {
 32         this.host = host;
 33         this.port = port;
 34     }
 35 
 36     public boolean readyToQuit(String msg) {
 37         return QUIT.equals(msg);
 38     }
 39 
 40     private void close(Closeable closable) {
 41         if (closable != null) {
 42             try {
 43                 closable.close();
 44             } catch (IOException e) {
 45                 e.printStackTrace();
 46             }
 47         }
 48     }
 49 
 50     private void start() {
 51         try {
 52             client = SocketChannel.open();
 53             client.configureBlocking(false);
 54 
 55             selector = Selector.open();
 56             client.register(selector, SelectionKey.OP_CONNECT);
 57             client.connect(new InetSocketAddress(host, port));
 58 
 59             while (true) {
 60                 selector.select();
 61                 Set<SelectionKey> selectionKeys = selector.selectedKeys();
 62                 for (SelectionKey key : selectionKeys) {
 63                     handles(key);
 64                 }
 65                 selectionKeys.clear();
 66             }
 67         } catch (IOException e) {
 68             e.printStackTrace();
 69         } catch (ClosedSelectorException e) {
 70             // 用户正常退出
 71         } finally {
 72             close(selector);
 73         }
 74 
 75     }
 76 
 77     private void handles(SelectionKey key) throws IOException {
 78         // CONNECT事件 - 连接就绪事件
 79         if (key.isConnectable()) {
 80             SocketChannel client = (SocketChannel) key.channel();
 81             if (client.isConnectionPending()) {
 82                 client.finishConnect();
 83                 // 处理用户的输入
 84                 new Thread(new UserInputHandler(this)).start();
 85             }
 86             client.register(selector, SelectionKey.OP_READ);
 87         }
 88         // READ事件 -  服务器转发消息
 89         else if (key.isReadable()) {
 90             SocketChannel client = (SocketChannel) key.channel();
 91             String msg = receive(client);
 92             if (msg.isEmpty()) {
 93                 // 服务器异常
 94                 close(selector);
 95             } else {
 96                 System.out.println(msg);
 97             }
 98         }
 99     }
100 
101     public void send(String msg) throws IOException {
102         if (msg.isEmpty()) {
103             return;
104         }
105 
106         wBuffer.clear();
107         wBuffer.put(charset.encode(msg));
108         wBuffer.flip();
109         while (wBuffer.hasRemaining()) {
110             client.write(wBuffer);
111         }
112 
113         // 检查用户是否准备退出
114         if (readyToQuit(msg)) {
115             close(selector);
116         }
117     }
118 
119     private String receive(SocketChannel client) throws IOException {
120         rBuffer.clear();
121         while (client.read(rBuffer) > 0);
122         rBuffer.flip();
123         return String.valueOf(charset.decode(rBuffer));
124     }
125 
126     public static void main(String[] args) {
127         ChatClient client = new ChatClient("127.0.0.1", 7777);
128         client.start();
129     }
130 }

处理用户输入

 1 package client;
 2 
 3 import java.io.BufferedReader;
 4 import java.io.IOException;
 5 import java.io.InputStreamReader;
 6 
 7 public class UserInputHandler implements Runnable {
 8 
 9     private ChatClient chatClient;
10 
11     public UserInputHandler(ChatClient chatClient) {
12         this.chatClient = chatClient;
13     }
14 
15     @Override
16     public void run() {
17         try {
18             // 等待用户输入消息
19             BufferedReader consoleReader =
20                     new BufferedReader(new InputStreamReader(System.in));
21             while (true) {
22                 String input = consoleReader.readLine();
23 
24                 // 向服务器发送消息
25                 chatClient.send(input);
26 
27                 // 检查用户是否准备退出
28                 if (chatClient.readyToQuit(input)) {
29                     break;
30                 }
31             }
32         } catch (IOException e) {
33             e.printStackTrace();
34         }
35     }
36 }
原文地址:https://www.cnblogs.com/lpzh/p/14955419.html