八、zookeeper 开源客户端curator介绍

参考博客:cnblogs.com/LiZhiW/tag/ZooKeeper/

一、curator简介

  curator是Netflix公司开源的一个ZooKeeper客户端,后捐献给Apache。curator框架在ZooKeeper原生API接口上进行了包装,屏蔽了ZooKeeper客户端很多非常底层的开发细节;提供了ZooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最好用、最流行的ZooKeeper客户端。

  原生zookeeperAPI的不足:

    • 连接对象异步创建,需要开发人员自行编码等待

    • 连接没有超时自动重连机制

    • watcher一次注册生效一次

    • 不支持递归创建树形节点

  curator特点:

    • 解决session会话超时重连

    • watcher反复注册

    • 简化开发api

    • 遵循Fluent风格的API

    • 提供了分布式锁服务、共享计数器、缓存机制等功能

  maven依赖:

<!-- JUnit 4, test scope only; the example classes in this article use @Before/@After/@Test. -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.7</version>
    <scope>test</scope>
</dependency>
<!-- Curator framework. Its transitive ZooKeeper client is excluded here and
     declared explicitly in the next dependency (presumably to pin the client
     version to the one matching the servers; confirm for your cluster). -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.6.0</version>
    <type>jar</type>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- ZooKeeper client, declared directly with an explicit version. -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
    <type>jar</type>
</dependency>
<!-- Curator recipes module, same version as curator-framework. -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.6.0</version>
    <type>jar</type>
</dependency>

二、连接到ZooKeeper

/**
 * Minimal example: builds a Curator client for a ZooKeeper ensemble, starts it,
 * prints whether it has been started, and closes it again.
 */
public class CuratorConnection {
    /** Comma-separated host:port list passed to the builder as the connect string. */
    private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    /** Session timeout in milliseconds. */
    private static final int TIME_OUT = 5000;
    /** Base sleep time in milliseconds for the exponential back-off retry policy. */
    private static final int BASE_SLEEP_TIME_MS = 1000;
    /** Maximum number of retries for the exponential back-off retry policy. */
    private static final int MAX_RETRIES = 3;

    public static void main(String[] args) {
        // Session reconnect strategy. Alternatives to the policy used below:
        //   new RetryOneTime(3000)              - retry once, after 3 seconds
        //   new RetryNTimes(3, 3000)            - retry every 3 seconds, 3 times
        //   new RetryUntilElapsed(10000, 3000)  - retry every 3 seconds until the
        //                                         total wait exceeds 10 seconds
        // Sleep time of the policy used here, per the original author's note:
        //   baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES);
        // Build the connection object.
        CuratorFramework client = CuratorFrameworkFactory.builder()
                // server addresses and ports
                .connectString(IP)
                // session timeout
                .sessionTimeoutMs(TIME_OUT)
                // reconnect strategy
                .retryPolicy(retryPolicy)
                // namespace
                .namespace("create")
                .build();
        try {
            // Open the connection.
            client.start();
            System.out.println(client.isStarted());
        } finally {
            // Always close the connection, even if start() or the print fails.
            client.close();
        }
    }
}

三、新增节点

public class CuratorCreate {
    private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int TIME_OUT = 5000;
    private static final int baseSleepTimeMs = 1000;
    private static final int maxRetries = 3;
    CuratorFramework client;

    @Before
    public void before() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        client = CuratorFrameworkFactory.builder()
                .connectString(IP)
                .sessionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("create")
                .build();
        client.start();
    }

    @After
    public void after() {
        client.close();
    }
  // namespace+forPath 就是在ZKserver上数据节点的完整路径
    @Test
    public void create1() throws Exception {
        // 新增节点
        client.create()
                // 节点的类型
                .withMode(CreateMode.PERSISTENT)
                // 节点的权限列表 world:anyone:cdrwa
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                // arg1:节点的路径
                // arg2:节点的数据
                .forPath("/create_01", "create_01".getBytes());
   System.out.println(
"结束"); } @Test public void create2() throws Exception { // 自定义权限列表 // 权限列表 List<ACL> list = new ArrayList<ACL>(); // 授权模式和授权对象 Id id = new Id("ip", "192.168.126.143"); list.add(new ACL(ZooDefs.Perms.ALL, id)); client.create().withMode(CreateMode.PERSISTENT).withACL(list).forPath("/node2", " node2".getBytes()); System.out.println("结束"); } @Test public void create3() throws Exception { // 递归创建节点树 client.create() // 递归节点的创建 .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/node3/node31", "node31".getBytes()); System.out.println("结束"); } @Test public void create4() throws Exception { // 异步方式创建节点 client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) // 异步回调接口 .inBackground(new BackgroundCallback() { public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { // 节点的路径 System.out.println(curatorEvent.getPath()); // 时间类型 System.out.println(curatorEvent.getType()); } }) .forPath("/node4", "node4".getBytes()); Thread.sleep(5000); System.out.println("结束"); } }

四、更新节点

/**
 * JUnit examples for updating znode data with Curator: unconditional update,
 * update pinned to a version, and background (asynchronous) update.
 */
public class CuratorSet {

    private static final String CONNECT_STRING = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int SESSION_TIMEOUT_MS = 5000;
    private static final int BASE_SLEEP_TIME_MS = 1000;
    private static final int MAX_RETRIES = 3;
    CuratorFramework client;

    /** Builds and starts a client in the {@code set} namespace before each test. */
    @Before
    public void before() {
        client = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES))
                .namespace("set")
                .build();
        client.start();
    }

    /** Closes the client after each test. */
    @After
    public void after() {
        client.close();
    }

    /** Overwrites the data of {@code /node1}. */
    @Test
    public void set1() throws Exception {
        byte[] payload = "node11".getBytes();
        // forPath(path, data)
        client.setData().forPath("/node1", payload);
        System.out.println("结束");
    }

    /** Overwrites the data of {@code /node1}, passing version 2 to the builder. */
    @Test
    public void set2() throws Exception {
        byte[] payload = "node1111".getBytes();
        client.setData().withVersion(2).forPath("/node1", payload);
        System.out.println("结束");
    }

    /** Updates {@code /node2} in the background and prints the callback event. */
    @Test
    public void set3() throws Exception {
        BackgroundCallback printEvent = (framework, event) -> {
            // node path, then event type
            System.out.println(event.getPath());
            System.out.println(event.getType());
        };
        client.setData()
                .withVersion(-1)
                .inBackground(printEvent)
                .forPath("/node2", "node1".getBytes());
        // Give the background callback time to run before the test ends.
        Thread.sleep(5000);
        System.out.println("结束");
    }
}

五、删除节点

/**
 * JUnit examples for deleting znodes with Curator: plain delete, delete pinned
 * to a version, delete including children, and background (asynchronous) delete.
 */
public class CuratorDelete {

    private static final String CONNECT_STRING = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int SESSION_TIMEOUT_MS = 5000;
    private static final int BASE_SLEEP_TIME_MS = 1000;
    private static final int MAX_RETRIES = 3;
    CuratorFramework client;

    /** Builds and starts a client in the {@code delete} namespace before each test. */
    @Before
    public void before() {
        client = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES))
                .namespace("delete")
                .build();
        client.start();
    }

    /** Closes the client after each test. */
    @After
    public void after() {
        client.close();
    }

    /** Deletes {@code /node1}. */
    @Test
    public void delete1() throws Exception {
        client.delete().forPath("/node1");
        System.out.println("结束");
    }

    /** Deletes {@code /node1}, passing version 0 to the builder. */
    @Test
    public void delete2() throws Exception {
        client.delete().withVersion(0).forPath("/node1");
        System.out.println("结束");
    }

    /** Deletes {@code /node1} together with its child nodes. */
    @Test
    public void delete3() throws Exception {
        client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath("/node1");
        System.out.println("结束");
    }

    /** Deletes {@code /node1} and its children in the background and prints the callback event. */
    @Test
    public void delete4() throws Exception {
        BackgroundCallback printEvent = (framework, event) -> {
            // node path, then event type
            System.out.println(event.getPath());
            System.out.println(event.getType());
        };
        client.delete()
                .deletingChildrenIfNeeded()
                .withVersion(-1)
                .inBackground(printEvent)
                .forPath("/node1");
        // Give the background callback time to run before the test ends.
        Thread.sleep(5000);
        System.out.println("结束");
    }
}

六、查看节点

/**
 * JUnit examples for reading znode data with Curator: plain read, read that
 * also fills a {@code Stat}, and background (asynchronous) read.
 */
public class CuratorGet {

    private static final String CONNECT_STRING = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int SESSION_TIMEOUT_MS = 5000;
    private static final int BASE_SLEEP_TIME_MS = 1000;
    private static final int MAX_RETRIES = 3;
    CuratorFramework client;

    /** Builds and starts a client with an empty namespace before each test. */
    @Before
    public void before() {
        client = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES))
                .namespace("")
                .build();
        client.start();
    }

    /** Closes the client after each test. */
    @After
    public void after() {
        client.close();
    }

    /** Reads and prints the data of {@code /hello}. */
    @Test
    public void get1() throws Exception {
        System.out.println(new String(client.getData().forPath("/hello")));
    }

    /** Reads the data of {@code /hello} and prints it together with the node version. */
    @Test
    public void get2() throws Exception {
        // The builder stores the node's attributes in this object while reading.
        Stat nodeStat = new Stat();
        byte[] content = client.getData().storingStatIn(nodeStat).forPath("/hello");
        System.out.println(new String(content));
        System.out.println(nodeStat.getVersion());
    }

    /** Reads {@code /hello} in the background and prints path, event type and data. */
    @Test
    public void get3() throws Exception {
        BackgroundCallback printEvent = (framework, event) -> {
            // node path
            System.out.println(event.getPath());
            // event type
            System.out.println(event.getType());
            // data
            System.out.println(new String(event.getData()));
        };
        client.getData().inBackground(printEvent).forPath("/hello");
        // Give the background callback time to run before the test ends.
        Thread.sleep(5000);
        System.out.println("结束");
    }
}
  • 查看子节点
/**
 * JUnit examples for listing the children of a znode with Curator,
 * synchronously and in the background.
 */
public class CuratorGetChild {

    private static final String CONNECT_STRING = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int SESSION_TIMEOUT_MS = 5000;
    private static final int BASE_SLEEP_TIME_MS = 1000;
    private static final int MAX_RETRIES = 3;
    CuratorFramework client;

    /** Builds and starts a client with an empty namespace before each test. */
    @Before
    public void before() {
        client = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES))
                .namespace("")
                .build();
        client.start();
    }

    /** Closes the client after each test. */
    @After
    public void after() {
        client.close();
    }

    /**
     * Prints the names of the children of {@code /create}.
     *
     * @throws Exception if the ZooKeeper call fails
     */
    @Test
    public void getChild1() throws Exception {
        List<String> children = client.getChildren().forPath("/create");
        children.forEach(System.out::println);
    }

    /**
     * Lists the children of {@code /create} in the background and prints the
     * event path, the event type and each child name.
     *
     * @throws Exception if the ZooKeeper call fails or the wait is interrupted
     */
    @Test
    public void getChild2() throws Exception {
        client.getChildren()
                .inBackground((framework, event) -> {
                    // node path
                    System.out.println(event.getPath());
                    // event type
                    System.out.println(event.getType());
                    // child names delivered with the event
                    List<String> children = event.getChildren();
                    children.forEach(System.out::println);
                })
                .forPath("/create");
        // Give the background callback time to run before the test ends.
        Thread.sleep(5000);
        System.out.println("结束");
    }
}

七、检查节点是否存在

/**
 * JUnit examples that check whether a znode exists, synchronously and in the
 * background.
 */
public class CuratorExists {

    private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int TIME_OUT = 5000;
    private static final int BASE_SLEEP_TIME_MS = 1000;
    private static final int MAX_RETRIES = 3;
    CuratorFramework client;

    /** Builds and starts a client with an empty namespace before each test. */
    @Before
    public void before() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES);
        client = CuratorFrameworkFactory.builder()
                .connectString(IP)
                .sessionTimeoutMs(TIME_OUT)
                .retryPolicy(retryPolicy)
                .namespace("").build();
        client.start();
    }

    /** Closes the client after each test. */
    @After
    public void after() {
        client.close();
    }

    /**
     * Checks whether {@code /node2} exists and prints its version if it does.
     *
     * @throws Exception if the ZooKeeper call fails
     */
    @Test
    public void exists1() throws Exception {
        Stat stat = client.checkExists()
                // node path
                .forPath("/node2");
        // checkExists() yields null rather than throwing when the node is absent.
        if (stat == null) {
            System.out.println("节点不存在");
            return;
        }
        System.out.println(stat.getVersion());
    }

    /**
     * Checks in the background whether {@code /node2} exists and prints the
     * event path, the event type and, if the node exists, its version.
     *
     * @throws Exception if the ZooKeeper call fails or the wait is interrupted
     */
    @Test
    public void exists2() throws Exception {
        client.checkExists().inBackground((curatorFramework, curatorEvent) -> {
            // node path
            System.out.println("节点路径:" + curatorEvent.getPath());
            // event type
            System.out.println("事件类型:" + curatorEvent.getType());
            // The event carries no Stat when the node does not exist.
            Stat stat = curatorEvent.getStat();
            if (stat == null) {
                System.out.println("节点不存在");
                return;
            }
            System.out.println(stat.getVersion());
        }).forPath("/node2");
        // Wait for the background callback; 5 s matches the other asynchronous examples.
        Thread.sleep(5000);
        System.out.println("结束");
    }
}

八、watcherAPI

  curator提供了两种Watcher(Cache)来监听节点的变化

    • NodeCache : 只监听某一个特定的节点,可监听该节点的新增、修改和删除(节点被删除后getCurrentData()返回null)

    • PathChildrenCache : 监控一个ZNode的子节点。当一个子节点新增、更新或删除时,PathChildrenCache会更新它的状态,其中包含最新的子节点以及子节点的数据和状态

/**
 * JUnit examples for Curator's cache-based watchers: {@code NodeCache} for a
 * single node and {@code PathChildrenCache} for the children of a node.
 */
public class CuratorWatcher {

    private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int TIME_OUT = 5000;
    private static final int BASE_SLEEP_TIME_MS = 1000;
    private static final int MAX_RETRIES = 3;
    CuratorFramework client;

    /** Builds and starts a client with an empty namespace before each test. */
    @Before
    public void before() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES);
        client = CuratorFrameworkFactory.builder()
                .connectString(IP)
                .sessionTimeoutMs(TIME_OUT)
                .retryPolicy(retryPolicy)
                .namespace("").build();
        client.start();
    }

    /** Closes the client after each test. */
    @After
    public void after() {
        client.close();
    }

    /**
     * Watches {@code /hello} for 100 seconds and prints its path and data on
     * every change.
     *
     * @throws Exception if the cache cannot be started or the wait is interrupted
     */
    @Test
    public void watcher1() throws Exception {
        // arg1: connection object
        // arg2: path of the node to watch
        final NodeCache nodeCache = new NodeCache(client, "/hello");
        try {
            // Register before start() so no notification can slip through between the two calls.
            nodeCache.getListenable().addListener(() -> {
                // Called whenever the node changes. There is no current data
                // when the node has been deleted or does not exist.
                if (nodeCache.getCurrentData() == null) {
                    System.out.println("节点不存在");
                    return;
                }
                System.out.println(nodeCache.getCurrentData().getPath());
                System.out.println(new String(nodeCache.getCurrentData().getData()));
            });
            // Start the watcher.
            nodeCache.start();
            Thread.sleep(100000);
            System.out.println("结束");
        } finally {
            // Close the watcher even if the wait is interrupted.
            nodeCache.close();
        }
    }

    /**
     * Watches the children of {@code /create} for 100 seconds and prints the
     * type, path and data of every child event.
     *
     * @throws Exception if the cache cannot be started or the wait is interrupted
     */
    @Test
    public void watcher2() throws Exception {
        // arg1: connection object
        // arg2: path of the node whose children are watched
        // arg3: whether node data is available in the events
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/create", true);
        try {
            // Register before start() so no notification can slip through between the two calls.
            pathChildrenCache.getListenable().addListener((curatorFramework, pathChildrenCacheEvent) -> {
                // Called whenever a child node changes.
                // event type
                System.out.println("节点的事件类型:" + pathChildrenCacheEvent.getType());
                // Events that are not about a child node (for example
                // connection state changes) carry no node data.
                if (pathChildrenCacheEvent.getData() == null) {
                    return;
                }
                // node path
                System.out.println("节点的路径:" + pathChildrenCacheEvent.getData().getPath());
                // node data
                System.out.println("节点数据:" + new String(pathChildrenCacheEvent.getData().getData()));
            });
            // Start listening.
            pathChildrenCache.start();
            Thread.sleep(100000);
            System.out.println("结束");
        } finally {
            // Stop listening even if the wait is interrupted.
            pathChildrenCache.close();
        }
    }
}

九、事务

/**
 * JUnit example that creates two znodes inside a single Curator transaction.
 */
public class CuratorTransaction {

    private static final String CONNECT_STRING = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int SESSION_TIMEOUT_MS = 5000;
    private static final int BASE_SLEEP_TIME_MS = 1000;
    private static final int MAX_RETRIES = 3;
    CuratorFramework client;

    /** Builds and starts a client with an empty namespace before each test. */
    @Before
    public void before() {
        client = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES))
                .namespace("")
                .build();
        client.start();
    }

    /** Closes the client after each test. */
    @After
    public void after() {
        client.close();
    }

    /** Creates {@code /node4} and {@code /node5} in one transaction and commits it. */
    @Test
    public void tra1() throws Exception {
        byte[] firstPayload = "node1".getBytes();
        byte[] secondPayload = "node2".getBytes();
        client.inTransaction()
                .create().forPath("/node4", firstPayload)
                .and()
                .create().forPath("/node5", secondPayload)
                .and()
                .commit();
    }
}

十、分布式锁

  • InterProcessMutex:分布式可重入排它锁

  • InterProcessReadWriteLock:分布式读写锁

/**
 * JUnit examples for Curator's distributed locks: an exclusive mutex and the
 * read and write sides of a read/write lock, all on the path {@code /lock1}.
 */
public class CuratorLock {

    private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int TIME_OUT = 5000;
    private static final int BASE_SLEEP_TIME_MS = 1000;
    private static final int MAX_RETRIES = 3;
    CuratorFramework client;

    /** Builds and starts a client with an empty namespace before each test. */
    @Before
    public void before() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES);
        client = CuratorFrameworkFactory.builder()
                .connectString(IP)
                .sessionTimeoutMs(TIME_OUT)
                .retryPolicy(retryPolicy)
                .namespace("").build();
        client.start();
    }

    /** Closes the client after each test. */
    @After
    public void after() {
        client.close();
    }

    /** Holds the exclusive lock while counting from 1 to 10. */
    @Test
    public void lock1() throws Exception {
        // Exclusive lock.
        // arg1: connection object
        // arg2: node path
        InterProcessLock interProcessLock = new InterProcessMutex(client, "/lock1");
        countWhileHolding(interProcessLock);
    }

    /** Holds the read lock of the read/write lock while counting from 1 to 10. */
    @Test
    public void lock2() throws Exception {
        // Read/write lock.
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock1");
        // Take the read side.
        InterProcessLock interProcessLock = interProcessReadWriteLock.readLock();
        countWhileHolding(interProcessLock);
    }

    /** Holds the write lock of the read/write lock while counting from 1 to 10. */
    @Test
    public void lock3() throws Exception {
        // Read/write lock.
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock1");
        // Take the write side.
        InterProcessLock interProcessLock = interProcessReadWriteLock.writeLock();
        countWhileHolding(interProcessLock);
    }

    /**
     * Acquires the given lock, prints 1..10 with a 3 second pause before each
     * number as stand-in business code, and releases the lock again.
     *
     * @param lock the lock to hold while counting
     * @throws Exception if acquiring or releasing fails or the sleep is interrupted
     */
    private void countWhileHolding(InterProcessLock lock) throws Exception {
        System.out.println("等待获取锁对象!");
        // Acquire outside the try block so a failed acquire does not trigger a release.
        lock.acquire();
        try {
            for (int i = 1; i <= 10; i++) {
                // business code
                Thread.sleep(3000);
                System.out.println(i);
            }
        } finally {
            // Release even if the work above throws, so the lock is not left held.
            lock.release();
        }
        System.out.println("等待释放锁!");
    }
}
原文地址:https://www.cnblogs.com/jdy1022/p/14830674.html