1.引入pom依赖
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>2.9.1</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.9.1</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.9.1</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.12</version> </dependency>
2.创建工具类
package com.awifi.capacity.controller; import com.alibaba.fastjson.JSON; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.GetChildrenBuilder; import org.apache.curator.framework.recipes.cache.*; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import java.util.List; /** * @program: analysis-cloud * @description: * @author: huang wei * @create: 2020-06-12 11:51 */ public class ZookeeperFactory { public static String zookeeperConnectionString = "localhost:32771"; public static CuratorFramework client; public static CuratorFramework create() { if (client != null) { return client; } // 重试机制 1 秒钟连接 1 次,连接 3 次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); client.start(); return client; } public static void main(String[] args) throws Exception { // CuratorFramework client = create(); // client.create().forPath("/hwTest"); ZookeeperFactory zookeeperFactory = new ZookeeperFactory(); zookeeperFactory.create(); GetChildrenBuilder children = client.getChildren(); // zookeeperFactory.addZNode("/hwTest","测试数据"); List<String> list = zookeeperFactory.getChildren("/"); System.out.println(JSON.toJSONString(list)); } /** 关闭zk客户端 */ public void stop() { client.close(); } /** 获取zk客户端 */ public CuratorFramework getClient() { return client; } /** 添加节点 */ public void addZNode(String path, String data) throws Exception { //持久化节点:PERSISTENT 持久化排序节点:PERSISTENT_SEQUENTIAL 临时节点:EPHEMERAL 临时排序节点:EPHEMERAL_SEQUENTIAL client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data.getBytes("UTF-8")); } /** 获取子节点 */ public List<String> getChildren(String path) throws Exception { // return client.getChildren().watched().forPath(path); client.getChildren().storingStatIn(new Stat()).forPath(path); return client.getChildren().forPath(path); } /** 获取子节点数量 */ public int getChildrenCount(String path) throws Exception { return getChildren(path).size(); } /** 递归删除节点 */ public void deleteZNode(String path) throws Exception { client.delete().deletingChildrenIfNeeded().forPath(path); } /** 更新节点数据 */ public void updateZNode(String path, String data) throws Exception { client.setData().forPath(path, data.getBytes("UTF-8")); } /** 判断节点是否存在 */ public boolean nodeIsExist(String path) throws Exception { Stat stat = client.checkExists().creatingParentContainersIfNeeded().forPath(path); return stat != null; } /** 判断节点是否存在 */ public boolean isNodeExist(String path) throws Exception { Stat stat = client.checkExists().forPath(path); if (stat == null) { return false; } else { return true; } } /** 获取节点数据 */ public String getZNodeData(String path) throws Exception { byte[] data = client.getData().forPath(path); return new String(data); } /** * Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, * Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态, * 而状态的更变将通过PathChildrenCacheListener通知。 * @param path 监听节点 * @param cache 是否缓存 * @param pathChildrenCacheListener 监听执行业务处理函数 * @throws Exception */ public void registerPathChildrenCache(String path, boolean cache, PathChildrenCacheListener pathChildrenCacheListener) throws Exception { // 如果new PathChildrenCache(client, path, true)中的参数cacheData值设置为false, // 则示例中的event.getData().getData()、data.getData()将返回null,cache将不会缓存节点数据。 PathChildrenCache watcher = new PathChildrenCache(client, path, cache); /*PathChildrenCacheListener pathChildrenCacheListener = (client1, event) -> { ChildData data = event.getData(); if (data == null) { System.out.println("No data in event[" + event + "]"); } else { System.out.println("Receive event: " + "type=[" + event.getType() + "]" + ", path=[" + data.getPath() + "]" + ", data=[" + new String(data.getData()) + "]" + ", stat=[" + data.getStat() + "]"); } };*/ watcher.getListenable().addListener(pathChildrenCacheListener); watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); } /** * Node Cache只是监听某一个特定的节点 * @param path 监听节点 * @param nodeCacheListener 监听执行业务处理函数 * @throws Exception */ public void registerNodeCache(String path, NodeCacheListener nodeCacheListener) throws Exception { NodeCache nodeCache = new NodeCache(client, path); nodeCache.getListenable().addListener(nodeCacheListener); nodeCache.start(); } /** * Tree Cache可以监控整个树上的所有节点,类似于PathCache和NodeCache的组合 * @param path 监听节点 * @param treeCacheListener 监听执行业务处理函数 * @throws Exception */ public void registerTreeCache(String path, TreeCacheListener treeCacheListener) throws Exception { TreeCache treeCache = new TreeCache(client, path); treeCache.getListenable().addListener(treeCacheListener); treeCache.start(); } }