七:zooKeeper开源客户端ZkClient的api测试

ZkClient是Gitthub上一个开源的ZooKeeper客户端。ZKClient在ZooKeeper原生API接口之上进行了包装,是一个更加易用的ZooKeeper客户端。同时ZKClient在内部实现诸如Session超时重连,Watcher反复注册等功能。

一:maven依赖

 1  <dependency>
 2       <groupId>org.apache.zookeeper</groupId>
 3       <artifactId>zookeeper</artifactId>
 4       <version>3.4.6</version>
 5     </dependency>
 6     <dependency>
 7       <groupId>com.101tec</groupId>
 8       <artifactId>zkclient</artifactId>
 9       <version>0.5</version>
10     </dependency>
View Code

二:一个java存储实体

 1 package com.yeepay.sxf.testZkClient;
 2 
 3 import java.io.Serializable;
 4 
 5 
 6 /**
 7  * 用户对象
 8  * @author sxf
 9  *
10  */
11 public class User implements Serializable{
12     private int id;
13     private String name;
14     private String address;
15     
16     
17     public int getId() {
18         return id;
19     }
20     public void setId(int id) {
21         this.id = id;
22     }
23     public String getName() {
24         return name;
25     }
26     public void setName(String name) {
27         this.name = name;
28     }
29     public String getAddress() {
30         return address;
31     }
32     public void setAddress(String address) {
33         this.address = address;
34     }
35     
36 }
View Code

三:测试源代码

  1 package com.yeepay.sxf.testZkClient;
  2 
  3 import java.util.List;
  4 
  5 import org.I0Itec.zkclient.IZkChildListener;
  6 import org.I0Itec.zkclient.IZkDataListener;
  7 import org.I0Itec.zkclient.ZkClient;
  8 import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
  9 import org.apache.zookeeper.CreateMode;
 10 import org.apache.zookeeper.data.Stat;
 11 /**
 12  * zkclient的api测试
 13  * acl权限相关操作和原生zookeeper客户端一样
 14  * @author sxf
 15  *
 16  */
 17 public class CreateSession {
 18 
 19     public static void main(String[] args) throws InterruptedException {
 20         //第一个参数:zk的ip:端口号
 21         //会话过期时间
 22         //链接超时时间
 23         //序列化器(将java对象转化成byte数组,得到时从byte数组反序列化成java对象)这个序列化器在事件订阅上,修改数据一定要保持一致,否则无效果
 24         //ZkClient zkClient=new ZkClient("10.151.30.75:2181", 10000, 10000, new SerializableSerializer());
 25         
 26         //这个客户端链接,传入的序列化器皿等于什么都没做
 27         ZkClient zkClient=new ZkClient("10.151.30.75:2181", 10000, 10000, new BytesPushThroughSerializer());
 28         
 29         //创建一个节点
 30         //createNode(zkClient);
 31         
 32         //获取一个节点中存取的值
 33         //getNodeValue(zkClient);
 34         
 35         //获取一个节点下的子节点列表
 36         //getNodeChildList(zkClient);
 37         
 38         //判断一个节点是否存在
 39         //isNodeExists(zkClient);
 40         
 41         //删除一个节点
 42         //deleteNode(zkClient);
 43         
 44         //修改一个节点的数据
 45         //updateNodeValue(zkClient);
 46         
 47         //为某个节点的子节点列表发生变化订阅一个事件
 48         //whenChildListChangeThenDoSomeThing(zkClient);
 49         
 50         //为某个节点存储的值发生变化或者节点被删除订阅一个事件
 51         whenNodeValueChangeThenDoThing(zkClient);
 52         
 53         Thread.sleep(Integer.MAX_VALUE);
 54     }
 55     
 56     
 57     /**
 58      * 创建一个node节点
 59      * @param zkClient
 60      */
 61     public static void createNode(ZkClient zkClient){
 62         //user 对象必须实现序列化接口Serializable
 63         User user=new User();
 64         user.setId(1);
 65         user.setName("shangxiaofei");
 66         user.setAddress("smx");
 67         //(节点路径,节点数据,持久节点)
 68         String path=zkClient.create("/node_131", user, CreateMode.PERSISTENT);
 69         System.out.println("CreateSession.createNode(创建节点路径为:)"+path);
 70     }
 71     
 72     /**
 73      * 获取一个节点中存取的值
 74      * @param zkClient
 75      */
 76     public static void getNodeValue(ZkClient zkClient){
 77         
 78         Stat stat=new Stat();
 79         //获取一个节点中存取的值
 80         User user=zkClient.readData("/node_131",stat);
 81         System.out.println("CreateSession.getNodeValue(id==>)"+user.getId());
 82         System.out.println("CreateSession.getNodeValue(name==>)"+user.getName());
 83         System.out.println("CreateSession.getNodeValue(address==>)"+user.getAddress());
 84         System.out.println("CreateSession.getNodeValue(节点状态==>)"+stat);
 85     }
 86     
 87     /**
 88      * 获取某个节点的子节点列表
 89      * @param zkClient
 90      */
 91     public static void getNodeChildList(ZkClient zkClient){
 92         
 93         List<String> nodesList=zkClient.getChildren("/");
 94         System.out.println("CreateSession.getNodeChildList(“/”下的子节点列表为=>)"+nodesList);
 95     }
 96     
 97     /**
 98      * 检测某一个节点是否存在
 99      * @param zkClient
100      */
101     public static void isNodeExists(ZkClient zkClient){
102         boolean flag=zkClient.exists("/node_132");
103         System.out.println("CreateSession.isNodeExists(节点是否存在的结果)"+flag);
104     }
105     
106     
107     /**
108      * 删除一个节点或删除存在子节点的节点
109      * @param zkClient
110      */
111     public static void deleteNode(ZkClient zkClient){
112         //删除无子节点的节点。如果存在子节点,抛出异常
113         boolean flag=zkClient.delete("/node_131");
114         System.out.println("CreateSession.deleteNode(删除无子节点的节点结果==>)"+flag);
115             
116         //删除存在子节点的节点。先深入子节点,删除掉所有子节点的节点,再删除当前节点
117         boolean flag2=zkClient.deleteRecursive("/node_131");
118         System.out.println("CreateSession.deleteNode(删除存在子节点的节点结果==>)"+flag2);
119     }
120     
121     /**
122      * 修改一个数据节点中存储的值
123      * @param zkClient
124      */
125     public static void updateNodeValue(ZkClient zkClient){
126         User user=new User();
127         user.setId(100);
128         user.setName("shangxiaoshuai");
129         user.setAddress("smxcyxzyc");
130         //修改一个节点中存储的数据
131         zkClient.writeData("/node_131", user);
132         
133         //修改一个节点中存储的数据,实现类似乐观锁的功能
134         zkClient.writeData("/node_131", user, 1);
135     }
136     
137     
138     /**
139      * 当子节点列表发生变化,则触发一个事件
140      */
141     public static void whenChildListChangeThenDoSomeThing(ZkClient zkClient){
142         /**
143          * 可以监听(1)子节点列表发生变化,增加新子节点,删除子节点
144          * (2)节点本身被删除
145          *    (3)节点本身被创建,意味着可以订阅一个并不存在的节点的监听事件
146          */
147                         
148         zkClient.subscribeChildChanges("/node_132", new WhenChildChangeThing());
149     }
150     /**
151      * 当子节点列表发生变化的事件实现类
152      * @author sxf
153      *
154      */
155     private static class WhenChildChangeThing implements IZkChildListener{
156 
157         @Override
158         public void handleChildChange(String parentPath,List<String> currentChilds) throws Exception {
159                 System.out.println("节点列表变化的父节点路径==>"+parentPath);
160                 System.out.println("当前的节点列表==>"+currentChilds);
161         }
162         
163     }
164     
165     /**
166      * 为节点数据内容发生变化订阅一个事件
167      * @param zkClient
168      */
169     public static void whenNodeValueChangeThenDoThing(ZkClient zkClient){
170         zkClient.subscribeDataChanges("/node_132", new WhenNodeValueChange());
171     }
172     /**
173      * 当数据节点存储的值发生变化的事件监听类
174      * @author sxf
175      *
176      */
177     private static class WhenNodeValueChange implements IZkDataListener{
178 
179         //当节点数据内容发生变化这个函数被调用
180         @Override
181         public void handleDataChange(String dataPath, Object data)
182                 throws Exception {
183             System.out.println("节点数据内容发生变化,变化后的值为==>"+data);
184             System.out.println("节点数据内容发生变化,变化节点的路径==>"+dataPath);
185         }
186 
187         //当节点被删除时这个函数被调用
188         @Override
189         public void handleDataDeleted(String dataPath) throws Exception {
190             System.out.println("节点被删除的路径为==>"+dataPath);
191         }
192         
193     }
194 }
View Code
原文地址:https://www.cnblogs.com/shangxiaofei/p/5219937.html