Zookeeper客户端使用(使用Curator)

 

Zookeeper客户端(使用Curator)

 

 

三、使用curator客户端

在pom.xml中加入依赖

<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-framework</artifactId>
  <version>2.12.0</version>
</dependency>

<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>2.11.0</version>
</dependency>

直接上代码:

  1 /**
  2  * Project Name:mk-project <br>
  3  * Package Name:com.suns.zookeeper.curator <br>
  4  *
  5  * @author mk <br>
  6  * Date:2018-10-31 14:03 <br>
  7  */
  8 
  9 package com.suns.zookeeper.curator;
 10 
 11 import org.apache.curator.RetryPolicy;
 12 import org.apache.curator.framework.CuratorFramework;
 13 import org.apache.curator.framework.CuratorFrameworkFactory;
 14 import org.apache.curator.framework.recipes.cache.*;
 15 import org.apache.curator.retry.ExponentialBackoffRetry;
 16 import org.apache.zookeeper.CreateMode;
 17 import org.apache.zookeeper.data.Stat;
 18 
 19 import java.util.concurrent.TimeUnit;
 20 
 21 /**
 22  * curator客户端使用
 23  *
 24  *  和原生zookeeper优点:
 25  *  1.使用api更方便,功能更丰富
 26  *  2.监听节点数据改变或者子节点变化,只需要订阅一次,便可以一直使用。而原生zookeeper的监听是一次性的,需要重复注册。
 27  *  3.链式编程
 28  *
 29  * Curator包含了几个包:
 30  * curator-framework:对zookeeper的底层api的一些封装
 31  * curator-client:提供一些客户端的操作,例如重试策略等
 32  * curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
 33  * ClassName: ZkCuratorTest <br>
 34  * Description:  <br>
 35  * @author mk
 36  * @Date 2018-10-31 14:03 <br>
 37  * @version
 38  */
 39 public class ZkCuratorTest {
 40 
 41     public static final String connect = "127.0.0.1:2181";
 42     private static CuratorFramework curatorFramework = null;
 43     private static String nodePath = "/curator1";
 44     private static String nodeChildPath = "/curator1/n1/n11/n111/n1111";
 45 
 46     public static void main(String[] args) throws Exception {
 47 
 48         //初始化
 49         init(connect,5000);
 50 
 51         //监听节点数据改变或者子节点变化,只需要订阅一次,便可以一直使用。而原生zookeeper的监听是一次性的,需要重复注册。
 52         listener(nodePath);
 53 
 54         //新增
 55         create(nodePath,"n1");
 56         //递归新增
 57         createRecursion(nodeChildPath,"n1");
 58 
 59         //查询
 60         query(nodePath);
 61 
 62         TimeUnit.SECONDS.sleep(2);
 63 
 64         //修改
 65         update(nodePath,"n11");
 66 
 67         //单个节点删除
 68 //        delete(nodePath);
 69         //递归删除
 70         deleteRecursion(nodePath);
 71 
 72     }
 73 
 74     private static void deleteRecursion(String path) throws Exception {
 75         Void aVoid = curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
 76         System.out.println("delete:"+"["+path+"],result:"+aVoid);
 77     }
 78 
 79     private static void delete(String path) throws Exception {
 80         Void aVoid = curatorFramework.delete().forPath(path);
 81         System.out.println("delete:"+"["+path+"],result:"+aVoid);
 82 
 83     }
 84 
 85     private static void update(String path, String data) throws Exception {
 86         Stat stat = curatorFramework.setData().forPath(path, data.getBytes());
 87         System.out.println("setData:"+"["+path+"],stat:"+stat);
 88 
 89     }
 90 
 91     private static void query(String path) throws Exception {
 92         byte[] bytes = curatorFramework.getData().forPath(path);
 93         System.out.println("query:"+"["+path+"],result:"+new String(bytes));
 94 
 95     }
 96 
 97     private static void createRecursion(String path,String data) throws Exception {
 98         String result = curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data.getBytes());
 99         System.out.println("create:"+"["+path+"-->"+data+"],result:"+result);
100 
101     }
102 
103     private static void create(String path, String data) throws Exception {
104 //        Stat stat = curatorFramework.checkExists().forPath(path);
105 //        if(null != stat){
106 //            System.out.println("节点["+path+"]已存在,不能新增");
107 //            return;
108 //        }
109         String result = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data.getBytes());
110         System.out.println("create:"+"["+path+"-->"+data+"],result:"+result);
111     }
112 
113     private static void listener(String path) throws Exception {
114 
115         //监听节点内容改变
116         final NodeCache nodeCache = new NodeCache(curatorFramework, path);
117         nodeCache.start();
118         /*nodeCache.getListenable().addListener(new NodeCacheListener() {
119             @Override
120             public void nodeChanged() throws Exception {
121                 System.out.println("节点内容发生变化----->"+nodeCache.getCurrentData());
122             }
123         });*/
124 
125         //使用lambda表达式-jdk1.8以上
126         nodeCache.getListenable().addListener(()->{System.out.println("节点内容发生变化----->"+nodeCache.getCurrentData());});
127 
128 
129         //监听子节点改变
130         final PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);
131         pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
132       /*  pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
133             @Override
134             public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
135                 switch (pathChildrenCacheEvent.getType() ){
136                     case CHILD_ADDED:
137                         System.out.println("新增子节点"+pathChildrenCache.getCurrentData());
138                         break;
139                     case CHILD_UPDATED:
140                         System.out.println("更新子节点"+pathChildrenCache.getCurrentData());
141                         break;
142                     case CHILD_REMOVED:
143                         System.out.println("删除子节点"+pathChildrenCache.getCurrentData());
144                         break;
145                     default:break;
146                 }
147             }
148         });*/
149         //使用lambda表达式-jdk1.8以上
150         pathChildrenCache.getListenable().addListener((curatorFramework,pathChildrenCacheEvent)->
151             {switch (pathChildrenCacheEvent.getType() ){
152                 case CHILD_ADDED:
153                     System.out.println("新增子节点"+pathChildrenCache.getCurrentData());
154                     break;
155                 case CHILD_UPDATED:
156                     System.out.println("更新子节点"+pathChildrenCache.getCurrentData());
157                     break;
158                 case CHILD_REMOVED:
159                     System.out.println("删除子节点"+pathChildrenCache.getCurrentData());
160                     break;
161                 default:break;
162             }});
163 
164 
165     }
166 
167     private static void init(String connect, int sessionTimeout) {
168         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);//重试策略,初始等待1s,重试3次
169         //通过工厂获得CuratorFramework
170         curatorFramework = CuratorFrameworkFactory.builder()
171                 .connectString(connect).connectionTimeoutMs(sessionTimeout).retryPolicy(retryPolicy).build();
172         curatorFramework.start();//开启连接
173         System.out.println("curatorFramework start :" +connect);
174     }
175 
176 }

运行截图:



原文地址:https://www.cnblogs.com/lookupthesky/p/9883188.html