nacos源码解析(二)-客户端如何访问注册中心

概述

  客户端与注册中心服务端的交互,主要集中在服务注册,服务下线,服务发现以及订阅某个服务,其实使用最多的就是服务注册和服务发现,下面我会从源码的角度分析一下这四个功能,客户端是如何处理的,本文不会介绍注册中心服务端如何处理的,这个之后会写文章分析。

客户端代码

public class NamingExample {

    public static void main(String[] args) throws NacosException {

        Properties properties = new Properties();
        properties.setProperty("serverAddr","127.0.0.1:8848");
        properties.setProperty("namespace", "namespace");

        NamingService naming = NamingFactory.createNamingService(properties);
        //服务注册
        naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");

        naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
        //服务发现
        System.out.println(naming.getAllInstances("nacos.test.3"));

        System.out.println("----------------------------------------------------");
        //服务下线
        naming.deregisterInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");

        System.out.println(naming.getAllInstances("nacos.test.3"));

        System.out.println("----------------------------------------------------");
        //服务订阅
        naming.subscribe("nacos.test.3", new EventListener() {
            @Override
            public void onEvent(Event event) {
                System.out.println(((NamingEvent)event).getServiceName());

                System.out.println("======================================================");
                System.out.println(((NamingEvent)event).getInstances());
            }
        });

        try{
          Thread.sleep(5000000);
        } catch (Exception e){

        }

    }
}
View Code

服务注册分析

@Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
            //为注册服务设置一个定时任务获取心跳信息,默认为5s汇报一次
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
        //注册到服务端
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

注册主要做了两件事,第一件事:为注册的服务设置一个定时任务,定时拉去服务信息。 第二件事:将服务注册到服务端。下面详细介绍一下这两个事,不会贴出源码,贴出源码太长了,大家自己看

第一件事详解

  1.启动一个定时任务,定时拉取服务信息,时间间隔为5s

  2.如果拉下来服务正常,不做处理,如果不正常,重新注册

第二件事详解

  发送http请求给注册中心服务端,调用服务注册接口,注册服务

服务发现分析

@Override
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {

        ServiceInfo serviceInfo;
        if (subscribe) {
            //从本地缓存中获取,如果本地缓存不存在从服务端拉取
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }
        return list;
    }

服务发现流程如下:

服务下线分析

 @Override
    public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        if (instance.isEphemeral()) {
            //移除心跳信息监测的定时任务
            beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), instance.getPort());
        }
        //请求服务端下线接口
        serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
    }

这个和服务注册干的事情正好相反,也做了两件事,第一件事:不在进行心跳检测。  第二件事:请求服务端服务下线接口。下面详细分析一下

第一件事详解

  停止进行心跳检测,并不是把第一步的定时任务给停止了,而是设置了一个参数,当定时任务运行的时候会判断这个参数,如果为true,直接返回了,不会拉服务端的信息

第二件事详解

  请求服务端服务下线接口

服务订阅详解

@Override
    public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
        eventDispatcher.addListener(hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
            StringUtils.join(clusters, ",")), StringUtils.join(clusters, ","), listener);
    }

public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {

NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
observers.add(listener);

observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
if (observers != null) {
observers.add(listener);
}
  
serviceChanged(serviceInfo);
}
 

服务订阅功能稍微有些复杂,不过也还好,看下面的图

 这里稍微提一下nacos事件处理机制,nacos的事件处理也是采用观察者模式实现的,具体流程如下

   nacos的监听器和springboot的监听器还不太一样,springboot是每次变更一次,主动的触发通知,然后自己处理这个事件,但是nacos不同,nacos是启动的时候就会启动一个死循环,然后这个死循环去消费队列,如果有某个服务发生了变更,就会把变更的服务信息放入到这个队列中,注意,这个队列是LinkedBlockingQueue,是一个阻塞队列,之后死循环会调用take方法获取队列中的信息,如果队列为空,会阻塞。

  上面的解释中,大家可能比较感兴趣的是,在什么地方捕获到服务发生变更了,其实在第一次服务发现的时候就会有服务变更事件,因为从服务器上拉去的信息和本地的缓存信息不一致,就会把这个变更的服务信息放入队列中。

在最开始的客户端代码中,服务订阅那一块的输出如下:

DEFAULT_GROUP@@nacos.test.3
======================================================
[{"clusterName":"DEFAULT","enabled":true,"ephemeral":true,"healthy":true,"instanceHeartBeatInterval":5000,"instanceHeartBeatTimeOut":15000,"instanceId":"2.2.2.2#9999#DEFAULT#DEFAULT_GROUP@@nacos.test.3","instanceIdGenerator":"simple","ip":"2.2.2.2","ipDeleteTimeout":30000,"metadata":{},"port":9999,"serviceName":"DEFAULT_GROUP@@nacos.test.3","weight":1.0}, {"clusterName":"TEST1","enabled":true,"ephemeral":true,"healthy":true,"instanceHeartBeatInterval":5000,"instanceHeartBeatTimeOut":15000,"instanceId":"11.11.11.11#8888#TEST1#DEFAULT_GROUP@@nacos.test.3","instanceIdGenerator":"simple","ip":"11.11.11.11","ipDeleteTimeout":30000,"metadata":{},"port":8888,"serviceName":"DEFAULT_GROUP@@nacos.test.3","weight":1.0}]
DEFAULT_GROUP@@nacos.test.3
======================================================
[{"clusterName":"TEST1","enabled":true,"ephemeral":true,"healthy":true,"instanceHeartBeatInterval":5000,"instanceHeartBeatTimeOut":15000,"instanceId":"11.11.11.11#8888#TEST1#DEFAULT_GROUP@@nacos.test.3","instanceIdGenerator":"simple","ip":"11.11.11.11","ipDeleteTimeout":30000,"metadata":{},"port":8888,"serviceName":"DEFAULT_GROUP@@nacos.test.3","weight":1.0}]

这里比较神奇的是执行了两次事件,根据上面的解释,在订阅时,无论这个服务有没有发生变更,都会把服务信息放入到变更队列中,之后会执行事件,所以第一次打印是正常的,大家可以发现第一次打印的结果中,服务名称为DEFAULT_GROUP@@nacos.test.3的服务有两个实例,我们确实注册了两个实例,分别为11.11.11.11:88882.2.2.2:9999,但是其实我们已经把2.2.2.2:9999下线了,为什么这里打印的还是两个呢?

这个就是服务发现那个地方有一个定时更新本地服务信息捣的鬼,那个定时任务是每隔1s执行一次,也就是说我们在执行这段订阅代码的时候,本地缓存还没有刷新,还是两个实例,所以打印为两个实例。

有了上面的理解,第二次执行事件就很好理解了,因为定时任务跟新了本地缓存,所以会有一个新的服务变更的事件放入到了队列中,所以又执行了一次event,为了验证我们理解的正确性,我们改一下客户端的代码,验证一下我们理解的正确性。

public class NamingExample {

    public static void main(String[] args) throws NacosException {

        Properties properties = new Properties();
        properties.setProperty("serverAddr","127.0.0.1:8848");
        properties.setProperty("namespace", "namespace");

        NamingService naming = NamingFactory.createNamingService(properties);
        //服务注册
        naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");

        naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
        //服务发现
        System.out.println(naming.getAllInstances("nacos.test.3"));

        System.out.println("----------------------------------------------------");
        //服务下线
        naming.deregisterInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");

        System.out.println(naming.getAllInstances("nacos.test.3"));
       
        //加入1s的延迟
        try{
            Thread.sleep(1000);
        } catch (Exception e){

        }
        System.out.println("----------------------------------------------------");
        //服务订阅
        naming.subscribe("nacos.test.3", new EventListener() {
            @Override
            public void onEvent(Event event) {
                System.out.println(((NamingEvent)event).getServiceName());

                System.out.println("======================================================");
                System.out.println(((NamingEvent)event).getInstances());
            }
        });

        try{
          Thread.sleep(5000000);
        } catch (Exception e){

        }

    }
}
View Code

打印结果如下

DEFAULT_GROUP@@nacos.test.3
======================================================
[{"clusterName":"TEST1","enabled":true,"ephemeral":true,"healthy":true,"instanceHeartBeatInterval":5000,"instanceHeartBeatTimeOut":15000,"instanceId":"11.11.11.11#8888#TEST1#DEFAULT_GROUP@@nacos.test.3","instanceIdGenerator":"simple","ip":"11.11.11.11","ipDeleteTimeout":30000,"metadata":{},"port":8888,"serviceName":"DEFAULT_GROUP@@nacos.test.3","weight":1.0}]

我们在执行订阅之前加了1s的等待,我们发现就执行了一次事件,这就验证了我们猜想的正确性。

总结

  总的来说,觉得nacos的代码写的比较清晰,看着很舒服,由于本人水平很菜,有什么问题,望大家指正。

 

  

原文地址:https://www.cnblogs.com/gunduzi/p/13215075.html