05_zookeeper_原生API使用1(更新)

1. java方式操作远端zookeeper集群概述

步骤:下载zookeeper压缩包并解压, 创建java工程,导入zookeeper相关jar包

(1)下载zookeeper压缩包

http://archive.apache.org/dist/zookeeper/, 下载tar.gz源码包,  并进行解压

(2)创建Java工程

Eclipse ->File->New->Java Project,  输入工程名称,一路Next结束

(3)导入zookeeper相关jar包

* 选中新建的工程并右键,选择 “Build Path”-> “Configure Build Path”

* 切换到”Libraries”,  选择”Add  External Jars”

* 先添加zookeeper-3.4.5.jar

* 然后添加zookeeper所依赖的jar包 (lib目录下的所有jar)

* 导入jar包结束

2. 通过Java API连接zookeeper

(1)   创建测试类CreateSession

(2)  连接zk集群

 1 import org.apache.zookeeper.WatchedEvent;
 2 import org.apache.zookeeper.Watcher;
 3 import org.apache.zookeeper.ZooKeeper;
 4 import java.io.IOException;
 5 
 6 public class CreateSession { //建立和zookeeper集群的连接,并自动周期性发送心跳维护连接
 7     private static ZooKeeper zk;
 8 
 9     public static void main(String args[]) throws IOException, InterruptedException {
10         //zk will connect to zk server, asynchronized
11         zk = new ZooKeeper("192.168.179.101:2181", 5000, new myWatcher()); //myWatcher中覆盖process方法,定义Client端对Server发来的哪些event进行处理以及如何处理
12         Thread.sleep(Integer.MAX_VALUE);
16       }
17     }
18 
19 class myWatcher implements Watcher{
20     @Override
21     public void process(WatchedEvent event) {
22         //handle connected event
23         if ( event.getState()== Event.KeeperState.SyncConnected ) {  //连接建立事件的处理
24             System.out.println("Event: " +  event);
25             System.out.println("=======Client Connected to zookeeper======");
26         }
28     }
29 }

 核心API分析:   zk = new ZooKeeper("192.168.179.101:2181", 5000, new myWatcher());

  1)Zookeeper是API提供的1个类,我们连接zk集群,进行相应的znode操作,都是通过Zookeeper类的实例进行,这个实例就是zk client, 和命令行客户端是同样的角色

  2)ZooKeeper实例的创建需要传递3个参数

  参数1:connectString

  *String类型变量,代表要连接的zk集群服务器,通过逗号分隔,"192.168.179.100:2181,192.168.179.101:2181,192.168.179.102:2181"

  *ZooKeeper实例连接zk集群服务器时,将在给定的服务器中随机选择,并不存在特定的顺序

  参数2:sessionTimeout

 *int型变量,表示Zookeeper实例和zkserver间的超时时间,单位为毫秒

 *连接正常连接后,ZooKeeper实例将自动和zkserver间通过心跳信息来维持连接,不需要我们介入

 参数3:watcher

 *Watcher类实例,通常需要我们自己定义一个类,实现框架提供的Watcher接口中的process方法

 *process方法,本质是一个回调函数,先解释什么是回调函数

 *回调函数理解:打个比方,有一家旅馆提供叫醒服务,但是要求旅客自己决定叫醒的方法。可以是打客房电话,也可以是派服务员去敲门,睡得死怕耽误事的,还可以要求往自己头上浇盆水。“叫醒”这个行为是旅馆提供的,但是叫醒的方式是由旅客决定并告诉旅馆的,也就是回调函数。而旅客告诉旅馆怎么叫醒自己的动作,通常在登记入住的时候完成,称为登记回调函数(to register a callback function)

*再看new myWatcher和process函数:当创建1个ZooKeeper实例时,我们传入了1个myWatcher实例,myWatcher类的内部实现了process方法。本质上就是:我们在 “登记入住”(创建ZooKeeper实例)时,在zk集群这家旅馆 “登记” 1个“通知” 服务(process方法),  并且告诉旅馆,在出现某些特定事情的时候才进行通知,并且我带了一个小弟(myWatcher实例),通知给他就行,他收到通知后会进行相应的处理(myWatcher实例调用process方法)

ZooKeeper实例创建中的联动操作

ZooKeeper实例在创建的过程中,会随机挑选1个zkserver创建连接,但这个动作是异步的

也就是说new ZooKeeper()这个函数,并不是在和zkserver建立好连接后,才结束函数;大多数情况下,函数返回后,和zk集群的连接并没有建立完成

这也是Thread.sleep(Integer.MAX_VALUE)出现的原因:new完了,就让当前这个运行的线程休息,一直等待;当连接真正建立的时候,这个session的连接状态会变化为SyncConnected;

zkserver此时会向对应的Client发送1个连接变化事件, 事件的处理则自动由myWatcher实例这个小弟去调用process方法来搞定

3. 通过Java API创建节点(同步方式)

(1)   创建znode节点

 1 import org.apache.zookeeper.*;
 2 import java.io.IOException;
 3 
 4 public class CreateNode implements Watcher {
 5     private static ZooKeeper zk;
 6 
 7     public static void main(String args[]) throws IOException, InterruptedException {
 8         //zk will connect to zk server, asynchronized
 9         zk = new ZooKeeper("192.168.179.101:2181", 5000, new CreateNode());
10         Thread.sleep(Integer.MAX_VALUE);
12     }
15 @Override 16 public void process(WatchedEvent event) {//Client端处理连接建立事件,处理动作为添加1个永久节点 17 // create persistent node if connected 18 if (event.getState() == Event.KeeperState.SyncConnected) { 19 //创建znode节点 20 try { 21 createNodeSync(); 22 } catch (KeeperException e) { 23 e.printStackTrace(); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 } 28 29 } 30 31 //create node, synchronized 32 private void createNodeSync() throws KeeperException, InterruptedException { 33 System.out.println("Create node with Sync mode"); 34 String path = zk.create("/node_by_java", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 35 System.out.println("New Node added: " + path); 36 } 38 }

运行该java类,通过终端查看结果:

在zookeeper集群上,通过zkCli.sh客户端连接zookeeper集群,验证节点是否已经添加

核心API分析:  path = zk.create("/node_by_java",   "123".getBytes(),  ZooDefs.Ids.OPEN_ACL_UNSAFE,  CreateMode.PERSISTENT );

public String create(String path,
                     byte[] data,
                     List<ACL> acl,
                     CreateMode createMode)
              throws KeeperException,
                     InterruptedException

参数1 path

String类型,表示要在zk集群上创建的znode的绝对路径

参数2 data

byte数组类型,表示要创建的znode中写入的数据,Java字符串提供getBytes()方法,可以直接将字符串转化为1个byte数组

参数3 acl:

List<ACL>类型,List可以理解为升级版的数组,并且数组中的元素为ACL类型变量,本质上这里指定的是要创建的znode的访问权限

OPEN_ACL_UNSAFE  = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));  本质上是创建了1个允许任何人进行操作的权限

参数4 createMode:

指定要创建的znode类型

CreateMode.PERSISTENT  永久节点

CreateMode.EPHEMERAL   临时节点

CreateMode.PERSISTENT_SEQUENTIAL  永久顺序节点

CreateMode.EPHEMERAL_SEQUENTIAL   临时顺序节点

返回值:path

String类型,被创建的znode节点的实际路径

需要注意的是,这里的create方式是同步方式,也就意味着:当znode创建完成或者创建中出现异常时,函数才会返回

4. 通过Java API查询子节点列表1(同步方式)

 1 import org.apache.zookeeper.*;
 2 import java.io.IOException;
 3 import java.util.List;
 4 
 5 public class GetChildrenSync implements Watcher {
 6     private static ZooKeeper zk;
 7 
 8     public static void main(String args[]) throws IOException, InterruptedException {
 9         //zk will connect to zk server, asynchronized
zk = new ZooKeeper("192.168.179.101:2181", 5000, new GetChildrenSync()); //类实例,该类要实现Watcher类的process函数,定义Client处理哪些zkserver发来的事件event 11 Thread.sleep(Integer.MAX_VALUE); 12 13 } 14 15 @Override 16 public void process(WatchedEvent event) { //框架定义的接口,我们要实现Client处理哪些event,如何处理这些event 17 // 只在连接建立后,查询/的子节点列表 18 if (event.getState() == Event.KeeperState.SyncConnected) { 19 //查询子节点列表 20 try { 21 getChildrenSync(); 22 } catch (KeeperException e) { 23 e.printStackTrace(); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 } 28 29 } 30 31 //get children , synchronized 32 private void getChildrenSync() throws KeeperException, InterruptedException { 33 System.out.println("Get Children in sync mode"); 34 //false, 不关注子节点列表的变更事件(不注册watcher) 35 List<String> children = zk.getChildren("/", false); 36 System.out.println("Children list of / :" + children); 37 } 38 39 }

 运行java类,通过终端查看结果:

核心API分析:   List<String> children = zk.getChildren("/", false);

public List<String> getChildren(String path,
                                boolean watch)
                         throws KeeperException,
                                InterruptedException

参数1:path
String类型,指明要查询哪个znode的子节点列表
参数2:watch
boolean类型,false表示只是查询,并不需要zkserver在检测到子节点列表发生变化时,进行事件通知(不关注子节点发生变化的Event)
返回值:List<String>
返回子节点列表,每个子节点通过字符串表示,构成一个“数组”

5. 通过Java API查询子节点列表 (同步 + 设置子节点列表变更的watcher)

import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.List;

public class GetChildrenSync implements Watcher {
    private static ZooKeeper zk;
    private String path;

    public static void main(String args[]) throws IOException, InterruptedException {
        //zk will connect to zk server, asynchronized
        zk = new ZooKeeper("192.168.179.101:2181", 5000, new GetChildrenSync());
        Thread.sleep(Integer.MAX_VALUE);

    }

    @Override
    public void process(WatchedEvent event) {
        // “子节点列表发生变化” event的处理
        if(event.getType() == Event.EventType.NodeChildrenChanged) {
            //再次获取子节点列表
            try {
                List<String> new_children = zk.getChildren(event.getPath(), true); //event.getPath()返回 哪个znode的子节点列表发生了变化
                System.out.println(new_children);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        else {
            // 连接建立事件的处理
            if (event.getState() == Event.KeeperState.SyncConnected) {
                //查询子节点列表
                try {
                    getChildrenSync(); //设置关注子节点列表
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //get children , synchronized
    private void getChildrenSync() throws KeeperException, InterruptedException {
        System.out.println("Get Children in sync mode");
        //true, 关注子节点列表的变更(注册子节点变更watcher)
        List<String> children = zk.getChildren("/", true); //关注“/”子节点列表变化event
        System.out.println("Children list of / :" + children);
    }

}

分析:

代码运行时,会首先建立和zkserver的连接,第一次会查询/的子节点列表,并设置了/路径子节点变更的watcher;

通过命令行方式在/添加1个节点,此时zkserver会向java cliet发送子节点发生变化的事件,此时process函数被再次触发,并且执行再次获取子节点列表的操作

要注意process中event的设计顺序,想想看如果检测Event的state是否为SyncConnected是process中首先出现的检测代码,会出现什么情况?(先挖坑)

开始填坑:

zookeeper框架中的WatchedEvent类型的event,  携带了3类信息

1)当前连接的状态,   event.getState() 可以获取:已连接,已断开等

2)事件类型 event.getType() 可以获取: 子节点列表发生变化,节点数据内容发生变化

3)事件关联的znode节点 event.getPath()   可以获取该znode的绝对路径: 子节点列表发生变化,则关联的znode就是父节点

当client和zk集群刚刚建立连接时,zk会向client发送1个连接建立事件,此时事件的连接状态为connected, 事件类型为EventType.None,  事件关联的节点为空(event.getPath==null)

当子节点列表变化的事件发生时,该事件的连接状态也为connected, 事件类型为EventType.NodeChildrenChanged, 事件关联的节点为/

如果将 event.getState() == Event.KeeperState.SyncConnected放在process函数的开始,则只会执行连接建立时的逻辑,并不会执行子节点变更的处理逻辑

严格来说,连接刚刚建立时的逻辑处理应该进行修改,添加event.getType和event.getPath()来更加精确的描述 “连接刚刚建立”

        else {
            // 连接刚刚建立事件的处理
            if (event.getState() == Event.KeeperState.SyncConnected) {
          if(event.getType()==Event.EventType.None && event.getPath()==null){

//查询子节点列表
                      try {
                            getChildrenSync(); //设置关注子节点列表
                      } catch (KeeperException e) {
                             e.printStackTrace();
                      } catch (InterruptedException e) {
                             e.printStackTrace();
                      }
                 }//
}

 Event的状态,类型总结

* 在 Watcher 接口里面,除了回调函数 process 以外,还包含 Event.KeeperState 和 Event.EventType 两个枚举类,分别代表了通知状态和事件类型

6. 查询节点数据(同步方式)

 1 import org.apache.zookeeper.KeeperException;
 2 import org.apache.zookeeper.WatchedEvent;
 3 import org.apache.zookeeper.Watcher;
 4 import org.apache.zookeeper.ZooKeeper;
 5 import org.apache.zookeeper.data.Stat;
 6 
 7 import java.io.IOException;
 8 import java.util.List;
 9 
10 public class GetDataSync implements Watcher {
11     private static ZooKeeper zk;
12     private String path;
13 
14     public static void main(String args[]) throws IOException, InterruptedException {
15         //zk will connect to zk server, asynchronized
16         zk = new ZooKeeper("192.168.179.101:2181", 5000, new GetDataSync());
17         Thread.sleep(Integer.MAX_VALUE);
18 
19     }
20 
21     @Override
22     public void process(WatchedEvent event) {
23         // 连接建立后,获取给定节点数据
24         if (event.getState() == Event.KeeperState.SyncConnected) {
25             // 连接刚刚建立
26             if (event.getType() == Event.EventType.None && event.getPath() == null) {//连接建立后,查询给定路径的znode的数据
27                 //查询给定路径的znode数据
28                 try {
29                     getNodeData("/node_by_java");
30                 } catch (KeeperException e) {
31                     e.printStackTrace();
32                 } catch (InterruptedException e) {
33                     e.printStackTrace();
34                 }
35 
36             }
37             else if(event.getType()== Event.EventType.NodeDataChanged){
38                 //节点数据发生变化事件
39                 //获取节点的新数据,并再次关注节点数据发生变化的事件
40                 Stat stat = new Stat();
41                 byte[] data = new byte[0];
42                 try {
43                     data = zk.getData(event.getPath(), true, stat);
44                 } catch (KeeperException e) {
45                     e.printStackTrace();
46                 } catch (InterruptedException e) {
47                     e.printStackTrace();
48                 }
49                 System.out.println("Updated data is: " + new String(data));
50             }
51         }
52 
53     }//process
54 
55 
56     //get node data
57     private void getNodeData(String path) throws KeeperException, InterruptedException {
58         System.out.println("Get Node data in sync mode");
60         Stat stat = new Stat();    //创建1个空状态
61         byte[] data = zk.getData(path, false, stat);    //stat会被更新为节点的最新状态, false表示不关注节点数据发生变更的事件
//byte[] data = zk.getData(path, true, stat); //true表示关注节点数据发生变更的事件,注意一次性
62 String data2string = new String(data); 63 System.out.println("Data of " + path + "is: " + data2string); 64 } 65 66 }

 首先不关注节点数据发生变更,看能够正常获取到znode数据

 然后修改为关注节点数据发生变更,通过命令行方式修改数据,查看是否再次获取到更新后的节点数据

 7. 删除节点(同步方式)

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.List;

public class DeleteNodeSync implements Watcher {
    private static ZooKeeper zk;
    private String path;
    private int version;

    public static void main(String args[]) throws IOException, InterruptedException {
        //zk will connect to zk server, asynchronized
        zk = new ZooKeeper("192.168.179.101:2181", 5000, new DeleteNodeSync());
        Thread.sleep(Integer.MAX_VALUE);

    }

    @Override
    public void process(WatchedEvent event) {
        // 连接建立后,删除给定路径的znode
        if (event.getState() == Event.KeeperState.SyncConnected) {
            // 连接刚刚建立
            if (event.getType() == Event.EventType.None && event.getPath() == null) {
                //查询子节点列表
                try {
                    delNode("/node_by_java");
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }

    }//process


    //get node data
    private void delNode(String path) throws KeeperException, InterruptedException {
        System.out.println("Delete Node in sync mode");
        //删除给定路径的znode
        zk.delete(path, -1);  //删除指定路径, 指定dataversion的znode, 如果version指定-1,则删除节点时不进行dataversion校验
        System.out.println("Node deleted: "+ path);
        //删除后再次查询/子节点列表
        List<String> children = zk.getChildren("/", false);
        System.out.println("Children list of / is" + children);
    }

}

 

原文地址:https://www.cnblogs.com/shay-zhangjin/p/7764630.html