1. java方式操作远端zookeeper集群概述
步骤:下载zookeeper压缩包并解压, 创建java工程,导入zookeeper相关jar包
(1)下载zookeeper压缩包
http://archive.apache.org/dist/zookeeper/, 下载tar.gz源码包, 并进行解压
(2)创建Java工程
Eclipse ->File->New->Java Project, 输入工程名称,一路Next结束
(3)导入zookeeper相关jar包
* 选中新建的工程并右键,选择 “Build Path”-> “Configure Build Path”
* 切换到”Libraries”, 选择”Add External Jars”
* 先添加zookeeper-3.4.5.jar
* 然后添加zookeeper所依赖的jar包 (lib目录下的所有jar)
* 导入jar包结束
2. 通过Java API连接zookeeper
(1) 创建测试类CreateSession
(2) 连接zk集群
1 import org.apache.zookeeper.WatchedEvent; 2 import org.apache.zookeeper.Watcher; 3 import org.apache.zookeeper.ZooKeeper; 4 import java.io.IOException; 5 6 public class CreateSession { //建立和zookeeper集群的连接,并自动周期性发送心跳维护连接 7 private static ZooKeeper zk; 8 9 public static void main(String args[]) throws IOException, InterruptedException { 10 //zk will connect to zk server, asynchronized 11 zk = new ZooKeeper("192.168.179.101:2181", 5000, new myWatcher()); //myWatcher中覆盖process方法,定义Client端对Server发来的哪些event进行处理以及如何处理 12 Thread.sleep(Integer.MAX_VALUE); 16 } 17 } 18 19 class myWatcher implements Watcher{ 20 @Override 21 public void process(WatchedEvent event) { 22 //handle connected event 23 if ( event.getState()== Event.KeeperState.SyncConnected ) { //连接建立事件的处理 24 System.out.println("Event: " + event); 25 System.out.println("=======Client Connected to zookeeper======"); 26 } 28 } 29 }
核心API分析: zk = new ZooKeeper("192.168.179.101:2181", 5000, new myWatcher());
1)Zookeeper是API提供的1个类,我们连接zk集群,进行相应的znode操作,都是通过Zookeeper类的实例进行,这个实例就是zk client, 和命令行客户端是同样的角色
2)ZooKeeper实例的创建需要传递3个参数
参数1:connectString
*String类型变量,代表要连接的zk集群服务器,通过逗号分隔,"192.168.179.100:2181,192.168.179.101:2181,192.168.179.102:2181"
*ZooKeeper实例连接zk集群服务器时,将在给定的服务器中随机选择,并不存在特定的顺序
参数2:sessionTimeout
*int型变量,表示Zookeeper实例和zkserver间的超时时间,单位为毫秒
*连接正常连接后,ZooKeeper实例将自动和zkserver间通过心跳信息来维持连接,不需要我们介入
参数3:watcher
*Watcher类实例,通常需要我们自己定义一个类,实现框架提供的Watcher接口中的process方法
*process方法,本质是一个回调函数,先解释什么是回调函数
*回调函数理解:打个比方,有一家旅馆提供叫醒服务,但是要求旅客自己决定叫醒的方法。可以是打客房电话,也可以是派服务员去敲门,睡得死怕耽误事的,还可以要求往自己头上浇盆水。“叫醒”这个行为是旅馆提供的,但是叫醒的方式是由旅客决定并告诉旅馆的,也就是回调函数。而旅客告诉旅馆怎么叫醒自己的动作,通常在登记入住的时候完成,称为登记回调函数(to register a callback function)
*再看new myWatcher和process函数:当创建1个ZooKeeper实例时,我们传入了1个myWatcher实例,myWatcher类的内部实现了process方法。本质上就是:我们在 “登记入住”(创建ZooKeeper实例)时,在zk集群这家旅馆 “登记” 1个“通知” 服务(process方法), 并且告诉旅馆,在出现某些特定事情的时候才进行通知,并且我带了一个小弟(myWatcher实例),通知给他就行,他收到通知后会进行相应的处理(myWatcher实例调用process方法)
ZooKeeper实例创建中的联动操作
ZooKeeper实例在创建的过程中,会随机挑选1个zkserver创建连接,但这个动作是异步的
也就是说new ZooKeeper()这个函数,并不是在和zkserver建立好连接后,才结束函数;大多数情况下,函数返回后,和zk集群的连接并没有建立完成
这也是Thread.sleep(Integer.MAX_VALUE)出现的原因:new完了,就让当前这个运行的线程休息,一直等待;当连接真正建立的时候,这个session的连接状态会变化为SyncConnected;
zkserver此时会向对应的Client发送1个连接变化事件, 事件的处理则自动由myWatcher实例这个小弟去调用process方法来搞定
3. 通过Java API创建节点(同步方式)
(1) 创建znode节点
1 import org.apache.zookeeper.*; 2 import java.io.IOException; 3 4 public class CreateNode implements Watcher { 5 private static ZooKeeper zk; 6 7 public static void main(String args[]) throws IOException, InterruptedException { 8 //zk will connect to zk server, asynchronized 9 zk = new ZooKeeper("192.168.179.101:2181", 5000, new CreateNode()); 10 Thread.sleep(Integer.MAX_VALUE); 12 }
15 @Override 16 public void process(WatchedEvent event) {//Client端处理连接建立事件,处理动作为添加1个永久节点 17 // create persistent node if connected 18 if (event.getState() == Event.KeeperState.SyncConnected) { 19 //创建znode节点 20 try { 21 createNodeSync(); 22 } catch (KeeperException e) { 23 e.printStackTrace(); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 } 28 29 } 30 31 //create node, synchronized 32 private void createNodeSync() throws KeeperException, InterruptedException { 33 System.out.println("Create node with Sync mode"); 34 String path = zk.create("/node_by_java", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 35 System.out.println("New Node added: " + path); 36 } 38 }
运行该java类,通过终端查看结果:
在zookeeper集群上,通过zkCli.sh客户端连接zookeeper集群,验证节点是否已经添加
核心API分析: path = zk.create("/node_by_java", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException
参数1 path:
String类型,表示要在zk集群上创建的znode的绝对路径
参数2 data:
byte数组类型,表示要创建的znode中写入的数据,Java字符串提供getBytes()方法,可以直接将字符串转化为1个byte数组
参数3 acl:
List<ACL>类型,List可以理解为升级版的数组,并且数组中的元素为ACL类型变量,本质上这里指定的是要创建的znode的访问权限
OPEN_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE))); 本质上是创建了1个允许任何人进行操作的权限
参数4 createMode:
指定要创建的znode类型
CreateMode.PERSISTENT 永久节点
CreateMode.EPHEMERAL 临时节点
CreateMode.PERSISTENT_SEQUENTIAL 永久顺序节点
CreateMode.EPHEMERAL_SEQUENTIAL 临时顺序节点
返回值:path
String类型,被创建的znode节点的实际路径
需要注意的是,这里的create方式是同步方式,也就意味着:当znode创建完成或者创建中出现异常时,函数才会返回
4. 通过Java API查询子节点列表1(同步方式)
1 import org.apache.zookeeper.*; 2 import java.io.IOException; 3 import java.util.List; 4 5 public class GetChildrenSync implements Watcher { 6 private static ZooKeeper zk; 7 8 public static void main(String args[]) throws IOException, InterruptedException { 9 //zk will connect to zk server, asynchronized
zk = new ZooKeeper("192.168.179.101:2181", 5000, new GetChildrenSync()); //类实例,该类要实现Watcher类的process函数,定义Client处理哪些zkserver发来的事件event 11 Thread.sleep(Integer.MAX_VALUE); 12 13 } 14 15 @Override 16 public void process(WatchedEvent event) { //框架定义的接口,我们要实现Client处理哪些event,如何处理这些event 17 // 只在连接建立后,查询/的子节点列表 18 if (event.getState() == Event.KeeperState.SyncConnected) { 19 //查询子节点列表 20 try { 21 getChildrenSync(); 22 } catch (KeeperException e) { 23 e.printStackTrace(); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 } 28 29 } 30 31 //get children , synchronized 32 private void getChildrenSync() throws KeeperException, InterruptedException { 33 System.out.println("Get Children in sync mode"); 34 //false, 不关注子节点列表的变更事件(不注册watcher) 35 List<String> children = zk.getChildren("/", false); 36 System.out.println("Children list of / :" + children); 37 } 38 39 }
运行java类,通过终端查看结果:
核心API分析: List<String> children = zk.getChildren("/", false);
public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException
参数1:path
String类型,指明要查询哪个znode的子节点列表
参数2:watch
boolean类型,false表示只是查询,并不需要zkserver在检测到子节点列表发生变化时,进行事件通知(不关注子节点发生变化的Event)
返回值:List<String>
返回子节点列表,每个子节点通过字符串表示,构成一个“数组”
5. 通过Java API查询子节点列表 (同步 + 设置子节点列表变更的watcher)
import org.apache.zookeeper.*; import java.io.IOException; import java.util.List; public class GetChildrenSync implements Watcher { private static ZooKeeper zk; private String path; public static void main(String args[]) throws IOException, InterruptedException { //zk will connect to zk server, asynchronized zk = new ZooKeeper("192.168.179.101:2181", 5000, new GetChildrenSync()); Thread.sleep(Integer.MAX_VALUE); } @Override public void process(WatchedEvent event) { // “子节点列表发生变化” event的处理 if(event.getType() == Event.EventType.NodeChildrenChanged) { //再次获取子节点列表 try { List<String> new_children = zk.getChildren(event.getPath(), true); //event.getPath()返回 哪个znode的子节点列表发生了变化 System.out.println(new_children); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } else { // 连接建立事件的处理 if (event.getState() == Event.KeeperState.SyncConnected) { //查询子节点列表 try { getChildrenSync(); //设置关注子节点列表 } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } } //get children , synchronized private void getChildrenSync() throws KeeperException, InterruptedException { System.out.println("Get Children in sync mode"); //true, 关注子节点列表的变更(注册子节点变更watcher) List<String> children = zk.getChildren("/", true); //关注“/”子节点列表变化event System.out.println("Children list of / :" + children); } }
分析:
代码运行时,会首先建立和zkserver的连接,第一次会查询/的子节点列表,并设置了/路径子节点变更的watcher;
通过命令行方式在/添加1个节点,此时zkserver会向java cliet发送子节点发生变化的事件,此时process函数被再次触发,并且执行再次获取子节点列表的操作
要注意process中event的设计顺序,想想看如果检测Event的state是否为SyncConnected是process中首先出现的检测代码,会出现什么情况?(先挖坑)
开始填坑:
zookeeper框架中的WatchedEvent类型的event, 携带了3类信息:
1)当前连接的状态, event.getState() 可以获取:已连接,已断开等
2)事件类型, event.getType() 可以获取: 子节点列表发生变化,节点数据内容发生变化
3)事件关联的znode节点, event.getPath() 可以获取该znode的绝对路径: 子节点列表发生变化,则关联的znode就是父节点
当client和zk集群刚刚建立连接时,zk会向client发送1个连接建立事件,此时事件的连接状态为connected, 事件类型为EventType.None, 事件关联的节点为空(event.getPath==null)
当子节点列表变化的事件发生时,该事件的连接状态也为connected, 事件类型为EventType.NodeChildrenChanged, 事件关联的节点为/
如果将 event.getState() == Event.KeeperState.SyncConnected放在process函数的开始,则只会执行连接建立时的逻辑,并不会执行子节点变更的处理逻辑
严格来说,连接刚刚建立时的逻辑处理应该进行修改,添加event.getType和event.getPath()来更加精确的描述 “连接刚刚建立”
else {
// 连接刚刚建立事件的处理
if (event.getState() == Event.KeeperState.SyncConnected) {
if(event.getType()==Event.EventType.None && event.getPath()==null){
//查询子节点列表
try { getChildrenSync(); //设置关注子节点列表 } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }
}//
}
Event的状态,类型总结
* 在 Watcher 接口里面,除了回调函数 process 以外,还包含 Event.KeeperState 和 Event.EventType 两个枚举类,分别代表了通知状态和事件类型
6. 查询节点数据(同步方式)
1 import org.apache.zookeeper.KeeperException; 2 import org.apache.zookeeper.WatchedEvent; 3 import org.apache.zookeeper.Watcher; 4 import org.apache.zookeeper.ZooKeeper; 5 import org.apache.zookeeper.data.Stat; 6 7 import java.io.IOException; 8 import java.util.List; 9 10 public class GetDataSync implements Watcher { 11 private static ZooKeeper zk; 12 private String path; 13 14 public static void main(String args[]) throws IOException, InterruptedException { 15 //zk will connect to zk server, asynchronized 16 zk = new ZooKeeper("192.168.179.101:2181", 5000, new GetDataSync()); 17 Thread.sleep(Integer.MAX_VALUE); 18 19 } 20 21 @Override 22 public void process(WatchedEvent event) { 23 // 连接建立后,获取给定节点数据 24 if (event.getState() == Event.KeeperState.SyncConnected) { 25 // 连接刚刚建立 26 if (event.getType() == Event.EventType.None && event.getPath() == null) {//连接建立后,查询给定路径的znode的数据 27 //查询给定路径的znode数据 28 try { 29 getNodeData("/node_by_java"); 30 } catch (KeeperException e) { 31 e.printStackTrace(); 32 } catch (InterruptedException e) { 33 e.printStackTrace(); 34 } 35 36 } 37 else if(event.getType()== Event.EventType.NodeDataChanged){ 38 //节点数据发生变化事件 39 //获取节点的新数据,并再次关注节点数据发生变化的事件 40 Stat stat = new Stat(); 41 byte[] data = new byte[0]; 42 try { 43 data = zk.getData(event.getPath(), true, stat); 44 } catch (KeeperException e) { 45 e.printStackTrace(); 46 } catch (InterruptedException e) { 47 e.printStackTrace(); 48 } 49 System.out.println("Updated data is: " + new String(data)); 50 } 51 } 52 53 }//process 54 55 56 //get node data 57 private void getNodeData(String path) throws KeeperException, InterruptedException { 58 System.out.println("Get Node data in sync mode"); 60 Stat stat = new Stat(); //创建1个空状态 61 byte[] data = zk.getData(path, false, stat); //stat会被更新为节点的最新状态, false表示不关注节点数据发生变更的事件
//byte[] data = zk.getData(path, true, stat); //true表示关注节点数据发生变更的事件,注意一次性
62 String data2string = new String(data); 63 System.out.println("Data of " + path + "is: " + data2string); 64 } 65 66 }
首先不关注节点数据发生变更,看能够正常获取到znode数据
然后修改为关注节点数据发生变更,通过命令行方式修改数据,查看是否再次获取到更新后的节点数据
7. 删除节点(同步方式)
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.List; public class DeleteNodeSync implements Watcher { private static ZooKeeper zk; private String path; private int version; public static void main(String args[]) throws IOException, InterruptedException { //zk will connect to zk server, asynchronized zk = new ZooKeeper("192.168.179.101:2181", 5000, new DeleteNodeSync()); Thread.sleep(Integer.MAX_VALUE); } @Override public void process(WatchedEvent event) { // 连接建立后,删除给定路径的znode if (event.getState() == Event.KeeperState.SyncConnected) { // 连接刚刚建立 if (event.getType() == Event.EventType.None && event.getPath() == null) { //查询子节点列表 try { delNode("/node_by_java"); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } }//process //get node data private void delNode(String path) throws KeeperException, InterruptedException { System.out.println("Delete Node in sync mode"); //删除给定路径的znode zk.delete(path, -1); //删除指定路径, 指定dataversion的znode, 如果version指定-1,则删除节点时不进行dataversion校验 System.out.println("Node deleted: "+ path); //删除后再次查询/子节点列表 List<String> children = zk.getChildren("/", false); System.out.println("Children list of / is" + children); } }