利用Zookeeper实现分布式锁

特别提示:本人博客部分有参考网络其他博客,但均是本人亲手编写过并验证通过。如发现博客有错误,请及时提出以免误导其他人,谢谢!欢迎转载,但记得标明文章出处:http://www.cnblogs.com/mao2080/

1、分布式锁是什么?

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。

2、为什么需要分布式锁?

在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

3、乐观锁和悲观锁含义

悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。

乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁。

两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下,即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果经常产生冲突,上层应用会不断的进行retry,这样反倒是降低了性能,所以这种情况下用悲观锁就比较合适。

本段参考了:http://blog.csdn.net/hongchangfirst/article/details/26004335

4、具体实现

本文采用zookeeper第三方库curator实现分布式锁。

1、添加依赖

 1     <!-- Curator对Zookeeper封装 -->
 2         <dependency>
 3             <groupId>org.apache.curator</groupId>
 4             <artifactId>curator-recipes</artifactId>
 5             <version>2.9.1</version>
 6         </dependency>
 7 
 8         <!-- Curator对Zookeeper封装 -->
 9         <dependency>
10             <groupId>org.apache.curator</groupId>
11             <artifactId>curator-client</artifactId>
12             <version>2.9.1</version>
13         </dependency>
14         
15         <!-- Curator对Zookeeper封装 -->
16         <dependency>
17             <groupId>org.apache.curator</groupId>
18             <artifactId>curator-framework</artifactId>
19             <version>2.9.1</version>
20         </dependency>

2、锁的实现

  1 package com.mao;
  2 import org.apache.curator.RetryPolicy;
  3 import org.apache.curator.framework.CuratorFramework;
  4 import org.apache.curator.framework.CuratorFrameworkFactory;
  5 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  6 import org.apache.curator.retry.ExponentialBackoffRetry;
  7 
  8 /**
  9  * 
 10  * 项目名称:---
 11  * 模块名称:公共服务-锁工具
 12  * 功能描述:锁工具类(基于Zookeeper)
 13  * 创建人: mao2080@sina.com
 14  * 创建时间:2017年4月20日 下午4:35:56
 15  * 修改人: mao2080@sina.com
 16  * 修改时间:2017年4月20日 下午4:35:56
 17  */
 18 public class ZKLockUtil {
 19     
 20     /**Zookeeper注册中心地址*/
 21     private static final String ZOOKEEPER_SERVER;
 22     
 23     /**获取锁-每次尝试间隔(单位:毫秒)*/
 24     private static final int BASE_SLEEP_TIME = 1000;
 25     
 26     /**获取锁-最大尝试次数*/
 27     private static final int MAX_RETRIES = 5;
 28     
 29     /**Curator锁对象*/
 30     private InterProcessMutex lock;
 31     
 32     /**Curator客户端*/
 33     private CuratorFramework client;
 34     
 35     /**分布式锁根节点*/
 36     private static final String ZK_BASE_LOCK_PATH = "/Locks/";
 37     
 38     /**
 39      * 初始化Zookeeper注册中心地址
 40      */
 41     static {
 42         ZOOKEEPER_SERVER = "172.24.20.214";
 43     }
 44     /**
 45      * 
 46      * 描述:构造函数
 47      * @author mao2080@sina.com
 48      * @created 2017年4月20日 下午3:52:53
 49      * @since 
 50      * @param businessType 业务类型
 51      * @param baseSleepTimeMs 获取锁-每次尝试间隔(单位:毫秒)
 52      * @param maxRetries 获取锁-最大尝试次数
 53      * @throws BusinessException 
 54      */
 55     private void initService(BusinessType businessType, int baseSleepTimeMs, int maxRetries) throws Exception {
 56         if(isBlank(ZKLockUtil.ZOOKEEPER_SERVER)){
 57             throw new Exception();
 58         }
 59         if(businessType == null || isBlank(businessType.getCode())){
 60             throw new Exception();
 61         }
 62         try {
 63             RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
 64             this.client = CuratorFrameworkFactory.newClient(ZKLockUtil.ZOOKEEPER_SERVER, retryPolicy);
 65             this.client.start();
 66             this.setLock(new InterProcessMutex(this.client, ZK_BASE_LOCK_PATH.concat(businessType.getCode())));
 67         } catch (Exception e) {
 68             throw new Exception();
 69         }
 70     }
 71     
 72     /**
 73      * 
 74      * 描述:构造函数
 75      * @author mao2080@sina.com
 76      * @created 2017年4月20日 下午3:52:44
 77      * @since 
 78      * @param businessType 业务类型
 79      * @throws BusinessException
 80      */
 81     public ZKLockUtil(BusinessType businessType) throws Exception {
 82         this.initService(businessType, ZKLockUtil.BASE_SLEEP_TIME, ZKLockUtil.MAX_RETRIES);
 83     }
 84     
 85     /**
 86      * 
 87      * 描述:构造函数
 88      * @author mao2080@sina.com
 89      * @created 2017年4月20日 下午3:52:53
 90      * @since 
 91      * @param businessType 业务类型
 92      * @param baseSleepTimeMs 获取锁-每次尝试间隔(单位:毫秒)
 93      * @param maxRetries 获取锁-最大尝试次数
 94      * @throws BusinessException 
 95      */
 96     public ZKLockUtil(BusinessType businessType, int baseSleepTimeMs, int maxRetries) throws Exception {
 97         this.initService(businessType, baseSleepTimeMs, maxRetries);
 98     }
 99     
100     /**
101      * 
102      * 描述:释放资源
103      * @author mao2080@sina.com
104      * @created 2017年4月20日 下午3:58:03
105      * @since
106      */
107     public void close(){
108         try {
109             this.getLock().release();
110         } catch (Exception e) {
111             e.printStackTrace();
112         }
113         try {
114             this.client.close();
115         } catch (Exception e) {
116              e.printStackTrace();
117         }
118     }
119     
120     /**
121      * 
122      * 描述:判断对象是否为空
123      * @author mao2080@sina.com
124      * @created 2017年3月20日 上午11:33:55
125      * @since 
126      * @param obj
127      * @return
128      */
129     public static boolean isBlank(Object obj){
130         if(obj == null || "".equals(obj.toString())){
131             return true;
132         }
133         return false;
134     }
135     
136     public InterProcessMutex getLock() {
137         return lock;
138     }
139     
140     public void setLock(InterProcessMutex lock) {
141         this.lock = lock;
142     }
143     
144     /**
145      * 
146      * 项目名称:---
147      * 模块名称:公共服务-锁工具
148      * 功能描述:业务类型
149      * 创建人: mao2080@sina.com
150      * 创建时间:2017年4月22日 下午12:27:04
151      * 修改人: mao2080@sina.com
152      * 修改时间:2017年4月22日 下午12:27:04
153      */
154     public enum BusinessType {
155         
156         Demo("0001", "Demo"),
157         
158         ;
159         
160         /**业务类型编码 */
161         private String code;
162         
163         /**业务类型名称 */
164         private String name;
165         
166         /**
167          * 
168          * 描述:构建业务类型
169          * @author mao2080@sina.com
170          * @created 2017年4月10日 下午3:42:57
171          * @since 
172          * @param code 业务类型编码
173          * @param name 业务类型名称
174          * @return
175          */
176         private BusinessType(String code, String name) {
177             this.code = code;
178             this.name = name;
179         }
180 
181         public String getCode() {
182             return code;
183         }
184 
185         public void setCode(String code) {
186             this.code = code;
187         }
188         
189         public String getName() {
190             return name;
191         }
192 
193         public void setName(String name) {
194             this.name = name;
195         }
196         
197         public static void main(String[] args) {
198             System.out.println(BusinessType.Demo.code);
199         }
200 
201     }
202 
203 }

3、使用场景

  1 package com.mao;
  2 
  3 import java.util.concurrent.CountDownLatch;
  4 import java.util.concurrent.ExecutorService;
  5 import java.util.concurrent.Executors;
  6 import java.util.concurrent.TimeUnit;
  7 
  8 import com.mao.ZKLockUtil.BusinessType;
  9 
 10 /**
 11  * 
 12  * 项目名称:---
 13  * 模块名称:公共服务-锁工具 
 14  * 功能描述:锁工具类 
 15  * 创建人: mao2080@sina.com 
 16  * 创建时间:2017年4月20日下午4:35:56 
 17  * 修改人: mao2080@sina.com 
 18  * 修改时间:2017年4月20日 下午4:35:56
 19  */
 20 public class ZKLockUtilTest {
 21 
 22     /**
 23      * 
 24      * 描述:具体使用样例
 25      * 
 26      * @author mao2080@sina.com
 27      * @created 2017年4月22日 下午12:29:38
 28      * @since
 29      * @param args
 30      * @throws Exception
 31      */
 32     public static void main1(String[] args) throws Exception {
 33         ZKLockUtil lock = new ZKLockUtil(BusinessType.Demo);
 34         try {
 35             if (lock.getLock().acquire(40, TimeUnit.SECONDS)) {
 36                 System.out.println("get lock success...do work");
 37                 Thread.sleep(20000);
 38             }
 39         } catch (Exception e) {
 40             System.out.println("get lock  fail...," + e.getMessage());
 41             e.printStackTrace();
 42         } finally {
 43             lock.close();
 44         }
 45     }
 46 
 47     /**
 48      * 
 49      * 描述:启动线程模拟并发访问
 50      * 
 51      * @author mao2080@sina.com
 52      * @created 2017年4月22日 下午12:29:13
 53      * @since
 54      * @param args
 55      * @throws InterruptedException
 56      */
 57     public static void main(String[] args) throws InterruptedException {
 58         CountDownLatch latch = new CountDownLatch(5);
 59         ExecutorService exec = Executors.newCachedThreadPool();
 60         for (int i = 0; i < 5; i++) {
 61             exec.submit(new MyLock("client" + i, latch));
 62         }
 63         exec.shutdown();
 64         latch.await();
 65         System.out.println("所有任务执行完毕");
 66     }
 67 
 68     static class MyLock implements Runnable {
 69         
 70         private String name;
 71         
 72         private CountDownLatch latch;
 73 
 74         public MyLock(String name, CountDownLatch latch) {
 75             this.name = name;
 76             this.latch = latch;
 77         }
 78 
 79         public String getName() {
 80             return name;
 81         }
 82 
 83         public void setName(String name) {
 84             this.name = name;
 85         }
 86 
 87         public void run() {
 88             ZKLockUtil locks = null;
 89             try {
 90                 locks = new ZKLockUtil(BusinessType.Demo);
 91             } catch (Exception e1) {
 92                 e1.printStackTrace();
 93                 return;
 94             }
 95             try {
 96                 if (locks.getLock().acquire(40, TimeUnit.SECONDS)) {
 97                     System.out.println("----------" + this.name+ "获得资源----------");
 98                     System.out.println("----------" + this.name+ "正在处理资源----------");
 99                     Thread.sleep(1 * 2000);
100                     System.out.println("----------" + this.name+ "资源使用完毕----------");
101                     latch.countDown();
102                 }
103             } catch (Exception e) {
104                 System.out.println("get lock  fail...," + e.getMessage());
105                 e.printStackTrace();
106             } finally {
107                 locks.close();
108             }
109         }
110     }
111 }

4、运行结果

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
----------client1获得资源----------
----------client1正在处理资源----------
----------client1资源使用完毕----------
----------client4获得资源----------
----------client4正在处理资源----------
----------client4资源使用完毕----------
----------client2获得资源----------
----------client2正在处理资源----------
----------client2资源使用完毕----------
----------client0获得资源----------
----------client0正在处理资源----------
----------client0资源使用完毕----------
----------client3获得资源----------
----------client3正在处理资源----------
----------client3资源使用完毕----------
所有任务执行完毕

5、参考博客

1、http://blog.csdn.net/wuzhilon88/article/details/41121195
2、http://www.cnblogs.com/LiZhiW/p/4931577.html

6本文demo下载

原文地址:https://www.cnblogs.com/mao2080/p/6745025.html