Seata分布式事务简单使用

  在分布式开发过程中,分布式事务是必须面临的问题。因为分布式系统中,存在多个服务之间的调用。服务与服务之间存在事务问题,可能在某个服务调用链过程中某个服务发生异常导致数据不一致问题。

  每个服务内部的数据一致性由本地事务控制,通常用@Transactional 来控制。但是服务拆分之后,多个服务协同完成的操作如果需要保证数据一致性就需要引入分布式事务。

0.理论基础

  刚性事务:遵循ACID原则,强一致性,最常见的就是数据库事务。

  柔性事务:遵循BASE理论,最终一致性;与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致。一般说的分布式事务也就是柔性事务。

1.事务

  一个操作单元,在这个操作中的所有操作最终要保持一致的行为,要么所有操作都成功,要么所有操作都被撤销。

单机版事务ACID:

A(Atomicity):原子性,操作这些指令时,要么全部失败,要么全部成功。

C(Consistency):一致性。事务的执行结果是从一个状态变为另一个状态,数据库的完整性约束没有被破坏。能量守恒,总量不变。  比如转账操作转来转去总额不变。

I(Isolation):隔离性。多个并发事务之间相互隔离。信息彼此隔离,互不干扰。

D(Durability):当事务正确完成后,它对于数据的改变是永久性的。

   分布式事务是因为提供服务的节点在不同的机器上,相互之间通过网络交互,因此分布式事务需要进一步的理论支持。

2.CAP原则

  在之间学习注册中心的时候也学习过CAP原则,又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Avaliability)、分区容错性(Partition tolerance)。这三个原则最多同时兼顾两个,不能三者都实现。P是一定要满足的,且一定保留,如果不保证P,那就不是一个分布式系统了。

一致性:在分布式系统的所有数据备份,在同一时刻是否有同样的值。也就是副本最新。

可用性:在集群中一部分节点故障后,集群是否还能整体响应客户端的读写请求。也就是高可用。每次向未崩溃的节点发送请求,总能收到响应数据,允许数据不是最新的。

分区容忍性:系统任意分区后,在网络故障时,仍能操作。

(1)由于当前的网络硬件肯定会出现延迟丢包等问题,所以分区容忍性是我们必须需要实现的。所以我们只能在一致性和可用性之间进行权衡。

CA满足的情况下,P不能满足的原因:数据同步需要时间,也要正常的时间内响应(A),那么机器数量就要少,所以P就不满足

CP 满足的情况下,A不能满足的原因:数据同步需要时间, 机器数量也多,但是同步数据需要时间,所以不能再正常时间内响应,所以A就不满足

AP 满足的情况下,C不能满足的原因:机器数量也多,正常的时间内响应(A),那么数据就不能及时同步到其他节点,所以C不满足。

(2)关于三个注册中心满足的原则:

Zookeeper和Consul :CP设计,保证了一致性,集群搭建的时候,某个节点失效,则会进行选举行的leader,或者半数以上节点不可用,则无法提供服务,因此可用性没法满足。

Eureka:AP原则,无主从节点,一个节点挂了,自动切换其他节点可以使用,去中心化。

3.一致性:数据一致性

一致性可以分为强一一致性与弱一致性。所谓强一致性就是复制数据是同步的,弱一致性就是复制数据是异步的。

CAP定理中只能在CAP中选择两个,一般是牺牲一致性换取可用性和分区容错性,所谓牺牲一致性并不是完全放弃数据一致性,而是换取强一致性换取弱一致性。

1.强一致性:系统中的某个数据被更新成功后,后续任何对该数据的操作都将获取最新的值,也称为原子一致性。简单的说就是任意时刻,所有节点的数据都是一样的。

(1)一个集群如果保证强一致性,集群中某一台服务器的数据变了,那么就需要等待集群内其他服务器的数据同步完成后,才能正常对外提供服务

(2)保证了强一致性,务必会损耗可用性

2.弱一致性

  系统中的某个节点数据更新后,后续对该数据的读取有可能是更新前的值,也有可能是更新后的值。可以理解为更新数据后,如果能容忍后续的访问只能访问到部分或者全部访问不到,则是弱一致性。

3.最终一致性

是弱一致性的特殊形式,存储系统保证在没有新的更新条件下,最终所有的访问都是最后更新的值。

不保证在任意时刻任意节点上的同一份数据都是相同的,但是随着时间的迁移,不同节点上的同一份数据总是在向趋同的方向变化。

 简单说,就是在一段时间后,节点间的数据会达到一致状态。

4.Base理论

  BASE理论和CAP其实是互补的。BASE理论是对CAP中的一致性和可用性进行权衡的一个结果,理论的核心思想就是:我们无法做到强一致性,但每个应用都可以根据自身业务的特点,采用适当的方法来使系统达到最终一致性。

BA:BasicAvaliable 基本可用。

  整个系统某些不可抗力的情况下,仍然能保证可用性,即一定时间内仍然能返回一个明确的结果。只不过是"基本可用"和"高可用"的区别是:

  • 一定时间内可以适当延长。比如高可用3响应时间3s,基本可用5s响应时间
  • 给部分用户返回一个服务降级页面。降级页面仍然是一个明确的结果。

S:Soft State  柔性状态。是指允许系统中的数据存在中间状态,并认为该中间状态的存在不会影响系统的整体可用性,即允许系统不同节点的数据副本之间进行数据同步的过程存在延时。比如数据加在状态"同步中"、"等待同步"等。

E:Eventual Consisstency 最终一致性。 同一数据的不同副本的状态,可以不要实时一致性,但一定要保证经过有一定时间后仍然是一致的。

5. 2阶段提交协议2PC

  Two-Phase Commit 是常用的分布式事务解决方法,即将事务的提交过程分为两个阶段来进行处理。

阶段:

  • 准备阶段,又称为投票阶段

(1)协调者向所有参与者发送事务内容,询问是否可以提交事务,且等待回复,进入表决过程;

(2)参与者执行执行事务操作,将redo和undo日志记入事务日志中(但不提交事务);

(3)如参与者执行成功给协调者反馈成功,否则反馈失败

  • 提交阶段

  协调者基于第一个阶段的投票结果进行决策: 提交或取消。当且仅当所有的参与者同意提交事务,协调者才通知所有的参与者提交事务,否则协调者将通知所有的参与者取消事务,参与者在接收到协调者发来的消息后将执行响应的操作并且释放事务期间内占用的资源。

参与的角色:

  • 协调者:事务的发起者
  • 参与者:事务的执行者

2PC的优缺点:
优点:尽量保证了数据的强一致性,但也不能百分百保证强一致性
缺点:
(1)性能问题:执行过程中,所有参与节点都是事务阻塞型的。当参与者占用公共资源时,其他第三方节点访问资源必须处于阻塞状态
(2)可靠性问题:参与者发生故障,协调者需要给每个参与者额外指定超时机制,超时后整个事务失败。协调者发生故障,参与者一定会阻塞下去,需要额外的备机进行容错。
(3)数据一致性问题:二阶段无法解决的问题:协调者在发出commit消息之后宕机,而唯一接受消息的参与者也同时宕机,那么即使协调者选举产生了新的协调者,这条事务的状态也是不确定的,无法确定事务是否已经被提交。

6.3阶段提交3PC

相比于2PC,有两个改动点:

(1)协调者和参与者都引入超时机制
(2)第一阶段和第二阶段中间插入了一个准备阶段,保证了再最后提交阶段之前各参与节点的状态是一致的。
  也就是说除了引入超时机制之外,3pc把2pc的准备阶段一分为二,这样三阶段就有CanCommit,PreCommit,DoCommit
CanCommit:协调者向参与者发送commit请求,参与者如果可以提交就返回Yes响应,否则返回No响应。只询问是否可以但是不会执行
PreCommit: 进行事务的预执行,记录undo和redo日志,但不会提交
doCommit:执行事务提交。
  在上面的过程中,如果参与者给协调者的反馈是no,则会中断事务的操作。在doCommit阶段,如果参与者无法及时接收到来自协调者的doCommit或者rebort请求时,会在等待超时之后,会继续进行事务的提交。简单的说就是当进入第三阶段时,由于网络超时等原因,虽然参与者没有收到commit或者abort响应,但是他有理由相信:成功提交的几率很大。

优缺点:
优点:相比于二阶段提交,三阶段提交降低了阻塞范围,在等待超时后协调者或参与者会中断事务,避免了协调者单点问题,如果阶段3 doCommit 阶段出现问题,参与者会继续提交事务,会导致事务的不一致性。
缺点:数据不一致问题依然存在,当参与者接到preCommit请求后等待doCommit指令时,此时如果协调者请求中断事务,而协调者与参与者无法正常通信,会导致参与者继续提交事务,造成数据不一致。

7.分布式事务解决方案

1. TCC Try-Confirm-Cancel 最常见。补偿事务。核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作
  开源的实现: https://github.com/liuyangming/ByteTCC.git

  当然seata也有TCC模式的实现。
2. 全局消息
3. 基于可靠消息服务的分布式事务
4. 最大努力通知:业务活动的主动方在完成处理之后向业务活动的被动方发送消息,允许消息丢失。业务活动的被动方根据定时策略,向业务活动的主动方查询,恢复丢失的业务消息。

1.Seata简介

Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。

官网: http://seata.io/zh-cn/

1. 术语

一个ID加三个组件。

(1)XID:Transaction  ID 全局事务唯一ID

(2)三组件:

TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚。

TM (Transaction Manager) - 事务管理器:定义全局事务的范围:开始全局事务、提交或回滚全局事务。

RM (Resource Manager) - 资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

2. 处理过程

如下图:

1.TM 开启分布式事务(TM 向 TC 注册全局事务记录);

2.按业务场景,编排数据库、服务等事务内资源(RM 向 TC 汇报资源准备状态 );

3.TM 结束分布式事务,事务一阶段结束(TM 通知 TC 提交/回滚分布式事务);

4.TC 汇总事务信息,决定分布式事务是提交还是回滚;

5.TC 通知所有 RM 提交/回滚 资源,事务二阶段结束;

 2.Seta安装

1.下载

http://seata.io/zh-cn/blog/download.html

例如我下载的版本是:seata-server-1.4.0.zip

2.解压后目录如下:

3. 查看README文件,内容如下:

# 脚本说明

## [client](https://github.com/seata/seata/tree/develop/script/client) 

> 存放用于客户端的配置和SQL

- at: AT模式下的 `undo_log` 建表语句
- conf: 客户端的配置文件
- saga: SAGA 模式下所需表的建表语句
- spring: SpringBoot 应用支持的配置文件

## [server](https://github.com/seata/seata/tree/develop/script/server)

> 存放server侧所需SQL和部署脚本

- db: server 侧的保存模式为 `db` 时所需表的建表语句
- docker-compose: server 侧通过 docker-compose 部署的脚本
- helm: server 侧通过 Helm 部署的脚本
- kubernetes: server 侧通过 Kubernetes 部署的脚本

## [config-center](https://github.com/seata/seata/tree/develop/script/config-center)

> 用于存放各种配置中心的初始化脚本,执行时都会读取 `config.txt`配置文件,并写入配置中心

- nacos: 用于向 Nacos 中添加配置
- zk: 用于向 Zookeeper 中添加配置,脚本依赖 Zookeeper 的相关脚本,需要手动下载;ZooKeeper相关的配置可以写在 `zk-params.txt` 中,也可以在执行的时候输入
- apollo: 向 Apollo 中添加配置,Apollo 的地址端口等可以写在 `apollo-params.txt`,也可以在执行的时候输入
- etcd3: 用于向 Etcd3 中添加配置
- consul: 用于向 consul 中添加配置

4.到https://github.com/seata/seata/tree/develop/script/server 查看建表语句。(这里暂时记住是AT模式,区别之后分析)

到数据库执行mysql的建表语句

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(96),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

5.将配置导入到nacos (这一步也可以使用本地文件作为配置中心)

(1)nacos新建分命名空间

 (2) 参考 https://github.com/seata/seata/tree/develop/script/config-center  将配置文件导入nacos

1》下载config.txt, 并且修改里面的store.mode=db以及账户密码信息,如下:

transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=db
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=root
store.db.password=123456
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.host=127.0.0.1
store.redis.port=6379
store.redis.maxConn=10
store.redis.minConn=1
store.redis.database=0
store.redis.password=null
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

2》将config.txt拷贝到与nacos-config.sh同级目录(这个文件也是在config-center 下载)

3》执行下面:

liqiang@root MINGW64 ~/Desktop/file/seata/configuration/nacos
$ sh nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 6a1ba3ab-1821-43a6-b7ba-77272ea94c7d -u nacos -w nacos

 4》导入成功可以看到:

=========================================================================
 Complete initialization parameters,  total-count:80 ,  failure-count:0
=========================================================================
 Init nacos config finished, please start seata-server.

6.修改conf/registry.conf文件:修改指明注册中心和配置中心

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"
  loadBalance = "RandomLoadBalance"
  loadBalanceVirtualNodes = 10

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = "6a1ba3ab-1821-43a6-b7ba-77272ea94c7d"
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
  }
}

7.启动seata服务

seatain/seata-server.bat双击即可

8.启动成功可以到nacos查看服务列表与配置列表进行验证

3. 简单使用实现分布式事务

 如下场景涉及分布式事务:

模拟用户下单,会在订单服务创建一个订单,然后远程调用库存服务减去库存,再通过账户服务来减去用户账户的余额,最后修改订单状态为已完成。

涉及到的服务:订单、库存、账户服务。

参考:https://gitee.com/itCjb/spring-cloud-alibaba-seata-demo

这里的版本是基于seta1.4.0。 之前看0.9版本的教程都是将register.conf 拷到工程,目前的版本可以直接基于yml配置。

1. 库存服务

1.数据库  (注意每个业务数据库都需要加undo_log表)

/*
Navicat MySQL Data Transfer

Source Server         : mysql
Source Server Version : 50721
Source Host           : localhost:3306
Source Database       : test_storage

Target Server Type    : MYSQL
Target Server Version : 50721
File Encoding         : 65001

Date: 2019-12-08 15:11:00
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for product
-- ----------------------------
DROP TABLE IF EXISTS `product`;
CREATE TABLE `product` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `price` double DEFAULT NULL,
  `stock` int(11) DEFAULT NULL,
  `last_update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of product
-- ----------------------------
INSERT INTO `product` VALUES ('1', '5', '9', '2019-12-06 21:51:01');

-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of undo_log
-- ----------------------------

2. pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-cloud-alibaba-seata-demo</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>product-service</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <!-- nacos 作为服务注册中心 -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!-- nacos 作为配置中心 -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>com.alibaba</groupId>
                    <artifactId>druid</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-spring-boot-starter</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
    </dependencies>

</project>

3.yml配置

1》application.yml

  注意   tx-service-group: my_test_tx_group  需要和之前server端配置的保持一致。 seata.config 可以理解为从nacos读取一些配置,之前和服务端部署的时候有一些共有的配置;seata.registry 表示seata要注册到的seata server,从nacos读取该服务并注册进去,用于RM和TC直接的交互。

spring:
   datasource:
      type: com.alibaba.druid.pool.DruidDataSource
      url: jdbc:mysql://127.0.0.1:3306/test_storage?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
      driver-class-name: com.mysql.cj.jdbc.Driver
      username: root
      password: 123456
      max-wait: 60000
      max-active: 100
      min-idle: 10
      initial-size: 10
mybatis-plus:
   mapper-locations: classpath:/mapper/*Mapper.xml
   typeAliasesPackage: icu.funkye.entity
   global-config:
      db-config:
         field-strategy: not-empty
         id-type: auto
         db-type: mysql
   configuration:
      map-underscore-to-camel-case: true
      cache-enabled: true
      auto-mapping-unknown-column-behavior: none
      log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
seata:
   enabled: true
   application-id: product-service
   tx-service-group: my_test_tx_group
   config:
      type: nacos
      nacos:
         namespace: 6a1ba3ab-1821-43a6-b7ba-77272ea94c7d
         serverAddr: 127.0.0.1:8848
         group: SEATA_GROUP
         username: "nacos"
         password: "nacos"
   registry:
      type: nacos
      nacos:
         application: seata-server
         server-addr: 127.0.0.1:8848
         group: SEATA_GROUP
         namespace:
         username: "nacos"
         password: "nacos"

2》bootstrap.yml

spring:
   application:
      name: product-service
   main:
      allow-bean-definition-overriding: true
   cloud:
      nacos:
         discovery:
            server-addr: 127.0.0.1:8848
            username: "nacos"
            password: "nacos"
         config:
            server-addr: 127.0.0.1:8848
            username: "nacos"
            password: "nacos"
server:
   port: 8083

4.主启动类

package icu.funkye;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author funkye
 */
@SpringBootApplication
public class ProductServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProductServiceApplication.class, args);
    }

}

5. 业务代码

业务代码就不贴了,就是一个简单的CRUD

6.启动: 注意先启动nacos,再启动seata-server,最后启动项目

启动项目可以看到RM相关注册信息:

 2. 账户服务和订单服务

。。。在最后的git地址贴出

3.最后的client服务

client服务相当于一个网关层,调用feignclient服务完成相关的操作。

测试Controller如下:

    /**
     * 测试异常回滚
     *
     * @return
     * @throws TransactionException
     */
    @GetMapping(value = "testRollback")
    @GlobalTransactional
    public Object testRollback() throws TransactionException {
        Product product = productService.getById(1);
        if (product.getStock() > 0) {
            LocalDateTime now = LocalDateTime.now();
            logger.info("seata分布式事务Id:{}", RootContext.getXID());
            Account account = accountService.getById(1);
            Orders orders = new Orders();
            orders.setCreateTime(now);
            orders.setProductId(product.getId());
            orders.setReplaceTime(now);
            orders.setSum(1);
            orders.setAmount(product.getPrice());
            orders.setAccountId(account.getId());
            product.setStock(product.getStock() - 1);
            account.setSum(account.getSum() != null ? account.getSum() + 1 : 1);
            account.setLastUpdateTime(now);
            // 库存减去一
            productService.updateById(product);
            // 账户加1
            accountService.updateById(account);

            int i = 1 / 0;

            // 创建订单
            orderService.save(orders);

            return true;
        }
        return false;
    }

4. debug之后测试

1.断点打在client中    int i = 1 / 0    这一行。

2.代码执行到这里,查看相关服务日志:

(1) 库存服务日志如下: 可以看到将库存数量  stock修改为8

Creating a new SqlSession
SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@73753ab8] was not registered for synchronization because synchronization is not active
JDBC Connection [io.seata.rm.datasource.ConnectionProxy@7db2daa6] will not be managed by Spring
Original SQL: SELECT id,price,stock,last_update_time FROM product WHERE id=? 
parser sql: SELECT id, price, stock, last_update_time FROM product WHERE id = ?
==>  Preparing: SELECT id, price, stock, last_update_time FROM product WHERE id = ? 
==> Parameters: 1(Integer)
<==    Columns: id, price, stock, last_update_time
<==        Row: 1, 5.0, 9, 2019-12-06 21:51:01
<==      Total: 1
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@73753ab8]
2020-12-13 21:19:35.759  INFO 14932 --- [nio-8083-exec-2] icu.funkye.controller.ProductController  : product:Product(id=1, price=5.0, stock=8, lastUpdateTime=2019-12-06T21:51:01)
Creating a new SqlSession
SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6351d47d] was not registered for synchronization because synchronization is not active
JDBC Connection [io.seata.rm.datasource.ConnectionProxy@27e7719a] will not be managed by Spring
Original SQL: UPDATE product  SET price=?,
stock=?,
last_update_time=?  WHERE id=?
parser sql: UPDATE product SET price = ?, stock = ?, last_update_time = ? WHERE id = ?
==>  Preparing: UPDATE product SET price = ?, stock = ?, last_update_time = ? WHERE id = ? 
==> Parameters: 5.0(BigDecimal), 8(Integer), 2019-12-06T21:51:01(LocalDateTime), 1(Integer)
<==    Updates: 1
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6351d47d]

(2)账户服务日志如下: 可以看到将sum修改为2,可以理解为消费2个

Creating a new SqlSession
SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@1ff5e152] was not registered for synchronization because synchronization is not active
JDBC Connection [io.seata.rm.datasource.ConnectionProxy@11321cdc] will not be managed by Spring
Original SQL: SELECT id,user_name,sum,last_update_time FROM account WHERE id=? 
parser sql: SELECT id, user_name, sum, last_update_time FROM account WHERE id = ?
==>  Preparing: SELECT id, user_name, sum, last_update_time FROM account WHERE id = ? 
==> Parameters: 1(Integer)
<==    Columns: id, user_name, sum, last_update_time
<==        Row: 1, 1, 1, 2019-12-08 15:05:05
<==      Total: 1
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@1ff5e152]
Creating a new SqlSession
SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@24d434f9] was not registered for synchronization because synchronization is not active
JDBC Connection [io.seata.rm.datasource.ConnectionProxy@2bbfadf0] will not be managed by Spring
Original SQL: UPDATE account  SET user_name=?,
sum=?,
last_update_time=?  WHERE id=?
parser sql: UPDATE account SET user_name = ?, sum = ?, last_update_time = ? WHERE id = ?
==>  Preparing: UPDATE account SET user_name = ?, sum = ?, last_update_time = ? WHERE id = ? 
==> Parameters: 1(String), 2(Integer), 2020-12-13T21:19:35.710(LocalDateTime), 1(Integer)
<==    Updates: 1
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@24d434f9]
2020-12-13 21:20:37.238  INFO 10632 --- [ch_RMROLE_1_5_8] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:xid=169.254.51.32:8091:81496901952864256,branchId=81496905018900480,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/test_pay,applicationData=null
2020-12-13 21:20:37.239  INFO 10632 --- [ch_RMROLE_1_5_8] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 169.254.51.32:8091:81496901952864256 81496905018900480 jdbc:mysql://127.0.0.1:3306/test_pay
Sun Dec 13 21:20:37 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2020-12-13 21:20:37.404  INFO 10632 --- [ch_RMROLE_1_5_8] i.s.r.d.undo.AbstractUndoLogManager      : xid 169.254.51.32:8091:81496901952864256 branch 81496905018900480, undo_log deleted with GlobalFinished
2020-12-13 21:20:37.406  INFO 10632 --- [ch_RMROLE_1_5_8] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked

(3)client服务日志如下:

2020-12-13 21:19:35.677  INFO 19848 --- [nio-8081-exec-6] i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [169.254.51.32:8091:81496901952864256]
2020-12-13 21:19:35.710  INFO 19848 --- [nio-8081-exec-6] icu.funkye.controller.TestController     : seata分布式事务Id:169.254.51.32:8091:81496901952864256

3.代码执行到这里,查看数据库

(1) 查看seataserver服务器的三个表数据:

mysql> select * from lock_tableG
*************************** 1. row ***************************
       row_key: jdbc:mysql://127.0.0.1:3306/test_pay^^^account^^^1
           xid: 169.254.51.32:8091:81498624360579072
transaction_id: 81498624360579072
     branch_id: 81498628714266625
   resource_id: jdbc:mysql://127.0.0.1:3306/test_pay
    table_name: account
            pk: 1
    gmt_create: 2020-12-13 21:26:27
  gmt_modified: 2020-12-13 21:26:27
*************************** 2. row ***************************
       row_key: jdbc:mysql://127.0.0.1:3306/test_storage^^^product^^^1
           xid: 169.254.51.32:8091:81498624360579072
transaction_id: 81498624360579072
     branch_id: 81498626721972225
   resource_id: jdbc:mysql://127.0.0.1:3306/test_storage
    table_name: product
            pk: 1
    gmt_create: 2020-12-13 21:26:26
  gmt_modified: 2020-12-13 21:26:26
2 rows in set (0.00 sec)

mysql> select * from branch_tableG
*************************** 1. row ***************************
        branch_id: 81498626721972225
              xid: 169.254.51.32:8091:81498624360579072
   transaction_id: 81498624360579072
resource_group_id: NULL
      resource_id: jdbc:mysql://127.0.0.1:3306/test_storage
      branch_type: AT
           status: 0
        client_id: product-service:169.254.51.32:56219
 application_data: NULL
       gmt_create: 2020-12-13 21:26:26.893741
     gmt_modified: 2020-12-13 21:26:26.893741
*************************** 2. row ***************************
        branch_id: 81498628714266625
              xid: 169.254.51.32:8091:81498624360579072
   transaction_id: 81498624360579072
resource_group_id: NULL
      resource_id: jdbc:mysql://127.0.0.1:3306/test_pay
      branch_type: AT
           status: 0
        client_id: account-service:169.254.51.32:56051
 application_data: NULL
       gmt_create: 2020-12-13 21:26:27.387070
     gmt_modified: 2020-12-13 21:26:27.387070
2 rows in set (0.07 sec)

mysql> select * from global_tableG
*************************** 1. row ***************************
                      xid: 169.254.51.32:8091:81498624360579072
           transaction_id: 81498624360579072
                   status: 1
           application_id: client
transaction_service_group: my_test_tx_group
         transaction_name: testRollback()
                  timeout: 60000
               begin_time: 1607865986218
         application_data: NULL
               gmt_create: 2020-12-13 21:26:26
             gmt_modified: 2020-12-13 21:26:26
1 row in set (0.12 sec)

可以看到lock_table 记录了行锁的信息,锁住了数据库某个表某条数据的ID信息。

branch_table 记录了当前Tx参与的会话分支。

global_table 记录了Tx信息。

(2)查看库存库中的产品表:发现库存数是8

(3) 查看库存库中的undo_log中数据如下

 可以看到第三条数据,记录branchId、xid、以及一个rollback_info与log_status(1是已经处理,2是未处理)。rollback_info信息如下:

{
    "@class": "io.seata.rm.datasource.undo.BranchUndoLog",
    "xid": "169.254.51.32:8091:81498624360579072",
    "branchId": 81498626721972225,
    "sqlUndoLogs": ["java.util.ArrayList", [{
        "@class": "io.seata.rm.datasource.undo.SQLUndoLog",
        "sqlType": "UPDATE",
        "tableName": "product",
        "beforeImage": {
            "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
            "tableName": "product",
            "rows": ["java.util.ArrayList", [{
                "@class": "io.seata.rm.datasource.sql.struct.Row",
                "fields": ["java.util.ArrayList", [{
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "id",
                    "keyType": "PRIMARY_KEY",
                    "type": 4,
                    "value": 1
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "price",
                    "keyType": "NULL",
                    "type": 8,
                    "value": 5.0
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "stock",
                    "keyType": "NULL",
                    "type": 4,
                    "value": 9
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "last_update_time",
                    "keyType": "NULL",
                    "type": 93,
                    "value": ["java.sql.Timestamp", [1575669061000, 0]]
                }]]
            }]]
        },
        "afterImage": {
            "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
            "tableName": "product",
            "rows": ["java.util.ArrayList", [{
                "@class": "io.seata.rm.datasource.sql.struct.Row",
                "fields": ["java.util.ArrayList", [{
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "id",
                    "keyType": "PRIMARY_KEY",
                    "type": 4,
                    "value": 1
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "price",
                    "keyType": "NULL",
                    "type": 8,
                    "value": 5.0
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "stock",
                    "keyType": "NULL",
                    "type": 4,
                    "value": 8
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "last_update_time",
                    "keyType": "NULL",
                    "type": 93,
                    "value": ["java.sql.Timestamp", [1575640261000, 0]]
                }]]
            }]]
        }
    }]]
}

4. 放开断点让程序报错,查看产品库存:发现仍然是9,证明分布式事务确实回滚。

也可以查看业务日志:如下是库存服务的日志(xid、branchid与上面不一致是多次试验截得)

2020-12-13 21:39:11.182  INFO 14932 --- [ch_RMROLE_1_4_8] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:xid=169.254.51.32:8091:81501788107309056,branchId=81501789592092673,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/test_storage,applicationData=null
2020-12-13 21:39:11.182  INFO 14932 --- [ch_RMROLE_1_4_8] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 169.254.51.32:8091:81501788107309056 81501789592092673 jdbc:mysql://127.0.0.1:3306/test_storage
2020-12-13 21:39:11.408  INFO 14932 --- [ch_RMROLE_1_4_8] i.s.r.d.undo.AbstractUndoLogManager      : xid 169.254.51.32:8091:81501788107309056 branch 81501789592092673, undo_log deleted with GlobalFinished
2020-12-13 21:39:11.410  INFO 14932 --- [ch_RMROLE_1_4_8] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked

 5. Seata过程简单总结

 1. 五步骤再次理解

1.TM 开启分布式事务(TM 向 TC 注册全局事务记录);
2.按业务场景,编排数据库、服务等事务内资源(RM 向 TC 汇报资源准备状态 );
3.TM 结束分布式事务,事务一阶段结束(TM 通知 TC 提交/回滚分布式事务);
4.TC 汇总事务信息,决定分布式事务是提交还是回滚;
5.TC 通知所有 RM 提交/回滚 资源,事务二阶段结束;

 TM可以理解为加了GlobalTransactional 注解的方法,会向TC申请一个全局事务ID(XID),TC就是seataserver。RM就是资源管理器,可以理解为一个数据库连接就是一个RM,一个业务库对应一个RM。

2. 上面其实是 Seata的AT模式,可以理解为无代码侵入,不需自己编写回滚以及其他方法,seata自己控制。官方对AT模式解释如下:提供无侵入自动补偿的事务模式,目前已支持 MySQL、 Oracle 、PostgreSQL和 TiDB的AT模式。

3.AT模式整体机制如下:

两阶段提交协议的演变:

一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。

二阶段:

  提交异步化,非常快速地完成。
  回滚通过一阶段的回滚日志进行反向补偿。

解释:

第一阶段seata会拦截业务SQL(使用seata代理数据源进行处理,也就是对数据源做手脚):

(1)解析业务SQL将要更新的数据,记录其更新前的值,"BeforeImage"

(2)执行业务SQL,进行更新

(3)记录更新后的值,"AfterImage",最后生成行锁。

  上面的操作在一个事务内操作,可以保证原子性。可以通过上面业务库的undo_log日志表的rollback_info 字段进行查看,说白了就是记录一下更新前、更新后的值;如果报错需要回滚将数据修改为更新前的值,如果正常提交删掉undo_log中的记录即可。

第二阶段:如果报错就回滚,否则提交。

提交》如果正常操作进行提交事务。Seata框架只需要将一阶段生成的快照和行锁删掉即可,完成数据清理。

回滚》回滚时,使用"before image"进行还原数据,但是还原之前要校验脏写,对比"数据库当前数据"和"after image",如果两份数据一致证明没有脏写,可以还原;如果不一致,证明有脏写,这时候就需要人工处理(可以根据undo_log中的记录进行处理)。

 seataserver安装文件以及上面测试代码参考:https://gitee.com/Qiao-Zhi/seata-test

分布式事务 Seata 及其三种模式详解:  http://seata.io/zh-cn/blog/seata-at-tcc-saga.html

原文地址:https://www.cnblogs.com/qlqwjy/p/13909056.html