消息中间件-RabbitMq(可靠性方案&死信队列&监控)

消息中间件-RabbitMq(可靠性方案&死信队列&监控)

上一章节聊到,他有三个重要的部分,【生产者】、【blocker(rabbit节点】、【消费者】 ,换言之,我们保障了发送可靠性、存储可靠性、消费可靠性,也就保证了消息可靠性。下面会出一个消息可靠性的方案,有时候我们需要对一个超时订单做处理,我们可以使用rabbit的死信队列做这个,下面我们也会聊聊死信队列,已经对rabbit进行监控。

可靠性思考&方案

一般消息可靠性分为三个级别:

【最多一次(rabbit支持):消息可能会丢失,但是不会重复传输。那需要->

  •   消费者就需要开启事务机制或者confirm机制(rabbit提供的,当消息发送到消费者的时候,生产端有会收到异步回调,从而知道正确投递),以此保障可以传递到mq中
  •        消费者需要备份交换机,确定消息可以从交换机路由到队列中,这样就可以让消息不丢失
  •   或者通过mandatory属性(rabbit提供,当消息无法找到相关的队列,那就mq就返回没有投递成功的消息给生产者)

【最少一次rabbit支持;消息绝对不会丢失,但是可能重复传输 那需要->

  •   无需考虑最多一次所需要考虑的东西,消费者发一次,但是这样很难保证成功。

【恰好一次rabbit不支持:每条消息会传输一次且只传输一次

下面结合给出一个解决方案(这个性能不太好,因为要走下面的流程,而且还要对数据库进行操作):

  • 【发送消息的时候】:把发送的消息保存在数据库中
  • 【当消息发送到rabbit上】:在rabbit上有一个监听器
  • 【当把消息发送到生产者这里的时候】:消费者给一个回复(ack)给mq(mq通知生产者,这样就知道消息投递成功)
  • 【更新消息表中的消息状态 】
    •   这个时候我们要有一个定时任务去检查数据库中的消息状态(查看是否是发送成功状态(发送成功会在消费端对数据库的数据状态进行修改))
      • 消息状态为【未知:把消息拿过来进行重发,并且记录消息重发的次数(因为不能一直重发,这样服务器压力大),同时修改状态为定时任务干预,你可以设置一个阈值,到达一定次数后,改变状态为人工干预。
      •  消息状态为【人工干预:这个时候就要人工进行处理异常状态的信息

springBoot中confirm的实现->【生产端

这里是监听功能

@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    /*** @param correlationData
     *  相关配置信息
     *  @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
     *  @param cause 失败原因 */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack){
            System.out.println("成功发送");
        }else {
            System.out.println("失败"+cause);
            //这里进行重发,或者自己的业务处理
        }
    }
}

交换机和队列进行绑定

/*** 队列与交换机绑定 */
@Configuration
public class QueueConfig {
    @Bean(name = "confirmTestQueue")
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue", true, false, false);
    }


    @Bean(name = "confirmTestExchange")
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirmTestExchange");
    }

    @Bean
    public Binding confirmTestFanoutExchangeAndQueue(@Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange, @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
    }
}
View Code

生产者发送消息

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootForkJoinApplication.class)
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate; //注入rabbitmq对象
    @Autowired
    private ConfirmCallbackService confirmCallbackService; //注入 ConfirmCallback对象

    @Test
    public void test() {
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        //发送消息
        rabbitTemplate.convertAndSend("dlq_exchange1", "", "hello,ConfirmCallback你好");
    }
}
View Code

自动回调生产者

 springBoot配置文件

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启confirm确认模式

消费端(进行手动签收)

@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage {
    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {System.out.println("消息内容===>" + new String(message.getBody()));
            //TODO 具体业务逻辑 //手动签收[参数1:消息投递序号,参数2:批量签收]
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            //拒绝签收[参数1:消息投递序号,参数2:批量拒绝,参数3:是否重新加入队列]
            channel.basicNack(deliveryTag, true, true); }
    }

}

死信交换机(DLX【dead-letter-exchange】)

就是,当消息成为死消息后,可以被重新发送到另一个队列,这个队列就是死信队列(他和别的队列没有区别),绑定这个队列的交换机就是DLX,当存在有一个死信消息的时候,rebbit会将这个消息发送到死信交换机上,从而路由到死信队列上

消息变成死消息一般有这几种情况:

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

TTL(Time-To-Live):表示的是你发送消息的有效期,一般有两种设置方式

  • 声明队列的时候,在队列的属性中设置
  • 发送消息时给消息设置属性
    •  
  • 死信交换机绑定(对于死信队列,他是一个正常队列,我们按照之前的章节中设置的方法设置就行)

模拟订单支付流程【使用死信队列】

 一个正常的支付消息发送到交换机->交换机路由到,支付消息应有的队列中->这个时候用户超时还没有进行消费(TTL)->支付消息变成了死信消息->死信消息进入了死信交换机->死信交换机把死信消息路由到死信队列中->对死信队列进行监听->发信息通知用户,或者修改订单状态 

进行监控

在真正的环境中,我们需要实时了解到我们mq的状态和情况,从而保障他可以完美的提供服务,以下给出3中常见方案:

使用management】:这个一般在小项目中进行使用,【简单,但是麻烦,因为如果是集群可能要开多个窗口】

使用他给的api:】 他自己能把数据展示出来到management上,肯定调用了http接口,那我们也可以这样干【需要开发,并且要看相关文档】,常用的api如下:

  • 概括信息:http://localhost:15672/api/overview
  • channel 列表:http://localhost:15672/api/channels节点信息:http://localhost:15672/api/nodes
  • 交换机信息:http://localhost:15672/api/exchanges
  • 队列信息:http://localhost:15672/api/queues
  • vhost 列表:http://localhost:15672/api/vhosts

prometheus + grafana 监控rabbitmq

zabbix不同,zabbix是主动接受消息,而普罗米修斯(prometheus )是定时对咱们的消息进行拉取的,他制定了一些规范【exporter】,各个想让他监控的中间件都要实现自己的exporter,并且他要求传递给他的都是json的数据,

我们把rabbit提供的普罗米修斯的插件启动:rabbitmq-plugins enable rabbitmq_prometheus

同时开启端口:firewall-cmd --zone=public --add-port=15692/tcp --permanent

访问普罗米修斯可以认识的数据:http://你的ip:15692/metrics

剩下的就交给普罗米修斯了

grafana 中搜索相关的模板进行导入(https://grafana.com/grafana/dashboards?search=rabbitmq)

 最终效果(中间的过程太繁琐了,咱们只是聊聊这个东西,具体可以参考别的博文。)

 这里是官方对Prometheus集成获取指标的文档 https://www.rabbitmq.com/prometheus.html

常见问题

【延迟队列】

实际上还是可以用上面的死信队列的流程进行实现,设置队列过期时间给你想要的延迟时间,当延迟时间过后,进入死信队列,这个时候执行真正的逻辑

【消息积压】

前面说到当内存到一定程度(0.4)的时候,rabbit不会接受消息,我们说过可以通过配置文件对这个数据临时增大,这个时候rabbit运行正常,我们就可以增加消费者对数据进行消费,这样就可以解决积压问题。

【不重复消费】:

每个消费者给一个唯一id,使用id进行判断

 

原文地址:https://www.cnblogs.com/UpGx/p/15006057.html