Consul的分布式锁实现

构建分布式系统的时候,经常需要控制对共享资源的互斥访问,就涉及到分布式锁(也称为全局锁)的实现,基于目前的各种工具,我们已经有了大量的实现方式,比如:基于Redis的实现、基于Zookeeper的实现。本文将介绍一种基于Consul 的Key/Value存储来实现分布式锁以及信号量的方法。
分布式锁实现

  • 基于Consul的分布式锁主要利用Key/Value存储API中的acquire和release操作来实现。acquire和release操作是类似Check-And-Set的操作:

  • acquire操作:只有当该Key的锁不存在持有者时才会返回true,同时写入本次设置的Value值,并让执行操作的session持有该Key的锁;否则返回false。
  • release操作:使用指定的session来释放某个Key的锁。如果指定的session无效,则返回false;否则写入本次设置的Value值,并返回true。

基本流程
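(原文此处为流程图,图片未能保留。结合下文代码,基本流程如下:)

  1. 创建session;
  2. 使用该session对指定的Key执行acquire操作;
  3. 若返回true,则获得锁并执行业务逻辑;若返回false,阻塞模式下按设定的间隔重试,非阻塞模式下直接返回失败;
  4. 业务处理完成后,执行release操作释放锁,并销毁session。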

代码实现 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Consul distributed-lock example: consul-api client, SLF4J over log4j 1.x, Lombok, JUnit 4. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.didispace</groupId>
    <artifactId>consul-distributed-lock</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- The sources contain non-ASCII comments; pin the encoding so the build does not depend on the platform default. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <version.consul-api>1.2.1</version.consul-api>
        <version.slf4j>1.7.21</version.slf4j>
        <version.slf4j-log4j>1.7.21</version.slf4j-log4j>
        <!-- NOTE(review): log4j 1.2.x is end-of-life; consider migrating to a maintained SLF4J backend. -->
        <version.log4j>1.2.17</version.log4j>
        <version.lombok>1.16.10</version.lombok>
        <version.junit>4.12</version.junit>
        <version.maven-compile-plugin>3.5.1</version.maven-compile-plugin>
        <!-- maven-compiler-plugin reads these two properties directly as its source/target defaults. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>

        <dependency>
            <groupId>com.ecwid.consul</groupId>
            <artifactId>consul-api</artifactId>
            <version>${version.consul-api}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${version.slf4j}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${version.slf4j-log4j}</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${version.log4j}</version>
        </dependency>

        <!-- Lombok is an annotation processor needed only at compile time; "provided" keeps it off the runtime classpath of consumers. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${version.lombok}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${version.junit}</version>
            <scope>test</scope>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <!-- Only the plugin version is pinned here; source/target come from the maven.compiler.* properties above. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${version.maven-compile-plugin}</version>
            </plugin>
        </plugins>
    </build>

</project>

代码实现:Lock.java

package com.didispace.lock.consul;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.kv.model.PutParams;
import lombok.extern.slf4j.Slf4j;

import java.time.LocalDateTime;

/**
 * Mutual-exclusion lock built on the Consul Key/Value store.
 *
 * <p>The lock is taken by a KV write that carries an {@code acquire} session and given back by a
 * KV write that carries a {@code release} session. The {@code consulClient}, {@code keyPath},
 * {@code sessionId} and {@code checkTtl} members, as well as {@code createSession} and
 * {@code destroySession}, are inherited from {@link BaseLock}.
 *
 * <p>NOTE(review): no synchronization is visible in this class; presumably each instance is meant
 * to be used by a single thread - confirm against {@code BaseLock}.
 */
@Slf4j
public class Lock extends BaseLock {

    /** Prefix prepended to every lock key so that all locks are grouped under one KV folder. */
    private static final String KEY_PREFIX = "lock/";

    /**
     * Creates a lock bound to one key.
     *
     * @param consulClient client used for all Consul calls
     * @param lockKey      path of the lock key in the Consul KV store; {@code "lock/"} is prepended
     *                     automatically
     * @param checkTtl     TTL check associated with the lock session; may be {@code null}
     */
    public Lock(ConsulClient consulClient, String lockKey, CheckTtl checkTtl) {
        super(consulClient, KEY_PREFIX + lockKey, checkTtl);
    }

    /**
     * Acquires the lock, retrying every 500 ms without an attempt limit when {@code block} is set.
     *
     * @param block whether to keep retrying until the lock is obtained
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     * @throws InterruptedException if the thread is interrupted while waiting between attempts
     */
    public Boolean lock(boolean block) throws InterruptedException {
        return lock(block, 500L, null);
    }


    /**
     * Acquires the lock.
     *
     * <p>A new session is created for every call. If the lock is not obtained (non-blocking miss,
     * attempt limit reached, interruption or any other exception) the session is discarded again,
     * so that the instance can be reused and no session is left behind in Consul.
     *
     * @param block        whether to keep retrying until the lock is obtained
     * @param timeInterval pause in milliseconds between attempts; only used when {@code block} is
     *                     {@code true}; {@code null} retries without pausing
     * @param maxTimes     maximum number of attempts; only used when {@code block} is {@code true};
     *                     {@code null} means unlimited
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     * @throws IllegalStateException if this instance already owns a session
     * @throws InterruptedException  if the thread is interrupted while waiting between attempts
     */
    public Boolean lock(boolean block, Long timeInterval, Integer maxTimes) throws InterruptedException {
        if (sessionId != null) {
            throw new IllegalStateException(sessionId + " - Already locked!");
        }
        sessionId = createSession("lock-" + this.keyPath);

        boolean acquired = false;
        try {
            int attempts = 1;
            while (true) {
                PutParams putParams = new PutParams();
                putParams.setAcquireSession(sessionId);
                Boolean granted = consulClient
                        .setKVValue(keyPath, "lock:" + LocalDateTime.now(), putParams)
                        .getValue();
                // Boolean.TRUE.equals avoids a NullPointerException should the response carry no value.
                if (Boolean.TRUE.equals(granted)) {
                    acquired = true;
                    return true;
                }
                if (!block || (maxTimes != null && attempts >= maxTimes)) {
                    return false;
                }
                attempts++;
                if (timeInterval != null) {
                    Thread.sleep(timeInterval);
                }
            }
        } finally {
            if (!acquired) {
                // Without this the session would stay alive in Consul and every later lock() call
                // on this instance would fail with "Already locked!".
                discardSession();
            }
        }
    }

    /**
     * Releases the lock and destroys the session.
     *
     * @return {@code true} if Consul confirmed the release; {@code false} if the release was
     *         rejected or this instance holds no session
     */
    public Boolean unlock() {
        if (checkTtl != null) {
            checkTtl.stop();
        }

        if (sessionId == null) {
            // Nothing to release. A write without a release session would be an ordinary KV PUT
            // that overwrites the value and reports success.
            return false;
        }

        PutParams putParams = new PutParams();
        putParams.setReleaseSession(sessionId);
        try {
            Boolean released = consulClient
                    .setKVValue(keyPath, "unlock:" + LocalDateTime.now(), putParams)
                    .getValue();
            return Boolean.TRUE.equals(released);
        } finally {
            // Destroy the session even when the release call throws.
            destroySession();
            // Presumably destroySession() already clears sessionId; resetting it here is harmless
            // and guarantees the instance is reusable.
            sessionId = null;
        }
    }

    /** Stops the TTL check (if any) and destroys the session of a failed acquisition. */
    private void discardSession() {
        if (checkTtl != null) {
            checkTtl.stop();
        }
        destroySession();
        sessionId = null;
    }

}

测试代码

import com.didispace.lock.consul.CheckTtl;
import com.didispace.lock.consul.Lock;
import com.ecwid.consul.v1.ConsulClient;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.Random;

/**
 * Manual integration test: starts five workers that compete for the same Consul lock.
 *
 * <p>Uses the default {@code ConsulClient()} constructor, so a reachable Consul agent is required
 * (presumably on the local machine - confirm for your environment).
 */

public class TestLock {

    /**
     * Starts five {@link LockRunner} threads, each with its own TTL check ("lock-1" .. "lock-5"),
     * and waits for them to finish, for at most 30 seconds in total.
     */
    @Test
    public void testLock() throws Exception  {
        ConsulClient consulClient = new ConsulClient();

        Thread[] workers = new Thread[5];
        for (int i = 0; i < workers.length; i++) {
            int id = i + 1;
            workers[i] = new Thread(new LockRunner(id, new CheckTtl("lock-" + id, consulClient)));
            workers[i].start();
        }

        // Wait for the workers instead of sleeping a fixed 30 s; the overall budget is unchanged.
        long deadline = System.nanoTime() + 30000L * 1000000L;
        for (Thread worker : workers) {
            long remainingMillis = (deadline - System.nanoTime()) / 1000000L;
            if (remainingMillis <= 0) {
                break; // join(0) would wait forever, so stop once the budget is used up
            }
            worker.join(remainingMillis);
        }
    }


}

@Slf4j
@AllArgsConstructor
class LockRunner implements Runnable {

    private int flag;
    private CheckTtl checkTtl;

    @Override
    public void run() {
        Lock lock = new Lock(new ConsulClient(), "lock-key", checkTtl);
        try {
            // 获取分布式互斥锁(参数含义:阻塞模式、每次尝试获取锁的间隔500ms、尝试n次)
            if (lock.lock(true, 500L, null)) {
                log.info("Thread {} start!", flag);
                // 处理业务逻辑
                Thread.sleep(new Random().nextInt(5000));
                log.info("Thread {} end!", flag);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }
}
 
源码下载

原文地址:https://www.cnblogs.com/ExMan/p/13903672.html