SpringBoot整合RabbitMQ

Direct模式

通过routingKey和exchange决定的那个唯一的queue可以接收消息

即消息被发送到指定的消息队列中,然后被接收。

实验步骤概述:

  • 引入mq相关的依赖
  • 添加配置信息(MQ是个网络组件,需要IP、端口号等)
  • 编写配置类,创建MQ
  • 编写发送端代码、接收端代码
  • 编写测试类

pom

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

spring:
  rabbitmq:
    host: 192.168.16.128
    port: 5672
    username: guest
    password: guest

注意,代码中使用5672端口,之前的15672是管理端用的。

配置类DirectConfig

package com.ah.mq.direct;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.*;

/**
 * Spring configuration for the Direct-mode example: declares the single queue
 * that the sender and the listener in this package both refer to.
 */
@Configuration
public class DirectConfig {
	/** Name of the queue declared by {@link #createQueue()}. */
	static final String MQ_NAME = "direct";

	/**
	 * Registers a named message queue as a bean.
	 *
	 * @return a queue whose name is {@link #MQ_NAME}
	 */
	@Bean
	public Queue createQueue() {
		final Queue directQueue = new Queue(MQ_NAME);
		return directQueue;
	}
}

发送端、接收端写一起:

发送端使用Controller

接收端是自动接收,没必要用控制器,使用的是@Component

package com.ah.mq.direct;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.*;

/**
 * Sender for the Direct-mode example.
 */
@RestController
public class DirectSendRecv {

	@Autowired
	private AmqpTemplate mq;

	/**
	 * Sends ten messages, numbered 0 through 9, passing
	 * {@code DirectConfig.MQ_NAME} as the routing key.
	 */
	public void send() {
		int seq = 0;
		while (seq < 10) {
			final String payload = "消息" + seq;
			this.mq.convertAndSend(DirectConfig.MQ_NAME, payload);
			seq++;
		}
	}
}

/**
 * Receiver for the Direct-mode example.
 */
@Component
class Recv {
	/**
	 * Listens on the queue named by {@code DirectConfig.MQ_NAME} and prints
	 * every received message to standard output.
	 *
	 * @param message the message body
	 */
	@RabbitListener(queues = DirectConfig.MQ_NAME)
	public void receive(String message) {
		final String line = "消费者收到消息:" + message;
		System.out.println(line);
	}
}

测试类:

package com.ah.mq.direct;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Boots the Spring context and triggers the Direct-mode sender.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class DirectTest {

	@Autowired
	private DirectSendRecv mq;

	/** Invokes {@code DirectSendRecv.send()}; nothing is asserted. */
	@Test
	public void send() {
		this.mq.send();
	}
}

Topic模式

Topic模式:

消息的routingKey只要与某个queue绑定时使用的routingKey(此时可以是一个带通配符的表达式)相匹配,该queue就可以接收消息。

即Topic交换机按Topic向队列发送消息,一个队列可以绑定一种Topic,也可以绑定多种。

模拟场景:

  • VIP可以收到所有消息(对应多个Topic)
  • Common用户只能收到一般消息(对应特定的Topic)

配置类,在这里创建队列、Topic交换机,并且和交换机绑定。

package com.ah.mq.topic;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.*;

/**
 * Spring configuration for the Topic-mode example: declares two queues, one
 * topic exchange, and the bindings between them.
 */
@Configuration
public class TopicConfig {
	static final String EXCHANGE_NAME = "topicExchange";
	static final String QUEUE_VIP = "topic.vip";
	static final String QUEUE_COMMON = "topic.common";

	/** Declares the VIP queue. */
	@Bean
	public Queue queueVip() {
		final Queue vipQueue = new Queue(QUEUE_VIP);
		return vipQueue;
	}

	/** Declares the queue for common users. */
	@Bean
	public Queue queueCommon() {
		final Queue commonQueue = new Queue(QUEUE_COMMON);
		return commonQueue;
	}

	/** Declares the topic exchange. */
	@Bean
	TopicExchange exchange() {
		final TopicExchange topicExchange = new TopicExchange(EXCHANGE_NAME);
		return topicExchange;
	}

	/**
	 * Binds the VIP queue to the topic exchange with the pattern
	 * {@code "topic.#"} so that VIP receives every message.
	 *
	 * <p>Once a binding exists, changing this code does not remove it; it has
	 * to be unbound from the MQ management UI.
	 *
	 * <p>The arguments are injected automatically; the parameter names must
	 * match the bean method names declared above.
	 */
	@Bean
	Binding bindingExchangeVIP(Queue queueVip, TopicExchange exchange) {
		// The argument of with() is the routing key, which covers more than the queue name alone.
		final String vipPattern = "topic.#";
		return BindingBuilder.bind(queueVip).to(exchange).with(vipPattern);
	}

	/**
	 * Binds the common queue to the topic exchange using its own queue name
	 * as routing key, so common users only receive part of the messages.
	 */
	@Bean
	Binding bindingExchange2(Queue queueCommon, TopicExchange exchange) {
		final String commonKey = QUEUE_COMMON;
		return BindingBuilder.bind(queueCommon).to(exchange).with(commonKey);
	}
}

发送方和接收方写在一起

  • 一个发送方
  • 两个接收方(模拟VIP和普通用户)(使用@Component,自动接收,没必要使用控制器)
package com.ah.mq.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController;

/**
 * Sender for the Topic-mode example.
 */
@RestController
public class TopicRendRecv {
	@Autowired
	private AmqpTemplate mq;

	/** Sends the VIP-only message with routing key {@code TopicConfig.QUEUE_VIP}. */
	public void sendVip() {
		publish(TopicConfig.QUEUE_VIP, "VIP专享");
	}

	/** Sends the message for everyone with routing key {@code TopicConfig.QUEUE_COMMON}. */
	public void sendAll() {
		publish(TopicConfig.QUEUE_COMMON, "所有人");
	}

	/** Logs the message to stdout, then sends it to the topic exchange with the given routing key. */
	private void publish(String routingKey, String msg) {
		System.out.println("Sender:" + msg);
		this.mq.convertAndSend(TopicConfig.EXCHANGE_NAME, routingKey, msg);
	}
}

/**
 * Receiver simulating the VIP user in the Topic-mode example.
 */
@Component
class TopicReceiver {
	/**
	 * Listens on the queue named by {@code TopicConfig.QUEUE_VIP} and prints
	 * each message to standard output.
	 *
	 * @param message the message body
	 */
	@RabbitListener(queues = TopicConfig.QUEUE_VIP)
	public void recv1(String message) {
		final String line = "VIP:" + message;
		System.out.println(line);
	}
}

/**
 * Receiver simulating the common user in the Topic-mode example.
 */
@Component
class TopicReceiver2 {
	/**
	 * Listens on the queue named by {@code TopicConfig.QUEUE_COMMON} and
	 * prints each message to standard output.
	 *
	 * @param message the message body
	 */
	@RabbitListener(queues = TopicConfig.QUEUE_COMMON)
	public void recv1(String message) {
		final String line = "普通人: " + message;
		System.out.println(line);
	}
}

测试类

package com.ah.mq.topic;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Boots the Spring context and triggers the Topic-mode sender.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TopicTest {

	@Autowired
	private TopicRendRecv topic;

	/** Invokes {@code TopicRendRecv.sendVip()}; nothing is asserted. */
	@Test
	public void sendVip() {
		this.topic.sendVip();
	}

	/** Invokes {@code TopicRendRecv.sendAll()}; nothing is asserted. */
	@Test
	public void sendAll() {
		this.topic.sendAll();
	}
}

运行结果:

普通人: 所有人

VIP:所有人

VIP:VIP专享

Fanout模式

fanout:展开、散开

广播,即所有bind到某exchange的queue,都可以接收消息。

package com.ah.mq.fanout;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.*;

/**
 * Spring configuration for the Fanout-mode example: declares two queues, one
 * fanout exchange, and binds both queues to that exchange.
 */
@Configuration
public class FanoutConfig {
	static final String MQ_NAME_1 = "fanout.1";
	static final String MQ_NAME_2 = "fanout.2";
	static final String EXCHAGE_NAME = "fanoutExchange";

	/** Declares the first fanout queue. */
	@Bean
	public Queue queue1() {
		final Queue first = new Queue(MQ_NAME_1);
		return first;
	}

	/** Declares the second fanout queue. */
	@Bean
	public Queue queue2() {
		final Queue second = new Queue(MQ_NAME_2);
		return second;
	}

	/** Creates the fanout exchange. */
	@Bean
	FanoutExchange fanoutExchange() {
		final FanoutExchange broadcastExchange = new FanoutExchange(EXCHAGE_NAME);
		return broadcastExchange;
	}

	/** Binds the first queue to the fanout exchange. */
	@Bean
	Binding bindingExchangeA(Queue queue1, FanoutExchange fanoutExchange) {
		final Binding bindingA = BindingBuilder.bind(queue1).to(fanoutExchange);
		return bindingA;
	}

	/** Binds the second queue to the fanout exchange. */
	@Bean
	Binding bindingExchangeB(Queue queue2, FanoutExchange fanoutExchange) {
		final Binding bindingB = BindingBuilder.bind(queue2).to(fanoutExchange);
		return bindingB;
	}
}
package com.ah.mq.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController;

/**
 * Sender for the Fanout-mode example; also hosts the listener for the first
 * fanout queue.
 */
@RestController
public class FanoutSendRecv {

	@Autowired
	private AmqpTemplate mq;

	/**
	 * Logs the broadcast message to stdout and sends it to the fanout exchange
	 * with an empty routing key.
	 */
	public void send() {
		final String broadcast = "@所有人";
		System.out.println("Sender:" + broadcast);
		// The routing key is an empty string.
		final String emptyRoutingKey = "";
		this.mq.convertAndSend(FanoutConfig.EXCHAGE_NAME, emptyRoutingKey, broadcast);
	}

	/**
	 * Listens on the queue named by {@code FanoutConfig.MQ_NAME_1} and prints
	 * each message to standard output.
	 *
	 * @param message the message body
	 */
	@RabbitListener(queues = FanoutConfig.MQ_NAME_1)
	public void process(String message) {
		final String line = "Recv1:" + message;
		System.out.println(line);
	}
}

/**
 * Second receiver of the Fanout-mode example.
 */
@Component
class ClientA {
	/**
	 * Listens on the queue named by {@code FanoutConfig.MQ_NAME_2} and prints
	 * each message to standard output.
	 *
	 * @param message the message body
	 */
	@RabbitListener(queues = FanoutConfig.MQ_NAME_2)
	public void process(String message) {
		final String line = "Recv2:" + message;
		System.out.println(line);
	}
}
package com.ah.mq.fanout;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Boots the Spring context and triggers the Fanout-mode sender.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class FanoutTest {

	@Autowired
	private FanoutSendRecv send;

	/** Invokes {@code FanoutSendRecv.send()}; nothing is asserted. */
	@Test
	public void sendFanout() {
		this.send.send();
	}
}

Header类型

使用较少,用Headers来匹配。

Headers是一个键值对,可以定义成Hashtable。

发送者在发送的时候定义一些键值对,接收者也可以在绑定的时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。

匹配有两种方式all和any。

  • all代表定义的多个键值对都要满足,

  • any只要满足一个即可。

package com.ah.mq.headers;

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.*;

/**
 * Spring configuration for the Headers-mode example: declares two queues, two
 * headers exchanges, and one "match all" and one "match any" binding.
 *
 * <p>The queue and binding beans carry explicit, headers-specific bean names.
 * The method names {@code queue1}/{@code queue2} are also used by
 * {@code FanoutConfig}, and {@code bindingExchange2} is also used by
 * {@code TopicConfig}. With the default (method-name) bean names these
 * definitions would clash as soon as the configuration classes are loaded into
 * the same application context.
 */
@Configuration
public class HeadersConfig {
	static final String MQ_NAME_ALL = "HeaderQueue1";
	static final String MQ_NAME_ANY = "HeaderQueue2";
	static final String EXCHAGE_NAME_ALL = "HeaderExchange1";
	static final String EXCHAGE_NAME_ANY = "HeaderExchange2";

	/** Header key/value pairs that both bindings match against. */
	private static final Map<String, Object> MATCH_HEADERS = new HashMap<>();
	static {
		MATCH_HEADERS.put("key1", "v1");
		MATCH_HEADERS.put("key2", "v2");
	}

	/** Declares the queue that is bound with "match all" semantics. */
	@Bean(name = "headersQueueAll")
	public Queue queue1() {
		return new Queue(MQ_NAME_ALL);
	}

	/** Declares the queue that is bound with "match any" semantics. */
	@Bean(name = "headersQueueAny")
	public Queue queue2() {
		return new Queue(MQ_NAME_ANY);
	}

	/** Declares the headers exchange used for the "match all" binding. */
	@Bean
	public HeadersExchange exchange1() {
		return new HeadersExchange(EXCHAGE_NAME_ALL);
	}

	/** Declares the headers exchange used for the "match any" binding. */
	@Bean
	public HeadersExchange exchange2() {
		return new HeadersExchange(EXCHAGE_NAME_ANY);
	}

	/**
	 * Binds the "all" queue to the first exchange; a message must carry every
	 * configured header pair to match.
	 *
	 * <p>The parameter names select the beans to inject, so they must equal the
	 * bean names declared above.
	 */
	@Bean(name = "headersBindingAll")
	public Binding bindingExchange1(Queue headersQueueAll, HeadersExchange exchange1) {
		return BindingBuilder.bind(headersQueueAll).to(exchange1).whereAll(MATCH_HEADERS).match();
	}

	/**
	 * Binds the "any" queue to the second exchange; a message matches when it
	 * carries at least one of the configured header pairs.
	 *
	 * <p>The parameter names select the beans to inject, so they must equal the
	 * bean names declared above.
	 */
	@Bean(name = "headersBindingAny")
	public Binding bindingExchange2(Queue headersQueueAny, HeadersExchange exchange2) {
		return BindingBuilder.bind(headersQueueAny).to(exchange2).whereAny(MATCH_HEADERS).match();
	}
}
package com.ah.mq.headers;

import java.util.Map;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController;

/**
 * Sender for the Headers-mode example.
 */
@RestController
public class HeadersSendRecv {

	@Autowired
	private AmqpTemplate mq;

	/**
	 * Sends {@code msg} with the given headers to the "match all" exchange.
	 *
	 * @param head header key/value pairs to attach to the message
	 * @param msg  the message payload
	 */
	public void sendALL(Map<String, Object> head, String msg) {
		// The payload is already a fully built Message, so use send() rather than convertAndSend().
		mq.send(HeadersConfig.EXCHAGE_NAME_ALL, HeadersConfig.MQ_NAME_ALL, getMessage(head, msg));
	}

	/**
	 * Sends {@code msg} with the given headers to the "match any" exchange.
	 *
	 * @param head header key/value pairs to attach to the message
	 * @param msg  the message payload
	 */
	public void sendAny(Map<String, Object> head, String msg) {
		// The payload is already a fully built Message, so use send() rather than convertAndSend().
		mq.send(HeadersConfig.EXCHAGE_NAME_ANY, HeadersConfig.MQ_NAME_ANY, getMessage(head, msg));
	}

	/**
	 * Builds a {@link Message} from the payload and copies every entry of
	 * {@code head} into the message headers.
	 *
	 * @param head header key/value pairs to attach
	 * @param msg  the payload, converted with a {@link SimpleMessageConverter}
	 * @return the message ready to be sent
	 */
	private Message getMessage(Map<String, Object> head, Object msg) {
		MessageProperties messageProperties = new MessageProperties();
		for (Map.Entry<String, Object> entry : head.entrySet()) {
			messageProperties.setHeader(entry.getKey(), entry.getValue());
		}
		MessageConverter messageConverter = new SimpleMessageConverter();
		return messageConverter.toMessage(msg, messageProperties);
	}
}

/**
 * Receivers for the Headers-mode example.
 */
@Component
class HeaderRecv {

	/**
	 * Listens on the queue named by {@code HeadersConfig.MQ_NAME_ALL} and
	 * prints each message to standard output.
	 *
	 * @param msg the message body
	 */
	@RabbitListener(queues = HeadersConfig.MQ_NAME_ALL)
	public void creditBank(String msg) {
		final String line = "RECV:要求ALL," + msg;
		System.out.println(line);
	}

	/**
	 * Listens on the queue named by {@code HeadersConfig.MQ_NAME_ANY} and
	 * prints each message to standard output.
	 *
	 * @param msg the message body
	 */
	@RabbitListener(queues = HeadersConfig.MQ_NAME_ANY)
	public void creditFinance(String msg) {
		final String line = "RECV:要求ANY," + msg;
		System.out.println(line);
	}
}
package com.ah.mq.headers;

import java.util.HashMap;
import java.util.Map;

import org.junit.*;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Boots the Spring context and sends messages with different header sets to
 * the "match all" and "match any" exchanges. Nothing is asserted; the outcome
 * is observed through the listeners' console output.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class HeadersTest {

	@Autowired
	private HeadersSendRecv sender;

	/** Headers attached to the outgoing message; emptied before every test. */
	static final Map<String, Object> head = new HashMap<>();

	/** Clears the shared header map so tests do not see each other's entries. */
	@Before
	public void init() {
		head.clear();
	}

	/** Sends to the "all" exchange with only one of the two headers set. */
	@Test
	public void t1() {
		head.put("key1", "v1");
		sender.sendALL(head, "实际Part");
	}

	/** Sends to the "all" exchange with both headers set. */
	@Test
	public void t2() {
		head.put("key1", "v1");
		head.put("key2", "v2");
		sender.sendALL(head, "实际All");
	}

	/** Sends to the "any" exchange with only one of the two headers set. */
	@Test
	public void t3() {
		head.put("key2", "v2");
		sender.sendAny(head, "实际Part");
	}

	/** Sends to the "any" exchange with both headers set. */
	@Test
	public void t4() {
		head.put("key1", "v1");
		head.put("key2", "v2");
		sender.sendAny(head, "实际All");
	}
}

结果:

RECV:要求ANY,实际Part

RECV:要求ALL,实际All

RECV:要求ANY,实际All

(要求ALL,实际Part的,没有收到)

删除队列

关闭应用:rabbitmqctl stop_app
清除队列:rabbitmqctl reset(注意:reset会把整个节点恢复到初始状态,用户、vhost、交换机等配置也会一并清除)
重新启动:rabbitmqctl start_app
原文地址:https://www.cnblogs.com/tigerlion/p/12951888.html