06_zookeeper原生Java API使用

【Zookeeper构造方法概述】

/**
     * 客户端和zk服务端的连接是一个异步的过程
     * 当连接成功后,客户端会收到一个watch通知
     *
     * ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
     *          long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
     * 参数介绍
     * connectString:连接服务器的ip字符串
     *      比如:"192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"
     *      可以是一个ip,也可以是多个ip,一个ip代表单机,多个ip代表集群
     *      也可以在ip后加路径
     * sessionTimeout:超时时间,心跳收不到了,那就超时
     * watcher:通知事件,如果有对应的事件触发,则会收到一个通知:如果不需要,那就设为null
     * sessionId:会话的id
     * sessionPasswd:会话密码,当会话丢失后,可以依据sessionId和sessionPasswd重新获取会话
     * canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写,
     *                此时数据被读取到的可能是旧数据,一般设置为false,不推荐使用
     *
     */
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

【Zookeeper API 客户端连接服务端例子】

package com.zk.demo;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

/**
 * Created by HigginCui on 2018/9/20.
 */
public class ZkConnect implements Watcher{

    public static final String zkServerPath = "127.0.0.1:2181";

    public static final Integer timeout = 5000;

    /**
     * 客户端和zk服务端的连接是一个异步的过程
     * 当连接成功后,客户端会收到一个watch通知
     */
    public static void main(String[] args) throws Exception{
        ZooKeeper zk = new ZooKeeper(zkServerPath,timeout,new ZkConnect());

        for (int i=0;i<20;i++) {
            Thread.sleep(10);  //休眠10ms,在这个过程中,连接状态会从CONNECTING--->CONNECTED
            System.out.println(i+"---"+zk.getState());
        }

    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        System.err.println("收到zk的watch通知----" );
    }
}

【运行结果】

【使用CountDownLatch优化zk连接过程】 

package com.zk.demo;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class ZkConnect implements Watcher{

    public static final String zkServerPath = "127.0.0.1:2181";

    public static final Integer timeout = 5000;

    private static CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        ZooKeeper zk = new ZooKeeper(zkServerPath,timeout,new ZkConnect());

        System.out.println("连接状态---" + zk.getState());
        latch.await();
        System.out.println("连接状态---" + zk.getState());

    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        System.err.println("收到zk的watch通知----" );
        latch.countDown();
    }
}

【运行结果】

【创建节点】

/**
 * 
 * String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
 * 
 * 同步或异步创建节点,都不支持子节点的递归操作,异步有一个callBack方法
 * 参数
 * path:创建的路径
 * data:存储的数据,byte[]类型
 * acl:权限控制策略
 *      Ids.OPEN_ACL_UNSAFE ---> world:anyone:crdwa
 *      CREATE_ALL_ACL ---> auth:user:password:cdrwa
 * createMode:节点类型,是一个枚举
 *      CreateMode.PERSISTENT:           持久节点
 *      CreateMode.PERSISTENT_SEQUENTIAL:持久顺序节点
 *      CreateMode.EPHEMERAL:            临时节点
 *      CreateMode.EPHEMERAL_SEQUENTIAL: 临时顺序节点
 */

 【创建一个临时节点】

package com.zk.demo;

import org.apache.zookeeper.*;
import java.util.concurrent.CountDownLatch;

/**
 * Created by HigginCui on 2018/9/21.
 */
public class ZkNodeOperator implements Watcher{


    private static ZooKeeper zooKeeper = null;

    public static final String zkServerPath = "127.0.0.1:2181";

    public static final Integer timeout = 5000;

    private static CountDownLatch latch = new CountDownLatch(1);

    public ZkNodeOperator() {
    }

    private static void init()  {
        try{
            zooKeeper = new ZooKeeper(zkServerPath,timeout,new ZkNodeOperator());
            latch.await();
        }catch (Exception e){
            e.printStackTrace();
            if(zooKeeper!=null){
                try {
                    zooKeeper.close();
                }catch (InterruptedException e1){
                    e1.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception{
        init();
      //创建/testnode临时节点,包含数据为haha,权限是OPEN_ACL_UNSAFE,类型为临时节点 zooKeeper.create(
"/testnode","haha".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL); } /** * 观察者 */ @Override public void process(WatchedEvent event) { System.err.println("收到zk的watch通知----" +event.getPath()+"---"+event.getState()); latch.countDown(); } }

【运行结果 直接看打开zkCli.sh连接】

 【创建持久节点,并且产生一个回调通知】

package com.zk.demo;

import org.apache.zookeeper.*;
import java.util.concurrent.CountDownLatch;

/**
 * Created by HigginCui on 2018/9/21.
 */
public class ZkNodeOperator implements Watcher{


    private static ZooKeeper zooKeeper = null;

    public static final String zkServerPath = "127.0.0.1:2181";

    public static final Integer timeout = 5000;

    private static CountDownLatch latch = new CountDownLatch(1);

    public ZkNodeOperator() {
    }

    private static void init()  {
        try{
            zooKeeper = new ZooKeeper(zkServerPath,timeout,new ZkNodeOperator());
            latch.await();
        }catch (Exception e){
            e.printStackTrace();
            if(zooKeeper!=null){
                try {
                    zooKeeper.close();
                }catch (InterruptedException e1){
                    e1.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception{
        init();
        //create(String path, byte[] data, List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
        String ctx = "{'create':'success'}";
        zooKeeper.create("/testnode","haha".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,new CreateNodeCallBack(),ctx);

        System.in.read();  //线程停在这里,等待回调线程的打印结果
    }


    /**
     * 观察者
     */
    @Override
    public void process(WatchedEvent event) {
        System.err.println("收到zk的watch通知----" +event.getPath()+"---"+event.getState());
        latch.countDown();

    }
}

/**
 * 创建节点回调通知类
 */
class CreateNodeCallBack implements AsyncCallback.StringCallback{
    @Override
    public void processResult(int i, String path, Object ctx, String s1) {
        System.out.println("【回调方法】:创建节点的路径:"+path);
        System.out.println("【回调方法】:ctx==="+(String)ctx);
    }
}

【运行结果——控制台打印】

【运行结果——zk客户端连接】

【修改节点的数据】 

/**
* path:节点路径
* data:修改后的数据
* version:版本号,这里的版本号必须是正要修改的节点数据的版本号!!
*/
Stat setData(String path, byte[] data, int version)

修改节点代码示例

Stat stat = zooKeeper.setData("/testnode", "Higgin".getBytes(), 1);
System.out.println("修改后数据的版本号为---"+stat.getVersion());

【运行结果】

先看下对应的数据版本号信息

看下控制台运行结果

修改下代码,将版本号改为0,与zk上要修改的数据的版本号保持一致

Stat stat = zooKeeper.setData("/testnode", "Higgin".getBytes(), 0);
System.out.println("修改后数据的版本号为---"+stat.getVersion());

【查看运行结果】

 【删除节点】

/**
* 删除节点
* 注意:version版本号必须和要删除的节点数据的版本号一致
*/
public void delete(String path, int version)

【删除实例】

zooKeeper.delete("/testnode",1);

【运行结果】

删除前,先看下对应节点数据

执行删除代码后,可以看到节点已经不存在了

【获取节点数据】

public byte[] getData(String path, boolean watch, Stat stat) 
public byte[] getData(String path, Watcher watcher, Stat stat)
public void getData(String path, Watcher watcher, DataCallback cb, Object ctx)
public void getData(String path, boolean watch, DataCallback cb, Object ctx)

[获取节点数据实例]

byte[] dateBytes = zooKeeper.getData("/testnode", true, null);
String str = new String(dateBytes);
System.err.println("获取的数据为==="+str);

[运行结果]

【获取子节点的列表数据】

public List<String> getChildren(String path, Watcher watcher)
public List<String> getChildren(String path, boolean watch, Stat stat)
public List<String> getChildren(String path, boolean watch, Stat stat) 
public void getChildren(String path, Watcher watcher, ChildrenCallback cb, Object ctx)
......

[获取子节点列表实例]

List<String> childList = zooKeeper.getChildren("/testnode", null);
for (String child : childList) {
    System.err.println("子节点=="+child);
}

[ 运行结果 ]

先看下zk上对应的数据

[ 控制台运行结果 ]

原文地址:https://www.cnblogs.com/HigginCui/p/9678505.html