SQLAlchemy 嵌套事务的解决方案

 sqlachemy 是python的orm框架,在使用一段时间后,我们通常会出现事务嵌套的情况,看到很多人写代码的时候,居然是session到处传递,这无疑是加大了代码之间的耦合度。
    案例:
    def save(session):
        # TODO

    def update(session):
        # TODO

    def service():
        session = getSession();
        try:
            save(session);
            update(session);
            session.commit();
        except Exception as e:
            session.rollback();
        finally:
            if not session:
                session.close();

    假设save和update是同一个事务,但是上述的实践缺强制了save和update的 session相耦合来达成,好的实践应该是:save和update无任何关系,只是在实现业务逻辑时,组合到一个事务,确保事务性即可,即:

    def service():
        try:
            save();
            update();
        except Exception as e:
            # TODO 
    因此如何解决这个问题是本文需要阐述的主题。

解决方案是:

ACID是事务的四个基本特征,通常我们的理解事务是一个操作单元,要么一起成功要么一起失败(原子性);通过一个例子来直接说明如何解决的。
    场景:注册用户(添加一个用户,会未用户送10个积分)

创建表

create table user(
        id int not null auto_increment,
        name varchar(255) not null default '' comment '用户名',
        created_at datetime not null default current_timestamp comment '创建时间',
        updated_at datetime not null default current_timestamp comment '更新时间',
        primary key (`id`)
    )engine innodb charset=utf8 comment '用户表';  

    create table user_credits(
        id int not null auto_increment,
        user_id int not null default 0 comment '用户ID',
        user_name varchar(255) not null default '' comment '用户名',
        score int not null default 0 comment '积分',
        created_at datetime not null default current_timestamp comment '创建时间',
        updated_at datetime not null default current_timestamp comment '更新时间',
        primary key (`id`),
        unique key `uk_user_id` (`user_id`)
    )engine innodb charset=utf8 comment '用户积分表';

sqlalchemy 实现

# -*- coding:utf-8 -*-

import datetime
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session

engine = create_engine("mysql+pymysql://root:root@localhost:3306/csdn", echo=True)

# 必须使用scoped_session,域session可以将session进行共享
DBSession = scoped_session(sessionmaker(bind=engine))

BaseModel = declarative_base()


# ----------- Relation Model Object---------------- #

class User(BaseModel):

    __tablename__ = "user"

    id = Column(Integer, primary_key=True)
    name = Column(String)
    created_at = Column(DateTime, default=datetime.datetime.now)
    updated_at = Column(DateTime, default=datetime.datetime.now)


class UserCredits(BaseModel):

    __tablename__ = "user_credits"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    user_name = Column(String)
    score = Column(Integer)
    created_at = Column(DateTime, default=datetime.datetime.now)
    updated_at = Column(DateTime, default=datetime.datetime.now)

# ----------- Service implements---------------- #


def add_user(user):
    " 添加用户 "
    session = DBSession()
    try:
        session.add(user)
        session.commit()
    except Exception as e:
        session.rollback()
        print("AddUser: ======={}=======".format(e))
    finally:
        if not session:
            session.close()


def add_user_credits(userCredits, interrupt=True):
    " 添加用户积分记录 "
    session = DBSession()
    try:
        if interrupt:
            raise Exception("--- interrupt ---")

        session.add(userCredits)
        session.commit()
    except Exception as e:
        session.rollback()
        print("AddUserCredits: ======={}=======".format(e))
    finally:
        if not session:
            session.close()


def regist_user():

    session = DBSession()
    try:
        # 开启子事务
        session.begin(subtransactions=True)

        # TODO Service
        user = User(name='wangzhiping')
        add_user(user)
        add_user_credits(UserCredits(
            user_id=user.id,
            user_name=user.name,
            score=10
        ), False)

        session.commit()
    except Exception as e:
        session.rollback()
        print("AddUserCredits: ======={}=======".format(e))
    finally:
        if not session:
            session.close()

# ---------- exec -----------
regist_user()
 1,设置session时,需要指定为scoped_session,目的是session可以共享(ThreadLocal);
    2,session.begin(subtransactions=True) 开启子事务管理;
    这是实际上regist_user是在同一个线程中的session,这是add_user,add_user_credits实际上session是同一个,所以可以实现。其实这个可以更进一步扩展,把事务隔离级别,传播属性,这里不做介绍

---------------------
作者:紫守笨 
来源:CSDN 
原文:https://blog.csdn.net/program_red/article/details/55194130?utm_source=copy 
版权声明:本文为博主原创文章,转载请附上博文链接!

转载:https://blog.csdn.net/program_red/article/details/55194130

原文地址:https://www.cnblogs.com/1a2a/p/9766548.html