zookeeper的API应用
作者:尹正杰
版权声明:原创作品,谢绝转载!否则将追究法律责任。
一.环境准备
1>.创建Maven工程
使用IDE工具创建一个Maven工程,关于IDE看您个人喜好,可以使用Ecllipse或者Idea均可。
2>.在pom.xml文件中添加依赖关系
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.org.yinzhengjie</groupId> <artifactId>zookeeper</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.1</version> </dependency> </dependencies> </project>
3>.创建log4j.properties文件
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
二.获取子节点并监听节点变化
1>.使用默认的回调函数
package cn.org.yinzhengjie; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.List; public class ZkClient { //声明一个ZooKeeper的客户端对象 private ZooKeeper zkCli; //定义zookeeper集群的连接地址,注意域名解析哟~ private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181"; //定义与zookeeper集群会话的超时时间,单位为毫秒 private static final int SESSION_TIMEOUT = 3000; @Before public void before() throws IOException { zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> { System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath()); }); } @Test public void ls() throws KeeperException, InterruptedException { //查看根znode的子节点信息,并观察该事件 List<String> children = zkCli.getChildren("/", true); System.out.println("========== 开始遍历查询结果 =========="); for (String child:children){ System.out.println(child); } System.out.println("========== 结束遍历查询结果 =========="); /** * 让当前线程延时阻塞(zookeeper客户端和服务端是异步通信的),目的是当服务端根下的znode发生变化时会及时调用回调函数. */ Thread.sleep(Long.MAX_VALUE); } }
2>.使用自定义的回调函数
package cn.org.yinzhengjie; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.List; public class ZkClient { //声明一个ZooKeeper的客户端对象 private ZooKeeper zkCli; //定义zookeeper集群的连接地址,注意域名解析哟~ private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181"; //定义与zookeeper集群会话的超时时间,单位为毫秒 private static final int SESSION_TIMEOUT = 3000; @Before public void before() throws IOException { zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> { System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath()); }); } @Test public void ls() throws KeeperException, InterruptedException { //查看根znode的子节点信息,并观察该事件 List<String> children = zkCli.getChildren("/", event ->{ System.out.println(event.getType() + "***** 自定义的回调函数 *****" + event.getPath()); }); System.out.println("========== 开始遍历查询结果 =========="); for (String child:children){ System.out.println(child); } System.out.println("========== 结束遍历查询结果 =========="); /** * 让当前线程延时阻塞(zookeeper客户端和服务端是异步通信的),目的是当服务端根下的znode发生变化时会及时调用回调函数. */ Thread.sleep(Long.MAX_VALUE); } }
三.创建子节点
package cn.org.yinzhengjie; import org.apache.zookeeper.*; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.List; public class ZkClient { //声明一个ZooKeeper的客户端对象 private ZooKeeper zkCli; //定义zookeeper集群的连接地址,注意域名解析哟~ private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181"; //定义与zookeeper集群会话的超时时间,单位为毫秒 private static final int SESSION_TIMEOUT = 3000; @Before public void before() throws IOException { zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> { System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath()); }); } @Test public void create() throws KeeperException, InterruptedException { /** * create的方法前面如下: * public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) * * 以下是对各参数的解释说明: * path: * 指定要创建的znode在zookeeper的路径. * data: * 指定创建的znode需要保存的原始数据. * acl: * 指定创建znode的ACL,我们指定的"ZooDefs.Ids.OPEN_ACL_UNSAFE"您可以理解为权限最大的ACL. * createMode: * 指定创建znode的类型.我们指定的"CreateMode.EPHEMERAL"您可以理解为创建临时znode。 */ String znode = zkCli.create("/yinzhengjie2020", "yinzhengjie.com".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(znode); /** * 让当前线程延时阻塞(zookeeper客户端和服务端是异步通信的) */ Thread.sleep(Long.MAX_VALUE); } }
四.查询znode存储的数据
package cn.org.yinzhengjie; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.List; public class ZkClient { //声明一个ZooKeeper的客户端对象 private ZooKeeper zkCli; //定义zookeeper集群的连接地址,注意域名解析哟~ private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181"; //定义与zookeeper集群会话的超时时间,单位为毫秒 private static final int SESSION_TIMEOUT = 3000; @Before public void before() throws IOException { zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> { System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath()); }); } @Test public void get() throws InterruptedException, KeeperException { /** * getData方法的签名如下: * public byte[] getData(String path, boolean watch, Stat stat) * 以下是对其参数的相关说明: * path: * 指定要获取数据的znode路径 * watch: * 是否需要监视此节点. * stat: * 传递一个Stat对象即可. */ byte[] data = zkCli.getData("/yinzhengjie2020", true, new Stat()); //将获取的数据由字节数组转换成字符串 String getData = new String(data); //打印获取到的数据 System.out.println(getData); /** * 让当前线程延时阻塞(zookeeper客户端和服务端是异步通信的) */ Thread.sleep(Long.MAX_VALUE); } }
五.修改znode存储的数据
package cn.org.yinzhengjie; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.List; public class ZkClient { //声明一个ZooKeeper的客户端对象 private ZooKeeper zkCli; //定义zookeeper集群的连接地址,注意域名解析哟~ private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181"; //定义与zookeeper集群会话的超时时间,单位为毫秒 private static final int SESSION_TIMEOUT = 3000; @Before public void before() throws IOException { zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> { System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath()); }); } @Test public void set() throws InterruptedException, KeeperException { /** * setData方法的签名如下: * public Stat setData(final String path, byte[] data, int version) * 以下是对其参数的相关说明: * path: * 指定要修改数据的znode路径. * data: * 指定要修改的数据内容. * version: * 指定版本号.这个版本号要和zookeeper集群保存该znode的dataVersion值保持一致哟~ */ Stat stat = zkCli.setData("/yinzhengjie2020", "bigdata".getBytes(), 1); //打印数据长度,很明显"bigdata"的长度为"7". System.out.println(stat.getDataLength()); } }
六.判断znode是否存在
package cn.org.yinzhengjie; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.List; public class ZkClient { //声明一个ZooKeeper的客户端对象 private ZooKeeper zkCli; //定义zookeeper集群的连接地址,注意域名解析哟~ private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181"; //定义与zookeeper集群会话的超时时间,单位为毫秒 private static final int SESSION_TIMEOUT = 3000; @Before public void before() throws IOException { zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> { System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath()); }); } @Test public void exist() throws InterruptedException, KeeperException { /** * exists方法的签名如下: * public Stat exists(String path, boolean watch) * 以下是对其参数的相关说明: * path: * 指定zookeeper中znode的路径. * watch: * 是否需要监视此节点. */ Stat znode = zkCli.exists("/yinzhengjie2019", false); //如果znode存在就打印其数据长度,不存在就提示用户. if (znode == null){ System.out.println("该znode不存在!"); }else{ System.out.println(znode.getDataLength()); } } }
七.删除znode
package cn.org.yinzhengjie; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.List; public class ZkClient { //声明一个ZooKeeper的客户端对象 private ZooKeeper zkCli; //定义zookeeper集群的连接地址,注意域名解析哟~ private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181"; //定义与zookeeper集群会话的超时时间,单位为毫秒 private static final int SESSION_TIMEOUT = 3000; @Before public void before() throws IOException { zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> { System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath()); }); } @Test public void delete() throws InterruptedException, KeeperException { String znodePath = "/yinzhengjie2020"; Stat znode = zkCli.exists(znodePath, false); if (znode != null){ /** * delete方法的签名如下: * public void delete(final String path, int version) * 以下是对其参数的相关说明: * path: * 指定zookeeper中znode的路径. * version: * 指定znode的版本号(该版本号要和zookeeper集群保存该znode的dataVersion值保持一致哟). * * 温馨提示: * 如果您要删除的znode下有子znod是无法直接删除的,需要先删除子znode. */ zkCli.delete(znodePath,znode.getVersion()); } } }
八.循环监听(watch)案例
package cn.org.yinzhengjie; import org.apache.zookeeper.*; import org.junit.Before; import org.junit.Test; import java.io.IOException; public class ZkClient { //声明一个ZooKeeper的客户端对象 private ZooKeeper zkCli; //定义zookeeper集群的连接地址,注意域名解析哟~ private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181"; //定义与zookeeper集群会话的超时时间,单位为毫秒 private static final int SESSION_TIMEOUT = 3000; @Before public void before() throws IOException { zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> { System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath()); }); } @Test public void register() throws KeeperException,InterruptedException { byte[] data = zkCli.getData("/yinzhengjie2020", new Watcher() { @Override public void process(WatchedEvent event) { try { register(); }catch (KeeperException e){ e.printStackTrace(); }catch (InterruptedException e){ e.printStackTrace(); } } }, null); //打印获取到的数据 System.out.println(new String(data)); } @Test public void testRegister(){ try { register(); Thread.sleep(Long.MAX_VALUE); }catch (KeeperException e){ e.printStackTrace(); }catch (InterruptedException e){ e.printStackTrace(); } } }