redis事务、并发及应用场景

事务概念

参考: http://redis.cn/topics/transactions.html

事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

事务是一个原子操作:事务中的命令要么全部被执行,要么全部都不执行。

redis事务是一组命令的集合。多组命令进入到等待执行的事务队列中,执行exec命令告诉redis将等待执行的事务队列中的所有命令,按顺序执行,返回值就是这些命令组成的列表。

Redis 事务可以一次执行多个命令, 具有下列保证:

  • 批量操作在发送 EXEC 命令前被放入队列缓存。
  • 收到 EXEC 命令后进入事务执行,事务中任意命令执行失败,其余的命令依然被执行。
  • 在事务执行过程,其他客户端提交的命令请求不会插入到事务执行命令序列中。

一个事务从开始到执行会经历以下三个阶段:

  • 开始事务。
  • 命令入队。
  • 执行事务。

事务中的错误:

  • 事务在执行 EXEC 之前,入队的命令可能会出错。比如说,命令可能会产生语法错误(参数数量错误,参数名错误,等等),或者其他更严重的错误,比如内存不足(如果服务器使用 maxmemory 设置了最大内存限制的话)。
  • 命令可能在 EXEC 调用之后失败。举个例子,事务中的命令可能处理了错误类型的键,比如将列表命令用在了字符串键上面,诸如此类。

从 Redis 2.6.5 开始,服务器会对命令入队失败的情况进行记录,并在客户端调用 EXEC 命令时,拒绝执行并自动放弃这个事务

在 EXEC 命令执行之后所产生的错误, 并没有对它们进行特别处理: 即使事务中有某个/某些命令在执行时产生了错误, 事务中的其他命令仍然会继续执行

如:

127.0.0.1:6379> multi
OK
127.0.0.1:6379> set a 3
QUEUED
127.0.0.1:6379> lpop a
QUEUED
127.0.0.1:6379> exec
1) OK
2) (error) WRONGTYPE Operation against a key holding the wrong kind of value

redis 事务入队只会检查语法错误,对于exec后执行错误,没有回滚措施。而且在事务中无法在客户端做查询判断,只会得到queued,无法进行业务数据判断,也是很坑。

原子性

一个事务是一个不可分割的最小工作单位,要么都成功要么都失败。
原子操作是指你的一个业务逻辑必须是不可拆分的.比如你给别人转钱,你的账号扣钱,别人的账号增加钱。

单个 Redis 命令的执行是原子性的,但 Redis 没有在事务上增加任何维持原子性的机制,所以 Redis 事务的执行并不是原子性的。

看看下面几个原子性的命令:

  • HINCRBY key field increment 为哈希表 key 中的域 field 的值加上增量 increment
    增量也可以为负数,相当于对给定域进行减法操作。
    如果 key 不存在,一个新的哈希表被创建并执行 HINCRBY 命令。
    如果域 field 不存在,那么在执行命令前,域的值被初始化为 0
  • SETNX key value 只在键 key 不存在的情况下, 将键 key 的值设置为 value;若键 key 已经存在, 则 SETNX 命令不做任何动作。

事务命令

包含5个命令 MULTI、EXEC、DISCARD、WATCH、UNWATCH。

DISCARD 取消事务,放弃执行事务块内的所有命令。

EXEC 执行所有事务块内的命令。

MULTI 标记一个事务块的开始。

UNWATCH 取消 WATCH 命令对所有 key 的监视。

WATCH key [key ...] 监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。

乐观锁

乐观的认为数据不会出现冲突,使用version或timestamp来记录判断。乐观锁的优点开销小,不会出现锁冲突。

可利用watch命令监听key,实现乐观锁,来保证不会出现冲突,应用场景比如秒杀来防止超卖。

秒杀伪代码如下:

 WATCH 锁定量
 MULTI
 incr 锁定量
 if 锁定量 <= 库存量
 减库存
 EXEC

悲观锁

了解下相关命令

  • SETNX(SET if Not eXists) key value 只在键 key 不存在的情况下, 将键 key 的值设置为 value,返回值:命令在设置成功时返回 1 , 设置失败时返回 0
  • INCR KEY 为键 key 储存的数字值加上一。
    如果键 key 不存在, 那么它的值会先被初始化为 0 , 然后再执行 INCR 命令。
    如果键 key 储存的值不能被解释为数字, 那么 INCR 命令将返回一个错误。
    命令会返回键 key 在执行加一操作之后的值
  • SET key value [EX seconds] [PX milliseconds] [NX|XX] NX等同于SETNX操作,EX seconds 将键的过期时间设置为 seconds 秒

了解下抢购模拟代码

<?php

namespace appcontrollers;

use Yii;
use yiiwebController;
use appmodulesCommon;

/**
 * 模拟抢购处理
 * Class ShopController
 * @package appcontrollers
 */
class ShopController extends Controller
{
    public $goods = 'huawei P20';

    //初始化数据
    public function actionInit(){
        $redis = Yii::$app->redis;
        $redis->set('goodNums',100);   //设置库存
        $redis->del('order');           //清空抢购订单
        die('success');
    }

    //悲观锁
    //setnx 实现,有个问题 expire失败(1.人为错误;2.redis崩了)了,这个锁就持久化,一直被锁了
    public function actionBuy(){
        $userId = mt_rand(1,99999999);
        $goods = $this->goods;
        $redis = Yii::$app->redis;
        $lock = $goods;

        try {
            $inventory['num'] = $redis->get('goodNums');
            if($inventory['num']<=0){
                self::removeLock($lock);
                throw new Exception('活动结束');
            }
            if( $redis->setnx($lock,1) ){
                $redis->expire($lock,60);//设置过期时间,防止死锁

                //业务处理  减库存,创建订单
                $redis->decr('goodNums');
                $redis->sadd('order',$userId);

                //todo 实际业务处理时间不可控,所以需要调整过期时间,在业务处理完进行剩余生命时间的判断,没找到回滚业务

                $this->removeLock($lock);

            }else{
                throw new Exception($userId.' 抢购失败');
            }
            Common::addLog('shop.log',$userId.' 抢购成功');
        }catch (Exception $e){
            $this->removeLock($lock);
            Common::addLog('shop.log',$e->getMessage());
        }

        die('success');
    }

    //删除锁
    protected function removeLock( $lock ){
        $redis = Yii::$app->redis;
        return $redis->del($lock);
    }

    //悲观锁
    //incr 解决expire失效,解锁
    public function actionBuy2(){
        $userId = mt_rand(1,99999999);
        $goods = $this->goods;
        $redis = Yii::$app->redis;
        $lock = $goods;

        try {
            $inventory['num'] = $redis->get('goodNums');
            if($inventory['num']<=0){
                $this->removeLock($lock);
                throw new Exception('活动结束');
            }

            $lockset = $redis->incr($lock);
            if( !$lockset ){
                throw new Exception($userId.' 抢购失败');
            }

            if($lockset==1){
                $redis->expire($lock,60);//设置过期时间,防止死锁

                //业务处理  减库存,创建订单
                $redis->decr('goodNums');
                $redis->sadd('order',$userId);

                $this->removeLock($lock);
            }

            //锁的数量大于1并且没有设置过期时间,失败处理
            if( $lockset>1 && $redis->ttl($lock)===-1 ){
                $this->removeLock($lock);
                throw new Exception($userId.' 抢购失败');
            }

            Common::addLog('shop.log',$userId.' 抢购成功');
        }catch (Exception $e){
            $this->removeLock($lock);
            Common::addLog('shop.log',$e->getMessage());
        }

        die('success');
    }


    //悲观锁
    //set key value [expiration EX seconds|PX milliseconds] [NX|XX] 原子命令(redis必须大于2.6版本)
    public function actionBuy3(){
        $userId = mt_rand(1,99999999);
        $goods = $this->goods;
        $redis = Yii::$app->redis;
        $lock = $goods;

        try {
            $inventory['num'] = $redis->get('goodNums');
            if($inventory['num']<=0){
                $this->removeLock($lock);
                throw new Exception('活动结束');
            }

            $lockset = $redis->set($lock,1,'EX',60,'NX');
            if( !$lockset ){
                throw new Exception($userId.' 抢购失败');
            }

            if($lockset==1){

                //业务处理  减库存,创建订单
                $redis->decr('goodNums');
                $redis->sadd('order',$userId);
                         
                
                $this->removeLock($lock);
            }

            Common::addLog('shop.log',$userId.' 抢购成功');
        }catch (Exception $e){
            $this->removeLock($lock);
            Common::addLog('shop.log',$e->getMessage());
        }

        die('success');
    }

    # 乐观锁
    public function actionBuy4(){
        $userId = mt_rand(1,99999999);
        $goods = $this->goods;
        $redis = Yii::$app->redis;
        $lock = $goods;

        try {
            $inventory['num'] = $redis->get('goodNums');
            if($inventory['num']<=0){
                throw new Exception('活动结束');
            }

            $redis->watch($lock);
            $redis->multi();

            //todo:这里还需要重新判断下库存,否则会出现超发,高并发情况下$inventory['num']肯定会出现同时读取一个值;为了方便测试,没写db操作
            //redis事务是将命令放入队列中,无法取goodNums来判断库存是否结束,此处使用数据库来判断库存合理

            //业务处理  减库存,创建订单
            $redis->decr('goodNums');
            $redis->sadd('order',$userId);

            $redis->exec();

            Common::addLog('shop.log',$userId.' 抢购成功');
        }catch (Exception $e){
            $redis->discard();
            Common::addLog('shop.log',$e->getMessage());
        }

        die('success');
    }
    
    # 队列实现,不做详述
}

并发控制及过期时间

服务器访问并发比较大,无效访问频繁,比如说频繁请求接口,爬虫频繁访问服务器,抢购瞬时请求过大,我们需要限流处理。

限流:对访问来源计数,超过设定次数,设置过期时间,提醒访问频繁,稍后再试

limits=500   #设置1秒内限制次数50
if EXISTS userid
    return '访问频繁,锁定时间剩余(ttl userid)秒'
if userid_count_time > limits
   exprice userid,3600
   return '访问频繁,稍后再试'
else 
   MUlTI
   incr userid_count_time          # 对用户每秒的请求进行原子递增计数
   exprice userid_count_time , 60
   EXEC

//使用事务的目的是避免执行错误中断,userid_count_time持久化到磁盘,高并发下这个很有必要

计数器限流,缺点也很大,可能会超过限制数。相比下,高并发 漏桶算法、令牌桶算法更适合做限流,此处不做深究。

队列

可以用队列来做异步任务处理,实现解耦过程。来看看以下可能在使用中需要注意的问题

队列防丢失

运用数据格式list,lpush、rpop就可以入队、出队,但是会有个问题 假设出队的业务执行发生错误,数据会不会因此丢失,所以需要确保出队时确实被消费了,可以参考下面伪代码处理:

while(val = lrange(list,0,-1))
    try{
        //对val这条数据的业务代码处理
        
        rpop(list)
    }catch(Exception e){
        //记录错误,通知programmer处理
        
        break;
    }

参考下lrange语法

阻塞队列

使用队列还有另外一个问题,空队列会一直占用redis连接。利用redis队列操作的阻塞命令我们可以解决这个问题。

来看下这个命令:

BRPOP key [key …] timeout

它是 RPOP key 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BRPOP 命令阻塞,直到等待超时或发现可弹出元素为止。

阻塞执行意味着不会一直占用连接,如果这个队列一直是空的,那么当客户端连接超过redis最大连接时间就会自动断掉,代码可能需要做些redis重连操作,但是这个过程相对于一直占用连接,频繁io请求,
所带来的redis性能影响将减少很多。

伪代码:

try(
    while ( var = BRPOP key ){
        //对var业务处理过程

        
    }
catch(Exception e){
    //redis连接中断 重连操作

}

时间区间控制

虽然阻塞队列能解决这个连接占用,但是日常的任务我们基本上借助cron执行,这也就是个定时执行的概念,假设采取上面的方法,势必存在多个进程同时再跑,长此以往系统的进程数就占满了,怎么办呢?

使用时间区间来限制进程运行时间,假设脚本每10分钟执行一次,执行9分钟中断:

伪代码

time = now()+60*9
while ( now()< time ) {
    var = RPOP key
    if( var ){
        //对var业务处理过程
        
    }else{
        sleep(2)      //空数据,休息2秒  避免频繁连接,占用redis资源
    }

}

也可以加上数据防丢失的处理,具体根据业务需求灵活处理。

持久化

服务器中的非空数据库以及数据库中的健值对统称数据库状态。

redis是内存数据库,数据库状态存在内存中,一旦服务器崩掉,服务器状态就会消失不见,所以需要将数据库状态存与磁盘文件中。

RDB

定期的将数据库状态保存在一个RDB快照文件中,RDB文件是一个经过压缩的二进制文件,通过该文件可还原生成RDB文件时的数据库状态。

触发方式:手动和自动

RDB 文件的创建和载入

redis命令:SAVE、BGSAVE

SAVE会阻塞Redis服务器进程,直到RDB文件创建完毕为止,在服务器进程阻塞期间,服务器不能处理任何命令请求。

BGSAVE命令会派生出一个子进程,然后由子进程负责创建RDB文件,服务器进程(父进程)继续处理命令请求。

自动触发

redis.conf 中配置

save 900 1      # 表示900 秒内如果至少有 1 个 key 的值变化,则保存
save 300 10     # 表示300 秒内如果至少有 10 个 key 的值变化,则保存
save 60 10000   # 表示60 秒内如果至少有 10000 个 key 的值变化,则保存

“save m n”。表示m秒内数据集存在n次修改时,自动触发BGSAVE。

伪代码

def SAVE():
    #创建RDB文件
    rdbSave()
def BGSAVE():
    #创建子进程
    pid = fork()
    if pid == 0:
        #子进程负责创建RDB文件
        rdbSave()
        #完成之后向父进程发送信号
        signal_parent()
    elif pid > 0:
        #父进程继续处理命令请求,并通过轮询等待子进程的信号
        handle_request_and_wait_signal()
    else:
        #处理出错情况
        handle_fork_error()

AOF

AOF持久化功能实现分为命令追加(append)、文件写入(wirte)、文件同步(sync)三个步骤。

每一个写命令都通过write函数追加到 appendonly.aof 中,配置方式:启动 AOF 持久化的方式

伪代码

def eventLoop():
    while True:
        #处理文件事件,接收命令请求以及发送命令回复
        #处理命令请求时可能会有新内容被追加到 aof_buf缓冲区中
        processFileEvents()
        #处理时间事件
        processTimeEvents()
        #考虑是否要将 aof_buf中的内容写入和保存到 AOF文件里面
        flushAppendOnlyFile()

命令追加

服务器在执行一个写命令之后,会以协议格式将执行的写命令追加到服务器状态的aof_buf缓冲区的末尾。

文件写入、同步

操作系统中,用户调用write函数写入,将一些数据写入到文件时,为了提高存储的效率,操作系统通常会将数据暂时保存在一个内存缓冲区里面,缓冲区满了或者超过指定时间,真正将缓冲区数据存储到磁盘,提高了效率,但是如果停机,也会造成缓冲区内的数据丢失,
系统提供了fsyncfdatasync两个同步函数,会强制让操作系统立即将缓冲区的数据写入硬盘,确保数据的安全性。

AOF持久化配置 redis.conf :

appendonly yes                      #开启AOF
appendfilename "appendonly.aof"     #默认存储路径

# appendfsync 设置持久化策略,三种:
#appendfsync always     # 每次有数据修改发生时AOF缓冲区数据都会写入AOF文件并同步 (效率最慢但安全性最高)
appendfsync everysec    # 每秒钟写入AOF文件并同步一次,该策略为AOF的缺省策略。(效率高,即便丢失数据只会丢失1秒的数据)
#appendfsync no         # 缓冲区的内容写入到AOF文件,但并不会对AOF文件进行同步,何时同步由操作系统来决定(效率高,丢失上一次同步到这一次的全部AOF数据)

appendonly yes开启 AOF 之后,Redis 每执行一个修改数据的命令,都会把它添加到 AOF 文件中,当 Redis 重启时,将会读取 AOF 文件进行“重放”以恢复到 Redis 关闭前的最后时刻。

RDB、AOF优缺点

RDB优缺

AOF优缺

使用 AOF 持久化会让 Redis 变得非常耐久(much more durable):你可以设置不同的 fsync 策略,比如无 fsync ,每秒钟一次 fsync ,或者每次执行写入命令时 fsync 。 AOF 的默认策略为每秒钟 fsync 一次,在这种配置下,
Redis 仍然可以保持良好的性能,并且就算发生故障停机,也最多只会丢失一秒钟的数据( fsync 会在后台线程执行,所以主线程可以继续努力地处理命令请求)。

对于相同的数据集来说,AOF 文件的体积通常要大于 RDB 文件的体积。根据所使用的 fsync 策略,AOF 的速度可能会慢于 RDB。 在一般情况下, 每秒 fsync 的性能依然非常高, 而关闭 fsync 可以让 AOF 的速度和 RDB 一样快,
即使在高负荷之下也是如此。 不过在处理巨大的写入载入时,RDB 可以提供更有保证的最大延迟时间(latency)。

随着服务器时间的流逝,AOF文件的体积会越来越大。

排序

redis可以当作数据库来存贮数据,如何解决排序查询呢?

SORT命令:

redis禁用危险命令

keys *
虽然其模糊匹配功能使用非常方便也很强大,在小数据量情况下使用没什么问题,数据量大会导致 Redis 锁住及 CPU 飙升,在生产环境建议禁用或者重命名!

flushdb
删除 Redis 中当前所在数据库中的所有记录,并且此命令从不会执行失败

flushall
删除 Redis 中所有数据库中的所有记录,不只是当前所在数据库,并且此命令从不会执行失败。

config
客户端可修改 Redis 配置。

参考:
https://blog.csdn.net/a169388842/article/details/82838818

redis配置

# 绑定ip,指定地址域连接
bind 192.168.1.100 10.0.0.1
原文地址:https://www.cnblogs.com/followyou/p/10816001.html