一、sqlalchemy原生语句操作
from sqlalchemy import create_engine HOSTNAME = '127.0.0.1' PORT = 3306 DATABASE = 'demo' USERNAME = 'root' PASSWORD = '123456' # 链接数据库的字符串,固定格式 DB_URL = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format( username = USERNAME, password = PASSWORD, host = HOSTNAME, port = PORT, db = DATABASE, ) engine = create_engine(DB_URL) # 判断是否链接成功 conn = engine.connect() result = conn.execute('SELECT * FROM shop_area') print(result.fetchone())
二、定义一个简单的ORM模型
第一步:用declarative_base根据engine创建一个ORM基类
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base HOSTNAME = '127.0.0.1' PORT = 3306 DATABASE = 'demo' USERNAME = 'root' PASSWORD = '123456' # 链接数据库的字符串,固定格式 DB_URL = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format( username = USERNAME, password = PASSWORD, host = HOSTNAME, port = PORT, db = DATABASE, ) engine = create_engine(DB_URL) Base = declarative_base(engine)
第二步:用Base作为基类来定义自己的ORM类,首先定义__tablename__ = 'person'指定数据库表名。
class Person(Base): __tablename__ = 'person' # 表名
第三步:创建数据库映射的字段
from sqlalchemy import create_engine,Column,Integer,String class Person(Base): __tablename__ = 'person' # 表名 # 第二步:定义字段对象 id = Column(Integer,primary_key=True,autoincrement=True) name = Column(String(50)) age = Column(Integer)
第四步:将自己定义的ORM类映射到数据库中
Base.metadata.create_all()
三、常用数据类型
1. Integer:整型,映射到数据库中是int类型。
2. Float:浮点类型,映射到数据库中是float类型。它占据的32位。
3. Double:双精度浮点类型,映射到数据库中是double类型,占据64位。
4. String:可变字符类型,映射到数据库中是varchar类型。
5. Boolean:布尔类型,映射到数据库中的是tinyint类型。
6. DECIMAL:定点类型。是专门为了解决浮点类型精度丢失的问题的。在存储钱相关的字段的时候建议大家都使用这个数据类型。并且这个类型使用的时候需要传递两个参数,第一个参数是用来标记这个字段总能能存储多少个数字,第二个参数表示小数点后有多少位。
7. Enum:枚举类型。指定某个字段只能是枚举中指定的几个值,不能为其他值。在ORM模型中,使用Enum来作为枚举。
8. Date:存储时间,只能存储年月日。映射到数据库中是date类型。在Python代码中,可以使用`datetime.date`来指定。
9. DateTime:存储时间,可以存储年月日时分秒毫秒等。映射到数据库中也是datetime类型。在Python代码中,可以使用`datetime.datetime`来指定。
10. Time:存储时间,可以存储时分秒。映射到数据库中也是time类型。在Python代码中,可以使用`datetime.time`来指定。
11. Text:存储长字符串。一般可以存储6W多个字符。如果超出了这个范围,可以使用LONGTEXT类型。映射到数据库中就是text类型。
12. LONGTEXT:长文本类型,映射到数据库中是longtext类型。
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker HOSTNAME = '127.0.0.1' PORT = 3306 DATABASE = 'demo' USERNAME = 'root' PASSWORD = '123456' # 链接数据库的字符串,固定格式 DB_URL = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format( username = USERNAME, password = PASSWORD, host = HOSTNAME, port = PORT, db = DATABASE, ) engine = create_engine(DB_URL) Base = declarative_base(engine) session = sessionmaker(engine)() # 第一步:创建一个ORM模型 from sqlalchemy import Column,Integer,String,DECIMAL,Boolean,Enum,Date,DateTime,Time,Text from sqlalchemy.dialects.mysql import LONGTEXT import enum,datetime class TagEnum(enum.Enum): python = "python" java = "java" django = "diango" class Person(Base): __tablename__ = 'person' # 表名 id = Column(Integer,primary_key=True,autoincrement=True) name = Column(String(50)) age = Column(Integer) country = Column(String(20)) # DECIMAL(总位数,小数点后的位数) price = Column(DECIMAL(20,9)) # 布尔类型 is_delete = Column(Boolean) # 枚举类型方式一 tag1 = Column(Enum("python",'java','php')) # 枚举类型方式二 tag2 = Column(Enum(TagEnum)) # 时间 # 2017-10-05 create_time1 = Column(Date) # 2017-10-05 23:26:58 create_time2 = Column(DateTime) # 23:26:58 create_time3 = Column(Time) # 文本类型 content = Column(Text) lcontent = Column(LONGTEXT) # 删除数据表 === 删除数据库表 Base.metadata.drop_all() # 创建数据表 === 只会映射一次到数据库中 Base.metadata.create_all() per = Person( name = 'xxx', age = 18, country = 66, price = 6.555555555, is_delete = True, tag1 = "python", tag2 = TagEnum.python, create_time1 = datetime.date(2017,10,5), create_time2 = datetime.datetime(2017,10,5,23,26,58), create_time3 = datetime.time(23,26,58), content = "我可以存储6万多个字符串", lcontent = "我比6万多个字符串还多" ) session.add(per) session.commit()
四、Column常用参数
1.default:默认值
2.nullable:是否为空
3.primary_key:是否为主键
4.unique:是否唯一
5.autoincrement:是否自动增长
6.onupdate:更新的时候执行的函数
7.name:该属性再数据库中的字段映射
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker HOSTNAME = '127.0.0.1' PORT = 3306 DATABASE = 'demo' USERNAME = 'root' PASSWORD = '123456' # 链接数据库的字符串,固定格式 DB_URL = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format( username = USERNAME, password = PASSWORD, host = HOSTNAME, port = PORT, db = DATABASE, ) engine = create_engine(DB_URL) Base = declarative_base(engine) session = sessionmaker(engine)() # 第一步:创建一个ORM模型 from sqlalchemy import Column,Integer,String,DateTime from datetime import datetime class Person(Base): __tablename__ = 'person' # 表名 id = Column(Integer,primary_key=True,autoincrement=True) # 数据库字段名就变成了my_title title = Column(String(50),name="my_title",nullable=False) create_time = Column(DateTime,default=datetime.now()) update_date = Column(DateTime,onupdate=datetime.now) read_count = Column(Integer,default=0,nullable=False) # 删除数据表 === 删除数据库表 Base.metadata.drop_all() # 创建数据表 === 只会映射一次到数据库中 Base.metadata.create_all() # 添加数据 per = Person(title = "aaa") session.add(per) session.commit() # 修改数据 === 测试onupdate,只有更新时update_date字段才有数据修改 # p = session.query(Person).first() # p.title = "s5s5" # session.commit()
五、query可用参数
# 模型对象 # aces = session.query(Article).all() # print(aces) # 模型属性 # aces = session.query(Article.title,Article.price).all() # print(aces) # 聚合函数 from sqlalchemy import func aces = session.query(func.count(Article.id)).first() print('行数量',aces) aces1 = session.query(func.avg(Article.price)).first() print('求平均值',aces1) aces2 = session.query(func.max(Article.price)).first() print('求最大值',aces2) aces3 = session.query(func.min(Article.price)).first() print('求最小值',aces3) aces4 = session.query(func.sum(Article.price)).first() print('求和',aces4)
六、filter过滤条件
# equal a1 = session.query(Article).filter(Article.id == 1).first() # not equal a2 = session.query(Article).filter(Article.id != 1).all() # like && ilike(不区分大小写) a3 = session.query(Article).filter(Article.title.like('title%')).all() # in a4 = session.query(Article).filter(Article.title.in_(['title0','title2'])).all() # not in a5 = session.query(Article).filter(~Article.title.in_(['title0','title2'])).all() # is null a6 = session.query(Article).filter(Article.content == None).all() # is not null a7 = session.query(Article).filter(Article.content != None).all() # and from sqlalchemy import and_ a81 = session.query(Article).filter(and_(Article.title == 'aaa',Article.content == 'aaa')).all() a82 = session.query(Article).filter(Article.title == 'aaa',Article.content == 'aaa').all() # or from sqlalchemy import or_ a9 = session.query(Article).filter(or_(Article.title == 'aaa',Article.content == 'aaa')).all()
八、1对N的关系创建
from sqlalchemy import Column,Integer,String,Text,ForeignKey from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship HOSTNAME = '127.0.0.1' PORT = 3306 DATABASE = 'demo' USERNAME = 'root' PASSWORD = '123456' # 链接数据库的字符串,固定格式 DB_URL = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format( username = USERNAME, password = PASSWORD, host = HOSTNAME, port = PORT, db = DATABASE, ) engine = create_engine(DB_URL) Base = declarative_base(engine) session = sessionmaker(engine)() # 用户表 class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) username = Column(String(50),nullable=False) # 文章表 class Article(Base): __tablename__ = 'article' id = Column(Integer,primary_key=True,autoincrement=True) title = Column(String(50),nullable=False) content = Column(Text,nullable=False) uid = Column(Integer,ForeignKey("user.id",ondelete="CASCADE")) # 创建(N对1)或(1对n)的关系 Article1,Article2 ===》 User author = relationship("User",backref = "articles") def __repr__(self): return "Article_title=%s"%self.title # 删除数据表 === 删除数据库表 Base.metadata.drop_all() # 创建数据表 === 只会映射一次到数据库中 Base.metadata.create_all() # 1个用户对个文章 """ 将文章添加到用户对象中去 # 创建用户对象 u = User(username = "小海椒") # 创建两个文章对象 a1 = Article(title = "t1",content='11111111111111111111') a2 = Article(title = "t2",content='22222222222222222222') # 将文章分别添加到用户对象中 u.articles.append(a1) u.articles.append(a2) # 将用户对象添加到session中 session.add(u) session.commit() """ """ 将用户添加到文章对象中去 u = User(username = "小海椒2") a = Article(title = "t3",content = "33333333333333") a.author = u session.add(a) session.commit() """
九、1对1的创建关系
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship HOSTNAME = '127.0.0.1' PORT = 3306 DATABASE = 'demo' USERNAME = 'root' PASSWORD = '123456' # 链接数据库的字符串,固定格式 DB_URL = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format( username = USERNAME, password = PASSWORD, host = HOSTNAME, port = PORT, db = DATABASE, ) engine = create_engine(DB_URL) Base = declarative_base(engine) session = sessionmaker(engine)() from sqlalchemy import Column,Integer,String,Text,ForeignKey from sqlalchemy.orm import backref # 用户表 class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) username = Column(String(50),nullable=False) # 用户信息表 class UserExtend(Base): __tablename__ = 'user_extend' id = Column(Integer, primary_key=True, autoincrement=True) school = Column(String(50)) uid = Column(Integer,ForeignKey('user.id')) # 创建1对1的关系 user = relationship("User",backref = backref("extend",uselist=False)) # 删除数据表 === 删除数据库表 Base.metadata.drop_all() # 创建数据表 === 只会映射一次到数据库中 Base.metadata.create_all() # 1个用户对1个用户信息 u = User(username = "小黑") extend1 = UserExtend(school="北京广播电视大学") u.extend = extend1 session.add(u) session.commit()
十、多对多的创建关系
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship,backref HOSTNAME = '127.0.0.1' PORT = 3306 DATABASE = 'demo' USERNAME = 'root' PASSWORD = '123456' # 链接数据库的字符串,固定格式 DB_URL = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format( username = USERNAME, password = PASSWORD, host = HOSTNAME, port = PORT, db = DATABASE, ) engine = create_engine(DB_URL) Base = declarative_base(engine) session = sessionmaker(engine)() from sqlalchemy import Column,Integer,String,ForeignKey,Table article_tag = Table( "article_tag", # 表名 Base.metadata, Column('article_id',Integer,ForeignKey("article.id"),primary_key=True), Column('tag_id',Integer,ForeignKey("tag.id"),primary_key=True) ) # 文章表 class Article(Base): __tablename__ = 'article' id = Column(Integer,primary_key=True,autoincrement=True) title = Column(String(50),nullable=False) # tags = relationship("Tag",backref='articles',secondary=article_tag) # 文章标签 class Tag(Base): __tablename__ = 'tag' id = Column(Integer,primary_key=True,autoincrement=True) name = Column(String(50),nullable=False) articles = relationship("Article",backref='tags',secondary=article_tag) # # # # 删除数据表 === 删除数据库表 # Base.metadata.drop_all() # # # # 创建数据表 === 只会映射一次到数据库中 # Base.metadata.create_all() """ 1、定义两个多对多的模型 2、使用Table定义一个中间表(包含两个模型的外键字段并做复合主键) 3、随便再两个需要做多对多模型中选择一个定义relationship属性来绑定三者关系 例:再Article模型中 tags = relationship("Tag",backref='articles',secondary=article_tag) """ # 多对多添加数据 # a1 = Article(title="Scrapy能用于那些地方?") # a2 = Article(title="Flask和Django哪个更适用于web开发?") # # t1 = Tag(name='python') # t2 = Tag(name='flask') # # a1.tags.append(t1) # a1.tags.append(t2) # # a2.tags.append(t1) # a2.tags.append(t2) # # session.add(a1) # session.add(a2) # # session.commit() # 多对多查询 a = session.query(Article).first() for i in a.tags: print(i.name) t = session.query(Tag).first() print(t.articles)
十一、排序的三种方式
为防止每次都需要连接数据库代码做出文件分离:DB/db.py
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker HOSTNAME = '127.0.0.1' PORT = 3306 DATABASE = 'db_demo' USERNAME = 'root' PASSWORD = '123456' # 链接数据库的字符串,固定格式 DB_URL = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format( username = USERNAME, password = PASSWORD, host = HOSTNAME, port = PORT, db = DATABASE, ) engine = create_engine(DB_URL) Base = declarative_base(engine) session = sessionmaker(engine)()
三种方式:
from sqlalchemy import Column,Integer,String,DateTime,ForeignKey from sqlalchemy.orm import relationship,backref from DB.db import session,Base from datetime import datetime class User(Base): __tablename__='user' id = Column(Integer,primary_key=True,autoincrement=True) name = Column(String(50),nullable=False) # 文章表 class Article(Base): __tablename__ = 'article' id = Column(Integer,primary_key=True,autoincrement=True) title = Column(String(50),nullable=False) create_time = Column(DateTime,default=datetime.now(),nullable=False) uid = Column(Integer,ForeignKey("user.id")) # 方式三:可以在创建外键关系时通过backref参数指定字段排序方式 author = relationship( "User", backref=backref( 'articles', order_by=create_time.desc() ) ) # 方式二:可以指定查询排序方式 # __mapper_args__ ={ # "order_by":create_time.desc() # } def __repr__(self): return "<article {} {} {}>".format(self.id,self.title,self.create_time) def db_init(): # # # # 删除数据表 === 删除数据库表 Base.metadata.drop_all() # # # # 创建数据表 === 只会映射一次到数据库中 Base.metadata.create_all() def add_article(title): # 添加数据 article = Article(title=title) session.add(article) session.commit() def add_article2(): article1 = Article(title='ccccccccc') article2 = Article(title='ddddddddd') user = User(name='小红') user.articles = [article1,article2] session.add(user) session.commit() if __name__ == '__main__': # db_init() # add_article('xxx6') # add_article2() # 第一种方式:代码中指定排序方式 # 正序排序 # articles1 = session.query(Article).order_by(Article.create_time).all() # 倒序排序 # articles3 = session.query(Article).order_by(Article.create_time.desc()).all() # 第二中方式:模型中指定排序方式 """ __mapper_args__ ={ "order_by":create_time.desc() } """ # articles4 = session.query(Article).all() # 第三种方式: """ author = relationship( "User", backref=backref( 'articles', order_by=create_time.desc() ) ) """ u = session.query(User).first() print(u.articles)
十二、limit、offset,slice、切片
from sqlalchemy import Column,Integer,String,DateTime,ForeignKey from sqlalchemy.orm import relationship,backref from DB.db import session,Base from datetime import datetime # 文章表 class Article(Base): __tablename__ = 'article' id = Column(Integer,primary_key=True,autoincrement=True) title = Column(String(50),nullable=False) create_time = Column(DateTime,default=datetime.now(),nullable=False) def __repr__(self): return "<Article {}>".format(self.title) def db_init(): # # # # 删除数据表 === 删除数据库表 Base.metadata.drop_all() # # # # 创建数据表 === 只会映射一次到数据库中 Base.metadata.create_all() def add_article(): # 添加数据 for item in range(99): article = Article(title="文章标题{}".format(item)) session.add(article) session.commit() if __name__ == '__main__': # db_init() # add_article() # limit:可以查找的时候限制只查找几条 1-10 articles1 = session.query(Article).limit(10).all() # offset:可以限制查找数据的时候过滤前面多少条 倒叙10-20 articles2 = session.query(Article).order_by(Article.id.desc()).offset(10).limit(10).all() # 切片:可以对query对象使用切片来确定想要拿到的数据 倒叙最后10条数据 articles3 = session.query(Article).order_by(Article.id.desc()).slice(0, 10).all() articles4 = session.query(Article).order_by(Article.id.desc())[0:10]
十三、查询懒加载
from sqlalchemy import Column,Integer,String,DateTime,ForeignKey from sqlalchemy.orm import relationship,backref from DB.db import session,Base from datetime import datetime class User(Base): __tablename__ = "user" id = Column(Integer,primary_key=True,autoincrement=True) name = Column(String(50),nullable=False) # 文章表 class Article(Base): __tablename__ = 'article' id = Column(Integer,primary_key=True,autoincrement=True) title = Column(String(50),nullable=False) create_time = Column(DateTime,default=datetime.now(),nullable=False) uid = Column(Integer,ForeignKey("user.id")) articles = relationship("User",backref= backref("articles",lazy="dynamic")) def __repr__(self): return "<Article {}>".format(self.title) def db_init(): # # # # 删除数据表 === 删除数据库表 Base.metadata.drop_all() # # # # 创建数据表 === 只会映射一次到数据库中 Base.metadata.create_all() def add_data(): # 添加数据 u = User(name="小黑") article_li = [] for item in range(100): article_li.append(Article(title="文章标题{}".format(item))) u.articles = article_li session.add(u) session.commit() if __name__ == '__main__': # db_init() # add_data() u = session.query(User).first() # AppenderQuery对象继承Query可以使用Query的所有方法对sql进行过滤 res = u.articles.filter(Article.id<10).all() # 并且处理查询,也可以使用这个Query对象继续添加数据 u.articles.append(Article(title="文章标题{}".format(1000))) session.add(u) session.commit()
十四、group_by and having
from sqlalchemy import Column,Integer,String,DateTime,ForeignKey,Enum,func from sqlalchemy.orm import relationship,backref from DB.db import session,Base from datetime import datetime class User(Base): __tablename__ = "user" id = Column(Integer,primary_key=True,autoincrement=True) name = Column(String(50),nullable=False) age = Column(Integer,default=0) gender = Column(Enum("male","female","secert")) def db_init(): # # # # 删除数据表 === 删除数据库表 Base.metadata.drop_all() # # # # 创建数据表 === 只会映射一次到数据库中 Base.metadata.create_all() def add_data(): # 添加数据 u1 = User(name="小河", age=18, gender="male") u2 = User(name="小马", age=18, gender="female") u3 = User(name="张三", age=19, gender="female") u4 = User(name="黑核", age=25, gender="secert") u5 = User(name="流弊", age=36, gender="male") session.add_all([u1, u2, u3, u4, u5]) session.commit() if __name__ == '__main__': # db_init() # add_data() # group_by === [(18, 2), (19, 1), (25, 1), (36, 1)] a = session.query(User.age,func.count(User.id)).group_by(User.age).all() # having === [(18, 2), (19, 1)] b = session.query(User.age,func.count(User.id)).group_by(User.age).having(User.age<25).all()
十五、联表查询 -- join
from sqlalchemy import Column,Integer,String,DateTime,ForeignKey,Enum,func from sqlalchemy.orm import relationship,backref from DB.db import session,Base from datetime import datetime class User(Base): __tablename__ = "user" id = Column(Integer,primary_key=True,autoincrement=True) name = Column(String(50),nullable=False) class Article(Base): __tablename__ = "article" id = Column(Integer,primary_key=True,autoincrement=True) title = Column(String(50),nullable=False) create_time = Column(DateTime,nullable=False,default=datetime.now) uid = Column(Integer,ForeignKey('user.id')) articles = relationship("User",backref = backref('articles')) def db_init(): # # # # 删除数据表 === 删除数据库表 Base.metadata.drop_all() # # # # 创建数据表 === 只会映射一次到数据库中 Base.metadata.create_all() def add_data(): # 添加数据 u1 = User(name="小河") u2 = User(name="小马") for i in range(10): a = Article(title='xxx{}'.format(i)) a.articles = u1 session.add(a) session.commit() for i in range(7): a = Article(title='xxx{}'.format(i)) a.articles = u2 session.add(a) session.commit() session.add_all([u1, u2]) session.commit() if __name__ == '__main__': # db_init() # add_data() # 找到所有用户,按照发表的文章数量进行排序 res = session.query( User.name, func.count(Article.id) ).join( Article, Article.uid == User.id ).group_by( User.id ).order_by( func.count(Article.id).desc() ).all() print(res)
十六、子查询使用 -- subquery
from sqlalchemy import Column,Integer,String,DateTime,ForeignKey,Enum,func from sqlalchemy.orm import relationship,backref from DB.db import session,Base from datetime import datetime class User(Base): __tablename__ = "user" id = Column(Integer,primary_key=True,autoincrement=True) name = Column(String(50),nullable=False) city = Column(String(50),nullable=False) age = Column(Integer,default=0) def __repr__(self): return "< User {}>".format(self.name) def db_init(): # # # # 删除数据表 === 删除数据库表 Base.metadata.drop_all() # # # # 创建数据表 === 只会映射一次到数据库中 Base.metadata.create_all() def add_data(): # 添加数据 u1 = User(name="小河",city="重庆",age=18) u2 = User(name="小白",city="北京",age=19) u3 = User(name="小红",city="河北",age=17) u4 = User(name="小慧",city="重庆",age=18) u5 = User(name="小黑",city="江苏",age=20) session.add_all([u1, u2,u3,u4,u5]) session.commit() if __name__ == '__main__': # db_init() # add_data() # 寻找小河同城同年龄的人 u = session.query(User).filter(User.name=='小河').first() us = session.query(User).filter(User.age == u.age,User.city == u.city).all() # subquery实现子查询 # 设置子查询语句 subs = session.query( User.name.label('name'), User.city.label('city'), User.age.label('age') ).filter( User.name == "小河" ).subquery() # c是column的简写固定属性 result = session.query( User ).filter( User.city == subs.c.city, User.age == subs.c.age ).all() print(result)