znode是zooKeeper集合的核心组件,zookeeper API提供了一小组方法使用zookeeper集合来操纵znode的所有细节。
# 查看当前zk版本号/尽量保持API和zkServer版本一致 echo stat|nc 127.0.0.1 2181
maven仓库地址
<dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> </dependency> <!-- https://mvnrepository.com/artifact/junit/junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies>
客户端应该遵循以步骤,与zookeeper服务器进行清晰和干净的交互。
-
-
连接到zookeeper服务器。zookeeper服务器为客户端分配会话ID。
-
定期向服务器发送心跳。否则,zookeeper服务器将过期会话ID,客户端需要重新连接。
-
只要会话ID处于活动状态,就可以获取/设置znode。
-
所有任务完成后,断开与zookeeper服务器的连接。如果客户端长时间不活动,则zookeeper服务器将自动断开客户端。
-
一、连接到ZooKeeper
ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
-
-
sessionTimeout:会话超时(以毫秒为单位)
-
public class ZookeeperConnection { private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181"; private static final int TIME_OUT = 5000; public static void main(String[] args) { ZooKeeper zooKeeper = null; try { // 计数器对象 CountDownLatch countDownLatch = new CountDownLatch(1); /** * 第一个参数:服务器的ip和端口 * 第二个参数:客户端与服务器之间的会话超时时间,以毫秒为单位 * 第三个参数:监视器对象 */ zooKeeper = new ZooKeeper(IP, TIME_OUT, (event) -> { if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { System.out.println("连接创建成功"); countDownLatch.countDown(); } }); //Zookeeper创建对象是异步的,此时new完后,连接不一定创建成功。 // 主线程阻塞,等待连接对象的创建成功 countDownLatch.await(); // 打印会话编号 System.out.println(zooKeeper.getSessionId()); } catch (Exception e) { e.printStackTrace(); } finally { try { if (zooKeeper != null) { zooKeeper.close(); } } catch (InterruptedException e) { e.printStackTrace(); } } } }
// 同步方式 create(String path, byte[] data, List<ACL> acl, CreateMode createMode) // 异步方式 create(String path, byte[] data, List<ACL> acl, CreateMode createMode,AsyncCallback.StringCallback callBack,Object ctx)
-
-
data: 要存储在指定znode路径中的数据
-
acl :要创建的节点的访问控制列表。zookeeper API提供了一个静态接口ZooDefs.Ids 来获取一些基本的acl列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE返回打开znode的acl列表。
-
createMode:节点的类型,这是一个枚举。
-
callBack:异步回调接口
-
public class ZKCreate { ZooKeeper zooKeeper = null; /** * 获取连接 */ @Before public void befor() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); zooKeeper = new ZooKeeper("39.98.67.88:2181", 5000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println("连接创建成功"); countDownLatch.countDown(); } } }); // 主线程阻塞等待连接对象的创建成功 countDownLatch.await(); } /** * 同步创建节点 */ @Test public void create1() throws KeeperException, InterruptedException { /** * 第一个参数:节点的路径 * 第二个参数:节点的数据 * 第三个参数:权限列表 ZooDefs.Ids.OPEN_ACL_UNSAFE:world:anyone:cdrwa / * 第四个参数:节点的类子持久化节点 */ zooKeeper.create("/hello/gzh", "helloworld".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT); } /** * world授权模式 */ @Test public void als1() throws KeeperException, InterruptedException { // world授权模式 // 权限列表 List<ACL> acls = new ArrayList<>(); // 授权模式和授权对象 Id id = new Id("world", "anyone"); // 权限设置 acls.add(new ACL(ZooDefs.Perms.READ, id)); acls.add(new ACL(ZooDefs.Perms.WRITE, id)); zooKeeper.create("/hello/cl", "cl".getBytes(), acls, CreateMode.PERSISTENT); } /** * IP授权模式 */ @Test public void als2() throws Exception { // IP授权模式 // 权限列表 List<ACL> acls = new ArrayList<>(); // 授权模式和授权对象 Id id = new Id("ip", "0.0.0.0"); // 权限设置 acls.add(new ACL(ZooDefs.Perms.ALL, id)); zooKeeper.create("/hello/alsj", "cl".getBytes(), acls, CreateMode.PERSISTENT); } /** * auth授权模式 */ @Test public void als3() throws Exception { // auth授权模式 // 添加授权用户 zooKeeper.addAuthInfo("digest", "root:070313".getBytes()); // 权限列表 List<ACL> acls = new ArrayList<>(); // 授权模式和授权对象 Id id = new Id("auth", "root"); // 权限设置 acls.add(new ACL(ZooDefs.Perms.ALL, id)); zooKeeper.create("/hello/eh", "eel".getBytes(), acls, CreateMode.PERSISTENT); } /** * digest授权模式 */ @Test public void als4() throws Exception { // digest授权模式 // 权限列表 List<ACL> acls = new ArrayList<>(); // 授权模式和授权对象 Id id = new Id("digest", "root:4d9PWRXHtxrRSgCIGCixNUZdTPQ="); // 权限设置 acls.add(new ACL(ZooDefs.Perms.ALL, id)); zooKeeper.create("/hello/cjk", "cls".getBytes(), acls, CreateMode.PERSISTENT); } /** * 异步创建节点 */ @Test public void create2() throws KeeperException, InterruptedException { /** * 第一个参数:节点的路径 * 第二个参数:节点的数据 * 第三个参数:权限列表 ZooDefs.Ids.OPEN_ACL_UNSAFE:world:anyone:cdrwa / * 第四个参数:节点的类子持久化节点 * 第五个参数:异步回调接口 * 第六个参数:上下文参数 */ zooKeeper.create("/hello/***", "zhuxi".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { // 0 代表创建成功 System.out.println(rc); // 节点的路径 System.out.println(path); // 上下文的参数 System.out.println(ctx); // 节点的路径 System.out.println(name); } }, "i am context"); TimeUnit.SECONDS.sleep(1); System.out.println("创建完成"); } /** * 关闭连接 */ @After public void after() throws InterruptedException { if (zooKeeper != null) { System.out.println("关闭成功"); zooKeeper.close(); } } }
// 同步方式 setData(String path, byte[] data, int version) // 异步方式 setData(String path, byte[] data, int version,AsyncCallback.StatCallback callBack, Object ctx)
-
-
data: 要存储在指定znode路径中的数据。
-
version:znode的当前版本。每当数据更改时,ZooKeeper会更新znode的版本 号。
-
callBack:异步回调接口
-
public class ZKUpdate { private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181"; private static final int TIME_OUT = 5000; ZooKeeper zooKeeper = null; @Before public void createZk() throws Exception { CountDownLatch countDownLatch = Utils.getLatch(); zooKeeper = new ZooKeeper(IP, TIME_OUT, (event) -> { if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { System.out.println("连接创建成功"); countDownLatch.countDown(); } }); countDownLatch.await(); } @Test public void update1() throws KeeperException, InterruptedException { /** * 第一个参数:节点的路径 * 第二个参数:修改的数据 * 第三个参数:数据版本号 -1 代表版本号不参与更新 */ Stat stat = zooKeeper.setData("/update_test", "i love you".getBytes(), -1); System.out.println(stat); } @Test public void update2() throws InterruptedException { /** * 第一个参数:节点的路径 * 第二个参数:修改的数据 * 第三个参数:数据版本号 -1 代表版本号不参与更新 */ zooKeeper.setData("/update_test", "i love you2".getBytes(), -1, new AsyncCallback.StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { // 0 代表创建成功 System.out.println(rc); // 节点的路径 System.out.println(path); // 上下文的参数 System.out.println(ctx); // 当前节点的详细信息 System.out.println(stat); } }, "i am context"); TimeUnit.SECONDS.sleep(1); } /** * 关闭连接 */ @After public void after() throws InterruptedException { if (zooKeeper != null) { System.out.println("关闭成功"); zooKeeper.close(); } } }
四、删除节点
// 同步方式 delete(String path, int version) // 异步方式 delete(String path, int version, AsyncCallback.VoidCallback callBack,Object ctx)
-
-
version:znode的当前版本
-
callBack:异步回调接口
-
public class ZKDelete { private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181"; private static final int TIME_OUT = 5000; ZooKeeper zooKeeper = null; @Before public void createZk() throws Exception { CountDownLatch countDownLatch = Utils.getLatch(); zooKeeper = new ZooKeeper(IP, TIME_OUT, (event) -> { if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { System.out.println("连接创建成功"); countDownLatch.countDown(); } }); countDownLatch.await(); }
@Test public void delete1() throws KeeperException, InterruptedException { /** * 第一个参数:删除节点的路径 * 第二个参数:数据版本信息,-1代表不考律版本信息 */ zooKeeper.delete("/del_node1", -1); } @Test public void delete2() throws InterruptedException { /** * 第一个参数:删除节点的路径 * 第二个参数:数据版本信息,-1代表不考律版本信息 * 第三个参数:异步回调接口 * 第四个参数:上下文参数 */ zooKeeper.delete("/del_node2", -1, new AsyncCallback.VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { // 0代表删除成功 System.out.println(rc); // 删除节点的路径 System.out.println(path); // 上下文参数 System.out.println(ctx); } }, "我是上下文参数"); TimeUnit.SECONDS.sleep(1); } /** * 关闭连接 */ @After public void after() throws InterruptedException { if (zooKeeper != null) { System.out.println("关闭成功"); zooKeeper.close(); } } }
// 同步方式 getData(String path, boolean b, Stat stat) // 异步方式 getData(String path, boolean b,AsyncCallback.DataCallback callBack,Object ctx)
-
-
b:是否使用连接对象中注册的监视器。
-
stat: 返回znode的元数据。
-
callBack:异步回调接口
-
public class ZKSelect { private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181"; private static final int TIME_OUT = 5000; ZooKeeper zooKeeper = null; @Before public void createZk() throws Exception { CountDownLatch countDownLatch = Utils.getLatch(); zooKeeper = new ZooKeeper(IP, TIME_OUT, (event) -> { if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { System.out.println("连接创建成功"); countDownLatch.countDown(); } }); countDownLatch.await(); } @Test public void get1() throws KeeperException, InterruptedException { /** * 第一个参数:节点的路径 * 第二个参数:后面讲解 这里为false即可 * 第三个参数:读取节点属性的对象 */ byte[] data = zooKeeper.getData("/jdy", false, new Stat()); System.out.println(new String(data)); } /** * @功能: 异步查询节点 */ @Test public void get2() throws KeeperException, InterruptedException { /** * 第一个参数:节点的路径 * 第二个参数:后面讲解 这里为false即可 * 第三个参数:异步回调接口 * 第四个参数:上下文参数 */ zooKeeper.getData("/jdy", false, new AsyncCallback.DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { // 0代表读取成功 System.out.println(rc); // 节点的路径 System.out.println(path); // 上下文参数 System.out.println(ctx); // 节点的详细信息 System.out.println(stat); } }, "我是上下文参数"); TimeUnit.SECONDS.sleep(1); } /** * 关闭连接 */ @After public void after() throws InterruptedException { if (zooKeeper != null) { System.out.println("关闭成功"); zooKeeper.close(); } } }
// 同步方式 getChildren(String path, boolean b) // 异步方式 getChildren(String path, boolean b,AsyncCallback.ChildrenCallback callBack,Object ctx)
-
-
b:是否使用连接对象中注册的监视器。
-
callBack:异步回调接口。
-
public class ZKGetChildren { private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181"; private static final int TIME_OUT = 5000; ZooKeeper zooKeeper = null; @Before public void createZk() throws Exception { CountDownLatch countDownLatch = Utils.getLatch(); zooKeeper = new ZooKeeper(IP, TIME_OUT, (event) -> { if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { System.out.println("连接创建成功"); countDownLatch.countDown(); } }); countDownLatch.await(); } /** * 同步查询子节点 */ @Test public void getChildren1() throws KeeperException, InterruptedException { /** * 第一个参数:节点的路径 * 第二个参数:是否启用监控 这里为false即可 */ List<String> children = zooKeeper.getChildren("/hello", false); children.forEach(System.out::println); } /** * 异步查询子节点 */ @Test public void getChildren2() throws InterruptedException { /** * 第一个参数:节点的路径 * 第二个参数:是否启用监控 这里为false即可 * 第三个参数:异步回调接口 * 第四个参数:上下文参数 */ zooKeeper.getChildren("/hello", false, new AsyncCallback.ChildrenCallback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children) { // 0代表读取成功 System.out.println(rc); // 节点的路径 System.out.println(path); // 上下文参数 System.out.println(ctx); // 子节点集合 children.forEach(System.out::println); } }, "我是上下文参数"); TimeUnit.SECONDS.sleep(1); } /** * 关闭连接 */ @After public void after() throws InterruptedException { if (zooKeeper != null) { System.out.println("关闭成功"); zooKeeper.close(); } } }
// 同步方法 exists(String path, boolean b) // 异步方法 exists(String path, boolean b,AsyncCallback.StatCallback callBack,Objectctx)
-
-
b:是否使用连接对象中注册的监视器。
-
callBack: 异步回调接口。
-
public class ZKExists { private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181"; private static final int TIME_OUT = 5000; ZooKeeper zooKeeper = null; @Before public void createZk() throws Exception { CountDownLatch countDownLatch = Utils.getLatch(); zooKeeper = new ZooKeeper(IP, TIME_OUT, (event) -> { if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { System.out.println("连接创建成功"); countDownLatch.countDown(); } }); countDownLatch.await(); } @Test public void exists1() throws KeeperException, InterruptedException { /** * 第一个参数:节点的路径 * 第二个参数:是否启用监控 这里为false即可 */ Stat stat = zooKeeper.exists("/hello", false); System.out.println(stat); } @Test public void exists2() throws InterruptedException { /** * 第一个参数:节点的路径 * 第二个参数:是否启用监控 这里为false即可 * 第三个参数:异步回调接口 * 第四个参数:上下文参数 */ zooKeeper.exists("/hello", false, new AsyncCallback.StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { // 0代表读取成功 System.out.println(rc); // 节点的路径 System.out.println(path); // 上下文参数 System.out.println(ctx); // 节点的详细信息,如果为null则说明该节点不存在 System.out.println(stat); } }, "我是上下文参数"); TimeUnit.SECONDS.sleep(1); } /** * 关闭连接 */ @After public void after() throws InterruptedException { if (zooKeeper != null) { System.out.println("关闭成功"); zooKeeper.close(); } } }