SpringBoot整合RabbitMQ实践教程

1. MQ

  MQ(Message Queue),消息队列,是生产者和消费者模型中传递信息的容器,主要用于线程或进程之间通信。
  MQ主要的应用场景为:应用解耦、异步处理,流量削锋,日志处理等。
  应用解耦:假设应用要与应用B、C、D通信,当某个应用挂掉或者进行调整后,其他应用都做出相应的调整。但是使用MQ之后,每个应用只需要从消息中间件中发送或消费消息,而不关心其他应用是否为正常状态。
  异步处理:将消息发送到消息中间件中,继续处理下面的业务,而不需要等待消费者消费完成的响应。
  流量削锋:当出现秒杀等业务场景时,短时间内将大量消息存储在消息队列中,当达到消息阈值后,消息队列拒绝接手消息,应用返回错误页面。
  日志处理:业务在处理过程中,将日志使用异步方式存储,不仅不会阻塞业务执行,提高效率,而且消息中间件的可靠性也能满足业务日志不丢失等可靠性要求。
  常见的MQ有:ActiveMQ、RabbitMQ、Kafka、RocketMQ、ZeroMQ,其实Redis也可以支持MQ功能。

2. 常见MQ对比

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级 万级 十万级 十万级
时效性 ms级 μs级 ms级 ms级
高可用性 高,主从架构 高,主从架构 非常高,分布式架构 非常高,分布式架构
消息可靠性 较低概率丢失 不丢失 优化后可不丢失 优化后可不丢失
MQ功能支持 较全 较全 较全 简单
优点 功能较全 延时低,可靠 扩展性好 大数据领域及日志采集上普遍使用

3. RabbitMQ

  RabbitMQ是使用Erlang语言来编写的,并且基于AMQP协议。Erlang语言在数据交互方面性能较优秀,具有和原生Socket一样的延迟,这也是RabbitMQ高性能的原因所在。
  RabbitMQ特点:
  a. 开源、性能交友,消息持久化不丢失,可靠性得到保障
  b. 提供可靠性的消息投递模式、返回模式
  c. 与spring-boot-starter-amqp完美整合,API功能丰富且资料较多
  d. 支持集群,保障高可用

4. AMQP协议

  AMQP(Advanced Message Queuing Protocol),是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件不同产品、不同开发语言等条件的限制。
  AMQP基本概念:
  Server:服务,接收客户端请求。
  Connection:连接,应用于Server之间的TCP连接。
  Channel:信道,消息的读写在信道中进行。每个客户端可以建立多个通道,每个通道即一个会话任务。
  Message:消息,应用于Server之间传递的数据实体。每个消息由Properties和Body组成,Properties存储消息的优先级、延迟等配置属性,Body及消息内容。
  Virtual Host:虚拟主机,用于Server内部之间逻辑隔离。每个Server之间可以有多个虚拟主机,每个虚拟主机内有多个交换机和队列,每个虚拟主机内的交换机和队列名称不能相同。
  Exchange:交换机,用于接收消息,并按照一定的路由规则将消息绑定到对应的队列。常见的交换机类型有:Direct、Topic、Fanout、Header等。
  Queue:队列,用于保存消息并交给消费者消费消息。
  Binding:绑定,交换机和队列之间的虚拟连接,每个绑定连接中包含对一个或多个路由键。
  RoutingKey:路由键,生产者发生消息时会指定对应的路由键规则,交换机根据规则将消息绑定到具体的队列上,消息者通过队列消息消息。
  概念中提到两个属性:Connection和Channel。既然存在Connection,又为什么需要Channel。原因是:在一个业务场景中必然存在多个线程环境操作RabbitMQ Server进行生产和消费消息,因此将会建立多个Connection,即多个TCP连接。对于操作系统而言,TCP连接的创建和销毁是十分浪费资源的,在高并发使用场景中,势必达到性能瓶颈。所以RabitMQ采用TCP连接复用方式,使用Channel建立应用与Server连接,提高效率,也便于管理。

5. 常见交换机

  • Default Exchange
      默认交换机。实则为名称为空的直连交换机。特点是每个队列都会绑定到默认交换机上,且路由键与队列名称相同。例如:创建名称为“simple-queue”的队列,AMQP代理会自动将该队列绑定到默认交换机上,绑定的路由键也为“simple-queue”。
  • Direct Exchange
      直流交换机。消息携带路由键(RoutingKey)经过该类型交换机时会绑定到规则匹配的队列中。
  • Fanout Exchange
      扇形交换机。交换机直接与队列绑定,及时绑定路由键也不起作用。当生产者发送消息后,消息经过该类型交换机直接绑定到与该交换机绑定的所有队列上。
  • Topic Exchange
      主题交换机。实则为直连交换机和扇形交换机相结合,即模糊匹配。其中规则为星号代表必须为一个单词,井号为零个或多个单词。例如:队列1的绑定键为A.*,队列2的绑定键为B.#,如果消息绑定的路由键为A.C,则队列1会收到;如果消息绑定的路由键为A.C.D,则队列2会收到。
  • Header Exchange
      头交换机。该类型交换机不依赖于路由键,而是由消息内容中的header属性进行匹配。

6. 消息确认

  消费者在消费消息往往会因为网络或其他原因导致异常,因此需要和队列建立确认机制才能表明该条消息已经成功消费。因此在AMQP协议中给出两种建议:
  (1)自动确认模式:即消息发送给消费者后立即从队列中删除;
  (2)手动确认模式:即消费者者将消息者处理后给队列发送确认回执(ACK机制,Acknowledgement),再根据情况删除消息。

7. 安装RabbitMQ

  Docker部署RabbitMQ

8. Direct Exchange示例

  • 创建生产者
  • 修改pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.c3stones</groupId>
	<artifactId>direct-exchange-provider-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>direct-exchange-provider-demo</name>
	<description>Direct Exchange Provider Demo</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.6.RELEASE</version>
		<relativePath />
	</parent>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
		</dependency>
	</dependencies>

</project>
  • 添加配置文件application.yml
server:
  port: 8980
  
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
  • RabbitMQ配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ配置类
 * 
 * @author CL
 *
 */
@Configuration
public class RabbitMqConfig {

	/**
	 * 交换机名称
	 */
	public static final String EXCHANGE_NAME = "c3stones.direct";

	/**
	 * 路由键
	 */
	public static final String ROUNTING_KEY = "test";

	/**
	 * 队列名称
	 */
	public static final String QUEUE_NAME = "test.queue";

	/**
	 * 配置Direct交换机
	 * 
	 * @return
	 */
	@Bean
	public DirectExchange directExchange() {
		return new DirectExchange(EXCHANGE_NAME);
	}

	/**
	 * 配置队列
	 * 
	 * @return
	 */
	@Bean
	public Queue testQueue() {
		return new Queue(QUEUE_NAME);
	}

	/**
	 * 将队列与交换机通过路由键绑定
	 * 
	 * @return
	 */
	@Bean
	public Binding binding() {
		return BindingBuilder.bind(testQueue()).to(directExchange()).with(ROUNTING_KEY);
	}

}
  • 创建发送消息Controller
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.c3stones.config.RabbitMqConfig;

/**
 * 发送消息Controller
 * 
 * @author CL
 *
 */
@RestController
public class SendMsgController {

	private static Logger log = LoggerFactory.getLogger(SendMsgController.class);

	@Autowired
	private RabbitTemplate rabbitTemplate;

	/**
	 * 发送消息
	 * 
	 * @param msg 消息内容
	 * @return
	 */
	@RequestMapping(value = "/send", method = RequestMethod.GET)
	public boolean send(String msg) {
		try {
			rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUNTING_KEY, msg);
		} catch (AmqpException e) {
			log.error("发送消息异常:{}", e);
			return false;
		}
		return true;
	}
}
  • 创建启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 启动类
 * 
 * @author CL
 *
 */
@SpringBootApplication
public class DirectProviderApplication {

	public static void main(String[] args) {
		SpringApplication.run(DirectProviderApplication.class, args);
	}

}
  • 启动项目,并测试发送消息
  • 查看RabbitMQ管理界面

      可以看出RabbitMQ Server已经接收到消息,并经过交换机成功转发到队列中,此时消息为等待消费状态。
  • 创建消费者
  • 修改pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.c3stones</groupId>
	<artifactId>direct-exchange-consumer-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>direct-exchange-consumer-demo</name>
	<description>Direct Exchange Consumer Demo</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.6.RELEASE</version>
		<relativePath />
	</parent>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
		</dependency>
	</dependencies>

</project>
  • 添加配置文件application.yml
server:
  port: 8981
  
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
  • 创建处理消息Service
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 处理消息Service
 * 
 * @author CL
 *
 */
@Component
public class HandleMsgService {

	private static Logger log = LoggerFactory.getLogger(HandleMsgService.class);

	/**
	 * 方法1-处理消息
	 * 
	 * @param msg 消息内容
	 */
	@RabbitListener(queues = "test.queue")
	public void handle1(String msg) {
		log.info("方法1已接收到消息:{}", msg);
	}

}
  • 创建启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 启动类
 * 
 * @author CL
 *
 */
@SpringBootApplication
public class DirectConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(DirectConsumerApplication.class, args);
	}

}
  • 启动项目,并观察控制台打印日志
2020-07-24 16:45:30.284  INFO 5400 --- [ntContainer#0-1] com.c3stones.service.HandleMsgService    : 方法1已接收到消息:测试

  可以看到,在项目启动后消费者Service已经成功监听到消息,并消费。

  • 修改处理消息Service,配置多个监听方法监听同一队列
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 处理消息Service
 * 
 * @author CL
 *
 */
@Component
public class HandleMsgService {

	private static Logger log = LoggerFactory.getLogger(HandleMsgService.class);

	/**
	 * 方法1-处理消息
	 * 
	 * @param msg 消息内容
	 */
	@RabbitListener(queues = "test.queue")
	public void handle1(String msg) {
		log.info("方法1已接收到消息:{}", msg);
	}

	/**
	 * 方法2-处理消息
	 * 
	 * @param msg 消息内容
	 */
	@RabbitListener(queues = "test.queue")
	public void handle2(String msg) {
		log.info("方法2已接收到消息:{}", msg);
	}

	/**
	 * 方法3-处理消息
	 * 
	 * @param msg 消息内容
	 */
	@RabbitListener(queues = "test.queue")
	public void handle3(String msg) {
		log.info("方法3已接收到消息:{}", msg);
	}

}
  • 重启消费者,并启动生产者
  • 通过Postman发送三条消息,并观察消费者控制台
2020-07-24 16:47:49.471  INFO 8532 --- [ntContainer#0-1] com.c3stones.service.HandleMsgService    : 方法2已接收到消息:测试1
2020-07-24 16:47:51.567  INFO 8532 --- [ntContainer#1-1] com.c3stones.service.HandleMsgService    : 方法1已接收到消息:测试2
2020-07-24 16:47:54.070  INFO 8532 --- [ntContainer#2-1] com.c3stones.service.HandleMsgService    : 方法3已接收到消息:测试3

  可以看到多个方法监听队列,并不会重复消费消息,而是轮询消费。

9. Fanout Exchange示例

  • 创建生产者
  • 修改pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.c3stones</groupId>
	<artifactId>fanout-exchange-provider-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>fanout-exchange-privoder-demo</name>
	<description>Fanout Exchange Provider Demo</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.6.RELEASE</version>
		<relativePath />
	</parent>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
		</dependency>
	</dependencies>

</project>
  • 添加配置文件application.yml
server:
  port: 8982
  
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
  • 创建RabbitMQ配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ配置类
 * 
 * @author CL
 *
 */
@Configuration
public class RabbitMqConfig {

	/**
	 * 交换机名称
	 */
	public static final String EXCHANGE_NAME = "c3stones.fanout";

	/**
	 * 队列1名称
	 */
	public static final String QUEUE_NAME_1 = "test1.fanout.queue";

	/**
	 * 队列2名称
	 */
	public static final String QUEUE_NAME_2 = "test2.fanout.queue";

	/**
	 * 队列3名称
	 */
	public static final String QUEUE_NAME_3 = "test3.fanout.queue";

	/**
	 * 配置Direct交换机
	 * 
	 * @return
	 */
	@Bean
	public FanoutExchange fanoutExchange() {
		return new FanoutExchange(EXCHANGE_NAME);
	}

	/**
	 * 配置队列1
	 * 
	 * @return
	 */
	@Bean
	public Queue test1Queue() {
		return new Queue(QUEUE_NAME_1);
	}

	/**
	 * 配置队列2
	 * 
	 * @return
	 */
	@Bean
	public Queue test2Queue() {
		return new Queue(QUEUE_NAME_2);
	}

	/**
	 * 配置队列3
	 * 
	 * @return
	 */
	@Bean
	public Queue test3Queue() {
		return new Queue(QUEUE_NAME_3);
	}

	/**
	 * 将队列1与交换机绑定
	 * 
	 * @return
	 */
	@Bean
	public Binding bindingQueue1() {
		return BindingBuilder.bind(test1Queue()).to(fanoutExchange());
	}

	/**
	 * 将队列2与交换机绑定
	 * 
	 * @return
	 */
	@Bean
	public Binding bindingQueue2() {
		return BindingBuilder.bind(test2Queue()).to(fanoutExchange());
	}

	/**
	 * 将队列3与交换机绑定
	 * 
	 * @return
	 */
	@Bean
	public Binding bindingQueue3() {
		return BindingBuilder.bind(test3Queue()).to(fanoutExchange());
	}

}
  • 创建发送消息Controller
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.c3stones.config.RabbitMqConfig;

/**
 * 发送消息Controller
 * 
 * @author CL
 *
 */
@RestController
public class SendMsgController {

	private static Logger log = LoggerFactory.getLogger(SendMsgController.class);

	@Autowired
	private RabbitTemplate rabbitTemplate;

	/**
	 * 发送消息
	 * 
	 * @param msg 消息内容
	 * @return
	 */
	@RequestMapping(value = "/send", method = RequestMethod.GET)
	public boolean send1(String msg) {
		try {
			rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, null, msg);
		} catch (AmqpException e) {
			log.error("发送消息异常:{}", e);
			return false;
		}
		return true;
	}

}
  • 创建启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 启动类
 * 
 * @author CL
 *
 */
@SpringBootApplication
public class FanoutProviderApplication {

	public static void main(String[] args) {
		SpringApplication.run(FanoutProviderApplication.class, args);
	}

}
  • 启动项目,并测试发送消息
  • 创建消费者
  • 修改pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.c3stones</groupId>
	<artifactId>fanout-exchange-consumer-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>fanout-exchange-consumer-demo</name>
	<description>Fanout Exchange Consumer Demo</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.6.RELEASE</version>
		<relativePath />
	</parent>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
		</dependency>
	</dependencies>

</project>
  • 添加配置文件application.yml
server:
  port: 8983
  
spring:
  rabbitmq:
    host: 1277.0.0.1
    port: 5672
    username: guest
    password: guest
  • 创建处理消息Service
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 处理消息Service
 * 
 * @author CL
 *
 */
@Component
public class HandleMsgService {

	private static Logger log = LoggerFactory.getLogger(HandleMsgService.class);

	/**
	 * 方法1-处理消息
	 * 
	 * @param msg 消息内容
	 */
	@RabbitListener(queues = "test1.fanout.queue")
	public void handle1(String msg) {
		log.info("方法1已接收到消息:{}", msg);
	}

	/**
	 * 方法2-处理消息
	 * 
	 * @param msg 消息内容
	 */
	@RabbitListener(queues = "test2.fanout.queue")
	public void handle2(String msg) {
		log.info("方法2已接收到消息:{}", msg);
	}

	/**
	 * 方法3-处理消息
	 * 
	 * @param msg 消息内容
	 */
	@RabbitListener(queues = "test3.fanout.queue")
	public void handle3(String msg) {
		log.info("方法3已接收到消息:{}", msg);
	}

}
  • 创建启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 启动类
 * 
 * @author CL
 *
 */
@SpringBootApplication
public class FanoutConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(FanoutConsumerApplication.class, args);
	}

}
  • 启动项目,并观察控制台打印日志
2020-07-24 17:11:51.177  INFO 12620 --- [ntContainer#0-1] com.c3stones.service.HandleMsgService    : 方法3已接收到消息:测试
2020-07-24 17:11:51.180  INFO 12620 --- [ntContainer#1-1] com.c3stones.service.HandleMsgService    : 方法2已接收到消息:测试
2020-07-24 17:11:51.189  INFO 12620 --- [ntContainer#2-1] com.c3stones.service.HandleMsgService    : 方法1已接收到消息:测试

  可以看到,与该类型交换机绑定的队列,均可监听到消息。

10. Topic Exchange示例

  • 创建生产者
  • 修改pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.c3stones</groupId>
	<artifactId>topic-exchange-provider-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>topic-exchange-provider-demo</name>
	<description>Topic Exchange Provider Demo</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.6.RELEASE</version>
		<relativePath />
	</parent>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
		</dependency>
	</dependencies>

</project>
  • 添加配置文件application.yml
server:
  port: 8984
  
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
  • 创建RabbitMQ配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ配置类
 * 
 * @author CL
 *
 */
@Configuration
public class RabbitMqConfig {

	/**
	 * 交换机名称
	 */
	public static final String EXCHANGE_NAME = "c3stones.topic";

	/**
	 * 绑定键1
	 */
	public static final String BINDING_KEY_1 = "topic.key1";

	/**
	 * 绑定键2
	 */
	public static final String BINDING_KEY_2 = "topic.key2";

	/**
	 * 绑定键前缀,即以topic.开头的键值都会被监听
	 */
	public static final String BINDING_KEY_PREFIX = "topic.#";

	/**
	 * 队列1名称
	 */
	public static final String QUEUE_NAME_1 = "test1.queue";

	/**
	 * 队列2名称
	 */
	public static final String QUEUE_NAME_2 = "test2.queue";

	/**
	 * 配置Direct交换机
	 * 
	 * @return
	 */
	@Bean
	public TopicExchange topicExchange() {
		return new TopicExchange(EXCHANGE_NAME);
	}

	/**
	 * 配置队列1
	 * 
	 * @return
	 */
	@Bean
	public Queue test1Queue() {
		return new Queue(QUEUE_NAME_1);
	}

	/**
	 * 配置队列2
	 * 
	 * @return
	 */
	@Bean
	public Queue test2Queue() {
		return new Queue(QUEUE_NAME_2);
	}

	/**
	 * 将队列1与交换机通过绑定键1绑定
	 * 
	 * @return
	 */
	@Bean
	public Binding bindingQueue1() {
		return BindingBuilder.bind(test1Queue()).to(topicExchange()).with(BINDING_KEY_1);
	}

	/**
	 * 将队列2与交换机通过绑定键前缀绑定
	 * 
	 * @return
	 */
	@Bean
	public Binding bindingQueue2() {
		return BindingBuilder.bind(test2Queue()).to(topicExchange()).with(BINDING_KEY_PREFIX);
	}

}
  • 创建发送消息Controller
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.c3stones.config.RabbitMqConfig;

/**
 * 发送消息Controller
 * 
 * @author CL
 *
 */
@RestController
public class SendMsgController {

	private static Logger log = LoggerFactory.getLogger(SendMsgController.class);

	@Autowired
	private RabbitTemplate rabbitTemplate;

	/**
	 * 发送消息1
	 * 
	 * @param msg 消息内容
	 * @return
	 */
	@RequestMapping(value = "/send1", method = RequestMethod.GET)
	public boolean send1(String msg) {
		try {
			rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.BINDING_KEY_1, msg);
		} catch (AmqpException e) {
			log.error("发送消息1异常:{}", e);
			return false;
		}
		return true;
	}

	/**
	 * 发送消息2
	 * 
	 * @param msg 消息内容
	 * @return
	 */
	@RequestMapping(value = "/send2", method = RequestMethod.GET)
	public boolean send2(String msg) {
		try {
			rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.BINDING_KEY_2, msg);
		} catch (AmqpException e) {
			log.error("发送消息2异常:{}", e);
			return false;
		}
		return true;
	}
}
  • 创建启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 启动类
 * 
 * @author CL
 *
 */
@SpringBootApplication
public class TopicProviderApplication {

	public static void main(String[] args) {
		SpringApplication.run(TopicProviderApplication.class, args);
	}

}
  • 启动项目,并测试发送两条消息

  • 创建消费者
  • 修改pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.c3stones</groupId>
	<artifactId>topic-exchange-consumer-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>topic-exchange-consumer-demo</name>
	<description>Topic Exchange Consumer Demo</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.6.RELEASE</version>
		<relativePath />
	</parent>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
		</dependency>
	</dependencies>

</project>
  • 添加配置文件application.yml
server:
  port: 8985
  
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
  • 创建处理消息Service
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 处理消息Service
 * 
 * @author CL
 *
 */
@Component
public class HandleMsgService {

	private static Logger log = LoggerFactory.getLogger(HandleMsgService.class);

	/**
	 * 方法1-处理队列1消息
	 * 
	 * @param msg 消息内容
	 */
	@RabbitListener(queues = "test1.queue")
	public void handle1(String msg) {
		log.info("方法1已接收到消息:{}", msg);
	}

	/**
	 * 方法2-处理队列2消息
	 * 
	 * @param msg 消息内容
	 */
	@RabbitListener(queues = "test2.queue")
	public void handle2(String msg) {
		log.info("方法2已接收到消息:{}", msg);
	}

}
  • 创建启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 启动类
 * 
 * @author CL
 *
 */
@SpringBootApplication
public class TopicConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(TopicConsumerApplication.class, args);
	}

}
  • 启动项目,并观察控制台打印日志
2020-07-24 18:32:29.916  INFO 3776 --- [ntContainer#1-1] com.c3stones.service.HandleMsgService    : 方法1已接收到消息:测试1
2020-07-24 18:32:29.916  INFO 3776 --- [ntContainer#0-1] com.c3stones.service.HandleMsgService    : 方法2已接收到消息:测试1
2020-07-24 18:32:29.919  INFO 3776 --- [ntContainer#0-1] com.c3stones.service.HandleMsgService    : 方法2已接收到消息:测试2

  可以看到,方法2监听绑定键以“topic.”开头的消息,即两条消息都会监听到,方法1只监听绑定键为“topic.key1”的消息,即只能监听到第一条消息。

11. 消息确认示例

  以Direct Exchange为例。

  • 修改生产者配置文件application.yml
server:
  port: 8980
  
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-confirm: true  # 确认消息已发送到交换机
    publisher-returns: true # 确认消息已发送到队列
  • 修改RabbitMQ配置类,添加发送确认回调
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ配置类
 * 
 * @author CL
 *
 */
@Configuration
public class RabbitMqConfig {

	private static Logger log = LoggerFactory.getLogger(RabbitMqConfig.class);

	@Autowired
	private ConnectionFactory connectionFactory;

	/**
	 * 交换机名称
	 */
	public static final String EXCHANGE_NAME = "c3stones.confirm.direct";

	/**
	 * 路由键
	 */
	public static final String ROUNTING_KEY = "test.confirm.key";

	/**
	 * 队列名称
	 */
	public static final String QUEUE_NAME = "test.confirm.queue";

	/**
	 * 配置Direct交换机
	 * 
	 * @return
	 */
	@Bean
	public DirectExchange directExchange() {
		return new DirectExchange(EXCHANGE_NAME);
	}

	/**
	 * 配置队列
	 * 
	 * @return
	 */
	@Bean
	public Queue testQueue() {
		return new Queue(QUEUE_NAME);
	}

	/**
	 * 将队列与交换机通过路由键绑定
	 * 
	 * @return
	 */
	@Bean
	public Binding binding() {
		return BindingBuilder.bind(testQueue()).to(directExchange()).with(ROUNTING_KEY);
	}

	/**
	 * 配置消息发送模板
	 * 
	 * @return
	 */
	@Bean
	public RabbitTemplate createRabbitTemplate() {
		RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

		// 确认消息已发送到交换机
		rabbitTemplate.setConfirmCallback(new ConfirmCallback() {

			@Override
			public void confirm(CorrelationData correlationData, boolean ack, String cause) {
				if (!ack) {
					log.error("发送到交换机失败!原因:{}", cause);
				}
			}

		});

		// 强制调用回调方法
		rabbitTemplate.setMandatory(true);

		// 确认消息已发送到队列
		rabbitTemplate.setReturnCallback(new ReturnCallback() {

			@Override
			public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
					String routingKey) {
				log.error("绑定到队列异常,消息:{},回应码:{},回应文本:{},交换机:{},路由键:{}", message, replyCode, replyText, exchange,
						routingKey);
			}

		});

		return rabbitTemplate;
	}

}
  • 修改发送消息Controller,添加发送消息ID
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.c3stones.config.RabbitMqConfig;

/**
 * 发送消息Controller
 * 
 * @author CL
 *
 */
@RestController
public class SendMsgController {

	private static Logger log = LoggerFactory.getLogger(SendMsgController.class);

	@Autowired
	private RabbitTemplate rabbitTemplate;

	/**
	 * 发送消息
	 * 
	 * @param msg 消息内容
	 * @return
	 */
	@RequestMapping(value = "/send", method = RequestMethod.GET)
	public boolean send(String msg) {
		try {
			rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUNTING_KEY, msg,
					new CorrelationData(UUID.randomUUID().toString()));
		} catch (AmqpException e) {
			log.error("发送消息异常:{}", e);
			return false;
		}
		return true;
	}
}
  • 启动消息,并测试发送消息
      请在发送消息Controller中分别指定正确的交换机名称正确队列、错误交换机正确队列、错误交换机错误队列、正确交换机错误队列四种情况,并仔细观察控制台。
  • 修改消费者配置文件
server:
  port: 8981
  
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      direct:
        acknowledge-mode: manual # 手动确认
      simple:
        acknowledge-mode: manual # 手动确认
  • 修改处理消息Service
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

/**
 * 处理消息Service
 * 
 * @author CL
 *
 */
@Component
public class HandleMsgService {

	private static Logger log = LoggerFactory.getLogger(HandleMsgService.class);

	/**
	 * 方法1-处理消息
	 * 
	 * @param msg 消息内容
	 */
	@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "test.confirm.queue", durable = "true"), key = "test.confirm.key", exchange = @Exchange("c3stones.confirm.direct")))
	public void handle1(Message message, Channel channel) {
		try {
			log.info("方法1已接收到消息:{}", message.getBody());

			// 模拟处理异常
//			int a = 1 / 0;

			// 正常消费,手动应答
			channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
		} catch (Exception e) {
			log.info("方法1处理消息异常:{}", e);

			// 异常消费,将消息重新放入队列里
			try {
				channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
			} catch (IOException e1) {
				log.info("将消息重新放入队列里异常:{}", e1);
			}
		}
	}

}
  • 启动项目,并测试
      测试正常处理消息,和处理消息时发送异常两种情况,并观察RabbitMQ Server中队列详情。

12. 项目地址

  spring-boot-rabbitmq-demo

原文地址:https://www.cnblogs.com/cao-lei/p/13366310.html