mongo数据库的事务操作

一:mongo数据库的版本在4.0以上

二:以pymongo为例

from pymongo import MongoClient
class MongoTest(object):

    def __init__(self):
        self.conn = MongoClient(host='xxx', port=xxx)

    def transaction_test_normal(self):
        """正常给两条数据库插入两条数据"""
        # self.conn["test1"]["t1"] ql_test1指的是数据库名字 test1指的是集合名字
        t1 = self.conn["ql_test"]["test1"]
        t2 = self.conn["ql_test"]["test2"]
        # 1. pymongo的client开启事务
        with self.conn.start_session(causal_consistency=True) as session:
            with session.start_transaction():
                ressult1 = t1.insert_one(document={"name":"beijing"},session=session)
                ressult2 = t2.insert_one(document={"name":"shanghai"},session=session)
                # 返回两条插入数据的_id的值
                print(ressult1.inserted_id,ressult2.inserted_id)


    def transaction_test_abort(self):
        """事务删除两条数据,引发异常后,两条数据都未发生改变"""
        t1 = self.conn["ql_test"]["test1"]
        t2 = self.conn["ql_test"]["test2"]
        with self.conn.start_session(causal_consistency=True) as session:
            with session.start_transaction():
                ressult1 = t1.delete_one({"name":"beijing"},session=session)
                print(1/0)
                ressult2 = t2.delete_one({"name":"shanghai"},session=session)
                print(ressult1.deleted_count,ressult2.deleted_count)

    def test1(self):
        """插入一条数据成功后,删除一条数据,故意抛出异常,看插入是否成功"""
        t1 = self.conn["ql_test"]["test1"]
        t2 = self.conn["ql_test"]["test2"]
        with self.conn.start_session(causal_consistency=True) as session:
            with session.start_transaction():
                # xian未插入成功
                ressult1 = t1.insert_one({"name": "xian"}, session=session)
                print(1 / 0)
                ressult2 = t2.delete_one({"name": "guandong"}, session=session)
                print(ressult1.inserted_id, ressult2.deleted_count)

    def test2(self):
        """
        更新第一条数据,更新第二条数据,故意抛出异常,看是否更新成功---
        1. 不能使用update方法了,必须使用find_one_and_update
        2. find_one_and_update也有upsert=True选项,不存在就执行插入操作,存在就不做任何操作
        """

        t1 = self.conn["ql_test"]["test1"]
        t2 = self.conn["ql_test"]["test2"]
        with self.conn.start_session(causal_consistency=True) as session:
            with session.start_transaction():
                #
                r1 = t1.find_one_and_update({"name":"xian666"},{"$set":{"name":"xian"}},session=session)
                print(1 / 0)
                r2 = t2.find_one_and_update({"name":"chongqing"},{"$set":{"name":"chongqing666"}},session=session)
                print(r1,r2)



m = MongoTest()
m.test2()

# TODO

原文地址:https://www.cnblogs.com/meloncodezhang/p/13669758.html