五、zookeeper的javaApi

  znode是zooKeeper集合的核心组件,zookeeper API提供了一小组方法使用zookeeper集合来操纵znode的所有细节。

# 查看当前zk版本号/尽量保持API和zkServer版本一致
echo stat|nc 127.0.0.1 2181

  maven仓库地址

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>

  客户端应该遵循以步骤,与zookeeper服务器进行清晰和干净的交互。

    • 连接到zookeeper服务器。zookeeper服务器为客户端分配会话ID

    • 定期向服务器发送心跳。否则,zookeeper服务器将过期会话ID,客户端需要重新连接。

    • 只要会话ID处于活动状态,就可以获取/设置znode。

    • 所有任务完成后,断开与zookeeper服务器的连接。如果客户端长时间不活动,则zookeeper服务器将自动断开客户端。

一、连接到ZooKeeper 

ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
  • connectionString:zookeeper主机

  • sessionTimeout:会话超时(以毫秒为单位)

  • watcher : 实现“监视器”对象。zookeeper集合通过监视器对象返回连接状态。

public class ZookeeperConnection {
    private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int TIME_OUT = 5000;

    public static void main(String[] args) {
        ZooKeeper zooKeeper = null;
        try {
            // 计数器对象
            CountDownLatch countDownLatch = new CountDownLatch(1);
            /**
             * 第一个参数:服务器的ip和端口
             * 第二个参数:客户端与服务器之间的会话超时时间,以毫秒为单位
             * 第三个参数:监视器对象
             */
            zooKeeper = new ZooKeeper(IP, TIME_OUT, (event) -> {
                if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    System.out.println("连接创建成功");
                    countDownLatch.countDown();
                }
            });
            //Zookeeper创建对象是异步的,此时new完后,连接不一定创建成功。
            // 主线程阻塞,等待连接对象的创建成功
            countDownLatch.await();
            // 打印会话编号
            System.out.println(zooKeeper.getSessionId());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (zooKeeper != null) {
                    zooKeeper.close();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

二、新增节点

// 同步方式
create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
// 异步方式
create(String path, byte[] data, List<ACL> acl, CreateMode createMode,AsyncCallback.StringCallback callBack,Object ctx)
  • path :znode路径。例如,/node1 /node1/node11

  • data: 要存储在指定znode路径中的数据

  • acl :要创建的节点的访问控制列表。zookeeper API提供了一个静态接口ZooDefs.Ids 来获取一些基本的acl列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE返回打开znode的acl列表。

  • createMode:节点的类型,这是一个枚举。

  • callBack:异步回调接口

  • ctx:传递上下文参数

public class ZKCreate {
    ZooKeeper zooKeeper = null;

    /**
     * 获取连接
     */
    @Before
    public void befor() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        zooKeeper = new ZooKeeper("39.98.67.88:2181", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("连接创建成功");
                    countDownLatch.countDown();
                }
            }
        });
        // 主线程阻塞等待连接对象的创建成功
        countDownLatch.await();
    }

    /**
     * 同步创建节点
     */
    @Test
    public void create1() throws KeeperException, InterruptedException {
        /**
         * 第一个参数:节点的路径
         * 第二个参数:节点的数据
         * 第三个参数:权限列表 ZooDefs.Ids.OPEN_ACL_UNSAFE:world:anyone:cdrwa /
         * 第四个参数:节点的类子持久化节点
         */
        zooKeeper.create("/hello/gzh", "helloworld".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    /**
     * world授权模式
     */
    @Test
    public void als1() throws KeeperException, InterruptedException {
        // world授权模式
        // 权限列表
        List<ACL> acls = new ArrayList<>();
        // 授权模式和授权对象
        Id id = new Id("world", "anyone");
        // 权限设置
        acls.add(new ACL(ZooDefs.Perms.READ, id));
        acls.add(new ACL(ZooDefs.Perms.WRITE, id));
        zooKeeper.create("/hello/cl", "cl".getBytes(), acls, CreateMode.PERSISTENT);
    }

    /**
     * IP授权模式
     */
    @Test
    public void als2() throws Exception {
        // IP授权模式
        // 权限列表
        List<ACL> acls = new ArrayList<>();
        // 授权模式和授权对象
        Id id = new Id("ip", "0.0.0.0");
        // 权限设置
        acls.add(new ACL(ZooDefs.Perms.ALL, id));
        zooKeeper.create("/hello/alsj", "cl".getBytes(), acls, CreateMode.PERSISTENT);
    }

    /**
     * auth授权模式
     */
    @Test
    public void als3() throws Exception {
        // auth授权模式
        // 添加授权用户
        zooKeeper.addAuthInfo("digest", "root:070313".getBytes());
        // 权限列表
        List<ACL> acls = new ArrayList<>();
        // 授权模式和授权对象
        Id id = new Id("auth", "root");
        // 权限设置
        acls.add(new ACL(ZooDefs.Perms.ALL, id));
        zooKeeper.create("/hello/eh", "eel".getBytes(), acls, CreateMode.PERSISTENT);
    }

    /**
     * digest授权模式
     */
    @Test
    public void als4() throws Exception {
        // digest授权模式
        // 权限列表
        List<ACL> acls = new ArrayList<>();
        // 授权模式和授权对象
        Id id = new Id("digest", "root:4d9PWRXHtxrRSgCIGCixNUZdTPQ=");
        // 权限设置
        acls.add(new ACL(ZooDefs.Perms.ALL, id));
        zooKeeper.create("/hello/cjk", "cls".getBytes(), acls, CreateMode.PERSISTENT);
    }

    /**
     * 异步创建节点
     */
    @Test
    public void create2() throws KeeperException, InterruptedException {
        /**
         * 第一个参数:节点的路径
         * 第二个参数:节点的数据
         * 第三个参数:权限列表 ZooDefs.Ids.OPEN_ACL_UNSAFE:world:anyone:cdrwa /
         * 第四个参数:节点的类子持久化节点
         * 第五个参数:异步回调接口
         * 第六个参数:上下文参数
         */
        zooKeeper.create("/hello/***", "zhuxi".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                // 0 代表创建成功
                System.out.println(rc);
                // 节点的路径
                System.out.println(path);
                // 上下文的参数
                System.out.println(ctx);
                // 节点的路径
                System.out.println(name);
            }
        }, "i am context");
        TimeUnit.SECONDS.sleep(1);
        System.out.println("创建完成");
    }

    /**
     * 关闭连接
     */
    @After
    public void after() throws InterruptedException {
        if (zooKeeper != null) {
            System.out.println("关闭成功");
            zooKeeper.close();
        }
    }
}

三、更新节点

// 同步方式
setData(String path, byte[] data, int version)
// 异步方式
setData(String path, byte[] data, int version,AsyncCallback.StatCallback callBack, Object ctx)
  • path:znode路径

  • data: 要存储在指定znode路径中的数据。

  • version:znode的当前版本。每当数据更改时,ZooKeeper会更新znode的版本 号。

  • callBack:异步回调接口

  • ctx:传递上下文参数

public class ZKUpdate {
    private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int TIME_OUT = 5000;
    ZooKeeper zooKeeper = null;

    @Before
    public void createZk() throws Exception {
        CountDownLatch countDownLatch = Utils.getLatch();
        zooKeeper = new ZooKeeper(IP, TIME_OUT, (event) -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                System.out.println("连接创建成功");
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
    }

    @Test
    public void update1() throws KeeperException, InterruptedException {
        /**
         * 第一个参数:节点的路径
         * 第二个参数:修改的数据
         * 第三个参数:数据版本号 -1 代表版本号不参与更新
         */
        Stat stat = zooKeeper.setData("/update_test", "i love you".getBytes(), -1);
        System.out.println(stat);
    }

    @Test
    public void update2() throws InterruptedException {
        /**
         * 第一个参数:节点的路径
         * 第二个参数:修改的数据
         * 第三个参数:数据版本号 -1 代表版本号不参与更新
         */
        zooKeeper.setData("/update_test", "i love you2".getBytes(), -1, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                // 0 代表创建成功
                System.out.println(rc);
                // 节点的路径
                System.out.println(path);
                // 上下文的参数
                System.out.println(ctx);
                // 当前节点的详细信息
                System.out.println(stat);
            }
        }, "i am context");
        TimeUnit.SECONDS.sleep(1);
    }


    /**
     * 关闭连接
     */
    @After
    public void after() throws InterruptedException {
        if (zooKeeper != null) {
            System.out.println("关闭成功");
            zooKeeper.close();
        }
    }
}

四、删除节点

// 同步方式
delete(String path, int version)
// 异步方式
delete(String path, int version, AsyncCallback.VoidCallback callBack,Object ctx)
  • path:znode路径。

  • version:znode的当前版本

  • callBack:异步回调接口

  • ctx:传递上下文参数

public class ZKDelete {
    private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int TIME_OUT = 5000;
    ZooKeeper zooKeeper = null;

    @Before
    public void createZk() throws Exception {
        CountDownLatch countDownLatch = Utils.getLatch();
        zooKeeper = new ZooKeeper(IP, TIME_OUT, (event) -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                System.out.println("连接创建成功");
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
    }

@Test
public void delete1() throws KeeperException, InterruptedException { /** * 第一个参数:删除节点的路径 * 第二个参数:数据版本信息,-1代表不考律版本信息 */ zooKeeper.delete("/del_node1", -1); } @Test public void delete2() throws InterruptedException { /** * 第一个参数:删除节点的路径 * 第二个参数:数据版本信息,-1代表不考律版本信息 * 第三个参数:异步回调接口 * 第四个参数:上下文参数 */ zooKeeper.delete("/del_node2", -1, new AsyncCallback.VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { // 0代表删除成功 System.out.println(rc); // 删除节点的路径 System.out.println(path); // 上下文参数 System.out.println(ctx); } }, "我是上下文参数"); TimeUnit.SECONDS.sleep(1); } /** * 关闭连接 */ @After public void after() throws InterruptedException { if (zooKeeper != null) { System.out.println("关闭成功"); zooKeeper.close(); } } }

五、查看节点

// 同步方式
getData(String path, boolean b, Stat stat)
// 异步方式
getData(String path, boolean b,AsyncCallback.DataCallback callBack,Object ctx)
  • path:znode路径。

  • b:是否使用连接对象中注册的监视器。

  • stat: 返回znode的元数据。

  • callBack:异步回调接口

  • ctx:传递上下文参数

public class ZKSelect {
    private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int TIME_OUT = 5000;
    ZooKeeper zooKeeper = null;

    @Before
    public void createZk() throws Exception {
        CountDownLatch countDownLatch = Utils.getLatch();
        zooKeeper = new ZooKeeper(IP, TIME_OUT, (event) -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                System.out.println("连接创建成功");
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
    }

    @Test
    public void get1() throws KeeperException, InterruptedException {
        /**
         * 第一个参数:节点的路径
         * 第二个参数:后面讲解 这里为false即可
         * 第三个参数:读取节点属性的对象
         */
        byte[] data = zooKeeper.getData("/jdy", false, new Stat());
        System.out.println(new String(data));
    }

    /**
     * @功能: 异步查询节点
     */
    @Test
    public void get2() throws KeeperException, InterruptedException {
        /**
         * 第一个参数:节点的路径
         * 第二个参数:后面讲解 这里为false即可
         * 第三个参数:异步回调接口
         * 第四个参数:上下文参数
         */
        zooKeeper.getData("/jdy", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                // 0代表读取成功
                System.out.println(rc);
                // 节点的路径
                System.out.println(path);
                // 上下文参数
                System.out.println(ctx);
                // 节点的详细信息
                System.out.println(stat);
            }
        }, "我是上下文参数");
        TimeUnit.SECONDS.sleep(1);
    }

    /**
     * 关闭连接
     */
    @After
    public void after() throws InterruptedException {
        if (zooKeeper != null) {
            System.out.println("关闭成功");
            zooKeeper.close();
        }
    }
}

六、查看子节点

// 同步方式
getChildren(String path, boolean b)
// 异步方式
getChildren(String path, boolean b,AsyncCallback.ChildrenCallback callBack,Object ctx)
  • path:Znode路径。

  • b:是否使用连接对象中注册的监视器。

  • callBack:异步回调接口。

  • ctx:传递上下文参数

public class ZKGetChildren {
    private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int TIME_OUT = 5000;
    ZooKeeper zooKeeper = null;

    @Before
    public void createZk() throws Exception {
        CountDownLatch countDownLatch = Utils.getLatch();
        zooKeeper = new ZooKeeper(IP, TIME_OUT, (event) -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                System.out.println("连接创建成功");
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
    }


    /**
     * 同步查询子节点
     */
    @Test
    public void getChildren1() throws KeeperException, InterruptedException {
        /**
         * 第一个参数:节点的路径
         * 第二个参数:是否启用监控 这里为false即可
         */
        List<String> children = zooKeeper.getChildren("/hello", false);
        children.forEach(System.out::println);
    }

    /**
     * 异步查询子节点
     */
    @Test
    public void getChildren2() throws InterruptedException {
        /**
         * 第一个参数:节点的路径
         * 第二个参数:是否启用监控 这里为false即可
         * 第三个参数:异步回调接口
         * 第四个参数:上下文参数
         */
        zooKeeper.getChildren("/hello", false, new AsyncCallback.ChildrenCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, List<String> children) {
                // 0代表读取成功
                System.out.println(rc);
                // 节点的路径
                System.out.println(path);
                // 上下文参数
                System.out.println(ctx);
                // 子节点集合
                children.forEach(System.out::println);
            }
        }, "我是上下文参数");
        TimeUnit.SECONDS.sleep(1);
    }


    /**
     * 关闭连接
     */
    @After
    public void after() throws InterruptedException {
        if (zooKeeper != null) {
            System.out.println("关闭成功");
            zooKeeper.close();
        }
    }
}

七、检查节点是否存在

// 同步方法
exists(String path, boolean b)
// 异步方法
exists(String path, boolean b,AsyncCallback.StatCallback callBack,Objectctx)
  • path:znode路径。

  • b:是否使用连接对象中注册的监视器。

  • callBack: 异步回调接口。

  • ctx:传递上下文参数

public class ZKExists {
    private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int TIME_OUT = 5000;
    ZooKeeper zooKeeper = null;

    @Before
    public void createZk() throws Exception {
        CountDownLatch countDownLatch = Utils.getLatch();
        zooKeeper = new ZooKeeper(IP, TIME_OUT, (event) -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                System.out.println("连接创建成功");
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
    }

    @Test
    public void exists1() throws KeeperException, InterruptedException {
        /**
         * 第一个参数:节点的路径
         * 第二个参数:是否启用监控 这里为false即可
         */
        Stat stat = zooKeeper.exists("/hello", false);
        System.out.println(stat);
    }


    @Test
    public void exists2() throws InterruptedException {
        /**
         * 第一个参数:节点的路径
         * 第二个参数:是否启用监控 这里为false即可
         * 第三个参数:异步回调接口
         * 第四个参数:上下文参数
         */
        zooKeeper.exists("/hello", false, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                // 0代表读取成功
                System.out.println(rc);
                // 节点的路径
                System.out.println(path);
                // 上下文参数
                System.out.println(ctx);
                // 节点的详细信息,如果为null则说明该节点不存在
                System.out.println(stat);
            }
        }, "我是上下文参数");
        TimeUnit.SECONDS.sleep(1);
    }


    /**
     * 关闭连接
     */
    @After
    public void after() throws InterruptedException {
        if (zooKeeper != null) {
            System.out.println("关闭成功");
            zooKeeper.close();
        }
    }
}
原文地址:https://www.cnblogs.com/jdy1022/p/14821019.html