把 Redis 当作消息队列

把 Redis 当作消息队列

List

List 的数据结构是链表,在头部和尾部操作元素,时间复杂度都是 O(1),所以它很适合用来当作消息队列。

消息队列要符合先进先出原则,生产者从左边开始塞,消费者从右边开始消费。

127.0.0.1:6379> lpush queue msg1
(integer) 1
127.0.0.1:6379> rpop queue
"msg1"

消费者在代码中就是加一个 while 循环,不断地执行 rpop 这个操作。这样就会造成一个问题,就是当队列里面没有消息的时候,会一直循环,造成 CPU 空转,不仅浪费 CPU 资源,还影响 Redis 性能。为了解决这个问题,我们可以在队列中没有消息的时候 sleep 几秒。示例代码如下(这里 Redis 我使用的是单例模式):

public function actionRpop()
{
    $redis = Redis::getInstance();
    while (true) {
        $res = $redis->rpop('queue');
        if ($res) {
            // handle msg
            echo $res . "
";
        } else {
            sleep(2);
        }
    }
}

如此解决了 CPU 空转的问题,不过也带来了新的问题,就是处理消息最多存在 2s 的延迟。

为了解决这个问题,我们可以用 brpop 阻塞式拉取。当队列中没有消息的时候,客户端会发生阻塞直至有新的消息到来。

127.0.0.1:6379> brpop queue 0	// 0 表示不设置超时时间

这样就解决了 CPU 空转和消费延迟的问题,看起来似乎很完美。不过还是存在问题的:当不设置超时时间或者设置的超时时间太长,然后这个连接太久没有活跃过,可能会被 Redis Server 判定为无效连接,并且会被强制下线。

在 PHP 中,直接使用 brpop 不设置超时时间可能会报 read error on connection to **.**.**.**:6379 错,所以在客户端需要有重连机制。示例如下:

public function actionBrpop()
{
    try {
        $redis = Redis::getInstance();
        start:
        while (true) {
            $res = $redis->brpop('queue', 0);
            if ($res) {
                // handle msg
                print_r($res) . "
";
            }
        }
    } catch (RedisException $e) {
        echo $e->getMessage() . "
";
        goto start;
    }
}

或者直接设置阻塞 2s 超时,不断地循环就行了。

这样一来,Redis List 当作消息队列是不是就完美无缺呢?还差得远呢!

  • 首先,Redis 并不能保证持久化,就是数据不能保证不丢失!当然,这是 Redis 的设计问题,我们这里不纠结这个,否则文章写不下去了。
  • 其次,从 List 中 pop 消息后,这条消息就从 List 中删除了,没法再重复消费,就没有 ack 机制。
  • 然后,消息不可堆积。Redis 是基于内存的,内存的容量有限,迟早撑爆。一撑爆就宕机,就会丢失未同步到磁盘的数据。
  • 还有,不支持消费者组,没法加速消费。

第一个问题 pass,第三第四个问题无解。这里我们来讨论下第二个问题,我这里有一个解决方案,就是不用 pop,而是用 lrange 和 lrem 组合。先用 lrange 命令批量地获取消息,然后消费,消费成功就用 lrem 命令删除。如此虽然提高了时间复杂度,但是解决了不支持重复消费的问题。不过也带来了新的问题。因为 lrange 只能从左到右读,所以生产者只能用 rpush。还有 lrange 没有阻塞这个说法,所以还是得 sleep 避免 CPU 空转。代码示例:

public function actionLrange()
{
    $redis = Redis::getInstance();
    while (true) {
        // take 100 at a time
        $res = $redis->lrange('queue', 0, 100);
        if ($res) {
            foreach ($res as $v) {
                // handle msg
                echo $v . "
";
                // when handled success
                $redis->lrem('queue', $v, 1);
            }
        } else {
            sleep(2);
        }
    }
}

Stream

在 Redis 5.0 以后,新增了 Stream 数据类型,解决了上述无解的问题。

Redis Stream 基本的命令我就不在这里介绍了,官方文档里面都有。

Redis 文档

PHP Redis 文档

Stream 在创建队列的时候,可以指定队列长度。当队列长度超过上限,旧的消息会被删除。这样解决了消息堆积撑爆内存的问题,不过也带来了新的问题:当消费速度跟不上生产速度,未被消费的旧消息将会丢失。

# 10000 代表队列长度,* 代表 Redis 自动生成唯一 ID,msg 为 key,msg1 为value
127.0.0.1:6379> xadd queue maxlen 10000 * msg msg1
"1621579187964-0"

Stream 支持消费组,可以由多个消费者同时消费,并且保证了一条消息只能被同一消费组的一个消费者消费,避免了重复消费。

# 创建队列 queue 的消费组 group,0 等同于 0-0,代表从头开始拉取 
127.0.0.1:6379> xgroup create queue group 0
OK

现在我们可以在消费组下面添加消费者。

# count 1 代表每次拉取一条消息,block 2000 代表没有消息时阻塞 2s,> 代表拉取最新的消息
127.0.0.1:6379> xreadgroup group group consumer1 count 1 block 2000 streams queue > 
1) 1) "queue"
   2) 1) 1) "1621579187964-0"
         2) 1) "msg"
            2) "msg1"

消费组里每消费一次,都会往 pending 里面写数据,记录了所有的消费信息,比如几个消费者,每个消费者消费了几条信息,哪个消费者消费了哪条消息。

127.0.0.1:6379> xpending queue group
1) (integer) 1
2) "1621579187964-0"
3) "1621579187964-0"
4) 1) 1) "consumer1"
      2) "1"

Stream 提供了 ack 机制,每一次 ack,就会删掉 pending 里面的相关记录。

127.0.0.1:6379> xack queue group 1621579187964-0
(integer) 1

PHP 消费者代码示例:

public function actionConsumer1()
{
    $redis = Redis::getInstance();
    $queue = 'queue';
    $group = 'group';
    while (true) {
        $res = $redis->xreadgroup($group, 'consumer1', [$queue => '>'], 1, 2000);
        if ($res) {
            // handle msg
            var_dump($res);
            // when handled success
            $redis->xack($queue, $group, array_keys($res['queue']));
        }
    }
}

public function actionConsumer2()
{
    $redis = Redis::getInstance();
    $queue = 'queue';
    $group = 'group';
    while (true) {
        $res = $redis->xreadgroup($group, 'consumer2', [$queue => '>'], 1, 2000);
        if ($res) {
            // handle msg
            var_dump($res);
            // when handled success
            $redis->xack($queue, $group, array_keys($res['queue']));
        }
    }
}

运行结果:

127.0.0.1:6379> xadd queue maxlen 10000 * msg msg1
"1621580563151-0"
127.0.0.1:6379> xadd queue maxlen 10000 * msg msg2
"1621580564627-0"
127.0.0.1:6379> xadd queue maxlen 10000 * msg msg3
"1621580565981-0"
127.0.0.1:6379> xadd queue maxlen 10000 * msg msg4
"1621580567406-0"
127.0.0.1:6379> xadd queue maxlen 10000 * msg msg5
"1621580569186-0"
127.0.0.1:6379> xpending queue group
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)

# consumer1
$ php yii redis/consumer1
array(1) {
  ["queue"]=>
  array(1) {
    ["1621580564627-0"]=>
    array(1) {
      ["msg"]=>
      string(4) "msg2"
    }
  }
}
array(1) {
  ["queue"]=>
  array(1) {
    ["1621580565981-0"]=>
    array(1) {
      ["msg"]=>
      string(4) "msg3"
    }
  }
}
array(1) {
  ["queue"]=>
  array(1) {
    ["1621580567406-0"]=>
    array(1) {
      ["msg"]=>
      string(4) "msg4"
    }
  }
}

# consumer2
$ php yii redis/consumer2
array(1) {
  ["queue"]=>
  array(1) {
    ["1621580563151-0"]=>
    array(1) {
      ["msg"]=>
      string(4) "msg1"
    }
  }
}
array(1) {
  ["queue"]=>
  array(1) {
    ["1621580569186-0"]=>
    array(1) {
      ["msg"]=>
      string(4) "msg5"
    }
  }
}

总结

把 Redis 当作消费队列的优点:

  • 部署方便,学习成本低。
  • 基于内存,速度快。

缺点:

  • Redis 不能保证数据持久性,有可能会丢失数据。
  • 面对消息积压,内存资源紧张。

所以,当你确保 Redis 不会宕机(日志刷盘或者主从同步的时候数据不丢失),内存够用的情况下,完全可以用来当作消息队列。亦或是业务对于数据丢失不敏感,也可以用。否则,最好还是用专业的消息队列中间件。

原文地址:https://www.cnblogs.com/74percent/p/14794200.html