大数据学习06_zookeeper3_javaAPI操作

这里通过 Curator 来操作 ZooKeeper 的 Java API。Curator 是一套 ZooKeeper 客户端框架,封装了许多 ZooKeeper 客户端非常底层的细节开发工作。

Curator包含了几个包:

  • curator-framework:对zookeeper的底层api的一些封装
  • curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器等

先创建一个 Java 工程,并导入 Maven 依赖。这里使用的 Curator 版本为 2.12.0,对应的 ZooKeeper 版本为 3.4.x;如果跨版本使用会有兼容性问题,很有可能导致节点操作失败。

 <dependencies>
     <!-- Curator 2.12.0 pairs with ZooKeeper 3.4.x; mixing versions can make node operations fail. -->
     <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-framework</artifactId>
         <version>2.12.0</version>
     </dependency>
     <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-recipes</artifactId>
         <version>2.12.0</version>
     </dependency>
     <!-- NOTE(review): google-collections was superseded by Guava; confirm it is still needed. -->
     <dependency>
         <groupId>com.google.collections</groupId>
         <artifactId>google-collections</artifactId>
         <version>1.0</version>
     </dependency>
     <!-- Pinned instead of the RELEASE meta-version so that builds are reproducible. -->
     <dependency>
         <groupId>junit</groupId>
         <artifactId>junit</artifactId>
         <version>4.12</version>
     </dependency>
     <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.25</version>
     </dependency>
 </dependencies>
 <build>
     <plugins>
         <!-- Java compiler plugin -->
         <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-compiler-plugin</artifactId>
             <version>3.2</version>
             <configuration>
                 <source>1.8</source>
                 <target>1.8</target>
                 <encoding>UTF-8</encoding>
             </configuration>
         </plugin>
     </plugins>
 </build>
maven依赖

如果修改了 Curator 的依赖版本,需要手动重新导入(reimport)Maven 依赖,改动才会生效。

 接下来进行API操作:

  • 创建永久节点
 1 @Test
 2 public void createNode() throws Exception {
 3    RetryPolicy retryPolicy = new  ExponentialBackoffRetry(1000, 1);
 4    //获取客户端对象
 5    CuratorFramework client =    
 6 CuratorFrameworkFactory.newClient("192.168.xxx.xxx:2181,192.168.xxx.xxx:2
 7 181,192.168.xxx.xxx:2181", 1000, 1000, retryPolicy);
 8     
 9   //调用start开启客户端操作
10   client.start();
11     
12   //通过create来进行创建节点,并且需要指定节点类型
13   client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTEN
14 T).forPath("/hello3/world");
15 client.close();
16 }
  • 创建临时节点
 1 public void createNode2() throws Exception {
 2  RetryPolicy retryPolicy = new  ExponentialBackoffRetry(3000, 1);
 3   CuratorFramework client =
 4 CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181",
 5 3000, 3000, retryPolicy);
 6 client.start();
 7 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).
 8 forPath("/hello5/world");
 9 Thread.sleep(5000);
10 client.close();
11 }
  • 修改节点数据
 1 @Test
 2  public void nodeData() throws Exception {
 3  RetryPolicy retryPolicy = new  ExponentialBackoffRetry(3000, 1);
 4  CuratorFramework client =
 5 CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181",
 6 3000, 3000, retryPolicy);
 7  client.start();
 8  client.setData().forPath("/hello5", "hello7".getBytes());
 9  client.close();
10 }
  • 节点数据查询
 1 @Test
 2  public void updateNode() throws Exception {
 3 RetryPolicy retryPolicy = new  ExponentialBackoffRetry(3000, 1);
 4  CuratorFramework client =
 5 CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181",
 6 3000, 3000, retryPolicy);
 7  client.start();
 8  byte[] forPath = client.getData().forPath("/hello3");
 9  System.out.println(new String(forPath));
10  client.close();
11  }
  • 节点watch机制

 1 @Test
 2     public void watchZnode() throws Exception {
 3         RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
 4         String connectstr="192.168.xx.xxx:2181,192.168.xx.xxx:2181,192.168.xx.xxx:2181";
 5         CuratorFramework client =
 6                 CuratorFrameworkFactory.newClient(connectstr, 8000, 8000, policy);
 7         client.start();
 8         // ExecutorService pool = Executors.newCachedThreadPool();
 9         //设置节点的cache
10 
11         TreeCache treeCache = new TreeCache(client, "/hello3");
12         //设置监听器和处理过程
13         treeCache.getListenable().addListener(new TreeCacheListener()
14         {
15             @Override
16             public void childEvent(CuratorFramework client,
17                                    TreeCacheEvent event) throws Exception {
18                 ChildData data = event.getData();
19                 if(data !=null){
20                     switch (event.getType()) {
21                         case NODE_ADDED:
22                             System.out.println("NODE_ADDED : "+
23                                     data.getPath() +" 数据:"+ new String(data.getData())+"监控到有新增节点");
24                             break;
25                         case NODE_REMOVED:
26                             System.out.println("NODE_REMOVED : "+
27                                     data.getPath() +" 数据:"+ new String(data.getData())+"监控到移除节点");
28                             break;
29                         case NODE_UPDATED:
30                             System.out.println("NODE_UPDATED : "+
31                                     data.getPath() +" 数据:"+ new String(data.getData())+"监控到有更新节点");
32                             break;
33 
34                         default:
35                             break;
36 
37                     }
38                 }else{
39                     System.out.println( "data is null : "+
40                             event.getType());
41                 }
42             }
43         });
44         //开始监听
45         treeCache.start();
46         Thread.sleep(500000);
47     }
原文地址:https://www.cnblogs.com/g414056667/p/13568158.html