谷粒商城分布式高级(七)—— 缓存分布式锁(redisson & 可重入锁 & 读写锁 & 闭锁 & 信号量)

一、分布式锁与本地锁

本地缓存问题:每个微服务都要有缓存服务、数据更新时只更新自己的缓存,造成缓存数据不一致
解决方案:分布式缓存,微服务共用 缓存中间件

分布式锁

分布式项目时,但本地锁只能锁住当前服务,需要分布式锁

二、分布式锁实现

1、分布式锁原理与应用

分布式锁演进-基本原理
  我们可以同时去一个地方“占坑”,如果占到,就执行逻辑。否则就必须等待,直到释放锁。 “占坑”可以去redis,可以去数据库,可以去任何大家都能访问的地方。 等待可以自旋的方式。

(1)分布式锁演进 —— 阶段一
(2)分布式锁演进 —— 阶段二

(3)分布式锁演进 —— 阶段三
  

 (4)分布式锁演进 —— 阶段四

  

 (5)分布式锁演进 —— 阶段五-最终形态

最终的代码改造结果:

//TODO 产生堆外内存溢出:OutOfDirectMemoryError
//(1)springboot2.0以后默认使用lettuce作为操作redis客户端。它使用netty进行网络通信
//(2)lettuce的bug导致netty堆外内存溢出 -Xmx300m;netty如果没有指定堆外内存,默认使用-Xmx300m
// 可以通过-Dio.netty.maxDirectMemory进行设置
// 解决方案:不能使用-Dio.netty.maxDirectMemory只去调大堆外内存。
//(1)升级lettuce客户端
//(2)切换使用jedis
@Override
public Map<String, List<Catelog2Vo>> getCatalogJson() {
//给缓存中放json字符串,拿出的json字符串,还要逆转为能用的对象类型:【序列化与反序列化】

/**
* 1、空结果缓存:解决缓存穿透
* 2、设置过期时间(加随机值):解决缓存雪崩
* 3、枷锁:解决缓存击穿
*/

//1、加入缓存逻辑,缓存中存的数据是json字符串
//JSO跨语言、跨平台兼容
String catalogJson = redisTemplate.opsForValue().get("catalogJson");
if(StringUtils.isEmpty(catalogJson)){
//2、缓存中没有,查询数据库
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDbWithRedisLock();
System.out.println("缓存不命中....将要查询数据库....");
return catalogJsonFromDb;
}
System.out.println("缓存命中....直接返回.....");
//转为我们指定的对象
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Catelog2Vo>>>(){});
return result;
}


public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
//1、占分布式锁,去redis占坑
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",uuid,300,TimeUnit.SECONDS);
if(lock){
System.out.println("获取分布式锁成功.....");
//加锁成功....执行业务
//2、设置过期时间,必须和加锁是同步的,原子的
//redisTemplate.expire("lock",30,TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> dataFromDB;
try{
dataFromDB = getDataFromDB();
}finally {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
//删除锁
Integer lock1 = redisTemplate.execute(new DefaultRedisScript<Integer>(script, Integer.class)
, Arrays.asList("lock"), uuid);
}

//获取值对比+对比成功删除=原子操作 lua脚本解锁
/*String lockValue = redisTemplate.opsForValue().get("lock");
if(uuid.equals(lockValue)){
//删除自己的锁
redisTemplate.delete("lock");//删除锁
}*/

return dataFromDB;
}else{
//加锁失败....重试。synchronized
//休眠100ms重试
System.out.println("获取分布式锁失败,等待重试.....");
try {
Thread.sleep(200);
}catch (Exception e){
e.printStackTrace();
}
return getCatalogJsonFromDbWithRedisLock();//自旋的方式
}
}


private Map<String, List<Catelog2Vo>> getDataFromDB() {
String catalogJson = redisTemplate.opsForValue().get("catalogJson");
if(!StringUtils.isEmpty(catalogJson)){
//缓存不为null直接返回
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Catelog2Vo>>>(){});
return result;
}

System.out.println("查询了数据库.....");

/**
* 1、将数据库的多次查询变为一次
*/
List<CategoryEntity> selectList = baseMapper.selectList(null);

//1、查询所有一级分类
List<CategoryEntity> level1Catagorys = getParent_cid(selectList, 0L);

//2、封装数据
Map<String, List<Catelog2Vo>> parent_cid = level1Catagorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {
// 1、每一个的一级分类,查到这个以及分类的二级分类
List<CategoryEntity> categoryEntities = getParent_cid(selectList, v.getCatId());
//2、封装上面的结果
List<Catelog2Vo> catelog2Vos = null;
if (categoryEntities != null) {
catelog2Vos = categoryEntities.stream().map(l2 -> {
Catelog2Vo catelog2Vo = new Catelog2Vo(v.getCatId().toString(), null, l2.getCatId().toString(), l2.getName());
//1、找当前二级分类的三级分类封装成vo
List<CategoryEntity> level3Catalog = getParent_cid(selectList, l2.getCatId());
if(level3Catalog!=null){
List<Catelog2Vo.Category3Vo> collect = level3Catalog.stream().map(l3 -> {
//2、封装成指定格式
Catelog2Vo.Category3Vo category3Vo = new Catelog2Vo.Category3Vo(l2.getCatId().toString(), l3.getCatId().toString(), l3.getName());
return category3Vo;
}).collect(Collectors.toList());
catelog2Vo.setCatalog3List(collect);
}
return catelog2Vo;
}).collect(Collectors.toList());
}
return catelog2Vos;
}));

//3、查到的数据再放入缓存,将对象转为json放到缓存中
String s = JSON.toJSONString(parent_cid);
redisTemplate.opsForValue().set("catalogJson", s, 1, TimeUnit.DAYS);

return parent_cid;
}

   分别启动四个服务和网关,使用jemeter测试,观察控制台打印,发现四个服务只查询了一次数据库,即分布式锁实现成功

总结:

(1)redis分布式锁的原理:setnx,同一时刻只能设置成功一个

    前提,锁的key是一定的,value可以变

(2)没获取到锁阻塞或者sleep一会

(3)设置好了锁,服务出现宕机,没有执行删除锁逻辑,这就造成了死锁

   解决:设置过期时间

(4)业务还没执行完锁就过期了,别人拿到锁,自己执行完去删了别人的锁

   解决:锁续期(redisson有看门狗),。删锁的时候明确是自己的锁。如uuid 判断uuid对了,

(5)但是将要删除的时候锁过期了,别人设置了新值,那删除了别人 的锁

   解决:删除锁必须保证原子性(保证判断和删锁是原子的)。使用redis+Lua脚本 完成,脚本是原子的

三、Redisson 完成分布式锁

1、简介

    Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格(In-Memory Data Grid)。充分的利用了 Redis 键值数据库提供的一系列优势,
基于 Java 实用工具包中常用接口,为使用者 提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,
大大降低了设计和研发大规模分布式 系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间 的协作。
官方文档:https:
//github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

2、环境搭建

1)导入依赖
<!--以后使用redisson作为所有分布式锁,分布式对象等功能框架-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>

(2)配置redis
package com.atguigu.gulimall.product.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyRedisConfig {

// redission通过redissonClient对象使用 // 如果是多个redis集群,可以配置
@Bean(destroyMethod = "shutdown")
public RedissonClient redisson() {
Config config = new Config();
// 创建单例模式的配置
config.useSingleServer().setAddress("redis://192.168.56.10:6379");
// 根据Config创建出Redisson实例
return Redisson.create(config);
}
}

(3)测试

3、可重入锁(Reentrant Lock)

  分布式锁:github.com/redisson/redisson/wiki/8.-分布式锁和同步器
  A调用B。AB都需要同一锁,此时可重入锁就可以重入,A就可以调用B。不可重入锁时,A调用B将死锁
  
  基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。

  锁的续期:大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。
为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,
看门狗的检查锁的超时时间是30秒钟(每到20s就会自动续借成30s,是1/3的关系),也可以通过修改Config.lockWatchdogTimeout来另行指定。
  测试用例
com.atguigu.gulimall.product.web.IndexController 新增方法
@ResponseBody
@GetMapping("/hello")
public String hello(){
//1、获取一把锁,只要锁的名字一样,就是同一把锁
RLock lock = redisson.getLock("my-lock");

//2、加锁
lock.lock();//阻塞式等待,默认加的锁都是30s时间
//(1)锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删掉
//(2)加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。
try {
System.out.println("加锁成功,执行业务..."+Thread.currentThread().getId());
Thread.sleep(30000);
}catch (Exception e){

}finally {
//3、解锁 假设解锁代码没有云顶,redisson会不会出现死锁
System.out.println("释放锁..."+Thread.currentThread().getId());
lock.unlock();
}

return "hello";
}


关于看门狗的原理-redisson如何解决死锁
@ResponseBody
@GetMapping("/hello")
public String hello(){
//1、获取一把锁,只要锁的名字一样,就是同一把锁
RLock lock = redisson.getLock("my-lock");

//2、加锁
lock.lock();//阻塞式等待,默认加的锁都是30s时间
//(1)锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删掉
//(2)加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。

//lock.lock(10, TimeUnit.SECONDS);//10秒自动解锁,看门狗不续命,自动解锁时间一定要大于业务的执行时间
//问题:lock.lock(10, TimeUnit.SECONDS); 在锁时间到了以后,不糊自动续期
//1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
//2、如果我们未指定锁的超时时间,就使用30 * 1000【LockWatchdogTimeout看门狗的默认时间 】
// 只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动再次续期,续成30s
// internalLockLeaseTime【看门狗时间】/3,10s

//最佳实战
//(1)lock.lock(10, TimeUnit.SECONDS);省掉了整个续期操作。手动解锁
try {
System.out.println("加锁成功,执行业务..."+Thread.currentThread().getId());
Thread.sleep(30000);
}catch (Exception e){

}finally {
//3、解锁 假设解锁代码没有云顶,redisson会不会出现死锁
System.out.println("释放锁..."+Thread.currentThread().getId());
lock.unlock();
}

return "hello";
}
总结:
(1)如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
(2)如果我们未指定锁的超时时间,就使用30 * 1000【LockWatchdogTimeout看门狗的默认时间 】占锁。如果返回占锁成功future,调用future.onComplete();
(3)没异常的话调用scheduleExpirationRenewal(threadId);
(4)重新设置过期时间,定时任务;
(5)看门狗的原理是定时任务:重新给锁设置过期时间,新的过期时间就是看门狗的默认时间; 锁时间/3是定时任务周期;

  Redisson同时还为分布式锁提供了异步执行的相关方法:
  RLock lock = redisson.getLock("anyLock");
  lock.lockAsync();
  lock.lockAsync(10, TimeUnit.SECONDS);
  Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

  RLock对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误。

但是如果遇到需要其他进程也能解锁的情况,请使用分布式信号量Semaphore 对象.
  public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisson() {
    Map<String, List<Catalog2Vo>> categoryMap=null;
    RLock lock = redissonClient.getLock("CatalogJson-Lock");
    lock.lock();
    try {
      Thread.sleep(30000);
      categoryMap = getCategoryMap();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }finally {
      lock.unlock();
      return categoryMap;
    }
  }

4、读写锁(ReadWriteLock)

  基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
  分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
//保证一定能读到最新数据,修改其间,写锁是一个排它锁(互斥锁,独享锁)。读锁是一个共享锁
//写锁没释放读就必须等待
//读 + 读:相当于无锁,并发读,只会在redis中记录好,所有当前的读锁。他们都会同时加锁成功
//写 + 读:等待写锁释放
//写 + 写:阻塞方式
//读 + 写:有读锁。写也需要等待
//只要有些写的存在,都必须等到
@GetMapping("/write")
@ResponseBody
public String writeValue(){
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
String s = "";
//加写锁
RLock rLock = lock.writeLock();
try {
//1、改数据加写锁,度读数据加读锁
rLock.lock();
System.out.println("写锁加锁成功..."+Thread.currentThread().getId());
s = UUID.randomUUID().toString();
Thread.sleep(30000);
redisTemplate.opsForValue().set("writeValue",s);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rLock.unlock();
System.out.println("写锁释放..."+Thread.currentThread().getId());
}
return s;
}

@GetMapping("/read")
@ResponseBody
public String readValue(){
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
String s = "";
//加读锁
RLock rLock = lock.readLock();
try {
rLock.lock();
System.out.println("读锁加锁成功..."+Thread.currentThread().getId());
s = redisTemplate.opsForValue().get("writeValue");
Thread.sleep(30000);
} catch (Exception e) {
e.printStackTrace();
rLock.unlock();
System.out.println("读锁释放..."+Thread.currentThread().getId());
}
return s;
}

5、闭锁(CountDownLatch)

基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。
以下代码只有 gogogo() 被调用5次后,lockDoor()才能继续执行
/**
* 放假,锁门
* 1班没人了,2
* 5个班全部走完,我们可以锁大门
*/
@GetMapping("/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.trySetCount(5);
door.await();//等待闭锁都完成

return "放假了...";
}

@GetMapping("/gogogo/{id}")
public String gogogo(@PathVariable("id") Long id){
RCountDownLatch door = redisson.getCountDownLatch("door");
door.countDown();//计数减1
return id+"班的人都走了";
}

6、信号量(Semaphore)

  信号量为存储在redis中的一个数字,当这个数字大于0时,即可以调用acquire()方法增加数量,也可以调用release()方法减少数量,
但是当调用release()之后小于0的话方法就会阻塞,直到数字大于0   基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。
同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
/**
* 车库停车
* 3车位
* 信号量也可以用作分布式限流
*/
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
//park.acquire();//获取一个信号,获取一个值,占一个车位
boolean b = park.tryAcquire();
if(b){
//执行业务
}else{
return "error";
}
return "ok=>"+b;
}
@GetMapping("/go")
public String go(){
RSemaphore park = redisson.getSemaphore("park");
park.release();//释放一个车位
return "ok";
}

四、缓存数据一致性

1、保证一致性模式

1)双写模式

 (2)失效模式

 (3)缓存一致性解决方案

 

//TODO 产生堆外内存溢出:OutOfDirectMemoryError
//(1)springboot2.0以后默认使用lettuce作为操作redis客户端。它使用netty进行网络通信
//(2)lettuce的bug导致netty堆外内存溢出 -Xmx300m;netty如果没有指定堆外内存,默认使用-Xmx300m
// 可以通过-Dio.netty.maxDirectMemory进行设置
// 解决方案:不能使用-Dio.netty.maxDirectMemory只去调大堆外内存。
//(1)升级lettuce客户端
//(2)切换使用jedis
@Override
public Map<String, List<Catelog2Vo>> getCatalogJson() {
//给缓存中放json字符串,拿出的json字符串,还要逆转为能用的对象类型:【序列化与反序列化】

/**
* 1、空结果缓存:解决缓存穿透
* 2、设置过期时间(加随机值):解决缓存雪崩
* 3、枷锁:解决缓存击穿
*/

//1、加入缓存逻辑,缓存中存的数据是json字符串
//JSO跨语言、跨平台兼容
String catalogJson = redisTemplate.opsForValue().get("catalogJson");
if(StringUtils.isEmpty(catalogJson)){
//2、缓存中没有,查询数据库
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDbWithRedisLock();
System.out.println("缓存不命中....将要查询数据库....");
return catalogJsonFromDb;
}
System.out.println("缓存命中....直接返回.....");
//转为我们指定的对象
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Catelog2Vo>>>(){});
return result;
}


/**
* 缓存里面的数据如何和数据库保持一致
* 缓存数据一致性
* (1)双写模式
* (2)
* @return
*/
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedissonLock() {
//1、锁的名字。锁的粒度,越细越快。
//锁的粒度:具体缓存的是某个数据,11-号商品:product-11-lock product-12-lock
RLock lock = redisson.getLock("catalogJson-lock");
lock.lock();

Map<String, List<Catelog2Vo>> dataFromDB;
try{
dataFromDB = getDataFromDB();
}finally {
lock.unlock();
}
return dataFromDB;
}
原文地址:https://www.cnblogs.com/javahr/p/15725749.html