《用Java写一个通用的服务器程序》02 监听器

在一个服务器程序中,监听器的作用类似于公司前台,起引导作用,因此监听器花在每个新连接上的时间应该尽可能短,这样才能保证最快响应。

回到编程本身来说:

1. 监听器最好由单独的线程运行

2. 监听器在接到新的连接之后,处理连接的方法需要尽快返回

在Java Push Framework中,因为需要同时监听普通客户端和服务器监视服务的客户端,所以定义两种监听器:Acceptor和MonitorAcceptor。

由于两者的关于监听部分的逻辑是相同的,因此首先定义了抽象类Listener来实现了监视器的功能,把处理socket的部分定义为抽象方法。

// 处理socket的抽象方法
protected abstract boolean handleAcceptedSocket(
            PushClientSocket clientSocket);

对于监听功能的实现比较简单,还是那三步:create,bind,accept。

    private boolean doListening(InetSocketAddress serverAddr) {
        boolean ret = false;
        int socketBufferSize = getServerImpl().getServerOptions()
                    .getSocketBufferSize();
        int socketType = getServerImpl().getServerOptions()
                    .getSocketType();
        try {
            // Create
            serverSocket = SocketFactory.getDefault().createServerSocket(
                    socketType, socketBufferSize);
            
            // Bind
            serverSocket.bind(serverAddr);

            Debug.debug("Start to listen " + serverAddr.getHostName() + ":"
                    + serverAddr.getPort());
            
            // Accept
            doAccept();
            
            ret = true;
        } catch (IOException e) {
            e.printStackTrace();
            if (serverSocket != null) {
                serverSocket.close();
                serverSocket = null;
            }
        }
        
        return ret;
    }

考虑Java中现在有好几种不同的socket:同步阻塞Socket,同步非阻塞Socket,以及JDK7新添加的异步Socket,如果直接使用Java的Socket类,不方便在不同类型的socket之间切换使用。所以我自定义了PushServerSocket和PushClientSocket两个新接口:

// 对于服务器socket来说,只定义了必须的bind和accept,
// 以及一个不会抛出异常的close。
public interface PushServerSocket {

    public void bind(InetSocketAddress serverAddr) throws IOException;

    public PushClientSocket accept() throws IOException;

    public void close();
}
// 客户端socket接口的定义是C++的风格,因为原来的代码是C++写的,这么定义便于翻译原来的C++代码
public interface PushClientSocket {

    public String getIP();
    public int getPort();
    
    // 这里直接使用Selector其实是有问题的,注定了只能使用NIO的方式
    // 后面会考虑修改
    public SelectionKey registerSelector(Selector selector, int ops, 
            Object attachment) throws IOException;
    
    public int send(byte[] buffer, int offset, int size) throws IOException;
    
    public int recv(byte[] buffer, int offset, int size) throws IOException;
    
    public boolean isOpen();
    
    public boolean isConnected();
    
    public void close();
}

两者对应的NIO版本实现是PushServerSocketImpl和PushClientSocketImpl,代码实现比较简单,这里就不贴出来了。

回到Listener,来看doAccept:

    private void doAccept()
    {
        // Start a new thread
        acceptorThread = new Thread(new Runnable()  {
            public void run() {
                while (blnRunning) {
                    try {
                        PushClientSocket clientSocket = serverSocket.accept();
                        
                        Debug.debug("New client from " + clientSocket.getIP());

                        // Start servicing the client connection
                        if (!handleAcceptedSocket(clientSocket)) {
                            clientSocket.close();
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        return;
                    }
                }
            }
            
        });
        
        // Start the thread
        acceptorThread.start();
    }

这里服务器socket的accept方法实现是阻塞的,这样可以避免不停地轮询,因此在用NIO实现accept时要不能调用configureBlocking设置成非阻塞模式。

后面停止监听时直接调用服务器socket的close方法,accept方法会抛出异常从而跳出循环,结束监听线程的运行。

结束监听时不要忘记使用线程的join方法等待线程结束。

    public void stopListening() {
        blnRunning = false;

        // Close server socket
        if (serverSocket != null) {
            serverSocket.close();
            serverSocket = null;
        }
        
        // Wait the thread to terminate
        if (acceptorThread != null) {
            try {
                acceptorThread.join();
            } catch (InterruptedException e) {
                //e.printStackTrace();
            }

            acceptorThread = null;
        }
    }

Acceptor的实现相对复杂一些,需要记录访问的信息,做一些检查,然后再交给ClientFactory处理:

    protected boolean handleAcceptedSocket(PushClientSocket clientSocket) {
        // 记录日志
        ClientFactory clientFactoryImpl = serverImpl.getClientFactory();
        ServerStats stats = serverImpl.getServerStats();
        ServerOptions options = serverImpl.getServerOptions();

        stats.addToCumul(ServerStats.Measures.VisitorsSYNs, 1);
        // 检查是否达到最大允许访问数
        if (clientFactoryImpl.getClientCount() >= options.getMaxConnections()) {
            Debug.debug("Reach maximum clients allowed, deny it");
            return false;
        }

        //检查IP是否被允许
        if (!clientFactoryImpl.isAddressAllowed(clientSocket.getIP())) {
            Debug.debug("IP refused: " + clientSocket.getIP());
            return false;
        }
        // 处理socket
        return clientFactoryImpl.createPhysicalConnection(clientSocket, 
                false, listenerOptions);
    }

MonitorAcceptor的实现比较简单,直接交给ClientFactory处理就可以。

    protected boolean handleAcceptedSocket(PushClientSocket clientSocket) {
        return serverImpl.getClientFactory().createPhysicalConnection(
                clientSocket, true, listenerOptions);
    }

关于ClientFactory的处理逻辑后面的文章里细讲。

实现一个监听器功能是很容易的,所以可以说的东西不多。

原文地址:https://www.cnblogs.com/wanly3643/p/4058825.html