rabbitMQ

maven:
<!-- 使用抽象级MQ接口 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

yml文件:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin

配置常量类:

package com.wxf.mq.config;

import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;

/**
 * @author wxf
 * @date 2019/4/12
 * @description rabbitMq配置需要的常量
 */
@Data
@Component
public class RabbitMqConstants {
    //交换机常量

    //转发交换机的bean名称
    public static final String FANOUT_EXCHANGE_BEAN_NAME = "fanoutExchange";
    //转发交换机名称
    public static final String FANOUT_EXCHANGE = "fanoutExchange";

    //转发交换机的bean名称
    public static final String TOPIC_EXCHANGE_BEAN_NAME = "topicExchange";
    //转发交换机名称
    public static final String TOPIC_EXCHANGE = "topicExchange";

    // 延迟交换机
    // 延迟交换机的bean名称
    public static final String  DELAYED_EXCHANGE_BEAN_NAME = "delayedExchange";
    //延迟交换机的名称
    public static final String DELAYED_EXCHANGE = "DELAYED_EXCHANGE";
    //延迟交换机的type  x-delayed-message 固定值
    public static final String DELAYED_EXCHANGE_TYPE = "x-delayed-message";


    //测试交换机的bean名称
   public static final String TEST_EXCHANGE_BEAN_NAME = "testExchange";
    //测试交换机名称
   public static final String TEST_EXCHANGE = "testExchange";


   //路由常量

    // fanout路由bean名称
    public static final String FANOUT_BINDING_BEAN_NAME = "fanoutBinding";

    // direct路由beanm名称
    public static final String DIRECT_BINDING_BEAN_NAME = "directBinding";

   //队列常量

    //fanout队列1的bean1名称
    public static final String FANOUT_QUEUE_BEAN_NAME_1 = "fanoutQueue1";
    //fanout队列1的名称常量
    public static final String FANOUT_QUEUE_1 = "FANOUT_QUEUE1";

    //fanout队列2的bean1名称
    public static final String FANOUT_QUEUE_BEAN_NAME_2 = "fanoutQueue2";
    //fanout队列2的名称常量
    public static final String FANOUT_QUEUE_2 = "FANOUT_QUEUE2";

    //topic队列1的bean1名称
    public static final String TOPIC_QUEUE_BEAN_NAME_1 = "topicQueue1";
    //topic队列1的名称常量
    public static final String TOPIC_QUEUE_1 = "TOPIC_QUEUE_1";

    //topic队列2的bean1名称
    public static final String TOPIC_QUEUE_BEAN_NAME_2 = "topicQueue2";
    //topic队列2的名称常量
    public static final String TOPIC_QUEUE_2 = "TOPIC_QUEUE_2";


    //延迟队列的bean名称
    public static final String DELAYED_QUEUE_BEAN_NAME = "delayedQueue";
    //延迟队列的名称常量
    public static final String DELAYED_QUEUE = "DELAYED_QUEUE";


    //测试队列的bean名称
    public static final String DIRECT_QUEUE_BEAN_NAME = "directQueue";
    //测试队列的名称常量
    public static final String DIRECT_QUEUE = "DIRECT_QUEUE";
}
rabbitMQ配置类:
package com.wxf.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * @author wxf
 * @date 2019/4/11
 * @description rabbitMq配置类
 */
@Component
public class RabbitMqConfig {

    //存在多个交换机和多个队列 需要在 @Bean(name = "")中加上名称区分 否则会报错


    //交换机 类型
    //routing_key==路由键
    //Direct:direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
    //Direct<直接>:1对1-----一个消息只能被一个消费者消费
    //Topic:按规则转发消息(最灵活) 1对多-----一个消息可以被多个消费者消费
    //Fanout:转发消息到所有绑定队列 广播
    //Headers:设置 header attribute 参数类型的交换机


    /**
     * 订阅交换机 适用场景 如 群发信息
     * @return
     */
    @Bean(name = RabbitMqConstants.FANOUT_EXCHANGE_BEAN_NAME)
    public Exchange fanoutExchange() {
        //交换机构造参数 String name, boolean durable, boolean autoDelete
        //String name 交换机名称
        //boolean durable 重启的时候是否保留这个交换机 true保留 false不保留
        //boolean autoDelete 交换机中没有数据是否删除这个交换机 true删除 false不删除
        Exchange exchange = new FanoutExchange(RabbitMqConstants.FANOUT_EXCHANGE,true,false);
        return exchange;
    }

    /**
     * topicExchange topic交换机
     * @return
     */
    @Bean(name = RabbitMqConstants.TOPIC_EXCHANGE_BEAN_NAME)
    public Exchange topicExchange() {
        Exchange exchange = new TopicExchange(RabbitMqConstants.TOPIC_EXCHANGE,true,false);
        return exchange;
    }


    /**
     * 延迟队列交换机
     *
     * @return the exchange
     */
//    @Bean(name = RabbitMqConstants.DELAYED_EXCHANGE_BEAN_NAME)
//    public CustomExchange delayExchange() {
//        Map<String, Object> args = new HashMap<>();
//        args.put("x-delayed-type", "direct");
//        CustomExchange customExchange = new CustomExchange(RabbitMqConstants.DELAYED_EXCHANGE,
//                RabbitMqConstants.DELAYED_EXCHANGE_TYPE, true, false, args);
//        customExchange.setDelayed(true);
//        return customExchange;
//    }



    /**
     * direct模式交换机  适用场景 异步通知  比如 商品订单下单成功 异步通知到库存系统减库存
     * 交换机的类型为 direct
     * direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
     * @return 返回交换机 注入spring中
     */
    @Bean(name = RabbitMqConstants.TEST_EXCHANGE_BEAN_NAME)
    public Exchange directExchange() {
        //Map<String, Object> args = new HashMap<>();
        Exchange exchange = new DirectExchange(RabbitMqConstants.TEST_EXCHANGE, true, false);
        return exchange;
    }

    /**
     * fanout队列1
     * @return
     */
    @Bean(name = RabbitMqConstants.FANOUT_QUEUE_BEAN_NAME_1)
    public Queue fanoutQueue1(){
        //参数
        // String name 队列名称
        // boolean durable 启的时候是否保留这个队列 true保留 false不保留
        Queue queue = new Queue(RabbitMqConstants.FANOUT_QUEUE_1,true);
        return queue;
    }

    /**
     * fanout队列2
     * @return
     */
    @Bean(name = RabbitMqConstants.FANOUT_QUEUE_BEAN_NAME_2)
    public Queue fanoutQueue2(){
        Queue queue = new Queue(RabbitMqConstants.FANOUT_QUEUE_2,true);
        return queue;
    }

    /**
     * 将fanout队列1 绑定到 fanoutExchange交换机上
     * 注意 Exchange 没有指明类型的交换机不能直接绑定队列
     * Exchange 是所有交换机的父类 Exchange只能通过绑定路由键绑定队列
     * Exchange 的 fanout模式 下 队列不需要绑定路由键
     * 队列直接帮交换机 需强转 如 这里 fanoutExchange交换机类型是Exchange 需强转成FanoutExchange类型 即可直接绑定到指定的交换机上
     * @return
     */
    @Bean
    public Binding fanoutBinding1() {
        //bind 绑定队列
        //to 到哪个交换机上
        return BindingBuilder.bind(fanoutQueue1()).to((FanoutExchange) fanoutExchange());
    }

    /**
     * 将fanout队列2 绑定到 fanoutExchange交换机上
     * @return
     */
    @Bean
    public Binding fanoutBinding2() {
        return BindingBuilder.bind(fanoutQueue2()).to((FanoutExchange) fanoutExchange());
    }


    /**
     * direct队列
     * @return
     */
    @Bean(name = RabbitMqConstants.DIRECT_QUEUE_BEAN_NAME)
    public Queue directQueue(){
        Queue queue = new Queue(RabbitMqConstants.DIRECT_QUEUE,true);
        return queue;
    }

    /**
     * 将directQueue队列绑定到 direct交换机上 指明路由键
     * @return
     */
    @Bean(name = RabbitMqConstants.DIRECT_BINDING_BEAN_NAME)
    public Binding directBinding() {
        //bind 绑定队列
        //to 到哪个交换机上
        //with 路由键 表明是哪个路由的
        return BindingBuilder.bind(directQueue()).to(directExchange()).with(RabbitMqConstants.DIRECT_QUEUE).noargs();
    }


    /**
     * topic队列1
     * @return
     */
    @Bean(name = RabbitMqConstants.TOPIC_QUEUE_BEAN_NAME_1)
    public Queue topicQueue1(){
        Queue queue = new Queue(RabbitMqConstants.TOPIC_QUEUE_1,true);
        return queue;
    }

    /**
     * topic队列2
     * @return
     */
    @Bean(name = RabbitMqConstants.TOPIC_QUEUE_BEAN_NAME_2)
    public Queue topicQueue2(){
        Queue queue = new Queue(RabbitMqConstants.TOPIC_QUEUE_2,true);
        return queue;
    }

    /**
     * 将topic队列1 绑定到 topicExchange交换机上
     * topic.# .# 代表任意个单词
     * @return
     */
    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.#").noargs();
    }

    /**
     * 将topic队列1 绑定到 topicExchange交换机上
     * topic.* * 仅代表一个单词
     * @return
     */
    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.*").noargs();
    }


    /**
     * 延迟队列
     * @return
     */
    @Bean(name = RabbitMqConstants.DELAYED_QUEUE_BEAN_NAME)
    public Queue delayeQueue(){
        Queue queue = new Queue(RabbitMqConstants.DELAYED_QUEUE,true);
        return queue;
    }


    /**
     * 将延迟队列 绑定到 delayeExchange交换机上
     * @return
     */
//    @Bean(name = "delayeBinding")
//    public Binding delayeBinding() {
//        return BindingBuilder.bind(delayeQueue()).to(delayExchange()).with(RabbitMqConstants.DELAYED_QUEUE).noargs();
//    }




}

几种模式的生产者:

package com.wxf.mq.handler;

import com.wxf.mq.config.RabbitMqConstants;
import com.wxf.mq.vo.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.Duration;
import java.time.Instant;
import java.util.Date;

/**
 * @author wxf
 * @date 2019/4/11
 * @description 发送消息 创建消息生产者Sender。通过注入AmqpTemplate接口的实例来实现消息的发送,
 * AmqpTemplate接口定义了一套针对AMQP协议的基础操作。在Spring Boot中会根据配置来注入其具体实现。
 * 在该生产者,我们会产生一个字符串,并发送到名为hello的队列中
 */
@RestController
@RequestMapping("mq")
@Slf4j
public class SendMq {
    @Autowired
    AmqpTemplate rabbitTemplate;

    /**
     * 简单模式:一个生产者,一个消费者
     */
    @RequestMapping("/hello")
    public void send() {
        String context = "hello " + new Date();
        log.info("发送消息 : " + context);
        rabbitTemplate.convertAndSend("hello", context);
    }


    /**
     * work模式:一个生产者,多个消费者,每个消费者获取到的消息唯一
     */
    @RequestMapping("/workSend")
    public void workSend() {
        User user = null;
        for (int i = 0; i < 5; i++) {
            user = new User();
            user.setName("wxf");
            user.setAge(i);
            log.info("发送消息 : " + user);
            String s = user.toString();
            rabbitTemplate.convertAndSend("hello", s);
        }
    }

    /**
     * Fanout 广播 模式 :对应Fanout交换机的队列都会收到消息
     */
    @RequestMapping("/fanoutSend")
    public void fanoutSend() {
        User user = null;
        for (int i = 0; i < 5; i++) {
            user = new User();
            user.setName("wxf");
            user.setAge(i);
            log.info("发送消息 : " + user);
            String s = user.toString();
            rabbitTemplate.convertAndSend(RabbitMqConstants.FANOUT_EXCHANGE,null,s);
        }
    }

    /**
     *  topic 模式 : 主题模式 匹配模式的路由
     */
    @RequestMapping("/topicSend")
    public void topicSend() {
        User user = null;
        String routingKeyA = "topic.wxf.w";
        for (int i = 0; i < 5; i++) {
            user = new User();
            user.setName("wxf");
            user.setAge(i);
            log.info("发送消息 : " + user);
            String s = user.toString();
            rabbitTemplate.convertAndSend(RabbitMqConstants.TOPIC_EXCHANGE,routingKeyA,s);
        }
    }

    /**
     *  延迟队列 模式
     */
//    @RequestMapping("/delayeSend")
//    public void delayeSend() {
//        User user = new User();
//        user.setName("wxf");
//        user.setAge(21);
//        String msg = user.toString();
//        Long timeStamp = 1555064580000L;
//        Instant instant = Instant.ofEpochMilli(timeStamp);
//        rabbitTemplate.convertAndSend(RabbitMqConstants.DELAYED_EXCHANGE,
//                RabbitMqConstants.DELAYED_QUEUE,
//                msg,
//                new MessagePostProcessor() {
//                    @Override
//                    public Message postProcessMessage(Message message) throws AmqpException {
//                        /* 设置过期时间 */
//                        long diff = Math.abs(Duration.between(
//                                Instant.now(),instant
//                        ).toMillis());
//                        message.getMessageProperties().setHeader("x-delay", diff);
//                        return message;
//                    }
//                }
//            );
//    }


}

对应的消费者:

package com.wxf.mq.handler;

import com.wxf.mq.config.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author wxf
 * @date 2019/4/11
 * @description 消费mq 创建消息消费者Receiver。通过@RabbitListener注解定义该类对hello队列的监听,
 * 并用@RabbitHandler注解来指定对消息的处理方法。所以,该消费者实现了对hello队列的消费,消费操作为输出消息的字符串内容
 */
@Component
@Slf4j
public class ConsumMQ {
    /**
     * 简单模式和work模式的消费者1
     * queues 监听的队列名称
     * @param hello
     */
    @RabbitListener(queues = "hello")
    @RabbitHandler
    public void consum1(String hello) {
       log.info("消费者1消费消息:"+hello);
    }

    /**
     * 简单模式和work模式的消费者2
     * @param hello
     */
    @RabbitListener(queues = "hello")
    @RabbitHandler
    public void consum2(String hello) {
        log.info("消费者2消费消息:"+hello);
    }

    @RabbitListener(queues = RabbitMqConstants.FANOUT_QUEUE_1)
    @RabbitHandler
    public void consumFanout1A(String msg) {
        log.info("Fanout1A消费消息:"+msg);
    }

    @RabbitListener(queues = RabbitMqConstants.FANOUT_QUEUE_2)
    @RabbitHandler
    public void consumFanout2A(String msg) {
        log.info("Fanout2A消费消息:"+msg);
    }

    @RabbitListener(queues = RabbitMqConstants.FANOUT_QUEUE_1)
    @RabbitHandler
    public void consumFanout1B(String msg) {
        log.info("Fanout1B消费消息:"+msg);
    }

    @RabbitListener(queues = RabbitMqConstants.FANOUT_QUEUE_2)
    @RabbitHandler
    public void consumFanout2B(String msg) {
        log.info("Fanout2B消费消息:"+msg);
    }


    @RabbitListener(queues = RabbitMqConstants.TOPIC_QUEUE_1)
    @RabbitHandler
    public void consumTopic1(String msg) {
        log.info("topic1消费消息:"+msg);
    }

    @RabbitListener(queues = RabbitMqConstants.TOPIC_QUEUE_2)
    @RabbitHandler
    public void consumTopic2(String msg) {
        log.info("topic2消费消息:"+msg);
    }

//    @RabbitListener(queues = RabbitMqConstants.DELAYED_QUEUE)
//    @RabbitHandler
//    public void consumDelaye(String msg) {
//        log.info("延迟队列消费消息:"+msg);
//    }


}
原文地址:https://www.cnblogs.com/wxf-com/p/10710522.html