技术干货丨如何在VIPKID中构建MQ服务

小结:

1、

https://mp.weixin.qq.com/s/FQ-DKvQZSP061kqG_qeRjA

文 |李伟 VIPKID数据中间件架构师

交流微信 | datapipeline2018

本文完整PPT获取 | 关注公众号后,后台回复“李伟”

本文整理自丨4月13日,DataPipeline联合Apache RocketMQ在北京举办的消息中间件Meetup

本文主要从三方面展开。首先介绍VIPKID使用MQ的历史,即如何演化到现在的RocketMQ + Kafka。其次,为RocketMQ在VIPKID的实践,主要介绍如何解决在使用RocketMQ过程中出现的相关问题。最后,提到VIPKID的MQ服务关于以后可能会如何变化和发展。

MQ History in VIPKID

关于VIPKID的需求:

第一性能。VIPKID是贯穿北美和中国的线上英语学习机构,数据、消息量庞大,对于MQ吞吐量、延迟等有极度的要求。

第二海量的topic。因为业务方的业务场景相对比较复杂,不同于常规,所以对多topic的要求较高。目前在测试环节,包含的topic应该有3000+,消费者有超过10k+。

第三统一的管理平台。由于各种topic,消费者纷繁复杂, 需要一个像Operator那样可以在平台上“指指画画”即可管理topic,消费者,和相关负责人的信息的平台。

第四友好的API。对RD同学而言,不需花费过多的精力学习,只需看API便能上手接入。

另外对提供MQ服务的同学而言,方便二次开发。中间件大部分选择用Java,如果使用其它语言,比如GO,Scala,对二次开发的难度会比较大,维护成本也会很高。

考虑到学习成本,以上为我们在之前面临的一些问题和诉求,基于这些诉求,我们在之前就开始做选择。

公司之前使用MQ服务时,管理简单。这也让我们面临一些较为尴尬的问题。

之后经过层层筛选,各种把关,选择了RocketMQ+Kafka,RocketMQ主要提供给业务方使用,做一些业务数据的流转。包括像削峰填谷,或是一些异步数据等同步,另外还需同步一些BI数据给业务方。

为什么会这么选?现在市面上有许多比较成熟的MQ产品,比如像目前使用的Kafka,第二个是RocketMQ,第三个为RabbitMQ,最后决定Kafka先保持现状,把业务型的ActiveMQ替换成RocketMQ。

同时,也积极和RocketMQ社区保持联系,一来可以依靠社区力量将公司MQ服务做得更加稳定。 另外也可以向社区其他同学分享我们的实践经验。

为什么Kafka会保持,原因在于对于ELK生态而言,Kafka目前是不可或缺的,在性能上可以满足要求。

为什么选择RocketMQ,有以下原因:低延迟、百万级topic、二次开发容易、管理部署方便。

下图为系统的简单结构图,大概描述了生产消费的大概过程。还有Broker端的部署结构,最上面一层为生产过程,中间生产到了Broker的集群,下面是一个消费者的过程,最右边为retry的队列,主要做容错。

比如发一个消息时不是一锤子买卖,当消息发过去之后,由于网络抖了一下,接下来第二第三API失败了,此时可不可以允许我再发一次消息,像这种容错的情况是支持的,所以RocketMQ也是很重要的原因。

RocketMQ 最佳实践

首先最重要的问题,也是研发同学反馈过来的问题中最难理解的一部分,即订阅关系一致。

所谓的订阅关系一致,指一个消费者组在一个服务中,假设消费者组消费了3个topic,该服务部署到任何一台机器上,消费者组消费到topic都必须一样,这就是订阅关系一致。因为测试环节是多环境的,在多环境情况下会遇到订阅关系不一致的情况,这属于使用问题。

第二个是Docker,因为多环境情况下,在测试环节走的是K8S,它会启动Docker容器,在Docker里会面临一个问题,消费者客户端在起写时,会将自己的订阅信息上报给Broker,告诉它订阅了哪些topic,以及消费者客户端ID是多少,以此来表明身份。当在取客户端ID时,取了一个本地IP,但是在Docker里面取出来的本地大部分是相同的。

上述情况会出现类似于订阅关系不一致的结果。会发现topic里面有的消息、队列没有消费者去订阅,实际是有的。因为Broker在做rebalance时,由于没有办法正常区分到底起了多少个消费者时令,导致分配队列时出现问题。原因在于当上报订阅关系给Broker时,数据有错误,这是Broker从本地IP取出来导致的。

第三个是多环境问题,假设有ABC三个服务,A服务依赖于B,B依赖于C。通常假设在开发一个服务时,会单独拉一个分支进行开发,此时可能会与上一步的下一个服务做联调。

同时在测试环节,还会有一个稳定的服务提供给其他人做联调,在没有开发时需要给QA做集成测试。这个环境中称为多环境,在多环境的情况就会遇到订阅关系不一致的问题。同时,在多环境下如何做消息路由也是需要考虑的问题。

首先解释订阅关系不一致,假设有两个人开发两个功能,其中一个是Host为1.1.1.1,消费者组有3个topic,1、2、3,当拿到一个新的需求后需要将topic3换成topic4,获取消息。

在消费写完之后,需要发到一个环境进行测试,假设将这两个同时放到环境中,就会出现订阅关系不一致,这时有可能会出现以下情况:

当添加topic4去掉topic3之后,在测试topic4是否能收到消息时,结果发现收不到。或是当发送一条消息之后,连续不断地收到了100多万条消息。

如果打开console平台,发现有的队列没有消费者去消费,这就是所谓的订阅关系不一致。

比如,同样的一个ConsumerGroup1,第一台机器消费了3个topic1、2、3,第二台机器消费了topic1、2、4,这就是订阅关系不一致。即同一个消费组部署的两台实力,消费者的topic不一样,称为订阅关系不一致。

为此,在多环境的情况下制定了一个简单规则。通常发这种分支时,是在多环境的子环境,比如,当发一个新功能时,需要将开发分支放到A1环境进行测试,这时就有一个简单的规则,即不能跨环境生产消费。

假设一个开发分支是A1环镜的,当消费消息时,可用A1环境的consumerGroup去消费A1环境的topic。如果要测试一下收的消息处理逻辑是否正确,就可以去控制台模拟发送给A1的topic。

这时就会发现,自己的开发分支跟正常的稳定环境完全不影响。多环境时无需害怕环境,只需排查当前处于哪个环境,哪些环境是不需要订阅的,那就将其去掉,换成所处的环境即可。

针对订阅关系不一致的情况,可以去客户端中查看订阅关系哪里不一致,哪个环境的topic或消费者不应该去订阅,这时容易排查问题。

另外关于Docker,当消费者集起来时,该如何告诉Broker起了多少个实例,以此让Broker来分配,哪些实例消费哪些partition或哪些Q。在consumer集起来时,可将订阅关系还有能唯一标识是我的客户端ID发给Broker,让Broker决定哪个客户端ID消费哪个队列,进而做好分配,这时就能保证正常消费。

那如何标识我是我?这时有一个字符串,字符串有三个值组成,第一是消费者所在机器的IP,第二是at符号,第三是instance name,即实例名。

之前的配置是一个consumer中的实例名是相同的,如果是Docker环境,取出的IP也是相同的。假设有10台机器在Docker里集起来,当将消息汇报上去之后,Broker在拿到ID去重后发现只有一台机器启起来,它便将所有的队列只分配给这一台机器,但这台机器很随机,其它机器便收不到消息。此时本来有10台机器,结果只有一台机器收到消息,或会重复收到消息,结果就会很棘手。

为解决上述问题,在配置时,可去掉消费者的instance name,这时它会变成一串数字,即当前进程启起来的进程号。大部分情况,容器中启起来的进程号,我们认为是不一样的,但也有可能是相同的,这时可以重启一下。但这种概率比较小,如果有两套集群就有可能了,这样就解决了Docker的问题。

接下来简单解释一下RocketMQ里面的订阅关系,一条订阅关系指一个消费者组订阅了一个topic的某一个tag,三者合起来称为一个订阅关系。

关于tag,指topic的一个子分类,topic是对消息逻辑的归类,我可以为某一些消息打上特定的tag,这样做的益处是会做一些服务端过滤,假设只订阅某个tag消息,那就不会发送其它tag消息给你,中间没有数据流量的,这对于网络舆论而言会减少很大一部分。

另外,我们做了简单的MQ云平台,可以理解为与阿里云平台是类似的,用户有租户隔离,不同的租户之间看不到彼此的数据,所有的管理维度是通过APP的维度。

简单介绍下,一个应用有应用的owner,应用的member,这些人可以管理APP中的所有资源,包括APP中的生产者组,topic、消费者组,通过这些可以完成以下事情:

一位研发同学如果想与另外一个团队进行消息交互,作为生产消息的人,可以去平台申请个topic,进而发消息将topic告诉消费者。比如,我的topic是这个,你去消费吧。在拿到之后,再去创建一个消费者组,订阅这个topic,全程无人干预,目前需要的干预是我们会做一层审核。

现在很多人不是很熟悉,所以需要引导一段时间后,再将权限完全开放出来,届时审计的工作就给应用的owner即可。

应用的owner通常是团队的leader,这样就可以在自己的小组中做到完全自助。对研发团队而言,可以省去许多繁杂流程。

目前整套MQ的工作过程是如果需要资源,先去VKMQ平台申请资源,比如我是生产者,这时需要一个topic,一个生产者组来生产消息,此时可以在自己创建之后用平台客户端发,无需担心到底需要发到哪个Broker,哪个name server。

这时研发同学用起来会比较方便,不用关心需要哪些集群。如果没有这样的发现机制,可能还需要写Broker地址或name server地址。

How about the future

提到以后可能的发展走向,之前与阿里中间件技术团队专家王昕聊过这个问题,了解到会有OpenMessaging,OpenMessaging的目的是有一套公开的标准,是MQ层面的标准,会定义一些基本操作。就现在情况而言,个人觉得,这三者可以结合到一起。

目前我们有用Kafka和RocketMQ。这时该如何管理,如何统一,对人的要求就很高。

但具体处理问题时会有差异,我可以大胆猜测一下,可以将两个可能合而为一,依循OpenMessaging的标准。既然有一套比较好的标准可以遵循,如果能实现统一,我觉得对后来的人就会很方便。

原文地址:https://www.cnblogs.com/rsapaper/p/10865042.html