python 模块 SQLalchemy

 SQLalchemy 概述:

# &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
# SQLAlchemy ORM框架(需要依赖数据库API语言)

# 架构图
# SQLalchemy orm
# SQLalchemy core
#       --> schema/types 架构/类型
#       --> sql expression language SQL表达式语言
#       --> engine(框架引擎)
#           --> connection pooling (数据库连接池)
#           --> diaiect (选择连接数据路的DB api种类)
# DBAPI(pymysql,oracle,sqlite)


执行流程:
    类/对箱操作 -> SQL -> pymsql、mysqldb -> 在去数据中执行

连接数据库格式:

连接数据库:
    MYSQL - python
    mysql+mysqldb://<user>:<password>@<host>:<port>/<dbname>
    
    pymysql
    mysql+pymsql://<username>:<password>@<host>/<dbname>
    示例:
    "mysql+pymysql://root:123456@127.0.0.1:3306/t1?charset=utf8"
   
    cx_Oracle
    oracle+cx_oracle://user:pwd@host:port/dbname 

线程池/单例方式

from sqlalchemy import create_engine
from sqlalchemy.orm import Session,scoped_session,sessionmaker
# 创建连接
# engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/anec?charset=utf8")

# # 这种方式自动从连接池中获取连接(方式一)
sess = sessionmaker(bind=engine)
s = scoped_session(sess)

# 单例方式(方式二)
s = Session(bind=engine)

创建数据表  常用数据类型

from datetime import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import (create_engine, Column, Integer, String, SmallInteger, DateTime,Float,DECIMAL,Boolean,Enum,Date,Time,Text)
from sqlalchemy.dialects.mysql import LONGTEXT

engine = create_engine("mysql+pymysql://root:QWer1234!!@172.16.3.108:3306/cmdb?charset=utf8")
Base = declarative_base()

class Human2(Base):
    __tablename__ = "human2"
    id = Column("id", Integer, autoincrement=True, primary_key=True)    
   # mysql 为 int(11) name = Column("name", String(20), nullable=False, unique=True)
   # mysql 为 varchar(20) age = Column("age", Integer, nullable=False) sex = Column("sex", SmallInteger, default=1) price = Column('price',Float,nullable=True)
   # mysql 为 float 保留3位小数 price2 = Column(DECIMAL(7, 3))
   # float精度类型 mysql 为 decimal(7,3) delete = Column(Boolean)
   # mysql 为 tinyint(1) 值为 1/0 sex2 = Column(Enum("男", "女"))
   # mysql 为 enum("男", "女") create_time1 = Column("create_time", DateTime, default=datetime.now)
   # mysql 为 datetime() create_time2 = Column(Date)
   # 只能存储指定的年月日 mysql 为 date create_time3 = Column(Time)
   # 只能存储指定的时间 mysql 为 time content = Column(Text)
   # mysql 为 text content2 = Column(LONGTEXT)
   # 注意从sqlalchemy.dialects.mysql 导入,longtext
def __repr__(self): return "name {}".format(self.name) # 执行在数据库中创建表 Base.metadata.create_all(bind=engine)

四种外键约束

from sqlalchemy import ForeignKey

四种外键约束:
1.第一种:RESTRICT(默认就是这种。当父表数据被删除,从表会拒绝删除)
语法:uid = Column(Integer , ForeignKey("user.id" ,ondelete="RESTRICT")) 

2.第二种:NO ACTIION(同RESTRICT 一样)
语法: uid = Column(Integer , ForeignKey("user.id" ,ondelete="NO ACTION")) 

3.第三种:CASCADE (父表数据删除、从表数据也会跟着删除)
语法:uid = Column(Integer , ForeignKey("user.id" ,ondelete="CASCADE"))

4.第四种: SET NULL (父表数据删除,从表外键字段设为NULL)
语法:uid = Column(Integer , ForeignKey("user.id" ,ondelete="SET NULL"))
注意: 如果uid字段设置了 nullable=False , 再设置 ondelete = "SET NULL",pycharm运行程序则会报错
View Code

经典样式 创建数据表

# 经典样式创建表
from sqlalchemy import (Table, MetaData, create_engine,Column, Integer, String, SmallInteger, DateTime)
from datetime import datetime
from sqlalchemy.orm import mapper

engine = create_engine("mysql+pymysql://root:QWer1234!!@172.16.3.108:3306/cmdb?charset=utf8")
metadata = MetaData()

# 创建 user 表
user = Table("user", metadata,
        Column("id", Integer, nullable=False, primary_key=True, autoincrement=True),
        Column("username", String(20), nullable=False),
        Column("age", Integer, nullable=False),
        Column("sex", SmallInteger, default=1),
        Column("create_time", DateTime, default=datetime.now)
    )

# model
class User(object):
    def __init__(self, username=None, age=None, sex=None):
        if username:
            self.username = username
        if age:
            self.age =age
        if sex:
            self.sex =sex

# table与model映射
mapper(User, user)

# 执行在数据库中创建表
metadata.create_all(bind=engine)

经典样式 操作表

from sqlalchemy.orm import  sessionmaker
from sqlalchemy  import create_engine


engine = create_engine("mysql+pymysql://root:QWer1234!!@172.16.3.108:3306/cmdb?charset=utf8")
Session = sessionmaker(bind=engine)
s = Session()

# 示例话 创建的表类(当前没有的话,需要导入创建表)
from 导入经典样式的表类 import User
try:
    user = User("rose", 20, 0)
    s.add(user)
    s.commit()
except Exception as e:
    s.rollback()

ORM样式 创建数据表

# orm样式创建表
from datetime import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import (create_engine, Column, Integer, String, SmallInteger, DateTime)
from sqlalchemy.orm import Session

engine = create_engine("mysql+pymysql://root:QWer1234!!@172.16.3.108:3306/cmdb?charset=utf8")
Base = declarative_base()

class Human(Base):
    __tablename__ = "human"
    id = Column("id", Integer, autoincrement=True, primary_key=True)
    name = Column("name", String(20), nullable=False, unique=True)
    age = Column("age", Integer, nullable=False)
    sex = Column("sex", SmallInteger, default=1)
    create_time = Column("create_time", DateTime, default=datetime.now)

    def __repr__(self):
        return "name {}".format(self.name)

# 执行在数据库中创建表
Base.metadata.create_all(bind=engine)
Base.metadata.drop_all(engine)

s = Session(bind=engine)

# 实例化创建表
h = Human(name="king002", age=30, sex=1)

# 添加数据至human表中
s.add(h)
s.commit()

relationship关联创建表

from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, ForeignKey,String,DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
import datetime


engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/anec?charset=utf8")

Base = declarative_base()
class Parent(Base):
    __tablename__ = 'parent'
    id = Column(Integer, primary_key=True,autoincrement=True)
    name = Column(String(64))
    ctime = Column(DateTime,default=datetime.datetime.now)
    children = relationship("Child")

    def __repr__(self):
        return "name {}".format(self.name)
class Child(Base):
    __tablename__ = 'child'
    id = Column(Integer, primary_key=True,autoincrement=True)
    name = Column(String(64))
    parent_id = Column(Integer, ForeignKey('parent.id'))

    def __repr__(self):
        return "name {}".format(self.name)

# Base.metadata.create_all(engine)
# Base.metadata.drop_all(engine)
View Code

数据库中已存在数据表,反射连接数据库中的表

方式一:

from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import select

# 连接数据库
engine = create_engine("mysql+pymysql://root:QWer1234!!@172.16.3.108:3306/cmdb?charset=utf8", pool_size=5)

# 从连接的数据库池中获取一个连接
conn = engine.connect()

# 实例化一个 表的类
metadata = MetaData()

# 指定反射与连接数据库绑定
metadata.reflect(bind=engine)

# 获取数据库中的表对象:(这里表名是 t1 )
# 方式一:
t1 = metadata.tables.get('t1')

# 方式二:
from sqlalchemy import Table
t2 = Table('t1', metadata, autoload=True, autoload_with=engine)

# 连接执行查询,返回查询内容( t1.c.字段名 )
info = conn.execute(select([t1.c.id,t1.c.name]).limit(3)).fetchall()
print(info)

cc = select([t1.c.id,t1.c.name]).where(t1.c.id>8).limit(3).offset(0)
info2 = conn.execute(cc).fetchall()
print(info2)

# cc = select([t1.c.it, func.count(t1.c.id), func.sum(t1.c.age)]).group_by(human.c.sex)
# info2 = conn.execute(cc).fetchall()
# print(info2)

# 连表查询
# i = requirement.join(project, project.c.id==requirement.c.prj_id).join(product, product.c.id==project.c.prod_id)
# s = select([project.c.prj_name.label("prj_name"), product.c.prod_name.label("prod_name"), requirement.c.req_name.label("req_name")]).select_from(i)
# res = conn.execute(s).fetchall()

# 插入数据:
#   插入字典数据:
tt1 = t1.insert()
conn.execute(tt1,[{'name':'con6'},{'name':'con5'}])
conn.close()

# 指定字段插入:
tt2 = t1.insert().values(name='con6')
conn.execute(tt2)
conn.close()

# 修改数据:
tt3 = t1.update().values(name='ccccc').where(t1.c.id == 1)
conn.execute(tt3)
conn.close()

# 删除数据:
tt4 = t1.delete().where(t1.c.id == 6)
conn.execute(tt4)
conn.close()

方式二:

# ORM样式反射存在的表 返回sqlalchemy  sqlalchemy.ext.automap 对象:
from sqlalchemy.ext.automap import automap_base
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from datetime import datetime

engine = create_engine("mysql+pymysql://root:QWer1234!!@172.16.3.108:3306/cmdb?charset=utf8")

Base = automap_base()
Base.prepare(engine, reflect=True)

# 反射得到orm
Human = Base.classes.t1

# 通信
session = Session(bind=engine)

# 插入数据:
# h = Human(name="vcr")
# session.add(h)
# # session.add_all(h)
# session.commit()

# # 修改数据
# h_obj = session.query(Human).filter_by(name="vcr").first()
# h_obj.name = "aaaaaaaaaaaaaaaaaaa"
# session.add(h_obj)
# session.commit()

# # 删除数据
# h_obj = session.query(Human).filter_by(name="aaaaaaaaaaaaaaaaaaa").first()
# session.delete(h_obj)
# session.commit()

# 查询数据(为sqlalchemy.ext.automap 对象)
# res = session.query(Human).filter(Human.id > 7)
# for i in res:
#     print(i.name)

数据操作:

######################################## 查
# 查询所有数据 [ obj,obj2,.... ]
# info = s.query(Parent).all()
# for i in info:
#     print(i.name)

# # 参数查询
# info = s.query(Parent).filter_by(id=2).
# for i in info:
#     print(i.id)
#     print(i.name)
#     print(i.ctime)

# 语句查询
# from sqlalchemy import text
# info = s.query(Parent).from_statement(text("select * from Parent where id = 2")).all()
# print(info)

# 条件查询
# info = s.query(Parent).filter(Parent.id >2)
# for i in info:
#     print(i.name)
#     print(i.ctime)
# ret = s.query(Users).filter_by(name='alex').all()
# ret = s.query(Users).filter(Users.id > 1,Users.name == 'alex').all()
# ret = s.query(Users).filter(Users.id.between(1,3),Users.name == 'alex').all()
# ret = s.query(Users).filter(Users.id.in_([1,3,4])).all()
# ret = s.query(Users).filter(~Users.id.in_([1,3,4])).all()
# ret = s.query(Users).filter(Users.id.in_(s.query(Users.id).filter_by(name='alex')))
#
# from sqlalchemy import and_,or_
#
# ret = s.query(Users).filter(and_(Users.id > 3,User.name == 'alex')).all()
# ret = s.query(Users).filter(or_(Users.id > 3,User.name == 'alex')).all()
# ret = s.query(Users).filter(
#     or_(
#         Users.id > 3,
#         and_(User.name == 'alex',Users.id>3),
#         Users.extra != ''
#     )).all()

# # 通配符
# ret= s.query(User).filter(User.name.like('e%')).all()
# ret= s.query(User).filter(~User.name.like('e%')).all()

# 限制
# ret = s.query(Users)[1:3]

# # 排序
# ret = s.query(Users).order_by(Users.name.desc()).all()
# ret = s.query(Users).order_by(Users.name.desc(),User.id.asc()).all()

# # 分组
# # from sqlalchemy.sql import func
# ret = s.query(Users).group_by(Users.extra).all()
# ret = s.query(
#     func.max(Users.id),
#     func.sum(Users.id),
#     func.min(Users.id)).group_by(Users.name).all()
# ret = s.query(
#     func.max(Users.id),
#     func.sum(Users.id),
#     func.min(Users.id)).group_by(Users.name).having(func.min(Users.id)>2).all()

# # 连表
# ret = s.query(Users,Favor).filter(Users.id == Favor.user_id).all()
# ret = s.query(Users).join(Favor).all()
# ret = s.query(Users).join(Favor,isouter=True).all()

# # 组合
# q1 = s.query(Users.name).filter(Users.id > 2)
# q2 = s.query(Favor.caption).filter(Favor.nid < 2)
# ret = q1.union(q2).all()
#
# ret = q1.union_all(q2).all()

######################################## 增
# # 添加数据至human表中
# s.add(h)        # 数据格式为 {}
# s.add_all(h)   数据格式为:  [{},{}...]
# s.commit()

# ----------------------------------
    # sqlalchemy 连表,一对多连表查询
    # 方式一(连表查询方式)
    # s.query(表1,表2).join(表2,isouter=True).all()

    # 方式二,在创建表的时候加入关联字段
    #   外键所在的表增加如下一个字段:
    #   字段名 = relationship("外连表",backref='pers(反向字段)')
    #   backref='pers' 反向关联

    # 添加以后查询方式:
    # info = s.query(表1).all()
    # for i in info:
    #     print(表1.字段,表1.关联字段名.表2)

    # # sqlalchemy跨表增加
    # 方式一
    # hb = 表1(参数)
    # hb.pers = [表2(参数),表2(参数)]
    # s.add(hb)
    # s.commit()

    # 方式二
    # hb = 表1(name=参数,关联字段=表2(参数))
    # s.add(hb)
    # s.commit()

# ----------------------------------
    # sqlalchemy 连表,多对多连表查询
    # 男孩表
    # 男女孩约会表
    # 女孩表

    # 添加查询字段:
    # 在男孩表中添加:
    # girls = relationship('女孩表',secondary='男女孩约会表',backref='男孩表')
    # 要在女孩表中也可以添加
    # 或者在约会表中也可以添加两个字段

######################################## 删
# 删除
# s.query(Parent).filter(Parent.id == 4).delete()
# s.commit()

######################################## 修
# 修改
# s.query(Parent).filter(Parent.id == 3).update({'name':'aaaaaaaa'})
# s.query(Parent).filter(Parent.id == 3).update({Parent.name:Parent.name +'傻逼'},synchronize_session=False)  # 字符互操作
# s.query(Parent).filter(Parent.id == 3).update({Parent.age:Parent.name + 8 },synchronize_session='evaluate')  # 数字互操作
# s.commit()

数据库反向生成sqlalchemy ORM

# 数据表反向生成sqlalchemy ORM:
# 依赖sqlacodegen 模块
# 应用:
# sqlacodegen --tables student(表名) --outfile ./a.py(输出的文件名) mysql+pymysql://数据库用户名:密码@地址/数据库?charset=utf8
# windows 环境下cmd窗口 执行 即可.(注意输出的文件)

原生sql语句执行:

 # sqlalchemy 原生sql语句应用:

    # 1.用text包含sql语句,也可以传递参数至sql语句中
    # from sqlalchemy import text
    # result = db.execute(text('select * from table where id < :id and typeName=:type'), {'id': 2, 'type': 'USER_TABLE'})

    # 2.如果不指定parameter的类型, 默认为字符串类型;
    # 如果要传日期参数, 需要使用text()
    # 的bindparams参数来声明
    # from sqlalchemy import DateTime
    # date_param = datetime.today() + timedelta(days=-1 * 10)
    # sql = "delete from caw_job_alarm_log  where alarm_time<:alarm_time_param"
    # t = text(sql, bindparams=[bindparam('alarm_time_param', type_=DateTime, required=True)])
    # db.execute(t, {"alarm_time_param": date_param})

    # 参数bindparam可以使用type_来指定参数的类型, 也可以使用 initial 值来指定参数类型
    #     bindparam('alarm_time_param', type_=DateTime) #直接指定参数类型
    #     bindparam('alarm_time_param', DateTime()) #使用初始值指定参数类型

    # 3.如要转换查询的结果中的数据类型, 可以通过text()
    # 的参数typemap参数指定.这点比mybatis还灵活,
    # t = text("SELECT id, name FROM users",
    #          typemap={
    #              'id': Integer,
    #              'name': Unicode
    #          }
    #          )

    # result = db.session.execute('select * from alist limit 5;')
    # print(result.fetchall())
    # print(result.fetchmany(5))
    # print(result.fetchone())
    # print(result.scalar())              # 返回数据条数
    # print(result.returns_rows)          # 判断是否有数据
    # print(result.rowcount)              # 更新/删除影响的行数

    #try:
    #   添加/删除/修改
    #   result.commit
    #ecept:
    #   result.rollback() 回滚数据

    # # sql 语句添加 也需要commit
    # tex = db.session.execute('insert into alist(name) values("bbbbbbbbb")')
    # db.session.commit()

flask-sqlchemy 连表增加方式

   # 表结构:
    # User       relationship表 字段名 article
    # Article    外键表 外键名 user_id

    # 通过relationship 添加数据 方式一
    # a = Article(title='bbbbbb', content='1dadasdad3')
    # u = User(email='4444444', password='123123123',article=[a])
    # db.session.add(u)
    # db.session.commit()

    # 通过relationship 添加数据 方式二
    # a = Article(title='bbbbbb', content='1dadasdad3')
    # u = User(email='4444444', password='123123123')
    # u.article.append(a)
    # db.session.add(u)
    # db.session.commit()

    # 上面方式的变种
    # u={
    #     'email':'eeeeee',
    #     'password':'qwdqdwqdqd'
    # }
    # a={
    #     'title':'qqq',
    #     'content':'qqq'
    # }
    # db.session.add(User(**u,article=[Article(**a)]))
    # db.session.commit()

flask-sqlchemy 删除操作

    # # 方式一
    # # db.session.query(传入表明),filter(筛选条件删除).delete()
    # db.session.query(User).filter(User.id == 7).delete()
    # db.session.commit()

    # # 方式二
    # u = User.query.filter(User.id==11).first()
    # db.session.delete(u)
    # db.session.commit()

flask-sqlchemy 修改操作

# 方式一:
    # u = User.query.filter(User.id == 10).first()
    # u.email = 'llllllllll'
    # u.password = 'qweqweqwe'
    # db.session.commit()

    aaa = {
        'email':'8989898989898',
        'password':'9999999999'
    }
    # 方式二:
    # u = User.query.filter(User.id == 10).update(aaa)
    # db.session.commit()

    # 方式三
    db.session.query(User).filter(User.id==10).update(aaa)
    db.session.commit()

简化开发过程

1.用Navicat Premium 模型设计数据,并将模型转为sql执行文件,并生成数据库

2.根据sqlacodegen 逆向模块 反向生成sqlalchemy 的ClassBean文件

UserBean文件

# coding: utf-8
from sqlalchemy import Column, DateTime, String
from datetime import datetime
from sqlalchemy.dialects.mysql import INTEGER, SMALLINT
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
metadata = Base.metadata


class User(Base):
    __tablename__ = 'user'

    id = Column(INTEGER(11), primary_key=True)
    username = Column(String(20), nullable=False)
    age = Column(INTEGER(11), nullable=False)
    sex = Column(SMALLINT(6))
    create_time = Column(DateTime, default=datetime.now())

# 先设计数据库,然后根据以下方式 输入当前的UserBean
# sqlacodegen --tables student(表名) --outfile ./UserBean.py(输出的文件名) mysql+pymysql://数据库用户名:密码@地址/数据库?charset=utf8
# sqlacodegen --tables user --outfile ./UserBean.py mysql+pymysql://root:123456@192.168.0.118:3306/test?charset=utf8"

3.在UserServer 文件中创建/绑定 数据表和Object之间的关系,然后就可以操作数据数据了。

UserServer 文件

from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy.ext.declarative import declarative_base
from UserBean import User

engine = create_engine("mysql+pymysql://root:123456@192.168.0.118:3306/test?charset=utf8")

# Base.metadata.drop_all(bind=engine)
Base = declarative_base()
Base.metadata.create_all(bind=engine)

s = Session(bind=engine)

# 新增操作
# try:
#     for i in range(1, 10):
#         user = User(username="apy_%s" % i, age=i, sex=0)
#         s.add(user)
#         s.commit()
# except Exception as e:
#     s.rollback()

# 查询操作
UserList = s.query(User)[0:5]
for i in UserList:
    print(i.username)
原文地址:https://www.cnblogs.com/Anec/p/10469923.html