Spring Boot集成RabbitMQ实践

一、创建生产者服务

1、创建生产者服务 rabbit-producer

     spring boot版本为 2.1.16.RELEASE

2、pom.xml 

引入spring-boot-starter-amqp

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

  

3、配置application.properties

server.servlet.context-path=/
server.port=8002

# 集群用逗号分割
spring.rabbitmq.addresses=xx.xx.xx.xx:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 使用启动消息确认模式
spring.rabbitmq.publisher-confirms=true

#设置return消息模式,注意要与mandatory一起去配合使用
#spring.rabbitmq.publisher-returns=true
#spring.rabbitmq.template.mandatory=true


spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=non_null

  

4、发送

@Component
public class RabbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 这里就是确认消息的回调监听接口,用于确认消息是否被broker所收到
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {

        /**
         * @param correlationData 作为一个唯一的标识
         * @param ack broker是否落盘成功
         * @param cause 失败的异常信息
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("消息ACK结果:" + ack + ",correlationData: " +correlationData.getId())  ;
        }
    };


    /**
     * 对外发送消息的方法
     * @param message 具体的消息内容
     * @param properties 额外的附加属性
     * @throws Exception
     */
    public void send(Object message, Map<String,Object> properties) throws Exception{
        MessageHeaders mhs = new MessageHeaders(properties);
        Message<?>  msg = MessageBuilder.createMessage(message, mhs);
        rabbitTemplate.setConfirmCallback(confirmCallback);

        //指定业务唯一的ID
        String uuid = UUID.randomUUID().toString();
        System.out.println("生成业务唯一Id=" + uuid);
        CorrelationData correlationData = new CorrelationData(uuid);

        MessagePostProcessor mpp = new MessagePostProcessor() {
            @Override
            public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
                System.out.println("postProcessMessage message: " +message);
                return message;
            }
        };
        rabbitTemplate.convertAndSend("myexchange1", "myroutingkey.1", msg, mpp, correlationData);

    }
}

  

5、测试发送消息

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitProducerApplicationTests {

    @Autowired
    private  RabbitSender rabbitSender;

    @Test
    public void testSender() throws  Exception {
        Map<String,Object> properties = new HashMap<>();
        properties.put("name","zhangsan");
        properties.put("age","18");
        rabbitSender.send("hello rabbit", properties);

        Thread.sleep(10000);
    }



}

  

二、RabbitMQ消费者服务 

1、创建RabbitMQ消费者服务 rabbit-consumer

     spring boot版本为 2.1.16.RELEASE

2、pom.xml 

引入spring-boot-starter-amqp

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

  

3、配置application.properties

server.servlet.context-path=/
server.port=8002

# 集群用逗号分割
spring.rabbitmq.addresses=xx.xx.xx.xx:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 表示消费者消费成功消息以后需要手工的信息签收(ack),默认为auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.prefetch=1

# RabbitListener 相关配置
spring.rabbitmq.listener.order.exchange.name=myexchange1
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.key=myroutingkey.*

spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=non_null

  

4、创建接收者类

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;


/**
 * @description: 消息接收者
 * @author:
 * @create: 2020-08-01 09:35
 */
@Component
public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "myqueue1", durable = "true"),
                    exchange = @Exchange(name = "${spring.rabbitmq.listener.order.exchange.name}",
                    durable= "${spring.rabbitmq.listener.order.exchange.durable}",
                    type = "${spring.rabbitmq.listener.order.exchange.type}",
                    ignoreDeclarationExceptions = "true"),
                    key = "${spring.rabbitmq.listener.order.exchange.key}"
                )
         )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception{
        // 步骤1、收到消息后进行业务端消费处理
        System.out.println("消费消息" + message.getPayload());

        //步骤2、处理成功后,获取deliveryTag,并进行手工ACK操作,因为配置文件配置的是手工签收模式
        // spring.rabbitmq.listener.simple.acknowledge-mode=manual
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }
}

RabbitListener相关属性配置在属性文件中。

消费者采用手工配置 channel.basicAck

  

原文地址:https://www.cnblogs.com/linlf03/p/13413889.html