MySQL两阶段提交

学习自叶大blog

https://www.cnblogs.com/yjf512/p/12651310.html

例子和解释都很详细,不再改动,仅作笔记。

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"strconv"
	"time"

	_ "github.com/go-sql-driver/mysql"
	"github.com/pkg/errors"
)

func main() {
     var err error

     // db1的连接
     db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
     if err != nil {
          panic(err.Error())
     }
     defer db1.Close()

     // db2的连接
     db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
     if err != nil {
          panic(err.Error())
     }
     defer db2.Close()

     // 开始前显示
     var score int
     db1.QueryRow("select score from user where id = 1").Scan(&score)
     fmt.Println("user1 score:", score)
     var money float64
     db2.QueryRow("select money from wallet where id = 1").Scan(&money)
     fmt.Println("wallet1 money:", money)

     // 生成xid
     xid := strconv.FormatInt(time.Now().Unix(), 10)
     fmt.Println("=== xid:" + xid + " ====")
     defer func() {
          if err := recover(); err != nil {
               fmt.Printf("%+v
", err)
               fmt.Println("=== call rollback ====")
               db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
               db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
          }

          db1.QueryRow("select score from user where id = 1").Scan(&score)
          fmt.Println("user1 score:", score)
          db2.QueryRow("select money from wallet where id = 1").Scan(&money)
          fmt.Println("wallet1 money:", money)
     }()

     // XA 启动
     fmt.Println("=== call start ====")
     if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }
     if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }

     // DML操作
     if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {
          panic(errors.WithStack(err))
     }
     if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
          panic(errors.WithStack(err))
     }

     // XA end
     fmt.Println("=== call end ====")
     if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }
     if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }

     // prepare
     fmt.Println("=== call prepare ====")
     if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }
     // panic(errors.New("db2 prepare error"))
     if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }

     // commit
     fmt.Println("=== call commit ====")
     if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
          // TODO: 尝试重新提交COMMIT
          // TODO: 如果还失败,记录xid,进入数据恢复逻辑,等待数据库恢复重新提交
          log.Println("xid:" + xid)
     }
     // panic(errors.New("db2 commit error"))
     if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
          log.Println("xid:" + xid)
     }

     db1.QueryRow("select score from user where id = 1").Scan(&score)
     fmt.Println("user1 score:", score)
     db2.QueryRow("select money from wallet where id = 1").Scan(&money)
     fmt.Println("wallet1 money:", money)
}

当日志中出现程序记录的 xid(形如 xid:1585644880)时,说明对应的 XA COMMIT 失败了,我们可以在 MySQL 终端执行

xa recover

这样可以看到处于 PREPARED 状态、尚未提交的 XA 事务(也就是 commit 失败的事务),然后我们可以根据情况手动 commit

xa commit '1585644880'

业务中尽量不要用分布式事务,尽可能地用队列来替代,因为前者的事务一致性太难保障。

end

一个没有高级趣味的人。 email:hushui502@gmail.com
原文地址:https://www.cnblogs.com/CherryTab/p/12782358.html