消息队列 ---常用的 MQ 中间件

目前市面上比较常用的 MQ(Message Queue,消息队列)中间件有 RabbitMQ、Kafka、RocketMQ,如果是轻量级的消息队列可以使用 Redis 提供的消息队列,其中 Redis 属于轻量级的消息队列,而 RabbitMQ、Kafka 属于比较成熟且比较稳定和高效的 MQ 中间件。

Redis 轻量级的消息中间件

Redis 是一个高效的内存性数据库中间件,但使用 Redis 也可以实现消息队列的功能。

早期的 Redis(Redis 5.0 之前)是不支持消息确认的,那时候我们可以通过 List 数据类型的 lpush 和 rpop 方法来实现队列消息的存入和读取功能,或者使用 Redis 提供的发布订阅(pub/sub)功能来实现消息队列,但这种模式不支持持久化,List 虽然支持持久化但不能设置复杂的路由规则来匹配多个消息,并且他们二者都不支持消息消费确认。

于是在 Redis 5.0 之后提供了新的数据类型 Stream 解决了消息确认的问题,但它同样不能提供复杂的路由匹配规则,因此在业务不复杂的场景下可以尝试性的使用 Redis 提供的消息队列

 RabbitMQ

RabbitMQ 是一个老牌开源的消息中间件,它实现了标准的 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)消息中间件,使用 Erlang 语言开发,支持集群部署,和多种客户端语言混合调用,它支持的主流开发语言有以下这些:

Java and Spring、.NET、Ruby、Python、PHP、JavaScript and Node、Objective-C and Swift、Rust、Scala、Go......更多支持语言,请点击这里访问官网查看。

重要的概念

RabbitMQ 中有 3 个重要的概念:生产者、消费者和代理。

生产者:消息的创建者,负责创建和推送数据到消息服务器。
消费者:消息的接收方,用于处理数据和确认消息。
代理:也就是 RabbitMQ 服务本身,它用于扮演“快递”的角色,因为它本身并不生产消息,只是扮演了“快递”的角色,把消息进行暂存和传递。

运行流程

RabbitMQ 优点

• 支持持久化,RabbitMQ 支持磁盘持久化功能,保证了消息不会丢失;
高并发,RabbitMQ 使用了 Erlang 开发语言,Erlang 是为电话交换机开发的语言,天生自带高并发光环和高可用特性;
支持分布式集群,正是因为 Erlang 语言实现的,因此 RabbitMQ 集群部署也非常简单,只需要启动每个节点并使用 --link 把节点加入到集群中即可,并且 RabbitMQ 支持自动选主和自动容灾;
支持多种语言,比如 Java、.NET、PHP、Python、JavaScript、Ruby、Go 等;
支持消息确认,支持消息消费确认(ack)保证了每条消息可以被正常消费;
它支持很多插件,比如网页控制台消息管理插件、消息延迟插件等,RabbitMQ 的插件很多并且使用都很方便。

RabbitMQ 的消息类型

(1)direct(默认类型)模式,此模式为一对一的发送方式,也就是一条消息只会发送给一个消费者;
(2)headers 模式,允许你匹配消息的 header 而非路由键(RoutingKey),除此之外 headers 和 direct 的使用完全一致,但因为 headers 匹配的性能很差,几乎不会被用到;
(3)fanout 模式,为多播的方式,会把一个消息分发给所有的订阅者;
(4)topic 模式,为主题订阅模式,允许使用通配符(#、*)匹配一个或者多个消息,我可以使用“cn.mq.#”匹配到多个前缀是“cn.mq.xxx”的消息,比如可以匹配到“cn.mq.rabbit”、“cn.mq.kafka”等消息。

 RabbitMQ 集群

RabbitMQ 集群是由多个节点组成,但默认情况下每个节点并不是存储所有队列的完整拷贝,这是出于存储空间和性能的考虑,因为如果存储了队列的完整拷贝,那么就会有很多冗余的重复数据,并且在新增节点的情况下,不但没有新增存储空间,反而需要更大的空间来存储旧的数据;同样的道理,如果每个节点都保存了所有队列的完整信息,那么非查询操作的性能就会很慢,就会需要更多的网络带宽和磁盘负载来存储这些数据。

为了能兼顾性能和稳定性,RabbitMQ 集群的节点分为两种类型,即磁盘节点和内存节点对于磁盘节点来说显然它的优势就是稳定,可以把相关数据保存下来,若 RabbitMQ 因为意外情况宕机,重启之后保证了数据不丢失;而内存节点的优势是快,因为是在内存中进行数据交换和操作,因此性能比磁盘节点要高出很多倍。

如果是单个 RabbitMQ 那么就必须要求是磁盘节点,否则当 RabbitMQ 服务器重启之后所有的数据都会丢失,这样显然是不能接受的。在 RabbitMQ 的集群中,至少需要一个磁盘节点,这样至少能保证集群数据的相对可靠性。如果集群中的某一个磁盘节点崩溃了,此时整个 RabbitMQ 服务也不会处于崩溃的状态,不过部分操作会受影响,比如不能创建队列、交换器、也不能添加用户及修改用户权限,更不能添加和删除集群的节点等功能。

注意:对于 RabbitMQ 集群来说,我们启动集群节点的顺序应该是先启动磁盘节点再启动内存节点,而关闭的顺序正好和启动的顺序相反,不然可能会导致 RabbitMQ 集群启动失败或者是数据丢失等异常问题。

RocketMQ

消息队列RocketMQ是阿里巴巴集团中间件技术部自主研发的专业消息中间件。产品基于高可用分布式集群技术,提供消息发布订阅、消息轨迹查询、定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。

RocketMQ特点

稳定性

阿里巴巴双十一官方指定消息产品,支撑阿里巴巴集团所有的消息服务,历经十余年高可用与高可靠的严苛考验,是阿里巴巴交易链路的核心产品。服务可用性 99.95%,Region 化、多可用区、分布式集群化部署,确保服务高可用,即便整个机房不可用仍可正常提供消息服务;数据可靠性99.99999999%,同步双写、超三副本数据冗余与快速切换技术确保数据可靠;

高性能

历年双 11 购物狂欢节零点千万级 TPS、万亿级数据洪峰,创造了全球最大的业务消息并发以及流转纪录(日志类消息除外);在始终保证高性能前提下,支持亿级消息堆积,不影响集群的正常服务,在削峰填谷(蓄洪)、微服务解耦的场景下尤为重要;

丰富消息类型

提供丰富的消息类型,满足各种严苛场景下的高级特性需求,当前支持的消息类型涵盖普通消息、顺序消息(全局顺序 / 分区顺序)、分布式事务消息、定时消息/延时消息。

RocketMQ消息重试

对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。

无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

重试次数

消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:

如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。

注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。

死信队列

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

死信特性

死信消息具有以下特性

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

死信队列具有以下特性:

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

查看死信信息

在RocketMQ控制台查询出现死信队列的主题信息

 在消息界面根据主题查询死信消息

 选择重新发送消息

一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。

实战

WKD项目消息传送第三方系统(SAP)失败,需要手动重传,从日志获取topic和messageId,搜索消息后点击消费验证重传。

 

MessageId的生成(源码)

 

 常见面试题

项目里用的MQ组件是哪个?

RabbitMQ集群怎么做的?工作原理?(YY两年)

RocketMQ的重推机制,默认重推几次?

 参考/好文:

ROCKETMQ高级 --

 https://www.freesion.com/article/2860186254/

拉钩教育 --

https://kaiwu.lagou.com/course/courseInfo.htm?courseId=59#/detail/pc?id=1775

希望本文章对您有帮助,您的转发、点赞是我的创作动力,十分感谢。更多好文推荐,请关注我的微信公众号--JustJavaIt
原文地址:https://www.cnblogs.com/liaowenhui/p/12891993.html