《用Java写一个通用的服务器程序》03 处理新socket

在讲监听器时说过处理的新的socket要尽快返回,监听器调用的是ClientFactory的createPhysicalConnection方法,那么就来看这个方法:

    public boolean createPhysicalConnection(PushClientSocket socket,
            boolean isObserver, ListenerOptions listenerOptions) {
        PhysicalConnectionPool thePhysicalConnectionPool = 
            serverImpl.getPhysicalConnectionPool();
        IOQueue<PhysicalConnection> ioQueue = serverImpl.getIOQueue();

        // 内置了一个PhysicalConnection的对象池,这样可以避免每次都要
        // 创建PhysicalConnection对象,可以加快处理速度
        PhysicalConnection connection = 
            thePhysicalConnectionPool.borrowObject();
        // 把PhysicalConnection对象和socket对象关联起来
        connection.reset(socket, isObserver, listenerOptions);

        // 初始化协议,分配buffer,用来缓存解析请求时的数据
        if (!connection.setUpProtocolContexts()) {
            thePhysicalConnectionPool.returnObject(connection);
            return false;
        }

        Debug.debug("Physical Connection Created for client from: " + 
                socket.getIP());

        // 把连接注册到I/O队列中,这样就可以监听请求
        if (!ioQueue.addSocketContext(socket, connection)) {
            thePhysicalConnectionPool.returnObject(connection);
            //leave socket close to acceptor
            return false;
        }

        Debug.debug("Queue adds client from: " + socket.getIP());

        // 把创建的PhysicalConnection加入pending队列中,此时连接
        // 还不算是真正的已连接状态,要等到第一个请求到达并正确
        // 处理之后才会是已连接状态,并且会创建一个LogicalConnection
        // 和这个PhysicalConnection相关联
        addPhysicalConnection(connection);
        
        // 初始化PhysicalConnection
        serverImpl.getDispatcher().handleInitialize(connection);

        return true;
    }

ClientFactory是PhysicalConnection的管理程序,这个方法的的作用就是创建PhysicalConnection和新的socket相关联,并且把PhysicalConnection加入请求监听的I/O队列。因此来说说IOQueue。

IOQueue本身是一个接口:

public interface IOQueue<T> {

    public boolean create();

    public void free();

    // 从队列中获取事件,默认实现是带有阻塞超时的,即当没有事件
    // 时会阻塞一段时间,超时就会返回null
    public IOEvent<T> getQueuedEvent(boolean isInputEvents);

    // 注册连接,context是关联对象,类似于附件
    public boolean addSocketContext(PushClientSocket socket, T context);

    // 取消注册
    public void deleteSocketContext(PushClientSocket socket);

    // IOQueue的事件监听是一次性,这是为了防止事件在没有被处理之前,这个事件
    // 再次被捕捉到(Java的read/write事件都是这样),因此这个方法会在事件
    // 被处理后调用,再次注册。
    public boolean rearmSocketForWrite(PushClientSocket socket, T context);

    // Read事件代表的就是从客户端有数据过来
    public boolean rearmSocketForRead(PushClientSocket socket, T context);
}

IOQueueImpl是IOQueue的Java NIO版本的实现。IOQueueImpl会内置一个独立线程以及一个Selector,这里关于注册有一点需要说明:

PushClientSocketImpl的registerSelector方法用于注册socket,这里需要调用wakeup方法。因为如果独立线程会调用Selector的select方法等待新的数据,这个时候直接

调用register方法会被阻塞,因此需要先调用wakeup唤醒selector。

    public SelectionKey registerSelector(Selector selector, int ops, 
            Object attachment) throws IOException {
        selector.wakeup(); // To prevent block when calling register method
        return channel.register(selector, ops, attachment);
    }

接着说说独立线程监听事件,因为OP_WRITE的特殊性,这里只监听OP_READ事件。

    private void pollEvents(boolean isOutPoll) {
        Selector selector;
        BlockingQueue<SelectionKey> queue;
        if (isOutPoll) {
            return;
        } else {
            selector = this.inPollSelector;
            queue = this.inPollQueue;
        }
        
        List<SelectionKey> cache = new LinkedList<SelectionKey>();

        while (isPolling) {
            try {
                selector.select();
            } catch (IOException e) {
                continue;
            }
            
            // 这里调用yield释放控制权是为了刚刚提到的register方法能被顺利执行
            Thread.yield();
            
            // Add into cache (Add into the blocking queue directly
            // may block so that the selector cannot release the selection 
            // key in time)
            if (selector.isOpen()) {
                for (SelectionKey key : selector.selectedKeys()) {
                    // 前面提到监听事件是一次性的,因此这里取消监听
                    // 后面再调用rearm方法重新注册
                    key.cancel(); 
                    cache.add(key);
                }
                
                // Clear the keys
                selector.selectedKeys().clear();
                
                // 因为使用了限定长度的BlockingQueue,可能因为队列已满导致阻塞
                // 因此先把事件转移到缓存中,释放Selector
                queue.addAll(cache);
                cache.clear();
            } else {
                break; // selector closed
            }
        }
    }

顺便说说Demultiplexor获取事件调用的getQueuedEvent方法,这里使用BlockingQueue来实现阻塞等待:

    public IOEvent<PhysicalConnection> getQueuedEvent(boolean isInputEvents) {
        final IOEventType type;
        final BlockingQueue<SelectionKey> pollQueue;
        
        if (isInputEvents) {
            type = IOEventType.read;
            pollQueue = inPollQueue;
        } else {
            type = null;
            pollQueue = null;
        }
        
        if (pollQueue == null) {
            return null;
        }

        try {
            // 设置1秒的超时,这样后面关闭时清空I/O队列的时候不会导致
            // Demultiplexor一直被阻塞
            SelectionKey key = pollQueue.poll(1000L, TimeUnit.MILLISECONDS);
            if (key != null) {
                if (key.attachment() instanceof PhysicalConnection) {
                    return new IOEvent<PhysicalConnection>(type, 
                            (PhysicalConnection)(key.attachment()));
                }
            }
        } catch (InterruptedException e) {
            // Ignore
        }
        
        return null;
    }

关于新socket的处理就说这么多吧。

原文地址:https://www.cnblogs.com/wanly3643/p/4063069.html