Spring Boot集成RabbitMQ

RabbitMQ简介

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现(AMQP的主要特征是面向消息、队列、路由、可靠性、安全)。支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现很出色。

相关概念

消息队列通常有三个概念:发送消息(生产者)、队列、接收消息(消费者)。RabbitMQ在这个基本概念之上,多做了一层抽象,在发送消息和队列之间,加入了交换机。这样发送消息和队列就没有直接关系,而是通过交换机来做转发,交换机会根据分发策略把消息转给队列。

RabbitMQ比较重要的几个概念:

虚拟主机:RabbitMQ支持权限控制,但是最小控制粒度为虚拟主机。一个虚拟主机可以包含多个交换机、队列、绑定。

交换机:RabbitMQ分发器,根据不同的策略将消息分发到相关的队列。

队列:缓存消息的容器。

绑定:设置交换机与队列的关系。

交换机(Exchange)

交换机的主要作用是接收消息,并根据绑定规则将消息路由到指定的队列.交换机有四种类型,分别为Direct,Topic,Headers,Fanout.

  Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.

  topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.

  headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.

  Fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.

Spring Boot集成RabbitMQ

添加maven依赖

<!-- Dependencies used by the examples in this article. No versions are given here; -->
<!-- presumably they are managed by the Spring Boot parent/BOM (confirm in the real pom). -->
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- RabbitMQ (AMQP) starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- Test support, limited to the test scope -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

添加rabbitmq服务配置(application.yml)

# RabbitMQ connection settings
spring:
  rabbitmq:
    # Host is masked in this article; replace with the real broker address.
    host: ****
    port: 5672
    username: admin
    # NOTE(review): plaintext credential in a config file; consider supplying it
    # from an environment variable or a secret store instead.
    password: tonyzhang

通用的静态变量

package com.theeternity.rabbitmq.constant;

/**
 * Shared RabbitMQ names used by the configuration, senders and receivers:
 * Spring bean names, queue names, exchange names and routing keys.
 *
 * <p>This class only holds constants; it is final and cannot be instantiated.
 *
 * @author TheEternity Zhang
 */
public final class RabbitConstant {

    // Spring bean names under which the Queue beans are registered
    public static final String DIRECT_QUEUE_BEAN = "direct_queue_bean";
    public static final String TOPIC_QUEUE_MESSAGE_BEAN = "topic_queue_message_bean";
    public static final String TOPIC_QUEUE_MESSAGES_BEAN = "topic_queue_messages_bean";
    public static final String FANOUT_FIRST_MESSAGE_BEAN = "fanout_first_message_bean";
    public static final String FANOUT_SECOND_MESSAGE_BEAN = "fanout_second_message_bean";
    public static final String FANOUT_THIRD_MESSAGE_BEAN = "fanout_third_message_bean";


    // Queue names as declared on the broker
    public static final String DIRECT_QUEUE = "direct_queue";
    public static final String TOPIC_QUEUE_MESSAGE = "topic.message";
    public static final String TOPIC_QUEUE_MESSAGES = "topic.messages";
    public static final String FANOUT_FIRST_MESSAGE = "fanout.first";
    public static final String FANOUT_SECOND_MESSAGE = "fanout.second";
    public static final String FANOUT_THIRD_MESSAGE = "fanout.third";


    // Exchange names
    public static final String DIRECT_EXCHANGE = "direct_exchange";
    public static final String TOPIC_EXCHANGE = "topic_exchange";
    public static final String FANOUT_EXCHANGE = "fanout_exchange";


    // Routing keys
    public static final String DIRECT_KEY = "direct_key";
    public static final String TOPIC_QUEUE_MESSAGE_KEY = "topic.message.key";
    public static final String TOPIC_QUEUE_MESSAGES_KEY = "topic.messages.key";
    // Wildcard binding pattern: '#' stands for zero or more words after "topic."
    public static final String TOPIC_KEY = "topic.#";

    /** Not instantiable: this type exists only as a namespace for constants. */
    private RabbitConstant() {
    }

}

集成Direct交换机

将用到的消息队列及交换机注入到容器,并且进行绑定
package com.theeternity.rabbitmq.config;

import com.theeternity.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Spring configuration for the direct-exchange example: one queue, one direct
 * exchange and the binding that connects them.
 *
 * @author TheEternity Zhang
 */
@Configuration
public class RabbitConfig {

    /**
     * Declares the queue for the direct example, registered under the bean
     * name {@link RabbitConstant#DIRECT_QUEUE_BEAN}.
     *
     * @return a queue named {@link RabbitConstant#DIRECT_QUEUE}
     */
    @Bean(name = RabbitConstant.DIRECT_QUEUE_BEAN)
    public Queue Queue() {
        final String queueName = RabbitConstant.DIRECT_QUEUE;
        return new Queue(queueName);
    }

    /**
     * Declares the direct exchange.
     *
     * @return an exchange named {@link RabbitConstant#DIRECT_EXCHANGE}
     */
    @Bean
    public DirectExchange directExchange() {
        // Constructor arguments are (name, durable, autoDelete).
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(RabbitConstant.DIRECT_EXCHANGE, durable, autoDelete);
    }

    /**
     * Binds the direct queue to the direct exchange using
     * {@link RabbitConstant#DIRECT_KEY} as the routing key.
     *
     * @param queue          the queue bean selected by its qualifier
     * @param directExchange the direct exchange bean
     * @return the binding between the two
     */
    @Bean
    Binding bindingDirectExchangeMessage(@Qualifier(RabbitConstant.DIRECT_QUEUE_BEAN) Queue queue,
                                         DirectExchange directExchange) {
        final Binding binding = BindingBuilder
                .bind(queue)
                .to(directExchange)
                .with(RabbitConstant.DIRECT_KEY);
        return binding;
    }
}
编辑发送逻辑
package com.theeternity.rabbitmq.sender;

import com.theeternity.rabbitmq.constant.RabbitConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Publishes messages to the direct exchange.
 *
 * @author TheEternity Zhang
 */
@Component
@Slf4j
public class DirectSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Sends the string form of {@code t} to {@link RabbitConstant#DIRECT_EXCHANGE}
     * with routing key {@link RabbitConstant#DIRECT_KEY}.
     *
     * @param t   the payload; its {@code toString()} result is what gets sent
     * @param <T> the payload type
     * @throws NullPointerException if {@code t} is {@code null}
     */
    public <T> void send(T t) {
        // Render once so the logged text and the sent payload are guaranteed to match.
        final String payload = t.toString();
        // The "{}" placeholder is required: without it SLF4J ignores the argument
        // and the payload never appears in the log line.
        log.info("接收到的信息:{}", payload);
        this.rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE, RabbitConstant.DIRECT_KEY, payload);
    }
}

编辑接收逻辑
package com.theeternity.rabbitmq.receiver;

import com.theeternity.rabbitmq.constant.RabbitConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumes messages from the direct queue and logs them.
 *
 * @author TheEternity Zhang
 */
@Component
@Slf4j
public class DirectReceiver {

    /**
     * Logs each message delivered from {@link RabbitConstant#DIRECT_QUEUE}.
     *
     * <p>Only {@code @RabbitListener} is needed here. {@code @RabbitHandler} is
     * for selecting a method when {@code @RabbitListener} sits on the class, so
     * it is omitted, matching the other method-level listeners.
     *
     * @param hello the message body
     */
    @RabbitListener(queues = RabbitConstant.DIRECT_QUEUE)
    public void process(String hello) {
        log.info("Receiver  : " + hello);
    }
}

集成Topic交换机

将用到的消息队列及交换机注入到容器,并且进行绑定
package com.theeternity.rabbitmq.config;

import com.theeternity.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Spring configuration for the topic-exchange example: two queues, one topic
 * exchange and one binding per queue.
 *
 * <p>The queue beans have no explicit name, so they are registered under their
 * method names. The binding methods rely on their parameter names matching
 * those bean names to receive the intended {@link Queue}; do not rename them.
 *
 * @author TheEternity Zhang
 */
@Configuration
public class RabbitConfig {

    /**
     * Declares the queue named {@link RabbitConstant#TOPIC_QUEUE_MESSAGE}.
     *
     * @return the queue
     */
    @Bean
    public Queue queueMessage() {
        final String queueName = RabbitConstant.TOPIC_QUEUE_MESSAGE;
        return new Queue(queueName);
    }

    /**
     * Declares the queue named {@link RabbitConstant#TOPIC_QUEUE_MESSAGES}.
     *
     * @return the queue
     */
    @Bean
    public Queue queueMessages() {
        final String queueName = RabbitConstant.TOPIC_QUEUE_MESSAGES;
        return new Queue(queueName);
    }

    /**
     * Declares the topic exchange named {@link RabbitConstant#TOPIC_EXCHANGE}.
     *
     * @return the exchange
     */
    @Bean
    public TopicExchange exchange() {
        final String exchangeName = RabbitConstant.TOPIC_EXCHANGE;
        return new TopicExchange(exchangeName);
    }

    /**
     * Binds {@code queueMessage} to the topic exchange with the exact key
     * {@link RabbitConstant#TOPIC_QUEUE_MESSAGE_KEY}.
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        final Binding binding = BindingBuilder
                .bind(queueMessage)
                .to(exchange)
                .with(RabbitConstant.TOPIC_QUEUE_MESSAGE_KEY);
        return binding;
    }

    /**
     * Binds {@code queueMessages} to the topic exchange with the wildcard
     * pattern {@link RabbitConstant#TOPIC_KEY}.
     */
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        // '*' matches exactly one word, '#' matches zero or more words
        final Binding binding = BindingBuilder
                .bind(queueMessages)
                .to(exchange)
                .with(RabbitConstant.TOPIC_KEY);
        return binding;
    }
}
发送逻辑
package com.theeternity.rabbitmq.sender;

import com.theeternity.rabbitmq.constant.RabbitConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Publishes fixed sample messages to the topic exchange.
 *
 * @author TheEternity Zhang
 */
@Component
@Slf4j
public class TopicSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Sends {@code "hello,rabbit"} to {@link RabbitConstant#TOPIC_EXCHANGE}
     * with routing key {@link RabbitConstant#TOPIC_QUEUE_MESSAGE_KEY}.
     */
    public void send() {
        log.info("topic-message发送");
        final String routingKey = RabbitConstant.TOPIC_QUEUE_MESSAGE_KEY;
        final String body = "hello,rabbit";
        this.rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE, routingKey, body);
    }

    /**
     * Sends {@code "hello,rabbits"} to {@link RabbitConstant#TOPIC_EXCHANGE}
     * with routing key {@link RabbitConstant#TOPIC_QUEUE_MESSAGES_KEY}.
     */
    public void send1() {
        log.info("topic-messages发送");
        final String routingKey = RabbitConstant.TOPIC_QUEUE_MESSAGES_KEY;
        final String body = "hello,rabbits";
        this.rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE, routingKey, body);
    }
}

接收监听逻辑
package com.theeternity.rabbitmq.receiver;

import com.theeternity.rabbitmq.constant.RabbitConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumes messages from the two topic queues and logs them.
 *
 * <p>Each method carries its own {@code @RabbitListener}. {@code @RabbitHandler}
 * is for selecting a method when {@code @RabbitListener} sits on the class, so
 * it is omitted, matching the other method-level listeners.
 *
 * @author TheEternity Zhang
 */
@Component
@Slf4j
public class TopicReceiver {

    /**
     * Logs each message delivered from {@link RabbitConstant#TOPIC_QUEUE_MESSAGE}.
     *
     * @param str the message body
     */
    @RabbitListener(queues = RabbitConstant.TOPIC_QUEUE_MESSAGE)
    public void process1(String str) {
        log.info("message:" + str);
    }

    /**
     * Logs each message delivered from {@link RabbitConstant#TOPIC_QUEUE_MESSAGES}.
     *
     * @param str the message body
     */
    @RabbitListener(queues = RabbitConstant.TOPIC_QUEUE_MESSAGES)
    public void process2(String str) {
        log.info("messages:" + str);
    }
}

++知识点++:
TopicExchange交换机支持使用通配符*、#

*号只能匹配一个单词(即一层路径)。

#号可以匹配零个或多个单词(即多层路径)。
备注:

topic交换机上面的方法在queue上面没有设置bean的别名,如果没有设置的情况下,在绑定的时候,传入的queue参数名需要跟对应queue的方法名相同。为了避免造成不必要的麻烦,建议直接在queue上设置别名,然后在绑定的时候,直接使用@Qualifier按照名字进行注入,如下:

/**
     * Declares the queue named {@link RabbitConstant#TOPIC_QUEUE_MESSAGE},
     * registered under the bean name {@link RabbitConstant#TOPIC_QUEUE_MESSAGE_BEAN}.
     *
     * @return the queue
     */
    @Bean(name = RabbitConstant.TOPIC_QUEUE_MESSAGE_BEAN)
    public Queue queueMessage() {
        final String queueName = RabbitConstant.TOPIC_QUEUE_MESSAGE;
        return new Queue(queueName);
    }

    /** Declares the queue named TOPIC_QUEUE_MESSAGES under the bean name TOPIC_QUEUE_MESSAGES_BEAN. */
    @Bean(name = RabbitConstant.TOPIC_QUEUE_MESSAGES_BEAN)
    public Queue queueMessages() {
        final String queueName = RabbitConstant.TOPIC_QUEUE_MESSAGES;
        return new Queue(queueName);
    }

    /** Declares the topic exchange named TOPIC_EXCHANGE. */
    @Bean
    public TopicExchange exchange() {
        final String exchangeName = RabbitConstant.TOPIC_EXCHANGE;
        return new TopicExchange(exchangeName);
    }

    /** Binds the qualified "message" queue to the topic exchange with the exact key TOPIC_QUEUE_MESSAGE_KEY. */
    @Bean
    Binding bindingExchangeMessage(@Qualifier(RabbitConstant.TOPIC_QUEUE_MESSAGE_BEAN) Queue queueMessage, TopicExchange exchange) {
        final Binding binding = BindingBuilder
                .bind(queueMessage)
                .to(exchange)
                .with(RabbitConstant.TOPIC_QUEUE_MESSAGE_KEY);
        return binding;
    }

    /** Binds the qualified "messages" queue to the topic exchange with the wildcard pattern TOPIC_KEY. */
    @Bean
    Binding bindingExchangeMessages(@Qualifier(RabbitConstant.TOPIC_QUEUE_MESSAGES_BEAN) Queue queueMessages, TopicExchange exchange) {
        // '*' matches exactly one word, '#' matches zero or more words
        final Binding binding = BindingBuilder
                .bind(queueMessages)
                .to(exchange)
                .with(RabbitConstant.TOPIC_KEY);
        return binding;
    }

集成Fanout交换机

将用到的消息队列及交换机注入到容器,并且进行绑定
package com.theeternity.rabbitmq.config;

import com.theeternity.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Spring configuration for the fanout-exchange example: three queues, one
 * fanout exchange and one binding per queue. No routing key is used.
 *
 * @author TheEternity Zhang
 */
@Configuration
public class RabbitConfig {

    /**
     * Declares the queue named {@link RabbitConstant#FANOUT_FIRST_MESSAGE}.
     *
     * @return the queue
     */
    @Bean(name = RabbitConstant.FANOUT_FIRST_MESSAGE_BEAN)
    public Queue firstMessage() {
        final String queueName = RabbitConstant.FANOUT_FIRST_MESSAGE;
        return new Queue(queueName);
    }

    /**
     * Declares the queue named {@link RabbitConstant#FANOUT_SECOND_MESSAGE}.
     *
     * @return the queue
     */
    @Bean(name = RabbitConstant.FANOUT_SECOND_MESSAGE_BEAN)
    public Queue secondMessage() {
        final String queueName = RabbitConstant.FANOUT_SECOND_MESSAGE;
        return new Queue(queueName);
    }

    /**
     * Declares the queue named {@link RabbitConstant#FANOUT_THIRD_MESSAGE}.
     *
     * @return the queue
     */
    @Bean(name = RabbitConstant.FANOUT_THIRD_MESSAGE_BEAN)
    public Queue thirdMessage() {
        final String queueName = RabbitConstant.FANOUT_THIRD_MESSAGE;
        return new Queue(queueName);
    }

    /**
     * Declares the fanout (broadcast) exchange named
     * {@link RabbitConstant#FANOUT_EXCHANGE}.
     *
     * @return the exchange
     */
    @Bean
    FanoutExchange fanoutExchange() {
        final String exchangeName = RabbitConstant.FANOUT_EXCHANGE;
        return new FanoutExchange(exchangeName);
    }

    /** Binds the first queue to the fanout exchange. */
    @Bean
    Binding bindingExchangeA(@Qualifier(RabbitConstant.FANOUT_FIRST_MESSAGE_BEAN) Queue firstMessage, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(firstMessage).to(fanoutExchange);
        return binding;
    }

    /** Binds the second queue to the fanout exchange. */
    @Bean
    Binding bindingExchangeB(@Qualifier(RabbitConstant.FANOUT_SECOND_MESSAGE_BEAN) Queue secondMessage, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(secondMessage).to(fanoutExchange);
        return binding;
    }

    /** Binds the third queue to the fanout exchange. */
    @Bean
    Binding bindingExchangeC(@Qualifier(RabbitConstant.FANOUT_THIRD_MESSAGE_BEAN) Queue thirdMessage, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(thirdMessage).to(fanoutExchange);
        return binding;
    }
}
发送逻辑
package com.theeternity.rabbitmq.sender;

import com.theeternity.rabbitmq.constant.RabbitConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Publishes a fixed sample message to the fanout exchange.
 *
 * @author TheEternity Zhang
 */
@Component
@Slf4j
public class FanoutSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Sends {@code "hello,rabbit"} to {@link RabbitConstant#FANOUT_EXCHANGE}
     * with an empty routing key.
     */
    public void send() {
        log.info("fanout-message发送");
        final String routingKey = "";
        final String body = "hello,rabbit";
        this.rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_EXCHANGE, routingKey, body);
    }

}

接收监听逻辑
package com.theeternity.rabbitmq.receiver;

import com.theeternity.rabbitmq.constant.RabbitConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumes messages from the three fanout queues and logs them.
 *
 * @author TheEternity Zhang
 */
@Component
@Slf4j
public class FanoutReceiver {

    /**
     * Logs each message delivered from {@link RabbitConstant#FANOUT_FIRST_MESSAGE}.
     *
     * @param str the message body
     */
    @RabbitListener(queues = RabbitConstant.FANOUT_FIRST_MESSAGE)
    public void processA(String str) {
        final String line = "firstReceive:" + str;
        log.info(line);
    }

    /**
     * Logs each message delivered from {@link RabbitConstant#FANOUT_SECOND_MESSAGE}.
     *
     * @param str the message body
     */
    @RabbitListener(queues = RabbitConstant.FANOUT_SECOND_MESSAGE)
    public void processB(String str) {
        final String line = "secondReceive:" + str;
        log.info(line);
    }

    /**
     * Logs each message delivered from {@link RabbitConstant#FANOUT_THIRD_MESSAGE}.
     *
     * @param str the message body
     */
    @RabbitListener(queues = RabbitConstant.FANOUT_THIRD_MESSAGE)
    public void processC(String str) {
        final String line = "thirdReceive:" + str;
        log.info(line);
    }

}

备注:

实际上RabbitMQ还可以支持发送对象:当然由于涉及到序列化和反序列化,该对象要实现Serializable接口,例如:

package com.theeternity.core.AutoGenerator.entity;

import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import java.time.LocalDateTime;
import java.io.Serializable;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;

/**
 * Entity mapped to the {@code we_user} table, shown here as an example of a
 * {@link Serializable} object that can be sent as a message payload.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} come from
 * the Lombok annotations on the class; setters are chainable.
 *
 * <p>NOTE(review): {@code password} is a plain field, so it is presumably
 * included in the generated {@code toString} and in the serialized form;
 * confirm that is acceptable before logging or publishing this object.
 *
 * @author TheEternity Zhang
 * @since 2019-02-01
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("we_user")
public class WeUserEntity implements Serializable {

    // Fixed serialization version id for this Serializable class.
    private static final long serialVersionUID = 1L;

    // Primary key, mapped to column "user_id" with IdType.AUTO.
    @TableId(value = "user_id", type = IdType.AUTO)
    private Integer userId;

    private String userName;

    private String password;

    private LocalDateTime createdTime;

    private LocalDateTime updateTime;
    
}

配置文件整体:

package com.theeternity.rabbitmq.config;

import com.theeternity.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Complete Spring configuration combining the three examples: the direct,
 * topic and fanout exchanges with their queues and bindings.
 *
 * <p>Every queue bean has an explicit name and every binding method selects
 * its queue with {@code @Qualifier}, because several {@link Queue} beans exist.
 *
 * @author TheEternity Zhang
 */
@Configuration
public class RabbitConfig {

    // ---------------------------------------------------------------- direct

    /**
     * Declares the queue for the direct example.
     *
     * @return a queue named {@link RabbitConstant#DIRECT_QUEUE}
     */
    @Bean(name = RabbitConstant.DIRECT_QUEUE_BEAN)
    public Queue Queue() {
        final String queueName = RabbitConstant.DIRECT_QUEUE;
        return new Queue(queueName);
    }

    /**
     * Declares the direct exchange.
     *
     * @return an exchange named {@link RabbitConstant#DIRECT_EXCHANGE}
     */
    @Bean
    public DirectExchange directExchange() {
        // Constructor arguments are (name, durable, autoDelete).
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(RabbitConstant.DIRECT_EXCHANGE, durable, autoDelete);
    }

    /**
     * Binds the direct queue to the direct exchange using
     * {@link RabbitConstant#DIRECT_KEY} as the routing key.
     */
    @Bean
    Binding bindingDirectExchangeMessage(@Qualifier(RabbitConstant.DIRECT_QUEUE_BEAN) Queue queue,
                                         DirectExchange directExchange) {
        final Binding binding = BindingBuilder
                .bind(queue)
                .to(directExchange)
                .with(RabbitConstant.DIRECT_KEY);
        return binding;
    }

    // ----------------------------------------------------------------- topic

    /**
     * Declares the first topic queue.
     *
     * @return a queue named {@link RabbitConstant#TOPIC_QUEUE_MESSAGE}
     */
    @Bean(name = RabbitConstant.TOPIC_QUEUE_MESSAGE_BEAN)
    public Queue queueMessage() {
        final String queueName = RabbitConstant.TOPIC_QUEUE_MESSAGE;
        return new Queue(queueName);
    }

    /**
     * Declares the second topic queue.
     *
     * @return a queue named {@link RabbitConstant#TOPIC_QUEUE_MESSAGES}
     */
    @Bean(name = RabbitConstant.TOPIC_QUEUE_MESSAGES_BEAN)
    public Queue queueMessages() {
        final String queueName = RabbitConstant.TOPIC_QUEUE_MESSAGES;
        return new Queue(queueName);
    }

    /**
     * Declares the topic exchange.
     *
     * @return an exchange named {@link RabbitConstant#TOPIC_EXCHANGE}
     */
    @Bean
    public TopicExchange exchange() {
        final String exchangeName = RabbitConstant.TOPIC_EXCHANGE;
        return new TopicExchange(exchangeName);
    }

    /**
     * Binds the first topic queue with the exact key
     * {@link RabbitConstant#TOPIC_QUEUE_MESSAGE_KEY}.
     */
    @Bean
    Binding bindingExchangeMessage(@Qualifier(RabbitConstant.TOPIC_QUEUE_MESSAGE_BEAN) Queue queueMessage, TopicExchange exchange) {
        final Binding binding = BindingBuilder
                .bind(queueMessage)
                .to(exchange)
                .with(RabbitConstant.TOPIC_QUEUE_MESSAGE_KEY);
        return binding;
    }

    /**
     * Binds the second topic queue with the wildcard pattern
     * {@link RabbitConstant#TOPIC_KEY}.
     */
    @Bean
    Binding bindingExchangeMessages(@Qualifier(RabbitConstant.TOPIC_QUEUE_MESSAGES_BEAN) Queue queueMessages, TopicExchange exchange) {
        // '*' matches exactly one word, '#' matches zero or more words
        final Binding binding = BindingBuilder
                .bind(queueMessages)
                .to(exchange)
                .with(RabbitConstant.TOPIC_KEY);
        return binding;
    }

    // ---------------------------------------------------------------- fanout

    /**
     * Declares the first fanout queue. Fanout bindings use no routing key.
     *
     * @return a queue named {@link RabbitConstant#FANOUT_FIRST_MESSAGE}
     */
    @Bean(name = RabbitConstant.FANOUT_FIRST_MESSAGE_BEAN)
    public Queue firstMessage() {
        final String queueName = RabbitConstant.FANOUT_FIRST_MESSAGE;
        return new Queue(queueName);
    }

    /**
     * Declares the second fanout queue.
     *
     * @return a queue named {@link RabbitConstant#FANOUT_SECOND_MESSAGE}
     */
    @Bean(name = RabbitConstant.FANOUT_SECOND_MESSAGE_BEAN)
    public Queue secondMessage() {
        final String queueName = RabbitConstant.FANOUT_SECOND_MESSAGE;
        return new Queue(queueName);
    }

    /**
     * Declares the third fanout queue.
     *
     * @return a queue named {@link RabbitConstant#FANOUT_THIRD_MESSAGE}
     */
    @Bean(name = RabbitConstant.FANOUT_THIRD_MESSAGE_BEAN)
    public Queue thirdMessage() {
        final String queueName = RabbitConstant.FANOUT_THIRD_MESSAGE;
        return new Queue(queueName);
    }

    /**
     * Declares the fanout (broadcast) exchange.
     *
     * @return an exchange named {@link RabbitConstant#FANOUT_EXCHANGE}
     */
    @Bean
    FanoutExchange fanoutExchange() {
        final String exchangeName = RabbitConstant.FANOUT_EXCHANGE;
        return new FanoutExchange(exchangeName);
    }

    /** Binds the first fanout queue to the fanout exchange. */
    @Bean
    Binding bindingExchangeA(@Qualifier(RabbitConstant.FANOUT_FIRST_MESSAGE_BEAN) Queue firstMessage, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(firstMessage).to(fanoutExchange);
        return binding;
    }

    /** Binds the second fanout queue to the fanout exchange. */
    @Bean
    Binding bindingExchangeB(@Qualifier(RabbitConstant.FANOUT_SECOND_MESSAGE_BEAN) Queue secondMessage, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(secondMessage).to(fanoutExchange);
        return binding;
    }

    /** Binds the third fanout queue to the fanout exchange. */
    @Bean
    Binding bindingExchangeC(@Qualifier(RabbitConstant.FANOUT_THIRD_MESSAGE_BEAN) Queue thirdMessage, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(thirdMessage).to(fanoutExchange);
        return binding;
    }
}
原文地址:https://www.cnblogs.com/eternityz/p/12243190.html