消息队列<一>

说到消息队列很多人无需多考虑,出口就是削峰、异步、解耦。但是这些名词是如何在具体的实际场景中使用的,可能就没有那么清楚。

下面就给大家介绍一下什么是消息队列,使用的场景是怎样,还有一些生产中常见的问题:1.如何保证消息不丢失?2.如何处理重复消息?3.如何保证消息的有序性?4.如何处理消息堆积?

一、什么是消息队列?

简单的说消息队列就是使用队列来完成通信的开发组件,百度百科:消息队列是在消息的传输过程中保存消息的容器。

二、为什么要使用消息队列?

互联网发展速度肉眼可见,尤其是近几年互联网的普及,5g时代的到来,再也不是以前5块钱30m的时代了,用户量递增,服务体系的增大,促使技术架构需要不断的演进 ,随之而来的服务拆分,有时整个一套服务有成百上千个微服务。服务与服务之间相互调用和依赖,这时我们需要消息队列的解耦,由于用户量大就需要消息队列来控制流量的接入,也就是所谓的削峰。一些电商项目中有些服务不需要及时响应的就可以采用异步的方式来进行处理,例如:积分服务、短信通知。下面拿具体场景来具体介绍一下削峰、异步、解耦。

1、削峰(流量控制):

可能有的人会把以下的流量控制混为一谈,上次和同时探讨时就出现过这样的说法,【限流的控制还有别的技术手段来进行控制,例如:Nginx、网关(zuul、gateway)】。 但是消息队列的流量控制指的是将消息存储在消息队列中等待服务慢慢消费。而不是像网关到达限制直接返回不做任何业务的处理。

相对来说后端服务是比较脆弱的,因为随着项目的不断演进,业务需求越来越多,处理时长也就会随之增加。像一些大促秒杀活动,爆发式的流量进入,可能就顶不住了。消息队列的出现就可以起到缓冲的作用,也就是所谓的削峰填谷。

某些后台任务,不需要及时地响应,并且业务处理复杂且流程长,那么过来的请求先放入消息队列中,后端服务按照自己的节奏处理。

2、异步:

举个例子:在电商项目中在没有积分服务和短信服务时  ,就是直接扣库存创建预付订单。

随着业务增多,服务拆分越多,请求的链路就会越长,响应的速度越来越慢,影响用户体验,而且会降低qps,相对于扣库存订单生成,短信通知和积分的添加没有必要那么及时,这里在下单结束后可以将处理数据放到消息队列中,直接返回响应给客户端,而短信服务和积分服务可以慢慢的去消费消息队列中的消息。这样可以减少请求的等待,还能让服务异步并发处理,提升系统总体性能。

 3、解耦:

如果业务不断增加,出现了数据上报服务,之后说不定还会出现各种服务。也就是说在订单的下游会不定期的加减服务,如果是硬编码对服务的改动特别大,这时消息队列的出现就可以无状态的添加下游服务,只需要去订阅消息队列的主题即可,这样大大降低了服务与服务之间的耦合度。

 三、消息队列的基本概念

首先在讨论  【 1.如何保证消息不丢失?2.如何处理重复消息?3.如何保证消息的有序性?4.如何处理消息堆积?】之前先说一下消息队列的基本概念。

消息队列有两种模型:队列模型发布/订阅模型

1、队列模型

每条消息只能被一个消费者消费;生产者有多个,消费者有多个,也就是微服务中生产者的节点有多个,消费者的节点也有多个,消费者在消费消息过程存在竞争关系。

 2、发布/订阅模型

一条消息可以被多个消费者同时消费,消息生产者生成消息发到同一个topic中,不同的消费者同时订阅这个主题,也就是说这里存在数据的冗余,可以保证一条消息能被多个消费者消费。即只有一个消费者的情况下和队列模型基本一致。RabbitMQ 采用队列模型,RocketMQKafka 采用发布/订阅模型。

 四、如何保证消息不丢失?

一般我们称发送消息方为生产者 Producer,接受消费消息方为消费者Consumer,消息队列服务端为Broker

消息从Producer发往BrokerBroker将消息存储至本地,然后ConsumerBroker拉取消息,或者Broker推送消息至Consumer,最后消费。

上图可以看到一共有三个阶段,分别是生产消息、存储消息和消费消息。我们从这三个阶段分别入手来看看如何确保消息不会丢失。

1、生产者生产消息

生产者发送消息至Broker,需要处理Broker的响应,不论是同步还是异步发送消息,同步和异步回调都需要做好try-catch,妥善的处理响应,如果Broker返回写入失败等错误消息,需要重试发送。当多次发送失败需要作报警,日志记录等。

这样就能保证在生产消息阶段消息不会丢失。

2、消息存储

存储消息阶段需要在消息刷盘之后再给生产者响应,假设消息写入缓存中就返回响应,那么机器突然断电这消息就没了,而生产者以为已经发送成功了。

如果Broker是集群部署,有多副本机制,即消息不仅仅要写入当前Broker,还需要写入副本机中。那配置成至少写入两台机子后再给生产者响应。这样基本上就能保证存储的可靠了。一台挂了还有一台还在呢(假如怕两台都挂了..那就再多些)。

3、消费者消费消息

这里经常会有同学犯错,有些同学当消费者拿到消息之后直接存入内存队列中就直接返回给Broker消费成功,这是不对的。

你需要考虑拿到消息放在内存之后消费者就宕机了怎么办。所以我们应该在消费者真正执行完业务逻辑之后,再发送给Broker消费成功,这才是真正的消费了。

所以只要我们在消息业务逻辑处理完成之后再给Broker响应,那么消费阶段消息就不会丢失。

总结】要注意消息可靠性增强了,性能就下降了,等待消息刷盘、多副本同步后返回都会影响性能。因此还是看业务,例如日志的传输可能丢那么一两条关系不大,因此没必要等消息刷盘再响应。

五、如何处理重复消息?

假设我们发送消息,就管发,不管Broker的响应,那么我们发往Broker是不会重复的。

但是一般情况我们是不允许这样的,这样消息就完全不可靠了,我们的基本需求是消息至少得发到Broker上,那就得等Broker的响应,那么就可能存在Broker已经写入了,当时响应由于网络原因生产者没有收到,然后生产者又重发了一次,此时消息就重复了。

再看消费者消费的时候,假设我们消费者拿到消息消费了,业务逻辑已经走完了,事务提交了,此时需要更新Consumer offset了,然后这个消费者挂了,另一个消费者顶上,此时Consumer offset还没更新,于是又拿到刚才那条消息,业务又被执行了一遍。于是消息又重复了。

可以看到正常业务而言消息重复是不可避免的,因此我们只能从另一个角度来解决重复消息的问题。

关键点就是幂等。既然我们不能防止重复消息的产生,那么我们只能在业务上处理重复消息所带来的影响。

六、如何保证消息的有序性

1、全局有序性:

如果要保证消息的全局有序,首先只能由一个生产者往Topic发送消息,并且一个Topic内部只能有一个队列(分区)。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的!不过一般情况下我们都不需要全局有序。

2、部分有序性:

因此绝大部分的有序需求是部分有序,部分有序我们就可以将Topic内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。这样即完成了部分有序的需求,又可以通过队列数量的并发来提高消息处理效率。

七、如何处理消息堆积?

消息的堆积往往是因为生产者的生产速度与消费者的消费速度不匹配。有可能是因为消息消费失败反复重试造成的,也有可能就是消费者消费能力弱,渐渐地消息就积压了。

因此我们需要先定位消费慢的原因,如果是bug则处理 bug ,如果是因为本身消费能力较弱,我们可以优化下消费逻辑,比如之前是一条一条消息消费处理的,这次我们批量处理,比如数据库的插入,一条一条插和批量插效率是不一样的。

假如逻辑我们已经都优化了,但还是慢,那就得考虑水平扩容了,增加Topic的队列数和消费者数量,注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一个消费者

当然你消费者内部是单线程还是多线程消费那看具体场景。不过要注意上面提高的消息丢失的问题,如果你是将接受到的消息写入内存队列之后,然后就返回响应给Broker,然后多线程向内存队列消费消息,假设此时消费者宕机了,内存队列里面还未消费的消息也就丢了。

原文地址:https://www.cnblogs.com/dongl961230/p/14890413.html