先看服务提供端正常下线的流程。一般来说都是通过 kill -15 结束 JVM 进程,此时会执行在 ProviderBootStrap.init() 中注册的关闭钩子(shutdown hook)函数:
public final class ProviderBootStrap { private static Logger logger = LoggerLoader.getLogger(ServicePublisher.class); static Server httpServer = null; static volatile Map<String, Server> serversMap = new HashMap<String, Server>(); static volatile boolean isInitialized = false; static Date startTime = new Date(); public static Date getStartTime() { return startTime; } public static void init() { if (!isInitialized) { synchronized (ProviderBootStrap.class) { if (!isInitialized) { ProviderProcessHandlerFactory.init(); SerializerFactory.init(); ClassUtils.loadClasses("com.dianping.pigeon"); Thread shutdownHook = new Thread(new ShutdownHookListener()); shutdownHook.setDaemon(true); shutdownHook.setPriority(Thread.MAX_PRIORITY); Runtime.getRuntime().addShutdownHook(shutdownHook);
ShutdownHookListener
public void run() { if (logger.isInfoEnabled()) { logger.info("shutdown hook begin......"); } boolean isRocketShutdown = ConfigManagerLoader.getConfigManager().getBooleanValue("pigeon.invoker.rocketshutdown",false); if(isRocketShutdown && ServicePublisher.getAllServiceProviders().size() == 0){ // rocket shutdown } else { try { ServiceFactory.unpublishAllServices(); } catch (Throwable e) { logger.error("error with shutdown hook", e); } try { InvokerBootStrap.shutdown(); } catch (Throwable e) { logger.error("error with shutdown hook", e); } try { ProviderBootStrap.shutdown(); } catch (Throwable e) { logger.error("error with shutdown hook", e); } } if (logger.isInfoEnabled()) { logger.info("shutdown hook end......"); } }
ServicePublisher.unpublishAllServices();
/**
 * Unpublishes every service held in {@code serviceCache}.
 *
 * <p>Sequence: stop the {@code ServiceOnlineTask}, set the server weight to 0, wait
 * {@code UNPUBLISH_WAITTIME} milliseconds (presumably to let clients observe the weight
 * change before the addresses disappear; confirm against the registry design), then
 * call {@code unpublishService} for each cached provider config.
 *
 * <p>If the wait is interrupted, unpublishing still proceeds, and the thread's interrupt
 * status is restored afterwards so callers can observe it.
 *
 * @throws RegistryException if unpublishing a service fails; remaining services are not processed
 */
public static void unpublishAllServices() throws RegistryException {
    if (logger.isInfoEnabled()) {
        logger.info("unpublish all services");
    }
    ServiceOnlineTask.stop();
    setServerWeight(0);

    boolean interrupted = false;
    try {
        Thread.sleep(UNPUBLISH_WAITTIME);
    } catch (InterruptedException e) {
        // Cut the wait short but keep going: the services must still be unpublished.
        // The flag is re-set only after the loop so the registry calls below are not
        // aborted by a pending interrupt.
        interrupted = true;
    }

    try {
        for (String url : serviceCache.keySet()) {
            ProviderConfig<?> providerConfig = serviceCache.get(url);
            // The entry may be missing by the time it is looked up; skip it in that case.
            if (providerConfig != null) {
                unpublishService(providerConfig);
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
if (existingService) { List<Server> servers = ProviderBootStrap.getServers(providerConfig);//获取本台机器上所有的服务 for (Server server : servers) { String serverAddress = configManager.getLocalIp() + ":" + server.getPort();//拿到ip:port String registryUrl = server.getRegistryUrl(providerConfig.getUrl());//注册的url RegistryManager.getInstance().unregisterService(registryUrl, RegistryManager.getInstance().getGroup(url), serverAddress);
RegistryManager.unregisterService
/**
 * Removes one address of a service from the registry.
 *
 * <p>Does nothing when no registry is configured. Otherwise delegates to the registry,
 * drops the service from {@code registeredServices} and records a monitor event.
 *
 * @param serviceName    name (URL) of the service being unregistered
 * @param group          group the service was registered under
 * @param serviceAddress address to remove
 * @throws RegistryException if the underlying registry call fails
 */
public void unregisterService(String serviceName, String group, String serviceAddress) throws RegistryException {
    if (registry == null) {
        return;
    }

    registry.unregisterService(serviceName, group, serviceAddress);
    registeredServices.remove(serviceName);

    String groupTag = "group=" + group;
    monitor.logEvent("PigeonService.unregister", serviceName, groupTag);
}
CuratorRegistry.unregisterPersistentNode
public void unregisterPersistentNode(String serviceName, String group, String serviceAddress) throws RegistryException { String servicePath = Utils.getServicePath(serviceName, group);// DP/SERVER/http:^^service.dianping.com^rpcserver^commonServer_1.00 try { if (client.exists(servicePath, false)) { Stat stat = new Stat(); String addressValue = client.getWithNodeExistsEx(servicePath, stat); String[] addressArray = addressValue.split(","); List<String> addressList = new ArrayList<String>(); for (String addr : addressArray) { addr = addr.trim(); if (addr.length() > 0 && !addressList.contains(addr)) { addressList.add(addr); } } if (addressList.contains(serviceAddress)) { addressList.remove(serviceAddress);//把本机的ip地址去掉 if (!addressList.isEmpty()) {//如果去掉本机ip后不为空,继续写zk Collections.sort(addressList); client.set(servicePath, StringUtils.join(addressList.iterator(), ","), stat.getVersion()); } else { List<String> children = client.getChildren(servicePath, false);//这里估计是兼容写法,因为不一定是写dp/server/节点,其他节点可能存在有子节点的情况 if (CollectionUtils.isEmpty(children)) { if (delEmptyNode) { try { client.delete(servicePath); } catch (NoNodeException e) { logger.warn("Already deleted path:" + servicePath + ":" + e.getMessage()); } } else { client.set(servicePath, "", stat.getVersion()); } } else { logger.warn("Existing children [" + children + "] under path:" + servicePath); client.set(servicePath, "", stat.getVersion()); } } }
总结一下就是,如果是正常的关闭,会走钩子函数,通过写 zk 的方式把本机的地址(ip:port)从该服务的地址列表里去掉。
接下来看客户端,客户端会监听zk
无论是客户端还是服务端,启动时都会执行
ProviderBootStrap.init()
在这个方法里都会调用 RegistryManager.getInstance() 进行初始化。
/**
 * Returns the shared {@code RegistryManager}, initializing it on first use.
 *
 * <p>The {@code isInit} flag is checked once without the lock and again while holding
 * the class lock; initialization runs only if the second check still finds it unset.
 *
 * @return the shared instance
 */
public static RegistryManager getInstance() {
    if (isInit) {
        return instance;
    }

    synchronized (RegistryManager.class) {
        if (!isInit) {
            instance.init();
            initializeException = null;
            RegistryEventListener.addListener(new InnerServerInfoListener());
            // Set last, after the instance is initialized and the listener is registered.
            isInit = true;
        }
    }
    return instance;
}
最终会调用 CuratorRegistry.init()
进而调用 CuratorClient.init()
/**
 * Creates the Curator client once.
 *
 * <p>The {@code initialized} flag is checked before and after acquiring this object's
 * monitor; only the caller that still finds it unset performs the setup.
 *
 * @throws Exception if creating the client fails
 */
private void init() throws Exception {
    if (initialized) {
        return;
    }

    synchronized (this) {
        if (initialized) {
            return;
        }
        curatorStateListenerActive = true;
        String registryAddress = configManager.getStringValue(KEY_REGISTRY_ADDRESS);
        newCuratorClient(registryAddress);
        initialized = true;
    }
}
private boolean newCuratorClient() throws InterruptedException { logger.info("begin to create zookeeper client:" + address); // CuratorFramework client = CuratorFrameworkFactory.newClient(address, // sessionTimeout, connectionTimeout, // new MyRetryPolicy(retries, retryInterval)); CuratorFramework client = CuratorFrameworkFactory.builder().connectString(address) .sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout) .retryPolicy(new MyRetryPolicy(retries, retryInterval)).build(); client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { logger.info("zookeeper state changed to " + newState); if (newState == ConnectionState.RECONNECTED) { RegistryEventListener.connectionReconnected(); } monitor.logEvent(EVENT_NAME, "zookeeper:" + newState.name().toLowerCase(), ""); } }); client.getCuratorListenable().addListener(new CuratorEventListener(this), curatorEventListenerThreadPool);
CuratorEventListener#eventReceived
public void eventReceived(CuratorFramework client, CuratorEvent curatorEvent) throws Exception { WatchedEvent event = (curatorEvent == null ? null : curatorEvent.getWatchedEvent()); if (event == null || (event.getType() != EventType.NodeCreated && event.getType() != EventType.NodeDataChanged && event.getType() != EventType.NodeDeleted && event.getType() != EventType.NodeChildrenChanged)) { return; } if (logger.isInfoEnabled()) logEvent(event); try { PathInfo pathInfo = parsePath(event.getPath()); if (pathInfo == null) { logger.warn("Failed to parse path " + event.getPath()); return; } if (pathInfo.type == ADDRESS) { addressChanged(pathInfo);
最终会调用 DefaultServiceChangeListener # onServiceHostChange
public void onServiceHostChange(String serviceName, List<String[]> hostList) { try { Set<HostInfo> newHpSet = parseHostPortList(serviceName, hostList);//收到zk事件之后,最新的服务器ip列表 Set<HostInfo> oldHpSet = RegistryManager.getInstance().getReferencedServiceAddresses(serviceName);//从缓存里拿出来 Set<HostInfo> toAddHpSet = Collections.emptySet(); Set<HostInfo> toRemoveHpSet = Collections.emptySet(); if (oldHpSet == null) { toAddHpSet = newHpSet; } else { toRemoveHpSet = Collections.newSetFromMap(new ConcurrentHashMap<HostInfo, Boolean>()); toRemoveHpSet.addAll(oldHpSet); toRemoveHpSet.removeAll(newHpSet); toAddHpSet = Collections.newSetFromMap(new ConcurrentHashMap<HostInfo, Boolean>()); toAddHpSet.addAll(newHpSet); toAddHpSet.removeAll(oldHpSet); } if (logger.isInfoEnabled()) { logger.info("service hosts changed, to added hosts:" + toAddHpSet); logger.info("service hosts changed, to removed hosts:" + toRemoveHpSet); } for (HostInfo hostPort : toAddHpSet) { RegistryEventListener.providerAdded(serviceName, hostPort.getHost(), hostPort.getPort(), hostPort.getWeight()); } for (HostInfo hostPort : toRemoveHpSet) { RegistryEventListener.providerRemoved(serviceName, hostPort.getHost(), hostPort.getPort()); }
重点看 RegistryEventListener.providerRemoved
ServiceProviderChangeListener的实现类有四种,都是匿名类,每个类要解决的问题都不同
1 ClientManager中的匿名类是这么实现的
RegistryManager # removeServiceAddress
public void removeServiceAddress(String serviceName, HostInfo hostInfo) { Set<HostInfo> hostInfos = referencedServiceAddresses.get(serviceName); if (hostInfos == null || !hostInfos.contains(hostInfo)) { logger.info("address:" + hostInfo + " is not in address list of service " + serviceName); return; } hostInfos.remove(hostInfo); logger.info("removed address:" + hostInfo + " from service:" + serviceName); HostInfo cachedHostInfo = referencedAddresses.get(hostInfo.getConnect()); if (cachedHostInfo != null) { cachedHostInfo.removeService(serviceName); } // If server is not referencd any more, remove from server list if (!isAddressReferenced(hostInfo)) { referencedAddresses.remove(hostInfo.getConnect()); } }
上面的代码很好理解,就是把缓存的服务端信息清理掉
2 ClusterListenerManager # InnerServiceProviderChangeListener
class InnerServiceProviderChangeListener implements ServiceProviderChangeListener { @Override public void hostWeightChanged(ServiceProviderChangeEvent event) { } @Override public void providerAdded(ServiceProviderChangeEvent event) { } @Override public void providerRemoved(ServiceProviderChangeEvent event) { // addConnect的逆操作 String connect = NetUtils.toAddress(event.getHost(), event.getPort()); if (logger.isInfoEnabled()) { logger.info("[cluster-listener-mgr] remove:" + connect + " from " + event.getServiceName()); } ConnectInfo cmd = connectInfoMap.get(connect); if (cmd != null) { cmd.getServiceNames().remove(event.getServiceName()); if (cmd.getServiceNames().size() == 0) { connectInfoMap.remove(connect); } } for (ClusterListener listener : listeners) { listener.doNotUse(event.getServiceName(), event.getHost(), event.getPort()); } } }
DefaultClusterListener # doNotUse
public void doNotUse(String serviceName, String host, int port) { if (logger.isInfoEnabled()) { logger.info("[cluster-listener] do not use service provider:" + serviceName + ":" + host + ":" + port); } List<Client> cs = serviceClients.get(serviceName); List<Client> newCS = new CopyOnWriteArrayList<Client>(); if (cs != null && !cs.isEmpty()) { newCS.addAll(cs); } Client clientFound = null; for (Client client : cs) { if (client != null && client.getHost() != null && client.getHost().equals(host) && client.getPort() == port) { newCS.remove(client); clientFound = client; } } serviceClients.put(serviceName, newCS);//还是清理缓存 // 一个client可能对应多个serviceName,仅当client不被任何serviceName使用时才关闭 if (clientFound != null) { if (!isClientInUse(clientFound)) { allClients.remove(clientFound.getAddress()); RequestQualityManager.INSTANCE.removeClientQualities(clientFound.getAddress());//统计质量信息也就是请求失败次数等,清理掉 closeClientInFuture(clientFound);//关闭tcp连接 } } }
剩下的两个匿名实现类就略过了,一个是空实现,一个是清理权重信息
其实,除此之外,当客户端启动的时候,会和每一个服务端都建立连接,同时和每个服务端都有心跳检测,如果心跳检测失败,客户端也会执行清理缓存和断开连接的操作。