config 客户端
ClientWorker#ClientWorker
构造方法中启动定时任务
ClientWorker.LongPollingRunnable
长轮询的任务,在 run 方法的结尾,重新把任务加入到线程池
ClientWorker#checkUpdateDataIds
客户端把所监听配置的元数据信息(namespace, group, dataId, md5)上报给 server
config server 处理请求
ConfigController#listener
LongPollingService#addLongPollingClient
server 比较客户端传过来的 md5 值,如果有变化,马上返回已变化配置的元数据,否则延迟返回。server 使用了 servlet 3.0 的异步机制,用 ScheduledExecutorService 线程池来返回延时响应,
final AsyncContext asyncContext = req.startAsync(); asyncContext.setTimeout(0L); scheduler.execute(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
使用 ClientLongPolling 封装了 AsyncContext,timeout 参数,通过 AsyncContext 可以获取 request 和 response,
在 ClientLongPolling 的 run 方法中,用 ScheduledExecutorService 执行定时任务。所有客户端的 ClientLongPolling 放在 allSubs 中,是一个 ConcurrentLinkedQueue。
// com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling public void run() { asyncTimeoutFuture = scheduler.schedule(new Runnable() { @Override public void run() { try { getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis()); /** * 删除订阅关系 */ allSubs.remove(ClientLongPolling.this); if (isFixedPolling()) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); List<String> changedGroups = MD5Util.compareMd5( (HttpServletRequest)asyncContext.getRequest(), (HttpServletResponse)asyncContext.getResponse(), clientMd5Map); if (changedGroups.size() > 0) { sendResponse(changedGroups); } else { sendResponse(null); } } else { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); sendResponse(null); } } catch (Throwable t) { LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause()); } } }, timeoutTime, TimeUnit.MILLISECONDS); allSubs.add(this); }
如果在定时任务执行时,监听的配置文件依然没有变化,server 则返回空文本给 client;
如果发生了配置变更,会由 DataChangeTask 发送变化的配置 id 给客户端,同时取消定时任务。
// com.alibaba.nacos.config.server.service.LongPollingService.DataChangeTask#run public void run() { try { ConfigService.getContentBetaMd5(groupKey); for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) { ClientLongPolling clientSub = iter.next(); if (clientSub.clientMd5Map.containsKey(groupKey)) { // 如果beta发布且不在beta列表直接跳过 if (isBeta && !betaIps.contains(clientSub.ip)) { continue; } // 如果tag发布且不在tag列表直接跳过 if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) { continue; } getRetainIps().put(clientSub.ip, System.currentTimeMillis()); iter.remove(); // 删除订阅关系 LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance", RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()), "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey); clientSub.sendResponse(Arrays.asList(groupKey)); } } } catch (Throwable t) { LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause()); } }
客户端获得变化的配置元数据后,重新拉取新的配置。
config 客户端有有 2 种缓存文件,failover 和 snapshot,从代码看,server 不可用时,一般使用 snapshot
servlet 异步处理:
https://www.cnblogs.com/davenkin/p/async-servlet.html
final AsyncContext asyncContext = req.startAsync();
asyncContext 可以获取 request 和 response,
asyncContext.complete();