(四)BIO聊天室实战

1.BIO编程模型

BIO模型:对每一个建立连接的客户端,服务端都要创建一个线程单独处理和这个客户的通信,典型的一请求一应答。

   1.1 优化——伪异步IO编程模型

  • 思路:使用线程池来管理服务器端所有可用线程,即通过一个线程池来处理多个客户端的请求接入。
  • 好处:通过线程池可以灵活的调配线程资源,设置线程的最大值,防止由于海量并发接入导致服务器线程资源耗尽。 

2. 多人聊天室功能概述及设计

  • 功能概述:支持多人同时在线,每个用户的发言都被转发给其他在线用户
  • 服务器端设计:
    • 使用主线程做Acceptor,等待客户端连接,并为每一个客户分配一个Handle线程;
    • 此外,服务器端需要使用容器存储目前在线的所有客户列表,以便转发消息给其他用户。
    • Handle线程任务是,维护客户列表(添加 or 移除);接收用户消息,并转发给其他在线用户。
  • 客户端设计:
    • 要有两个线程,其中主线程与服务器建立连接,并接收来自服务器的消息;
    • 另一个线程则用来处理用户的输入,并将消息发送到服务器。因为等待用户键盘输入时,是阻塞的,此时不能及时显示其他客户的消息。

3. 测试结果

4. 完整代码

   4.1服务器

public class ChatServer {

    private int DEFAULT_PORT = 8888;
    private final String QUIT = "quit";

    private ExecutorService executorService;  // 线程池
    private ServerSocket serverSocket;
    private Map<Integer, Writer> connectedClients; // 键:端口号;值:用于写入客户端的Writer

    public ChatServer() {
        executorService = Executors.newFixedThreadPool(2); // 创建固定数目的线程池
        connectedClients = new HashMap<>();
    }

    // 添加客户方法
    public synchronized void addClient(Socket socket) throws IOException {
        if (socket != null) {
            int port = socket.getPort();
            BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream())
            );
            connectedClients.put(port, writer); // 要处理线程不安全的情况
            System.out.println("客户端[" + port + "]已连接到服务器");
        }
    }

    // 移除客户方法
    public synchronized void removeClient(Socket socket) throws IOException {
        if (socket != null) {
            int port = socket.getPort();
            if (connectedClients.containsKey(port)) {
                // 关闭外层流writer后,内层的socket也会自动关闭
                connectedClients.get(port).close();
            }
            connectedClients.remove(port); // 要处理线程不安全的情况
            System.out.println("客户端[" + port + "]已断开连接");
        }
    }

    // 转发消息方法
    public synchronized void forwardMessage(Socket socket, String fwdMsg) throws IOException {
        for (Integer id : connectedClients.keySet()) {
            if (!id.equals(socket.getPort())) {
                Writer writer = connectedClients.get(id);
                writer.write(fwdMsg);
                writer.flush();
            }
        }
    }

    public boolean readyToQuit(String msg) {
        return QUIT.equals(msg);
    }

    //关闭资源
    public synchronized void close() {
        if (serverSocket != null) {
            try {
                serverSocket.close();
                System.out.println("关闭serverSocket");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // 服务器主线程任务:等待客户端连接,为其分配处理线程
    public void start() {
        try {
            // 绑定监听端口
            serverSocket = new ServerSocket(DEFAULT_PORT);
            System.out.println("启动服务器,监听端口:" + DEFAULT_PORT + "...");

            while (true) {
                // 等待客户端连接
                Socket socket = serverSocket.accept();

                // bio模型为每个客户都创建一个ChatHandler线程
//                new Thread(new ChatHandler(this,socket)).start();

                // 优化:伪异步IO编程
                executorService.execute(new ChatHandler(this,socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close();
        }
    }

    public static void main(String[] args) {
        ChatServer server = new ChatServer();
        server.start();
    }
}
public class ChatHandler implements Runnable{
    private ChatServer server;
    private Socket socket;

    public ChatHandler(ChatServer server, Socket socket) {
        this.server = server;
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            // 存储新上线用户
            server.addClient(socket);

            // 读取用户发送的消息
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream())
            );

            String msg = null;
            // 如果客户端的IO流被关闭时,读取的就是null
            while ((msg = reader.readLine()) != null) {
                String fwdMsg = "客户端[" + socket.getPort() + "]:" + msg + "
";
                System.out.print(fwdMsg);

                // 把消息转发给聊天室在线的其他用户
                server.forwardMessage(socket, fwdMsg);

                // 检查用户是否准备退出
                if (server.readyToQuit(msg)) {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                server.removeClient(socket);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

   4.2客户端

public class ChatClient {

    private final String DEFAULT_SERVER_HOST = "127.0.0.1";
    private final int DEFAULT_SERVER_PORT = 8888;
    private final String QUIT = "quit";

    private Socket socket;
    private BufferedReader reader;
    private BufferedWriter writer;

    // 发送消息给服务器,让服务器转发给其他人
    public void send(String msg) throws IOException {
        // 确定socket的输出流仍然是开放的状态
        if (!socket.isOutputShutdown()) {
            writer.write(msg + "
");
            writer.flush();
        }
    }

    // 从服务器接收消息
    public String receive() throws IOException {
        String msg = null;
        if (!socket.isInputShutdown()) {
            msg = reader.readLine();
        }
        return msg;
    }

    // 检查用户是否准备退出
    public boolean readyToQuit(String msg) {
        return QUIT.equals(msg);
    }

    // 关闭资源
    public void close() {
        if (writer != null) {
            try {
                System.out.println("关闭socket");
                writer.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void start() {
        try {
            // 创建socket
            socket = new Socket(DEFAULT_SERVER_HOST,DEFAULT_SERVER_PORT);
            // 创建IO流
            reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream())
            );
            writer = new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream())
            );

            // 创建线程:处理用户的输入
            new Thread(new UserinputHandler(this)).start();

            // 主线程:时刻读取服务器转发的消息
            String msg = null;
            // 不为null说明和服务器连接的流依然是有效的
            while ((msg = receive()) != null) {
                System.out.println(msg);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            close();
        }
    }

    public static void main(String[] args) {
        ChatClient chatClient = new ChatClient();
        chatClient.start();
    }
}
public class UserinputHandler implements Runnable{

    private ChatClient chatClient;
    public UserinputHandler(ChatClient chatClient) {
        this.chatClient = chatClient;
    }

    @Override
    public void run() {
        try {
            // 等待用户输入消息
            BufferedReader consoleReader = new BufferedReader(
                    new InputStreamReader(System.in)
            );
            while (true) {
                String input = consoleReader.readLine();

                // 向服务器发送消息
                chatClient.send(input);

                // 检查用户是否准备退出
                if (chatClient.readyToQuit(input)) {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

参考

一站式学习Java网络编程 全面理解BIO_NIO_AIO,学习手记(四)

原文地址:https://www.cnblogs.com/HuangYJ/p/14457171.html