Curator操作ZooKeeper

Curator极大简化了ZooKeeper的使用,增加了针对ZooKeeper集群中connection的管理。

节点的创建和删除

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
 
public class CuratorBase {
    
    static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";
    static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms 
    static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms  
    
    public static void main(String[] args) throws Exception {
        
        //重试策略:初试时间为10s,最大重试次数为20
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);
        //创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_ADDR)
                    .sessionTimeoutMs(SESSION_TIMEOUT)
                    .retryPolicy(retryPolicy)
                    .build();
        //开启连接
        cf.start();
        
        //建立节点 指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p1","p1 value".getBytes());
        
        Thread.sleep(30000);
 
        //删除节点
        cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/persistent");
 
        cf.close();
    }
}

run as--java application

线程休眠30s后,执行节点删除操作

节点内容的修改

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
 
public class CuratorBase {
    
    static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";
    static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms 
    static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms  
    
    public static void main(String[] args) throws Exception {
        
        //重试策略:初试时间为10s,最大重试次数为20
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);
        //创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_ADDR)
                    .sessionTimeoutMs(SESSION_TIMEOUT)
                    .retryPolicy(retryPolicy)
                    .build();
        //开启连接
        cf.start();
        
        //创建节点
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p1","p1 value".getBytes());
        //cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p2","p2 value".getBytes());
        //读取节点
        String ret1 = new String(cf.getData().forPath("/persistent/p1"));
        System.out.println(ret1);
        //修改节点
        cf.setData().forPath("/persistent/p1", "new p1 value".getBytes());
        String ret2 = new String(cf.getData().forPath("/persistent/p1"));
        System.out.println(ret2);
 
        cf.close();
    }
}

Eclipse的console输出

Eclipse的ZooKeeper Explorer内容

节点操作的回调函数

节点的新增、修改、删除,都可以设置其回调函数。该回调函数可以输出服务器的状态码、服务器事件类型等内容。还可以加入一个线程池进行优化操作。在批量节点操作的时候,可以用线程池去规划callback,可以将很多的任务放到队列中,使用线程池中的线程将队列中的任务进行处理。线程池中线程的个数可以根据具体的机器配置而定。

下面代码中,节点的创建操作是一个异步的过程,不会阻塞主线程main的执行,代码中将主线程main休眠,子线程在执行完节点的创建操作后执行回调函数并输出相关内容。若不添加主线程休眠的代码,则主线程执行完代码后结束,此时节点创建的子线程还没有完成节点的创建,因main线程的结束子线程也结束,进而就不能完成节点创建和回调函数的执行。

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
 
public class CuratorBase {
    
    static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";
    static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms 
    static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms  
    
    public static void main(String[] args) throws Exception {
        
        //重试策略:初试时间为10s,最大重试次数为20
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);
        //创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_ADDR)
                    .sessionTimeoutMs(SESSION_TIMEOUT)
                    .retryPolicy(retryPolicy)
                    .build();
        //开启连接
        cf.start();
        
        // 绑定回调函数
        ExecutorService pool = Executors.newCachedThreadPool();
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
        .inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
                System.out.println("code:" + ce.getResultCode());
                System.out.println("type:" + ce.getType());
                System.out.println("线程为:" + Thread.currentThread().getName());
            }
        }, pool)
        .forPath("/persistent/p2","p2 value".getBytes());
        
        System.out.println("主线程:"+Thread.currentThread().getName());
        
        Thread.sleep(Integer.MAX_VALUE);
 
        cf.close();
    }
}

Eclipse中console输出

ZooKeeper Explorer中内容



获取子节点

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
 
public class CuratorBase {
    
    static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";
    static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms 
    static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms  
    
    public static void main(String[] args) throws Exception {
        
        //重试策略:初试时间为10s,最大重试次数为20
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);
        //创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_ADDR)
                    .sessionTimeoutMs(SESSION_TIMEOUT)
                    .retryPolicy(retryPolicy)
                    .build();
        //开启连接
        cf.start();
        
        // 绑定回调函数
        ExecutorService pool = Executors.newCachedThreadPool();
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
        .inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
                System.out.println("code:" + ce.getResultCode());
                System.out.println("type:" + ce.getType());
                System.out.println("线程为:" + Thread.currentThread().getName());
            }
        }, pool)
        .forPath("/persistent/p2","p2 value".getBytes());
        
        System.out.println("主线程:"+Thread.currentThread().getName());
 
        Thread.sleep(20000);//主线程休眠20s,等待节点创建完毕
        
        // 读取子节点getChildren方法 和 判断节点是否存在checkExists方法
        List<String> list = cf.getChildren().forPath("/persistent");
        for(String p : list){
            System.out.println(p);
        }
        
        Stat stat_p1 = cf.checkExists().forPath("/persistent/p1");
        System.out.println(stat_p1);
        Stat stat_p2 = cf.checkExists().forPath("/persistent/p2");
        System.out.println(stat_p2);
 
        cf.close();
    }
}

Eclipse的console输出

若上面代码将Thread.sleep(20000);删除,有时会出现下面的异常,原因是节点创建和main主线程的执行是异步的。

原文地址:https://www.cnblogs.com/cat520/p/9412815.html