springboot集成RabbitMQ

一、安装

安装我是按照这个哥们安装的,cdns 过客幽影星风

二、在 spring boot 中使用

  1. 在pom.xml配置文件中加入如下依赖。

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  2. 在application.yml文件中配置rabbitmq

    spring:
      rabbitmq:
        host: 127.0.0.1   # 主机ip
        port: 5672		  # 主机端口
        username: guest	  # 访问用户名
        password: guest   # 访问密码
        publisher-confirms: true # 启用消息确认,发送失败会异常
        virtual-host: /   # 定义虚拟主机的路径,默认为根目录
    
  3. rabbitmq默认使用字节方式进行发送数据,为了在管理界面方便查看,我们可以使用JSON字符串的方式传输

    // 添加配置文件 RabbitConfig.java
    
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 默认情况下RabbitMQ发送的消息是转换为字节码,这里改为发送JSON数据。
     *
     * @author lixingwu
     */
    @Configuration
    public class RabbitConfig {
        @Bean
        public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            return rabbitTemplate;
        }
    }
    
    

    注意:如果使用JSON数据发送数据,就只能使用convertAndSend方法,不能使用send方法了,具体怎么解决还不知道。

  4. 创建测试文件RabbitMQTest.java

    import cn.hutool.core.lang.Console;
    import org.junit.Test;
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * RabbitMQ 测试
     */
    public class RabbitMQTest extends BaseSpringBootTest {
        @Autowired
        private AmqpTemplate mq;
        @Autowired
        AmqpAdmin amqpAdmin;
    
        //创建交换机
        @Test
        public void declareExchange() {
            // 创建 Direct 一对一的交换器,一个交换器对应一个队列
            // 该交换机里面的三个参数分别为: 名字,持久化,是否自动删除
            DirectExchange directExchange = new DirectExchange("direct-exchange-001", false, false);
            amqpAdmin.declareExchange(directExchange);
    
            // 创建FANOUT,发布订阅模式(不存在路由键 将被投放到exchange对应的队列中)
            // 可绑定多个队列,消息会给全部的队列每人发送一份
            FanoutExchange fanoutExchange = new FanoutExchange("fanout-exchange-001", false, false);
            amqpAdmin.declareExchange(fanoutExchange);
    
            // 创建Topic,可以使得不同源头的数据投放到一个队列中
            // 通过路由键的命名分类来进行筛选,其中
            // * 表示:可以(只能)匹配一个单词,
            // # 表示:可以匹配多个单词(或者零个)
            TopicExchange topicExchange = new TopicExchange("topic-exchange-001", false, false);
            amqpAdmin.declareExchange(topicExchange);
        }
    
        //创建队列
        @Test
        public void declareQueue() {
            amqpAdmin.declareQueue(new Queue("direct-queue"));
            amqpAdmin.declareQueue(new Queue("fanout-queue-001"));
            amqpAdmin.declareQueue(new Queue("fanout-queue-002"));
            amqpAdmin.declareQueue(new Queue("fanout-queue-003"));
            amqpAdmin.declareQueue(new Queue("topic-queue-001"));
            amqpAdmin.declareQueue(new Queue("topic-queue-002"));
            amqpAdmin.declareQueue(new Queue("topic-queue-003"));
        }
    
        //绑定交换机队列
        @Test
        public void binding() {
            // 绑定Direct
            Binding directBinding = BindingBuilder.bind(new Queue("direct-queue")).to(new DirectExchange("direct-exchange-001", false, false)).with("direct-rout-key");
            amqpAdmin.declareBinding(directBinding);
    
            // 绑定FANOUT
            Binding fanoutBinding1 = BindingBuilder.bind(new Queue("fanout-queue-001")).to(new FanoutExchange("fanout-exchange-001", false, false));
            Binding fanoutBinding2 = BindingBuilder.bind(new Queue("fanout-queue-002")).to(new FanoutExchange("fanout-exchange-001", false, false));
            Binding fanoutBinding3 = BindingBuilder.bind(new Queue("fanout-queue-003")).to(new FanoutExchange("fanout-exchange-001", false, false));
            amqpAdmin.declareBinding(fanoutBinding1);
            amqpAdmin.declareBinding(fanoutBinding2);
            amqpAdmin.declareBinding(fanoutBinding3);
    
            // 绑定Topic
            Binding topicBinding1 = BindingBuilder.bind(new Queue("topic-queue-001")).to(new TopicExchange("topic-exchange-001", false, false)).with("*.to");
            Binding topicBinding2 = BindingBuilder.bind(new Queue("topic-queue-002")).to(new TopicExchange("topic-exchange-001", false, false)).with("log.*");
            Binding topicBinding3 = BindingBuilder.bind(new Queue("topic-queue-003")).to(new TopicExchange("topic-exchange-001", false, false)).with("log1.*");
            amqpAdmin.declareBinding(topicBinding1);
            amqpAdmin.declareBinding(topicBinding2);
            amqpAdmin.declareBinding(topicBinding3);
        }
    
        //发送者消息
        @Test
        public void send() {
            Map<String, Object> param = new HashMap<>(3);
            param.put("name", "李兴武");
            param.put("age", 24);
            param.put("sex", false);
    
            // 发送自动转换后的消息,test为消息队列的名字
            // mq.convertAndSend("test", param);
    
            // 根据路由键发送
            // mq.convertAndSend("direct-rout-key", param);
    
            // 根据交换器+路由键发送
            // mq.convertAndSend("topic-exchange-001", "log.bak", param);
    
            // 自定义消息头
            mq.convertAndSend("direct-exchange-001", "direct-rout-key", param, message -> {
                message.getMessageProperties().getHeaders().put("token", "123456789");
                return message;
            });
        }
    
        // 接收消息
        @Test
        public void process() {
            Message message1 = mq.receive("topic-queue-001");
            Console.log(message1);
            Message message2 = mq.receive("topic-queue-002");
            Console.log(message2);
            // 获取消息头
            Message message3 = mq.receive("direct-queue");
            MessageProperties messageProperties = message3.getMessageProperties();
            Console.log(messageProperties.getHeaders().get("token"));
        }
    }
    
  5. 未完待续...

原文地址:https://www.cnblogs.com/lixingwu/p/10940804.html