curator框架的使用以及实现分布式锁等应用与zkclient操作zookeeper,简化复杂原生API

打开zookeeper集群

先体会一下原生API有多麻烦(可略过):

//地址
    static final String ADDR = "192.168.171.128:2181,192.168.171.129:2181,192.168.171.130:2181";
    //session超时时间
    static final int SESSION_OUTTIME = 2000;//ms
    /**
     * 信号量,阻塞程序执行,用于等待zookeeper连接(异步的)成功,发送成功信号
     */
    static final CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        //创建zk客户端(异步连接)   Watcher:监听连接
        ZooKeeper zooKeeper = new ZooKeeper(ADDR, SESSION_OUTTIME, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //获取事件的状态
                Event.KeeperState keeperState = watchedEvent.getState();
                Event.EventType eventType = watchedEvent.getType();
                //如果是建立连接
                if(Event.KeeperState.SyncConnected == keeperState){
                    if(Event.EventType.None == eventType){
                        //如果建立连接成功,则发送信号量,让后续阻塞程序向下执行
                        connectedSemaphore.countDown();
                        System.out.println("zk 建立连接");
                    }
                }
            }
        });
        //进行阻塞,等待zk连接成功
        connectedSemaphore.await();
        //连接成功执行操作:
        //同步     路径(不允许递归创建), 数据  权限, 类型(PERSISTENT:持久) :临时节点只是一次session有效,用来实现分布式锁
        String name = zooKeeper.create("/testRoot", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        String name1 = zooKeeper.create("/testRoot/child02", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        //异步   回调函数   , 回调的参数
        zooKeeper.create("/testRoot/child01", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
                new AsyncCallback.StringCallback() {
                    @Override   //服务端响应吗: 0 成功   传入的path   参数  实际创建的path
                    public void processResult(int i, String s, Object o, String s1) {
                        System.out.println(i+" "+s+" "+o.toString()+" "+s1); }},"param");
        //删除 (路径必须是叶子节点,即没有下一级节点,不支持递归)
        //zooKeeper.delete("/testRoot/child01", -1);//版本号:-1表示删除所有的,像git一样,修改一次版本号加一,可删除指定版本号
        //同样支持异步,和创建一样传入回调
        //判断节点是否存在
        System.out.println(zooKeeper.exists("/testRoot/child01", false));//也可传入watch,异步回调等
        //获取
        byte[] data = zooKeeper.getData("/testRoot", false, null);
		System.out.println(new String(data));
		System.out.println(zooKeeper.getChildren("/testRoot", false));
        //修改
        zooKeeper.setData("/testRoot", "modify data root".getBytes(), -1);
		byte[] data1 = zooKeeper.getData("/testRoot", false, null);
		System.out.println(new String(data1));


        Thread.sleep(3000);//休眠防止异步
        zooKeeper.close();//释放资源

  

zkclient:

  /** zookeeper地址 */
    static final String CONNECT_ADDR = "192.168.171.128:2181,192.168.171.129:2181,192.168.171.130:2181";
    /** session超时时间 */
    static final int SESSION_OUTTIME = 5000;//ms

    public static void main(String[] args) {
        // 创建zkclient实例
        ZkClient zkc=new ZkClient(new ZkConnection(CONNECT_ADDR,SESSION_OUTTIME),10000);
        //增
        zkc.create("/test","测试zkclient",CreateMode.PERSISTENT);// 创建节点,可指定持久或者临时
        zkc.create("/test/heyi","heyi",CreateMode.PERSISTENT);// 创建节点,可指定持久或者临时
        zkc.createEphemeral("/haha","haha");//创建临时节点
        zkc.createPersistent("/test/yaozhen","姚振");//创建持久节点,设置值必须有父节点
        zkc.createPersistent("/320/houzheng",true);//支持递归创建,但是不能设置值,否则会报异常
        //删
        zkc.delete("/test/yaozhen");
        zkc.deleteRecursive("/320");//递归删除,会删除节点下的子节点
        //改
        zkc.writeData("/test","新内容");
        //查
        boolean exists = zkc.exists("/test");
        System.out.println(exists);
        zkc.readData("/jim",true);//如果节点为空返回null
        String readData = zkc.readData("/test");
        System.out.println(readData);
        //获取子节点和阅读子节点数据
        List<String> list = zkc.getChildren("/test");
        for(String p : list){
            System.out.println(p);
            String rp = "/test/" + p;
            String data = zkc.readData(rp);
            System.out.println("节点为:" + rp + ",内容为: " + data);
        }
        int i = zkc.countChildren("/test");//查询子节点数量
        System.out.println(i);

  相比之下zkclient简直太简单了!

watcher也特别简单:

子节点监听:

// 创建zkclient实例
        ZkClient zkClient=new ZkClient(new ZkConnection(CONNECT_ADDR,SESSION_OUTTIME),10000);
        System.out.println("-----------");
        //监听子节点变化(不监听数据变化,只监听节点的新增删除)  IZkChildListener实现类
        zkClient.subscribeChildChanges("/father",(x,y)->{//x:父路径, y:所有子节点路径集合
            System.out.println("parentPath: " + x);
            System.out.println("currentChilds: " + y);
        });
        Thread.sleep(2000);
        zkClient.createPersistent("/father");//父节点添加删除也会监听
        Thread.sleep(1000);
        zkClient.createPersistent("/father/son","儿子");
        Thread.sleep(1000);
        zkClient.createPersistent("/father/daughter","女儿");
        Thread.sleep(1000);
        zkClient.delete("/father/son");
        Thread.sleep(1000);
        zkClient.deleteRecursive("/father");

  

节点数据监听:

        //监听节点数据变化,监听之前节点必须存在,不监听子节点
        zkClient.createPersistent("/father", "1234");
        zkClient.subscribeDataChanges("/father", new IZkDataListener() {
            @Override   //节点数据变化
            public void handleDataChange(String s, Object o) throws Exception {
                System.out.println("变更的节点为:" + s + ", 变更内容为:" + o);
            }
            @Override  //节点删除
            public void handleDataDeleted(String s) throws Exception {
                System.out.println("删除的节点为:" + s);
            }
        });
        Thread.sleep(3000);
        zkClient.writeData("/father", "father");//-1:最新版本的数据
        Thread.sleep(1000);
        zkClient.delete("/father");//节点删除野人不监听
        Thread.sleep(1000);

  

状态监听:

        //监听服务连接状态,可手动启动关闭zookeeper查看触发
        zkClient.subscribeStateChanges(new IZkStateListener() {
            @Override
            public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
                if(state==Watcher.Event.KeeperState.SyncConnected){
                    System.out.println("连接zookeeper成功");
                }else if(state==Watcher.Event.KeeperState.Disconnected){
                    System.out.println("zookeeper断开");
                }else
                    System.out.println("other"+state);
            }
            @Override //连接关闭,过了session的设置时间,再连接session就会重置,触发监听
            public void handleNewSession() throws Exception {
                System.out.println("newsession");
            }
        });
        Thread.sleep(Integer.MAX_VALUE);//用不关闭线程,一直监听

  

curator框架:

 基本增删改查:

//重试策略:初试时间为1s 重试10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        //通过工厂建立连接
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) //连接地址
                .sessionTimeoutMs(SESSION_OUTTIME)
                .retryPolicy(retryPolicy)//重试策略
                .build();
        curatorFramework.start();//一定要开启连接
        //增  creatingParentsIfNeeded:同时创建父节点  withMode:指定节点类型(不加withMode默认为持久类型节点)
        curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                .forPath("/super/son","儿子".getBytes());//路径、数据内容
        curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                .forPath("/father/son","儿子".getBytes());//路径、数据内容
        //删
        curatorFramework.delete().deletingChildrenIfNeeded().forPath("/super");//递归删除子节点
        //改
        curatorFramework.setData().forPath("/father/son","新儿子".getBytes());
        //查
        byte[] bytes = curatorFramework.getData().forPath("/father/son");
        System.out.println(bytes.toString());
        //查找子节点
        curatorFramework.getChildren().forPath("/father").forEach(x-> System.out.println(x));
        //判断节点是否存在
        Stat stat = curatorFramework.checkExists().forPath("/super");
        System.out.println(stat);//不存在,返回bull
        //使用inBackground 绑定回调函数
        ExecutorService pool = Executors.newCachedThreadPool();
        curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                .inBackground((cf,curatorEvent)->{//BackgroundCallback实现类
                    System.out.println("状态码"+curatorEvent.getResultCode());//0表示成功
                    System.out.println("type:"+curatorEvent.getType());//CREATE
                },pool).forPath("/320/yaozhen","姚振".getBytes());
        //使用线程池做回调
        Thread.sleep(Integer.MAX_VALUE);//等待,方便查看打印输出

  

监听:

//重试策略:初试时间为1s 重试10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        //通过工厂建立连接
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) //连接地址
                .sessionTimeoutMs(SESSION_OUTTIME)
                .retryPolicy(retryPolicy)//重试策略
                .build();
        curatorFramework.start();
        //建立一个cache缓存,监听节点
        final NodeCache nodeCache = new NodeCache(curatorFramework,"/father");
        nodeCache.start(true);//第一次启动的时候就会立刻在Zookeeper上读取对应节点的数据内容,并保存在Cache中
        //触发事件为创建节点和更新节点,在删除节点的时候并不触发此操作
        nodeCache.getListenable().addListener(()->{ //NodeCacheListener实现类
            System.out.println("路径为:" + nodeCache.getCurrentData().getPath());
            System.out.println("数据为:" + new String(nodeCache.getCurrentData().getData()));
            System.out.println("状态为:" + nodeCache.getCurrentData().getStat());
            System.out.println("---------------------------------------");
        });

        Thread.sleep(1000);
        curatorFramework.create().forPath("/father", "123".getBytes());

        Thread.sleep(1000);
        curatorFramework.setData().forPath("/father", "456".getBytes());

        Thread.sleep(1000);
        curatorFramework.delete().forPath("/father");

        Thread.sleep(Integer.MAX_VALUE);

  监听节点

//重试策略:初试时间为1s 重试10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        //通过工厂建立连接
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) //连接地址
                .sessionTimeoutMs(SESSION_OUTTIME)
                .retryPolicy(retryPolicy)//重试策略
                .build();
        curatorFramework.start();
        //监听子节点:建立一个PathChildrenCache缓存,第三个参数为是否接受节点数据内容 如果为false则不接受
        PathChildrenCache cache = new PathChildrenCache(curatorFramework, "/father", true);
        //在初始化的时候就进行缓存监听
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        //监听子节点的新增,修改,删除
        cache.getListenable().addListener((cf,event)-> {//PathChildrenCacheListener实现类
            switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("添加子节点 :" + event.getData().getPath());
                    break;
                case CHILD_UPDATED:
                    System.out.println("更新子节点 :" + event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    System.out.println("删除子节点 :" + event.getData().getPath());
                    break;
                default:
                    break;
            }
        });
        //创建本身节点不发生变化
        curatorFramework.create().forPath("/father", "init".getBytes());

        //添加子节点
        Thread.sleep(1000);
        curatorFramework.create().forPath("/father/c1", "c1内容".getBytes());
        Thread.sleep(1000);
        curatorFramework.create().forPath("/father/c2", "c2内容".getBytes());

        //修改子节点
        Thread.sleep(1000);
        curatorFramework.setData().forPath("/father/c1", "c1更新内容".getBytes());

        //删除子节点
        Thread.sleep(1000);
        curatorFramework.delete().forPath("/father/c2");

        //删除本身节点
        Thread.sleep(1000);
        curatorFramework.delete().deletingChildrenIfNeeded().forPath("/father");

        Thread.sleep(Integer.MAX_VALUE);

  监听子节点

分布式锁:

        //重试策略:初试时间为1s 重试10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        //通过工厂建立连接
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) //连接地址
                .sessionTimeoutMs(SESSION_OUTTIME)
                .retryPolicy(retryPolicy)//重试策略
                .build();
        curatorFramework.start();
        //使用分布式锁,所有系统同时监听同一个节点,达到分布式锁的目的
        final InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/test");
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                try {
                    countDownLatch.await();//线程等待一起执行
                    lock.acquire();//分布式锁,数据同步
                    //处理业务
                    j--;
                    System.out.println(j);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {//释放锁
                        lock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            },"t"+i).start();
        }
        Thread.sleep(1000);
        countDownLatch.countDown();//模拟十个线程一起并发.指定一起执行
    }
    static int j=10;

  这里模拟了一下十个线程并发,其实和ReentrantLock效果一样的,但是在分布式中也可以实现同步,而ReentrantLock就不行了

分布式计数器:

和分布式锁其实都是同理:

/** zookeeper地址 */
    static final String CONNECT_ADDR = "192.168.171.128:2181,192.168.171.129:2181,192.168.171.130:2181";
    /** session超时时间 */
    static final int SESSION_OUTTIME = 5000;//ms
    public static void main(String[] args) throws Exception {
        //重试策略:初试时间为1s 重试10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        //通过工厂建立连接
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) //连接地址
                .sessionTimeoutMs(SESSION_OUTTIME)
                .retryPolicy(retryPolicy)//重试策略
                .build();
        curatorFramework.start();
        //分布式计数器
        DistributedAtomicInteger counter=new DistributedAtomicInteger(curatorFramework,"/super",new RetryNTimes(3,100));
        //初始化
        counter.forceSet(0);
        AtomicValue<Integer> value = counter.increment();//原子自增
        System.out.println("原值为"+value.preValue());
        System.out.println("更改后的值为"+value.postValue());
        System.out.println("状态"+value.succeeded());
    }

  

分布式线程屏障同步:

for (int i = 0; i < 5; i++) {//模拟5个客户端
            new Thread(()->{
                try {
                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
                    CuratorFramework cf = CuratorFrameworkFactory.builder()
                            .connectString(CONNECT_ADDR)
                            .retryPolicy(retryPolicy)
                            .build();
                    cf.start();
                    //双重同步DistributedDoubleBarrier: 指定5个线程,到了5个会自动触发
                    DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/super", 5);
                    Thread.sleep(1000 * (new Random()).nextInt(3));
                    System.out.println(Thread.currentThread().getName() + "已经准备");
                    barrier.enter();//准备,等待成员到达5个再一起往下执行
                    System.out.println("同时开始运行...");
                    Thread.sleep(1000 * (new Random()).nextInt(3));
                    System.out.println(Thread.currentThread().getName() + "运行完毕");
                    barrier.leave();//再次等待准备退出,5个都是leave状态时再一起退出
                    System.out.println("同时退出运行...");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            },"t" + i).start();
        }

  和java中CyicrBarrier一模一样,只是支持分布式

static DistributedBarrier barrier;
    public static void main(String[] args) throws Exception {
        for(int i = 0; i < 5; i++){
            new Thread(()-> {
                    try {
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
                        CuratorFramework cf = CuratorFrameworkFactory.builder()
                                .connectString(CONNECT_ADDR)
                                .sessionTimeoutMs(SESSION_OUTTIME)
                                .retryPolicy(retryPolicy)
                                .build();
                        cf.start();
                        barrier = new DistributedBarrier(cf, "/test");
                        System.out.println(Thread.currentThread().getName() + "设置barrier!");
                        barrier.setBarrier();//设置
                        barrier.waitOnBarrier();	//等待Barrier释放,准备一起执行
                        System.out.println("---------开始执行程序----------");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
            },"t" + i).start();
        }
        Thread.sleep(10000);
        if(barrier != null){
            System.out.println("启动所有线程");
            barrier.removeBarrier();//删除屏障
        }
    }

  最后一个是只实现同时执行的,很奇怪,实际练习中开始执行程序这就是不执行,线程也一直卡在折了,启动这块都打印了!还得再研究下!

原文地址:https://www.cnblogs.com/houzheng/p/9773949.html