ZKClient操作Zookeeper

ZKClient在Zookeeper原生API基础上进行封装,简化了代码的复杂度。

节点的创建和删除

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
 
/**
 * Demonstrates creating and deleting ZooKeeper nodes through ZkClient,
 * including recursive creation and recursive deletion.
 */
public class ZkClientBase {

    /** Comma-separated host:port list of the ZooKeeper servers to connect to. */
    static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";
    /** Session timeout, in milliseconds. */
    static final int SESSION_TIMEOUT = 35000;
    /** Connection timeout, in milliseconds. */
    static final int CONNECTION_TIMEOUT = 60000;

    /**
     * Creates an ephemeral node and a nested persistent node, waits 20 seconds,
     * then removes both again.
     *
     * @param args unused
     * @throws Exception if the sleep is interrupted or a ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        // Hand SESSION_TIMEOUT to the connection explicitly; the one-argument
        // ZkConnection constructor would ignore the constant declared above.
        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR, SESSION_TIMEOUT), CONNECTION_TIMEOUT);
        try {
            // Create an ephemeral node and a persistent node. The "true" flag asks
            // ZkClient to create missing parent nodes (recursive creation).
            zkc.createEphemeral("/ephemeral");
            zkc.createPersistent("/persistent/p_1", true);

            // Pause for 20 s so the nodes can be inspected before they are removed.
            Thread.sleep(20000);

            // Delete the ephemeral node, then the persistent node together with
            // its children (recursive deletion).
            zkc.delete("/ephemeral");
            zkc.deleteRecursive("/persistent");
        } finally {
            // Always release the session, even when one of the calls above throws.
            zkc.close();
        }
    }
}

在Eclipse中通过 Run As → Java Application 运行


20s后

子节点的创建和读取

import java.util.List;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
 
/**
 * Demonstrates creating child nodes with data and reading them back
 * through ZkClient.
 */
public class ZkClientBase {

    /** Comma-separated host:port list of the ZooKeeper servers to connect to. */
    static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";
    /** Session timeout, in milliseconds. */
    static final int SESSION_TIMEOUT = 35000;
    /** Connection timeout, in milliseconds. */
    static final int CONNECTION_TIMEOUT = 60000;

    /**
     * Creates {@code /persistent} with two children, then lists the children
     * and prints each child's path and stored data.
     *
     * @param args unused
     * @throws Exception if a ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        // Hand SESSION_TIMEOUT to the connection explicitly; the one-argument
        // ZkConnection constructor would ignore the constant declared above.
        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR, SESSION_TIMEOUT), CONNECTION_TIMEOUT);
        try {
            zkc.createPersistent("/persistent", "persistent value");
            zkc.createPersistent("/persistent/p1", "p1 value");
            zkc.createPersistent("/persistent/p2", "p2 value");

            // getChildren returns child names only, so the full path is rebuilt
            // before each read.
            List<String> children = zkc.getChildren("/persistent");
            for (String child : children) {
                String nodePath = "/persistent/" + child;
                String data = zkc.readData(nodePath);
                System.out.println("节点path为:" + nodePath + ",内容为: " + data);
            }
        } finally {
            // The created nodes are persistent and stay on the server; only the
            // client session is released here.
            zkc.close();
        }
    }
}

Eclipse中console输出

Eclipse中Zookeeper Explorer内容

更新节点内容

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
 
/**
 * Demonstrates updating a node's data and checking node existence
 * through ZkClient.
 */
public class ZkClientBase {

    /** Comma-separated host:port list of the ZooKeeper servers to connect to. */
    static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";
    /** Session timeout, in milliseconds. */
    static final int SESSION_TIMEOUT = 35000;
    /** Connection timeout, in milliseconds. */
    static final int CONNECTION_TIMEOUT = 60000;

    /**
     * Creates {@code /persistent}, overwrites its data, prints the new data and
     * prints whether {@code /persistent} and {@code /persistent1} exist.
     *
     * @param args unused
     * @throws Exception if a ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        // Hand SESSION_TIMEOUT to the connection explicitly; the one-argument
        // ZkConnection constructor would ignore the constant declared above.
        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR, SESSION_TIMEOUT), CONNECTION_TIMEOUT);
        try {
            zkc.createPersistent("/persistent", "persistent value");
            zkc.writeData("/persistent", "new persistent value");

            // readData has a generic return type. Binding it to a String local
            // first keeps the println overload choice unambiguous, instead of
            // letting the compiler pick between println(char[]) and println(String).
            String currentValue = zkc.readData("/persistent");
            System.out.println(currentValue);

            System.out.println(zkc.exists("/persistent"));
            System.out.println(zkc.exists("/persistent1"));
        } finally {
            // Release the session even when one of the calls above throws.
            zkc.close();
        }
    }
}

Eclipse Console输出


Eclipse中Zookeeper Explorer内容

ZKClient解决了watcher需要反复注册的问题

ZKClient提供了一套监听机制:只需对节点订阅一次监听器即可持续收到事件,避免了原生API中繁琐的反复注册watcher的操作。

调用subscribeChildChanges时需要传入IZkChildListener接口的实现类,并重写handleChildChange方法。IZkChildListener的触发事件包括:子节点创建、子节点删除、节点自身创建、节点自身删除。

import java.util.List;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
 
/**
 * Demonstrates ZkClient child-change subscriptions: a listener registered once
 * on a parent path keeps receiving events without being re-registered.
 */
public class ZkClientWatcher {

    /** Comma-separated host:port list of the ZooKeeper servers to connect to. */
    static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";
    /** Session timeout, in milliseconds. */
    static final int SESSION_TIMEOUT = 35000;
    /** Connection timeout, in milliseconds. */
    static final int CONNECTION_TIMEOUT = 60000;

    /**
     * Subscribes to child changes on {@code /persistent}, then creates, updates
     * and deletes nodes beneath it, pausing between steps so each event can be
     * printed.
     *
     * @param args unused
     * @throws Exception if a sleep is interrupted or a ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        // Hand SESSION_TIMEOUT to the connection explicitly; the one-argument
        // ZkConnection constructor would ignore the constant declared above.
        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR, SESSION_TIMEOUT), CONNECTION_TIMEOUT);
        try {
            // Child listener on the parent node: it reports creation and deletion
            // of the parent and of its children, but not data changes on either.
            zkc.subscribeChildChanges("/persistent", new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    System.out.println("parentPath: " + parentPath);
                    System.out.println("currentChilds: " + currentChilds);
                }
            });

            zkc.createPersistent("/persistent");
            Thread.sleep(1000);
            zkc.createPersistent("/persistent" + "/" + "p1", "p1 value");
            Thread.sleep(1000);
            zkc.createPersistent("/persistent" + "/" + "p2", "p2 value");
            Thread.sleep(1000);

            // Change the data stored in p2.
            zkc.writeData("/persistent" + "/" + "p2", "new p2 value");
            Thread.sleep(2000);

            zkc.delete("/persistent/p2");
            Thread.sleep(1000);
            zkc.deleteRecursive("/persistent");

            // Wait 5 s so handleChildChange can finish before the client closes.
            Thread.sleep(5000);
        } finally {
            // Release the session even when one of the calls above throws.
            zkc.close();
        }
    }
}

Eclipse的console输出

Eclipse中Zookeeper Explorer内容

调用subscribeDataChanges时需要传入IZkDataListener接口的实现类:重写handleDataDeleted方法处理节点的删除事件,重写handleDataChange方法处理节点的数据更新事件。

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
 
/**
 * Demonstrates ZkClient data-change subscriptions: a listener registered once
 * on a node is told about data updates to, and deletion of, that node.
 */
public class ZkClientWatcher {

    /** Comma-separated host:port list of the ZooKeeper servers to connect to. */
    static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";
    /** Session timeout, in milliseconds. */
    static final int SESSION_TIMEOUT = 35000;
    /** Connection timeout, in milliseconds. */
    static final int CONNECTION_TIMEOUT = 60000;

    /**
     * Creates {@code /persistent}, subscribes to its data changes, then updates
     * it, adds and modifies children, and finally deletes the whole subtree,
     * pausing between steps so each event can be printed.
     *
     * @param args unused
     * @throws Exception if a sleep is interrupted or a ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        // Hand SESSION_TIMEOUT to the connection explicitly; the one-argument
        // ZkConnection constructor would ignore the constant declared above.
        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR, SESSION_TIMEOUT), CONNECTION_TIMEOUT);
        try {
            zkc.createPersistent("/persistent", "persistent value");
            zkc.subscribeDataChanges("/persistent", new IZkDataListener() {
                // The subscribed node was deleted.
                @Override
                public void handleDataDeleted(String path) throws Exception {
                    System.out.println("删除的节点为:" + path);
                }

                // The subscribed node's data changed.
                @Override
                public void handleDataChange(String path, Object data) throws Exception {
                    System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);
                }
            });

            Thread.sleep(3000);
            // Expected version -1 means the write is not tied to a specific version.
            zkc.writeData("/persistent", "new persistent value", -1);
            Thread.sleep(1000);
            zkc.createPersistent("/persistent" + "/" + "p1", "p1 value");
            Thread.sleep(1000);
            zkc.createPersistent("/persistent" + "/" + "p2", "p2 value");
            Thread.sleep(1000);

            // Change the data stored in p2; the listener is registered on
            // /persistent only, not on its children.
            zkc.writeData("/persistent" + "/" + "p2", "new p2 value");
            Thread.sleep(3000);

            zkc.deleteRecursive("/persistent");

            // Bounded 5 s wait so handleDataDeleted can run before the client
            // closes, instead of parking the main thread for Integer.MAX_VALUE ms.
            Thread.sleep(5000);
        } finally {
            // Release the session even when one of the calls above throws.
            zkc.close();
        }
    }
}

Eclipse中console输出

Eclipse中Zookeeper Explorer内容

原文地址:https://www.cnblogs.com/cat520/p/9412798.html