Message Queue: RabbitMQ

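I. Getting to Know RabbitMQ

1. Core idea: receive messages and forward them. Think of it as a post office.

  producer: the sender of messages
  queue: the queue that holds messages
  consumer: the receiver of messages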
     

2. Message queue
    

3. Properties of a message queue

  Independent of business logic
  FIFO (first in, first out)
  Fault tolerance
  Performance
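
The producer, queue and consumer roles, and the FIFO property, can be illustrated without a broker. The following is a minimal in-memory sketch, not RabbitMQ itself: it assumes a `LinkedBlockingQueue` as a stand-in for the queue, and the class name `PostOfficeSketch` and the letter strings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for producer -> queue -> consumer; this is not RabbitMQ.
class PostOfficeSketch {

    // One producer thread drops letters into the queue; the calling thread
    // acts as the consumer and takes them out again.
    static List<String> run(List<String> letters) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            for (String letter : letters) {
                queue.add(letter); // the producer only knows the queue, not the consumer
            }
        });
        producer.start();

        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < letters.size(); i++) {
                received.add(queue.take()); // blocks until a letter is available
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return received;
    }

    public static void main(String[] args) {
        // prints [letter-1, letter-2, letter-3]: first in, first out
        System.out.println(run(Arrays.asList("letter-1", "letter-2", "letter-3")));
    }
}
```

The producer and the consumer never reference each other; the queue is the only thing they share, which is the post-office idea in miniature.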

4. Why use a message queue

  System decoupling
  Asynchronous calls
  Traffic peak shaving
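
Peak shaving is easiest to see with numbers. The sketch below is a simplified simulation, not a measurement of RabbitMQ: it assumes time advances in discrete ticks and that the consumer can handle a fixed number of requests per tick. The class `PeakShavingSketch` and its parameters are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simulates a burst of requests being buffered by a queue and drained at a steady rate.
class PeakShavingSketch {

    // arrivals[t] is the number of requests arriving in tick t.
    // The consumer handles at most capacityPerTick requests per tick.
    // Returns the queue depth at the end of each tick, until the backlog is empty.
    static List<Integer> backlogPerTick(int[] arrivals, int capacityPerTick) {
        if (capacityPerTick <= 0) {
            throw new IllegalArgumentException("capacityPerTick must be positive");
        }
        Queue<Integer> queue = new ArrayDeque<>();
        List<Integer> depth = new ArrayList<>();
        int nextId = 0;
        for (int tick = 0; tick < arrivals.length || !queue.isEmpty(); tick++) {
            int incoming = tick < arrivals.length ? arrivals[tick] : 0;
            for (int i = 0; i < incoming; i++) {
                queue.add(nextId++); // the burst is absorbed by the queue
            }
            for (int i = 0; i < capacityPerTick && !queue.isEmpty(); i++) {
                queue.poll(); // the consumer works at its own steady pace
            }
            depth.add(queue.size());
        }
        return depth;
    }

    public static void main(String[] args) {
        // A burst of 10 requests, consumer capacity 3 per tick: prints [7, 4, 1, 0]
        System.out.println(backlogPerTick(new int[] {10, 0, 0, 0}, 3));
    }
}
```

The downstream system never sees more than three requests per tick, even though ten arrived at once; the queue trades latency for a bounded load.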

5. Characteristics of RabbitMQ

  Open source, cross-language
  Written in Erlang
  Widely used
  Active community, rich API

6. The AMQP protocol

  Advanced Message Queuing Protocol
   
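7. RabbitMQ core concepts

  
  Server: the broker service
  connection: the connection between a client and the Server
  channel: almost all operations are carried out on a channel; a client can open multiple channels over one connection
  message: a message, made up of properties and a body
  virtual host: top-level isolation. Within one virtual host, exchange names and queue names must each be unique
  exchange: receives messages from producers and forwards them to the bound queues according to the routing key
  binding: the link between an exchange and a queue
  routing key: the routing rule; the exchange uses it to decide how to route a message
  queue: consumers only need to listen on a queue to consume messages and do not need to know which exchange a message came from. Exchanges and queues are linked by bindings, and one exchange can be bound to multiple queues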
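
How exchange, binding, routing key and queue fit together can be sketched with a toy model. This is an illustration only, not the RabbitMQ client API: it assumes a single direct-style exchange inside one virtual host, and the class `MiniBrokerSketch`, its methods, and the queue and key names are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy model of one direct-style exchange inside one virtual host; not the RabbitMQ API.
class MiniBrokerSketch {
    // queue name -> stored messages (names are unique, as within a virtual host)
    private final Map<String, Queue<String>> queues = new HashMap<>();
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    void bind(String queueName, String routingKey) {
        if (!queues.containsKey(queueName)) {
            throw new IllegalArgumentException("unknown queue: " + queueName);
        }
        List<String> bound = bindings.computeIfAbsent(routingKey, k -> new ArrayList<>());
        if (!bound.contains(queueName)) {
            bound.add(queueName); // binding the same pair twice has no extra effect
        }
    }

    // The exchange copies the message into every queue bound with this routing key.
    // Returns how many queues received it.
    int publish(String routingKey, String body) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queueName : targets) {
            queues.get(queueName).add(body);
        }
        return targets.size();
    }

    // The consumer only talks to the queue; returns null when the queue is empty or unknown.
    String consume(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        MiniBrokerSketch broker = new MiniBrokerSketch();
        broker.declareQueue("orders");
        broker.declareQueue("audit");
        broker.bind("orders", "order.created");
        broker.bind("audit", "order.created");

        System.out.println(broker.publish("order.created", "order 1"));   // 2
        System.out.println(broker.publish("order.cancelled", "order 2")); // 0, no binding matches
        System.out.println(broker.consume("orders")); // order 1
        System.out.println(broker.consume("audit"));  // order 1
    }
}
```

Note that the producer side only names a routing key and the consumer side only names a queue; the bindings are what connect the two.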

8. Message flow

  producer -> exchange -> (binding, matched by routing key) -> queue -> consumer

II. Installing and Starting RabbitMQ

1. Installation

  # vim /etc/yum.repos.d/rabbitmq_erlang.repo

[rabbitmq-erlang]
name=rabbitmq-erlang
baseurl=https://dl.bintray.com/rabbitmq-erlang/rpm/erlang/22/el/7
gpgcheck=1
gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
repo_gpgcheck=0
enabled=1

  # yum clean all
  # yum makecache
  # yum install erlang    (enter y at the prompt)
  # rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
  # wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.8.2/rabbitmq-server-3.8.2-1.el7.noarch.rpm
  # yum install rabbitmq-server-3.8.2-1.el7.noarch.rpm    (enter y at the prompt)

2. Start RabbitMQ

  # systemctl start rabbitmq-server

3. Check the status

  # rabbitmqctl status

4. Stop

  # rabbitmqctl stop

5. Start on boot

  # systemctl enable rabbitmq-server

6. Check the service status

  # systemctl status rabbitmq-server

7. Enable the web management UI

  # rabbitmq-plugins enable rabbitmq_management

8. Add a user for the management UI

  # rabbitmqctl add_user admin password

9. Set the user's tag and permissions

  # rabbitmqctl set_user_tags admin administrator
  # rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

10. Open the management console

  Open http://192.168.211.133:15672 in a browser

III. Hands-on Examples

1. Simple demo

  Sending a message

package helloword;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Sender class for the hello world demo
 */
public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the RabbitMQ host and credentials
        factory.setHost("192.168.211.133");
        factory.setUsername("admin");
        factory.setPassword("password");
        // Open a connection
        Connection connection = factory.newConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue (not durable, not exclusive, not auto-delete)
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // Publish through the default exchange (""); the routing key is the queue name
        String  message = "hello world hahaha!";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
        System.out.println("Sent message: "+message);
        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}

  Receiving messages

package helloword;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Receives messages and prints them; keeps listening
 */
public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the RabbitMQ host and credentials
        factory.setHost("192.168.211.133");
        factory.setUsername("admin");
        factory.setPassword("password");
        // Open a connection
        Connection connection = factory.newConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue (same arguments as in Send)
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // Consume with autoAck = true; the connection stays open so the consumer keeps receiving
        channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received message: "+ message);
            }
        });


    }
}

2. Multiple consumers

  Round-robin dispatch
  Fair dispatch
  Message acknowledgment

  Sending messages

package workqueues;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Publishes several tasks, some of which take time to process
 */
public class NewTask {

    private final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the RabbitMQ host and credentials
        factory.setHost("192.168.211.133");
        factory.setUsername("admin");
        factory.setPassword("password");
        // Open a connection
        Connection connection = factory.newConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare a durable queue
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        for (int i = 0; i <10 ; i++) {
            // Even-numbered tasks end in "..." and therefore take three seconds in the worker
            String message;
            if(i % 2 ==0){
                message = i + "...";
            }else{
                message = String.valueOf(i);
            }

            channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("Sent message: " + message);
        }
        channel.close();
        connection.close();
    }
}

  Receiving messages

package workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Worker that receives and processes the tasks
 */
public class Worker {
    private final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the RabbitMQ host and credentials
        factory.setHost("192.168.211.133");
        factory.setUsername("admin");
        factory.setPassword("password");
        // Open a connection
        Connection connection = factory.newConnection();
        // Get a channel
        final Channel channel = connection.createChannel();
        // Declare the queue (durable, same arguments as in NewTask)
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        System.out.println("Waiting for messages");
        // Fair dispatch: at most one unacknowledged message per consumer at a time
        channel.basicQos(1);
        // autoAck = false, so every message has to be acknowledged manually
        channel.basicConsume(TASK_QUEUE_NAME,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Received message: "+ message);
                try{
                    doWork(message);
                }finally {
                    System.out.println("Message processed");
                    // Acknowledge the message once processing has finished
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        });

    }

    // Simulates work: sleeps one second for every '.' in the task
    private static void doWork(String task){
        char[] chars = task.toCharArray();
        for (char aChar : chars) {
            if(aChar == '.'){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
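
The difference between round-robin and fair dispatch can be estimated on paper for the ten messages that NewTask sends. The following is a simplified simulation, not broker code: it assumes two workers, that processing time is the only cost, and that fair dispatch hands the next message to whichever worker becomes free first. The class `DispatchSketch` and its methods are hypothetical.

```java
// Compares when the last task finishes under round-robin and under fair dispatch.
class DispatchSketch {

    // Same rule as doWork in Worker: one second of work per '.' in the task.
    static int secondsFor(String task) {
        int seconds = 0;
        for (char c : task.toCharArray()) {
            if (c == '.') {
                seconds++;
            }
        }
        return seconds;
    }

    // Round-robin: task i always goes to worker i % workers, busy or not.
    static int roundRobinFinish(String[] tasks, int workers) {
        int[] busyUntil = new int[workers];
        for (int i = 0; i < tasks.length; i++) {
            busyUntil[i % workers] += secondsFor(tasks[i]);
        }
        return max(busyUntil);
    }

    // Fair dispatch: the next task goes to the worker that is free first.
    static int fairFinish(String[] tasks, int workers) {
        int[] busyUntil = new int[workers];
        for (String task : tasks) {
            int free = 0;
            for (int w = 1; w < workers; w++) {
                if (busyUntil[w] < busyUntil[free]) {
                    free = w;
                }
            }
            busyUntil[free] += secondsFor(task);
        }
        return max(busyUntil);
    }

    private static int max(int[] values) {
        int best = 0;
        for (int v : values) {
            best = Math.max(best, v);
        }
        return best;
    }

    // The same ten messages that NewTask publishes: "0...", "1", "2...", "3", and so on.
    static String[] newTaskMessages() {
        String[] tasks = new String[10];
        for (int i = 0; i < tasks.length; i++) {
            tasks[i] = (i % 2 == 0) ? i + "..." : String.valueOf(i);
        }
        return tasks;
    }

    public static void main(String[] args) {
        String[] tasks = newTaskMessages();
        System.out.println("round-robin finishes after " + roundRobinFinish(tasks, 2) + "s"); // 15s
        System.out.println("fair dispatch finishes after " + fairFinish(tasks, 2) + "s");     // 9s
    }
}
```

With round-robin, one worker receives all five slow tasks while the other sits idle; with fair dispatch the slow tasks are spread across both workers, which is what `basicQos(1)` combined with manual acknowledgment is meant to achieve.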

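IV. Exchange Types

fanout: broadcast. A queue only has to be bound to the exchange; no routing key is needed
direct: routes a message to the queues whose binding key exactly matches the message's routing key
topic: the producer sets a routing key, and the message is forwarded to the queues whose binding pattern matches it (wildcard matching)
  * stands for exactly one word
  # stands for zero or more words
headers: routes on the headers attribute of the message instead of the routing key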
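
The two topic wildcards can be made concrete with a small matcher. This is a sketch of the matching rule only, written for illustration; it is not how the broker implements it, and the class `TopicMatcherSketch` and its methods are hypothetical. Words are assumed to be separated by dots.

```java
// Matches a routing key against a topic binding pattern, word by word.
class TopicMatcherSketch {

    static boolean matches(String pattern, String routingKey) {
        return matchFrom(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // p = pattern words, k = routing key words; pi and ki are the current positions.
    private static boolean matchFrom(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            return ki == k.length; // pattern used up: the key must be used up too
        }
        if (p[pi].equals("#")) {
            // "#" may absorb zero or more words: try every possible length
            for (int next = ki; next <= k.length; next++) {
                if (matchFrom(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return matchFrom(p, pi + 1, k, ki + 1); // "*" stands for exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.orange.*", "quick.orange.rabbit"));      // true
        System.out.println(matches("*.orange.*", "orange"));                   // false: one word, not three
        System.out.println(matches("*.orange.*", "quick.orange.male.rabbit")); // false: four words
        System.out.println(matches("lazy.#", "lazy.orange.male.rabbit"));      // true
        System.out.println(matches("lazy.#", "lazy"));                         // true: "#" absorbs zero words
    }
}
```

The key difference is visible in the four-word routing key: `*.orange.*` rejects it because each `*` consumes exactly one word, while `lazy.#` accepts keys of any length that start with `lazy`.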

1. fanout

  Sending messages

package fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.211.133");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
        String message = "info:Hello World !";
        channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
        System.out.println("Sent message: " + message);
        channel.close();
        connection.close();
    }
}

  Receiving messages

package fanout;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.211.133");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
        // Create a temporary queue with a server-generated name
        String queueName = channel.queueDeclare().getQueue();
        // A fanout exchange ignores the routing key, so an empty string is enough
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("Waiting for messages");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
            }
        };
        channel.basicConsume(queueName,true,consumer);

    }
}

2. direct

  Sending messages

package direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Publishes messages to a direct exchange
 */
public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.211.133");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        String message = "Hello World !";
        channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));
        System.out.println("Sent message, level: info, body: "+ message);

        channel.basicPublish(EXCHANGE_NAME,"warning",null,message.getBytes("UTF-8"));
        System.out.println("Sent message, level: warning, body: "+ message);

        channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes("UTF-8"));
        System.out.println("Sent message, level: error, body: "+ message);
        channel.close();
        connection.close();
    }
}

  Receiving messages (all three levels)

package direct;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Receives logs of all three levels
 */
public class ReceiveLogsDirect1 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.211.133");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        // Create a temporary queue with a server-generated name
        String queueName = channel.queueDeclare().getQueue();
        // Bind the same queue to the exchange with three routing keys
        channel.queueBind(queueName,EXCHANGE_NAME,"info");
        channel.queueBind(queueName,EXCHANGE_NAME,"warning");
        channel.queueBind(queueName,EXCHANGE_NAME,"error");
        System.out.println("Waiting for messages");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
            }
        };
        channel.basicConsume(queueName,true,consumer);

    }
}

  Receiving messages (error level only)

package direct;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Receives logs of one level only (error)
 */
public class ReceiveLogsDirect2 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.211.133");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        // Create a temporary queue with a server-generated name
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with the "error" routing key only
        channel.queueBind(queueName,EXCHANGE_NAME,"error");
        System.out.println("Waiting for messages");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
            }
        };
        channel.basicConsume(queueName,true,consumer);

    }
}

3. topic

  Sending messages

package topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Publishes messages to a topic exchange
 */
public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.211.133");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
        String message = "Animal World !";
        String[] routingKeys = new String[9];
        routingKeys[0] = "quick.orange.rabbit";
        routingKeys[1] = "lazy.orange.elephant";
        routingKeys[2] = "quick.orange.fox";
        routingKeys[3] = "lazy.brown.fox";
        routingKeys[4] = "lazy.pink.rabbit";
        routingKeys[5] = "quick.brown.fox";
        routingKeys[6] = "orange";
        routingKeys[7] = "quick.orange.male.rabbit";
        routingKeys[8] = "lazy.orange.male.rabbit";
        for (int i = 0; i < routingKeys.length; i++) {
            channel.basicPublish(EXCHANGE_NAME,routingKeys[i],null,message.getBytes("UTF-8"));
            System.out.println(i + " - sent: " + message + " - " + routingKeys[i]);
        }
        channel.close();
        connection.close();
    }
}

  Receiving messages (receiver 1)

package topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Receiver 1: receives messages whose routing key matches a pattern
 */
public class ReceiveLogsTopic1 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.211.133");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
        // Create a temporary queue with a server-generated name
        String queueName = channel.queueDeclare().getQueue();
        String routingKey = "*.orange.*";
        // Bind the queue with one pattern: exactly three words, the middle one being "orange"
        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        System.out.println("Waiting for messages");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message + " - " + envelope.getRoutingKey());
            }
        };
        channel.basicConsume(queueName,true,consumer);

    }
}

  Receiving messages (receiver 2)

package topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Receiver 2: receives messages whose routing key matches either of two patterns
 */
public class ReceiveLogsTopic2 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.211.133");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
        // Create a temporary queue with a server-generated name
        String queueName = channel.queueDeclare().getQueue();
        // Bind the same queue to the exchange with two patterns
        String routingKey = "*.*.rabbit";
        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        String routingKey2 = "lazy.#";
        channel.queueBind(queueName,EXCHANGE_NAME,routingKey2);
        System.out.println("Waiting for messages");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message + " - " + envelope.getRoutingKey());
            }
        };
        channel.basicConsume(queueName,true,consumer);

    }
} 

V. Integrating RabbitMQ with Spring Boot

1. Creating the producer project

Create a Spring Boot project and add the RabbitMQ dependency

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.learn</groupId>
    <artifactId>spring-boot-rabbitmq-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-rabbitmq-producer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- RabbitMQ (AMQP) dependency -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Configure the RabbitMQ connection

server.port=8080
spring.application.name=producer
spring.rabbitmq.addresses=192.168.211.133:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=password
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

Write the RabbitMQ configuration class

package com.learn.springbootrabbitmqproducer;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration class
 */
@Configuration
public class TopicRabbitConfig {
    @Bean
    public Queue queue1() {
        return new Queue("queue1");
    }

    @Bean
    public Queue queue2() {
        return new Queue("queue2");
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("bootExchange");
    }

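    // queue1 only receives messages whose routing key is exactly dog.red
    @Bean
    Binding bindingExchangeMessage1(Queue queue1, TopicExchange exchange) {
        return BindingBuilder.bind(queue1).to(exchange).with("dog.red");
    }

    // queue2 receives every message whose routing key starts with the word dog
    @Bean
    Binding bindingExchangeMessage2(Queue queue2, TopicExchange exchange) {
        return BindingBuilder.bind(queue2).to(exchange).with("dog.#");
    }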
}

Sending messages

package com.learn.springbootrabbitmqproducer;


import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Sends messages
 */
@Component
public class MessageSender {
    @Autowired
    private AmqpTemplate template;

    public void send1() {
        String message = "this is message 1, routing key is dog.red";
        System.out.println("Sent message: " + message);
        this.template.convertAndSend("bootExchange", "dog.red", message);
    }

    public void send2() {
        String message = "this is message 2, routing key is dog.black";
        System.out.println("Sent message: " + message);
        this.template.convertAndSend("bootExchange", "dog.black", message);
    }
}

Write a test class that sends the messages, then run it

package com.learn.springbootrabbitmqproducer;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringBootRabbitmqProducerApplicationTests {


    @Autowired
    MessageSender sender;

    @Test
    public void send1(){
        sender.send1();
    }

    @Test
    public void send2(){
        sender.send2();
    }

}

2. Creating the consumer project

Create a Spring Boot project and add the RabbitMQ dependency

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.learn</groupId>
    <artifactId>spring-boot-rabbitmq-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-rabbitmq-consumer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Configure the RabbitMQ connection

server.port=8081
spring.application.name=consumer
spring.rabbitmq.addresses=192.168.211.133:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=password
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

Write two consumers to consume the messages

package com.learn.springbootrabbitmqconsumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 1
 */
@Component
@RabbitListener(queues = "queue1")
public class Receiver1 {

    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver1:" + message);
    }
}

package com.learn.springbootrabbitmqconsumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 2
 */
@Component
@RabbitListener(queues = "queue2")
public class Receiver2 {

    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver2:" + message);
    }
}

Original article: https://www.cnblogs.com/michealyang/p/14173618.html