Zookeeper 源码(七)请求处理

Zookeeper 源码(七)请求处理

以单机启动为例讲解 Zookeeper 是如何处理请求的。先回顾一下单机时的请求处理链。

// 单机包含 3 个请求链:PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}

请求的调用链如下:

PrepRequestProcessor.processRequest() <- ZooKeeperServer.submitRequest() <- ZooKeeperServer.processPacket() <- NettyServerCnxn.receiveMessage() <- CnxnChannelHandler.processMessage() <- CnxnChannelHandler.messageReceived() 

RequestProcessor 接口

public interface RequestProcessor {
    public static class RequestProcessorException extends Exception {
        public RequestProcessorException(String msg, Throwable t) {
            super(msg, t);
        }
    }
    // 处理请求
    void processRequest(Request request) throws RequestProcessorException;
    // 关闭当前及子处理器,处理器可能是线程
    void shutdown();
}

一、PrepRequestProcessor

PrepRequestProcessor 是服务器的请求预处理器,能够识别出当前客户端是否是事务请求,对于事务请求,进行一系列预处理,如创建请求事务头,事务体,会话检查,ACL 检查等。

(1) PrepRequestProcessor 构造函数

public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
    // 已提交请求队列
    LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
    // 下个处理器
    private final RequestProcessor nextProcessor;
    // Zookeeper 服务器
    ZooKeeperServer zks;

    public PrepRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
        // 初始化线程
        super("ProcessThread(sid:" + zks.getServerId() + " cport:"
                + zks.getClientPort() + "):", zks.getZooKeeperServerListener());
        this.nextProcessor = nextProcessor;
        this.zks = zks;
    }
}

说明:类的核心属性有 submittedRequests 和 nextProcessor,前者表示已经提交的请求,而后者表示提交的下个处理器。

(2) RequestProcessor 接口实现

// 接收请求
public void processRequest(Request request) {
    submittedRequests.add(request);
}

// 关闭线程
public void shutdown() {
    LOG.info("Shutting down");
    submittedRequests.clear();
    submittedRequests.add(Request.requestOfDeath);
    nextProcessor.shutdown();
}

既然请求都提交到 submittedRequests 中了,必然有地方消费 submittedRequests,下面看一下线程的处理过程。

(3) run(核心)

public void run() {
    try {
        while (true) {
            Request request = submittedRequests.take();
            long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
            if (request.type == OpCode.ping) {          // 请求类型为 PING
                traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
            }
            if (Request.requestOfDeath == request) {    // 结束线程
                break;
            }
            pRequest(request);                          // 处理请求(核心)   
        }
    } catch (RequestProcessorException e) {
        if (e.getCause() instanceof XidRolloverException) {
            LOG.info(e.getCause().getMessage());
        }
        handleException(this.getName(), e);
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("PrepRequestProcessor exited loop!");
}

说明:run 函数是对 Thread 类 run 函数的重写,其核心逻辑相对简单,即不断从队列中取出 request 进行处理,其会调用 pRequest 函数,while 自旋这样做的好处是充分利用 CPU,避免线程频繁切换线程。

二、SyncRequestProcessor

在分析了 PrepRequestProcessor 处理器后,接着来分析 SyncRequestProcessor,该处理器将请求存入磁盘,其将请求批量的存入磁盘以提高效率,请求在写入磁盘之前是不会被转发到下个处理器的。

SyncRequestProcessor 除了会定期的把 request 持久化到本地磁盘,同时他还要维护本机的 txnlog 和 snapshot,这里的基本逻辑是:

  • 每隔 snapCount/2 个 request 会重新生成一个 snapshot 并滚动一次 txnlog,同时为了避免所有的 zookeeper server 在同一个时间生成 snapshot 和滚动日志,这里会再加上一个随机数,snapCount 的默认值是 10w 个 request

(1) 重要属性

public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {

    private final ZooKeeperServer zks;
    // queuedRequests 接收外界传递的请求队列
    private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
    private final RequestProcessor nextProcessor;

    // 快照处理线程
    private Thread snapInProcess = null;
    volatile private boolean running;

    // 等待被刷新到磁盘的请求队列
    private final LinkedList<Request> toFlush = new LinkedList<Request>();
    private final Random r = new Random(System.nanoTime());
    // 快照个数
    private static int snapCount = ZooKeeperServer.getSnapCount();
    // 关闭线程
    private final Request requestOfDeath = Request.requestOfDeath;
}

(2) run(核心方法)

public void run() {
    try {
        // 1. 初始化,日志数量为 0
        int logCount = 0;
        // 确保所有的服务器在同一时间不是使用的同一个快照
        int randRoll = r.nextInt(snapCount/2);
        while (true) {
            Request si = null;
            // 2. 没有需要刷新到磁盘的请求,则 take 取出数据,会阻塞
            if (toFlush.isEmpty()) {
                si = queuedRequests.take();
            // 3. 有则 poll 取出数据,不会阻塞
            } else {
                si = queuedRequests.poll();
                // 没有请求则先将已有的请求刷新到磁盘
                if (si == null) {
                    flush(toFlush);
                    continue;
                }
            }
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                // 4. 将请求添加至日志文件,只有事务性请求才会返回 true
                if (zks.getZKDatabase().append(si)) {
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                        randRoll = r.nextInt(snapCount/2);
                        // 4.1 生成滚动日志 roll the log
                        zks.getZKDatabase().rollLog();
                        // 4.2 生成快照日志,如果 snapInProcess 线程仍在进行快照则忽略本次快照
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                    public void run() {
                                        try {
                                            zks.takeSnapshot();
                                        } catch(Exception e) {
                                            LOG.warn("Unexpected exception", e);
                                        }
                                    }
                                };
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                // 5. 查看此时 toFlush 是否为空,如果为空,说明近段时间读多写少,直接交给下一个处理器处理
                } else if (toFlush.isEmpty()) {
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable)nextProcessor).flush();
                        }
                    }
                    continue;
                }
                toFlush.add(si);
                if (toFlush.size() > 1000) {
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
    } finally{
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}

(3) flush(刷新到磁盘)

private void flush(LinkedList<Request> toFlush) throws IOException, RequestProcessorException {
    if (toFlush.isEmpty())
        return;
    // 1. 提交至 ZK 数据库
    zks.getZKDatabase().commit();

    // 2. 将所有的请求提交到下个处理器处理
    while (!toFlush.isEmpty()) {
        Request i = toFlush.remove();
        if (nextProcessor != null) {
            nextProcessor.processRequest(i);
        }
    }
    if (nextProcessor != null && nextProcessor instanceof Flushable) {
        // 刷新到磁盘
        ((Flushable)nextProcessor).flush();
    }
}

说明:该函数主要用于将toFlush队列中的请求刷新到磁盘中。

三、FinalRequestProcessor

FinalRequestProcessor 负责把已经 commit 的写操作应用到本机,对于读操作则从本机中读取数据并返回给 client,这个 processor 是责任链中的最后一个

FinalRequestProcessor 是一个同步处理的 processor,主要的处理逻辑就在方法 processRequest 中:

  • 如果 request.hdr != null,则表明 request 是写操作,则调用 zks.processTxn(hdr, txn) 来把 request 关联的写操作执行到内存状态中
  • 如果是写操作,则调用 zks.getZKDatabase().addCommittedProposal(request);
    把 request 加入到 ZKDatabase.committedLog 队列中,这个队列主要是为了快速和 follower 同步而保留的
  • 为各类操作准备响应数据,对于写操作则根据 processTxn 的结果来回复,如果是读操作,则读取内存中的状态
  • 发送响应数据给 client

processRequest 的处理逻辑非常长,我们一点点分析。

(1) 处理事务请求

public void processRequest(Request request) {
    ProcessTxnResult rc = null;
    synchronized (zks.outstandingChanges) {
        // 1. 请求委托 ZookeeperServer 处理,zks 会针对事务和非事务请求会分别处理
        rc = zks.processTxn(request);

        // 2. request.hdr!=null 则是事务请求,即写操作,outstandingChanges 保存有所有的事务请求记录
        //    PrepRequestProcessor 会将事务请求添加到集合中,FinalRequestProcessor 则事务请求已经处理完毕需要移除
        if (request.getHdr() != null) {
            // 事务请求头
            TxnHeader hdr = request.getHdr();
            Record txn = request.getTxn();
            long zxid = hdr.getZxid();
            // zk 有严格的执行顺序,如果小于 zxid 则认为已经处理完毕
            while (!zks.outstandingChanges.isEmpty()
                   && zks.outstandingChanges.get(0).zxid <= zxid) {
                ChangeRecord cr = zks.outstandingChanges.remove(0);
                if (cr.zxid < zxid) {
                    LOG.warn("Zxid outstanding " + cr.zxid + " is less than current " + zxid);
                }
                if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                    zks.outstandingChangesForPath.remove(cr.path);
                }
            }
        }

        // 3. 如果是事务请求,则把 request 加入到 ZKDatabase.committedLog 队列中
        if (request.isQuorum()) {
            zks.getZKDatabase().addCommittedProposal(request);
        }
    }
}

processRequest 将请求委托给了 zk 处理,我们看一下 ZookeeperServer 是如何处理请求的。

public ProcessTxnResult processTxn(Request request) {
    return processTxn(request, request.getHdr(), request.getTxn());
}

private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
                                    Record txn) {
    ProcessTxnResult rc;
    int opCode = request != null ? request.type : hdr.getType();
    long sessionId = request != null ? request.sessionId : hdr.getClientId();
    if (hdr != null) {
        // 写操作(事务请求)
        rc = getZKDatabase().processTxn(hdr, txn);
    } else {
        // 读操作(非事务请求)
        rc = new ProcessTxnResult();
    }
    if (opCode == OpCode.createSession) {
        if (hdr != null && txn instanceof CreateSessionTxn) {
            CreateSessionTxn cst = (CreateSessionTxn) txn;
            sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
        } else if (request != null && request.isLocalSession()) {
            request.request.rewind();
            int timeout = request.request.getInt();
            request.request.rewind();
            sessionTracker.addSession(request.sessionId, timeout);
        } else {
            LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString());
        }
    } else if (opCode == OpCode.closeSession) {
        sessionTracker.removeSession(sessionId);
    }
    return rc;
}

(2) 请求响应

// 1. 对于写操作(事务请求)根据 processTxn() 的结果来获取响应数据
case OpCode.create: {
    lastOp = "CREA";
    rsp = new CreateResponse(rc.path);
    err = Code.get(rc.err);
    break;
}
// 2. 对于读操作(非事务请求)从内存数据库中获取响应数据
case OpCode.getData: {
    lastOp = "GETD";
    GetDataRequest getDataRequest = new GetDataRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request,
            getDataRequest);
    DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    Long aclL;
    synchronized(n) {
        aclL = n.acl;
    }
    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
            ZooDefs.Perms.READ,
            request.authInfo);
    Stat stat = new Stat();
    // 直接从内存数据库中获取响应数据
    byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
            getDataRequest.getWatch() ? cnxn : null);
    rsp = new GetDataResponse(b, stat);
    break;
}

参考:

  1. 《Zookeeper请求处理》:https://www.cnblogs.com/leesf456/p/6140503.html
  2. 《【Zookeeper】源码分析之请求处理链(二)之PrepRequestProcessor》:https://www.cnblogs.com/leesf456/p/6412843.html
  3. 《【Zookeeper】源码分析之请求处理链(三)之SyncRequestProcessor》:https://www.cnblogs.com/leesf456/p/6438411.html
  4. 《【Zookeeper】源码分析之请求处理链(四)之FinalRequestProcessor》:https://www.cnblogs.com/leesf456/p/6472496.html
  5. 从 Paxos 到 Zookeeper : 分布式一致性原理与实践

每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/9960371.html