Rocket简介以及单机版安装

1. MQ简介

1. MQ用途

1. 限流削峰

  mq 可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

2. 异步解耦

  服务之间同步调用改为通过MQ异步调用的方式,一方发消息,一方接收到消息之后进行处理。

3. 数据收集

  分布式系统会产生海量数据,比如业务日志、监控数据、用户行为等。针对这些数据进行实时或批量采集汇总,然后对这些数据流进行大数据分析。通过MQ完成此类数据收集是最好的选择。

2. 常见MQ对比

JMS: Java Message Service。 定义了几种消息类型,简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)

AMQP: Advanced Message Queuing Protocol。 协议指定了exchange交换机、queue消息队列、binding 绑定提供交换机和queue之间的路由规则。

3. RocketMQ 简介

  RocketMQ 是一个统一消息引擎、轻量级数据处理平台。是阿里巴巴开源的消息中间件,2016年11月28你啊你,捐赠给apache 软件基金会, 成为apache 的孵化项目。

4. RocketMQ 基本概念

1. 消息

  消息系统所传输的物理载体,生产和消费的最小单位,每个消息都必须属于一个Topic

2. 主题topic

  表示一类消息的集合,每个主题包含若干条消息,每个消息只能属于一个主题,是MQ进行消息订阅的基本单位。

  一个生产者可以同时发送多种Topic的消息,而一个消费者只能对一个Topic消费。

3. 标签tag

  为消息设置的标签,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,而已根据业务在同一主题下设置不同标签。标签能有效地保持代码的清晰度和连贯性,并优化RocketMQ查询。消费者可以根据不同的tag 实现对不同子主题的消费,实现更好的扩展。

  Topic是消息的一级分类,Tag是消息的二级分类。

4. 队列queue

  存储消息的物理实体。一个topic 可以包含多个Queue,每个Queue中存放的是该Topic 的消息, 一个Topic 的queue 也被称为一个Topic 中消息的分区(Partition)。

  一个Topic 的Queue中的消息只能被一个消费者组中的一个消费者消费;一个Queue 中的消息不允许同一个消费者组中的多个消费者同时消费。

  还有一个分片的概念。在RocketMQ 中分片指的是存放相应Topic 的Broker。 每个分片中会创建出相应数量的分区,即Queue, 每个Queue 的大小都是相同的。

5. 消息标识

  RocketMQ 中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key, 以便对消息的查询。 不过需要注意的是,MessageId 有两个: 在生产者send() 消息时会自动生成一个MessageId (msgId), 当消息到达Broker 后,Broker 也会自动生成一个MessageId (offsetMsgId)。 msgId、offsetMsgId 与 key 都称为消息标识。

  msgId: 由producer 端生成,其生成规则为: producerIp + 进程pid + MessageClientIdSetter 的ClassLoader的hashCode + 当前时间 + AutomicInteger 自增计数器

  offsetMsgId: 由broker 端生成,其生成规则为: brokerIp + 物理分区的offset(queue 中的偏移量)

  key: 由用户指定的业务相关的唯一标识

5. 系统架构

1. Producer

  消息生产者,负责生产消息。Producer 通过MQ 的负载均衡模块选择相应的Broker 集群队列进行消息投递,投递的过程中支持快速失败并且低延迟。

  生产者都是以生产者组出现的。 生产者组是一类生产者的集合,这类Producer 发送相同Topic 类型的消息。 一个生产者组可以同时发送多个Topic 的消息。

2. Consumer

  消费者。消费者都是以消费者组的形式出现的。消费者组是同一类消费者的集合,这类Consumer 消费的是同一个Topic 的消息。消费者组使得在消息消费方面,实现负载均衡(将一个Topic 的不同Queue 平均分配给同一个消费者组的不同Consumer,注意并不是将消息负载均衡)和容错(一个消费者挂了,该组的其他Consumer 可以继续消费原Consumer 消费的Queue)变得非常容易。

  消费者组中的Consumer 的数量小于等于订阅Topic 的queue 的数量。 如果超出,则多出的Consumer 将不能消费消息。

  一个Topic 类型的消息可以被多个消费者组消费。

  消费者组只能消费一个Topic 类型的消息, 不能同时消费多个Topic 类型的消息; 一个消费者组中的消费者必须订阅相同的Topic。

3. nameServer

  是一个Broker 与 Topic 路由的注册中心,支持Broker 的动态注册与发现。主要功能包括:

(1) broker 管理: 接收Broker 集群的注册信息以及心跳等

(2) 路由信息管理: 每个NameServer 保存着broker 集群的整个路由信息和用于客户端查询的队列信息。Producer 和 Consumer 可以通过NameServer 获取Broker 的路由信息,从而进行消息投递和消费。

路由注册: nameserver 通常以集群状态部署,nameserver 集群是无状态的,各个节点之间无差异且不进行通讯。节点之间的数据同步规则如下:brkerServer 启动时,轮询NameServer 列表与每个列表建立长连接,发起注册请求,在每个NameServer 内部维护着一个Broker 列表,用来动态存储Broker 的信息。也就是说,对于Broker来说,必须明确指出所有的nameServer 列表,且不支持动态扩展。broker 每30s 向nameserver 发送心跳。

路由剔除: nameServer 有一个定时任务,每隔10 s 会扫描一次broker 表,查看每个broker最近一次心跳时间距离当前是否超过120 s, 超过即剔除。

路由发现: RocketMQ的路由发现采用的是pull 模型。 当Topic 路由信息发生变化时,nameServer 不会推给客户端,而是需要客户端主动拉取,默认每30 s 拉取一次最新的路由。

补充: 客户端nameserver 选择策略, 客户端指的是生产者和消费者

  客户端在配置时必须写上nameServer 集群的地址,选择的时候先生产一个随机数,然后再与NameServer 节点数量取模得到所索引,然后进行连接,连接失败则会进行round-robin 策略。也就是首先采用随机,失败后采用轮询。

补充: 推模型和拉模型

push 模型: 推送模型。 实时性较好,是一个"发布-订阅"模型,需要维护一个长连接。而长链接是需要资源成本的,该模型适用于: 实时性要求较高; client 数量不多,server 数据频繁变化的场景。

pull 模型: 拉模型。 存在的问题: 实时性较差。

Long Polling 模型: 长轮询模型。 上面两个模型的结合,也就是长连接只维持一定时间,超过时间没有更新后自动断开。  

4. broker

  充当消息中转角色,负责存储消息、转发消息。存储Producer 生产的消息,同时为Consumer 拉取消息做准备。也存着消息相关的元数据,包括消费者组进度偏移offset、主题、队列等。

  broker 功能示意图如下:

ClientManager: 客户端管理,负责接收解析客户端(Producer/ consumer)请求

Store Service: 负责存储。 提供API接口,处理消息存储到物理硬盘和消息查询

HA Service: 高可用。 提供Master Broker 和 Slave Broker 之间的消息同步。

Index Service: 索引服务。根据特定的Message Key,对投递到Broker 的消息进行索引,同时也提供根据MessageKey 快速查询。

  注意Broker 集群方案为主从架构,一个Master 可以包含多个Slave。 Master 处理读写, slave 负责对master 的数据进行备份。master 与slave 的对应关系是通过指定相同的broker name、不同的BrokerId 来确定的。BrokerId 为0 标识为master, 非0 标识slave。每个Broker 与NameServer 集群中的所有节点建立长连接, 定时注册Topic 信息到所有的 NameServer。

6. 工作流程

1. 启动nameServer,监听指定端口,等待broker、producer、consumer 的连接

2. 启动broker,broker 与nameserver 集群中的所有节点建立长连接,然后每30 s向NameServer 定时发送心跳包

3. 发消息前或者发送消息时,创建Topic, 需要将Topic 与 Broker 的路由关系写入到NameServer

4. Producer 跟NameServer 集群中的一个节点建立长链接,并从NameServer 中获取路由信息,即发送的Topic 的Queue 与Broker(IP + Port)的映射关系。然后根据算法选择一个Queue,与Queue 所在的Broker 建立长连接发送消息。Producer 会将路由信息缓存到自己本地,每30 s 从nameServer更新一次。

5. Consumer 和 Producer 类似,不同的是还会每30 s向broker 发送心跳,以确保存活状态。

2. 单机版安装

参考: https://rocketmq.apache.org/docs/quick-start/

 1. 下载源码并且编译

(1) 下载源码

(2) 解压后使用mvn 进行编译安装

mvn -Prelease-all -DskipTests clean install -U

(3) 解压安装之后到target 目录下会看到有可执行文件生成 %rocketmq-all-4.9.2%\distribution\target\rocketmq-4.9.2.zip

2. 启动

我们把该文件传输到linux 服务器,然后按照教程启动即可。当然windows 下面也可以启动,为了方便我们以linux 为例子

(1) 解压

unzip ./rocketmq-4.9.2.zip

(2) 启动nameserver (如果是虚拟机启动的可能需要修改下启动的JVM参数, 修改bin/runserver.sh )

nohup sh bin/mqnamesrv &

(3) 启动broker (如果是虚拟机启动的可能需要修改下启动的JVM参数, 修改bin/runbroker.sh )

nohup sh bin/mqbroker -n localhost:9876 &

jps 查看启动的java 相关进程

[root@localhost rocketmq-4.9.2]# jps -l
8981 org.apache.rocketmq.namesrv.NamesrvStartup
9093 sun.tools.jps.Jps
9017 org.apache.rocketmq.broker.BrokerStartup

(4) 测试发送消息和接收消息

- 设置环境变量

export NAMESRV_ADDR=localhost:9876

- 发送消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

最终控制台打出的相关日志如下:

SendResult [sendStatus=SEND_OK, msgId=7F00000123B70194FA3E0C205C4903E4, offsetMsgId=C0A80D8F00002A9F000000000008C5B6, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost, queueId=3], queueOffset=749]
SendResult [sendStatus=SEND_OK, msgId=7F00000123B70194FA3E0C205C4A03E5, offsetMsgId=C0A80D8F00002A9F000000000008C676, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost, queueId=0], queueOffset=749]
SendResult [sendStatus=SEND_OK, msgId=7F00000123B70194FA3E0C205C4B03E6, offsetMsgId=C0A80D8F00002A9F000000000008C736, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost, queueId=1], queueOffset=749]
SendResult [sendStatus=SEND_OK, msgId=7F00000123B70194FA3E0C205C4D03E7, offsetMsgId=C0A80D8F00002A9F000000000008C7F6, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost, queueId=2], queueOffset=749]
。。。

- 接收消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

最終控制台打出的日志如下:

ConsumeMessageThread_8 Receive New Messages: [MessageExt [brokerName=localhost, queueId=3, storeSize=192, queueOffset=720, sysFlag=0, bornTimestamp=1641216646960, bornHost=/192.168.13.143:49262, storeTimestamp=1641216646963, storeHost=/192.168.13.143:10911, msgId=C0A80D8F00002A9F0000000000086EB6, commitLogOffset=552630, bodyCRC=1191992521, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1641216703248, UNIQ_KEY=7F00000123B70194FA3E0C205AB00370, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 56, 48], transactionId='null'}]] 
ConsumeMessageThread_20 Receive New Messages: [MessageExt [brokerName=localhost, queueId=3, storeSize=192, queueOffset=719, sysFlag=0, bornTimestamp=1641216646951, bornHost=/192.168.13.143:49262, storeTimestamp=1641216646953, storeHost=/192.168.13.143:10911, msgId=C0A80D8F00002A9F0000000000086BB6, commitLogOffset=551862, bodyCRC=704111923, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1641216703248, UNIQ_KEY=7F00000123B70194FA3E0C205AA6036C, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 55, 54], transactionId='null'}]] 
。。。

(5) 关闭Server

关闭name server 还是broker, 都是使用 bin/mqshutdown 命令

[root@localhost rocketmq-4.9.2]# sh bin/mqshutdown broker
The mqbroker(9017) is running...
Send shutdown request to mqbroker(9017) OK
[root@localhost rocketmq-4.9.2]# sh bin/mqshutdown namesrv
The mqnamesrv(8981) is running...
Send shutdown request to mqnamesrv(8981) OK
[2]+  Exit 143                nohup sh bin/mqbroker -n localhost:9876

3. 安裝可视化web 界面

  可视化界面可以运行于windows, 也可以运行于linux,只需要访问到nameserver 即可

1. 下载

https://github.com/apache/rocketmq-externals/tags    下载 rocketmq-console-1.0.0

2. 修改配置

下载的是一个Springboot 源码工程,修改application.properties 文件,修改server.port 和 rocketmq.config.namesrvAddr 地址,如下:

server.contextPath=
server.port=7000
#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=192.168.13.143:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true

3. 执行打包

到源码pom 所在目录进行打包:

mvn clean package -DskipTests=true

4. 运行target 目录下面生成的jar 包即可

java -jar rocketmq-console-ng-1.0.0.jar

5. 启动后通过浏览器访问即可

 查看message:

 该可视化界面也可以选择Topic 后查看修改topic或者发送消息、接收消息等操作,类似于rabbitMQ 的可视化界面。

   至此简单完成了rocketMQ 的单机版安装。

补充: 关于Topic 的创建

手动创建topic 时,有两种模式:

1. 集群模式: 该模式下创建的Topic 在该集群中,所有Broker 的Queue 的数量是相同的

2. broker 模式:该模式下创建的topic在该集群中,所有的queue 的数量可以是不同的。

自动创建topic 时,默认是broker 模式,会为每个broker 创建4个queue。

补充: 关于读写队列

  从物理上说,写队列和读队列是同一个队列, 是逻辑上进行区分的概念。一般情况下,读写队列数量是相同的。

  例如创建一个Topic 时写队列为4, 读队列为8。 此时系统会创建8个queue,[0, 1, 2, 3, 4, 5, 6, 7]。 生产者会将消息写入0 1 2 3 这4个队列,消费者会消费0-7这8个队列,但实际上4-7是没有消息的。

  设置读写队列是为了方便Topic 的queue 缩容。例如原来16个queue,如何缩容为8个还不丢失消息?可以动态先将写队列的数据改为8个,读队列数量不变,此时生产者写入前八个队列,而消费者消费16个队列,待消费者消费完后8个队列的数量后,再将读队列数量改为8,完成缩容。

  perm 用于设置对当前创建Topic 的操作权限: 2、4、6 表示写、读、读写。

例如,查看测试队列如下:

【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
原文地址:https://www.cnblogs.com/qlqwjy/p/15754177.html