RabbitMQ整合springboot

一、springboot版本和依赖

 

  • springboot 版本 2.1.5
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
  • dependencies
<dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>

二、生产端

1、yml  文件配置  

server:
  port: 8001
  servlet:
    context-path: /
spring:
  rabbitmq:
    addresses: ip:5672,ip:5672,ip:5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 15000

    publisher-confirms: true
    publisher-returns: true
    template:
      mandatory: true


  application:
    name: rabbit-producer
  http:
    encoding:
      charset: UTF-8
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: NON_NULL

rabbitmq-exchange: exchange-1
rabbitmq-routingKey: springboot.abc
  1. addresses   server 的 ip 地址加上端口号,集群环境下多个地址用逗号分隔
  2. username  password 账号密码
  3. virtual-host    最上层的一个域  ,例如  /order    /logistics
  4. 连接超时时间
  • 生产端的相关配置
  1. publisher-confirms  是否开启发布确认模式。举例:生产者把消息发送到 broker(mq) 之后,并不确定消息是否 100% 已经投递到 mq 中;开启后,mq 会异步返回一个成功(ack)或者失败(nack)的结果,生产者通过 ConfirmCallback 监听该结果
  2. publisher-returns  是否开启发布者退回模式。举例:生产者发送的 routingkey 为 spring.xxx,而 queue 绑定的 routingkey 为 springboot.*,那么消息不匹配任何路由规则。如果不开启退回(或 mandatory 为 false),这条消息会被 broker 直接丢弃;开启后,broker 会把这条无法路由的消息退回给生产者,由 RabbitTemplate 的 ReturnCallback 进行处理。需要和 mandatory 一起使用
  3. mandatory     是否开启强制性消息:为 true 时,无法路由到任何队列的消息会被退回给生产者;为 false 时,这类消息会被 broker 直接丢弃

2、编写发送消息方法

package com.example.producer.component;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;

/**
 * Publishes messages to RabbitMQ through the injected {@link RabbitTemplate} and
 * prints the broker feedback (publisher confirms and returned messages).
 *
 * <p>The target exchange and routing key are read from the
 * {@code rabbitmq-exchange} and {@code rabbitmq-routingKey} properties.
 *
 * <p>Created 2020-05-31 11:57.
 *
 * @author qiuj
 */
@Component
public class Sender {
    private RabbitTemplate rabbitTemplate;

    @Value("${rabbitmq-exchange}")
    private String exchange;
    @Value("${rabbitmq-routingKey}")
    private String routingKey;

    /**
     * Confirm callback: reports whether the broker accepted a published message.
     * {@code correlationData} carries the id assigned in {@link #sends}, {@code ack}
     * tells whether delivery to the broker succeeded, and {@code cause} holds the
     * failure reason when it did not.
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
        System.out.println("唯一id:" +  correlationData);
        System.out.println("消息是否成功投递" + ack );
        System.out.println("如果失败则会返回错误消息" + cause);
    };

    /**
     * Return callback: invoked when a mandatory message could not be routed to any
     * queue and the broker hands it back to the producer. Without it, returned
     * messages would never be surfaced to the application.
     */
    final RabbitTemplate.ReturnCallback returnCallback =
            (returned, replyCode, replyText, returnedExchange, returnedRoutingKey) ->
                    System.out.println("消息无法路由,被退回: exchange=" + returnedExchange
                            + ", routingKey=" + returnedRoutingKey
                            + ", replyCode=" + replyCode
                            + ", replyText=" + replyText
                            + ", message=" + returned);

    /**
     * Injects the template and registers the confirm/return callbacks exactly once,
     * instead of re-registering the confirm callback on every {@link #sends} call.
     *
     * @param rabbitTemplate the template used for publishing
     */
    @Autowired
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
    }

    /**
     * Wraps the payload and headers into a Spring {@link Message} and publishes it to
     * the configured exchange with the configured routing key.
     *
     * @param msg        the payload to send
     * @param properties header values attached to the wrapped message
     */
    public void sends (Object msg, Map<String,Object> properties) {
        // Step 1: wrap the payload in the message type Spring Boot works with.
        MessageHeaders messageHeaders = new MessageHeaders(properties);
        Message<?> message = MessageBuilder.createMessage(msg,messageHeaders);
        // Unique id so the confirm callback can be matched to this publish.
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // Pre-send hook: runs on the converted AMQP message right before it is sent.
        MessagePostProcessor messagePostProcessor = amqpMessage -> {
            System.out.println("在发送消息之前的前置方法" + amqpMessage);
            return amqpMessage;
        };
        rabbitTemplate.convertAndSend(exchange,
                routingKey,
                message,
                messagePostProcessor,
                correlationData);
    }
}

三、消费端

1、yml  文件配置  

server:
  port: 8002
spring:
  rabbitmq:
    addresses: ip:5672,ip:5672,ip:5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 15000
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 5
        max-concurrency: 10
        prefetch: 2

  application:
    name: rabbit-consumer
  http:
    encoding:
      charset: UTF-8
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: NON_NULL


queue-name: queue-1
queue-durable: true
exchange-name: exchange-1
exchange-topic: topic
exchange-durable: true
exchange-ignoreDeclarationExceptions: true
routingkey: springboot.*
  • 消费端的相关配置
  1. acknowledge-mode 默认auto ,也就是自动签收消息,生产环境不建议,我们设置为 manual  手工的进行签收
  2. concurrency   max-concurrency  监听器调用线程的最小数量  和最大数量
  3. prefetch     每个消费者同时持有的未确认(unacked)消息的最大数量,用于限流;这里设置为 2,表示每个消费者在签收之前最多同时接收 2 条消息

2、编写接收消息方法

package com.example.consumer.component;


import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * RabbitMQ consumer that prints each received payload and acknowledges it manually.
 *
 * <p>Created 2020-05-30 19:29.
 *
 * @author qiuj
 */
@Component
public class RabbitReceive {


    /**
     * Listener declared with the combined annotations
     * {@code @RabbitListener}, {@code @QueueBinding}, {@code @Queue} and
     * {@code @Exchange}; queue, exchange and routing key values are resolved from
     * the {@code queue-*}, {@code exchange-*} and {@code routingkey} properties.
     *
     * @param message the received message (payload plus AMQP headers)
     * @param channel the channel the message was delivered on, used for the manual ack
     * @throws IOException if acknowledging the message fails
     * @throws IllegalStateException if the delivery tag header is missing or not a {@code Long}
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "${queue-name}",durable = "${queue-durable}"),
            exchange = @Exchange(
                    value = "${exchange-name}",
                    type = "${exchange-topic}",
                    durable = "${exchange-durable}",
                    ignoreDeclarationExceptions = "${exchange-ignoreDeclarationExceptions}"
            ),
            key = "${routingkey}"
    ))
    public void onMessage (Message<?> message, Channel channel) throws IOException {
        // 1: business-side processing of the received message.
        System.out.println("消费消息:" + message.getPayload());
        // 2: after successful processing, fetch the delivery tag and ack manually,
        //    because the configuration sets
        //    spring.rabbitmq.listener.simple.acknowledge-mode=manual
        Object deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        if (!(deliveryTag instanceof Long)) {
            // Fail with a descriptive error instead of an opaque NPE/ClassCastException.
            throw new IllegalStateException(
                    "Missing or invalid " + AmqpHeaders.DELIVERY_TAG + " header: " + deliveryTag);
        }
        // multiple = false: acknowledge only this delivery.
        channel.basicAck((Long) deliveryTag,false);

    }


}

四、 源码

1、生产端源码

2、消费端源码

原文地址:https://www.cnblogs.com/blogspring/p/14191745.html