RocketMQ简单介绍

1    应用场景

a)         异步处理,比如A服务做了什么事情,异步发送一个消息给其他B服务。

b)         削峰,有些服务请求量很高,服务处理不过来,那么请求先放到消息队列里面,再进行处理。

c)         解耦,降低服务模块直接通信的耦合性。

2    特点

a)         Pull模型获取消息,支持TCP、JMS、OpenMessaging

b)         支持顺序消息,确保对消息进行严格排序,并可以正常扩展

c)         支持广播消息

d)         支持消息筛选

e)         高性能和低延迟文件存储

f)          无需其他套件支持主从模式

g)         开箱即用,用户只需要注意一些配置

h)         支持Web和终端命令显示核心指标

3    NameServer

主要维护broker集群和topice的相关信息

❑ private final HashMap<String/* topic */, List<QueueData>> topicQueueTable,存储每个topic对应的broker名称、queue数量等信息

❑ private final HashMap<String/* BrokerName */, BrokerData>Broker-AddrTable, 同一个BrokerName可能对应多台机器,一个Master和多个Slave。这个结构储存着一个BrokerName对应的属性信息,包括所属Cluster名称,一个Master Broker地址和多个SlaveBroker的地址等

❑ private final HashMap<String/* ClusterName */, Set<String/* BrokerName */>>ClusterAddrTable,储存集群中Cluster的信息,一个cluster下的多个BrokerName

❑ private final HashMap<String/* BrokerAddr */, BrokerLiveInfo> BrokerLiveTable存储的内容是这台Broker机器的实时状态,包括上次更新状态的时间戳,NameServer会定期检查这个时间戳,超时没有更新就认为这个Broker无效了,将其从Broker列表里清除。

❑ private final HashMap<String/* BrokerAddr */, List<String>/* Filter Server */> filterServerTableFilter Server是过滤服务器,是RocketMQ的一种服务端过滤方式,一个Broker可以有一个或多个FilterServer。这个结构的Key是Broker的地址,Value是和这个Broker关联的多个Filter Server的地址。

4    Broker

4.1    刷盘策略

异步刷盘:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

同步刷盘:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

配置方式:配置文件里flushDiskType参数设置,可选值SYNC_FLUSH、ASYNC_FLUSH中的一个。

4.2    主从复制

如果一个Broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。同步复制方式是等Master和Slave均写成功后才反馈给客户端写成功状态;异步复制方式是只要Master写成功即可反馈给客户端写成功状态。

配置方式:配置文件里brokerRole参数设置,可选值ASYNC_MASTER(MASTER异步复制)、SYNC_MASTER(MASTER同步复制)、SLAVE(当前broker是SLAVE)三个值中的一个

4.3    高可用性

主从机制在可以使得master不可用或者繁忙时,仍然可以在slave上读取消息,保证了消费端的高可用性。

在创建Topic的时候,把Topic的多个消息队列创建在多个Broker组上,这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。这样就保证了生产端的高可用性。

5    客户端

5.1    消费者Consumer

LitePullConsumer

主动拉取消息,LitePullConsumer是MQPullConsumer的优化版本,提供了offset的管理、持久化等。

调用poll()每次会获取一个队列的数据返回,如果有数据立刻返回,如果没有数据在等待timeout(默认是5秒钟)时间后返回空。

MQPushConsumer

自动推送消息,实际上这里push的本质也是长轮训pull。

MessageListenerConcurrentlyMessageListenerOrderly的区别:前者会使用consumer设置的线程数并发去消费所有messageQueue的消息;后者能保证每一个messageQueue里的消息都是顺序消费的。

Consumer可以设置两种消息模式集群模式CLUSTERING(默认)广播模式BROADCASTING

集群模式下,同一个consumer分组里的多个消费者每人消费一部分内容,各自的消息内容不一样,offet由broker储存。

广播模式下,同一个consumer分组里的每个consumer都收到全部消息,offet储存在本地,consumer只会去获取自身创建之后产生消息,且无法使用顺序消费监听器MessageListenerOrderly,但将PushConsumer线程数设置为1也能达到了顺序消费效果。

5.2    生产者Produce

消息发送的返回状态有如下四种:FLUSH_DISK_TIMEOUT、FLUSH_SLAVE_TIMEOUT、SLAVE_NOT_AVAILABLE、SEND_OK

FLUSH_DISK_TIMEOUT:表示没有在规定时间内完成刷盘(需要Broker的刷盘策略被设置成SYNC_FLUSH才会报这个错误)。

FLUSH_SLAVE_TIMEOUT:表示在主备方式下,并且Broker被设置成SYNC_MASTER方式,没有在设定时间内完成主从同步。

SLAVE_NOT_AVAILABLE:这个状态产生的场景和FLUSH_SLAVE_TIMEOUT类似,表示在主备方式下,并且Broker被设置成SYNC_MASTER,但是没有找到被配置成Slave的Broker。

SEND_OK:没有出现上述三种情况时返回。

自定义消息发送规则

默认情况下,producer发送的消息会轮流分配到不同的消息队列,Consumer再根据负载均衡选择一个队列消费。如果需要人工控制,producer可以使用MessageQueueSelector指定消息发送的队列,consumer再根据queueId判断消息是哪条队列的。

5.3    顺序消息

1.      全局顺序,要保证全局顺序消息,需要先把Topic的读写队列数设置为一,然后Producer和Consumer的并发设置也要是一。

2.      局部顺序,首先将同一业务ID的消息利用MessageQueueSelector发送到同一个消息队列,然后在消费端利用MessageListenerOrderly确保同一个消息队列不被并发消费。

参考《RocketMQ实战与原理解析》

原文地址:https://www.cnblogs.com/k-blog/p/12881653.html