python之sqlalchemy的使用

准备数据

 1 from sqlalchemy.ext.declarative import declarative_base
 2 from sqlalchemy import Column
 3 from sqlalchemy import Integer, String, Text, Date, DateTime, ForeignKey, UniqueConstraint, Index
 4 from sqlalchemy import create_engine
 5 from sqlalchemy.orm import relationship
 6 
 7 Base = declarative_base()
 8 
 9 
10 class Depart(Base):
11     __tablename__ = 'depart'
12     id = Column(Integer, primary_key=True)
13     title = Column(String(32), index=True, nullable=False)
14 
15 
16 class Users(Base):
17     __tablename__ = 'users'
18 
19     id = Column(Integer, primary_key=True)
20     name = Column(String(32), index=True, nullable=False)
21     depart_id = Column(Integer, ForeignKey("depart.id"))
22 
23     # 用于链表操作 与表的创建无关
24     dp = relationship("Depart", backref='pers')
25 
26 
27 class Student(Base):
28     __tablename__ = 'student'
29     id = Column(Integer, primary_key=True)
30     name = Column(String(32), index=True, nullable=False)
31 
32     course_list = relationship('Course', secondary='student2course', backref='student_list')
33 
34 
35 class Course(Base):
36     __tablename__ = 'course'
37     id = Column(Integer, primary_key=True)
38     title = Column(String(32), index=True, nullable=False)
39 
40 
41 class Student2Course(Base):
42     __tablename__ = 'student2course'
43     id = Column(Integer, primary_key=True, autoincrement=True)
44     student_id = Column(Integer, ForeignKey('student.id'))
45     course_id = Column(Integer, ForeignKey('course.id'))
46 
47     __table_args__ = (
48         UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'),  # 联合唯一索引
49         # Index('ix_id_name', 'name', 'extra'),                          # 联合索引
50     )
51 
52 
53 def create_all():
54     engine = create_engine(
55         "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
56         max_overflow=0,  # 超过连接池大小外最多创建的连接
57         pool_size=5,  # 连接池大小
58         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
59         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
60     )
61 
62     Base.metadata.create_all(engine)
63 
64 
65 def drop_all():
66     engine = create_engine(
67         "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
68         max_overflow=0,  # 超过连接池大小外最多创建的连接
69         pool_size=5,  # 连接池大小
70         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
71         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
72     )
73     Base.metadata.drop_all(engine)
74 
75 
76 if __name__ == '__main__':
77     # drop_all()
78     create_all()
models.py

基本操作

  1 from sqlalchemy.orm import sessionmaker
  2 from sqlalchemy import create_engine
  3 from models import Users, Student, Depart
  4 
  5 engine = create_engine(
  6         "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
  7         max_overflow=0,  # 超过连接池大小外最多创建的连接
  8         pool_size=5,  # 连接池大小
  9         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
 10         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
 11     )
 12 SessionFactory = sessionmaker(bind=engine)
 13 
 14 # 从连接池获取一个连接
 15 session = SessionFactory()
 16 
 17 # ############################## 基本增删改查 ###############################
 18 # 1. 增加
 19 obj = Users(name='tang')
 20 session.add(obj)
 21 session.commit()
 22 
 23 # 批量增加
 24 session.add_all([
 25         Users(name='tang'),
 26         Users(name='chen')
 27 ])
 28 session.commit()
 29 
 30 # 2. 查
 31 result = session.query(Users).all()
 32 for row in result:
 33         print(row.id,row.name)
 34 
 35 # sqlalchemy 的语法跟Python很相似
 36 result = session.query(Users).filter(Users.id >= 2)
 37 for row in result:
 38         print(row.id,row.name)
 39 
 40 
 41 # 获取第一个
 42 result = session.query(Users).filter(Users.id >= 2).first()
 43 print(result)
 44 
 45 # 3.删
 46 session.query(Users).filter(Users.id >= 2).delete()
 47 session.commit()
 48 
 49 # 4.改  通过字典
 50 session.query(Users).filter(Users.id == 4).update({Users.name:'tang'})
 51 session.query(Users).filter(Users.id == 4).update({'name':'tang'})
 52 session.query(Users).filter(Users.id == 4).update({'name':Users.name+"_lao"},synchronize_session=False)
 53 session.commit()
 54 
 55 # ############################## 其他常用 ###############################
 56 # 1. 指定列 去别名
 57 # 对应原生SQL:select id,name as cname from users;
 58 result = session.query(Users.id,Users.name.label('cname')).all()
 59 for item in result:
 60         print(item[0],item.id,item.cname)
 61 
 62 
 63 # 2. 默认条件and
 64 session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
 65 
 66 # 3. between
 67 session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
 68 
 69 # 4. in
 70 session.query(Users).filter(Users.id.in_([1,3,4])).all()
 71 # not in
 72 session.query(Users).filter(~Users.id.in_([1,3,4])).all()
 73 
 74 # 5. 子查询
 75 session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='tang'))).all()
 76 
 77 # 6. and 和 or
 78 from sqlalchemy import and_, or_
 79 session.query(Users).filter(Users.id > 3, Users.name == 'tang').all()
 80 session.query(Users).filter(and_(Users.id > 3, Users.name == 'tang')).all()
 81 session.query(Users).filter(or_(Users.id < 2, Users.name == 'tang')).all()
 82 session.query(Users).filter(or_(Users.id < 2,and_(Users.name == 'eric', Users.id > 3),Users.extra != "")).all()
 83 
 84 # 7. filter_by  只需字段名
 85 session.query(Users).filter_by(name='alex').all()
 86 
 87 # 8. 通配符
 88 ret = session.query(Users).filter(Users.name.like('e%')).all()
 89 ret = session.query(Users).filter(~Users.name.like('e%')).all()
 90 
 91 # 9. 切片
 92 result = session.query(Users)[1:2]
 93 
 94 # 10.排序
 95 ret = session.query(Users).order_by(Users.name.desc()).all()
 96 ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
 97 
 98 # 11. group by
 99 from sqlalchemy.sql import func
100 
101 ret = session.query(Users.depart_id,func.count(Users.id),).group_by(Users.depart_id).all()
102 for item in ret:
103         print(item)
104 #
105 # from sqlalchemy.sql import func
106 # 分组之后再进行查询
107 ret = session.query(
108         Users.depart_id,
109         func.count(Users.id),
110 ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()
111 for item in ret:
112         print(item)
113 
114 # 12.union 和 union all
115 """
116 select id,name from users
117 UNION
118 select id,name from users;
119 """
120 """
121 select id,name from users
122 UNION ALL 
123 select id,name from users;
124 """
125 q1 = session.query(Depart.title).filter(Depart.id > 2)
126 q2 = session.query(Student.name).filter(Student.id < 2)
127 ret = q1.union(q2).all()
128 #
129 # q1 = session.query(Users.name).filter(Users.id > 2)
130 # q2 = session.query(Favor.caption).filter(Favor.nid < 2)
131 # ret = q1.union_all(q2).all()
132 
133 """
134 union 和 union_all 的区别
135 union 去重
136 union_all 不去重
137 
138 相同点:合并的两张表的列要相同
139 """
140 
141 """
142 union 和 join的区别
143 union是垂直合并成一张表
144 join是水平合并成一张表
145 """
146 
147 """
148 查看原生sql 打印不获取结果的语句就可以
149 sql = session.query(Users).filter(Users.id==1)
150 print(sql)
151 """
152 
153 session.close()

链表操作 与 外键relation字段的使用

 1 from sqlalchemy.orm import sessionmaker
 2 from sqlalchemy import create_engine
 3 from models import Users,Depart
 4 
 5 
 6 engine = create_engine(
 7     "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
 8     max_overflow=0,  # 超过连接池大小外最多创建的连接
 9     pool_size=5,  # 连接池大小
10     pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
11     pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
12 )
13 
14 SessionFactory = sessionmaker(bind=engine)
15 session = SessionFactory()
16 
17 
18 # 单表操作
19 ret = session.query(Users).all()
20 for row in ret:
21     print(row.id,row.name, row.depart_id)
22 
23 
24 # 链表操作
25 ret = session.query(Users.id, Users.name, Depart.title).join(Depart,Users.depart_id==Depart.id).all()
26 for row in ret:
27     print(row.id, row.name, row.title)
28 
29 # isouter 表示 left join  没有right join 只能调换查询顺序
30 ret = session.query(Users.id, Users.name, Depart.title).join(Users,isouter=True).all()
31 # print(ret)
32 for row in ret:
33     print(row.id, row.name, row.title)
34 
35 
36 # 3. relation字段:查询所有用户+所属部门名称
37 ret = session.query(Users).all()
38 for row in ret:
39     # relation dp的作用
40     print(row.id,row.name,row.depart_id, row.dp.title)
41 
42 # 4. relation字段:查询销售部所有的人员
43 ret = session.query(Depart).filter(Depart.title=='销售部').first()
44 for row in ret.pers:
45     print(row.id, row.name, ret.title)
46 
47 # 5. 创建一个名称叫:IT部门,再在该部门中添加一个员工:tanglaoer
48 u1 = Users(name='tanglaoer',dp=Depart(title='IT'))
49 session.add(u1)
50 session.commit()
51 
52 # 6. 创建一个名称叫:技术部,再在该部门中添加一个员工:tang lao san
53 d1 = Depart(title='技术部')
54 d1.pers = [Users(name='tang'),Users(name='lao'), Users(name='san')]
55 session.add(d1)
56 session.commit()
57 
58 # 在已存在的技术部 添加几名员工
59 d1 = session.query(Depart).filter(Depart.title == '技术部').first()
60 d1.pers = [Users(name='LIN'), Users(name='WU'),Users(name='SEN')]
61 session.add(d1)
62 session.commit()
63 
64 session.close()
Foreign and join

多对多操作

 1 from sqlalchemy.orm import sessionmaker
 2 from sqlalchemy import create_engine
 3 from models import Student, Course, Student2Course
 4 
 5 engine = create_engine(
 6     "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
 7     max_overflow=0,  # 超过连接池大小外最多创建的连接
 8     pool_size=5,  # 连接池大小
 9     pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
10     pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
11 )
12 
13 SessionFactory = sessionmaker(bind=engine)
14 
15 session = SessionFactory()
16 # 1. 录入数据
17 session.add_all([
18     Student(name='tang'),
19     Student(name='chen'),
20     Course(title='生物'),
21     Course(title='体育'),
22 ])
23 session.commit()
24 
25 # 可批量增加多对多外键
26 session.add_all([
27     Student2Course(student_id=2,course_id=1),
28     Student2Course(student_id=1,course_id=1),
29     Student2Course(student_id=1,course_id=2),
30 ])
31 
32 # 2. 三张表关联
33 ret = session.query(Student2Course.id, Student.name, Course.title, Course.id).join(Student,Student2Course.student_id==Student.id).join(Course,Student2Course.course_id==Course.id).order_by(Course.id.asc()).all()
34 print(ret)
35 session.commit()
36 
37 # 3. “tang”选的所有课
38 ret = session.query(Student2Course.id, Student.name, Course.title, Course.id).join(Student,Student2Course.student_id==Student.id).join(Course,Student2Course.course_id==Course.id).filter(Student.name=='tang').all()
39 print(ret)
40 
41 # relation 字段的使用
42 ret = session.query(Student).filter(Student.name== 'tang').first()
43 for row in ret.course_list:
44     print(row.title)
45 
46 
47 # 4. 选了“生物”的所有人
48 # relation 字段的方向使用
49 ret = session.query(Course).filter(Course.title == '生物').first()
50 for row in ret.student_list:
51     print(row.name, ret.title)
52 
53 # 5. 创建一个课程,创建2学生,两个学生选新创建的课程。
54 obj = Course(title='英语')
55 obj.student_list = [Student(name='lin'), Student(name='wu')]
56 session.add(obj)
57 session.commit()
58 
59 # 创建一个学生,加入多门新创建课程
60 stu = Student(name='tang')
61 stu.course_list = [Course(title='数学'), Course(title='地理')]
62 session.add(stu)
63 session.commit()
64 
65 # 把tang添加到已存在的课程中
66 from sqlalchemy import or_
67 stu = session.query(Student).filter(Student.name=='tang').first()
68 stu.course_list = session.query(Course).filter(or_(Course.id == 1, Course.id ==3)).all()
69 print(stu.course_list)
70 session.add(stu)
71 session.commit()
72 
73 session.close()
many2many

sqlalchemy 连接与多线程的操作

 1 from sqlalchemy.orm import sessionmaker
 2 from sqlalchemy import create_engine
 3 from models import Student
 4 engine = create_engine(
 5     "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
 6     max_overflow=0,  # 超过连接池大小外最多创建的连接
 7     pool_size=5,  # 连接池大小
 8     pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
 9     pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
10 )
11 SessionFactory = sessionmaker(bind=engine)
12 
13 def task():
14     # 去连接池中获取一个连接
15     # 第一版本
16     session = SessionFactory()
17 
18     ret = session.query(Student).all()
19     print(ret)
20     # 将连接交还给连接池
21     session.close()
22 
23 
24 from threading import Thread
25 
26 for i in range(20):
27     t = Thread(target=task)
28     t.start()
第一版本
 1 from sqlalchemy.orm import sessionmaker
 2 from sqlalchemy import create_engine
 3 from sqlalchemy.orm import scoped_session
 4 from models import Student,Course,Student2Course
 5 
 6 engine = create_engine(
 7         "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
 8         max_overflow=0,  # 超过连接池大小外最多创建的连接
 9         pool_size=5,  # 连接池大小
10         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
11         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
12     )
13 SessionFactory = sessionmaker(bind=engine)
14 session = scoped_session(SessionFactory)
15 # scoped_session  里面有threading.local
16 # 为每个线程赋予一个连接
17 
18 def task():
19     ret = session.query(Student).all()
20     print(ret)
21     # 将连接交还给连接池
22     session.remove()
23 
24 
25 from threading import Thread
26 
27 for i in range(20):
28     t = Thread(target=task)
29     t.start()
第二版本scoped_session

sqlalchemy 写原生SQL语句

 1 from sqlalchemy.orm import sessionmaker
 2 from sqlalchemy import create_engine
 3 from sqlalchemy.orm import scoped_session
 4 
 5 engine = create_engine(
 6         "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
 7         max_overflow=0,  # 超过连接池大小外最多创建的连接
 8         pool_size=5,  # 连接池大小
 9         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
10         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
11     )
12 SessionFactory = sessionmaker(bind=engine)
13 session = scoped_session(SessionFactory)
14 
15 
16 def task():
17     """"""
18     # 方式一:
19     # 查询
20     cursor = session.execute('select * from users')
21     result = cursor.fetchall()
22     print(result)
23 
24     # 添加 参数通过"冒号"
25     cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'tanglaoer'})
26     session.commit()
27     print(cursor.lastrowid)
28 
29     # 方式二:
30     # 与pymysql的链接一模一样
31     conn = engine.raw_connection()
32     cursor = conn.cursor()
33     cursor.execute(
34         "select * from users"
35     )
36     result = cursor.fetchall()
37     print(result)
38     cursor.close()
39     conn.close()
40 
41     # 将连接交还给连接池
42     session.remove()
43 
44 
45 from threading import Thread
46 
47 for i in range(20):
48     t = Thread(target=task)
49     t.start()
原生SQL
原文地址:https://www.cnblogs.com/tangkaishou/p/10184386.html