第十章:(4)Spring Boot 与 消息 之 整合 RabbitMQ

一、依赖关系

  创建的 SpringBoot 模块中,引入了 amqp 的启动器

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

  amqp中包括了下面两个依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-messaging</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>

二、自动配置

  1、自动配置类:RabbitAutoConfiguration

  2、自动配置了连接工厂 CachingConnectionFactory(通过RabbitProperties创建连接工厂)

    

  3、RabbitProperties 封装了 Rabbit 的配置

    

  4、RabbitTemplate 给 RabbitMQ 发送和接受消息

    

  5、AmqpAdmin :RabbitMQ 系统管理功能组件

 

三、测试

  1、配置 RabbitMQ 的信息

spring.rabbitmq.host=/192.168.1.6
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 默认就是5672
spring.rabbitmq.port=5672  
#spring.rabbitmq.virtual-host

  

  2、测试单播

  (1)发送数据

    /**
     * 1. 单播(点对点)
     */
    @Test
    public void testDirect() {
        //message 需要自己构造一个,定义消息体内容和消息头
        //rabbitTemplate.send(exchage, routeKey, message);

        //Object 只需要传入要发送的对象,自动序列化发送给 rabbitmq
        //rabbitTemplate.convertAndSend(exchage, routeKey, Object);
        Map<String, Object> map = new HashMap<>();
        map.put("msg", "这是第一个消息");
        map.put("data", Arrays.asList("helloworld", 123, true));

        //对象被默认序列化以后发送出去
        rabbitTemplate.convertAndSend("exchange.direct", "njf.news", map);
    }

  (2)接收数据

    //接收数据
    @Test
    public void receiveMsg() {
        Object o = rabbitTemplate.receiveAndConvert("njf.news");
        System.out.println(o.getClass());
        System.out.println("o = " + o);
    }

    发送数据,将数据自动转化为 JSON 发送出去,可以自定义序列化器

  3、自定义序列化器

    在 RabbitTemplate 类中有一个 MessageConverter 消息转换器:

    

     如果没有配置消息转换器就使用默认的转换器。

     在SimpleMessageConverter类中的 fromMessage 方法中:

     这是默认的消息转换器。

    RabbitMQ 中的 MessageConverter 是消息转换接口,提供了多个实现类:

    

     只需要在容器中配置想使用的转换器即可。

@Configuration
public class MyAMQPConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

    再次进行测试,发现就可以把数据转成 JSON格式的数据了。

  4、测试广播

    /**
     * 2、广播
     */
    @Test
    public void fanout() {
        rabbitTemplate.convertAndSend("exchange.fanout", "", new Book("三国演义", "罗贯中"));
    }

    从管理平台也可以看到消息。

四、监听消息

  1、@EnableRabbit 开启基于注解的 RabbitMQ 模式

@EnableRabbit  //开启基于注解的 RabbitMQ 模式
@SpringBootApplication
public class SpringBoot10AmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBoot10AmqpApplication.class, args);
    }

}

  2、@RabbitListener 监听消息队列的内容

@Service
public class BookService {

    @RabbitListener(queues = {"njf.news"})
    public void receive(Book book){
        System.out.println("book = " + book);
        System.out.println("收到消息 = " + book);
    }

    @RabbitListener(queues = {"njf"})
    public void receiveMsg(Message message){
        System.out.println("收到消息 = " + message);
        System.out.println(message.getBody());
        System.out.println(message.getMessageProperties());
    }
}

    当监听到指定队列里面有内容时,就会来执行方法。

五、AmqpAdmin

  AmqpAdmin :RabbitMQ 系统管理功能组件。

  AmqpAdmin:创建和删除 Queue,Exchange,Binding规则等。

  示例:

    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    public void createExchange() {
        //declare 开头的用于创建组件
        //创建交换器
        Exchange exchange = new DirectExchange("amqpAdmin.exchange");
        amqpAdmin.declareExchange(exchange);
        System.out.println(exchange + "创建完成");

        //创建队列
        amqpAdmin.declareQueue(new Queue("adqpAdmin.queue", true));


        //创建绑定规则
        amqpAdmin.declareBinding(new Binding("adqpAdmin.queue", Binding.DestinationType.QUEUE,
                "amqpAdmin.exchange", "amqp.hhh", null));

    }

  效果:

原文地址:https://www.cnblogs.com/niujifei/p/15743077.html