zookeeper记录4(Apache Curator客户端的使用)

录:

1、常用的zk java客户端
2、搭建maven工程,建立curator与zkserver的连接
3、zk命名空间以及创建节点
4、修改节点数据以及删除节点
5、读取节点数据、节点下面子节点列表、判断节点是否存在
6、一次性监听--curator之usingWatcher
7、curator之nodeCache一次注册N次监听
8、curator之PathChildrenCache子节点监听
9、curator之acl权限操作与认证授权

1、常用的zk java客户端    <--返回目录

1)zk原生api
    不足之处:超时重连不支持自动,需要手动操作:watch注册一次后会失效;不支持递归创建节点;
2)zkclient
3)apache curator
    apache的开源项目
    解决watcher注册一次就失效的
    api更加简单易用
    提供更多解决方案并且实现简单,比如分布式锁
    提供常用的zookeeper工具类

2、搭建maven工程,建立curator与zkserver的连接    <--返回目录

  依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.11</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>

  CuratorOperator

package com.oy.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CuratorOperator {
    private static final Logger log = LoggerFactory.getLogger(CuratorOperator.class);
    private CuratorFramework client = null;
    private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";

    public CuratorOperator() {
        // 参数1 重试次数; 参数2 每次重试间隔的时间
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath).sessionTimeoutMs(20000)
                .retryPolicy(retryPolicy).build();
        client.start();
    }

    /**
     * 测试客户端连接
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        CuratorOperator curatorOperator = new CuratorOperator();
        boolean started = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));

        new Thread().sleep(5000);
        curatorOperator.closeZKClient();
        boolean started1 = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
    }

    /**
     * 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) client.close();
    }
}

3、zk命名空间以及创建节点    <--返回目录

package com.oy.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CuratorOperator {
    private static final Logger log = LoggerFactory.getLogger(CuratorOperator.class);
    private CuratorFramework client = null;
    private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";

    public CuratorOperator() {
        // 参数1 重试次数; 参数2 每次重试间隔的时间
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath).sessionTimeoutMs(20000)
                .retryPolicy(retryPolicy).namespace("workspace").build();
        client.start();
    }

    /**
     * 测试客户端连接
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        CuratorOperator curatorOperator = new CuratorOperator();
        boolean started = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));

        // 创建节点
        String nodePath = "/super/son1";
        byte[] data = "testnode".getBytes();
        curatorOperator.client.create().creatingParentContainersIfNeeded()
                .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath(nodePath, data);

        new Thread().sleep(5000);
        curatorOperator.closeZKClient();
        boolean started1 = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
    }

    /**
     * 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) client.close();
    }
}

4、修改节点数据以及删除节点    <--返回目录

  更新节点数据

public static void main(String[] args) throws Exception {
    CuratorOperator curatorOperator = new CuratorOperator();
    boolean started = curatorOperator.client.isStarted();
    log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));

    // 创建节点
    String nodePath = "/super/son1";
//        byte[] data = "testnode".getBytes();
//        curatorOperator.client.create().creatingParentContainersIfNeeded()
//                .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
//                .forPath(nodePath, data);

    // 更新节点数据
    byte[] newData = "newtestnode".getBytes();
    curatorOperator.client.setData().withVersion(0).forPath(nodePath, newData);

    new Thread().sleep(5000);
    curatorOperator.closeZKClient();
    boolean started1 = curatorOperator.client.isStarted();
    log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
}

  删除节点

public static void main(String[] args) throws Exception {
    CuratorOperator curatorOperator = new CuratorOperator();
    boolean started = curatorOperator.client.isStarted();
    log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));

    // 创建节点
    String nodePath = "/super/son1";
//        byte[] data = "testnode".getBytes();
//        curatorOperator.client.create().creatingParentContainersIfNeeded()
//                .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
//                .forPath(nodePath, data);

    // 更新节点数据
//    byte[] newData = "newtestnode".getBytes();
//    curatorOperator.client.setData().withVersion(0).forPath(nodePath, newData);

    // 删除节点
    curatorOperator.client.delete()
            .guaranteed() // 如果删除失败,那么在后台还是继续删除,直到成功
            .deletingChildrenIfNeeded() // 如果有子节点,也删除
            .forPath(nodePath);

    new Thread().sleep(5000);
    curatorOperator.closeZKClient();
    boolean started1 = curatorOperator.client.isStarted();
    log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
}

  测试删除前,在/workspace/super/son1下面创建子节点,删除son1的时候,下面的子节点也被删除了

5、读取节点数据、节点下面子节点列表、判断节点是否存在    <--返回目录

package com.oy.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class CuratorOperator {
    private static final Logger log = LoggerFactory.getLogger(CuratorOperator.class);
    private CuratorFramework client = null;
    private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";

    public CuratorOperator() {
        // 参数1 重试次数; 参数2 每次重试间隔的时间
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath).sessionTimeoutMs(20000)
                .retryPolicy(retryPolicy).namespace("workspace").build();
        client.start();
    }

    /**
     * 测试客户端连接
     * @param args
     * @throws Exception
     */
public static void main(String[] args) throws Exception {
    CuratorOperator curatorOperator = new CuratorOperator();
    boolean started = curatorOperator.client.isStarted();
    log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));

    // 创建节点
    String nodePath = "/super/son1";
//        byte[] data = "testnode".getBytes();
//        curatorOperator.client.create().creatingParentContainersIfNeeded()
//                .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
//                .forPath(nodePath, data);

    // 更新节点数据
//    byte[] newData = "newtestnode".getBytes();
//    curatorOperator.client.setData().withVersion(0).forPath(nodePath, newData);

    // 删除节点
//    curatorOperator.client.delete()
//            .guaranteed() // 如果删除失败,那么在后台还是继续删除,直到成功
//            .deletingChildrenIfNeeded() // 如果有子节点,也删除
//            .forPath(nodePath);

    // 读取节点数据
    Stat stat = new Stat();
    byte[] nodeData = curatorOperator.client.getData().storingStatIn(stat).forPath(nodePath);
    log.warn(nodePath + "节点数据: {}, 版本: {}", new String(nodeData), stat.getVersion());

    // 查询节点下面的子节点列表
    List<String> childNodes = curatorOperator.client.getChildren().forPath(nodePath);
    for (String child : childNodes) {
        log.warn(child);
    }

    // 判断节点是否存在,如果不存在则为空
    Stat stat1 = curatorOperator.client.checkExists().forPath(nodePath + "/xxx");
    log.warn("stat1: {}", stat1);

    new Thread().sleep(5000);
    curatorOperator.closeZKClient();
    boolean started1 = curatorOperator.client.isStarted();
    log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
}

    /**
     * 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) client.close();
    }
}

6、一次性监听--curator之usingWatcher    <--返回目录

public static void main(String[] args) throws Exception {
    CuratorOperator curatorOperator = new CuratorOperator();
    boolean started = curatorOperator.client.isStarted();
    log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));

    String nodePath = "/super/son1";

    // watcher事件,当使用usingWatcher时,监听只会触发一次,监听完毕后就销毁
    curatorOperator.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
    //curatorOperator.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);

    new Thread().sleep(50000);
    curatorOperator.closeZKClient();
    boolean started1 = curatorOperator.client.isStarted();
    log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
}

  测试:通过客户端zkCli.sh 多次修改 set /workspace/super/son1 bbb, 控制台只打印一次watcher监听结果。

7、curator之nodeCache一次注册N次监听    <--返回目录

package com.oy.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class CuratorOperator {
    private static final Logger log = LoggerFactory.getLogger(CuratorOperator.class);
    private CuratorFramework client = null;
    private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";

    public CuratorOperator() {
        // 参数1 重试次数; 参数2 每次重试间隔的时间
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath).sessionTimeoutMs(20000)
                .retryPolicy(retryPolicy).namespace("workspace").build();
        client.start();
    }

    /**
     * 测试客户端连接
     * @param args
     * @throws Exception
     */
public static void main(String[] args) throws Exception {
    CuratorOperator curatorOperator = new CuratorOperator();
    boolean started = curatorOperator.client.isStarted();
    log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));

    String nodePath = "/super/son1";

    // watcher事件,当使用usingWatcher时,监听只会触发一次,监听完毕后就销毁
    //curatorOperator.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
    //curatorOperator.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);

    // NodeCache: 监听数据节点的变化,会触发事件
    final NodeCache nodeCache = new NodeCache(curatorOperator.client, nodePath);
    // buildInital: 为true则在初始化时获取node的值并缓存
    nodeCache.start(true);
    if (nodeCache.getCurrentData() != null) {
       log.warn("节点初始化数据为:{}", new String(nodeCache.getCurrentData().getData()));
    } else {
        log.warn("节点初始化数据为空");
    }
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        public void nodeChanged() throws Exception {
            // 注意:删除节点时下面代码nodeCache.getCurrentData()==null
            String data = new String(nodeCache.getCurrentData().getData());
            log.warn("节点路径{}的数据:{}", nodeCache.getPath(), data);
        }
    });

    new Thread().sleep(50000);
    curatorOperator.closeZKClient();
    boolean started1 = curatorOperator.client.isStarted();
    log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
}

    /**
     * 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) client.close();
    }
}

  测试:

  1)启动main方法

  2)客户端修改/workspace/super/son1的值 set /workspace/super/son1 eee/fff/ggg

8、curator之PathChildrenCache子节点监听    <--返回目录

package com.oy.curator;

import javafx.scene.shape.Path;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class CuratorOperator {
    private static final Logger log = LoggerFactory.getLogger(CuratorOperator.class);
    private CuratorFramework client = null;
    private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";

    public CuratorOperator() {
        // 参数1 重试次数; 参数2 每次重试间隔的时间
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath).sessionTimeoutMs(20000)
                .retryPolicy(retryPolicy).namespace("workspace").build();
        client.start();
    }

    /**
     * 测试客户端连接
     * @param args
     * @throws Exception
     */
public static void main(String[] args) throws Exception {
    CuratorOperator curatorOperator = new CuratorOperator();
    boolean started = curatorOperator.client.isStarted();
    log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));

    String nodePath = "/super/son1";
    // 参数3 cacheData 设置缓存节点的数据状态
    PathChildrenCache childrenCache = new PathChildrenCache(curatorOperator.client, nodePath, true);
    // StartMode 初始化方式
    // POST_INITIALIZED_EVENT: 异步初始化,初始化之后会触发事件
    // NORMAL: 异步初始化,初始化后不触发事件; BUILD_INITIAL_CACHE: 同步初始化
    childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

    // 只有同步初始化,下面才能获取子节点数据
    List<ChildData> childDataList = childrenCache.getCurrentData();
    log.warn("当前数据节点的子节点数据列表:");
    for (ChildData cd : childDataList) {
        log.warn(new String(cd.getData()));
    }

    childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
                log.warn("字节点初始化完成"); // childrenCache.start(POST_INITIALIZED_EVENT) 异步初始化完成后触发调用
            }
            else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { // 添加子节点。初始化完成时,有几个子节点,这个CHILD_ADDED就会触发几次
                log.warn("添加子节点:{}", event.getData().getPath());
                log.warn("子节点数据:{}", new String(event.getData().getData()));
            } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { // 修改子节点数据
            } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { // 删除子节点
            }
        }
    });

    new Thread().sleep(50000);
    curatorOperator.closeZKClient();
    boolean started1 = curatorOperator.client.isStarted();
    log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
}

    /**
     * 关闭zk客户端连接
     */
    public void closeZKClient() {
        if (client != null) client.close();
    }
}

 9、curator之acl权限操作与认证授权    <--返回目录

// 参数1 重试次数; 参数2 每次重试间隔的时间
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
CuratorFramework client = CuratorFrameworkFactory.builder().authorization("digest", "user:password".getBytes())
        .connectString(zkServerPath).sessionTimeoutMs(20000)
        .retryPolicy(retryPolicy).namespace("workspace").build();
client.start();
// 自定义用户认证访问
ArrayList<ACL> acls = new ArrayList<>();
Id userPwd1 = new Id("digest", AclUtils.getDigestUserPwd("zhangsan1:123"));
Id userPwd2 = new Id("digest", AclUtils.getDigestUserPwd("zhangsan2:123"));
acls.add(new ACL(ZooDefs.Perms.ALL, userPwd1));
acls.add(new ACL(ZooDefs.Perms.READ, userPwd2));
acls.add(new ACL(ZooDefs.Perms.DELETE | ZooDefs.Perms.DELETE, userPwd2));

// 创建节点, 使用自定义的权限列表
String nodePath = "/super/son1";
byte[] data = "testnode".getBytes();
curatorOperator.client.create().creatingParentContainersIfNeeded()
        .withMode(CreateMode.PERSISTENT).withACL(acls)
        .forPath(nodePath, data);

---

原文地址:https://www.cnblogs.com/xy-ouyang/p/14927222.html