springboot整合zookeeper

在springboot中所有的整合都是以bean的形式注入对象,从数据库coon、redis conn、再到整合的zookeeper,依然是依照bean注入连接对象,通过zookeeper api对zookeeper中node 数据进行增删改查等操作,从而实现配置同步。这篇文章只是初步使用web服务,在服务启动时注册服务,并将配置文件内容写入zookeeper,通过api接口获取配置内容。至于多节点配置文件同步一致性,则是以后需要深入研究的主题。

在zookeeper_caesar创建两个模块,core和client,core中存放zookeeper连接和相关操作方法,client是正常的web服务。在springboot分层思想中core相当于DAO层,进行zookeeper操作,在client的service层和controller层进行调用和处理。

其中client依赖core模块,在其pom.xml中添加core模块信息,其后在client中添加spring模块spring-boot-starter-web(spring对servlet封装的模块和嵌入式tomcat)

        <dependency>
            <groupId>com.soft.caesar</groupId>
            <artifactId>core</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

1.client分析

 

在RegistryConfig.java中创建需要的bean  serviceRegistry 即zookeeper连接对象 zk = new ZooKeeper(zkServers,SESSION_TIMEOUT,this)

在TestController.java中调用serviceRestry即core中定义操作getValue,获取zookeeper中数据

在WebListener.java中监听web服务启动,启动时将服务存入zookeeper

ClientApplication.py 启动服务主函数

2.core分析

@Component
public class ServiceRegistryImpl implements ServiceRegistry,Watcher {

    private static CountDownLatch latch = new CountDownLatch(1); # 多线程时,等待,直到一个线程时,在latch被唤醒
    private ZooKeeper zk;
    private static final int SESSION_TIMEOUT=5000;
    private static final String REGISTRY_PATH = "/registry";

    public ServiceRegistryImpl() {
    }

    public ServiceRegistryImpl(String zkServers) {
        try {
            zk = new ZooKeeper(zkServers,SESSION_TIMEOUT,this);
            latch.await();# latch等待唤醒
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    public void register(String serviceName, String serviceAddress) {
        try {
            String registryPath = REGISTRY_PATH;
            if (zk.exists(registryPath, false) == null) {
                zk.create(registryPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); #持久化创建/registry node
            }
            //创建服务节点(持久节点)
            String servicePath = registryPath + "/" + serviceName;
            if (zk.exists(servicePath, false) == null) {
                zk.create(servicePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            //创建地址节点
            String addressPath = servicePath + "/address-"; # 此处节点是瞬态的节点,当服务断开zookeeper连接时,节点消失,重新连接时,address- 以序列添加末尾序列值。
这种序列方法可以判断注册服务的主被,先注册的数字小,后注册的数字大,每次从主上同步数据到被。在服务异常时,节点自动消失,可以探测服务状态 String addressNode = zk.create(addressPath, serviceAddress.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } catch (Exception e){ e.printStackTrace(); } } @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) latch.countDown(); }

验证

启动zookeeper,启动以上web服务

java客户端登录zookeeper,查询注册服务的注册信息

调用接口查询zookeeper数据

 以上是关于zookeeper的初步探索,可以参考https://github.com/CaesarLinsa/zookeeper_caesar,在version1.0分支中去掉core模块,添加到service层中,添加对zookeeper操作接口,实现在接口修改zookeeper同时,配置文件发生变更。当然如此需要每个服务的守护进程中存在类似socket通信,在server端发生变化时,在watch中向守护进程中发送相关命令,促使配置变更,服务启动或者不启动加载配置。

原文地址:https://www.cnblogs.com/CaesarLinsa/p/9733966.html