RabbitMQ学习笔记

参考资料(主要是黑马、尚硅谷):https://www.bilibili.com/video/BV15k4y1k7Ep、https://www.bilibili.com/video/BV1np4y1C7Yf?p=248

安装包位置:I:归档资料资料-RabbitMQ深入浅出资料

百度云:

链接:https://pan.baidu.com/s/16VLFvA1u_52UOg2lvIgs6Q
提取码:erpl

博客园、CSDN同步更新

博客园:https://www.cnblogs.com/tangliping/p/14800943.html

CSDN:https://blog.csdn.net/qq_37023928/article/details/117194127

1.消息中间件概述

1.1. 什么是消息中间件

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。

  • 为什么使用MQ

    在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高系统吞吐量

  • 开发中消息队列通常有如下应用场景:

    1、任务异步处理

    将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。

    2、应用程序解耦合

    MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

    3、削峰填谷

    如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以上,这个时候数据库肯定卡死了。

    消息被MQ保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒1000个数据,这样慢慢写入数据库,这样就不会卡死数据库了。

    但是使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000QPS,直到消费完积压的消息,这就叫做“填谷”

1.2. AMQP 和 JMS

MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。

1.2.1. AMQP

AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。

1.2.2. JMS

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

1.2.3. AMQP 与 JMS 区别

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模式;而AMQP的消息模式更加丰富

1.3. 消息队列产品

市场上常见的消息队列有如下:

  • ActiveMQ:基于JMS
  • ZeroMQ:基于C语言开发
  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
  • RocketMQ:基于JMS,阿里巴巴产品
  • Kafka:类似MQ的产品;分布式消息系统,高吞吐量

1.4. RabbitMQ

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

RabbitMQ官方地址:http://www.rabbitmq.com/

RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍);

官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

1.5 RabbitMQ组成架构

2.安装及配置RabbitMQ

2.1. 安装Socat

在线安装依赖环境:

yum install gcc
yum install socat

2.2安装Erlang

查看版本对于关系:https://www.rabbitmq.com/which-erlang.html

# 上传 erlang-22.0.7-1.el7.x86_64.rpm 安装包上传
# 安装
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm

2.3 安装RabbitMQ

# 安装
rpm -ivh rabbitmq-server-3.7.17-1.el7.noarch.rpm

2.4 开启管理界面及配置

# 开启管理界面
rabbitmq-plugins enable rabbitmq_management

# 配置远程可使用guest登录mq
cd /usr/share/doc/rabbitmq-server-3.7.17

cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

# 修改配置文件
vi /etc/rabbitmq/rabbitmq.config

2.5 启动

centos6用这个命令:
/sbin/service rabbitmq-server restart

centos7用这个命令:
systemctl start rabbitmq-server

2.6 配置虚拟主机及用户

用户角色

RabbitMQ在安装好后,可以访问http://ip地址:15672 ;其自带了guest/guest的用户名和密码;如果需要创建自定义用户;那么也可以登录管理界面后,如下操作:

角色说明

1、 超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

2、 监控者(monitoring)

可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

3、 策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

4、 普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

5、 其他

无法登陆管理控制台,通常就是普通的生产者和消费者。

2.7Virtual Hosts配置

像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。

创建Virtual Hosts

设置Virtual Hosts权限

3.RabbitMQ工作模式

3.1Work queues 工作队列模式

生产者直接将消息发送到队列,队列中的消息可以由多个消费者直接消费。多个消费者存在竞争关系,一条消息被消费者1消费了,消费者2就不能再消费该消息了

3.2发布订阅模式

工作队列模式中,只有3个角色:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分

而在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接受者,会一直等待消息到来。
  • Queue:消息队列,接收消息、缓存消息。
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

发布订阅模式: 1、每个消费者监听自己的队列。 2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

关键步骤

1、创建FanoutExchange交换机

2、创建两个队列,绑定到FanoutExchange交换机上。

代码:https://blog.csdn.net/houxian1103/article/details/105173059

生产者:

        <!-- springboot-web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
spring:
  rabbitmq:
  ####连接地址
    host: 127.0.0.1
   ####端口号   
    port: 5672
   ####账号 
    username: guest
   ####密码  
    password: guest
   ### 地址
    virtual-host: /
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
 
//Fanout 类型 发布订阅模式
@Component
public class FanoutConfig {
 
	// 邮件队列
	private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
 
	// 短信队列
	private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
	// fanout 交换机
	private String EXCHANGE_NAME = "fanoutExchange";
 
	// 1.定义邮件队列
	@Bean
	public Queue fanOutEamilQueue() {
		return new Queue(FANOUT_EMAIL_QUEUE);
	}
 
	// 2.定义短信队列
	@Bean
	public Queue fanOutSmsQueue() {
		return new Queue(FANOUT_SMS_QUEUE);
	}
 
	// 2.定义交换机
	@Bean
	FanoutExchange fanoutExchange() {
		return new FanoutExchange(EXCHANGE_NAME);
	}
 
	// 3.队列与交换机绑定邮件队列
	@Bean
	Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
		return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
	}
 
	// 4.队列与交换机绑定短信队列
	@Bean
	Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
		return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
	}
}

发送消息:

import java.util.UUID;
 
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import com.alibaba.fastjson.JSONObject;
 
@Component
public class FanoutProducer {
	@Autowired
	private AmqpTemplate amqpTemplate;
 
	public void send(String queueName) {
		JSONObject jsonObject = new JSONObject();
		jsonObject.put("email", "22222@qq.com");
		jsonObject.put("timestamp", System.currentTimeMillis());
		String jsonString = jsonObject.toJSONString();
		System.out.println("jsonString:" + jsonString);
		// 设置消息唯一id 保证每次重试消息id唯一
		Message message = MessageBuilder.withBody(jsonString.getBytes())
				.setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
				.setMessageId(UUID.randomUUID() + "").build();
		amqpTemplate.convertAndSend(queueName, message);
	}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
 
import com.itmayiedu.rabbitmq.FanoutProducer;
 
@RestController
public class ProducerController {
	@Autowired
	private FanoutProducer fanoutProducer;
 
	@RequestMapping("/sendFanout")
	public String sendFanout(String queueName) {
		fanoutProducer.send(queueName);
		return "success";
	}
}

消费者端:

		<!-- springboot-web组件 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!-- 添加springboot对amqp的支持 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
 
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-mail</artifactId>
		</dependency>
 
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
		</dependency>
		<!--fastjson -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.49</version>
		</dependency>
spring:
  rabbitmq:
  ####连接地址
    host: 127.0.0.1
   ####端口号   
    port: 5672
   ####账号 
    username: guest
   ####密码  
    password: guest
   ### 地址
    virtual-host: /
    listener: 
      simple:
        retry:
        ####开启消费者异常重试
          enabled: true
         ####最大重试次数
          max-attempts: 5
        ####重试间隔次数
          initial-interval: 2000
        ####开启手动ack  
        acknowledge-mode: manual 

邮件消费者:

import java.util.Map;
 
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
 
import com.alibaba.fastjson.JSONObject;
import com.itmayiedu.rabbitmq.utils.HttpClientUtils;
import com.rabbitmq.client.Channel;
 
//邮件队列
@Component
public class FanoutEamilConsumer {
	@RabbitListener(queues = "fanout_email_queue")
	public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
		// 获取消息Id
		String messageId = message.getMessageProperties().getMessageId();
		String msg = new String(message.getBody(), "UTF-8");
		System.out.println("邮件消费者获取生产者消息" + "messageId:" + messageId + ",消息内容:" + msg);
		JSONObject jsonObject = JSONObject.parseObject(msg);
		// 获取email参数
		String email = jsonObject.getString("email");
		// 请求地址
		String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
		JSONObject result = HttpClientUtils.httpGet(emailUrl);
		if (result == null) {
			// 因为网络原因,造成无法访问,继续重试
			throw new Exception("调用接口失败!");
		}
		// // 手动ack
		Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
		// 手动签收
		channel.basicAck(deliveryTag, false);
		System.out.println("执行结束....");
	}
}

3.3路由模式

路由模式特点:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

3.4Topics通配符模式

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert

关键步骤:

创建交换机的类型为TopicExchange

交换机绑定队列时,使用通配符的方式进行绑定,例如:item.*

Topic主题模式可以实现 Publish/Subscribe发布与订阅模式Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。

模式总结:

RabbitMQ工作模式: 1、简单模式 HelloWorld 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

2、工作队列模式 Work Queue 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

3、发布订阅模式 Publish/subscribe 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

4、路由模式 Routing 需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

5、通配符模式 Topic 需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

4.RbbitMQ高可用集群

4.1RabbitMQ集群架构模式

1、主备模式

用来实现RabbitMQ的高可用集群,一般是在并发和数据不是特别多的时候使用,当主节点挂掉以后会从备份节点中选择一个节点出来作为主节点对外提供服务。

2、远程模式

主要用来实现双活,简称为Shovel模式,所谓的Shovel模式就是让我们可以把消息复制到不同的数据中心,让两个跨地域的集群互联。

3、镜像队列模式

镜像队列也被称为Mirror队列,主要是用来保证mq消息的可靠性的,他通过消息复制的方式能够保证我们的消息100%不丢失,同时该集群模式也是企业中使用最多的模式。

4、多活模式

多活模式主要是用来实现异地数据复制,Shovel模式其实也可以实现,但是他的配置及其繁琐同时还要受到版本的限制,所以如果做异地多活我们更加推荐使用多活模式,使用多活模式我们需要借助federation插件来实现集群与集群之间或者节点与节点之前的消息复制,该模式被广泛应用于饿了么、美团、滴滴等企业。

5、集群模式总结

主备模式下主节点提供读写,从节点不提供读写服务,只是负责提供备份服务,备份节点的主要功能是在主节点宕机时,完成自动切换 从-->主,同时因为主备模式下始终只有一个对外提供服务那么对于高并发的情况下该模式并不合适.

远程模式可以让我们实现异地多活的mq,但是现在已经有了更好的异地多活解决方案,所以在实际的项目中已经不推荐使用了

镜像队列模式可以让我们的消息100%不丢失,同时可以结合HAProxy来实现高并发的业务场景所以在项目中使用得最多

4.2镜像队列集群搭建

使用三台节点安装,node01、node02、node03

首先,三台节点按照第二章进行基础安装。

在三台执行下面的添加用户操作:

rabbitmqctl add_user admin 123456
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
rabbitmqctl set_user_tags admin administrator

在node01复制到node02、node03:

scp /var/lib/rabbitmq/.erlang.cookie root@node02:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@node03:/var/lib/rabbitmq/

停止所有节点(三台节点执行):

systemctl stop rabbitmq-server
/etc/init.d/rabbitmq-server stop
ps -ef|grep rabbitmq
如果进程还在的话,直接kill掉

将node02、node03加入集群(在node02、node03执行)

rabbitmq-server -detached
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node01
rabbitmqctl start_app

查看集群是否成功

rabbitmqctl cluster_status

设置镜像队列策略

# 将所有队列设置为镜像队列,即队列会被复制到各个节点:  主节点执行
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

前台访问地址:http://192.168.66.61:15672/

可以修改集群信息(可选)

#在任何一个节点上执行该命令修改集群的名字
rabbitmqctl set_cluster_name rabbitmq_cd_itcast 
#在非server2的节点上执行可以移除server2节点
rabbitmqctl forget_cluster_node rabbit@server2 
# 修改了信息以后可以查看修改是否成功
rabbitmqctl cluster_status

4.3HAProxy 的配置

在node02、node03安装

HAProxy 简介:

HAProxy是一款提供高可用性、负载均衡以及基于TCP和HTTP应用的代理软件,HAProxy是完全免费的、借助HAProxy可以快速并且可靠的提供基于TCP和HTTP应用的代理解决方案。

HAProxy适用于那些负载较大的web站点,这些站点通常又需要会话保持或七层处理。

HAProxy可以支持数以万计的并发连接,并且HAProxy的运行模式使得它可以很简单安全的整合进架构中,同时可以保护web服务器不被暴露到网络上。

在node02、node03进行安装,下载比较慢,直接从本地获取软件包:

#下载依赖包 - 如果已安装可以跳过
yum install gcc vim wget
# 下载haproxy
wget http://www.haproxy.org/download/1.6/src/haproxy-1.6.5.tar.gz
#解压
tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
#进入目录、进行编译、安装
cd /usr/local/haproxy-1.6.5
make TARGET=linux31 PREFIX=/usr/local/haproxy
make install PREFIX=/usr/local/haproxy
#用来存放配置文件
mkdir /etc/haproxy
#赋权
groupadd -r -g 149 haproxy
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
#创建haproxy配置文件
touch /etc/haproxy/haproxy.cfg

node02的配置:

#logging options
global
    log 127.0.0.1 local0 info
    maxconn 5120
    # ha的安装地址
    chroot /usr/local/haproxy
    uid 99
    gid 99
    daemon
    quiet
    nbproc 20
    pidfile /var/run/haproxy.pid

defaults
    log global
    #使用4层代理模式,”mode http”为7层代理模式
    mode tcp
    #if you set mode to tcp,then you nust change tcplog into httplog
    option tcplog
    option dontlognull
    retries 3
    option redispatch
    maxconn 2000
    contimeout 5s
     ##客户端空闲超时时间为 30秒 则HA 发起重连机制
     clitimeout 30s
     ##服务器端链接超时时间为 15秒 则HA 发起重连机制
     srvtimeout 15s
#front-end IP for consumers and producters

listen rabbitmq_cluster
    bind 192.168.66.62:5670
    #配置TCP模式
    mode tcp
    #balance url_param userid
    #balance url_param session_id check_post 64
    #balance hdr(User-Agent)
    #balance hdr(host)
    #balance hdr(Host) use_domain_only
    #balance rdp-cookie
    #balance leastconn
    #balance source //ip
    #简单的轮询
    balance roundrobin
    #rabbitmq集群节点配置 #inter 每隔五秒对mq集群做健康检查, 2次正确证明服务器可用,2次失败证明服务器不可用,并且配置主备机制
        server server1 192.168.66.61:5672 check inter 5000 rise 2 fall 2
        server server2 192.168.66.62:5672 check inter 5000 rise 2 fall 2
        server server3 192.168.66.63:5672 check inter 5000 rise 2 fall 2
#配置haproxy web监控,查看统计信息
listen stats
    bind 192.168.66.62:8100 # 注意此处的ip地址,我们配置了2台机器
    mode http
    option httplog
    stats enable
    #设置haproxy监控地址为http://192.168.66.62:8100/rabbitmq-stats
    stats uri /rabbitmq-stats
    stats refresh 5s

node03的配置:

#logging options
global
    log 127.0.0.1 local0 info
    maxconn 5120
    # ha的安装地址
    chroot /usr/local/haproxy
    uid 99
    gid 99
    daemon
    quiet
    nbproc 20
    pidfile /var/run/haproxy.pid
defaults
    log global
    #使用4层代理模式,”mode http”为7层代理模式
    mode tcp
    #if you set mode to tcp,then you nust change tcplog into httplog
    option tcplog
    option dontlognull
    retries 3
    option redispatch
    maxconn 2000
    contimeout 5s
     ##客户端空闲超时时间为 30秒 则HA 发起重连机制
     clitimeout 30s
     ##服务器端链接超时时间为 15秒 则HA 发起重连机制
     srvtimeout 15s
#front-end IP for consumers and producters
listen rabbitmq_cluster
    bind 192.168.66.63:5670
    #配置TCP模式
    mode tcp
    #balance url_param userid
    #balance url_param session_id check_post 64
    #balance hdr(User-Agent)
    #balance hdr(host)
    #balance hdr(Host) use_domain_only
    #balance rdp-cookie
    #balance leastconn
    #balance source //ip
    #简单的轮询
    balance roundrobin
    #rabbitmq集群节点配置 #inter 每隔五秒对mq集群做健康检查, 2次正确证明服务器可用,2次失败证明服务器不可用,并且配置主备机制
        server server1 192.168.66.61:5672 check inter 5000 rise 2 fall 2
        server server2 192.168.66.62:5672 check inter 5000 rise 2 fall 2
        server server3 192.168.66.63:5672 check inter 5000 rise 2 fall 2
#配置haproxy web监控,查看统计信息
listen stats
    bind 192.168.66.63:8100 # 注意此处的ip地址,我们配置了2台机器
    mode http
    option httplog
    stats enable
    #设置haproxy监控地址为http://192.168.66.63:8100/rabbitmq-stats
    stats uri /rabbitmq-stats
    stats refresh 5s

停止掉node03的服务:

[root@node03 haproxy-1.6.5]# systemctl stop rabbitmq-server

停止失败了的话,干脆直接kill掉...

HA的server3也变成红色了。

看到了结果了,就将node03的rabbitmq服务启动起来吧,启动之后管理界面又恢复成绿色了。

启动类添加注解

@ImportResource("classpath:spring-rabbit.xml")

spring-rabbit.xml文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
    <!--自动创建队列、交换机-->
    <rabbit:queue id="itemQueue" name="item_queue" auto-declare="true"/>
    <rabbit:topic-exchange id="itemTopicExchange" name="item_topic_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding pattern="item.#" queue="itemQueue"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

</beans>

注意地址哈,HA地址:

spring:
  rabbitmq:
    host: 192.168.66.62 #这个是HA服务器地址
    username: admin
    password: 123456
    virtual-host: /

发送消息

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("/")
    public String test(){
        for (int i=0; i<2; i++) {
            rabbitTemplate.convertAndSend("item_topic_exchange",
                    "item.insert", "商品新增,路由Key为item.insert" +i);
        }
        return "ok";
    }

4.4KeepAlived 搭建高可用的HAProxy集群

安装在node02、node03,默认node02主节点。

KeepAlived 简介:

Keepalived,它是一个高性能的服务器高可用或热备解决方案,Keepalived主要来防止服务器单点故障的发生问题,可以通过其与Nginx、Haproxy等反向代理的负载均衡服务器配合实现web服务端的高可用。Keepalived以VRRP协议为实现基础,用VRRP协议来实现高可用性(HA).VRRP(Virtual Router Redundancy Protocol)协议是用于实现路由器冗余的协议,VRRP协议将两台或多台路由器设备虚拟成一个设备,对外提供虚拟路由器IP(一个或多个)。

KeepAlived 的安装:

#安装所需软件包
yum install -y openssl openssl-devel
#下载安装包
wget http://www.keepalived.org/software/keepalived-1.2.18.tar.gz
#解压、编译、安装
tar -zxvf keepalived-1.2.18.tar.gz -C /usr/local/
cd /usr/local/keepalived-1.2.18/ && ./configure --prefix=/usr/local/keepalived
make && make install
# 将keepalived安装成Linux系统服务,因为没有使用keepalived的默认安装路径(默认路径:/usr/local),安装完成之后,需要做一些修改工作
#首先创建文件夹,将keepalived配置文件进行复制:
mkdir /etc/keepalived
cp /usr/local/keepalived/etc/keepalived/keepalived.conf /etc/keepalived/
#然后复制keepalived脚本文件:
cp /usr/local/keepalived/etc/rc.d/init.d/keepalived /etc/init.d/
cp /usr/local/keepalived/etc/sysconfig/keepalived /etc/sysconfig/
# 设置的过程中如果出现已经存在则删除原有的文件,重新创建
ln -s -f /usr/local/sbin/keepalived /usr/sbin/
ln -s -f /usr/local/keepalived/sbin/keepalived /sbin/
#可以设置开机启动:chkconfig keepalived on,到此我们安装完毕!
chkconfig keepalived on

node02配置:

! Configuration File for keepalived
global_defs {
   router_id node02  ##标识节点的字符串,通常为本机hostname
}
vrrp_script chk_haproxy {
    script "/etc/keepalived/haproxy_check.sh"  ##执行脚本位置
    interval 2  ##检测时间间隔
    weight -20  ##如果条件成立则权重减20
}
vrrp_instance VI_1 {
    state MASTER  ## 主节点为MASTER,备份节点为BACKUP-该配置非常重要
    interface ens33 ## 绑定虚拟IP的网络接口(网卡可以使用ifconfig查看)
    virtual_router_id 110  ## 虚拟路由ID号(主备节点一定要相同)-该配置非常重要
    mcast_src_ip 192.168.66.62 ## 本机ip地址
    priority 100  ##优先级配置(0-254的值),一般主节点的权重大于备份节点
    nopreempt
    advert_int 1  ## 组播信息发送间隔,俩个节点必须配置一致,默认1s
authentication {  ## 认证匹配
        auth_type PASS
        auth_pass itcast
    }
    track_script {
        chk_haproxy
    }
    virtual_ipaddress {
        192.168.66.110  ## 虚拟ip,可以指定多个,以后连接mq就使用该虚拟ip进行连接
    }
}

node03配置:

! Configuration File for keepalived
global_defs {
   router_id node03  ##标识节点的字符串,通常为本机hostname
}
vrrp_script chk_haproxy {
    script "/etc/keepalived/haproxy_check.sh"  ##执行脚本位置
    interval 2  ##检测时间间隔
    weight -20  ##如果条件成立则权重减20
}
vrrp_instance VI_1 {
    state BACKUP  ## 主节点为MASTER,备份节点为BACKUP-该配置非常重要
    interface ens33 ## 绑定虚拟IP的网络接口(网卡可以使用ifconfig查看)
    virtual_router_id 110  ## 虚拟路由ID号(主备节点一定要相同)-该配置非常重要
    mcast_src_ip 192.168.66.63 ## 本机ip地址
    priority 100  ##优先级配置(0-254的值),一般主节点的权重大于备份节点
    nopreempt
    advert_int 1  ## 组播信息发送间隔,俩个节点必须配置一致,默认1s
authentication {  ## 认证匹配
        auth_type PASS
        auth_pass itcast
    }
    track_script {
        chk_haproxy
    }
    virtual_ipaddress {
        192.168.66.110  ## 虚拟ip,可以指定多个,以后连接mq就使用该虚拟ip进行连接
    }
}

两台都添加 /etc/keepalived/haproxy_check.sh

#!/bin/bash
COUNT=`ps -C haproxy --no-header |wc -l`
if [ $COUNT -eq 0 ];then
    /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
    sleep 2
    if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];then
        killall keepalived
    fi
fi

授权

chmod +x /etc/keepalived/haproxy_check.sh

启动

#启动两台机器的keepalived
service keepalived start | stop | status | restart
#查看状态
ps -ef | grep haproxy
ps -ef | grep keepalived

高可用状态测试

在主节点查看浮动ip:

ip a

停止主节点node02,看一下node03上是否有浮动ip。

在代码的配置中,我们需要将主机地址配置成浮动ip。

4.5开启消息追踪

对于企业级的应用开发来讲,我们通常都会比较关注我们的消息,甚至很多的场景把消息的可靠性放在第一位,但是我们的MQ集群难免会出现消息异常丢失或者客户端无法发送消息等异常情况,此时为了帮助开发人员快速的定位问题,我们就可以对消息的投递和消费过程进行监控,而tracing日志监控插件帮我们很好的实现了该功能。

以下是trace的相关命令和使用(要使用需要先rabbitmq启用插件,再打开开关才能使用):

命令集 描述
rabbitmq-plugins list 查看插件列表
rabbitmq-plugins enable rabbitmq_tracing rabbitmq启用trace插件
rabbitmqctl trace_on 打开trace的开关
rabbitmqctl trace_on -p itcast 打开trace的开关(itcast为需要日志追踪的vhost)
rabbitmqctl trace_off 关闭trace的开关
rabbitmq-plugins disable rabbitmq_tracing rabbitmq关闭Trace插件
rabbitmqctl set_user_tags heima administrator 只有administrator的角色才能查看日志界面

只需在一台节点执行即可,我这边在node01执行:

  rabbitmq-plugins list
  rabbitmq-plugins enable rabbitmq_tracing
  rabbitmqctl trace_on -p /

不需要重启,我们可以看到多了一个追踪菜单:

添加一个trace:用户名密码自己定义

发送消息到队列之后,我们点开这个追踪的文件:

需要登录一下刚刚设置的用户名和密码。

我们可以看到消息的追踪信息。

4.6集群启动命令

三台都执行:

rabbitmq-server -detached

启动keepalived:在node02、node03执行:

service keepalived start

5.RabbitMQ应用与面试

5.1消息堆积

当消息生产的速度长时间,远远大于消费的速度时。就会造成消息堆积。

消息堆积的影响

  • 可能导致新消息无法进入队列
  • 可能导致旧消息无法丢失
  • 消息等待消费的时间过长,超出了业务容忍范围。

产生堆积的情况

  • 生产者突然大量发布消息
  • 消费者消费失败
  • 消费者出现性能瓶颈。
  • 消费者挂掉

解决办法

  • 排查消费者的消费性能瓶颈

  • 增加消费者的多线程处理

  • 部署增加多个消费者

  • 场景介绍

在用户登录成功之后,会向rabbitmq发送一个登录成功的消息。这个消息可以被多类业务订阅。

登录成功,记录登录日志;登录成功,根据规则送积分。其中登录送积分可以模拟成较为耗时的处理

场景重现:让消息产生堆积

1、生产者大量发送消息:使用Jmeter开启多线程,循环发送消息大量进入队列。

模拟堆积10万条数据

2、消费者消费失败:随机抛出异常,模拟消费者消费失败,没有ack(手动ack的时候)。

3、设置消费者的性能瓶颈:在消费方法中设置休眠时间,模拟性能瓶颈

4、关闭消费者:停掉消费者,模拟消费者挂掉

5、消费者端示例核心代码:

public class LoginIntegralComsumer implements MessageListener {
    public void onMessage(Message message) {
        String jsonString = null;
        try {
            jsonString = new String(message.getBody(),"UTF8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        if(new Random().nextInt(5)==2){
            //模拟发生异常
            throw new RuntimeException("模拟处理异常");
        }
        try {
            //模拟耗时的处理过程
            TimeUnit.MILLISECONDS.sleep(1000);
            System.out.println(Thread.currentThread().getName()+"处理消息:"+jsonString);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

6、如果每1秒钟处理一条消息

1小时处理 60*60=3600条

处理完10万条数据 100000/3600=27.7小时

问题解决:消息已经堆积如何解决

消息队列堆积,想办法把消息转移到一个新的队列,增加服务器慢慢来消费这个消息,可以让生产环境的队列可用状态。

1、解决消费者消费异常问题

2、解决消费者的性能瓶颈:改短休眠时间

5.4小时。

3、增加消费线程,增加多台服务器部署消费者。快速消费。

增加10个线程

concurrency="10" prefetch="10"

1小时

增加一台服务器

0.5小时

5.2消息丢失

在实际的生产环境中有可能出现一条消息因为一些原因丢失,导致消息没有消费成功,从而造成数据不一致等问题,造成严重的影响,比如:在一个商城的下单业务中,需要生成订单信息和扣减库存两个动作,如果使用RabbitMQ来实现该业务,那么在订单服务下单成功后需要发送一条消息到库存服务进行扣减库存,如果在此过程中,一条消息因为某些原因丢失,那么就会出现下单成功但是库存没有扣减,从而导致超卖的情况,也就是库存已经没有了,但是用户还能下单,这个问题对于商城系统来说是致命的。

消息丢失的场景主要分为:消息在生产者丢失,消息在RabbitMQ丢失,消息在消费者丢失。

5.2.1消息在生产者丢失

场景介绍

消息生产者发送消息成功,但是MQ没有收到该消息,消息在从生产者传输到MQ的过程中丢失,一般是由于网络不稳定的原因。

解决方案

采用RabbitMQ 发送方消息确认机制,当消息成功被MQ接收到时,会给生产者发送一个确认消息,表示接收成功。RabbitMQ 发送方消息确认模式有以下三种:普通确认模式,批量确认模式,异步监听确认模式。spring整合RabbitMQ后只使用了异步监听确认模式。

说明

异步监听模式,可以实现边发送消息边进行确认,不影响主线程任务执行。

步骤

  1. 生产者发送3000条消息
  2. 在发送消息前开启开启发送方确认模式
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
                               publisher-confirms="true"
        />
  1. 在发送消息前添加异步确认监听器
//添加异步确认监听器
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            // 处理ack
            System.out.println("已确认消息,标识:" + correlationData.getId());
        } else {
            // 处理nack, 此时cause包含nack的原因。
            System.out.println("未确认消息,标识:" + correlationData.getId());
            System.out.println("未确认原因:" + cause);
            //重发
        }
    }
});

5.2.2消息在rabbitMQ丢失

场景介绍

消息成功发送到MQ,消息还没被消费却在MQ中丢失,比如MQ服务器宕机或者重启会出现这种情况

解决方案

持久化交换机,队列,消息,确保MQ服务器重启时依然能从磁盘恢复对应的交换机,队列和消息。

spring整合后默认开启了交换机,队列,消息的持久化,所以不修改任何设置就可以保证消息不在RabbitMQ丢失。但是为了以防万一,还是可以申明下。

5.2.3消息在消费者丢失

场景介绍

消息费者消费消息时,如果设置为自动回复MQ,消息者端收到消息后会自动回复MQ服务器,MQ则会删除该条消息,如果消息已经在MQ被删除但是消费者的业务处理出现异常或者消费者服务宕机,那么就会导致该消息没有处理成功从而导致该条消息丢失。

解决方案

设置为手动回复MQ服务器,当消费者出现异常或者服务宕机时,MQ服务器不会删除该消息,而是会把消息重发给绑定该队列的消费者,如果该队列只绑定了一个消费者,那么该消息会一直保存在MQ服务器,直到消息者能正常消费为止。本解决方案以一个队列绑定多个消费者为例来说明,一般在生产环境上也会让一个队列绑定多个消费者也就是工作队列模式来减轻压力,提高消息处理效率

MQ重发消息场景:

1.消费者未响应ACK,主动关闭频道或者连接

2.消费者未响应ACK,消费者服务挂掉

5.3有序消费消息

场景1

当RabbitMQ采用work Queue模式,此时只会有一个Queue但是会有多个Consumer,同时多个Consumer直接是竞争关系,此时就会出现MQ消息乱序的问题。

场景2

当RabbitMQ采用简单队列模式的时候,如果消费者采用多线程的方式来加速消息的处理,此时也会出现消息乱序的问题。

场景1解决

场景2解决

5.4重复消费

场景介绍

为了防止消息在消费者端丢失,会采用手动回复MQ的方式来解决,同时也引出了一个问题,消费者处理消息成功,手动回复MQ时由于网络不稳定,连接断开,导致MQ没有收到消费者回复的消息,那么该条消息还会保存在MQ的消息队列,由于MQ的消息重发机制,会重新把该条消息发给和该队列绑定的消息者处理,这样就会导致消息重复消费。而有些操作是不允许重复消费的,比如下单,减库存,扣款等操作。

MQ重发消息场景:

1.消费者未响应ACK,主动关闭频道或者连接

2.消费者未响应ACK,消费者服务挂掉

解决方案

如果消费消息的业务是幂等性操作(同一个操作执行多次,结果不变)就算重复消费也没问题,可以不做处理,如果不支持幂等性操作,如:下单,减库存,扣款等,那么可以在消费者端每次消费成功后将该条消息id保存到数据库,每次消费前查询该消息id,如果该条消息id已经存在那么表示已经消费过就不再消费否则就消费。本方案采用redis存储消息id,因为redis是单线程的,并且性能也非常好,提供了很多原子性的命令,本方案使用setnx命令存储消息id。

setnx(key,value):如果key不存在则插入成功且返回1,如果key存在,则不进行任何操作,返回0

6.Springboot整合RabbitMQ

6.1依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
        <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.4.5</version>
            </plugin>
        </plugins>
    </build>

6.2配置

spring:
  rabbitmq:
    host: 192.168.66.110 #这个是keepalive的浮动IP地址,我这边配置了HA+keepalive
    port: 5672
    username: admin
    password: 123456
    virtual-host: /

启动类添加注解:

@EnableRabbit

6.3amqpAdmin

创建交换机、队列、绑定

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQTest {

    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * 创建交换机
     */
    @Test
    public void createExchange(){
        // 交换机名称、是否持久化、自动删除、自定义参数
        DirectExchange directExchange = new DirectExchange("myExchange", true, false, null);
        amqpAdmin.declareExchange(directExchange);
        System.out.println("交换机创建成功");
    }

    /**
     * 创建队列
     */
    @Test
    public void createQueue(){
        //排它,表示只有声明它的人才能使用这个队列,但是我们一般队列所有人都能使用,排它设置成false
        Queue myQueue = new Queue("myQueue", true, false, false, null);
        amqpAdmin.declareQueue(myQueue);
    }

    /**
     * 交换机和队列绑定
     */
    @Test
    public void createBinding(){
        Binding binding = new Binding("myQueue", Binding.DestinationType.QUEUE, "myExchange", "my.routing.key", null);
        amqpAdmin.declareBinding(binding);
    }
}

6.4RabbitTemplate

发送消息

@Configuration
public class RabbitConfig {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendMessageTest(){
        //rabbitTemplate.convertAndSend("myExchange","my.routing.key","msg001");
        //如果发送的是一个对象必须实现serializable接口,如果想要将对象转为json,需要自定义配置
        User user = new User();
        user.setUsername("Tangliping");
        user.setPassword("000000");
        rabbitTemplate.convertAndSend("myExchange","my.routing.key",user);
    }

6.5接收消息

@Service
public class MessageServiceImpl {

    /**
     * 
     * @param message 原生消息体
     * @param content 发送消息的类型
     * @param channel 当前传输数据的通道
     */
    @RabbitListener(queues = "myQueue")
    public void recieveMessage(Message message, User content, Channel channel){
        byte[] body = message.getBody();
        System.out.println(content);
    }
}

场景一、如果启动多个服务,同一个消息也只能只有一个服务收到

先复制一份服务配置,修改端口号

将发送10次消息,两个服务进行接收:

    @Test
    public void sendMessageTest(){
        for (int i =0; i<10;i++){
            User user = new User();
            user.setUsername("Tangliping");
            user.setPassword("000000");
            user.setId(i);
            rabbitTemplate.convertAndSend("myExchange","my.routing.key",user);
        }

    }

观察结果:

一条消息只能被消费一次,不管是谁消费的,只要消费了,就不能再消费了。

它们存在竞争关系。

场景二、如果只有一个服务,这个服务处理消息很耗时,那么,它接收消息是有序的,只有一个消息完全处理完,那么它才开始接收下一个消息:

    @RabbitListener(queues = "myQueue")
    public void recieveMessage(Message message, User content, Channel channel) throws InterruptedException {
        System.out.println(content);
        byte[] body = message.getBody();
        Thread.sleep(3000);
    }

输出:

User{id=0, username='Tangliping', password='000000'}
User{id=2, username='Tangliping', password='000000'}
User{id=4, username='Tangliping', password='000000'}
User{id=6, username='Tangliping', password='000000'}
User{id=8, username='Tangliping', password='000000'}
User{id=3, username='Tangliping', password='000000'}
User{id=5, username='Tangliping', password='000000'}
User{id=7, username='Tangliping', password='000000'}
User{id=9, username='Tangliping', password='000000'}

观察结论,每接收到一条消息,就会睡眠3s,然后再接收下一条消息。

场景三、使用RabbitHandler处理不同类型的消息

    @Test
    public void sendMessageTest(){
        //rabbitTemplate.convertAndSend("myExchange","my.routing.key","msg001");
        //如果发送的是一个对象必须实现serializable接口,如果想要将对象转为json,需要自定义配置
        for (int i =0; i<10;i++){
            if (i%2 ==0) {
                User user = new User();
                user.setUsername("Tangliping");
                user.setPassword("000000");
                user.setId(i);
                rabbitTemplate.convertAndSend("myExchange","my.routing.key",user);
            }else {
                String msg = "msg" + i;
                rabbitTemplate.convertAndSend("myExchange","my.routing.key",msg);
            }
        }
    }
@RabbitListener(queues = "myQueue")
@Service
public class MessageServiceImpl {

    /**
     * 处理User类型的消息
     */
    @RabbitHandler
    public void recieveMessage(Message message, User content, Channel channel) throws InterruptedException {
        System.out.println(content);
    }

    /**
     * 处理String类型的消息
     * @param content str
     */
    @RabbitHandler
    public void recieveMessage2(String content){
        System.out.println(content);
    }
}

6.6事务支持

事务和消息确认机制不能并存,我们如果使用事务,就不能使用消息确认机制,如果使用事务,会导致性能下降250倍左右,这里就不对事务支持进行更多的描述了,因为我们一般使用消息确认机制。如果还想了解springboot支持事务的相关配置,可以参考:https://blog.csdn.net/zhanngle/article/details/86267986

6.7发送端消息确认

  • 消息发送给Broker,confirmCallback确认模式

  • 消息从交换机投递到队列,returnCallback,未投递到queue退回模式

  • 消费者端 ack机制

发送端消息确认:

增加配置:

spring:
  rabbitmq:
    publisher-confirms: true # 开启发送者确认机制
    publisher-returns: true # 开启消息由交换机投递到队列确认机制
    template:
      mandatory: true # 只要抵达队列,以异步优先回调returnConfirm
/**
 * 生产者消息确认机制
 * 参考:https://www.jianshu.com/p/fae8fca98522
 */
@Component
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct //构造方法执行完成之后就会执行该方法
    public void init() {
        rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallback
        rabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback

    }

    /**
     *
     * @param correlationData 当前消息的唯一关联数据(这个消息的唯一id)
     * @param ack 只要消息能抵达Broker,就会返回true
     * @param cause 失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息发送成功" + correlationData);
        } else {
            System.out.println("消息发送失败:" + cause);
        }
    }

    /**
     * 只要消息没有投递到这个队列,就触发这个回调,注意哦,投递失败才触发
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 反序列化对象输出
        System.out.println("消息主体: " + message);
        System.out.println("应答码: " + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("消息使用的交换器 exchange : " + exchange);
        System.out.println("消息使用的路由键 routing : " + routingKey);
    }
}

如何测试由交换机投递队列失败的场景呢?

很简单,直接将发送消息时,路由键改一个别的就行了。

发送消息的时候,我们可以指定消息的唯一id

 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 rabbitTemplate.convertAndSend("myExchange","my.routing.key2",user,correlationData);

6.8消费端消息确认

默认的是自动确认,只要消息接收到,客户端会自动确认,服务端就会删除此消息。

问题:我们收到很多消息,自动回复ack,只有一个消息处理成功,宕机了,那么消息就丢失了。

模拟:接收消息的时候打一个断点,收到一条后,观察一些rabbit队列,会减少一条消息,然后直接停止服务,发现rabbit中的消息清零了。

开启手动ack

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 开启手动ack

开启手动之后,只要没有ack,消息就处于unacked的状态,即使宕机,消息会重新回到ready状态。

    /**
     * 处理User类型的消息
     */
    @RabbitHandler
    public void recieveMessage(Message message, User content, Channel channel) throws InterruptedException {
        System.out.println(content);
        //channel内按照顺序自增的
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            if (deliveryTag%2 ==0) {//签收
                //第二个参数表示是否批量签收
                channel.basicAck(deliveryTag,false);
            }else {
                //拒绝签收  第三个参数,是不是重新退回队列,如果是true,就重新放回队列,否则,就丢弃
                channel.basicNack(deliveryTag,false,true);
                //第二个参数,是不是重新退回队列,如果是true,就重新放回队列,否则,就丢弃
                //和上面的拒绝区别,不支持批量拒绝签收
                //channel.basicReject(deliveryTag, true);
            }

        } catch (IOException e) {
            e.printStackTrace();
            //网络中断
        }
    }

6.9延迟队列

延时队列,需要结合TTL和死信队列

死信队列

DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。

消息变成死信,可能是由于以下的原因:

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。

场景:订单生成,1分钟后检查是否支付

消息进入延迟队列,这个队列不能被消费者监听,设置队列消息的过期时间,消息过期后,进入另一个队列,这个队列是可以被消费者监听的。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

@Configuration
public class RabbitConfig {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Exchange orderEventExchange() {
        /**
         *   String name,
         *   boolean durable,
         *   boolean autoDelete,
         *   Map<String, Object> arguments
         */
        return new TopicExchange("order-event-exchange", true, false);
    }

    /**
     * 延迟队列
     */
    @Bean
    public Queue orderDelayQueue() {
        /**
         Queue(String name,  队列名字
         boolean durable,  是否持久化
         boolean exclusive,  是否排他
         boolean autoDelete, 是否自动删除
         Map<String, Object> arguments) 属性
         */
        HashMap<String, Object> arguments = new HashMap<>();
        //死信交换机
        arguments.put("x-dead-letter-exchange", "order-event-exchange");
        //死信路由键
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
        return new Queue("order.delay.queue",true,false,false,arguments);
    }

    /**
     * 普通队列
     */
    @Bean
    public Queue orderReleaseQueue() {

        Queue queue = new Queue("order.release.order.queue", true, false, false);

        return queue;
    }

    /**
     * 交换机与延时队列绑定
     */
    @Bean
    public Binding orderCreateBinding() {
        /**
         * String destination, 目的地(队列名或者交换机名字)
         * DestinationType destinationType, 目的地类型(Queue、Exhcange)
         * String exchange,
         * String routingKey,
         * Map<String, Object> arguments
         * */
        return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
    }

    /**
     * 交换机与普通队列绑定
     */
    @Bean
    public Binding orderReleaseBinding() {
        return new Binding("order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",
                null);
    }
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.util.UUID;

@RestController
public class MessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "order.release.order.queue")
    public void listener(String order, Message msg, Channel channel) throws IOException {
        System.out.println("接收过期的订单信息"+ order);
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    }

    @RequestMapping("/createOrder")
    public String createOrder() {
        rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order",UUID.randomUUID().toString());
        return "ok";
    }
}

创建订单:http://localhost:8080/createOrder

等待一分钟,看控制台的输出。

原文地址:https://www.cnblogs.com/tangliping/p/14800943.html