rocketMq学习

Mq的作用

1.系统解耦
2.流量削峰
3.数据分发

安装目录说明:

bin:启动脚本,包括shell脚本和CMD脚本
conf:实例配置文件,包括broker配置文件,logback配置文件
lib:依赖jar包,包括Netty,commons-lang,FastJson等

启动/停止方式:

1. 启动rocketMq

#a.启动NameServer
nohup sh bin/mqnamesrv &
#b.查看启动日志
tail -f ~/log/rocketmqlogs/namesrv.log

2. 启动broker

#a.启动broker
nohup sh bin/mqbroker -n localhost:9876 &
#b.查看启动日志
tail -f ~/log/rocketmqlogs/broker.log``

 **启动broker时,默认设置的内存较大。可能会发生启动失败的情况。这时需要编辑以下两个文件,修改jvm内存大小

#编辑runbroker.sh和runserver.sh默认修改JVM大小
vi runbroker.sh
vi runserver.sh

关闭rocketMq

#1.关闭nameServer
sh bin/myshutdown namesrv
#2.关闭broker
sh bin/myshutdown broker

可以用jps可以查看进程 || 查看日志。看namesrv和broker是否启动成功

测试rocketMq发送和接收消息

1.发送消息
#1.设置环境变量
export NAMESRV_ADDR=localhost:9876
#2.使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Product

2.接收消息
#1.设置环境变量
export NAMESRV_ADDR=localhost:9876
#2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

rocketMq集群搭建

product:消息的发送者;举例:发信者

Consumer:消息的接收者;举例:收信者

Broker:暂存和传输消息;举例:邮局

NameServer:管理Broker;举例:邮局的管理机构

Topic:区分消息的种类;一个发送者可以发送给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息

Message Queue:相当于topic的分区;用于并行发送和接收消息

集群特点:

NameServer:无状态的节点,启动多个即为集群部署,节点之间没有信息同步

broker:部署相对复杂,Broker分为Master和Slave。一个Master对应多个Slave。但是一个Slave只能对应一个master

Master和Slave的对应关系通过指定相同的的BrokerName,不同的BrokerId来指定。brokerId为0表示为master。不为0表示为slave。

Master也可以部署多个,每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer

Product与NameServer集群中的一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息。并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。product完全无状态,可集群部署

Consumer与NameServer集群中的一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息。并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订购规则由Broker配置决定

消息:

结构:

1.消息主体Topic
2.消息Tag
3.消息内容

基本方式:

生产者发送:

1.同步--send

2.异步--异步send(有回调函数)

3.发送单向消息(只管发 不管结果)--sendOneway

消息者消费:

consumer.subscribe(Tpoic, subExpression);//消息的订阅

设置消息的回调函数,处理消息

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) ->{
    for(MessageExt msg: msgs) {
        System.out.println("消费消息:"+new String (msg.getBody()));
    }
    return ConsumeOrderlyStatus.SUCCESS;
})

consumer.setMessageModel 可以设置消息的消费模式。不设置的话默认是负载均衡模式

广播的消费模式:rocketMq有N条数据。每个消费者都消费N条

负载均衡的消费模式(默认):rocketMq有N条数据。所有消费者加起来共消费N条

顺序消息:

生产者发送(增加了一个消息队列的选择器):

/**
* 实现将同样orderId的消息发送到同一个队列里面
* message:要发送的消息
* MessageQueueSelector:消息队列选择器,规则结果相同的 返回同一个队列
* orderId:队列选择逻辑参数。用来当规则的入参
*/

SendResult sendResult = producer.send(message, new MessageQueueSelector() {   @Override   public MessageQueue select (List<MessageQueue> mqs, Message msg, Object arg) {     long orderId = (long) arg;     long index = orderId % mqs.size();     return mqs.get((int) index);   } }, orderId);

消费者消费(增加了顺序性的消息监听器): 

/**
* 消费者订阅完消息后,注册消息监听器
*/

consumer.registerMessageListener(new MessageListenerOrderly() {
  @Override
  public ConsumeOrderlyStatus consumeMessage (List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for(MessageExt msg: msgs) {
            System.out.println("消费消息:"+new String (msg.getBody()));
        }
    return ConsumeOrderlyStatus.SUCCESS;
  }
});

//启动消费者
consumer.start();

延迟消息:

消费消息的速度 比 存储的消息延迟一段时间

 生产者:

增加对消息的延迟设置

message.setDelayTimeLevel(2);//设置消息延迟的等级。

其他上述的生产者

消费者和上述几个消费者一样

批量消息:

生产者发送消息是 不用for循环 逐条发送,直接发送一个List

消费者没变

数据不能超过4m。如果超过 需要分割

过滤消息:

1.使用tag进行过滤

2.使用sql语法进行过滤--使用消息选择器

生产者:message.putUserProperty("i", String.valueof(i));

消费者:consumer.subscribe(Tpoic, MessageSelector.bySql());//使用消息选择器进行消息的订阅

事务消息:

事务消息有3种状态:提交状态,回滚状态,中间状态

消息的存储结构

rocketMq将消息数据存在磁盘文件中,通过安装时的配置文件,可以知道消息存储的文件位置。

1.commitLog: 存储消息的元数据,每个1G。包括Topic、QueueId、Message

2.ConsumerQueue: 存储消息在CommitLog的索引

3.IndexFile: 为了消息查询提供了一种通过key或者时间区间来查询消息的方法。通过这种IndexFile来查找消息的方法不影响发送和消费消息的主流程

刷盘机制

1.同步刷盘:生产者发送消息过来时,先将消息写到内存中,然后阻塞生产者响应,唤醒刷盘线程,等刷盘完成后,唤醒生产者响应,将响应结果告知生产者

2.异步刷盘:生产者发送消息过来时,将消息写到内存中,然后立即返回生产者响应,等数据积累一定量后,唤醒刷盘线程,进行刷盘保存。

通过Broker配置文件中的flushDiskType参数设置刷盘机制的模式。这个参数被配置成SYNC_FLUSH或者ASYNC_FLUSH

建议:一般刷盘设置成异步刷盘的。broker设置为主从同步复制

生产者|消费者的负载均衡

product:

在发送消息时,默认会才用Roundbin轮询所有的messageQueue进行发送。让消息平均落在不同的Queue上。而由于Queue可以散落在不同的broker上。所以消息可以发送到不同的broker节点上。

consumer:

负载均衡模式(1条消息只被消费1次):采用AllocateMessageQueueAveragely或者AllocateMessageQueueAveragelybyCircle分配算法。每个consumer平均分配consumer queue。如果消费者个数>consumer queue个数。多出来的消费者将分配不到消息队列。此时多出来的消费者是没有用的。

广播模式(1条消息被所有消费者消费):由于广播模式的特性,所以每个consumer分配到了所有的consumer queue

消息重试

顺序消息的重试:

对于顺序消息,当消费者消费失败后,消息队列会不断进行消息重试(1s1次),此时。容易造成应用程序的消息阻塞。因此在使用顺序消息时,要保证应用能够及时监控并处理消费失败的情况。避免阻塞现象的发生。

无序消息的重试:

包括普通、定时、延时、事务消息。当无序消息失败时。可以通过设置返回状态达到消息重试的结果。无序消息的重试只是针对负载均衡的消费模式。若是广播模式,失败的消息不进行重试,继续消费新的消息。

最多可以进行16次消息重试。需要4小时46分钟。若是超过16次都重试失败,则该条消息不再重试投递,进入死信队列。一条消息无论重试多少次,这些重试消息的MessageId不会改变。

 

死信队列

多次重试失败的消息进入死信队列

特征:

1.不会再被消费者正常消费

2.有效期和正常消息一样 都是3天。3天后自动删除

3.1个死信队列对应一个Group Id ,而不是对应单个消费者实例

可以在RocketMq的控制台查看并重新发送该消息。让消费者重新消费

消息幂等性

消息重复场景:

1.发送时消息重复

2.投递时消息重复

3.负载均衡时消息重复

原文地址:https://www.cnblogs.com/linhongwenBlog/p/12368137.html