参考博客:cnblogs.com/LiZhiW/tag/ZooKeeper/
Curator是Netflix公司开源的一个ZooKeeper客户端,后捐献给Apache。Curator框架在ZooKeeper原生API接口上进行了包装,解决了很多ZooKeeper客户端非常底层的细节开发问题;提供ZooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最好用、最流行的ZooKeeper客户端。
原生zookeeperAPI的不足:
-
-
连接对象异步创建,需要开发人员自行编码等待
-
连接没有超时自动重连机制
-
watcher一次注册生效一次
-
不支持递归创建树形节点
-
curator特点:
-
-
解决session会话超时重连
-
watcher反复注册
-
简化开发api
-
遵循Fluent风格的API
-
提供了分布式锁服务、共享计数器、缓存机制等功能
-
maven依赖:
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.7</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.6.0</version> <type>jar</type> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> <type>jar</type> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.6.0</version> <type>jar</type> </dependency>
public class CuratorConnection { private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181"; private static final int TIME_OUT = 5000; private static final int baseSleepTimeMs = 1000; private static final int maxRetries = 3; public static void main(String[] args) { // session重连策略 /** 3秒后重连一次,只重连1次 RetryPolicy retryPolicy = new RetryOneTime(3000); */ /** 每3秒重连一次,重连3次 RetryPolicy retryPolicy = new RetryNTimes(3,3000); */ /** 每3秒重连一次,总等待时间超过10秒后停止重连 RetryPolicy retryPolicy=new RetryUntilElapsed(10000,3000); */ // baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount+ 1))) RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); // 创建连接对象 CuratorFramework client = CuratorFrameworkFactory.builder() // IP地址端口号 .connectString(IP) //会话超时时间 .sessionTimeoutMs(TIME_OUT) // 重连机制 .retryPolicy(retryPolicy) // 命名空间 .namespace("create") // 构建连接对象 .build(); // 打开连接 client.start(); System.out.println(client.isStarted()); // 关闭连接 client.close(); } }
public class CuratorCreate { private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181"; private static final int TIME_OUT = 5000; private static final int baseSleepTimeMs = 1000; private static final int maxRetries = 3; CuratorFramework client; @Before public void before() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); client = CuratorFrameworkFactory.builder() .connectString(IP) .sessionTimeoutMs(5000) .retryPolicy(retryPolicy) .namespace("create") .build(); client.start(); } @After public void after() { client.close(); } // namespace+forPath 就是在ZKserver上数据节点的完整路径 @Test public void create1() throws Exception { // 新增节点 client.create() // 节点的类型 .withMode(CreateMode.PERSISTENT) // 节点的权限列表 world:anyone:cdrwa .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) // arg1:节点的路径 // arg2:节点的数据 .forPath("/create_01", "create_01".getBytes());
System.out.println("结束"); } @Test public void create2() throws Exception { // 自定义权限列表 // 权限列表 List<ACL> list = new ArrayList<ACL>(); // 授权模式和授权对象 Id id = new Id("ip", "192.168.126.143"); list.add(new ACL(ZooDefs.Perms.ALL, id)); client.create().withMode(CreateMode.PERSISTENT).withACL(list).forPath("/node2", " node2".getBytes()); System.out.println("结束"); } @Test public void create3() throws Exception { // 递归创建节点树 client.create() // 递归节点的创建 .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/node3/node31", "node31".getBytes()); System.out.println("结束"); } @Test public void create4() throws Exception { // 异步方式创建节点 client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) // 异步回调接口 .inBackground(new BackgroundCallback() { public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { // 节点的路径 System.out.println(curatorEvent.getPath()); // 时间类型 System.out.println(curatorEvent.getType()); } }) .forPath("/node4", "node4".getBytes()); Thread.sleep(5000); System.out.println("结束"); } }
四、更新节点
public class CuratorSet { private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181"; private static final int TIME_OUT = 5000; private static final int baseSleepTimeMs = 1000; private static final int maxRetries = 3; CuratorFramework client; @Before public void before() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); client = CuratorFrameworkFactory.builder() .connectString(IP) .sessionTimeoutMs(TIME_OUT) .retryPolicy(retryPolicy) .namespace("set").build(); client.start(); } @After public void after() { client.close(); } @Test public void set1() throws Exception { // 更新节点 client.setData() // arg1:节点的路径 // arg2:节点的数据 .forPath("/node1", "node11".getBytes()); System.out.println("结束"); } @Test public void set2() throws Exception { client.setData() // 指定版本号 .withVersion(2) .forPath("/node1", "node1111".getBytes()); System.out.println("结束"); } @Test public void set3() throws Exception { // 异步方式修改节点数据 client.setData() .withVersion(-1).inBackground(new BackgroundCallback() { public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { // 节点的路径 System.out.println(curatorEvent.getPath()); // 事件的类型 System.out.println(curatorEvent.getType()); } }).forPath("/node2", "node1".getBytes()); Thread.sleep(5000); System.out.println("结束"); } }
public class CuratorDelete { private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181"; private static final int TIME_OUT = 5000; private static final int baseSleepTimeMs = 1000; private static final int maxRetries = 3; CuratorFramework client; @Before public void before() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); client = CuratorFrameworkFactory.builder() .connectString(IP) .sessionTimeoutMs(TIME_OUT) .retryPolicy(retryPolicy) .namespace("delete").build(); client.start(); } @After public void after() { client.close(); } @Test public void delete1() throws Exception { // 删除节点 client.delete() // 节点的路径 .forPath("/node1"); System.out.println("结束"); } @Test public void delete2() throws Exception { client.delete() // 版本号 .withVersion(0) .forPath("/node1"); System.out.println("结束"); } @Test public void delete3() throws Exception { //删除包含子节点的节点 client.delete() .deletingChildrenIfNeeded() .withVersion(-1) .forPath("/node1"); System.out.println("结束"); } @Test public void delete4() throws Exception { // 异步方式删除节点 client.delete() .deletingChildrenIfNeeded() .withVersion(-1) .inBackground(new BackgroundCallback() { public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { // 节点路径 System.out.println(curatorEvent.getPath()); // 事件类型 System.out.println(curatorEvent.getType()); } }) .forPath("/node1"); Thread.sleep(5000); System.out.println("结束"); } }
public class CuratorGet { private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181"; private static final int TIME_OUT = 5000; private static final int baseSleepTimeMs = 1000; private static final int maxRetries = 3; CuratorFramework client; @Before public void before() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); client = CuratorFrameworkFactory.builder() .connectString(IP) .sessionTimeoutMs(TIME_OUT) .retryPolicy(retryPolicy) .namespace("").build(); client.start(); } @After public void after() { client.close(); } @Test public void get1() throws Exception { // 读取节点数据 byte[] bys = client.getData() // 节点的路径 .forPath("/hello"); System.out.println(new String(bys)); } @Test public void get2() throws Exception { // 读取数据时读取节点的属性 Stat stat = new Stat(); byte[] bys = client.getData() // 读取属性 .storingStatIn(stat) .forPath("/hello"); System.out.println(new String(bys)); System.out.println(stat.getVersion()); } @Test public void get3() throws Exception { // 异步方式读取节点的数据 client.getData().inBackground(new BackgroundCallback() { public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { // 节点的路径 System.out.println(curatorEvent.getPath()); // 事件类型 System.out.println(curatorEvent.getType()); // 数据 System.out.println(new String(curatorEvent.getData())); } }) .forPath("/hello"); Thread.sleep(5000); System.out.println("结束"); } }
- 查看子节点
public class CuratorGetChild { private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181"; private static final int TIME_OUT = 5000; private static final int baseSleepTimeMs = 1000; private static final int maxRetries = 3; CuratorFramework client; @Before public void before() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); client = CuratorFrameworkFactory.builder() .connectString(IP) .sessionTimeoutMs(TIME_OUT) .retryPolicy(retryPolicy) .namespace("").build(); client.start(); } @After public void after() { client.close(); } /** * 读取子节点数据 * @throws Exception */ @Test public void getChild1() throws Exception { // 读取子节点数据 List<String> list = client.getChildren() // 节点路径 .forPath("/create"); for (String str : list) { System.out.println(str); } } /** * 异步方式读取子节点数据 * @throws Exception */ @Test public void getChild2() throws Exception { // 异步方式读取子节点数据 client.getChildren().inBackground((curatorFramework, curatorEvent) -> { // 节点路径 System.out.println(curatorEvent.getPath()); // 事件类型 System.out.println(curatorEvent.getType()); // 读取子节点数据 List<String> list = curatorEvent.getChildren(); for (String str : list) { System.out.println(str); } }) .forPath("/create"); Thread.sleep(5000); System.out.println("结束"); } }
public class CuratorExists { private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181"; private static final int TIME_OUT = 5000; private static final int baseSleepTimeMs = 1000; private static final int maxRetries = 3; CuratorFramework client; @Before public void before() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); client = CuratorFrameworkFactory.builder() .connectString(IP) .sessionTimeoutMs(TIME_OUT) .retryPolicy(retryPolicy) .namespace("").build(); client.start(); } @After public void after() { client.close(); } /** * 判断节点是否存在 * * @throws Exception */ @Test public void exists1() throws Exception { // 判断节点是否存在 Stat stat = client.checkExists() // 节点路径 .forPath("/node2"); System.out.println(stat.getVersion()); } /** * 异步方式判断节点是否存在 * * @throws Exception */ @Test public void exists2() throws Exception { // 异步方式判断节点是否存在 client.checkExists().inBackground((curatorFramework, curatorEvent) -> { // 节点路径 System.out.println("节点路径:" + curatorEvent.getPath()); // 事件类型 System.out.println("事件类型:" + curatorEvent.getType()); System.out.println(curatorEvent.getStat().getVersion()); }).forPath("/node2"); Thread.sleep(50000); System.out.println("结束"); } }
Curator提供了两种Watcher(Cache)来监听节点的变化
-
NodeCache : 只监听某一个特定的节点,监听该节点的新增、修改和删除
-
PathChildrenCache : 监控一个ZNode的子节点。当一个子节点增加、更新、删除时,PathChildrenCache会改变它的状态,会包含最新的子节点、子节点的数据和状态
public class CuratorWatcher { private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181"; private static final int TIME_OUT = 5000; private static final int baseSleepTimeMs = 1000; private static final int maxRetries = 3; CuratorFramework client; @Before public void before() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); client = CuratorFrameworkFactory.builder() .connectString(IP) .sessionTimeoutMs(TIME_OUT) .retryPolicy(retryPolicy) .namespace("").build(); client.start(); } @After public void after() { client.close(); } @Test public void watcher1() throws Exception { // 监视某个节点的数据变化 // arg1:连接对象 // arg2:监视的节点路径 final NodeCache nodeCache = new NodeCache(client, "/hello"); // 启动监视器对象 nodeCache.start(); nodeCache.getListenable().addListener(() -> { // 节点变化时回调的方法 System.out.println(nodeCache.getCurrentData().getPath()); System.out.println(new String(nodeCache.getCurrentData().getData())); }); Thread.sleep(100000); System.out.println("结束"); //关闭监视器对象 nodeCache.close(); } @Test public void watcher2() throws Exception { // 监视子节点的变化 // arg1:连接对象 // arg2:监视的节点路径 // arg3:事件中是否可以获取节点的数据 PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/create", true); // 启动监听 pathChildrenCache.start(); pathChildrenCache.getListenable().addListener((curatorFramework, pathChildrenCacheEvent) -> { // 当子节点方法变化时回调的方法 // 节点的事件类型 System.out.println("节点的事件类型:" + pathChildrenCacheEvent.getType()); // 节点的路径 System.out.println("节点的路径:" + pathChildrenCacheEvent.getData().getPath()); // 节点数据 System.out.println("节点数据:" + new String(pathChildrenCacheEvent.getData().getData())); }); Thread.sleep(100000); System.out.println("结束"); // 关闭监听 pathChildrenCache.close(); } }
/**
 * JUnit 4 example of a Curator transaction, using a client with an empty
 * namespace.
 */
public class CuratorTransaction {
    private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
    private static final int TIME_OUT = 5000;
    private static final int baseSleepTimeMs = 1000;
    private static final int maxRetries = 3;

    CuratorFramework client;

    /** Builds and starts the client before each test. */
    @Before
    public void before() {
        client = CuratorFrameworkFactory.builder()
                .connectString(IP)
                .sessionTimeoutMs(TIME_OUT)
                .retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries))
                .namespace("")
                .build();
        client.start();
    }

    /** Closes the client after each test. */
    @After
    public void after() {
        client.close();
    }

    /** Submits the creation of {@code /node4} and {@code /node5} as one transaction. */
    @Test
    public void tra1() throws Exception {
        client.inTransaction()
                // first operation
                .create().forPath("/node4", "node1".getBytes())
                .and()
                // second operation
                .create().forPath("/node5", "node2".getBytes())
                .and()
                // commit both operations together
                .commit();
    }
}
-
InterProcessMutex:分布式可重入排它锁
-
InterProcessReadWriteLock:分布式读写锁
public class CuratorLock { private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181"; private static final int TIME_OUT = 5000; private static final int baseSleepTimeMs = 1000; private static final int maxRetries = 3; CuratorFramework client; @Before public void before() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); client = CuratorFrameworkFactory.builder() .connectString(IP) .sessionTimeoutMs(TIME_OUT) .retryPolicy(retryPolicy) .namespace("").build(); client.start(); } @After public void after() { client.close(); } @Test public void lock1() throws Exception { // 排他锁 // arg1:连接对象 // arg2:节点路径 InterProcessLock interProcessLock = new InterProcessMutex(client, "/lock1"); System.out.println("等待获取锁对象!"); // 获取锁 interProcessLock.acquire(); for (int i = 1; i <= 10; i++) { //业务代码 Thread.sleep(3000); System.out.println(i); } // 释放锁 interProcessLock.release(); System.out.println("等待释放锁!"); } @Test public void lock2() throws Exception { // 读写锁 InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock1"); // 获取读锁对象 InterProcessLock interProcessLock = interProcessReadWriteLock.readLock(); System.out.println("等待获取锁对象!"); // 获取锁 interProcessLock.acquire(); //业务代码 for (int i = 1; i <= 10; i++) { Thread.sleep(3000); System.out.println(i); } // 释放锁 interProcessLock.release(); System.out.println("等待释放锁!"); } @Test public void lock3() throws Exception { // 读写锁 InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock1"); // 获取写锁对象 InterProcessLock interProcessLock = interProcessReadWriteLock.writeLock(); System.out.println("等待获取锁对象!"); // 获取锁 interProcessLock.acquire(); //业务代码 for (int i = 1; i <= 10; i++) { Thread.sleep(3000); System.out.println(i); } // 释放锁 interProcessLock.release(); System.out.println("等待释放锁!"); } }