RocketMQ开发规范

前言

消息队列 RocketMQ 版是基于 Apache RocketMQ 构建的低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消

息堆积、高吞吐、可靠重试等特性。

2020年,RocketMQ面试题 -面试题驱动RocketMQ学习

技术选型

顺序消息

  • 局部顺序:适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照 FIFO 原则进行消息发布和消费的场景。

延时消息

  • RabbitMQ不支持延迟消息,Active和RocketMQ支持延迟消息。

可靠性

  • RocketMQ支持异步/同步刷盘;异步/同步Replication。Kafka使用异步刷盘方式,异步Replication。

支持的队列数

  • Kafka单机超过 64 个队列/分区,消息发送性能降低严重;RocketMQ 单机支持最高 5 万个队列,性能稳定。

消息回溯

  • Kafka可以按照Offset来回溯消息;RocketMQ支持按照时间来回溯消息,精度毫秒,例如从一天 之前的某时某分某秒开始重新消费消息。

消费失败重试机制

  • Kafka消费失败默认不支持重试;RocketMQ消费失败支持定时重试,每次重试间隔时间顺延。

二.消息队列核心概念

2.1 架构介绍:

    Name Server:是一个几乎无状态节点,可集群部署,在消息队列RocketMQ 版中提供命名服务,更新和发现Broker服务。

    Broker:消息中转角色,负责存储消息,转发消息。分为Master Broker和 Slave Broker,一个Master Broker可以对应多个Slave Broker,但是一个 Slave Broker 只能对应一个Master Broker。Broker启动后需要完成一次将自己注册

至Name Server 的操作;随后每隔 30s 定期向Name Server上报 Topic 路由信息。

    生产者:与Name Server集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从Name Server读取Topic路由信息,并向提供Topic服务的Master Broker建立长链接,且定时向Master Broker发送心跳。

    消费者:与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从 Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心

跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。

2.2 基本属性介绍:

    Topic:消息主题,一级消息类型,生产者向其发送消息。

    生产者:也称为消息发布者,负责生产并发送消息至 Topic。

    消费者:也称为消息订阅者,负责从 Topic 接收并消费消息。

    消息:生产者向 Topic 发送并最终传送给消费者的数据和(可选)属性的组合。

   消息属性:生产者可以为消息定义的属性,包含 Message Key 和 Tag。

   Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

2.3 消息的收发模型:

    消息队列 RocketMQ版支持发布/订阅模型,消息生产者应用创建 Topic 并将消息发送到 Topic。消费者应用创建对 Topic 的订阅以便从其接收消息。通信可以是一对多(扇出)、多对一(扇入)和多对多。

   生产者集群:用来表示发送消息应用,一个生产者集群下包含多个生产者实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个生产者对象。

   (一个生产者集群可以发送多个 Topic 消息。发送分布式事务消息时,如果生产者中途意外宕机,Broker 会主动回调生产者集群的任意一台机器来确认事务状态。)

   消费者集群:用来表示消费消息应用,一个消费者集群下包含多个消费者实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个消费者对象。

   (一个消费者集群下的多个消费者以均摊方式消费消息。如果设置的是广播方式,那么这个消费者集群下的每个实例都消费全量数据。一个消费者集群对应一个 Group ID,一个 Group ID 可以订阅多个 Topic)

三.消息中间件功能特性

    削峰填谷

     诸如秒杀、618大促、双十一等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列可提供削峰填谷的服务来解决该问题。

    异步解耦

      微服务系统架构,整体业务系统庞大而且复杂,往往采用RPC框架或负载均衡,仍有可能导致系统流量分配不均,导致应用中断业务。消息队列 RocketMQ 版可实现异步通信和应用解耦,确保业务的连续性。

    顺序收发

    日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出(First In First Out,缩写 FIFO)原理类似,消息队列 RocketMQ 版提供的顺序消息即保证消息 FIFO。顺序分为全局顺序、局部顺序。

   分布式事务一致性

     交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列 RocketMQ 版的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。

四.消息队列适用场景

   4.1 普通消息

          简单的消息发送,目前仅提供同步消息发送实现

   4.2 顺序消息

       全局顺序:适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。

       局部顺序:适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照 FIFO 原则进行消息发布和消费的场景。

       使用顺序消息时,请注意以下几点:

       顺序消息暂不支持广播模式。

       建议同一个Group ID只对应一种类型的Topic,即不同时用于顺序消息和无序消息的收发。

       顺序消息不支持异步发送方式,否则将无法严格保证顺序。

       对于全局顺序消息,建议消息不要有阻塞。同时运行多个实例,是为了防止工作实例意外退出而导致业务中断。当工作实例退出时,其他实例可以立即接手工作,不会导致业务中断,实际工作的只会有一个实例。

  4.3 延时消息(消息组件暂不提供)

      消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。

     通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息。

  4.4 消费消息

     消息订阅关系一致指的是同一个消费Group ID下所有Consumer 实例的处理逻辑必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。由于消息队列 RocketMQ 版的订阅关系主要由 Topic + Tag 共同组成,因此,保持订阅关系一致意味着同一个消费者 Group ID 下所有的实例需在以下两方面均保持一致:

     A、订阅的Topic必须一致

     B、订阅的Topic中的Tag必须一致

  4.5 消息幂等

    4.5.1 当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就可实现消息幂等。

    4.5.2消息重复的场景如下:

       发送时消息重复

       当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

     投递时消息重复

       消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列 RocketMQ 版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。

       负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及消费者应用重启)

      当消息队列的Broker或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

   4.5.3 处理方式:

       C1:根据消息组件msgId+msgType作第一层唯一校验;

       C2:业务系统根据业务标识去重;

       消息去重的实现机制:可以采用业务DB或者缓存中间件Redis ;

五.消息队列命名规范

    在这里,我简单介绍咱们对topic的定义规范以及消费者群组的定义规范,通过标准化的配置,利于咱们对相应的业务场景做监控

   (1)【强制】: 消息队列或者消费者组名称应该具备可读性和可管理性

   消息队列名称示例:MQ_hub2lmis_create_wo_pro(组成规则由3段组成,MQ:消息队列简称
   hub2lmis_create_wo:标识具体的业务语义。pro:标识该消息队列归属于生产环境)
参考2.2 基本属性介绍.topic
消费者组名称示例:CG_hub2lmis_create_wo_pro(组成规则同上)
参考2.2 基本属性介绍.Group

(2)【强制】:消息队列或者消费者组名称不要包含特殊字符

反例:包含空格、换行、单双引号以及其他转义字符

(3)【强制】: 不建议发送过大的消息 ,目前默认发送消息体大于 4MB 会被压缩,大于4MB的MQ报文数据通过文件服务存储到OSS中。

六.消息中间件与服务治理结合

   接入服务治理的好处:

     6.1 动态启停消费者群组,不用重复的变更配置、发布应用,安全可靠;

     6.2 动态调整消费者群组监听线程,避免因线程堆积等原因导致应用进程奔溃;

     6.3 客户端简化配置参数,各方只需关注自己的业务逻辑实现即可;

七.其他事项

   7.1 消息组件自身重试机制

      从使用RocketMQ开始,很多同事都在反馈RMQ的重试机制到底是如何的?

      RMQ是阶梯性的重试补偿,补偿频率为(10s,30s,1min,2min,3min,5min,6min,7min,8min,9min,10min,20min,30min,1h,2h),验证结果如下:

       

      此外,当应用服务重启后,消费失败的队列,会继续得到补偿,补偿频率仍为(10s,30s,1min,2min,3min,5min,6min,7min,8min,9min,10min,20min,30min,1h,2h)

如下图:

    

7.2 去消息仓库化   

    消息组件设计之初,引人了消息仓库(mongodb)来存储消息数据体,减轻rmq服务端的文件读写性能。随着新的技术栈迭代更新,现在已不在提供中央仓库,用来存放消息体。针对业务系统中大body,由业务系统自行将消息体拆分或者选择相应的消息存储仓库。

7.3 消费者群组和消息队列关系一一对应

    一个消费者群组仅监听一个topic。否则在应用重启或者因某个队列异常,进而导致整个消费者群组出现消费异常。

           

八、 最佳实践

   

   8.1消息组件架构图

    

  8.2消息组件数据交互图

   

  8.3 生产者

  8.3.1 发送消息注意事项

    Topic的使用

    一个业务使用一个Topic。

     Keys的使用

    每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消 息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是 哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。

  // 订单Id
  String orderId = "20034568923546";
  message.setKeys(orderId);

   日志的打印

    消息发送成功或者失败要打印消息日志,务必要打印 SendResult 和 key 字段。

  8.3.2 消息发送失败处理方式

      Producer 的 send 方法本身支持内部重试,重试逻辑如下:

  • 默认重试 3 次(同步发送为 3 次,异步发送为 3 次)。
  • 如果本身向broker发送消息产生超时异常,就不会再重试。

   以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。

8.4 消费者

8.4.1 消费过程幂等

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。

8.4.2 消费速度慢的处理方式

  1. 提高消费并行度
  • 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订 阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。
  • 提高单个 Consumer 的消费并行线程,通过修改注解 @RocketMQMessageListener 中参数 consumeThreadMin、consumeThreadMax实现。
  1. 批量方式消费 某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer 的 consumeMessageBatchMaxSize 返个参数,默认是 1 ,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。

8.4.3 消费打印日志

如果消息量较少,建议在消费入口方法打印消息日志,消费耗时等,方便后续排查问题。

8.4.4 其他消费建议

  1. 关于消费者和订阅 第一件需要注意的事情是,不同的消费者组可以独立的消费一些 topic,并且每个消费者组都有自己的 消费偏移量,请确保 同一组内的每个消费者订阅信息保持一致 。
  2. 关于线程数设置 消费者使用 ThreadPoolExecutor 在内部对消息进行消费,所以你可以通过设置 consumeThreadMin 或 consumeThreadMax 来改变它。
  3. 关于消费位点 当建立一个新的消费者组时,需要决定是否需要消费已经存在于 Broker 中的历史消息 CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,并消费之后生成的任何消息。 CONSUME_FROM_FIRST_OFFSET 将会消费每个存在于 Broker 中的信息。你也可以使用 CONSUME_FROM_TIMESTAMP 来消费在指定时间戳后产生的消息。

8.4.5 订阅关系一致

订阅关系一致指的是同一个消费者 Group 下所有 Consumer 实例的处理逻辑必须完全一致。一旦订阅 关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

背景信息

RocketMQ 里的一个消费者 Group 代表一个 Consumer 实例群组。对于大多数分布式应用来说,一个 消费者 Group 下通常会挂载多个 Consumer 实例。

由于 RocketMQ 的订阅关系主要由 Topic + Tag 共同组成,因此,保持订阅关系一致意味着同一个消费 者 Group 下所有的实例需在以下两方面均保持一致:

  • 订阅的 Topic 必须一致
  • 订阅的 Topic 中的 Tag 必须一致

正确订阅关系图片示例

多个 Group 订阅了多个 Topic,并且每个 Group 里的多个消费者实例的订阅关系保持了一致。

错误订阅关系图片示例

单个 Group 订阅了多个 Topic,但是该 Group 里的多个消费者实例的订阅关系并没有保持一致。

九.消息组件对接举例

   9.1.1、依赖包引入

   9.1.1.1、通用SDK(maven) 

<dependency>

      <groupId>com.cf.tic.gsdk</groupId>

      <artifactId>g-sdk</artifactId>

      <version>2.0.8</version>

</dependency>

9.1.1.2、通用SDK(Gradle)

compile 'com.cf.tic.gsdk:g-sdk:2.0.8

9.1.1.3、消息组件依赖

<dependency>

            <groupId>com.cf.scm.common</groupId>

        <artifactId>common-message-component-rocketmq</artifactId>

            <!--spring4.x-->

            <version>1.6.0.9</version>

<!--spring3.x-->

<version>1.6.1.9</version>

<!--spring-支持分布式消息去重(redis)-->
<version>1.6.2.9</version>

<!-- spring3.x(wms3.0)版本-->

<version>1.6.3.9</version>

</dependency>

9.1.1.4、消息组件依赖

compile ('com.cf.scm.common:common-message-component-rocketmq:1.6.0.9')

9.2、业务代码订阅治理信息数据

  9.2.1、剔除原有的zK通知模式(若存在即关注)

     ZkDateChangeManagerImpl类中changeData()方法删除掉,governanceService.init(mqservicePath);

  9.2.2、新增应用初始化类(若应用中已存在类似的初始化代码,可忽略)

/**

 * 模拟应用初始化

 * @author lisi

 * @create 2020/2/27

 */

public class ApplicationInitServiceImpl {

    @Autowired

    private CallbackService payGovernanceService;

    @PostConstruct

    public void init() {

        BeanRegistry sc = BeanRegistry.getInstance();

        CallbackService cbs = payGovernanceService;

        sc.add(CallbackService.class, cbs);

    }}

9.2.3、新增消息队列订阅关系变更通知类

/**

 * @author lisi

 * @create 2020/2/27

 */

@Service("payGovernanceService")

@Slf4j

public class PayGovernanceServiceImpl implements CallbackService

{

    @Autowired

    private GovernancePullServiceImpl governancePullService;

   

    @Override

    public String getCallbackName() {

        return null;

    }

    /**

     * 数据校验是否通过

     */

    public boolean validate(Map<String, Object> map) {

        //默认为false,需开发同事咨询处理

        return true;

    }

    @Override

    public void process(Map<String, Object> map) {

        Object o = map.get("CB_MAP_CONFIG_RESP");

        String text = (String) o;

        VersionQueryResp obj = JsonUtil.readValue(text, VersionQueryResp.class);

        String body = obj.getRuntime();

        Map<String,String> resultMap = JsonUtil.readValue(body, Map.class);

        String rmqBody =  resultMap.get("mq");

        if (StringUtil.isEmpty(rmqBody)) {

            log.warn("从服务端获取的数据为空:{}" , rmqBody);

            return;

        }

        governancePullService.init(rmqBody);

}

9.2.4、配置文件关联如上实现类

<bean id="applicationInitService" class="com.cfx.scm.baseservice.pay.service.ApplicationInitServiceImpl"

          init-method="init">

    </bean>

    <bean id="pgClientLauncher" class="com.cfx.tic.gsdk.pg.PgClientLauncher" init-method="init">

        <property name="appId" value="示例"/>

        <property name="secret" value="示例"/>

        <property name="apiGateWayUrl" value="http://sit-api-base.cfx.com/api(测试网关地址)"/>

        <property name="enabled" value="true"/>

    </bean>

9.5 业务逻辑

  9.5.1 发送消息

业务类中需引入发送生产者对象

A、@Autowired

    private RocketMQProducerServer producerServer ;

B、发送有二级标签的消息数据

producerServer.sendDataMsgConcurrently(topic,tags,MessageCommond) ;

C、发送无二级标签的消息数据

producerServer.sendDataMsgConcurrently(topic, MessageCommond) ;

MessageCommond实体对象

属性

类型

描述

是否必填

msgId

String

消息ID(由各个业务系统定义)

msgType

String

消息类型(如 该消息队列的接口名称)

isMsgBodySend

Boolean

消息体是否发送至mq

true :消息体发送至mq

false :消息体仅保存至mongodb,不发送到mq

msgBody

String

消息对象

sendTime

String

消息发送时间

feedBackTime

String

消息接收反馈时间

msgExt

String

消息描述<存放各自特殊需求的信息>

splitId

String

分库分表id

 9.5.2 接收消息

        接入本版本服务治理后,业务系统无需考虑采用配置方式来处理topic分发给什么样的bean及method处理业务逻辑,因为这些配置信息提前就已经统一配置于服务治理平台。

        客户端仅关注拿到的Topic消息交由哪个beanId、method来处理数据即可

9.6 消息仓库管理(暂停消息仓库使用)

若应用开启消息数据保存至mongodb,则需在main esourcesMETA-INFspring中引入xml文件

<import resource="classpath*:spring-cfx-mongo.xml"/>

Zk中需配置如下参数

msg.replica.set=10.88.27.120:27017,10.88.27.121:27017,10.88.27.122:27017

msg.connections.per.host=500

msg.threads.allowed.multiplier=100

msg.connect.timeout=10000

msg.max.wait.time=10000

msg.socket.keep.alive=false

msg.socket.timeout=10000

msg.mongodb.username=testuser

msg.mongodb.password=testpass

msg.database.store=cfx_rocketmq

msg.write.concern=SAFE

9.7 全局参数配置

属性名

属性介绍

默认值

msg.iscrossaccess

是否跨集群访问

false

cluster.networkmapping

网络mapping映射关系

mongodb.msg.isEnable

Mongodb开关

true

msg.gzip

消息是否开启压缩

false

三、注意事项

3.1 若对接应用为消费者,需在spring.xml中配置如下:

<!-- spring注入的是接口,关联的是实现类 [可以用实现类注入] -->

<aop:config proxy-target-class="true" />

解决如下异常:

3.2 生产者、消费者在发送、解析消息数据时,建议使com.cfx.utilities.json下的JsonUtil工具类

3.3 对接应用在引入相关附件文件时,需注意相关包名路径!!!

3.4 各个对接应用在创建(发送)消息话题(topic、队列)时,需联系RocketMQ管理员,由管理员为话题创建队列(分区)数。

3.5、接入新的消息组件,各个应用系统需提前在UAC系统中申请应用租户、租户密钥以及网关地址。

3.6、消息组件的租户是以实例类型为基准,因此PAC同一套代码、同样的prod环境,但细分为MQService、DaemonService、adapter等各个应用,因此需要向UAC环境申请多个租户(切记!)

3.7、队列(topic)创建规则:

MQ_业务2业务_profile:以MQ开头、业务流程说明、环境标识。

3.8、消费者群组示例:

CG_业务2业务_profile:将监听的队列MQ替换成CG。

四、消息组件应用技术点

4.1 消息内容加密

加密方式AES

4.2 spring事务

五、spring boot对接注意事项

5.1 包扫描的问题

SpringbootApplication 的上级目录至少是 com目录,如此消息组件中的注解类方可扫描到

5.2 相关代码附件

5.3 禁用springBoot默认mongodb加载

@SpringBootApplication(exclude = {MongoAutoConfiguration.class,MongoDataAutoConfiguration.class})

九、落地:如何让规范更好落地

1. 对于强制规范比如命名规则可以在公共RMQ客户端SDK API中进行校验;

2. 对于建议规则,由各个业务组架构师进行宣导,小组开发负责人做代码review时根据建议规则要求所属开发整改;

原文地址:https://www.cnblogs.com/liran123/p/13803872.html