Zookeeper分布式锁

原创转载请注明出处:https://www.cnblogs.com/agilestyle/p/11605319.html

Zookeeper是一种提供“分布式服务协调“的中心化服务,分布式应用程序才可以基于Zookeeper的以下两个特性实现分布式锁功能。

  • 顺序临时节点:Zookeeper提供一个多层级的节点命名空间(节点称为Znode),每个节点都用一个以斜杠(/)分隔的路径来表示,而且每个节点都有父节点(根节点除外),非常类似于文件系统。节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),每个节点还能被标记为有序性(SEQUENTIAL),一旦节点被标记为有序性,那么整个节点就具有顺序自增的特点。一般可以组合这几类节点来创建所需要的节点,例如,创建一个持久节点作为父节点,在父节点下面创建临时节点,并标记该临时节点为有序性。
  • Watch机制:Zookeeper还提供了另外一个重要的特性,Watcher(事件监听器)。ZooKeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知给用户。

熟悉了Zookeeper的这两个特性之后,就可以看看Zookeeper是如何实现分布式锁的了。

首先,需要建立一个父节点,节点类型为持久节点(PERSISTENT) ,每当需要访问共享资源时,就会在父节点下建立相应的顺序子节点,节点类型为临时节点(EPHEMERAL),且标记为有序性(SEQUENTIAL),并且以临时节点名称+父节点名称+顺序号组成特定的名字。

在建立子节点后,对父节点下面的所有以临时节点名称name开头的子节点进行排序,判断刚刚建立的子节点顺序号是否是最小的节点,如果是最小节点,则获得锁。

如果不是最小节点,则阻塞等待锁,并且获得该节点的上一顺序节点,为其注册监听事件,等待节点对应的操作获得锁。

当调用完共享资源后,删除该节点,关闭zk,进而可以触发监听事件,释放该锁。

以上实现的分布式锁是严格按照顺序访问的并发锁。一般还可以直接引用Curator框架来实现Zookeeper分布式锁,代码如下:

Maven Dependency

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>

DistributedLock.java

 1 package org.fool.spring.util;
 2 
 3 import java.util.concurrent.TimeUnit;
 4 
 5 public interface DistributedLock {
 6 
 7     void lock() throws Exception;
 8 
 9     boolean tryLock(long time, TimeUnit unit) throws Exception;
10 
11     void unlock() throws Exception;
12 
13     boolean isAcquiredInThisProcess();
14 }

ZkDistributedLock.java

 1 package org.fool.spring.util;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 5 
 6 import java.util.concurrent.TimeUnit;
 7 
 8 public class ZkDistributedLock implements DistributedLock {
 9 
10     private InterProcessMutex mutex;
11 
12     ZkDistributedLock(ZkClient zkClient, String lockPath) {
13         CuratorFramework client = zkClient.getClient();
14         this.mutex = new InterProcessMutex(client, lockPath);
15     }
16 
17     @Override
18     public void lock() throws Exception {
19         this.mutex.acquire();
20     }
21 
22     @Override
23     public boolean tryLock(long time, TimeUnit unit) throws Exception {
24         return this.mutex.acquire(time, unit);
25     }
26 
27     @Override
28     public void unlock() throws Exception {
29         this.mutex.release();
30     }
31 
32     @Override
33     public boolean isAcquiredInThisProcess() {
34         return this.mutex.isAcquiredInThisProcess();
35     }
36 
37 }

ZkClient.java

 1 package org.fool.spring.util;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 
 5 public class ZkClient {
 6 
 7     private final CuratorFramework client;
 8 
 9     ZkClient(CuratorFramework client) {
10         this.client = client;
11     }
12 
13     CuratorFramework getClient() {
14         return this.client;
15     }
16 
17     /**
18      * start the client
19      */
20     public void start() {
21         this.client.start();
22     }
23 
24     /**
25      * close the client
26      */
27     public void close() {
28         this.client.close();
29     }
30 
31 }

DistributedLocks.java

 1 package org.fool.spring.util;
 2 
 3 import org.apache.curator.RetryPolicy;
 4 import org.apache.curator.framework.CuratorFramework;
 5 import org.apache.curator.framework.CuratorFrameworkFactory;
 6 import org.apache.curator.retry.ExponentialBackoffRetry;
 7 
 8 public final class DistributedLocks {
 9 
10     private DistributedLocks() {
11     }
12 
13     private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 60 * 1000;
14     private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 15 * 1000;
15     private static final int BASE_SLEEP_TIME_MS = 1000;
16     private static final int MAX_RETRIES = 3;
17 
18     /**
19      * Define the default retry policy
20      */
21     private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES);
22 
23     /**
24      * Create a new ZkClient with custom connectString, default sessionTimeout and default connectionTimeout.
25      */
26     public static ZkClient newZkClient(String connectString) {
27         CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, DEFAULT_ZK_SESSION_TIMEOUT_MS,
28                 DEFAULT_ZK_CONNECTION_TIMEOUT_MS, DEFAULT_RETRY_POLICY);
29         return new ZkClient(client);
30     }
31 
32     /**
33      * Create a new ZkClient with custom connectString, sessionTimeout and connectionTimeout
34      */
35     public static ZkClient newZkClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs) {
36         CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs,
37                 connectionTimeoutMs, DEFAULT_RETRY_POLICY);
38         return new ZkClient(client);
39     }
40 
41     /**
42      * Create a new DistributedLock with ZkClient and lock path.
43      */
44     public static DistributedLock newZkDistributedLock(ZkClient zkClient, String lockPath) {
45         return new ZkDistributedLock(zkClient, lockPath);
46     }
47 }

TestZkDistributedLock.java

 1 package org.fool.spring.test;
 2 
 3 import org.fool.spring.util.DistributedLock;
 4 import org.fool.spring.util.DistributedLocks;
 5 import org.fool.spring.util.ZkClient;
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8 
 9 import java.util.concurrent.TimeUnit;
10 
11 public class TestZkDistributedLock {
12 
13     private static final Logger logger = LoggerFactory.getLogger(TestZkDistributedLock.class);
14 
15     private static final String lockPath = "/curator/test";
16 
17     public static void main(String[] args) throws Exception {
18         ZkClient zkClient = DistributedLocks.newZkClient("127.0.0.1:2181");
19         zkClient.start();
20 
21         DistributedLock lock = DistributedLocks.newZkDistributedLock(zkClient, lockPath);
22 
23         boolean isAcquired = lock.isAcquiredInThisProcess();
24         logger.info("==========lock acquired: " + isAcquired + "==========");
25 
26         if (lock.tryLock(3, TimeUnit.SECONDS)) {
27             try {
28                 isAcquired = lock.isAcquiredInThisProcess();
29                 logger.info("==========lock acquired: " + isAcquired + "==========");
30 
31                 // mock to do business logic
32                 Thread.sleep(60000);
33             } finally {
34                 lock.unlock();
35                 logger.info("==========release the lock !!!==========");
36             }
37         } else {
38             logger.info("==========failed to get the lock !!!==========");
39         }
40 
41         zkClient.close();
42     }
43 }

Test

执行TestZkDistributedLock,模拟业务执行占用60s时间

在60s内,再次执行TestZkDistributedLock,可以看到尝试获取锁失败

打开zk client,查看执行期间内的顺序临时节点的变化情况

 

Summary

Zookeeper实现的分布式锁

优点

  • Zookeeper是集群实现,可以避免单点问题,且能保证每次操作都可以有效地释放锁,这是因为一旦应用服务挂掉了,临时节点会因为session连接断开而自动删除掉。

缺点

  • 由于频繁地创建和删除结点,加上大量的Watch事件,对Zookeeper集群来说,压力非常大。且从性能上来说,与Redis实现的分布式锁相比,还是存在一定的差距。

Reference

https://time.geekbang.org/column/article/125983

http://curator.apache.org/

原文地址:https://www.cnblogs.com/agilestyle/p/11605319.html