zookeeper(四)Curator框架 的分布式锁

其实在学之前我也有个疑虑,我为啥要学curator,撇开涨薪这些外在的东西,就单技术层面来讲,学curator能帮我做些什么?这就不得不从zookeeper说起,上篇我已经大篇幅讲了zk是做什么的了,但真正要靠zk去实现多服务器自动拉取更新的配置文件等功能是非常难的,如果没有curator,直接去写的话基本上能把你累哭,就好比连Mybatis或者jpa都没有,让你用原生的代码去写个网站一样,你可以把curator当做一个比较强大的工具,有了它操作zk不再是事,说这么多,是时候进入正题了:

curator 官网:http://curator.apache.org

使用curator去实现的几块内容:

 
学习目录:
1.使用curator建立与zk的连接
2.使用curator添加/递归添加节点
3.使用curator删除/递归删除节点
4.使用curator创建/验证 ACL(访问权限列表)
5.使用curator监听 单个/父 节点的变化(watch事件)
---------------------------------------------
6.基于curator实现zookeeper分布式锁(需要掌握基本的多线程知识)
 
前置条件:已掌握zookeeper的基本操作,对zookeeper有所了解,如果没有掌握请翻阅我前面的博客去学习.

本节所需要引入的依赖有以下三个,建议直接全部引入即可:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>

  1.通过curator建立与zk的连接

需要准备连接zk的url,建议直接写成工具类,因为接下来会频繁用到,功能类似于jdbc.

public class ZkClientUtil {
    private static final int BASE_SLEEP_TIME_MS = 5000; //定义失败重试间隔时间 单位:毫秒
    private static final int MAX_RETRIES = 3; //定义失败重试次数
    private static final int SESSION_TIME_OUT = 1000000; //定义会话存活时间,根据业务灵活指定 单位:毫秒
    private static final String ZK_URI = "192.168.174.132:2181";//你自己的zkurl和端口号
    private static final String NAMESPACE = "laohan_jianshen";
    //工作空间,可以不指定,建议指定,功能类似于项目包,之后创建的所有的节点都会在该工作空间下,方便管理
    
    public static CuratorFramework build(){
    //创建比较简单,链式编程,很爽,基本上指定点参数就OK了
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS,MAX_RETRIES);//重试策略
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString(ZK_URI)
                .retryPolicy(retryPolicy)
                .namespace(NAMESPACE)
                .sessionTimeoutMs(SESSION_TIME_OUT)
                .build();
        return client;
    }
}

  2.通过curator添加/递归添加节点

//通过上一步获取到的client,直接启动该client,值得注意的是client在使用前必须先启动:
client.start;
client
.create()//创建节点
.withMode(CreateMode.xxx)//节点属性:永久节点/临时节点/有序节点 通过CreateMode.即可看到
.withACL(ZooDefs.Ids.xxx)//节点访问权限,通过Ids.即可看到 默认是OPEN_ACL_UNSAFE(开放不安全权限)
.forPath("/yourpath","yourdata".getBytes());//指明你的节点路径,数据可以不指定,数据必须是byte[]

  创建递归节点:

//比如我想一次性创建/yourpath/a/b/c/1/2/3...这样的节点,如果按传统方法会累死你
//curator可以一次性创建好,只需要在创建时添加creatingParentsIfNeeded即可.
client
.create()//创建节点
.creatingParentsIfNeeded()//创建父节点,如果需要的话
 
...

  3.使用curator删除/递归删除节点

client
.delete() //删除
.guaranteed()//保证一定帮你删了它
.withVersion(0)//指定要删节点的版本号
.forPath("/yourpath")//指定要删节点的路径

  递归删除:

//比如我当前的节点结构是这样:/yourpath/a/b/c/1/2/3  我想删除a节点下面的所有目录
//传统方法累死个人,现在只需要添加deletingChildrenIfNeeded即可
client
.delete() //删除
.deletingChildrenIfNeeded()//如果它有儿子都给删了...

  4.使用curator创建/验证 ACL(访问权限列表)

//为了保证安全,有时需要对节点的访问权限做一些限制,否则可能会引起重要信息泄露/篡改/删除等
//节点ACL的创建方式有两种,一种是使用ZK提供的,一种是自定义的
//1.ZK提供的,比较简单,拿来即用,在创建节点时指明withACL即可
client
.create()
.withACL(ZooDefs.Ids.READ_ACL_UNSAFE)//指明该节点是只读节点,还有其他属性,可以通过Ids.查看

  

//创建自定义ACL,需要自己new Id(),并指明是否是加密的,然后账号和密码是多少,加密策略使用zk提供的:
List<ACL> aclList = new ArrayList<ACL>();
ACL acl1 = new ACL(ZooDefs.Perms.READ,new Id("digest",DigestAuthenticationProvider.generateDigest("user:123456")));
ACL acl2 = new ACL(ZooDefs.Perms.ALL,new Id("digest",DigestAuthenticationProvider.generateDigest("root:123456")));
aclList.add(acl1);
aclList.add(acl2);
//如此我就创建好了两种不同的权限账号,user只能对该节点有读的权限,但root用户对该节点有所有权限

  

//ACL验证,创建好节点之后,可以在服务器的zk安装目录的bin目录下 连接客户端./zkCli
//然后通过ls /该目录  查看是否可以访问 正常是不能访问的 会提示权限不够
//下面我们通过curator去连接,要想访问该节点需要在创建client时就指明账号和密码:
CuratorFramework client = CuratorFrameworkFactory
.builder()
.authorization("digest","root:123456".getBytes())//指明使用了加密,用户名和密码用:隔开,以byte[]输入
//如此,接下来通过该client可以对刚刚创建的节点具有所有权限,如果登录的是user,则只具有读权限.

  5.通过curator创建单个节点及其父节点的watch事件

由于zk的watch事件是只能被触发一次的,触发完即销毁监听,这显然不是我们想要的,在实际开发中更多的场景是需要对某个节点持续监听,所以这里我只介绍创建持续监听的单节点/父节点

//对单个节点创建watch事件
//定义NodeCache,指明被监听节点的路径:
final NodeCache nodeCache = new NodeCache(client,"/yourpath");
nodeCache.start(true);//开启
nodeCache
.getCurrentData()//可以获取该监听节点的数据
.getPath();//可以获取该监听节点的路径
 

  

//对指定父节点创建watch事件,只要其任何一个子节点,或子节点的子节点...发生变化,就会触发watch事件.
//定义PathChildrenCache,指明要watch的目录
final PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"yourpath");
//启动,启动策略有三种:同步,异步提交,异步 用的比较多的就是下面这种,用StartMode.可以查看到
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
//对该节点创建监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    //TODO 可以通过PathChildrenCacheEvent.拿到你想要的数据和路径等
    }
});

  至此,curator的常用内容已学习完毕,建议每个都亲自操作一下,为之后的自动配置和分布式锁操作打下基础.

===================================================================

package com.demo.zookeeper.curator.lock;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

/**
 * num: 模拟 库存 10个
 * 流程: 创建10个线程 同时去减num, 会出现 问题
 * 解决:  curator 的 分布式锁解决方案 InterProcessLock 加锁 acquire() 解锁 release()
 */
public class DistributedLockDemo2 {
    // ZooKeeper 锁节点路径, 分布式锁的相关操作都是在这个节点上进行
    private final String lockPath = "/distributed-lock";

    // ZooKeeper 服务地址, 单机格式为:(127.0.0.1:2181),
    // 集群格式为:(127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183)
    private String connectString;
    // Curator 客户端重试策略
    private RetryPolicy retry;
    // Curator 客户端对象
    private CuratorFramework client;
    // client2 用户模拟其他客户端
    private CuratorFramework client2;
    CountDownLatch countDownLatch = new CountDownLatch(10);
    // 数据
    private static int num = 10;

    // 初始化资源
    @Before
    public void init() throws Exception {
        // 设置 ZooKeeper 服务地址为本机的 2181 端口
        connectString = "127.0.0.1:2181";
        // 重试策略
        // 初始休眠时间为 1000ms, 最大重试次数为 3
        retry = new ExponentialBackoffRetry(1000, 3);
        // 创建一个客户端, 60000(ms)为 session 超时时间, 15000(ms)为链接超时时间
        client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
        //client2 = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
        // 创建会话
        client.start();
        //client2.start();
    }
    @Test
    public void sharedLock() throws Exception {
        // 创建共享锁
        final InterProcessLock lock = new InterProcessSemaphoreMutex(client, lockPath);
        // lock2 用于模拟其他客户端
        final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client2, lockPath);

        for (int i=0;i<10;i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        lock.acquire();
                        if(num >1){
                            num--;
                            System.out.println(Thread.currentThread().getName()+","+num);
                        }
                        lock.release();
                        countDownLatch.countDown();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        countDownLatch.await();
        System.out.println("### num:" + num);

    }
    // 释放资源
    @After
    public void close() {
        CloseableUtils.closeQuietly(client);
    }
}

 

原文地址:https://www.cnblogs.com/GotoJava/p/13697207.html