Zookeeper 系列(五)Curator API

Zookeeper 系列(五)Curator API

一、Curator 使用

Curator 框架中使用链式编程风格,易读性更强,使用工程方法创建连接对象使用。

(1) CuratorFrameworkFactory :俩个静态工厂方法(参数不同)来实现

  • 参数1: connectString,连接串
  • 参数2: retryPolicy,重试连接策略。有四种实现分别为:ExponentialBackoffRetry、RetryNTimes. RetryOneTimes、RetryUntilElapsed
  • 参数3: sessionTimeoutMs 会话超时时间默认为 6000oms
  • 参数4: connectionTimeOutms 连接超时时间,默认为 15000ms

注意:对于 retrypolicy 策略通过一个接口来让用户自定义实现。

(2) create :创建节点,可选链式项

(3) delete :删除节点,可选链式项

(4) getdata、setdata :读取和修改数据

(5) 异步绑定回调方法 :比如创建节点时绑定一个回调函数,该回调函数可以输出服务器的状态码以及服务器事件类型。还可以加入一个线程池进行优化操作。

(6) getchildren :读取子节点方法

(7) checkexists :判断节点是否存在方法

示例

(1) 环境准备

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>

(2) Curator 操作 Zookeeper

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author: leigang
 * @version: 2018-04-06
 */
public class ZkCuratorBase {

    private static final String CONNECT_ADDR = "127.0.0.1:2181";
    private static final int SEESION_OUTTIME = 5 * 1000;
    
    @Test
    public void test() throws Exception {
        //1. 重试策略:初试时间为1s,重试10次
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10);

        //2. 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SEESION_OUTTIME)
                .retryPolicy(retryPolicy)
                .build();

        //3. 开启连接
        cf.start();

        //4. 创建
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                .forPath("/curator", "curator".getBytes());

        //5. 获取
        byte[] data = cf.getData().forPath("/curator");
        System.out.println("/curator:" + new String(data));

        //6. 修改
        cf.setData().forPath("/curator", "curator-xxx".getBytes());

        //7. 递归删除
        cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator");

        //8. 回调函数
        ExecutorService pool = Executors.newCachedThreadPool();
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework cf, CuratorEvent event) throws Exception {
                        System.out.println("code:" + event.getResultCode());
                        System.out.println("type:" + event.getType());
                        System.out.println("线程为:" + Thread.currentThread().getName());
                    }
                }, pool)
                .forPath("/curator", "curator-test".getBytes());

        System.out.println("主线程为:" + Thread.currentThread().getName());

        Thread.sleep(100 * 1000);
        cf.close();
    }
}

二、Curator Watcher

  • NodeCacheListener :监听节点的新增、修改操作
  • PathChildrenCacheListener :监听子节点的新增增、修改、别除操作

(1) 环境准备

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>

(2) NodeCacheListener

// 节点本身监听
public class ZkCuratorWatcher1 {

    private static final String CONNECT_ADDR = "127.0.0.1:2181";
    private static final int SEESION_OUTTIME = 5 * 1000;
    
    @Test
    public void test() throws Exception {
        //1. 重试策略:初试时间为1s,重试10次
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10);

        //2. 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SEESION_OUTTIME)
                .retryPolicy(retryPolicy)
                .build();

        //3. 开启连接
        cf.start();

        //4. 建立一个cache缓存
        NodeCache cache = new NodeCache(cf, "/curator", false);
        cache.start(true);

        cache.getListenable().addListener(new NodeCacheListener() {
            //触发事件为创建、更新、删除节点
            @Override
            public void nodeChanged() throws Exception {
                ChildData data = cache.getCurrentData();
                if (data != null) {
                    System.out.println("路径为:" + cache.getCurrentData().getPath());
                    System.out.println("数据为:" + new String(cache.getCurrentData().getData()));
                    System.out.println("状态为:" + cache.getCurrentData().getStat());
                } else {
                    System.out.println("删除节点");
                }
                System.out.println("================================================");
            }
        });

        Thread.sleep(1000);
        cf.create().forPath("/curator", "123".getBytes());

        Thread.sleep(1000);
        cf.setData().forPath("/curator", "12344".getBytes());

        Thread.sleep(1000);
        cf.delete().forPath("/curator");
        Thread.sleep(10 * 1000);

        cf.close();
    }
}

(3) PathChildrenCacheListener

// 子节点监听
public class ZkCuratorWatcher2 {

    private static final String CONNECT_ADDR = "127.0.0.1:2181";
    private static final int SEESION_OUTTIME = 5 * 1000;
    
    @Test
    public void test() throws Exception {
        //1. 重试策略:初试时间为1s,重试10次
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10);

        //2. 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SEESION_OUTTIME)
                .retryPolicy(retryPolicy)
                .build();

        //3. 开启连接
        cf.start();

        //4. 建立一个 PathChildrenCache 缓存,第三个参数为是否接受节点数据内容,如果为 false 则不接受
        PathChildrenCache cache = new PathChildrenCache(cf, "/curator", true);
        //5. 在初始化的时候就进行缓存监听
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

        cache.getListenable().addListener(new PathChildrenCacheListener() {
            // 监听子节点变量,新建、修改、删除
            @Override
            public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("CHILD_ADDED:" + event.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("CHILD_UPDATED:" + event.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("CHILD_REMOVED:" + event.getData().getPath());
                        break;
                    default:
                        break;
                }
            }
        });

        cf.create().creatingParentsIfNeeded().forPath("/curator/c1", "c1".getBytes());
        cf.create().creatingParentsIfNeeded().forPath("/curator/c2", "c2".getBytes());

        Thread.sleep(1000);
        cf.setData().forPath("/curator/c1", "c1-update".getBytes());

        cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator");

        cf.close();
    }
}

三、Curator 场景应用

(一)分布式锁

在分布式场景中,我们为了保证数据的一性,经常在程序运行的某一个点需要进行同步操作(java 可提供 synchronized 或者 Reentrantlock 实现)比如我们看一个小示例,这个示例会出现分布式不同步的问题:因为我们之前所说的是在高并发下访问一个程序,现在我们则是在高并发下访问多个服务器节点(分布式)。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

/**
 * ReentrantLock
 * @author: leigang
 * @version: 2018-04-06
 */
public class Lock1 {
    
    private static ReentrantLock reentrantLock = new ReentrantLock();
    private static int count = 10;
    
    public static void genarNo() {
        try {
            reentrantLock.lock();
            count--;
            System.out.println(count);
        } finally {
            reentrantLock.unlock();
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        countDownLatch.await();
                        genarNo();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        Thread.sleep(1000);
        countDownLatch.countDown();
    } 
}

我们使用 Curator 基于 zookeeper 的特性提供的分布式锁来处理分布式场景的数据一致性,zookeeper 本身的分布式是有写问题的,这里强烈推荐使用 Curator 的分布式锁!

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 由 Zookeeper 实现的分布式锁
 * @author: leigang
 * @version: 2018-04-06
 */
public class Lock2 {

    private static final String CONNECT_ADDR = "127.0.0.1:2181";
    private static final int SEESION_OUTTIME = 5 * 1000;
    
    public static CuratorFramework createCuratorFramework() {
        //1. 重试策略:初试时间为1s,重试10次
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10);

        //2. 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SEESION_OUTTIME)
                .retryPolicy(retryPolicy)
                .build();

        return cf;
    }
    
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    CuratorFramework cf = createCuratorFramework();
                    cf.start();
                    // 由 Zookeeper 实现的分布式锁
                    InterProcessMutex lock = new InterProcessMutex(cf, "/curator");
                    //ReentrantLock reentrantLock = new ReentrantLock();

                    try {
                        countDownLatch.await();
                        lock.acquire();
                        //reentrantLock.lock();
                        genarNo();
                        System.out.println(Thread.currentThread().getName() + "执行业务逻辑...");
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            // 释放
                            lock.release();
                            //reentrantLock.unlock();
                        } catch (Exception e) {
                            ;
                        }
                    }
                }
            }).start();
        }

        Thread.sleep(1000);
        countDownLatch.countDown();
    }

    private static int count = 10;

    public static void genarNo() {
        System.out.println(--count);
    }
}

(二)分布式计数器

分布式计数器功能一说到分布式计数器,你可能脑海里想到了 AtomicInteger 这种经典的方式如果针对于一个 JVM 的场景当然没有问题,但是我们现在是分布式场景下,就需要利用 Curator 框架的 DistributedAtomiclnteger 了。

public class ZkCuratorAtomicInteger {

    private static final String CONNECT_ADDR = "127.0.0.1:2181";
    private static final int SEESION_OUTTIME = 5 * 1000;

    @Test
    public void test() throws Exception {
        //1. 重试策略:初试时间为1s,重试10次
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10);

        //2. 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SEESION_OUTTIME)
                .retryPolicy(retryPolicy)
                .build();

        //3. 开启连接
        cf.start();

        //4. 使用 DistributedAtomicInteger
        DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(cf, "/curator",
                new RetryNTimes(3, 1000));

        atomicInteger.forceSet(0);
        atomicInteger.increment();
        AtomicValue<Integer> value = atomicInteger.get();

        System.out.println(value.succeeded());
        System.out.println(value.postValue());  //最新值????????
        System.out.println(value.preValue());   //原始值????????
    }
}

(三)Barrier

public class ZkCuratorBarrier1 {

    private static final String CONNECT_ADDR = "127.0.0.1:2181";
    private static final int SEESION_OUTTIME = 5 * 1000;

    public static CuratorFramework createCuratorFramework() {
        //1. 重试策略:初试时间为1s,重试10次
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10);

        //2. 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SEESION_OUTTIME)
                .retryPolicy(retryPolicy)
                .build();

        return cf;
    }

    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        CuratorFramework cf = createCuratorFramework();
                        cf.start();

                        DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/barrier", 5);
                        Thread.sleep(1000 * (new Random().nextInt(3)));
                        System.out.println(Thread.currentThread().getName() + "已经准备");

                        barrier.enter();
                        System.out.println("同时开始运行...");
                        Thread.sleep(1000 * (new Random().nextInt(3)));
                        System.out.println(Thread.currentThread().getName() + "运行完毕");
                        barrier.leave();
                        System.out.println("同时退出运行...");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "t" + i).start();

        }

        Thread.sleep(100 * 1000);
    }
}

运行结果:

t1已经准备
t3已经准备
t2已经准备
t4已经准备
同时开始运行...
同时开始运行...
同时开始运行...
同时开始运行...
同时开始运行...
t3运行完毕
t4运行完毕
t0运行完毕
t2运行完毕
t1运行完毕
同时退出运行...
同时退出运行...
同时退出运行...
同时退出运行...
同时退出运行...

(四)Cluster

(1) Zookeeper 监听器,用于监听子节点的变化:

public class ZkCuratorWatcher {

    private static final String CONNECT_ADDR = "127.0.0.1:2181";
    private static final int SEESION_OUTTIME = 5 * 1000;

    public ZkCuratorWatcher() {
        try {
            //1. 重试策略:初试时间为1s,重试10次
            ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10);

            //2. 通过工厂创建连接
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_ADDR)
                    .sessionTimeoutMs(SEESION_OUTTIME)
                    .retryPolicy(retryPolicy)
                    .build();

            //3. 开启连接
            cf.start();

            //4. 创建根节点
            if (cf.checkExists().forPath("/cluster") == null) {
                cf.create().withMode(CreateMode.PERSISTENT).forPath("/cluster", "cluster".getBytes());
            }

            //5. 建立一个 PathChildrenCache 缓存,第三个参数为是否接受节点数据内容,如果为 false 则不接受
            PathChildrenCache cache = new PathChildrenCache(cf, "/cluster", true);
            //6. 在初始化的时候就进行缓存监听
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

            cache.getListenable().addListener(new PathChildrenCacheListener() {
                // 监听子节点变量,新建、修改、删除
                @Override
                public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception {
                    switch (event.getType()) {
                        case CHILD_ADDED:
                            System.out.println("CHILD_ADDED:" + event.getData().getPath());
                            break;
                        case CHILD_UPDATED:
                            System.out.println("CHILD_UPDATED:" + event.getData().getPath());
                            break;
                        case CHILD_REMOVED:
                            System.out.println("CHILD_REMOVED:" + event.getData().getPath());
                            break;
                        default:
                            break;
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(2) 启动两个客户端:

public class Client1 {
    @Test
    public void test() throws InterruptedException {
        ZkCuratorWatcher zkCuratorWatcher = new ZkCuratorWatcher();
        System.out.println(this.getClass().getSimpleName() + " start...");
        Thread.sleep(Integer.MAX_VALUE);
    }
}

public class Client2 {
    @Test
    public void test() throws InterruptedException {
        ZkCuratorWatcher zkCuratorWatcher = new ZkCuratorWatcher();
        System.out.println(this.getClass().getSimpleName() + " start...");
        Thread.sleep(Integer.MAX_VALUE);
    }
}

(3) 第三个客户端用于修改 Zookeeper 的节点,观察 Client1 和 Client2 是否监听到了节点的改变:

public class Test {

    private static final String CONNECT_ADDR = "127.0.0.1:2181";
    private static final int SEESION_OUTTIME = 5 * 1000;
    
    @org.junit.Test
    public void test() throws Exception {
        //1. 重试策略:初试时间为1s,重试10次
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10);

        //2. 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SEESION_OUTTIME)
                .retryPolicy(retryPolicy)
                .build();

        //3. 开启连接
        cf.start();

        cf.create().creatingParentsIfNeeded().forPath("/cluster/c1", "c1".getBytes());
        cf.create().creatingParentsIfNeeded().forPath("/cluster/c2", "c2".getBytes());

        Thread.sleep(1000);
        cf.setData().forPath("/cluster/c1", "c1-update".getBytes());

        cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/cluster/c1");
        cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/cluster/c2");

        cf.close();
    }
}

(4) 运行结果,Client1 和 Client2 下都有输出如下结果,说明都监听到了 /cluster 下子节点的添加、修改和删除:

CHILD_ADDED:/cluster/c1
CHILD_ADDED:/cluster/c2
CHILD_UPDATED:/cluster/c1
CHILD_REMOVED:/cluster/c1
CHILD_REMOVED:/cluster/c2

(5) 下面做另外一个测试,在 Zookeeper 下手动创建 /cluster 节点后,再启动 Client1:

[zk: localhost:2181(CONNECTED) 3] ls /cluster
[]
[zk: localhost:2181(CONNECTED) 4] create /cluster/c1 c1
[zk: localhost:2181(CONNECTED) 6] ls /cluster
[c1]

运行 Client1 结果如下:

Client1 start...
CHILD_ADDED:/cluster/c1

以上测试说明 Curator 会自动将触发 CHILD_ADDED 事件,工作中可以用来自注册,不需要手动查询来注册。

原文地址:https://www.cnblogs.com/binarylei/p/8732334.html