curator.zookeeper

1.引入pom依赖

     <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>2.9.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.9.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.12</version>
        </dependency>

2.创建工具类

package com.awifi.capacity.controller;

import com.alibaba.fastjson.JSON;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.util.List;

/**
 * @program: analysis-cloud
 * @description:
 * @author: huang wei
 * @create: 2020-06-12 11:51
 */
public class ZookeeperFactory {
    public static String zookeeperConnectionString = "localhost:32771";
    public static CuratorFramework client;

    public static CuratorFramework create() {
        if (client != null) {
            return client;
        }

        // 重试机制 1 秒钟连接 1 次,连接 3 次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
        client.start();
        return client;
    }

    public static void main(String[] args) throws Exception {
//        CuratorFramework client = create();
//        client.create().forPath("/hwTest");

        ZookeeperFactory zookeeperFactory = new ZookeeperFactory();
        zookeeperFactory.create();

        GetChildrenBuilder children = client.getChildren();
//        zookeeperFactory.addZNode("/hwTest","测试数据");
        List<String> list = zookeeperFactory.getChildren("/");
        System.out.println(JSON.toJSONString(list));
    }

    /** 关闭zk客户端 */
    public void stop() {
        client.close();
    }

    /** 获取zk客户端 */
    public CuratorFramework getClient() {
        return client;
    }

    /** 添加节点 */
    public void addZNode(String path, String data) throws Exception {
        //持久化节点:PERSISTENT  持久化排序节点:PERSISTENT_SEQUENTIAL  临时节点:EPHEMERAL  临时排序节点:EPHEMERAL_SEQUENTIAL
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data.getBytes("UTF-8"));
    }

    /** 获取子节点 */
    public List<String> getChildren(String path) throws Exception {
//        return client.getChildren().watched().forPath(path);
        client.getChildren().storingStatIn(new Stat()).forPath(path);
        return client.getChildren().forPath(path);
    }

    /** 获取子节点数量 */
    public int getChildrenCount(String path) throws Exception {
        return getChildren(path).size();
    }

    /** 递归删除节点 */
    public void deleteZNode(String path) throws Exception {
        client.delete().deletingChildrenIfNeeded().forPath(path);
    }

    /** 更新节点数据 */
    public void updateZNode(String path, String data) throws Exception {
        client.setData().forPath(path, data.getBytes("UTF-8"));
    }

    /** 判断节点是否存在 */
    public boolean nodeIsExist(String path) throws Exception {
        Stat stat = client.checkExists().creatingParentContainersIfNeeded().forPath(path);
        return stat != null;
    }

    /** 判断节点是否存在 */
    public boolean isNodeExist(String path) throws Exception {
        Stat stat = client.checkExists().forPath(path);
        if (stat == null) {
            return false;
        } else {
            return true;
        }
    }

    /** 获取节点数据 */
    public String getZNodeData(String path) throws Exception {
        byte[] data = client.getData().forPath(path);
        return new String(data);
    }

    /**
     * Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时,
     * Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,
     * 而状态的更变将通过PathChildrenCacheListener通知。
     * @param path                      监听节点
     * @param cache                     是否缓存
     * @param pathChildrenCacheListener 监听执行业务处理函数
     * @throws Exception
     */
    public void registerPathChildrenCache(String path, boolean cache, PathChildrenCacheListener pathChildrenCacheListener) throws Exception {
        // 如果new PathChildrenCache(client, path, true)中的参数cacheData值设置为false,
        // 则示例中的event.getData().getData()、data.getData()将返回null,cache将不会缓存节点数据。
        PathChildrenCache watcher = new PathChildrenCache(client, path, cache);
        /*PathChildrenCacheListener pathChildrenCacheListener = (client1, event) -> {
            ChildData data = event.getData();
            if (data == null) {
                System.out.println("No data in event[" + event + "]");
            } else {
                System.out.println("Receive event: "
                        + "type=[" + event.getType() + "]"
                        + ", path=[" + data.getPath() + "]"
                        + ", data=[" + new String(data.getData()) + "]"
                        + ", stat=[" + data.getStat() + "]");
            }
        };*/
        watcher.getListenable().addListener(pathChildrenCacheListener);
        watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
    }

    /**
     * Node Cache只是监听某一个特定的节点
     * @param path              监听节点
     * @param nodeCacheListener 监听执行业务处理函数
     * @throws Exception
     */
    public void registerNodeCache(String path, NodeCacheListener nodeCacheListener) throws Exception {
        NodeCache nodeCache = new NodeCache(client, path);
        nodeCache.getListenable().addListener(nodeCacheListener);
        nodeCache.start();
    }

    /**
     * Tree Cache可以监控整个树上的所有节点,类似于PathCache和NodeCache的组合
     * @param path              监听节点
     * @param treeCacheListener 监听执行业务处理函数
     * @throws Exception
     */
    public void registerTreeCache(String path, TreeCacheListener treeCacheListener) throws Exception {
        TreeCache treeCache = new TreeCache(client, path);
        treeCache.getListenable().addListener(treeCacheListener);
        treeCache.start();
    }
}
原文地址:https://www.cnblogs.com/weihuang6620/p/13100069.html