关于分布式事务处理

前言:

有一次从支付宝转账1万块钱到余额宝,这是日常生活的一件普通小事,但作为互联网研发人员的职业病,我就思考支付宝扣除1万之后,

如果系统挂掉怎么办,这时余额宝账户并没有增加1万,数据就会出现不一致状况了。

上述场景在各个类型的系统中都能找到相似影子,比如在电商系统中,当有用户下单后,除了在订单表插入一条记录外,对应商品表的这个商品数量必须减1吧,怎么保证?

在搜索广告系统中,当用户点击某广告后,除了在点击事件表中增加一条记录外,还得去商家账户表中找到这个商家并扣除广告费吧,怎么保证?等等,相信大家或多或多少都能碰到相似情景。

本质上问题可以抽象为:当一个表数据更新后,怎么保证另一个表的数据也必须要更新成功。

一.本地事务

还是以支付宝转账余额宝为例,假设有

(1)支付宝账户表:A(id,userId,amount)

(2)余额宝账户表:B(id,userId,amount)

(3)用户的userId=1;

从支付宝转账1万块钱到余额宝的动作分为两步:

(1)支付宝表扣除1万:update A set amount=amount-10000 where userId=1;

(2)余额宝表增加1万:update B set amount=amount+10000 where userId=1;

如何确保支付宝余额宝收支平衡呢?

有人说这个很简单嘛,可以用事务解决。

Begin transaction
         update A set amount=amount-10000 where userId=1;
         update B set amount=amount+10000 where userId=1;
End transaction
commit;

非常正确,如果你使用spring的话一个注解就能搞定上述事务功能。

@Transactional(rollbackFor=Exception.class)
    public void  update() {
        updateATable();//更新A表
        updateBTable();//更新B表
    }

如果系统规模较小,数据表都在一个数据库实例上,上述本地事务方式可以很好地运行,但是如果系统规模较大,

比如支付宝账户表和余额宝账户表显然不会在同一个数据库实例上,他们往往分布在不同的物理节点上,这时本地事务已经失去用武之地。

既然本地事务失效,分布式事务自然就登上舞台。

二.分布式事务

两阶段提交协议(Two-phase Commit,2PC)经常被用来实现分布式事务。

一般分为协调器C和若干事务执行者Si两种角色,这里的事务执行者就是具体的数据库,协调器可以和事务执行器在一台机器上。

(1)应用程序(client)发起一个开始请求到TC;

(2)TC先将<prepare>消息写到本地日志,之后向所有的Si发起<prepare>消息。

以支付宝转账到余额宝为例,TC给A的prepare消息是通知支付宝数据库相应账目扣款1万,

TC给B的prepare消息是通知余额宝数据库相应账目增加1w。为什么在执行任务前需要先写本地日志,主要是为了故障后恢复用,

本地日志起到现实生活中凭证 的效果,如果没有本地日志(凭证),出问题容易死无对证;

(3)Si收到<prepare>消息后,执行具体本机事务,但不会进行commit,如果成功返回<yes>,不成功返回<no>。同理,返回前都应把要返回的消息写到日志里,当作凭证。

(4)TC收集所有执行器返回的消息,如果所有执行器都返回yes,那么给所有执行器发生送commit消息,执行器收到commit后执行本地事务的commit操作;

如果有任一个执行器返回no,那么给所有执行器发送abort消息,执行器收到abort消息后执行事务abort操作。

注意:

TC或Si把发送或接收到的消息先写到日志里,主要是为了故障后恢复用。如某一Si从故障中恢复后,先检查本机的日志,如果已收到<commit >,则提交,如果<abort >则回滚。

如果是<yes>,则再向TC询问一下,确定下一步。如果什么都没有,则很可能在<prepare>阶段Si就崩溃了,因此需要回滚。

现如今实现基于两阶段提交的分布式事务也没那么困难了,如果使用java,那么可以使用开源软件atomikos(http://www.atomikos.com/)来快速实现。

不过但凡使用过的上述两阶段提交的同学都可以发现性能实在是太差,根本不适合高并发的系统。为什么?

(1)两阶段提交涉及多次节点间的网络通信,通信时间太长!

(2)事务时间相对于变长了,锁定的资源的时间也变长了,造成资源等待时间也增加好多!

正是由于分布式事务存在很严重的性能问题,大部分高并发服务都在避免使用,往往通过其他途径来解决数据一致性问题。

三.使用消息队列来避免分布式事务

比如在北京很有名的姚记炒肝点了炒肝并付了钱后,他们并不会直接把你点的炒肝给你,而是给你一张小票,然后让你拿着小票到出货区排队去取。

为什么他们要将付钱和取货两个动作分开呢?原因很多,其中一个很重要的原因是为了使他们接待能力增强(并发量更高)。

还是回到我们的问题,只要这张小票在,你最终是能拿到炒肝的。同理转账服务也是如此,当支付宝账户扣除1万后,我们只要生成一个凭证(消息)即可,

这个凭证(消息)上写着“让余额宝账户增加 1万”,只要这个凭证(消息)能可靠保存,我们最终是可以拿着这个凭证(消息)让余额宝账户增加1万的,即我们能依靠这个凭证(消息)完成最终一致性。

 

1、如何可靠保存凭证(消息)

有两种方法:

(1)业务与消息耦合的方式

支付宝在完成扣款的同时,同时记录消息数据,这个消息数据与业务数据保存在同一数据库实例里(消息记录表表名为message)。

Begin transaction
         update A set amount=amount-10000 where userId=1;
         insert into message(userId, amount,status) values(1, 10000, 1);
End transaction
commit;

上述事务能保证只要支付宝账户里被扣了钱,消息一定能保存下来。

当上述事务提交成功后,我们通过实时消息服务将此消息通知余额宝,余额宝处理成功后发送回复成功消息,支付宝收到回复后删除该条消息数据。

(2)业务与消息解耦方式

上述保存消息的方式使得消息数据和业务数据紧耦合在一起,从架构上看不够优雅,而且容易诱发其他问题。为了解耦,可以采用以下方式。

a、支付宝在扣款事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不真正发送,只有消息发送成功后才会提交事务;

b、当支付宝扣款事务被提交成功后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才真正发送该消息;

c、当支付宝扣款事务提交失败回滚后,向实时消息服务取消发送。在得到取消发送指令后,该消息将不会被发送;

d、对于那些未确认的消息或者取消的消息,需要有一个消息状态确认系统定时去支付宝系统查询这个消息的状态并进行更新。为什么需要这一步骤,举个例子:

假设在第2步支付宝扣款事务被成功提交后,系统挂了,此时消息状态并未被更新为“确认发送”,从而导致消息不能被发送。

优点:消息数据独立存储,降低业务系统与消息系统间的耦合;

缺点:一次消息发送需要两次请求;业务处理服务需要实现消息状态回查接口。

2、如何解决消息重复投递的问题

还有一个很严重的问题就是消息重复投递,以我们支付宝转账到余额宝为例,如果相同的消息被重复投递两次,那么我们余额宝账户将会增加2万而不是1万了。

为什么相同的消息会被重复投递?比如余额宝处理完消息msg后,发送了处理成功的消息给支付宝,正常情况下支付宝应该要删除消息msg,

但如果支付宝这时候悲剧的挂了,重启后一看消息msg还在,就会继续发送消息msg。

解决方法很简单,在余额宝这边增加消息应用状态表(message_apply),通俗来说就是个账本,用于记录消息的消费情况,每次来一个消息,在真正执行之前,

先去消息应用状态表中查询一遍,如果找到说明是重复消息,丢弃即可,如果没找到才执行,同时插入到消息应用状态表(同一事务)。

for each  msg in queue
  Begin transaction
    select count(*) as cnt from message_apply where msg_id=msg.msg_id;
    if cnt==0 then
      update B set amount=amount+10000 where userId=1;
      insert into message_apply(msg_id) values(msg.msg_id);
  End transaction
  commit;

四.例子

说到分布式事务,一般的思路都是通过消息中间件来实现“最终一致性”:A系统扣钱,然后发条消息给中间件,B系统接收此消息,进行加钱。

但这里面有个问题:A是先update DB,后发送消息呢? 还是先发送消息,后update DB?

假设先update DB成功,发送消息网络失败,重发又失败,怎么办? 

假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?

所以,这里下个结论: 只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都是有问题的。

那这个问题怎么解决呢?

1、错误的方案

有人可能想到了,可以把“发送消息”这个网络调用和update DB放在同1个事务里面,如果发送消息失败,update DB自动回滚。这样不就保证2个操作的原子性了吗?

这个方案看似正确,其实是错误的,原因有2:

(1)网络的2将军问题:发送消息失败,发送方并不知道是消息中间件真的没有收到消息呢?还是消息已经收到了,只是返回response的时候失败了?

         如果是已经收到消息了,而发送端认为没有收到,执行update db的回滚操作。则会导致A账号的钱没有扣,B账号的钱却加了。

(2)把网络调用放在DB事务里面,可能会因为网络的延时,导致DB长事务。严重的,会block整个DB。这个风险很大。

         基于以上分析,这个方案其实是错误的!

2、业务方自己实现

假设消息中间件没有提供“事务消息”功能,比如你用的是Kafka。那如何解决这个问题呢?

解决方案如下: 

(1)Producer端准备1张消息表,把update DB和insert message这2个操作,放在一个DB事务里面。

(2)准备一个后台程序,源源不断的把消息表中的message传送给消息中间件。失败了,不断重试重传。允许消息重复,但消息不会丢,顺序也不会打乱。

(3)Consumer端准备一个判重表。处理过的消息,记在判重表里面。实现业务的幂等。

但这里又涉及一个原子性问题:如果保证消息消费 + insert message到判重表这2个操作的原子性?

消费成功,但insert判重表失败,怎么办?关于这个,在Kafka的源码分析系列,第1篇, exactly once问题的时候,有过讨论。

通过上面3步,基本就解决了这里update db和发送网络消息这2个操作的原子性问题。

但这个方案的一个缺点就是:需要设计DB消息表,同时还需要一个后台任务,不断扫描本地消息。导致消息的处理和业务逻辑耦合额外增加业务方的负担。

3、RocketMQ 事务消息

为了能解决该问题,同时又不和业务耦合,RocketMQ提出了“事务消息”的概念。

具体来说,就是把消息的发送分成了2个阶段:Prepare阶段和确认阶段。

具体来说,上面的2个步骤,被分解成3个步骤: 

(1) 发送Prepared消息;

(2) update DB; 

(3) 根据update DB结果成功或失败,Confirm或者取消Prepared消息。

可能有人会问了,前2步执行成功了,最后1步失败了怎么办?

这里就涉及到了RocketMQ的关键点:RocketMQ会定期(默认是1分钟)扫描所有的Prepared消息,询问发送方,到底是要确认这条消息发出去?还是取消此条消息?

具体代码实现如下:

也就是定义了一个checkListener,RocketMQ会回调此Listener,从而实现上面所说的方案。

// 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 构造事务消息的生产者 TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 设置事务决断处理类 producer.setTransactionCheckListener(transactionCheckListener);
// 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑 TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 构造MSG,省略构造参数 Message msg = new Message(......);
// 发送消息 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); producer.shutdown(); public TransactionSendResult sendMessageInTransaction(.....) {
// 逻辑代码,非实际代码 // 1.发送消息 sendResult = this.send(msg); // sendResult.getSendStatus() == SEND_OK // 2.如果消息发送成功,处理与消息关联的本地事务单元 LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); // 3.结束事务 this.endTransaction(sendResult, localTransactionState, localException);
}

总结:对比方案2和方案1,RocketMQ最大的改变,其实就是把“扫描消息表”这个事情,不让业务方做,而是消息中间件帮着做了。

至于消息表,其实还是没有省掉。因为消息中间件要询问发送方,事物是否执行成功,还是需要一个“变相的本地消息表”,记录事物执行状态。

4、人工介入

可能有人又要说了,无论方案1,还是方案2,发送端把消息成功放入了队列,但消费端消费失败怎么办?

消费失败了,重试,还一直失败怎么办?是不是要自动回滚整个流程?

答案是人工介入。从工程实践角度讲,这种整个流程自动回滚的代价是非常巨大的,不但实现复杂,还会引入新的问题。比如自动回滚失败,又怎么处理?

对应这种极低概率的case,采取人工处理,会比实现一个高复杂的自动化回滚系统,更加可靠,也更加简单。

原文地址:https://www.cnblogs.com/ZJOE80/p/7793527.html