一、初识RabbitMQ
1、核心思想:接收并转发消息。可以把它想象成一个邮局
producer:消息生产者
queue:队列
consumer:消息消费者
2、消息队列
3、消息队列的特性
业务无关
FIFO(先进先出)
容灾
性能
4、为什么要用消息队列
系统解耦
异步调用
流量削峰
5、RabbitMQ的特点
开源、跨语言
Erlang语言编写
应用广泛
社区活跃、API丰富
6、AMQP协议
Advanced Message Queuing Protocol
7、RabbitMQ核心概念
Server:服务
connection:与Server建立连接
channel:信道,几乎所有的操作都在信道上进行,客户端可以建立多个信道
message:消息,由properties和body组成
virtual host:虚拟主机,顶层隔离。同一个虚拟主机下,不能有重复的交换机和queue
exchange:交换机,接收生产者的消息,然后根据指定的路由器去把消息转发到所绑定的队列上
binding:绑定交换机和队列
routing key:路由键,路由规则,虚拟机可以用它来确定这个消息如何进行一个路由
queue:队列,消费者只需要监听队列来消费消息,不需要关注消息来自哪个Exchange。 Exchange和Message Queue存在着绑定关系,一个Exchange可以绑定多个消息队列
8、消息流转过程
二、RabbitMQ的安装和启动
1、安装
# vim /etc/yum.repos.d/rabbitmq_erlang.repo
[rabbitmq-erlang] name=rabbitmq-erlang baseurl=https://dl.bintray.com/rabbitmq-erlang/rpm/erlang/22/el/7 gpgcheck=1 gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc repo_gpgcheck=0 enabled=1
# yum clean all
# yum makecache
# yum install erlang(交互窗口输入y)
# rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
# wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.8.2/rabbitmq-server-3.8.2-1.el7.noarch.rpm
# yum install rabbitmq-server-3.8.2-1.el7.noarch.rpm(交互窗口输入y)
2、启动RabbitMQ
# systemctl start rabbitmq-server
3、查看状态
# rabbitmqctl status
4、停止
# rabbitmqctl stop
5、设置开机自启
# systemctl enable rabbitmq-server
6、检查服务器状态
# systemctl status rabbitmq-server
7、开启web管理界面
# rabbitmq-plugins enable rabbitmq_management
8、添加管理界面用户
# rabbitmqctl add_user admin password
9、为用户设置权限
# rabbitmqctl set_user_tags admin administrator
10、访问管理后台
# 192.168.211.133:15672
三、实战案例演示
1、简单demo
发送消息
package helloword; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * helloword发送类 */ public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ地址 factory.setHost("192.168.211.133"); factory.setUsername("admin"); factory.setPassword("password"); //建立连接 Connection connection = factory.newConnection(); //获得信道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //发布消息 String message = "hello word hahaha!"; channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8")); System.out.println("发送了消息:"+message); //关闭连接 channel.close(); connection.close(); } }
接收消息
package helloword; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 接收消息并打印,持续接收 */ public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ地址 factory.setHost("192.168.211.133"); factory.setUsername("admin"); factory.setPassword("password"); //建立连接 Connection connection = factory.newConnection(); //获得信道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //接收消息并消费 channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("收到消息:"+ message); } }); } }
2、多个消费者
循环调度
公平派遣
消息确认
发送消息
package workqueues; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 任务有所耗时,多个任务 */ public class NewTask { private final static String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ地址 factory.setHost("192.168.211.133"); factory.setUsername("admin"); factory.setPassword("password"); //建立连接 Connection connection = factory.newConnection(); //获得信道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null); for (int i = 0; i <10 ; i++) { String message; if(i % 2 ==0){ message = i + "..."; }else{ message = String.valueOf(i); } channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8")); System.out.println("发送了消息:" + message); } channel.close(); connection.close(); } }
接收消息
package workqueues; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 接收批量消息 */ public class Worker { private final static String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ地址 factory.setHost("192.168.211.133"); factory.setUsername("admin"); factory.setPassword("password"); //建立连接 Connection connection = factory.newConnection(); //获得信道 final Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null); System.out.println("开始接收消息"); //一次只处理一个任务,没处理完不会接收下一个任务 channel.basicQos(1); channel.basicConsume(TASK_QUEUE_NAME,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("收到了消息:"+ message); try{ doWork(message); }finally { System.out.println("消息处理完成"); //处理完消息之后进行确认 channel.basicAck(envelope.getDeliveryTag(),false); } } }); } private static void doWork(String task){ char[] chars = task.toCharArray(); for (char aChar : chars) { if(aChar == '.'){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
四、交换机工作模式
fanout:广播,这种模式只需要将队列绑定到交换机上即可,是不需要设置路由键的
direct:根据RoutingKey匹配消息路由到指定队列
topic:生产者指定RoutingKey消息根据消费端指定的队列通过模糊匹配的方式进行相应转发
* 可以代替一个单词
# 可以代替零个或多个单词
headers:根据发送消息内容中的headers属性来匹配
1、fanout
package fanout; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.133"); factory.setUsername("admin"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT); String message = "info:Hello World !"; channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8")); System.out.println("发送了消息:" + message); channel.close(); connection.close(); } }
package fanout; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.133"); factory.setUsername("admin"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.println("开始接收消息"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("收到消息:" + message); } }; channel.basicConsume(queueName,true,consumer); } }
2、direct
package direct; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * direct类型的交换机发送消息 */ public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.133"); factory.setUsername("admin"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); String message = "Hello World !"; channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8")); System.out.println("发送了消息,等级为:info,消息内容为:"+ message); channel.basicPublish(EXCHANGE_NAME,"warning",null,message.getBytes("UTF-8")); System.out.println("发送了消息,等级为:warning,消息内容为:"+ message); channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes("UTF-8")); System.out.println("发送了消息,等级为:error,消息内容为:"+ message); channel.close(); connection.close(); } }
package direct; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 接收三种类型的等级的日志 */ public class ReceiveLogsDirect1 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.133"); factory.setUsername("admin"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); //生成一个随机的临时的queue String queueName = channel.queueDeclare().getQueue(); //一个交换机同时绑定3个queue channel.queueBind(queueName,EXCHANGE_NAME,"info"); channel.queueBind(queueName,EXCHANGE_NAME,"warning"); channel.queueBind(queueName,EXCHANGE_NAME,"error"); System.out.println("开始接收消息"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("收到消息:" + message); } }; channel.basicConsume(queueName,true,consumer); } }
package direct; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 只接收一种类型的等级的日志 */ public class ReceiveLogsDirect2 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.133"); factory.setUsername("admin"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); //生成一个随机的临时的queue String queueName = channel.queueDeclare().getQueue(); //一个交换机同时绑定3个queue channel.queueBind(queueName,EXCHANGE_NAME,"error"); System.out.println("开始接收消息"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("收到消息:" + message); } }; channel.basicConsume(queueName,true,consumer); } }
3、topic
package topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * topic类型的交换机发送消息 */ public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.133"); factory.setUsername("admin"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC); String message = "Animal World !"; String[] routingKeys = new String[9]; routingKeys[0] = "quick.orange.rabbit"; routingKeys[1] = "lazy.orange.elephant"; routingKeys[2] = "quick.orange.fox"; routingKeys[3] = "lazy.brown.fox"; routingKeys[4] = "lazy.pink.rabbit"; routingKeys[5] = "quick.brown.fox"; routingKeys[6] = "orange"; routingKeys[7] = "quick.orange.male.rabbit"; routingKeys[8] = "lazy.orange.male.rabbit"; for (int i = 0; i < routingKeys.length; i++) { channel.basicPublish(EXCHANGE_NAME,routingKeys[i],null,message.getBytes("UTF-8")); System.out.println(i+"——"+"发送了:"+message+"——"+routingKeys[i]); } channel.close(); connection.close(); } }
package topic; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 特定路由键消息接收1 */ public class ReceiveLogsTopic1 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.133"); factory.setUsername("admin"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC); //生成一个随机的临时的queue String queueName = channel.queueDeclare().getQueue(); String routingKey = "*.orange.*"; //一个交换机同时绑定3个queue channel.queueBind(queueName,EXCHANGE_NAME,routingKey); System.out.println("开始接收消息"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("收到消息:" + message + "——" + envelope.getRoutingKey()); } }; channel.basicConsume(queueName,true,consumer); } }
package topic; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 特定路由键消息接收2 */ public class ReceiveLogsTopic2 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.133"); factory.setUsername("admin"); factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC); //生成一个随机的临时的queue String queueName = channel.queueDeclare().getQueue(); String routingKey = "*.*.rabbit"; channel.queueBind(queueName,EXCHANGE_NAME,routingKey); String routingKey2 = "lazy.#"; channel.queueBind(queueName,EXCHANGE_NAME,routingKey2); //一个交换机同时绑定3个queue System.out.println("开始接收消息"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("收到消息:" + message + "——" + envelope.getRoutingKey()); } }; channel.basicConsume(queueName,true,consumer); } }
五、Spring Boot整合RabbitMQ
1、生产者项目创建
创建springboot项目,添加RabbitMQ相关依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.learn</groupId> <artifactId>spring-boot-rabbitmq-producer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-boot-rabbitmq-producer</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!--添加RabbitMQ相关依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
配置RabbitMQ连接信息
server.port=8080
spring.application.name=producer
spring.rabbitmq.addresses=192.168.211.133:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=password
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
编写RabbitMQ配置信息
package com.learn.springbootrabbitmqproducer; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** *rabbitmq配置类 */ @Configuration public class TopicRabbitConfig { @Bean public Queue queue1() { return new Queue("queue1"); } @Bean public Queue queue2() { return new Queue("queue2"); } @Bean TopicExchange exchange() { return new TopicExchange("bootExchange"); } @Bean Binding bindingExchangeMessage1(Queue queue1, TopicExchange exchange) { return BindingBuilder.bind(queue1).to(exchange).with("dog.red"); } @Bean Binding bindingExchangeMessage2(Queue queue2, TopicExchange exchange) { return BindingBuilder.bind(queue2).to(exchange).with("dog.#"); } }
发送消息
package com.learn.springbootrabbitmqproducer; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 发送消息 */ @Component public class MessageSender { @Autowired private AmqpTemplate template; public void send1() { String message = "this is message 1,routing key id dog.red"; System.out.println("发送了消息:" + message); this.template.convertAndSend("bootExchange", "dog.red", message); } public void send2() { String message = "this is message 2,routing key id dog.black"; System.out.println("发送了消息:" + message); this.template.convertAndSend("bootExchange", "dog.black", message); } }
编写消息发送测试类并启动测试类
package com.learn.springbootrabbitmqproducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class SpringBootRabbitmqProducerApplicationTests { @Autowired MessageSender sender; @Test public void send1(){ sender.send1(); } @Test public void send2(){ sender.send2(); } }
2、消费者项目创建
创建springboot项目,添加RabbitMQ相关依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.learn</groupId> <artifactId>spring-boot-rabbitmq-consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-boot-rabbitmq-consumer</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
配置RabbitMQ连接信息
server.port=8081
spring.application.name=consumer
spring.rabbitmq.addresses=192.168.211.133:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=password
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
编写两个消费者消费消息
package com.learn.springbootrabbitmqconsumer; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消费者1 */ @Component @RabbitListener(queues = "queue1") public class Receiver1 { @RabbitHandler public void process(String message){ System.out.println("Receiver1:" + message); } }
package com.learn.springbootrabbitmqconsumer; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消费者2 */ @Component @RabbitListener(queues = "queue2") public class Receiver2 { @RabbitHandler public void process(String message){ System.out.println("Receiver2:" + message); } }