RocketMQ 概念

RocketMQ 源码包结构

  • rocketmq-broker 主要的业务逻辑,消息收发,主从同步,pagecache

  • rocketmq-client 客户端接口,比如生产者和消费者

  • rocketmq-example 例程

  • rocketmq-test 单元测试

  • rocketmq-common 公共数据结构等

  • rocketmq-distribution 编译模块,编译输出等

  • rocketmq-filter 进行Broker过滤消息传输,减小带宽压力

  • rocketmq-logappender,rocketmq-logging 日志相关

  • rocketmq-namesrv Namesrv服务,用于服务协调,类似注册中心

  • rocketmq-openmessaging 对外提供服务

  • rocketmq-remoting 远程调用接口,封装Netty底层通信

  • rocketmq-srvutil 提供一些公用的工具方法,比如解析命令行参数

  • rocketmq-store 消息存储

  • rocketmq-tools 管理工具,比如mqadmin工具

 

RocketMQ 网络部署结构

             

Name Server

  Name Server 是一个注册中心,整个分布式消息调度的总控制,给消息队列的生产者和消费者提供路由信息,提供轻量级的服务发现,路由,元数据信息,可以多个部署,相互独立(比zookeeper轻量);

  Name Server 被设计成几乎无状态的,可以横向扩展,节点之间无任何信息同步;

  每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 在启动的时候会到NameServer注册;

  Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳;

  Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 由信息,并向提供 Topic 服务的 MasterSlave 建立长连接,且定时向 MasterSlave 发送心跳;

RocketMQ 逻辑部署结构

          

Producer 与 Producer Group  

  • Producer

    表示消息队列的生产者,负责生产消息,一般由业务系统负责产生消息;

      使用:

        1. 创建生产者对象 DefaultMQProducer;在一个应用程序中只能起一个组名称

        2. 设置NamesrvAddr

        3. 启动生产者服务

        4. 创建消息并发送

  • Producer Group

    生产者集合,表示发送同类消息的多个 Producer 实例(通常为发送一类消息,且发送逻辑一致),一个 Producer Group 下包含多个 Producer 实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个 Producer 对象; 一个 Producer Group 可以发送多个 Topic 消息;

    Producer  Group  作用如下:

      1. 标识一类 Producer,多个生产者可以并行发消息,提高性能;

      2. 可以通过运维工具查询这个发送消息应用下有多个 Producer 实例;

      3. 发送分布式事务消息时,如果 Producer 中途意外宕机,Broker 会主动回调 Producer Group 内的任意一台机器来确认事务状态(即事务消息,broker会主动回调生产者,主动的rpc调用producer,做一个check操作

  • Producer 核心参数     
producerGroup:组名,一个应用里是唯一的,如果启动两个相同组名的应用会报错

createTopicKey:创建topic,一般不允许生产者创建topic,生产消费者topic由专门做MQ架构的人维护,生产者传到消费者会有一层封装,不会将createTopic暴露;

topicQueueNums:主题下面的队列数量,默认4个

sendMsgTimeout:消息发送的超时时间,单位ms

compressMsgBodyOverHowmuch:当消息容量超过设定值会进行压缩,默认压缩字节4096

retryTimesWhenSendFailed:消息发送失败的重发次数

retryAnotherBrokerWhenNotStoreOK:没有存储成功,往别的broker存储

maxMessageSize:最大的消息限制,默认128K

autoCreateTopicEnabale:是否自动创建主题Topic,开发建议为true,生产为false

defaultTopicQueueNums:自动创建服务器不存在的Topic,默认创建的队列数

autoCreateSubscriptionGroup:是否允许Broker自动创建订阅组,建议线下开发开启,线上关闭

brokerClusterName:集群名称

brokerId:0表示Master主节点,大于0表示从节点

brokerIP1:Broker服务地址

brokerRole:broker角色,ASYNC_MASTER/ SYNC_MASTER/ SLAVE

deleteWhen: 每天执行删除过期文件的时间,默认为每天凌晨4点

flushDiskType:刷盘策略,默认为ASYNC_FLUSH(异步刷盘),另外是 SYNC_FLUSH(同步刷盘)

listenPort:broker监听的端口号

mapedFileSizeCommitLog:单个commitlog文件大小,默认为1G

mapedFileSizeConsumeQueue:ConsumeQueue每个文件默认存30W条,可以根据项目调整

storePathRootDir:存储消息以及一些配置信息的根目录,默认为用户的${HOME}/store

storePathCommitLog:commitlog存储目录默认为${storePathRootDir}/commitlog

storePathIndex:消息索引存储路径

syncFlushTimeout:同步刷盘超时时间

diskMaxUsedSpaceRatio:检测可用的磁盘空间大小,超过会写入错误

  消息同步: HAService,HAconnection,WaitNotifyObject,对commitlog同步;

  • RocketMQ Broker消息投递状态(消息发送状态)

    • FLUSH_DISK_TIMEOUT

      没有在规定时间内完成刷盘(刷盘策略需要为SYNC_FLUSH才会出现这个错误)

    • FLUSH_SLAVE_TIMEOUT

      主从模式下,broker 是SYNC_MASTER,没有在规定时间内完成主从同步

    • SLAVE_NOT_AVAILABLE

      主从模式下,broker 是 SYNC_MASTER,但没有找到被配置成Slave的Broker

    • SEND_OK

      发送成功

Consumer 与 Consumer Group

    Consumer 表示消息队列的消费者,负责消费消息,一般是后台系统负责异步消费; 消息投递,调用的系统异步消费;通常主流消息获取模式:Push消息推送模式和Pull消息拉取模式,Consumer 被分为两类:Push Consumer 和 Pull Consumer;

  • Push

    实时性高;但会增加服务端的负载,即broker端的负载,消费者端的消费能力不一样,如果服务端Push过快,消费端会出现很多问题;

  • Pull

    消费者端从服务端(broker端)拉取消息,主动权在消费者端,可控性好;但拉取消息的时间间隔不好设置,间隔太短,则空请求;间隔太长,则消息不能及时处理;

  • 长轮询

    消费者端请求服务端(broker端),broker会保持当前连接一段时间,默认是15s,如果这段时间内有消息到达,则立刻将消息返回给消费端,没有消息且连接超过15s,则返回空,再进行重新请求;主动权在消费者端,broker端即使有大量消息也不会主动发送给消费者;

    缺点:服务端需要保持与消费端的连接,会占用资源,需要客户端连接数可控否则会一堆连接;

/**
* Maximum amount of time in minutes a message may block the consuming thread.
*/
private long consumeTimeout = 15;

  

  • PushConsumer使用长轮询模式进行实现

  PushConsumer是Consumer 的一种,应用向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer对象立刻回调 Listener 接口方法;(需要向Consumer对象注册监听器)

  RocketMQ收到消息后自动处理消息和offset,如果有新的消费者加入会自动做负载均衡;

  在broker端可以通过longPollingEnbable=true来开启长轮询;

  服务端 DefaultMQPushConsumerImpl 使用长轮询模式进行实现;执行流程:DefaultMQPushConsumerImpl ==> pullMessage方法 ==> PullCallback回调;

   消费端逻辑实现在:

      

    PushConsumer虽然是Push模式,但代码里用了大量的Pull,使用长轮询方式既有Pull的,又有Push的实时性;

    释放资源和保存Offset,调用shutdown()即可;可用@PostConstruct,@PreDestroy;

  • PullConsumer

    使用PullConsumer需要自己维护Offset;(需要主动请求Broker拉取消息)

    官方例程:org.apache.rocketmq.example.simple.PullConsumer;

    执行流程如下:

      先遍历消费端的MessageQueue,维护offset,本地存储offset,处理不同状态的消息;

public enum PullStatus {
    /**
     * Founded
     */
    FOUND,
    /**
     * No new message can be pull
     */
    NO_NEW_MSG,
    /**
     * Filtering results can not match
     */
    NO_MATCHED_MSG,
    /**
     * Illegal offset,may be too big or too small
     */
    OFFSET_ILLEGAL
}

  

  • Consumer Group

     消费者集合,一般用于接收同一类消息进行消费;表示消费同类消息的多个实例(通常消费一类消息,且消费逻辑一致),一个 Consumer Group 下包含多个 Consumer 实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个 Consumer 对象,涉及的消费方式:广播消费(BROADCASTING)和 集群消费(默认 CLUSTERING)

    • 广播消费:一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次

    • 集群消费:一个 Consumer Group 中的 Consumer 实例平均分摊消费消息,也就是说排除网络等其他原因,一条消息只会被消费一次;大多数场景应该使用的是此种消费方式

Broker

  Broker即为消息队列核心,MQ消息服务,中转角色;它负责存储消息、接收生产者产生的消息,为消费者转发消息。同时它还会存储与消息相关的元数据,包括消费者组,消费进度偏移和主题/队列信息;

Topic

  主题,表示消息的第一级类型,标识一类消息的逻辑名字,比如一个电商系统可以分为:交易消息、物流消息等;Queue 是消息的物理管理单位,而 Topic 是消息的逻辑管理单位;一条消息必须有一个Topic;RocketMQ最佳实践给出的建议是,一个应用尽可能用一个Topic;一个 Topic 下可以有多个 Queue,默认自动创建4个,手动创建8个;无论消息是生产还是消费,都需要指定 Topic;

  • Producer Group 和 Topic 消息之间的关系是多对多的关系       

    一个 Producer Group 下的实例(Producer)可以发送多个 Topic 的消息

      一个 Topic 的消息也可以由多个 Producer Group 下的实例(Producer)生产

  •  Broker Group 和 Topic 消息之间的关系是多对多的关系

   一个 Broker Group 可以为多个 Topic 提供服务

      一个 Topic 可以由一个或多个 Broker Group 提供服务

      一个 Topic 由多个 Broker Group 提供服务即《RocketMQ用户指南》中提到的多 Master,或多 Master 多 Slave 模式

      一个 Topic 由一个 Broker Group 提供服务即《RocketMQ用户指南》中提到的单 Master 模式(包含 Slave 或不包含 Slave)

  •  Consumer Group 和 Topic 消息之间的关系是多对多的关系

    一个  Consumer Group 下的实例 (Consumer)可以消费多个 Topic 的消息

     一个 Topic 的消息也可以由多个 Consumer Group 下的实例(Consumer)消费

 

   Producer Group,Broker Group,Consumer Group,Topic 之间的关系

          

        

Tag 

  标签,表示消息的第二级(子主题)类型,对 Topic 的进一步细化,用于区分同一主题下不同的业务消息,比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有Tag;

Message Queue

  消息队列,简称 Queue 或 Q;消息物理管理单位;一个Topic下可以设置多个 Queue;当发送消息时,需要执行该消息的 Topic,RocketMQ会轮询该 Topic下的所有队列,将消息发出去;一个 Topic 将有若干个 Q ;若 Topic 同时创建在不同的 Broker,则不同的 Broker上都有若干 Q,消息将物理地址存储落在不同 Broker 结点上,具有水平扩展的能力;

  无论生产者还是消费者,实际的生产和消费都是针对 Q 级别;例如 Producer 发送消息的时候,会预先选择(默认轮询)该 Topic 下面的某一条 Q 发送;Consumer 消费的时候也会负载均衡地分配若干个 Q,只拉取对应 Q 的消息;

  每一条 Message Queue 均对应一个文件,这个文件存储了实际消息的索引信息;即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来;

  消息存储是由ConsumeQueueCommitLog配合完成;CommitLog是真正存储消息文件;ComsumeQueue是逻辑队列,存储的是消息在CommitLog的位置;

   Topic下每个 message queue都有对应的ConsumeQueue文件,内容也会被持久化到磁盘,consumequeue存储如下:

      

   Broker,Topic,Queue 关系如下图:

            

广播消费

  消费者的一种消费模式。消息将对一个 Consumer Group 下的各个Consumer实例都投递一遍。即即使这些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个Consumer 都消费一次;

  实际上,是一个消费组下的每个消费者实例都获取到了 Topic 下面的每个 Message Queue 去拉取消费,消息会投递到每个消费者实例;

  这种模式下,消费进度会存储持久化到实例本地;

  

顺序消费

  消费消息的顺序要同发送消息的顺序一致;由于 Consumer 消费消息的时候是针对 Message Queue 顺序拉取并开始消费,且一条 Message Queue 只会给一个消费者(集群模式下),所以能够保证同一个消费者实例对于 Q 上消息的消费是顺序地开始消费(不一定顺序消费完成,因为消费可能并行);

  在RocketMQ中,顺序消费主要指的是都是Queue级别的局部顺序。这一类消息为满足顺序性,必须Producer单线程顺序发送,且发送到同一个队列,这样Consumer就可以按照Producer发送的顺序去消费消息;

  生产者发送的时候可以用 MessageQueueSelector 为某一批消息(通常是有相同的唯一标示id)选择同一个 Queue,则这一批消息的消费将是顺序消息(并由同一个 Consumer 完成消息)。或者Message Queue 的数量只有1,但这样消费的实例只能有一个,多出来的实例都会空跑。

 

  • 普通顺序消费

    顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生异常,Broker宕机或重启,由于队列总数发生发化,消费者会触发负载均衡,而默认地负载均衡算法采取哈希取模平均,这样负载均衡分配到定位的队列会发化,使得队列可能分配到别的实例上,则会短暂地出现消息顺序不一致;

    如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适;

  •  严格顺序消费

    顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低;

    如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用;(依赖同步双写,主备自动切换,自动切换功能目前并未实现)

             

 参考如下:

http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/

https://zhuanlan.zhihu.com/rocketmq

https://www.jianshu.com/p/453c6e7ff81c

《RocketMQ用户指南》

原文地址:https://www.cnblogs.com/coder-zyc/p/11834408.html