zookeeper 实现注册中心 (关闭心跳日志)

分布式注册配置中心:

zookeeper由于拥有watcher机制,使得其拥有发布订阅的功能,而发布与订阅模型,即所谓的配置中心,

顾名思义就是发布者将数据发布到 ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。

应用在启动的时候会主动来获取一次配置,同时,在节点上注册一个 Watcher,这样一来,以后每次配置有更新的时候,

都会实时通知到订阅的客户端,从来达到获取最新配置信息的目的。

<!-- 添加springboot对 zookeeper 的支持 -->

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.12</version>
</dependency>

由于客户端需要与zookeeper建立连接,获取数据,添加监控等等一系列的事情,

所以这里封装一个Utils工具类。

然后对于zookeeper连接客户端的地址的后面可以紧跟一个path,作为在根目录下的工作目录。

该目录就是作为所有操作的根目录,这里使用 /zkRegistry

监听的是该节点下的  conf  节点,

由于zookeeper采用的是异步调用,所以这里需要使用一把锁锁住主线程,在连接成功后自动解锁,

主线程再往下进行。这里使用CountDownLatch实现锁,在主线程创建,传递到DafaultWatcher的回掉函数中。

package com..zookeeper.zkRegistry;

import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;

public class ZkClientUtil {

    private static ZooKeeper zooKeeper;

    private static DefaultWatcher defaultWatcher = new DefaultWatcher();

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    private static String address = "localhost:2181,localhost:2182,localhost:2183,localhost:2184/zkRegistry";


    public static ZooKeeper getZkClient() throws Exception {

        zooKeeper = new ZooKeeper(address,1000,defaultWatcher);
        
        //等待 链接 CountDownLatch -1
        defaultWatcher.setCountDownLatch(countDownLatch);
        //继续执行
        countDownLatch.await(); 

        return zooKeeper;

    }
}

同时由于zookeeper基于watch机制实现发布订阅,所有的watcher都采用自定义的方式实现,

首先是对连接成功的时候的DefaultWatcher。

package com..zookeeper.zkRegistry;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.concurrent.CountDownLatch;

public class DefaultWatcher implements Watcher {

    private CountDownLatch countDownLatch;

    public void setCountDownLatch(CountDownLatch countDownLatch){

        this.countDownLatch = countDownLatch;
    }



    @Override
    public void process(WatchedEvent watchedEvent) {

        switch (watchedEvent.getState()) {
            case Unknown:
                break;
            case Disconnected:
                break;
            case NoSyncConnected:
                break;
            case SyncConnected:
                //初始化完成 -1
                countDownLatch.countDown();
                break;
            case AuthFailed:
                break;
            case ConnectedReadOnly:
                break;
            case SaslAuthenticated:
                break;
            case Expired:
                break;
        }
        System.out.println(watchedEvent.toString());

    }

}

创建一个接收数据的实体类对象:

package com..zookeeper.zkRegistry;

public class MyConfData {

    private String data;

    public String getData(){

        return data;
    }

    public void setData(String data){

        this.data = data;
    }
}

封装好所有的watcher和回调函数::

package com..zookeeper.zkRegistry;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

public class MyWatcherAndCallBack implements Watcher,AsyncCallback.StatCallback,AsyncCallback.DataCallback {

    private ZooKeeper zooKeeper;

    private MyConfData myConfData;

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public void setZooKeeper(ZooKeeper zooKeeper){

        this.zooKeeper = zooKeeper;
    }

    public void setMyConfData(MyConfData myConfData){

        this.myConfData = myConfData;
    }



    //Watcher
    @Override
    public void process(WatchedEvent watchedEvent) {

        switch (watchedEvent.getType()) {
            case None:
                break;
            case NodeCreated:
                zooKeeper.getData("/conf",this,this,"第二次获取数据");
                break;
            case NodeDeleted:
                myConfData.setData("");
                countDownLatch = new CountDownLatch(1);
                break;
            case NodeDataChanged:
                zooKeeper.getData("/conf",this,this,"第二次获取数据");
                break;
            case NodeChildrenChanged:
                break;
        }

    }


    //StatCallback
    @Override
    public void processResult(int i, String s, Object o, Stat stat) {

        zooKeeper.getData("/conf",this,this,"第一次获取数据");


    }

    //DataCallback
    @Override
    public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {

        if(stat!=null){
            System.out.println("取得数据 : "+new String(bytes));
            myConfData.setData(new String(bytes));

            countDownLatch.countDown();
        }
    }

    public void await() throws InterruptedException {

        zooKeeper.exists("/conf",this,this,"127.0.0.1");

        countDownLatch.await();
    }
}

测试类:

(可直接运行 getConfData() 方法,跑项目就放开注调的代码和注解,注@Test)

package com..zookeeper.zkRegistry;

import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

//@Component
public class ZkClientConfing {

    private ZooKeeper zooKeeper;

    private MyConfData myConfData = new MyConfData();

    private MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack();


    @Before
    public void conn() throws Exception {

        zooKeeper = ZkClientUtil.getZkClient();
    }


    @After
    public void close() throws InterruptedException {

        zooKeeper.close();
    }


    @Test
//    @PostConstruct
    public void getConfData() throws InterruptedException {

//        try {
//            zooKeeper = ZkClientUtil.getZkClient();
//        } catch (Exception e) {
//            e.printStackTrace();
//        }

        myWatcherAndCallBack.setZooKeeper(zooKeeper);

        myWatcherAndCallBack.setMyConfData(myConfData);

        myWatcherAndCallBack.await();

        while (true){

            if(myConfData.getData().equals("")){
                System.out.println("配置中心数据为空!!!!!");
                myWatcherAndCallBack.await();//等待数据输入
            }
            //为了观察数据的变化,这里循环打印设置的数据。
            System.out.println("数据 : "+myConfData.getData());

            Thread.sleep(30000);
        }

    }
}

logback 关闭zookeeper的心跳日志:

<configuration> 

  <logger name="org.apache.zookeeper.ClientCnxn" level="info" />  

</configuration> 

name 为包名

level  为日志级别
原文地址:https://www.cnblogs.com/lifan12589/p/14765386.html