SpringBoot:消息队列(集成RabbitMQ)

 简介

使用Erlang语言编写的一种消息中间件。消息中间件是一种数据传送的消息传递机制,换句话说,是一种软件应用之间的通讯方式。

举个栗子

消息中间件的作用之一是应用解耦。
拿取快递为例,前几年的快递收取方式通常是由快递员上门派件,那么也就是说快递员需要与顾客建立直接联系;近年来,快递行业蒸蒸日上,大量的快件如果由人工派送,不仅效率低,资源耗费也相当大。
而现在不管是小区,还是学校,都专门设置了快递柜,菜鸟驿站。这个快递柜就可以类比消息中间件,快递员无需把快件送到顾客手中,只要放入对应区域设置的快递柜中,顾客就可以自己来取。这样就消除了快递员与顾客的直接联,也就是应用解耦。

基础概念

Broker:简单来说就是消息队列服务器实体
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列,Exchange有4种类型:direct(默认点对点),fanout(发布订阅), topic(匹配规则发布订阅), 和headers,不同类型的Exchange转发消息的策略有所区别
Queue:消息队列载体,每个消息都会被投入到一个或多个队列
Binding:绑定,它的作用就是把 exchange和 queue按照路由规则绑定起来
Routing Key:路由关键字, exchange根据这个关键字进行消息投递
vhost:虚拟主机,一个 broker里可以开设多个 vhost,用作不同用户的权限分离
producer:消息生产者,就是投递消息的程序
consumer:消息消费者,就是接受消息的程序
channel:消息通道,在客户端的每个连接里,可建立多个 channel,每个 channel代表一个会话任务
Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

消息发送、接受流程

首先生产者把消息投递到exchange交换机,
exchange接受到消息后,根据消息的key和已经设置好的binging,进行消息路由,将消息投递到一个或者多个queue里面,
消费者从queue中取出消息,开始执行相应的代码

 

RabbitMQ三种分发策略

1.direct 单播

根据消息体中的路由键指定分发的队列

2.fanout 广播

忽略路由键,分发消息到所有队列

3.topic 多播(模式匹配)

路由键可包含模式匹配字符(*代表一个字符,#代表一个或多个字符)

搭建RabbitMQ环境

 1.下载安装RabbitMQ镜像

docker pull rabbitmq:3.8.1-management  #下载镜像
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 68055d63a993 #启动镜像

-d :后台运行

-p:暴露端口

访问:http://192.168.0.146:15672/(146服务器地址)  进入rabbitMQ管理页面,默认用户名和密码为guest。


SpringBoot集成RabbitMQ

1.Pom(引入starter)

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置application.properties

spring.rabbitmq.host=192.168.0.146
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3.测试单播(点对点),多播(发布订阅)

  @Autowired
    RabbitTemplate rabbitTemplate;

    //单播发送
    @Test
    void directTest() {
        Person person = new Person("mengY",25,"男");
        rabbitTemplate.convertAndSend("exchange.direct","queue_01", person);
    }


    //单播接收
    @Test
    void directReceive(){
        Person person = (Person)rabbitTemplate.receiveAndConvert("queue_01");
        System.out.println(person.getClass());
        System.out.println(person.toString());
    }



    //广播发送
    @Test
    void fanoutTest() {
        Person person = new Person("mengY",25,"男");
        rabbitTemplate.convertAndSend("exchange.fanout","", person);
    }


    //广播接收
    @Test
    void fanoutReceive(){
        Object obj = rabbitTemplate.receiveAndConvert("queue_03");
        System.out.println(obj.getClass());
        System.out.println(obj);
    }

rabboitMQ默认的是一java的方式序列化消息对象

 上面的展示方式不太直观,有时我们需要用json的方式来序列化消息对象,可以通过配置MessageConverter来实现

@Configuration
public class RabbitConfig {
    /**
     * 消息转换器
     * @return
     */
    @Bean
    public MessageConverter getMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

 消息监听

@EnableRabbit
@SpringBootApplication
public class DemoMqApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoMqApplication.class, args);
    }

}
@EnableRabbit:开启基于注解的rabbitMQ模式
@Service
public class PersonService {
    /**
     * 监听接受消息
     */
    @RabbitListener(queues = "queue_01")
    public void receive(Person person){
        System.out.println("收到消息:"+person.toString());
    }
}

@RabbitListener:监听消息队列
@RabbitListener(queues = {"queue_03","queue_04"}):监听多个队列

AmqpAdmin管理组件

创建,删除交换器,队列

参考:https://blog.csdn.net/Leon_Jinhai_Sun/article/details/99692190

参考资料:

SpringBoot整合RabbitMQ之 典型应用场景实战一

原文地址:https://www.cnblogs.com/mengY/p/11784684.html