Pigeon源码分析(六) -- 服务下线流程

当服务提供端下线时,先看正常的流程。一般来说都是通过kill -15 结束jvm进程,此时会执行钩子函数

public final class ProviderBootStrap {

    private static Logger logger = LoggerLoader.getLogger(ServicePublisher.class);
    static Server httpServer = null;
    static volatile Map<String, Server> serversMap = new HashMap<String, Server>();
    static volatile boolean isInitialized = false;
    static Date startTime = new Date();

    public static Date getStartTime() {
        return startTime;
    }

    public static void init() {
        if (!isInitialized) {
            synchronized (ProviderBootStrap.class) {
                if (!isInitialized) {
                    ProviderProcessHandlerFactory.init();
                    SerializerFactory.init();
                    ClassUtils.loadClasses("com.dianping.pigeon");
                    Thread shutdownHook = new Thread(new ShutdownHookListener());
                    shutdownHook.setDaemon(true);
                    shutdownHook.setPriority(Thread.MAX_PRIORITY);
                    Runtime.getRuntime().addShutdownHook(shutdownHook);

  ShutdownHookListener 

public void run() {
        if (logger.isInfoEnabled()) {
            logger.info("shutdown hook begin......");
        }

        boolean isRocketShutdown = ConfigManagerLoader.getConfigManager().getBooleanValue("pigeon.invoker.rocketshutdown",false);
        if(isRocketShutdown && ServicePublisher.getAllServiceProviders().size() == 0){
            // rocket shutdown
        } else {
            try {
                ServiceFactory.unpublishAllServices();
            } catch (Throwable e) {
                logger.error("error with shutdown hook", e);
            }
            try {
                InvokerBootStrap.shutdown();
            } catch (Throwable e) {
                logger.error("error with shutdown hook", e);
            }
            try {
                ProviderBootStrap.shutdown();
            } catch (Throwable e) {
                logger.error("error with shutdown hook", e);
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info("shutdown hook end......");
        }
    }

  ServicePublisher.unpublishAllServices();

public static void unpublishAllServices() throws RegistryException {
        if (logger.isInfoEnabled()) {
            logger.info("unpublish all services");
        }
        ServiceOnlineTask.stop();
        setServerWeight(0);
        try {
            Thread.sleep(UNPUBLISH_WAITTIME);
        } catch (InterruptedException e) {
        }
        for (String url : serviceCache.keySet()) {
            ProviderConfig<?> providerConfig = serviceCache.get(url);
            if (providerConfig != null) {
                unpublishService(providerConfig);
            }
        }
    }
if (existingService) {
            List<Server> servers = ProviderBootStrap.getServers(providerConfig);//获取本台机器上所有的服务
            for (Server server : servers) {
                String serverAddress = configManager.getLocalIp() + ":" + server.getPort();//拿到ip:port
                String registryUrl = server.getRegistryUrl(providerConfig.getUrl());//注册的url
                RegistryManager.getInstance().unregisterService(registryUrl,
                        RegistryManager.getInstance().getGroup(url), serverAddress);

RegistryManager.unregisterService

public void unregisterService(String serviceName, String group, String serviceAddress) throws RegistryException {
        if (registry != null) {
            registry.unregisterService(serviceName, group, serviceAddress);
            registeredServices.remove(serviceName);
            monitor.logEvent("PigeonService.unregister", serviceName, "group=" + group);
        }
    }

CuratorRegistry.unregisterPersistentNode

public void unregisterPersistentNode(String serviceName, String group, String serviceAddress)
            throws RegistryException {
        String servicePath = Utils.getServicePath(serviceName, group);// DP/SERVER/http:^^service.dianping.com^rpcserver^commonServer_1.00
        try {
            if (client.exists(servicePath, false)) {
                Stat stat = new Stat();
                String addressValue = client.getWithNodeExistsEx(servicePath, stat);
                String[] addressArray = addressValue.split(",");
                List<String> addressList = new ArrayList<String>();
                for (String addr : addressArray) {
                    addr = addr.trim();
                    if (addr.length() > 0 && !addressList.contains(addr)) {
                        addressList.add(addr);
                    }
                }
                if (addressList.contains(serviceAddress)) {
                    addressList.remove(serviceAddress);//把本机的ip地址去掉
                    if (!addressList.isEmpty()) {//如果去掉本机ip后不为空,继续写zk
                        Collections.sort(addressList);
                        client.set(servicePath, StringUtils.join(addressList.iterator(), ","), stat.getVersion());
                    } else {
                        List<String> children = client.getChildren(servicePath, false);//这里估计是兼容写法,因为不一定是写dp/server/节点,其他节点可能存在有子节点的情况
                        if (CollectionUtils.isEmpty(children)) {
                            if (delEmptyNode) {
                                try {
                                    client.delete(servicePath);
                                } catch (NoNodeException e) {
                                    logger.warn("Already deleted path:" + servicePath + ":" + e.getMessage());
                                }
                            } else {
                                client.set(servicePath, "", stat.getVersion());
                            }
                        } else {
                            logger.warn("Existing children [" + children + "] under path:" + servicePath);
                            client.set(servicePath, "", stat.getVersion());
                        }
                    }
                }

总结一下就是,如果是正常的关闭,会走钩子函数,通过写zk的方式把该服务器的ip从服务列表里去掉。

接下来看客户端,客户端会监听zk

无论是客户端还是服务端,启动时都会执行 

ProviderBootStrap.init()

在这个方法里都会调用 RegistryManager.getInstance()  进行初始化。

public static RegistryManager getInstance() {
        if (!isInit) {
            synchronized (RegistryManager.class) {
                if (!isInit) {
                    instance.init();
                    initializeException = null;
                    RegistryEventListener.addListener(new InnerServerInfoListener());
                    isInit = true;
                }
            }
        }
        return instance;
    }

最终会调用  CuratorRegistry.init() 

进而调用 CuratorClient.init()

private void init() throws Exception {
        if (!initialized) {
            synchronized (this) {
                if (!initialized) {
                    curatorStateListenerActive = true;
                    newCuratorClient(configManager.getStringValue(KEY_REGISTRY_ADDRESS));
                    initialized = true;
                }
            }
        }
    }
private boolean newCuratorClient() throws InterruptedException {
        logger.info("begin to create zookeeper client:" + address);
        // CuratorFramework client = CuratorFrameworkFactory.newClient(address,
        // sessionTimeout, connectionTimeout,
        // new MyRetryPolicy(retries, retryInterval));
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(address)
                .sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout)
                .retryPolicy(new MyRetryPolicy(retries, retryInterval)).build();
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                logger.info("zookeeper state changed to " + newState);
                if (newState == ConnectionState.RECONNECTED) {
                    RegistryEventListener.connectionReconnected();
                }
                monitor.logEvent(EVENT_NAME, "zookeeper:" + newState.name().toLowerCase(), "");
            }
        });
        client.getCuratorListenable().addListener(new CuratorEventListener(this), curatorEventListenerThreadPool);

  CuratorEventListener#eventReceived

  

public void eventReceived(CuratorFramework client, CuratorEvent curatorEvent) throws Exception {
        WatchedEvent event = (curatorEvent == null ? null : curatorEvent.getWatchedEvent());

        if (event == null
                || (event.getType() != EventType.NodeCreated && event.getType() != EventType.NodeDataChanged
                        && event.getType() != EventType.NodeDeleted && event.getType() != EventType.NodeChildrenChanged)) {
            return;
        }

        if (logger.isInfoEnabled())
            logEvent(event);

        try {
            PathInfo pathInfo = parsePath(event.getPath());
            if (pathInfo == null) {
                logger.warn("Failed to parse path " + event.getPath());
                return;
            }

            if (pathInfo.type == ADDRESS) {
                addressChanged(pathInfo);

  最终会调用 DefaultServiceChangeListener # onServiceHostChange

public void onServiceHostChange(String serviceName, List<String[]> hostList) {
        try {
            Set<HostInfo> newHpSet = parseHostPortList(serviceName, hostList);//收到zk事件之后,最新的服务器ip列表
            Set<HostInfo> oldHpSet = RegistryManager.getInstance().getReferencedServiceAddresses(serviceName);//从缓存里拿出来
            Set<HostInfo> toAddHpSet = Collections.emptySet();
            Set<HostInfo> toRemoveHpSet = Collections.emptySet();
            if (oldHpSet == null) {
                toAddHpSet = newHpSet;
            } else {
                toRemoveHpSet = Collections.newSetFromMap(new ConcurrentHashMap<HostInfo, Boolean>());
                toRemoveHpSet.addAll(oldHpSet);
                toRemoveHpSet.removeAll(newHpSet);
                toAddHpSet = Collections.newSetFromMap(new ConcurrentHashMap<HostInfo, Boolean>());
                toAddHpSet.addAll(newHpSet);
                toAddHpSet.removeAll(oldHpSet);
            }
            if (logger.isInfoEnabled()) {
                logger.info("service hosts changed, to added hosts:" + toAddHpSet);
                logger.info("service hosts changed, to removed hosts:" + toRemoveHpSet);
            }
            for (HostInfo hostPort : toAddHpSet) {
                RegistryEventListener.providerAdded(serviceName, hostPort.getHost(), hostPort.getPort(),
                        hostPort.getWeight());
            }
            for (HostInfo hostPort : toRemoveHpSet) {
                RegistryEventListener.providerRemoved(serviceName, hostPort.getHost(), hostPort.getPort());
            }

  重点看 RegistryEventListener.providerRemoved

  ServiceProviderChangeListener的实现类有四种,都是匿名类,每个类要解决的问题都不同

  1 ClientManager中的匿名类是这么实现的

  RegistryManager # removeServiceAddress

public void removeServiceAddress(String serviceName, HostInfo hostInfo) {
        Set<HostInfo> hostInfos = referencedServiceAddresses.get(serviceName);
        if (hostInfos == null || !hostInfos.contains(hostInfo)) {
            logger.info("address:" + hostInfo + " is not in address list of service " + serviceName);
            return;
        }
        hostInfos.remove(hostInfo);
        logger.info("removed address:" + hostInfo + " from service:" + serviceName);

        HostInfo cachedHostInfo = referencedAddresses.get(hostInfo.getConnect());
        if (cachedHostInfo != null) {
            cachedHostInfo.removeService(serviceName);
        }

        // If server is not referencd any more, remove from server list
        if (!isAddressReferenced(hostInfo)) {
            referencedAddresses.remove(hostInfo.getConnect());
        }
    }

上面的代码很好理解,就是把缓存的服务端信息清理掉

2  ClusterListenerManager # InnerServiceProviderChangeListener

class InnerServiceProviderChangeListener implements ServiceProviderChangeListener {
        @Override
        public void hostWeightChanged(ServiceProviderChangeEvent event) {
        }

        @Override
        public void providerAdded(ServiceProviderChangeEvent event) {
        }

        @Override
        public void providerRemoved(ServiceProviderChangeEvent event) {
            // addConnect的逆操作
            String connect = NetUtils.toAddress(event.getHost(), event.getPort());
            if (logger.isInfoEnabled()) {
                logger.info("[cluster-listener-mgr] remove:" + connect + " from " + event.getServiceName());
            }
            ConnectInfo cmd = connectInfoMap.get(connect);
            if (cmd != null) {
                cmd.getServiceNames().remove(event.getServiceName());
                if (cmd.getServiceNames().size() == 0) {
                    connectInfoMap.remove(connect);
                }
            }
            for (ClusterListener listener : listeners) {
                listener.doNotUse(event.getServiceName(), event.getHost(), event.getPort());
            }
        }
    }

  DefaultClusterListener # doNotUse

public void doNotUse(String serviceName, String host, int port) {
        if (logger.isInfoEnabled()) {
            logger.info("[cluster-listener] do not use service provider:" + serviceName + ":" + host + ":" + port);
        }
        List<Client> cs = serviceClients.get(serviceName);
        List<Client> newCS = new CopyOnWriteArrayList<Client>();
        if (cs != null && !cs.isEmpty()) {
            newCS.addAll(cs);
        }
        Client clientFound = null;
        for (Client client : cs) {
            if (client != null && client.getHost() != null && client.getHost().equals(host) && client.getPort() == port) {
                newCS.remove(client);
                clientFound = client;
            }
        }
        serviceClients.put(serviceName, newCS);//还是清理缓存

        // 一个client可能对应多个serviceName,仅当client不被任何serviceName使用时才关闭
        if (clientFound != null) {
            if (!isClientInUse(clientFound)) {
                allClients.remove(clientFound.getAddress());
                RequestQualityManager.INSTANCE.removeClientQualities(clientFound.getAddress());//统计质量信息也就是请求失败次数等,清理掉
                closeClientInFuture(clientFound);//关闭tcp连接
            }
        }
    }

上下的两个内名实现类就略过了,一个是空实现,一个是清理权重信息

其实,除此之外,当客户端启动的时候,会和每一个服务端都建立连接,同时和每个服务端都有心跳检测,如果心跳检测失败,客户端也会执行清理缓存和断开连接的操作。

原文地址:https://www.cnblogs.com/juniorMa/p/14863087.html