学习自叶大blog
https://www.cnblogs.com/yjf512/p/12651310.html
例子和解释都很详细,不再改动,仅作笔记。
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"strconv"
	"time"

	_ "github.com/go-sql-driver/mysql"
	"github.com/pkg/errors"
)

// mustExec runs query on the pinned connection conn and panics with a
// stack-annotated error when the statement fails. The panic is recovered by
// the deferred handler in main, which rolls the XA branches back.
func mustExec(ctx context.Context, conn *sql.Conn, query string) {
	if _, err := conn.ExecContext(ctx, query); err != nil {
		panic(errors.WithStack(err))
	}
}

// rollbackXA makes a best-effort attempt to roll back the XA branch xid on
// conn. name is only used to label log output.
func rollbackXA(ctx context.Context, conn *sql.Conn, name, xid string) {
	// MySQL rejects XA ROLLBACK while the branch is still ACTIVE, so try to
	// end it first. The error is deliberately ignored: XA END fails when the
	// branch has already been ended or prepared, and the rollback below is
	// still the right next step in those states.
	_, _ = conn.ExecContext(ctx, fmt.Sprintf("XA END '%s'", xid))
	if _, err := conn.ExecContext(ctx, fmt.Sprintf("XA ROLLBACK '%s'", xid)); err != nil {
		log.Printf("%s: XA ROLLBACK '%s' failed: %v", name, xid, err)
	}
}

// printBalances prints the score of user 1 (read through db1) and the money of
// wallet 1 (read through db2). Read errors are logged and the zero value is
// printed, so a failed read never aborts the demo.
func printBalances(db1, db2 *sql.DB) {
	var score int
	if err := db1.QueryRow("select score from user where id = 1").Scan(&score); err != nil {
		log.Println("read user1 score:", err)
	}
	fmt.Println("user1 score:", score)

	var money float64
	if err := db2.QueryRow("select money from wallet where id = 1").Scan(&money); err != nil {
		log.Println("read wallet1 money:", err)
	}
	fmt.Println("wallet1 money:", money)
}

// main demonstrates a two-phase commit across two MySQL instances using XA
// statements: XA START -> DML -> XA END -> XA PREPARE on both branches, then
// XA COMMIT on both. Any failure before the commit phase panics; the deferred
// handler recovers, rolls both branches back and prints the final values.
func main() {
	ctx := context.Background()

	// Connection pool for db1.
	db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
	if err != nil {
		panic(err.Error())
	}
	defer db1.Close()

	// Connection pool for db2.
	db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
	if err != nil {
		panic(err.Error())
	}
	defer db2.Close()

	// Show the values before the transaction.
	printBalances(db1, db2)

	// An XA branch belongs to the session that started it, while *sql.DB may
	// hand every statement to a different pooled connection. Pin one
	// connection per database so all XA statements of a branch share a session.
	conn1, err := db1.Conn(ctx)
	if err != nil {
		panic(errors.WithStack(err))
	}
	defer conn1.Close()

	conn2, err := db2.Conn(ctx)
	if err != nil {
		panic(errors.WithStack(err))
	}
	defer conn2.Close()

	// Generate the xid. NOTE(review): second resolution, so two runs within
	// the same second would reuse the same xid.
	xid := strconv.FormatInt(time.Now().Unix(), 10)
	fmt.Println("=== xid:" + xid + " ====")

	// Registered after the conn.Close defers, so it runs before them and the
	// pinned connections are still usable for the rollback.
	defer func() {
		if r := recover(); r != nil {
			fmt.Printf("%+v ", r)
			fmt.Println("=== call rollback ====")
			rollbackXA(ctx, conn1, "db1", xid)
			rollbackXA(ctx, conn2, "db2", xid)
		}
		// Always show the final values, on success as well as after rollback.
		printBalances(db1, db2)
	}()

	// XA start
	fmt.Println("=== call start ====")
	mustExec(ctx, conn1, fmt.Sprintf("XA START '%s'", xid))
	mustExec(ctx, conn2, fmt.Sprintf("XA START '%s'", xid))

	// DML
	mustExec(ctx, conn1, "update user set score=score+2 where id =1")
	mustExec(ctx, conn2, "update wallet set money=money+1.2 where id=1")

	// XA end
	fmt.Println("=== call end ====")
	mustExec(ctx, conn1, fmt.Sprintf("XA END '%s'", xid))
	mustExec(ctx, conn2, fmt.Sprintf("XA END '%s'", xid))

	// XA prepare
	fmt.Println("=== call prepare ====")
	mustExec(ctx, conn1, fmt.Sprintf("XA PREPARE '%s'", xid))
	// panic(errors.New("db2 prepare error"))
	mustExec(ctx, conn2, fmt.Sprintf("XA PREPARE '%s'", xid))

	// XA commit. Once both branches are prepared there is no rolling back: a
	// failed commit is logged together with the xid so it can be completed
	// later, and the other branch is still committed.
	fmt.Println("=== call commit ====")
	if _, err := conn1.ExecContext(ctx, fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
		// TODO: retry the COMMIT.
		// TODO: if it still fails, record the xid and enter the data-recovery
		// path: wait for the database to come back and commit again.
		log.Println("xid:"+xid, "db1 commit failed:", err)
	}
	// panic(errors.New("db2 commit error"))
	if _, err := conn2.ExecContext(ctx, fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
		log.Println("xid:"+xid, "db2 commit failed:", err)
	}
}
当出现特定的log,我们可以在MySQL终端执行
xa recover
这样可以看到仍处于 PREPARED 状态、尚未完成 commit 的 XA 事务,然后可以根据情况手动 commit,例如:
xa commit '1585644880'
业务中尽量不要用分布式存储,尽可能地用队列来替代,因为前者的事务一致性太难保障。
end