nacos 使用 servlet 异步处理客户端配置长轮询

config 客户端

ClientWorker#ClientWorker 

构造方法中启动定时任务

ClientWorker.LongPollingRunnable

长轮询的任务,在 run 方法的结尾,重新把任务加入到线程池

ClientWorker#checkUpdateDataIds

客户端把所监听配置的元数据信息(namespace, group, dataId, md5)上报给 server

config server 处理请求

ConfigController#listener 
LongPollingService#addLongPollingClient

server 比较客户端传过来的 md5 值,如果有变化,马上返回已变化配置的元数据,否则延迟返回。server 使用了 servlet 3.0 的异步机制,用 ScheduledExecutorService 线程池来返回延时响应,

final AsyncContext asyncContext = req.startAsync();
asyncContext.setTimeout(0L);
scheduler.execute(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));

使用 ClientLongPolling 封装了 AsyncContext,timeout 参数,通过 AsyncContext 可以获取 request 和 response,
在 ClientLongPolling 的 run 方法中,用 ScheduledExecutorService 执行定时任务。所有客户端的 ClientLongPolling 放在 allSubs 中,是一个 ConcurrentLinkedQueue。

// com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling
public void run() {
    asyncTimeoutFuture = scheduler.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                /**
                 * 删除订阅关系
                 */
                allSubs.remove(ClientLongPolling.this);

                if (isFixedPolling()) {
                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                        (System.currentTimeMillis() - createTime),
                        "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
                        "polling",
                        clientMd5Map.size(), probeRequestSize);
                    List<String> changedGroups = MD5Util.compareMd5(
                        (HttpServletRequest)asyncContext.getRequest(),
                        (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
                    if (changedGroups.size() > 0) {
                        sendResponse(changedGroups);
                    } else {
                        sendResponse(null);
                    }
                } else {
                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                        (System.currentTimeMillis() - createTime),
                        "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
                        "polling",
                        clientMd5Map.size(), probeRequestSize);
                    sendResponse(null);
                }
            } catch (Throwable t) {
                LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
            }

        }

    }, timeoutTime, TimeUnit.MILLISECONDS);

    allSubs.add(this);
}

如果在定时任务执行时,监听的配置文件依然没有变化,server 则返回空文本给 client;

如果发生了配置变更,会由 DataChangeTask 发送变化的配置 id 给客户端,同时取消定时任务。

// com.alibaba.nacos.config.server.service.LongPollingService.DataChangeTask#run
public void run() {
    try {
        ConfigService.getContentBetaMd5(groupKey);
        for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
            ClientLongPolling clientSub = iter.next();
            if (clientSub.clientMd5Map.containsKey(groupKey)) {
                // 如果beta发布且不在beta列表直接跳过
                if (isBeta && !betaIps.contains(clientSub.ip)) {
                    continue;
                }

                // 如果tag发布且不在tag列表直接跳过
                if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                    continue;
                }

                getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                iter.remove(); // 删除订阅关系
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                    (System.currentTimeMillis() - changeTime),
                    "in-advance",
                    RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),
                    "polling",
                    clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                clientSub.sendResponse(Arrays.asList(groupKey));
            }
        }
    } catch (Throwable t) {
        LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());
    }
}

客户端获得变化的配置元数据后,重新拉取新的配置。

config 客户端有有 2 种缓存文件,failover 和 snapshot,从代码看,server 不可用时,一般使用 snapshot

servlet 异步处理:
https://www.cnblogs.com/davenkin/p/async-servlet.html

final AsyncContext asyncContext = req.startAsync();
asyncContext 可以获取 request 和 response,
asyncContext.complete();

原文地址:https://www.cnblogs.com/allenwas3/p/11260576.html