RabbitMQ实战

2.1 创建表结构

-- Producer-side record of every message handed to RabbitMQ, keyed by the
-- application-supplied message id and carrying its send status and retry state.
-- The PRIMARY KEY already guarantees uniqueness of `message_id` and is itself
-- an index, so the former UNIQUE KEY on the same column was a pure duplicate
-- (extra storage and write cost, no benefit) and has been dropped.
CREATE TABLE `infrastructure_mq_producer` (
  `message_id` bigint(20) NOT NULL COMMENT '消息标识',
  `content_id` bigint(20) NOT NULL COMMENT '消息内容id,关联 infrastructure_message表',
  `type` varchar(4) NOT NULL DEFAULT '' COMMENT '消息类型',
  `status` int(2) NOT NULL DEFAULT '0' COMMENT '发送状态: 0:下发中;1:下发失败;2下发成功',
  `try_count` int(3) NOT NULL DEFAULT '0' COMMENT '重试次数',
  `next_retry_time` datetime DEFAULT NULL COMMENT '下一次执行时间',
  `error_message_id` bigint(20) DEFAULT NULL COMMENT '错误消息内容id,关联 infrastructure_message表',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`message_id`),
  -- Composite secondary index: status first, then creation time, then type.
  KEY `idx_query` (`status`,`create_time`,`type`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='MQ生产者记录';

-- Consumer-side record of every message that has been consumed, keyed by the
-- same message id the producer used.
-- The PRIMARY KEY already guarantees uniqueness of `message_id`, so the former
-- duplicate UNIQUE KEY on that column has been dropped.
CREATE TABLE `infrastructure_mq_consumer` (
  `message_id` bigint(20) NOT NULL COMMENT '消息标识',
  `content_id` bigint(20) NOT NULL COMMENT '消息内容id,关联 infrastructure_message表',
  `type` varchar(4) NOT NULL DEFAULT '' COMMENT '消息类型',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`message_id`),
  -- Composite secondary index: creation time first, then type.
  KEY `idx_query` (`create_time`,`type`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='MQ消费者记录';

-- Stores message bodies (and error texts) referenced by the producer and
-- consumer tables through their content_id / error_message_id columns.
-- Fixes relative to the previous definition:
--   * idx_key pointed at a non-existent column `key`; it now indexes `key1`,
--     the only key column the table defines.
--   * `key1` is declared as varchar instead of nvarchar so it inherits the
--     table's utf8mb4 charset (MySQL maps NVARCHAR to the legacy utf8 charset).
--   * The UNIQUE KEY on `id` duplicated the PRIMARY KEY and has been dropped.
CREATE TABLE `infrastructure_message` (
  `id` bigint(20) NOT NULL COMMENT '消息标识',
  `key1` varchar(20) DEFAULT NULL COMMENT '相关主键',
  `content` text COMMENT '消息内容',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`),
  KEY `idx_key` (`key1`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='报文数据';

2.2 引入依赖

<!-- Maven dependencies for the sample: web endpoints, MySQL access through
     MyBatis, the test starter, and Spring AMQP for RabbitMQ. -->
<dependencies>
    <!-- Spring MVC starter; the sample exposes REST endpoints to trigger sends. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- MySQL JDBC driver, needed only at runtime. -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>

    <!-- Test support, test scope only. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- MyBatis integration used by the mapper interfaces. -->
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>2.0.1</version>
    </dependency>

    <!-- Spring AMQP / RabbitMQ starter.
         NOTE(review): the version is pinned here while the other Spring Boot
         starters carry none, which suggests a parent/BOM manages them. Confirm
         this version matches the Spring Boot version in use, or drop the tag. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.3.1.RELEASE</version>
    </dependency>
</dependencies>

2.3 相关配置

# Database (JDBC datasource) settings
# NOTE(review): credentials are hard-coded for the demo; externalize them
# (environment variables / secret store) before using this anywhere real.
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=sa000

# MyBatis: where the mapper XML files are loaded from
mybatis.mapper-locations=classpath:mapper/*/*Mapper.xml

# RabbitMQ settings
## Broker host
spring.rabbitmq.host=111.231.83.100
## Broker port
spring.rabbitmq.port=5672
## Virtual host
spring.rabbitmq.virtual-host=/
## Connection timeout (a bare number is interpreted as milliseconds)
spring.rabbitmq.connection-timeout=15000
## Consumers acknowledge messages manually
spring.rabbitmq.listener.simple.acknowledge-mode=manual
## Minimum number of concurrent consumers (not a per-delivery message count)
spring.rabbitmq.listener.simple.concurrency=1
## Maximum number of concurrent consumers
spring.rabbitmq.listener.simple.max-concurrency=5
## Enable publisher returns (callback for messages the broker could not route)
spring.rabbitmq.publisher-returns=true
## Enable publisher confirms, correlated to each send through CorrelationData
spring.rabbitmq.publisher-confirm-type=correlated
## Send with the mandatory flag so unroutable messages are returned instead of dropped
spring.rabbitmq.template.mandatory=true

2.4 实现MQ消息数据库持久化

/**
 * Persists the bookkeeping rows for MQ messages: the message body, the
 * producer-side send record and the consumer-side consumption record.
 *
 * <p>NOTE(review): several methods issue two inserts/updates in a row with no
 * visible transaction boundary; confirm whether callers are expected to
 * provide one.
 */
@Repository
public class MQRepository {

    /** Producer status while waiting for the broker's confirm. */
    private static final int STATUS_SENDING = 0;
    /** Producer status once the send has been reported as failed. */
    private static final int STATUS_SEND_FAILED = 1;
    /** Producer status once the broker has confirmed the send. */
    private static final int STATUS_SEND_SUCCEEDED = 2;

    private final MqProducerMapper producerMapper;
    private final MqConsumerMapper consumerMapper;
    private final MessageMapper messageMapper;
    private final MQFactory mqFactory;

    public MQRepository(MqProducerMapper producerMapper, MqConsumerMapper consumerMapper, MessageMapper messageMapper, MQFactory mqFactory) {
        this.producerMapper = producerMapper;
        this.consumerMapper = consumerMapper;
        this.messageMapper = messageMapper;
        this.mqFactory = mqFactory;
    }

    /**
     * Stores the message body and then a producer record that points at it.
     *
     * @param messageId identifier of the message being sent
     * @param content   message body to persist
     * @param type      message type recorded on the producer row
     */
    public void saveMessage(Long messageId, String content, String type) {
        final Date now = new Date();

        final Message body = mqFactory.createMessage(content, now);
        messageMapper.insertSelective(body);

        producerMapper.insertSelective(mqFactory.createProducer(messageId, body.getId(), type, now));
    }

    /**
     * Marks the producer record as successfully sent, but only while it is
     * still in the "sending" state.
     *
     * @param messageId identifier of the confirmed message
     */
    public void sendSuccess(Long messageId) {
        // Only rows that are still "sending" are matched by the update.
        final MqProducerExample stillSending = new MqProducerExample();
        stillSending.createCriteria()
                .andMessageIdEqualTo(messageId)
                .andStatusEqualTo(STATUS_SENDING);

        final MqProducer changes = new MqProducer();
        changes.setMessageId(messageId);
        changes.setStatus(STATUS_SEND_SUCCEEDED);
        changes.setUpdateTime(new Date());

        producerMapper.updateByExampleSelective(changes, stillSending);
    }

    /**
     * Stores the error text as a message row and marks the producer record as
     * failed, linking it to that error row.
     *
     * @param messageId identifier of the failed message
     * @param errorInfo error description to persist
     */
    public void sendFailure(Long messageId, String errorInfo) {
        final Message errorBody = mqFactory.createMessage(errorInfo, new Date());
        messageMapper.insertSelective(errorBody);

        final MqProducer changes = new MqProducer();
        changes.setMessageId(messageId);
        changes.setErrorMessageId(errorBody.getId());
        changes.setStatus(STATUS_SEND_FAILED);
        changes.setUpdateTime(new Date());

        producerMapper.updateByPrimaryKeySelective(changes);
    }

    /**
     * Stores the consumed message body and then a consumer record that points
     * at it.
     *
     * @param messageId identifier of the consumed message
     * @param content   message body as seen by the consumer
     */
    public void consumeSuccess(Long messageId, String content) {
        final Date now = new Date();

        final Message body = mqFactory.createMessage(content, now);
        messageMapper.insertSelective(body);

        consumerMapper.insertSelective(mqFactory.createConsumer(messageId, body.getId(), now));
    }
}

/**
 * Builds the entity objects (producer record, consumer record, message body)
 * that the repository persists.
 */
@Component
public class MQFactory {

    /**
     * Last id handed out by {@link #nextMessageId()}. Using the raw wall-clock
     * millisecond as a primary key collides whenever two messages are created
     * within the same millisecond, so ids are forced to be strictly increasing.
     */
    private final java.util.concurrent.atomic.AtomicLong lastMessageId =
            new java.util.concurrent.atomic.AtomicLong();

    /**
     * Creates a producer record in its initial state: status 0, zero retries,
     * and the next retry time, create time and update time all set to
     * {@code currentTime}.
     *
     * @param messageId   identifier of the message being sent
     * @param contentId   id of the stored message body
     * @param type        message type
     * @param currentTime timestamp applied to all time fields
     * @return the populated producer record
     */
    public MqProducer createProducer(Long messageId, Long contentId, String type, Date currentTime) {
        MqProducer producer = new MqProducer();
        producer.setMessageId(messageId);
        producer.setContentId(contentId);
        producer.setType(type);
        producer.setStatus(0);
        producer.setTryCount(0);
        producer.setNextRetryTime(currentTime);
        producer.setCreateTime(currentTime);
        producer.setUpdateTime(currentTime);
        return producer;
    }

    /**
     * Creates a message-body entity with a freshly generated id.
     *
     * @param content     text to store
     * @param currentTime creation timestamp
     * @return the populated message entity
     */
    public Message createMessage(String content, Date currentTime) {
        Message message = new Message();
        message.setId(nextMessageId());
        message.setCreateTime(currentTime);
        message.setContent(content);
        return message;
    }

    /**
     * Creates a consumer record; the type is left as an empty string and both
     * time fields are set to {@code currentTime}.
     *
     * @param messageId   identifier of the consumed message
     * @param contentId   id of the stored message body
     * @param currentTime timestamp applied to create and update time
     * @return the populated consumer record
     */
    public MqConsumer createConsumer(Long messageId, Long contentId, Date currentTime) {
        MqConsumer consumer = new MqConsumer();
        consumer.setMessageId(messageId);
        consumer.setContentId(contentId);
        consumer.setType("");
        consumer.setCreateTime(currentTime);
        consumer.setUpdateTime(currentTime);
        return consumer;
    }

    /**
     * Returns the current epoch millisecond, or the previous id plus one when
     * that millisecond has already been used, so ids issued by this instance
     * never repeat. Uniqueness is guaranteed per factory instance only.
     */
    private long nextMessageId() {
        return lastMessageId.updateAndGet(
                previous -> Math.max(previous + 1, System.currentTimeMillis()));
    }
}

2.5 新建回调事件实现接口

/**
 * Publisher-confirm callback: records in the database whether the broker
 * acknowledged (ack) or rejected (nack) a published message.
 */
@Component
public class ProducerSendConfirmCallback implements RabbitTemplate.ConfirmCallback {

    private final MQRepository mqRepository;

    public ProducerSendConfirmCallback(MQRepository mqRepository) {
        this.mqRepository = mqRepository;
    }

    /**
     * Updates the producer record identified by the correlation id.
     *
     * @param correlationData correlation data supplied at send time; may be
     *                        null when the sender did not provide any
     * @param ack             true when the broker acknowledged the message
     * @param cause           failure description for a nack, if available
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        Long messageId = toMessageId(correlationData);
        if (messageId == null) {
            // Without a usable id there is no producer row to update; throwing
            // from the callback would only bury the real outcome in a stack trace.
            System.err.println("Publisher confirm without a numeric correlation id, ack=" + ack + ", cause=" + cause);
            return;
        }
        if (ack) {
            mqRepository.sendSuccess(messageId);
        } else {
            mqRepository.sendFailure(messageId, cause);
        }
    }

    /** Extracts the numeric message id, or returns null when it is absent or not a number. */
    private static Long toMessageId(CorrelationData correlationData) {
        if (correlationData == null || correlationData.getId() == null) {
            return null;
        }
        try {
            return Long.valueOf(correlationData.getId());
        } catch (NumberFormatException e) {
            return null;
        }
    }
}

/**
 * Publisher-return callback: records a send failure when the broker hands a
 * message back to the publisher.
 */
@Component
public class ProducerSendReturnedCallback implements RabbitTemplate.ReturnCallback {

    /** Header from which the id of the returned message is read. */
    private static final String CORRELATION_HEADER = "spring_returned_message_correlation";

    private final MQRepository mqRepository;

    public ProducerSendReturnedCallback(MQRepository mqRepository) {
        this.mqRepository = mqRepository;
    }

    /**
     * Marks the returned message as failed, storing the broker's reply text.
     *
     * @param message    the returned message
     * @param replyCode  reply code reported by the broker
     * @param replyText  reply text reported by the broker; stored as the error
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        Object correlationId = message.getMessageProperties().getHeader(CORRELATION_HEADER);
        Long messageId = toMessageId(correlationId);
        if (messageId == null) {
            // The message was published without a numeric correlation id, so
            // there is no producer row to mark as failed.
            System.err.println("Returned message without a numeric correlation id, replyCode=" + replyCode
                    + ", replyText=" + replyText + ", exchange=" + exchange + ", routingKey=" + routingKey);
            return;
        }
        mqRepository.sendFailure(messageId, replyText);
    }

    /** Converts the header value to a message id, or returns null when it is absent or not a number. */
    private static Long toMessageId(Object correlationId) {
        if (correlationId == null) {
            return null;
        }
        try {
            return Long.valueOf(correlationId.toString());
        } catch (NumberFormatException e) {
            return null;
        }
    }
}

2.6 新建 RabbitMQConfig 配置

/**
 * RabbitMQ wiring: registers the publisher callbacks on the template and
 * declares the order exchange, the order queue and the binding between them.
 */
@Configuration
public class RabbitMQConfig {

    private static final String ORDER_EXCHANGE_NAME = "orderExchange";
    private static final String ORDER_QUEUE_NAME = "orderQueue";
    private static final String ORDER_ROUTING_PATTERN = "order.#";

    private final RabbitTemplate rabbitTemplate;
    private final ProducerSendReturnedCallback producerSendReturnedCallback;
    private final ProducerSendConfirmCallback producerSendConfirmCallback;

    public RabbitMQConfig(RabbitTemplate rabbitTemplate, ProducerSendReturnedCallback producerSendReturnedCallback,
                          ProducerSendConfirmCallback producerSendConfirmCallback) {
        this.rabbitTemplate = rabbitTemplate;
        this.producerSendReturnedCallback = producerSendReturnedCallback;
        this.producerSendConfirmCallback = producerSendConfirmCallback;
    }

    /** Attaches the return and confirm callbacks to the injected template. */
    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(producerSendReturnedCallback);
        rabbitTemplate.setConfirmCallback(producerSendConfirmCallback);
    }

    /** Queue that receives order messages. */
    @Bean
    public Queue orderQueue() {
        return new Queue(ORDER_QUEUE_NAME);
    }

    /** Topic exchange that order messages are published to. */
    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange(ORDER_EXCHANGE_NAME);
    }

    /**
     * Binds the order queue to the order exchange for every routing key
     * matching the {@code order.#} pattern.
     */
    @Bean
    public Binding orderBinding(TopicExchange orderExchange, Queue orderQueue) {
        // queue -> exchange -> routing pattern
        return BindingBuilder.bind(orderQueue).to(orderExchange).with(ORDER_ROUTING_PATTERN);
    }
}

2.7 创建生产者

@RestController
public class ProducerController {


    private final RabbitTemplate rabbitTemplate;
    private final MQRepository mqRepository;

    public TestController(RabbitTemplate rabbitTemplate, MQRepository mqRepository) {
        this.rabbitTemplate = rabbitTemplate;
        this.mqRepository = mqRepository;
    }


    @GetMapping("/sendNotExistExchangeAndNotExistRoutingKeyMessage")
    public void sendNotExistExchangeAndNotExistRoutingKeyMessage() {
        Long messageId = new Date().getTime();
        String type = "test";
        String content = "123";
        CorrelationData correlationData = new CorrelationData(String.valueOf(messageId));
        mqRepository.saveMessage(messageId, content, type);
        rabbitTemplate.convertAndSend("NotExistExchange", "NotExistRoutingKey", content, correlationData);
    }

    @GetMapping("/sendNotExistRoutingKeyMessage")
    public void sendNotExistRoutingKeyMessage() {
        Long messageId = new Date().getTime();
        String type = "test";
        String content = "123";
        CorrelationData correlationData = new CorrelationData(String.valueOf(messageId));
        mqRepository.saveMessage(messageId, content, type);
        rabbitTemplate.convertAndSend("orderExchange", "NotExistRoutingKey", "123", correlationData);

    }

    @GetMapping("/sendMessage")
    public void sendMessage() {
        Long messageId = new Date().getTime();
        String type = "test";
        CorrelationData correlationData = new CorrelationData(String.valueOf(messageId));
        Order order = new Order("10001", BigDecimal.valueOf(150));
        mqRepository.saveMessage(messageId, order.toString(), type);
        rabbitTemplate.convertAndSend("orderExchange", "order.create", order, correlationData);

    }

}

2.8 创建消费者

/**
 * Consumes orders from "orderQueue", records each consumption in the database
 * and acknowledges the delivery manually.
 */
@Component
public class OrderConsumer {

    /** Header from which the message id is read. */
    private static final String CORRELATION_HEADER = "spring_returned_message_correlation";

    private final MQRepository mqRepository;

    public OrderConsumer(MQRepository mqRepository) {
        this.mqRepository = mqRepository;
    }

    /**
     * Handles one delivery. Every path settles the delivery explicitly; before
     * this change an exception raised ahead of {@code basicAck} left it neither
     * acked nor nacked.
     *
     * <ul>
     *   <li>No usable message id in the headers: rejected without requeue,
     *       because a retry cannot succeed.</li>
     *   <li>Recording the consumption fails: nacked, requeued only if this is
     *       the first delivery attempt, then the exception is rethrown.</li>
     *   <li>Otherwise: acknowledged.</li>
     * </ul>
     *
     * @param order   the deserialized payload
     * @param headers message headers, including the delivery tag
     * @param channel channel used for the manual ack/nack
     * @throws Exception if recording the consumption or talking to the broker fails
     */
    @RabbitListener(queues = "orderQueue")
    public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

        Long messageId = toMessageId(headers.get(CORRELATION_HEADER));
        if (messageId == null) {
            System.err.println("Rejecting message without a numeric " + CORRELATION_HEADER + " header: " + order);
            channel.basicNack(deliveryTag, false, false);
            return;
        }

        try {
            mqRepository.consumeSuccess(messageId, order.toString());
        } catch (RuntimeException e) {
            // Retry once via requeue; a message that already failed on a
            // previous delivery is not requeued again to avoid a redelivery loop.
            boolean requeue = !Boolean.TRUE.equals(headers.get(AmqpHeaders.REDELIVERED));
            channel.basicNack(deliveryTag, false, requeue);
            throw e;
        }

        System.err.println("消费端消费:" + order.toString());
        channel.basicAck(deliveryTag, false);
    }

    /** Converts the header value to a message id, or returns null when it is absent or not a number. */
    private static Long toMessageId(Object correlationId) {
        if (correlationId == null) {
            return null;
        }
        try {
            return Long.valueOf(correlationId.toString());
        } catch (NumberFormatException e) {
            return null;
        }
    }

}
原文地址:https://www.cnblogs.com/markLogZhu/p/13305464.html