zookeeper客户端访问服务端时,基于NIO的线程池绑定

 SelectorThread可以通过JVM参数指定,表示多路复用器。

每一个客户端生成一个SocketChannel,多个或一个SocketChannel与SelectorThread绑定。

详细描述如下:

  1. 会有若干个客户端由AcceptThread线程去接收socket连接,客户端与ZKServer取得连接。每存在一个连接就有一个SocketChannel

    1.   同时也会生成一个NIOServerCnxn,这个上下文对象在下文有“大用处”
  2. SocketChannel与SelectorThread(多路复用选择器)绑定到一起,作为请求的传输通道

    1. 每产生一个SocketChannel对象就会作为SelectorThread对象的acceptedQueue属性与SelectorThread绑定。
  3. SelectorThread监听接收就绪事件

    1. SelectorThread会不断的从acceptedQueue中读取SocketChannel对象,作为注册事件读到SelectorThread的selector中。同时SelectorThread也会不断地从selector中读取就绪事件
  4. 处理请求的方式

    1.   每获取到一个就绪事件就会封装成一个IOWorkRequest对象
    2.        然后会将IOWorkRequest对象封装成一个ScheduleWorkRequest对象(线程)
    3.        将Schedule对象放到线程池中执行
      1.   线程池执行时调用IOWorkRequest对象的doWork()方法。
                public void doWork() throws InterruptedException {
        
        
                    if (!key.isValid()) {
                        selectorThread.cleanupSelectionKey(key);
                        return;
                    }
        
                    // 读就绪或写就绪
                    if (key.isReadable() || key.isWritable()) {
        
                        // 处理key
                        // 到这里,多个客户端请求还是并发处理的
        
                        cnxn.doIO(key); // 顺序
        
                        // Check if we shutdown or doIO() closed this connection
                        if (stopped) {
                            cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                            return;
                        }
                        if (!key.isValid()) {
                            selectorThread.cleanupSelectionKey(key);
                            return;
                        }
                        touchCnxn(cnxn);
                    }
        
                    // Mark this connection as once again ready for selection
                    cnxn.enableSelectable();
                    // Push an update request on the queue to resume selecting
                    // on the current set of interest ops, which may have changed
                    // as a result of the I/O operations we just performed.
                    if (!selectorThread.addInterestOpsUpdateRequest(key)) {
                        cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
                    }
                }
      2. 可以看到,doWork方法内部调用的就是NIOServerCnxn的doIO方法。所以doIO()方法是zkServer真正处理客户端请求的方法(处理CRUD等请求)
        void doIO(SelectionKey k) throws InterruptedException {
                try {
                    if (!isSocketOpen()) {
                        LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId));
        
                        return;
                    }
                    if (k.isReadable()) {
                        // 读就绪,把数据读到incomingBuffer中,
                        int rc = sock.read(incomingBuffer); // 45 一开始读4个字节数据,也就是读数据包的长度
        
                        //
                        if (rc < 0) {
                            // 没有读到数据则报错
                            handleFailedRead();
                        }
        
                        // 表示还有没有剩余空间可以读数据
                        if (incomingBuffer.remaining() == 0) {
                            boolean isPayload;
        
                            // 读到的是长度
                            if (incomingBuffer == lenBuffer) { // start of next request
                                incomingBuffer.flip();
                                isPayload = readLength(k);
                                incomingBuffer.clear(); // 54byte
                            } else {
                                // 读到的是真正的packet数据(也就是命令)
                                // continuation
                                isPayload = true;
                            }
        
                            if (isPayload) { // not the case for 4letterword
                                // 处理命令
                                readPayload();//读数据
                            } else {
                                // four letter words take care
                                // need not do anything else
                                return;
                            }
                        }
                    }
                    if (k.isWritable()) {
                        // 从outgoingBuffers中获取数据进行写入(返回给客户端)
                        handleWrite(k);
        
                        if (!initialized && !getReadInterest() && !getWriteInterest()) {
                            throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE);
                        }
                    }
                } catch (CancelledKeyException e) {
                    LOG.warn("CancelledKeyException causing close of session: 0x{}", Long.toHexString(sessionId));
        
                    LOG.debug("CancelledKeyException stack trace", e);
        
                    close(DisconnectReason.CANCELLED_KEY_EXCEPTION);
                } catch (CloseRequestException e) {
                    // expecting close to log session closure
                    // 移除watcher,并关闭socket
                    close();
                } catch (EndOfStreamException e) {
                    LOG.warn("Unexpected exception", e);
                    // expecting close to log session closure
                    close(e.getReason());
                } catch (ClientCnxnLimitException e) {
                    // Common case exception, print at debug level
                    ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
                    LOG.warn("Closing session 0x{}", Long.toHexString(sessionId), e);
                    close(DisconnectReason.CLIENT_CNX_LIMIT);
                } catch (IOException e) {
                    LOG.warn("Close of session 0x{}", Long.toHexString(sessionId), e);
                    close(DisconnectReason.IO_EXCEPTION);
                }
            }

原文地址:https://www.cnblogs.com/yibao/p/14044769.html