ZooKeeper Study Series (2): Introducing and Using the Native ZooKeeper Client, ZkClient, and Curator

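Overview of ZooKeeper clients

ZooKeeper

The native client: the Java client API officially provided by ZooKeeper.

ZkClient

An open-source ZooKeeper client that wraps the native API and is easier to use.

Curator

An open-source ZooKeeper client that wraps the native API; an Apache top-level project.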

Native client

1. Add the dependency

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.8</version>
        </dependency>

2. ApiOperatorDemo

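import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ApiOperatorDemo implements Watcher{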
    private final static String CONNECTSTRING="192.168.30.10:2181";
    private static CountDownLatch countDownLatch=new CountDownLatch(1);
    private static ZooKeeper zookeeper;
    private static Stat stat=new Stat();

    public static void main(String[] args) throws Exception {
        zookeeper=new ZooKeeper(CONNECTSTRING, 5000, new ApiOperatorDemo());
        countDownLatch.await();

        // Create a node
        String result=zookeeper.create("/node1","123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zookeeper.getData("/node1",new ApiOperatorDemo(),stat); // register a watcher on the new node
        System.out.println("Created: "+result);

        // Update the data (version -1 skips the version check)
        zookeeper.getData("/node1",new ApiOperatorDemo(),stat);
        zookeeper.setData("/node1","deer2".getBytes(),-1);
        Thread.sleep(2000);

        // Delete the node
        zookeeper.getData("/node1",new ApiOperatorDemo(),stat);
        zookeeper.delete("/node1",-1);
        Thread.sleep(2000);

        // Create a node and a child node
        String path="/node11";
        zookeeper.create(path,"123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        TimeUnit.SECONDS.sleep(1);
        Stat childStat=zookeeper.exists(path+"/node1",true);
        if(childStat==null){// the node does not exist
            zookeeper.create(path+"/node1","123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            TimeUnit.SECONDS.sleep(1);
        }
        // Update the child node's data
        zookeeper.setData(path+"/node1","deer".getBytes(),-1);
        TimeUnit.SECONDS.sleep(1);

        // List the children of the given node
        List<String> childrens=zookeeper.getChildren("/node11",true);
        System.out.println(childrens);
    }

    public void process(WatchedEvent watchedEvent) {
        // Once the session is connected, release the latch so that main() can proceed
        if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
            if(Event.EventType.None==watchedEvent.getType()&&null==watchedEvent.getPath()){
                countDownLatch.countDown();
                System.out.println(watchedEvent.getState()+"-->"+watchedEvent.getType());
            }else if(watchedEvent.getType()== Event.EventType.NodeDataChanged){
                try {
                    System.out.println("Data changed at path: "+watchedEvent.getPath()+" -> new value: "+
                            new String(zookeeper.getData(watchedEvent.getPath(),true,stat)));
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else if(watchedEvent.getType()== Event.EventType.NodeChildrenChanged){// fires when the list of children changes (a child is created or deleted)
                try {
                    System.out.println("Children changed at path: "+watchedEvent.getPath()+" -> current children: "+
                            zookeeper.getChildren(watchedEvent.getPath(),true));
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else if(watchedEvent.getType()== Event.EventType.NodeCreated){// fires when a node watched via exists() is created
                try {
                    System.out.println("Node created at path: "+watchedEvent.getPath()+" -> value: "+
                            new String(zookeeper.getData(watchedEvent.getPath(),true,stat)));
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else if(watchedEvent.getType()== Event.EventType.NodeDeleted){// fires when a watched node is deleted
                System.out.println("Node deleted at path: "+watchedEvent.getPath());
            }
        }
    }
}

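3. AuthControlDemo

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class AuthControlDemo implements Watcher{
    private final static String CONNECTSTRING="192.168.30.10:2181";
    // One latch per watcher instance: each session waits for its own connection event
    private final CountDownLatch countDownLatch=new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        AuthControlDemo watcher=new AuthControlDemo();
        ZooKeeper zookeeper=new ZooKeeper(CONNECTSTRING, 5000, watcher);
        watcher.countDownLatch.await();

        // digest scheme: all permissions for user root (the ACL stores the hashed form of "root:root")
        ACL acl=new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest("root:root")));
        // ip scheme: clients connecting from 192.168.1.1 may only create children
        ACL acl2=new ACL(ZooDefs.Perms.CREATE, new Id("ip","192.168.1.1"));

        List<ACL> acls=new ArrayList<>();
        acls.add(acl);
        acls.add(acl2);
        zookeeper.create("/auth1","123".getBytes(),acls,CreateMode.PERSISTENT);
        // Authenticate this session; CREATOR_ALL_ACL below requires an authenticated creator
        zookeeper.addAuthInfo("digest","root:root".getBytes());
        zookeeper.create("/auth1/auth1-1","123".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL,CreateMode.EPHEMERAL);

        // A second session must authenticate as well before it can delete the protected node
        AuthControlDemo watcher1=new AuthControlDemo();
        ZooKeeper zooKeeper1=new ZooKeeper(CONNECTSTRING, 5000, watcher1);
        watcher1.countDownLatch.await();
        zooKeeper1.addAuthInfo("digest","root:root".getBytes());
        zooKeeper1.delete("/auth1/auth1-1",-1);
    }
    public void process(WatchedEvent watchedEvent) {
        // Once the session is connected, release the latch so that main() can proceed
        if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
            if(Event.EventType.None==watchedEvent.getType()&&null==watchedEvent.getPath()){
                countDownLatch.countDown();
                System.out.println(watchedEvent.getState()+"-->"+watchedEvent.getType());
            }
        }
    }
}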
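
The digest ACL in AuthControlDemo stores a hashed form of "root:root", while addAuthInfo is given the clear text. The sketch below reproduces that hashing with the JDK only, assuming (as the ZooKeeper sources suggest) that generateDigest returns the user name followed by the Base64-encoded SHA-1 of "user:password". The class name DigestIdSketch is made up for this illustration and is not part of ZooKeeper.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** JDK-only sketch of how a "digest" ACL id can be derived from "user:password". */
class DigestIdSketch {

    /** Returns "user:" followed by Base64(SHA-1("user:password")). */
    static String generateDigest(String idPassword) {
        String user = idPassword.split(":", 2)[0];
        try {
            byte[] sha1 = MessageDigest.getInstance("SHA-1")
                    .digest(idPassword.getBytes(StandardCharsets.UTF_8));
            return user + ":" + Base64.getEncoder().encodeToString(sha1);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is a mandatory JDK algorithm, so this is not expected to happen
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // This hashed value goes into the ACL; the clear-text form goes into addAuthInfo
        System.out.println(generateDigest("root:root"));
    }
}
```

Because only the hash is stored in the ACL, reading a node's ACL does not reveal the password, but any client that knows "root:root" can authenticate with addAuthInfo.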

Problems with the native client

ZkClient

Add the dependency

        <!-- zkclient dependency -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>

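Code example

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ZkClientApiOperatorDemo {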

    private final static String CONNECTSTRING="192.168.30.10:2181";

    private static ZkClient  getInstance(){
        return new ZkClient(CONNECTSTRING,10000);
    }

    public static void main(String[] args) throws InterruptedException {
        ZkClient zkClient=getInstance();
        // ZkClient can create missing parent nodes recursively
        zkClient.createPersistent("/zkclient/zkclient1/zkclient1-1/zkclient1-1-1",true);
        System.out.println("success");
        // Delete the node together with all of its descendants
        zkClient.deleteRecursive("/zkclient");
        // List the children
        List<String> list=zkClient.getChildren("/node11");
        System.out.println(list);
        //watcher
        // fires when the node's data changes
        zkClient.subscribeDataChanges("/node11", new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
                System.out.println("Node: "+s+" -> new value: "+o);
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
            }
        });

        zkClient.writeData("/node11","node");
        TimeUnit.SECONDS.sleep(2);

        // fires when the list of children changes
        zkClient.subscribeChildChanges("/node11", new IZkChildListener() {
            @Override
            public void handleChildChange(String s, List<String> list) throws Exception {
                System.out.println("Node: "+s+" -> current children: "+list);
            }
        });

        zkClient.delete("/node11/node1");
        TimeUnit.SECONDS.sleep(2);
    }
}
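
One thing to keep in mind about the demo above: a ZkClient built without an explicit ZkSerializer uses, as far as I know, SerializableSerializer, which is Java serialization. So writeData("/node11","node") stores a serialized Java object rather than the four bytes of "node", and the native client or Curator will see extra bytes when reading that node with new String(bytes). The JDK-only sketch below shows the difference; the class name SerializedPayloadSketch is made up for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Compares a Java-serialized payload with the plain UTF-8 bytes of the same string. */
class SerializedPayloadSketch {

    /** Serializes a value with Java serialization, as a Serializable-based serializer would. */
    static byte[] javaSerialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** The raw bytes that the native client and Curator examples in this article write. */
    static byte[] utf8(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] serialized = javaSerialize("node");
        byte[] raw = utf8("node");
        System.out.println("Java-serialized length: " + serialized.length);
        System.out.println("UTF-8 length: " + raw.length);
        // A serialization stream starts with the magic bytes 0xAC 0xED
        System.out.printf("first bytes: %02X %02X%n", serialized[0] & 0xFF, serialized[1] & 0xFF);
    }
}
```

If several client libraries share the same nodes, passing a custom ZkSerializer that reads and writes plain UTF-8 bytes to the ZkClient constructor listed under "ZkClient API" is one way to keep the data readable by all of them.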

ZkClient API

1. Create a session (synchronous, with retry)

public ZkClient(final String zkServers, final int sessionTimeout, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout)

2. Create nodes (synchronous, recursive creation)

public String create(String path,Object data,final List<ACL> acl,CreateMode mode)

public void createPersistent(String path,boolean createParents,List<ACL> acl)

public void createPersistent(String path, Object data, List<ACL> acl)

public String createPersistentSequential(String path,Object data,List<ACL> acl)

public void createEphemeral(String path, Object data, List<ACL> acl)

public String createEphemeralSequential(String path,Object data,List<ACL> acl)

3. Delete nodes (synchronous, recursive deletion)

public boolean delete(String path,int version)

public boolean deleteRecursive(String path)

4. Read nodes (synchronous, can avoid the node-not-found exception)

public List<String> getChildren(String path)

public <T> T readData(String path, boolean returnNullIfPathNotExists)

public <T> T readData(String path, Stat stat)

5. Update nodes (synchronous, CAS via the expected version, returns the Stat)

public void writeData(String path, Object datat, int expectedVersion)

public Stat writeDataReturnStat(String path,Object datat,int expectedVersion)
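
The expectedVersion parameter is what makes these writes compare-and-set: the write is applied only if the node is still at the version the caller read, and -1 skips the check. The sketch below models that rule in memory with the JDK only. It is not ZkClient code: VersionedNodeSketch is a made-up stand-in for a znode, and a real server signals a mismatch with a bad-version error rather than a boolean.

```java
import java.util.function.UnaryOperator;

/** In-memory stand-in for a znode; it only models the data and its version counter. */
class VersionedNodeSketch {
    private String data;
    private int version; // starts at 0, incremented on every successful write

    VersionedNodeSketch(String initialData) {
        this.data = initialData;
    }

    synchronized String data() { return data; }

    synchronized int version() { return version; }

    /** Writes only if expectedVersion matches the current version; -1 skips the check. */
    synchronized boolean write(String newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            return false; // a real server rejects this with a bad-version error
        }
        data = newData;
        version++;
        return true;
    }

    /** Optimistic update: read, compute, write with the version that was read, retry on conflict. */
    String update(UnaryOperator<String> change) {
        while (true) {
            String seen;
            int seenVersion;
            synchronized (this) {
                seen = data;
                seenVersion = version;
            }
            String next = change.apply(seen);
            if (write(next, seenVersion)) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        VersionedNodeSketch node = new VersionedNodeSketch("123");
        System.out.println(node.write("deer", 0));    // true: the node was at version 0
        System.out.println(node.write("stale", 0));   // false: the node is now at version 1
        System.out.println(node.write("forced", -1)); // true: -1 skips the version check
        System.out.println(node.update(s -> s + "!") + " @ version " + node.version());
    }
}
```

The demos in this article always pass -1, which is convenient but gives up this protection against concurrent writers.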

6. Check whether a node exists (synchronous)

public boolean exists(String path)

7. Access control (synchronous)

public void addAuthInfo(String scheme, final byte[] auth);

public void setAcl(final String path, final List<ACL> acl);

8. Listeners

Curator

Add the dependency

<!-- curator dependency -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>

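Code example

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CuratorOperatorDemo {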
    private static CuratorFramework curatorFramework;

    private final static String CONNECTSTRING="192.168.30.10:2181";

    public static CuratorFramework getInstance(){
        curatorFramework = CuratorFrameworkFactory.newClient(CONNECTSTRING,5000,5000,
                new ExponentialBackoffRetry(1000,3));
        curatorFramework.start();
        return curatorFramework;
    }
    public static void main(String[] args) throws InterruptedException {
        CuratorFramework curatorFramework = getInstance();
        System.out.println("Client started.........");
        /**
         * Create a node
         */
        try {
            String result=curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/curator/curator1/curator11","123".getBytes());
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
        /**
         * Delete a node
         */
        try {
            // the version defaults to -1 (no version check)
            curatorFramework.delete().deletingChildrenIfNeeded().forPath("/curator");
        } catch (Exception e) {
            e.printStackTrace();
        }
        /**
         * Read
         */
        Stat stat=new Stat();
        try {
            byte[] bytes=curatorFramework.getData().storingStatIn(stat).forPath("/node11");
            System.out.println(new String(bytes)+"-->stat:"+stat);
        } catch (Exception e) {
            e.printStackTrace();
        }
        /**
         * Update
         */
        try {
            Stat updatedStat=curatorFramework.setData().forPath("/node11","123".getBytes());
            System.out.println(updatedStat);
        } catch (Exception e) {
            e.printStackTrace();
        }
        /**
         * Asynchronous operation
         */
        ExecutorService service= Executors.newFixedThreadPool(1);
        CountDownLatch countDownLatch=new CountDownLatch(1);
        try {
            curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).
                    inBackground((curatorFramework1, curatorEvent) -> {
                        System.out.println(Thread.currentThread().getName()+"->resultCode:"+curatorEvent.getResultCode()+"->"
                        +curatorEvent.getType());
                        countDownLatch.countDown();
                    },service).forPath("/enjoy","deer".getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }
        countDownLatch.await();
        service.shutdown();
        /**
         * Transaction (a fluent API specific to Curator, built on ZooKeeper's multi operation)
         */
        try {
            Collection<CuratorTransactionResult> resultCollections=curatorFramework.inTransaction().create().forPath("/demo1","111".getBytes()).and().
                    setData().forPath("/demo1","111".getBytes()).and().commit();
            for (CuratorTransactionResult result:resultCollections){
                System.out.println(result.getForPath()+"->"+result.getType());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
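
import org.apache.curator.framework.CuratorFramework;
// NodeCache and PathChildrenCache are provided by the curator-recipes artifact
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;

public class CuratorEventDemo {

    /**
     * Three cache types are available for watching nodes:
     * PathChildrenCache  watches the creation, deletion and data updates of the children under a path
     * NodeCache          watches the creation, update and deletion of a single node
     * TreeCache          PathChildrenCache + NodeCache combined (watches create, update and delete
     *                    events under a path) and caches the data of all child nodes under that path
     */
    public static void main(String[] args) throws Exception {
        // CuratorClientUtils is not shown in this article; it is assumed to return a started
        // client, like CuratorOperatorDemo.getInstance()
        CuratorFramework curatorFramework=CuratorClientUtils.getInstance();

        /**
         * Node changes: NodeCache
         */
        NodeCache nodeCache = new NodeCache(curatorFramework,"/curator",false);
        nodeCache.start(true);
        nodeCache.getListenable().addListener(()-> System.out.println("Node data changed, new value: " +
                new String(nodeCache.getCurrentData().getData())));
        curatorFramework.setData().forPath("/curator","菲菲".getBytes());

        /**
         * Child changes: PathChildrenCache
         */
        PathChildrenCache childrenCache=new PathChildrenCache(curatorFramework,"/event",true);
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        childrenCache.getListenable().addListener((curatorFramework1,pathChildrenCacheEvent)->{
            switch (pathChildrenCacheEvent.getType()){
                case CHILD_ADDED:
                    System.out.println("Child added");
                    break;
                case CHILD_REMOVED:
                    System.out.println("Child removed");
                    break;
                case CHILD_UPDATED:
                    System.out.println("Child updated");
                    break;
                default:break;
            }
        });
    }
}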

Curator API

Create a session (synchronous, with retry)

CuratorFrameworkFactory.newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)

CuratorFrameworkFactory.builder().connectString("192.168.11.56:2180").sessionTimeoutMs(30000).connectionTimeoutMs(30000).canBeReadOnly(false).retryPolicy(new ExponentialBackoffRetry(1000,Integer.MAX_VALUE)).build();

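retryPolicy: the retry policy

  • RetryOneTime: retries only once.
  • RetryNTimes: retries up to N times.
  • RetryUntilElapsed: takes a maximum elapsed time and a sleep interval; keeps retrying at that interval until the time runs out or the connection succeeds.
  • ExponentialBackoffRetry: retries with a backoff; unlike RetryUntilElapsed, the interval between retries is not fixed but grows with the retry count.
  • BoundedExponentialBackoffRetry: like ExponentialBackoffRetry, with an explicit upper bound on the sleep time between retries.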
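
To make the "dynamic interval" of the backoff policies concrete, here is a JDK-only model of how the sleep time could be computed. It follows my reading of ExponentialBackoffRetry, where the base sleep time is multiplied by a random factor whose range doubles with every retry and the result is capped at a maximum; treat the exact formula as an assumption rather than as Curator's definitive behavior. BackoffSketch and its parameter names are made up for this illustration.

```java
import java.util.Random;

/** Simplified model of a randomized exponential backoff with an upper bound. */
class BackoffSketch {

    /**
     * Sleep time before the given retry (retryCount starts at 0).
     * The random factor lies in [1, 2^(retryCount+1) - 1]; the result is capped at maxSleepMs.
     */
    static long sleepMs(int baseSleepMs, int maxSleepMs, int retryCount, Random random) {
        int factor = Math.max(1, random.nextInt(1 << (retryCount + 1)));
        long sleep = (long) baseSleepMs * factor;
        return Math.min(sleep, maxSleepMs);
    }

    public static void main(String[] args) {
        Random random = new Random();
        // Same base sleep time and retry count as new ExponentialBackoffRetry(1000,3) in the demo
        for (int retry = 0; retry < 3; retry++) {
            System.out.println("retry " + retry + " -> sleep " + sleepMs(1000, 5000, retry, random) + " ms");
        }
    }
}
```

The randomization spreads out reconnect attempts when many clients lose the same server at once, and the cap keeps a long outage from producing very long sleeps.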

Create a node

client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(aclList).forPath(path, "hello, zk".getBytes());

Delete a node

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version).forPath(path);

Read a node

client.getData().storingStatIn(stat).forPath(path);

client.getChildren().forPath(path);

Update a node

client.setData().withVersion(version).forPath(path, data)

Check whether a node exists

client.checkExists().forPath(path);

Set permissions

CuratorFrameworkFactory.Builder.authorization(String scheme, byte[] auth)

client.setACL().withVersion(version).withACL(ZooDefs.Ids.CREATOR_ALL_ACL).forPath(path);

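Listeners (no need to register watchers repeatedly)

Cache is Curator's wrapper around event watching. Watching for events can be viewed, roughly, as a process of comparing the local cached view with the remote ZooKeeper view.

  • NodeCache: a node cache that handles changes to the node itself; callback interface NodeCacheListener
  • PathChildrenCache: a child-node cache that handles changes to a node's children; callback interface PathChildrenCacheListener
  • TreeCache: a combination of NodeCache and PathChildrenCache; callback interface TreeCacheListener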
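
The "compare the local view with the remote view" idea can be sketched without ZooKeeper at all. The JDK-only example below diffs a cached snapshot of a node's children against a fresh one and derives events named after the PathChildrenCache event types used earlier. It is a conceptual model, not Curator's implementation; ChildDiffSketch and its map-based snapshots (child name to data) are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Derives child events by comparing a cached snapshot with a fresh remote snapshot. */
class ChildDiffSketch {

    /** Both maps go from child name to child data; values are assumed to be non-null. */
    static List<String> diff(Map<String, String> cached, Map<String, String> remote) {
        List<String> events = new ArrayList<>();
        // Sorted iteration keeps the event order deterministic
        for (Map.Entry<String, String> entry : new TreeMap<>(remote).entrySet()) {
            String name = entry.getKey();
            if (!cached.containsKey(name)) {
                events.add("CHILD_ADDED " + name);
            } else if (!cached.get(name).equals(entry.getValue())) {
                events.add("CHILD_UPDATED " + name);
            }
        }
        for (String name : new TreeSet<>(cached.keySet())) {
            if (!remote.containsKey(name)) {
                events.add("CHILD_REMOVED " + name);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        Map<String, String> cached = new HashMap<>();
        cached.put("a", "1");
        cached.put("b", "2");
        Map<String, String> remote = new HashMap<>();
        remote.put("b", "3");
        remote.put("c", "4");
        // prints [CHILD_UPDATED b, CHILD_ADDED c, CHILD_REMOVED a]
        System.out.println(diff(cached, remote));
    }
}
```

After each comparison the remote snapshot becomes the new cached view, which is why a listener registered once keeps receiving events without the application re-registering watchers.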
Original article: https://www.cnblogs.com/alimayun/p/12602016.html