SQL Expression Language Tutorial: Study Notes, Part 2

11. Using Textual SQL

If nothing else works, you can still write SQL directly with text().

In [51]: s = text(
    ...:     "SELECT users.fullname || ',' || addresses.email_address AS title "
    ...:
    ...:     "FROM users, addresses "
    ...:     "WHERE users.id = addresses.user_id "
    ...:     "AND users.name BETWEEN :x AND :y "
    ...:     "AND (addresses.email_address LIKE :e1 "
    ...:         "OR addresses.email_address LIKE :e2)")

In [52]: conn.execute(s, x='m', y='z', e1='%@aol.com', e2='%@msn.com').fetchall(
    ...: )
2017-02-07 08:03:25,157 INFO sqlalchemy.engine.base.Engine SELECT users.fullname || ',' || addresses.email_address AS title FROM users, addresses WHERE users.id = addresses.user_id AND users.name BETWEEN ? AND ? AND (addresses.email_address LIKE ? OR addresses.email_address LIKE ?)
2017-02-07 08:03:25,157 INFO sqlalchemy.engine.base.Engine ('m', 'z', '%@aol.com', '%@msn.com')
Out[52]: [(u'Wendy Williams,wendy@aol.com',)]

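The SQL looks a little odd, with named placeholders such as :x, because SQLAlchemy has to translate it into whatever format each database backend supports. In the log above, the placeholders were rendered as ? for SQLite.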

Bound parameters

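One term I have left untranslated so far is "bound parameter": a parameter in the SQL statement that receives its value from an argument of the same name supplied from outside. Inside text(), the parameter name is prefixed with a colon.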
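To see what a named placeholder does underneath, here is a minimal sketch that uses Python's standard-library sqlite3 module instead of SQLAlchemy (an assumption, so that it runs without extra dependencies). The in-memory table copies the tutorial's users data; the "hostile" value is made up for illustration.

```python
import sqlite3

# Hypothetical in-memory copy of the tutorial's users table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, fullname TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [(1, "jack", "Jack Jones"), (2, "wendy", "Wendy Williams")],
)

# :x and :y are named placeholders; the values travel separately from the SQL text.
sql = "SELECT id, name, fullname FROM users WHERE name BETWEEN :x AND :y"
rows = conn.execute(sql, {"x": "m", "y": "z"}).fetchall()
print(rows)

# A value that looks like SQL is compared as plain data, so it cannot widen the query.
hostile = conn.execute(sql, {"x": "m", "y": "z' OR '1'='1"}).fetchall()
print(hostile)
```

Both calls return only the wendy row, because the second value is treated as one long string rather than as extra SQL.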

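Sometimes we want to give the parameters their values before calling execute(), which is properly called binding the parameters. That is what TextClause.bindparams() is for.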

In [53]: stmt = text("SELECT * FROM users WHERE users.name BETWEEN :x AND :y")

In [54]: stmt = stmt.bindparams(x="m", y="z")

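Parameter types can be declared as well. When I declared y as Integer and still passed a string, however, no error was raised, so the declared type is evidently not used to validate the value.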

In [58]: from sqlalchemy.sql import bindparam

In [59]: stmt = stmt.bindparams(bindparam("x", String), bindparam("y", String))

In [60]: result = conn.execute(stmt, {"x": "m", "y": "z"})
2017-02-07 09:46:32,036 INFO sqlalchemy.engine.base.Engine SELECT * FROM users WHERE users.name BETWEEN ? AND ?
2017-02-07 09:46:32,037 INFO sqlalchemy.engine.base.Engine ('m', 'z')

In [61]: for row in result:
    ...:     print(row)
    ...:
(2, u'wendy', u'Wendy Williams')

In [63]: stmt = stmt.bindparams(bindparam("x", String), bindparam("y", Integer))
    ...:

In [64]: result = conn.execute(stmt, {"x": "m", "y": "z"})
2017-02-07 09:47:32,727 INFO sqlalchemy.engine.base.Engine SELECT * FROM users WHERE users.name BETWEEN ? AND ?
2017-02-07 09:47:32,727 INFO sqlalchemy.engine.base.Engine ('m', 'z')

In [65]: for row in result:
    ...:     print(row)
    ...:
(2, u'wendy', u'Wendy Williams')

Specifying result column behavior

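TextClause.columns() sets the types of the result columns. In this experiment, though, the setting appeared to have no effect: id was declared as Float but still came back as an int.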

In [85]: stmt = stmt.columns(id=Float, name=String)

In [86]: result = conn.execute(stmt, {"x": "m", "y": "z"})
2017-02-09 16:18:55,066 INFO sqlalchemy.engine.base.Engine SELECT * FROM users WHERE users.name BETWEEN ? AND ?
2017-02-09 16:18:55,066 INFO sqlalchemy.engine.base.Engine ('m', 'z')

In [87]: row = result.fetchone()

In [88]: row["id"]
Out[88]: 2

In [89]: row.id
Out[89]: 2

In [90]: type(row.id)
Out[90]: int

Column objects can also be passed to TextClause.columns(), with or without a type. In the example below, passing Column objects after text() re-associates the query result with those Column objects.

In [91]: stmt = text("SELECT id, name FROM users")

In [92]: stmt = stmt.columns(users.c.id, users.c.name)

TextClause.columns() returns a TextAsFrom object, which supports everything TextClause.columns provides plus the other "selectable" operations.

In [93]: j = stmt.join(addresses, stmt.c.id == addresses.c.user_id)

In [94]: new_stmt = select([stmt.c.id, addresses.c.id]).select_from(j).where(
    ...:     stmt.c.name == 'x')

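These two statements get ahead of the tutorial. join() builds a join, but its calling convention has not been introduced yet, and select_from() also appears here for the first time, although it is easy to guess what it does. Good examples are admittedly hard to find when writing documentation. Both join() and select_from() are covered later.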

When a Core or ORM model already exists, passing Column objects positionally to TextClause.columns() is very useful, because there is no need to worry about result column names clashing.

In [96]: stmt = text("SELECT users.id, addresses.id, users.id, "
    ...:     "users.name, addresses.email_address AS email "
    ...:     "FROM users JOIN addresses ON users.id = addresses.user_id "
    ...:     "WHERE users.id = 1").columns(
    ...:         users.c.id,
    ...:         addresses.c.id,
    ...:         addresses.c.user_id,
    ...:         users.c.name,
    ...:         addresses.c.email_address
    ...:     )

In [97]: result = conn.execute(stmt)
2017-02-09 17:05:03,309 INFO sqlalchemy.engine.base.Engine SELECT users.id, addresses.id, users.id, users.name, addresses.email_address AS email FROM users JOIN addresses ON users.id = addresses.user_id WHERE users.id = 1
2017-02-09 17:05:03,309 INFO sqlalchemy.engine.base.Engine ()

In [98]: row = result.fetchone()

In [99]: row[addresses.c.email_address]
Out[99]: u'jack@yahoo.com'

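Note that the SELECT list contains three id columns. Because the Column objects themselves are used for lookup, the naming does not matter, which is convenient and hard to get wrong.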
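The naming problem that positional Column objects avoid can be seen at the driver level. The sketch below uses the standard-library sqlite3 module rather than SQLAlchemy (an assumption, to keep it dependency-free) and runs the same SELECT; the address id 7 is a made-up value chosen so that the three id positions are distinguishable.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, fullname TEXT);
    CREATE TABLE addresses (id INTEGER PRIMARY KEY, user_id INTEGER, email_address TEXT);
    INSERT INTO users VALUES (1, 'jack', 'Jack Jones');
    INSERT INTO addresses VALUES (7, 1, 'jack@yahoo.com');  -- hypothetical id
""")

cur = conn.execute(
    "SELECT users.id, addresses.id, users.id, users.name, "
    "addresses.email_address AS email "
    "FROM users JOIN addresses ON users.id = addresses.user_id "
    "WHERE users.id = 1"
)
# Names reported by the driver for each result column.
names = [d[0] for d in cur.description]
row = cur.fetchone()
print(names)
print(row)

# Position, not name, is what tells the columns apart.
users_id, addresses_id, users_id_again = row[0], row[1], row[2]
```

On a typical SQLite build the first three reported names are identical, so a lookup by name alone could not tell them apart; matching by position, as columns() does with positional Column objects, sidesteps that.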

Using text() inside larger statements

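text() can be used inside select(). In the example below, select() provides the scaffolding of the statement and text() supplies its content. This lets us build a statement on its own, without referring to any existing Table metadata.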

In [100]: s = select([
     ...:     text("users.fullname || ',' || addresses.email_address AS title")
     ...: ]).where(
     ...:         and_(
     ...:             text("users.id = addresses.user_id"),
     ...:             text("users.name BETWEEN 'm' AND 'z'"),
     ...:             text(
     ...:                 "(addresses.email_address LIKE :x "
     ...:                 "OR addresses.email_address LIKE :y)")
     ...:         )
     ...: ).select_from(text('users, addresses'))

In [101]: conn.execute(s, x='%@aol.com', y='%@msn.com').fetchall()
2017-02-09 17:23:43,439 INFO sqlalchemy.engine.base.Engine SELECT users.fullname || ',' || addresses.email_address AS title
FROM users, addresses
WHERE users.id = addresses.user_id AND users.name BETWEEN 'm' AND 'z' AND (addresses.email_address LIKE ? OR addresses.email_address LIKE ?)
2017-02-09 17:23:43,439 INFO sqlalchemy.engine.base.Engine ('%@aol.com', '%@msn.com')
Out[101]: [(u'Wendy Williams,wendy@aol.com',)]

Using table(), literal_column() and column() for more complex SQL

In [102]: from sqlalchemy.sql import table, literal_column

In [103]: s = select([
     ...:     literal_column("users.fullname", String) + ',' +
     ...:     literal_column("addresses.email_address").label("title")
     ...: ]).where(
     ...:     and_(
     ...:         literal_column("users.id") ==
     ...:         literal_column("addresses.user_id"),
     ...:         text("users.name BETWEEN 'm' AND 'z'"),
     ...:         text(
     ...:             "(addresses.email_address LIKE :x OR "
     ...:             "addresses.email_address LIKE :y)")
     ...:     )
     ...: ).select_from(table('users')).select_from(table('addresses'))

In [104]: conn.execute(s, x='%@aol.com', y='%@msn.com').fetchall()
2017-02-10 16:42:27,870 INFO sqlalchemy.engine.base.Engine SELECT users.fullname || ? || addresses.email_address AS anon_1
FROM users, addresses
WHERE users.id = addresses.user_id AND users.name BETWEEN 'm' AND 'z' AND (addresses.email_address LIKE ? OR addresses.email_address LIKE ?)
2017-02-10 16:42:27,871 INFO sqlalchemy.engine.base.Engine (',', '%@aol.com', '%@msn.com')
Out[104]: [(u'Wendy Williams,wendy@aol.com',)]

What is the difference between column() and literal_column()?

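The difference is that literal_column("users.fullname") is rendered as-is and therefore means the fullname column of the users table, while column("users.fullname") means a single column whose name is "users.fullname", so the whole name is quoted as one identifier.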

In [106]: from sqlalchemy.sql import column

In [107]: s = select([
     ...:     column("users.fullname", String) + ',' +
     ...:     column("addresses.email_address").label("title")
     ...: ]).where(
     ...:     and_(
     ...:         column("users.id") == column("addresses.user_id"),
     ...:         text("users.name BETWEEN 'm' AND 'z'"),
     ...:         text(
     ...:             "(addresses.email_address LIKE :x OR "
     ...:             "addresses.email_address LIKE :y)")
     ...:     )
     ...: ).select_from(table('users')).select_from(table('addresses'))

In [108]: conn.execute(s, x='%@aol.com', y='%@msn.com').fetchall()
2017-02-10 17:01:15,316 INFO sqlalchemy.engine.base.Engine SELECT "users.fullname" || ? || "addresses.email_address" AS anon_1
FROM users, addresses
WHERE "users.id" = "addresses.user_id" AND users.name BETWEEN 'm' AND 'z' AND (addresses.email_address LIKE ? OR addresses.email_address LIKE ?)
2017-02-10 17:01:15,316 INFO sqlalchemy.engine.base.Engine (',', '%@aol.com', '%@msn.com')
Out[108]: []


In [109]: s = select([
     ...:     column("fullname", String) + ',' +
     ...:     column("email_address").label("title")
     ...: ]).where(
     ...:     and_(
     ...:         column("id") == column("user_id"),
     ...:         text("users.name BETWEEN 'm' AND 'z'"),
     ...:         text(
     ...:             "(addresses.email_address LIKE :x OR "
     ...:             "addresses.email_address LIKE :y)")
     ...:     )
     ...: ).select_from(table('users')).select_from(table('addresses'))

In [110]: conn.execute(s, x='%@aol.com', y='%@msn.com').fetchall()
2017-02-10 17:01:58,962 INFO sqlalchemy.engine.base.Engine SELECT fullname || ? || email_address AS anon_1
FROM users, addresses
WHERE id = user_id AND users.name BETWEEN 'm' AND 'z' AND (addresses.email_address LIKE ? OR addresses.email_address LIKE ?)
2017-02-10 17:01:58,962 INFO sqlalchemy.engine.base.Engine (',', '%@aol.com', '%@msn.com')
2017-02-10 17:01:58,973 INFO sqlalchemy.engine.base.Engine ROLLBACK
---------------------------------------------------------------------------
OperationalError                          Traceback (most recent call last)

OperationalError: (sqlite3.OperationalError) ambiguous column name: id [SQL: u"SELECT fullname || ? || email_address AS anon_1 
FROM users, addresses 
WHERE id = user_id AND users.name BETWEEN 'm' AND 'z' AND (addresses.email_address LIKE ? OR addresses.email_address LIKE ?)"] [parameters: (',', '%@aol.com', '%@msn.com')
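The error above comes from SQLite itself, not from SQLAlchemy, and can be reproduced in a few lines. This is a sketch with the standard-library sqlite3 module (an assumption, to avoid extra dependencies) over an in-memory copy of the tutorial's tables.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, fullname TEXT);
    CREATE TABLE addresses (id INTEGER PRIMARY KEY, user_id INTEGER, email_address TEXT);
    INSERT INTO users VALUES (1, 'jack', 'Jack Jones'), (2, 'wendy', 'Wendy Williams');
    INSERT INTO addresses VALUES
        (1, 1, 'jack@yahoo.com'), (2, 1, 'jack@msn.com'),
        (3, 2, 'www@www.org'), (4, 2, 'wendy@aol.com');
""")

# Both tables have an id column, so the bare name cannot be resolved.
try:
    conn.execute("SELECT fullname FROM users, addresses WHERE id = user_id")
    error = None
except sqlite3.OperationalError as exc:
    error = str(exc)
print(error)

# Qualifying each column with its table removes the ambiguity.
rows = conn.execute(
    "SELECT DISTINCT users.fullname FROM users, addresses "
    "WHERE users.id = addresses.user_id ORDER BY users.fullname"
).fetchall()
print(rows)
```

This is why the literal_column("users.id") form works where column("id") does not: only the former carries the table qualifier into the SQL.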

Ordering or Grouping by a Label

Sometimes a column label is needed so that it can be used in ORDER BY or GROUP BY. If the label is already defined in select(), its name can be passed directly to Select.order_by() or Select.group_by().

In [112]: from sqlalchemy import func

In [113]: stmt = select([
     ...:     addresses.c.user_id,
     ...:     func.count(addresses.c.id).label('num_addresses')
     ...: ]).order_by("num_addresses")

In [115]: conn.execute(stmt).fetchall()
2017-02-14 20:44:26,955 INFO sqlalchemy.engine.base.Engine SELECT addresses.user_id, count(addresses.id) AS num_addresses
FROM addresses ORDER BY num_addresses
2017-02-14 20:44:26,955 INFO sqlalchemy.engine.base.Engine ()
Out[115]: [(2, 4)]

Note that count lives under func. For descending order, wrap the label in desc(). Ascending is the default, and it can be made explicit in the same way with asc().

In [119]: from sqlalchemy import desc

In [120]: stmt = select([
     ...:     addresses.c.user_id,
     ...:     func.count(addresses.c.id).label('num_addresses')
     ...: ]).order_by(desc("num_addresses"))

In [121]: conn.execute(stmt).fetchall()
2017-02-14 20:52:20,567 INFO sqlalchemy.engine.base.Engine SELECT addresses.user_id, count(addresses.id) AS num_addresses
FROM addresses ORDER BY num_addresses DESC
2017-02-14 20:52:20,567 INFO sqlalchemy.engine.base.Engine ()
Out[121]: [(2, 4)]
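Both results above contain a single row because the statement has an aggregate but no GROUP BY, so the ordering has nothing to sort. The sketch below, written against the standard-library sqlite3 module rather than SQLAlchemy (an assumption), adds GROUP BY so that ordering by the label becomes visible. The fifth address row is hypothetical, added only to make the two counts differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE addresses (id INTEGER PRIMARY KEY, user_id INTEGER, email_address TEXT);
    INSERT INTO addresses VALUES
        (1, 1, 'jack@yahoo.com'), (2, 1, 'jack@msn.com'),
        (3, 2, 'www@www.org'), (4, 2, 'wendy@aol.com'),
        (5, 2, 'wendy@example.com');  -- hypothetical extra row
""")

base = (
    "SELECT user_id, count(id) AS num_addresses "
    "FROM addresses GROUP BY user_id ORDER BY num_addresses"
)
# The label defined in the SELECT list is reused by name in ORDER BY.
asc_rows = conn.execute(base + " ASC").fetchall()
desc_rows = conn.execute(base + " DESC").fetchall()
print(asc_rows)
print(desc_rows)
```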

A ColumnElement object can also be passed directly, which handles same-named columns from different tables correctly.

In [122]: u1a, u1b = users.alias(), users.alias()

In [123]: stmt = select([u1a, u1b]).\
     ...:            where(u1a.c.name > u1b.c.name).\
     ...:            order_by(u1a.c.name)  # the string "name" would be ambiguous here, since both u1a and u1b have a name column

In [124]: conn.execute(stmt).fetchall()
2017-02-14 20:59:41,505 INFO sqlalchemy.engine.base.Engine SELECT users_1.id, users_1.name, users_1.fullname, users_2.id, users_2.name, users_2.fullname
FROM users AS users_1, users AS users_2
WHERE users_1.name > users_2.name ORDER BY users_1.name
2017-02-14 20:59:41,505 INFO sqlalchemy.engine.base.Engine ()
Out[124]: [(2, u'wendy', u'Wendy Williams', 1, u'jack', u'Jack Jones')]

This one is a bit convoluted even as raw SQL. Also take note of alias(), which is covered next.

In [129]: conn.execute(text(
     ...:     "SELECT u1.id, u1.name, u1.fullname, u2.id, u2.name, u2.fullname "
     ...:     "FROM users u1 JOIN users u2 WHERE u1.name > u2.name "
     ...:     "ORDER BY u1.name")).fetchall()
2017-02-14 21:06:29,974 INFO sqlalchemy.engine.base.Engine select u1.id, u1.name, u1.fullname, u2.id, u2.name, u2.fullname from users u1 inner join users u2  where u1.name > u2.name order by u1.name
2017-02-14 21:06:29,974 INFO sqlalchemy.engine.base.Engine ()
Out[129]: [(2, u'wendy', u'Wendy Williams', 1, u'jack', u'Jack Jones')]

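This section felt somewhat confusing on a first read. Mixing raw SQL with SQL Expression constructs makes a program harder to follow, and without a solid grasp of SQL it is easy to get lost.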

 

12. Using Aliases

In SQLAlchemy, any Table, select() construct, or other selectable can be aliased with FromClause.alias(), which creates an Alias construct.

For example, how can we find the user who owns both of two given email addresses?

In [130]: a1 = addresses.alias()

In [131]: a2 = addresses.alias()

In [132]: s = select([users]).\
     ...:         where(and_(
     ...:             users.c.id == a1.c.user_id,
     ...:             users.c.id == a2.c.user_id,
     ...:             a1.c.email_address == 'jack@msn.com',
     ...:             a2.c.email_address == 'jack@yahoo.com'
     ...: ))

In [133]: conn.execute(s).fetchall()
2017-02-14 22:02:52,079 INFO sqlalchemy.engine.base.Engine SELECT users.id, users.name, users.fullname
FROM users, addresses AS addresses_1, addresses AS addresses_2
WHERE users.id = addresses_1.user_id AND users.id = addresses_2.user_id AND addresses_1.email_address = ? AND addresses_2.email_address = ?
2017-02-14 22:02:52,080 INFO sqlalchemy.engine.base.Engine ('jack@msn.com', 'jack@yahoo.com')
Out[133]: [(1, u'jack', u'Jack Jones')]

Note that in the generated SQL the two aliases of addresses are named addresses_1 and addresses_2. The names are assigned automatically: if the select used only a2, it would be rendered as addresses_1.

How would this be written in raw SQL?

In [137]: s = text(
     ...:     "SELECT users.id, users.name, users.fullname FROM users "
     ...:     "JOIN addresses AS a1 ON users.id = a1.user_id "
     ...:     "JOIN addresses AS a2 ON users.id = a2.user_id "
     ...:     "WHERE a1.email_address = 'jack@msn.com' "
     ...:     "AND a2.email_address = 'jack@yahoo.com'")

In [138]: conn.execute(s).fetchall()
2017-02-14 22:06:46,463 INFO sqlalchemy.engine.base.Engine select users.id, users.name, users.fullname from users join addresses as a1 on users.id = a1.user_id join addresses as a2 on users.id = a2.user_id where a1.email_address = 'jack@msn.com' and a2.email_address = 'jack@yahoo.com'
2017-02-14 22:06:46,464 INFO sqlalchemy.engine.base.Engine ()
Out[138]: [(1, u'jack', u'Jack Jones')]

To control the alias name used in the generated SQL, pass a name to alias():

In [143]: a3 = addresses.alias('a3')

In [145]: conn.execute(select([a3])).fetchall()
2017-02-14 22:16:16,382 INFO sqlalchemy.engine.base.Engine SELECT a3.id, a3.user_id, a3.email_address
FROM addresses AS a3
2017-02-14 22:16:16,382 INFO sqlalchemy.engine.base.Engine ()
Out[145]:
[(1, 1, u'jack@yahoo.com'),
 (2, 1, u'jack@msn.com'),
 (3, 2, u'www@www.org'),
 (4, 2, u'wendy@aol.com')]

13. Using Joins

Join constructs are created with join() and outerjoin(). A plain join() renders JOIN, which is an inner join, not a LEFT JOIN.

In [148]: print(users.join(addresses))
users JOIN addresses ON users.id = addresses.user_id

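By default join() looks for a foreign key to use as the join condition. To join on something other than the foreign key, pass the condition as the second argument: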

In [149]: print(users.join(addresses,
     ...:                  addresses.c.email_address.like(users.c.name + '%')
     ...: ))
users JOIN addresses ON addresses.email_address LIKE (users.name || :name_1)

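What is this SQL for? It finds the rows whose email address starts with the user's name. Note the users.c.name + '%' inside like(): '%' is the SQL wildcard. Adding a string to a Column instance looks a little odd at first, but the + operator is overloaded and renders as string concatenation (||).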
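To check what the join condition actually matches, the rendered SQL can be run directly. This is a sketch using the standard-library sqlite3 module instead of SQLAlchemy (an assumption), with an in-memory copy of the tutorial's data; the parameter name suffix is made up and plays the role of :name_1.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, fullname TEXT);
    CREATE TABLE addresses (id INTEGER PRIMARY KEY, user_id INTEGER, email_address TEXT);
    INSERT INTO users VALUES (1, 'jack', 'Jack Jones'), (2, 'wendy', 'Wendy Williams');
    INSERT INTO addresses VALUES
        (1, 1, 'jack@yahoo.com'), (2, 1, 'jack@msn.com'),
        (3, 2, 'www@www.org'), (4, 2, 'wendy@aol.com');
""")

# users.name || '%' builds a per-row prefix pattern such as 'jack%'.
rows = conn.execute(
    "SELECT users.name, addresses.email_address "
    "FROM users JOIN addresses "
    "ON addresses.email_address LIKE (users.name || :suffix) "
    "ORDER BY addresses.id",
    {"suffix": "%"},
).fetchall()
print(rows)
```

www@www.org is missing from the result because it starts with neither user name, even though it belongs to wendy by foreign key.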

outerjoin() creates a LEFT OUTER JOIN and is used the same way as join().

In [150]: s = select([users.c.fullname]).select_from(users.outerjoin(addresses))
     ...:

In [151]: print(s)
SELECT users.fullname
FROM users LEFT OUTER JOIN addresses ON users.id = addresses.user_id
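The practical difference between the two join types only shows up when a user has no addresses. The sketch below uses the standard-library sqlite3 module rather than SQLAlchemy (an assumption) and adds a hypothetical third user, fred, who has none.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, fullname TEXT);
    CREATE TABLE addresses (id INTEGER PRIMARY KEY, user_id INTEGER, email_address TEXT);
    INSERT INTO users VALUES
        (1, 'jack', 'Jack Jones'), (2, 'wendy', 'Wendy Williams'),
        (3, 'fred', 'Fred Flintstone');  -- hypothetical user without addresses
    INSERT INTO addresses VALUES
        (1, 1, 'jack@yahoo.com'), (2, 1, 'jack@msn.com'),
        (3, 2, 'www@www.org'), (4, 2, 'wendy@aol.com');
""")

# Inner join: users without a matching address disappear.
inner = conn.execute(
    "SELECT DISTINCT users.name FROM users "
    "JOIN addresses ON users.id = addresses.user_id ORDER BY users.name"
).fetchall()

# Left outer join: every user is kept; missing addresses count as zero.
outer = conn.execute(
    "SELECT users.name, count(addresses.id) FROM users "
    "LEFT OUTER JOIN addresses ON users.id = addresses.user_id "
    "GROUP BY users.name ORDER BY users.name"
).fetchall()
print(inner)
print(outer)
```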

14. Everything Else

Bind Parameter Objects

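This involves the concept of a bind parameter. Wherever the SQL that SQLAlchemy builds needs a value supplied from outside, the spot is marked with a placeholder such as ":name"; passing name as an argument at execution time fills it in.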

With bindparam() you can create and name a bind parameter yourself.

In [152]: from sqlalchemy.sql import bindparam

In [153]: s = users.select(users.c.name == bindparam("username"))

In [154]: conn.execute(s, username='wendy').fetchall()
2017-02-14 22:57:09,592 INFO sqlalchemy.engine.base.Engine SELECT users.id, users.name, users.fullname
FROM users
WHERE users.name = ?
2017-02-14 22:57:09,592 INFO sqlalchemy.engine.base.Engine ('wendy',)
Out[154]: [(2, u'wendy', u'Wendy Williams')]

The type of the parameter can be specified too, and the type affects how the SQL is generated.

In [155]: s = users.select(users.c.name.like(bindparam('username', type_=String)
     ...:  + text("'%'")))

In [156]: conn.execute(s, username='wendy').fetchall()
2017-02-14 23:02:21,590 INFO sqlalchemy.engine.base.Engine SELECT users.id, users.name, users.fullname
FROM users
WHERE users.name LIKE (? || '%')
2017-02-14 23:02:21,590 INFO sqlalchemy.engine.base.Engine ('wendy',)
Out[156]: [(2, u'wendy', u'Wendy Williams')]

In [157]: s = users.select(users.c.name.like(bindparam('username', type_=String)
     ...:  + '%'))

In [158]: conn.execute(s, username='wendy').fetchall()
2017-02-14 23:02:38,788 INFO sqlalchemy.engine.base.Engine SELECT users.id, users.name, users.fullname
FROM users
WHERE users.name LIKE (? || ?)
2017-02-14 23:02:38,788 INFO sqlalchemy.engine.base.Engine ('wendy', '%')
Out[158]: [(2, u'wendy', u'Wendy Williams')]

Functions

func provides the commonly used SQL functions.

In [160]: print(func.now())
now()

In [161]: print(func.concat('x', 'y'))
concat(:concat_1, :concat_2)

In [162]: func.now()
Out[162]: <sqlalchemy.sql.functions.now at 0x10aceb510; now>

In [163]: func.concat('x', 'y')
Out[163]: <sqlalchemy.sql.functions.concat at 0x10ad2a210; concat>

In [164]: print(func.am_i_a_function())
am_i_a_function()

In [165]: func.am_i_a_funciton()
Out[165]: <sqlalchemy.sql.functions.Function at 0x10ad2ad50; am_i_a_funciton>

Incidentally, am_i_a_function is a made-up name: func produces a function expression for whatever attribute name it is given.

SQLAlchemy recognizes certain function names and renders them specially.

In [166]: print(func.current_timestamp())
CURRENT_TIMESTAMP

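In a select statement, a function can be given a label and a type. The label lets you locate the result column by a string name, and the type lets you request the type you need, for example for Unicode or Date conversion. In the example below, scalar() reads the first column of the first row and then closes the result.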

In [167]: conn.execute(
     ...:     select([
     ...:         func.max(addresses.c.email_address,
     ...:                  type_=String).label('maxemail')
     ...:     ])
     ...: ).scalar()
2017-02-15 07:52:21,523 INFO sqlalchemy.engine.base.Engine SELECT max(addresses.email_address) AS maxemail
FROM addresses
2017-02-15 07:52:21,524 INFO sqlalchemy.engine.base.Engine ()
Out[167]: u'www@www.org'

In [169]: conn.execute(
     ...:     select([
     ...:         func.max(addresses.c.email_address,
     ...:                  type_=Integer).label('maxemail')
     ...:     ])
     ...: ).scalar()
2017-02-15 07:53:11,869 INFO sqlalchemy.engine.base.Engine SELECT max(addresses.email_address) AS maxemail
FROM addresses
2017-02-15 07:53:11,869 INFO sqlalchemy.engine.base.Engine ()
Out[169]: u'www@www.org'

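Note again that type_ does not force a conversion; otherwise the example above, which declares an email address as Integer, would have raised an error.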

Unions and Other Set Operations

UNION and UNION ALL queries are built with the module-level functions union() and union_all(), both importable from sqlalchemy.sql.

In [175]: u = union(
     ...:     addresses.select().where(
     ...:         addresses.c.email_address == 'foo@bar.com'),
     ...:     addresses.select().where(
     ...:         addresses.c.email_address.like('%@yahoo.com')),
     ...: ).order_by(addresses.c.email_address)

In [176]: conn.execute(u).fetchall()
2017-02-15 08:11:11,095 INFO sqlalchemy.engine.base.Engine SELECT addresses.id, addresses.user_id, addresses.email_address
FROM addresses
WHERE addresses.email_address = ? UNION SELECT addresses.id, addresses.user_id, addresses.email_address
FROM addresses
WHERE addresses.email_address LIKE ? ORDER BY addresses.email_address
2017-02-15 08:11:11,095 INFO sqlalchemy.engine.base.Engine ('foo@bar.com', '%@yahoo.com')
Out[176]: [(1, 1, u'jack@yahoo.com')]

MySQL did not support INTERSECT or EXCEPT at the time of writing.
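SQLite does support all four set operations, so their semantics can be compared side by side. This sketch uses the standard-library sqlite3 module rather than SQLAlchemy (an assumption); run() is a hypothetical helper, and the two LIKE patterns are chosen so that the two SELECTs overlap on one row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE addresses (id INTEGER PRIMARY KEY, user_id INTEGER, email_address TEXT);
    INSERT INTO addresses VALUES
        (1, 1, 'jack@yahoo.com'), (2, 1, 'jack@msn.com'),
        (3, 2, 'www@www.org'), (4, 2, 'wendy@aol.com');
""")


def run(op):
    """Combine 'starts with jack' and 'ends with @yahoo.com' using the given operator."""
    sql = (
        "SELECT email_address FROM addresses WHERE email_address LIKE :a "
        + op +
        " SELECT email_address FROM addresses WHERE email_address LIKE :b "
        "ORDER BY email_address"
    )
    params = {"a": "jack%", "b": "%@yahoo.com"}
    return [r[0] for r in conn.execute(sql, params)]


union_rows = run("UNION")          # duplicates removed
union_all_rows = run("UNION ALL")  # duplicates kept
intersect_rows = run("INTERSECT")  # rows present in both SELECTs
except_rows = run("EXCEPT")        # rows in the first SELECT only
print(union_rows, union_all_rows, intersect_rows, except_rows)
```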

Scalar Select

A select() construct can be made to behave like a column by calling as_scalar() or label() on it.

In [177]: stmt = select([func.count(addresses.c.id)]).where(
     ...:     users.c.id == addresses.c.user_id).as_scalar()

In [178]: conn.execute(select([users.c.name, stmt])).fetchall()
2017-02-15 08:46:07,071 INFO sqlalchemy.engine.base.Engine SELECT users.name, (SELECT count(addresses.id) AS count_1
FROM addresses
WHERE users.id = addresses.user_id) AS anon_1
FROM users
2017-02-15 08:46:07,071 INFO sqlalchemy.engine.base.Engine ()
Out[178]: [(u'jack', 2), (u'wendy', 2)]

stmt is a ScalarSelect object and can be passed to select() like a column. The column produced by as_scalar() is anonymous, though; as the SQL shows, it is named anon_1.

In [179]: stmt = select([func.count(addresses.c.id)]).where(
     ...:     users.c.id == addresses.c.user_id).label("addresses_count")

In [180]: conn.execute(select([users.c.name, stmt])).fetchall()
2017-02-15 08:55:00,720 INFO sqlalchemy.engine.base.Engine SELECT users.name, (SELECT count(addresses.id) AS count_1
FROM addresses
WHERE users.id = addresses.user_id) AS addresses_count
FROM users
2017-02-15 08:55:00,720 INFO sqlalchemy.engine.base.Engine ()
Out[180]: [(u'jack', 2), (u'wendy', 2)]

With label() the column is no longer anonymous, which is mostly useful for debugging.

Correlated Subqueries

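Notice that in the Scalar Select examples, the FROM clause of the parenthesized subquery does not list the users table. That is because SQLAlchemy automatically correlated the subquery with the enclosing statement.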

In [18]: stmt = select([addresses.c.user_id]).\
    ...:     where(addresses.c.user_id == users.c.id).\
    ...:     where(addresses.c.email_address == 'jack@yahoo.com')

In [19]: enclosing_stmt = select([users.c.name]).where(users.c.id == stmt)

In [20]: conn.execute(enclosing_stmt).fetchall()
2017-02-26 10:34:17,573 INFO sqlalchemy.engine.base.Engine SELECT users.name
FROM users
WHERE users.id = (SELECT addresses.user_id
FROM addresses
WHERE addresses.user_id = users.id AND addresses.email_address = ?)
2017-02-26 10:34:17,573 INFO sqlalchemy.engine.base.Engine ('jack@yahoo.com',)
Out[20]: [(u'jack',)]

Auto-correlation can also be controlled explicitly. I cannot yet see what controlling it is for, so I am skipping it for now.
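One way to see what is at stake is to compare, in plain SQL, a subquery that refers to the outer users row with one that brings its own users table. This is a sketch with the standard-library sqlite3 module rather than SQLAlchemy (an assumption), over an in-memory copy of the tutorial's data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, fullname TEXT);
    CREATE TABLE addresses (id INTEGER PRIMARY KEY, user_id INTEGER, email_address TEXT);
    INSERT INTO users VALUES (1, 'jack', 'Jack Jones'), (2, 'wendy', 'Wendy Williams');
    INSERT INTO addresses VALUES
        (1, 1, 'jack@yahoo.com'), (2, 1, 'jack@msn.com'),
        (3, 2, 'www@www.org'), (4, 2, 'wendy@aol.com');
""")

# Correlated: users.id inside the subquery refers to the current outer row.
correlated = conn.execute(
    "SELECT users.name, "
    "(SELECT count(addresses.id) FROM addresses "
    " WHERE addresses.user_id = users.id) AS address_count "
    "FROM users ORDER BY users.name"
).fetchall()

# Not correlated: the subquery lists users itself, so it ignores the outer row
# and counts every matching (address, user) pair for each result row.
uncorrelated = conn.execute(
    "SELECT users.name, "
    "(SELECT count(addresses.id) FROM addresses, users "
    " WHERE addresses.user_id = users.id) AS address_count "
    "FROM users ORDER BY users.name"
).fetchall()
print(correlated)
print(uncorrelated)
```

The same subquery text gives per-user counts in one case and the grand total in the other, which is the kind of difference that controlling correlation decides.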

Ordering, Grouping, Limiting, Offset...ing...

Ordering is done with order_by().

In [21]: stmt = select([users.c.name]).order_by(users.c.name)

In [22]: conn.execute(stmt).fetchall()
2017-02-26 10:41:49,006 INFO sqlalchemy.engine.base.Engine SELECT users.name
FROM users ORDER BY users.name
2017-02-26 10:41:49,006 INFO sqlalchemy.engine.base.Engine ()
Out[22]: [(u'jack',), (u'wendy',)]

Ascending and descending order are expressed with asc() and desc().

In [23]: stmt = select([users.c.name]).order_by(users.c.name.desc())

In [24]: conn.execute(stmt).fetchall()
2017-02-26 10:43:31,448 INFO sqlalchemy.engine.base.Engine SELECT users.name
FROM users ORDER BY users.name DESC
2017-02-26 10:43:31,449 INFO sqlalchemy.engine.base.Engine ()
Out[24]: [(u'wendy',), (u'jack',)]

GROUP BY is done with group_by().

In [27]: stmt = select([users.c.name, func.count(addresses.c.id)]).\
    ...:     select_from(users.join(addresses)).\
    ...:     group_by(users.c.name)

In [28]: conn.execute(stmt).fetchall()
2017-02-26 10:46:01,558 INFO sqlalchemy.engine.base.Engine SELECT users.name, count(addresses.id) AS count_1
FROM users JOIN addresses ON users.id = addresses.user_id GROUP BY users.name
2017-02-26 10:46:01,558 INFO sqlalchemy.engine.base.Engine ()
Out[28]: [(u'jack', 2), (u'wendy', 2)]

HAVING is done with having().

In [29]: stmt = select([users.c.name, func.count(addresses.c.id)]).\
    ...:     select_from(users.join(addresses)).\
    ...:     group_by(users.c.name).\
    ...:     having(func.length(users.c.name) > 4)

In [30]: conn.execute(stmt).fetchall()
2017-02-26 10:47:50,669 INFO sqlalchemy.engine.base.Engine SELECT users.name, count(addresses.id) AS count_1
FROM users JOIN addresses ON users.id = addresses.user_id GROUP BY users.name
HAVING length(users.name) > ?
2017-02-26 10:47:50,669 INFO sqlalchemy.engine.base.Engine (4,)
Out[30]: [(u'wendy', 2)]
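HAVING is easiest to understand next to WHERE: WHERE filters rows before they are grouped, HAVING filters the groups afterwards. The sketch below shows both in one query, using the standard-library sqlite3 module rather than SQLAlchemy (an assumption); the '%.com' pattern and the threshold of 1 are made-up values.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, fullname TEXT);
    CREATE TABLE addresses (id INTEGER PRIMARY KEY, user_id INTEGER, email_address TEXT);
    INSERT INTO users VALUES (1, 'jack', 'Jack Jones'), (2, 'wendy', 'Wendy Williams');
    INSERT INTO addresses VALUES
        (1, 1, 'jack@yahoo.com'), (2, 1, 'jack@msn.com'),
        (3, 2, 'www@www.org'), (4, 2, 'wendy@aol.com');
""")

grouped_sql = (
    "SELECT users.name, count(addresses.id) AS n "
    "FROM users JOIN addresses ON users.id = addresses.user_id "
    "WHERE addresses.email_address LIKE :pattern "  # row filter, before grouping
    "GROUP BY users.name"
)

# WHERE only: www@www.org is dropped before counting.
where_only = conn.execute(
    grouped_sql + " ORDER BY users.name", {"pattern": "%.com"}
).fetchall()

# WHERE plus HAVING: groups with too few remaining rows are dropped as well.
where_and_having = conn.execute(
    grouped_sql + " HAVING count(addresses.id) > :minimum ORDER BY users.name",
    {"pattern": "%.com", "minimum": 1},
).fetchall()
print(where_only)
print(where_and_having)
```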

LIMIT and OFFSET are done with limit() and offset().

In [31]: stmt = select([users.c.name, addresses.c.email_address]).\
    ...:     select_from(users.join(addresses)).\
    ...:     limit(1).offset(1)

In [32]: conn.execute(stmt).fetchall()
2017-02-26 10:52:47,639 INFO sqlalchemy.engine.base.Engine SELECT users.name, addresses.email_address
FROM users JOIN addresses ON users.id = addresses.user_id
 LIMIT ? OFFSET ?
2017-02-26 10:52:47,639 INFO sqlalchemy.engine.base.Engine (1, 1)
Out[32]: [(u'jack', u'jack@msn.com')]
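LIMIT and OFFSET are mostly used for paging. The sketch below wraps the same query in a hypothetical fetch_page() helper, using the standard-library sqlite3 module rather than SQLAlchemy (an assumption). It also adds an ORDER BY, which the query above lacks; without one, the database is free to return rows in any order, so pages are not guaranteed to be stable.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, fullname TEXT);
    CREATE TABLE addresses (id INTEGER PRIMARY KEY, user_id INTEGER, email_address TEXT);
    INSERT INTO users VALUES (1, 'jack', 'Jack Jones'), (2, 'wendy', 'Wendy Williams');
    INSERT INTO addresses VALUES
        (1, 1, 'jack@yahoo.com'), (2, 1, 'jack@msn.com'),
        (3, 2, 'www@www.org'), (4, 2, 'wendy@aol.com');
""")


def fetch_page(connection, page, page_size):
    """Return one page of (name, email) rows; pages are numbered from 1."""
    offset = (page - 1) * page_size
    return connection.execute(
        "SELECT users.name, addresses.email_address "
        "FROM users JOIN addresses ON users.id = addresses.user_id "
        "ORDER BY addresses.id LIMIT :limit OFFSET :offset",
        {"limit": page_size, "offset": offset},
    ).fetchall()


second_of_one = fetch_page(conn, page=2, page_size=1)  # same as limit(1).offset(1)
second_of_two = fetch_page(conn, page=2, page_size=2)
past_the_end = fetch_page(conn, page=3, page_size=2)
print(second_of_one, second_of_two, past_the_end)
```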

Inserts, Updates and Deletes

Update

In [33]: stmt = users.update().\
    ...:     values(fullname="Fullname: " + users.c.name)

In [34]: conn.execute(stmt)
2017-02-26 10:54:49,862 INFO sqlalchemy.engine.base.Engine UPDATE users SET fullname=(? || users.name)
2017-02-26 10:54:49,862 INFO sqlalchemy.engine.base.Engine ('Fullname: ',)
2017-02-26 10:54:49,863 INFO sqlalchemy.engine.base.Engine COMMIT
Out[34]: <sqlalchemy.engine.result.ResultProxy at 0x10a1c3f50>

When insert() or update() is executed in batch, SQLAlchemy has already reserved bind parameters in the SQL that are named after the columns. To use differently named parameters, use bindparam().

In [37]: stmt = users.insert().\
    ...:     values(name=bindparam("_name") + ".. name")

In [38]: conn.execute(stmt, [
    ...:     {"id":4, "_name":"name1"},
    ...:     {"id":5, "_name":"name2"},
    ...:     {"id":6, "_name":"name3"},
    ...: ])
2017-02-26 11:12:00,478 INFO sqlalchemy.engine.base.Engine INSERT INTO users (id, name) VALUES (?, (? || ?))
2017-02-26 11:12:00,478 INFO sqlalchemy.engine.base.Engine ((4, 'name1', '.. name'), (5, 'name2', '.. name'), (6, 'name3', '.. name'))
2017-02-26 11:12:00,479 INFO sqlalchemy.engine.base.Engine COMMIT
Out[38]: <sqlalchemy.engine.result.ResultProxy at 0x10a271dd0>

update() is used much like insert(), except that it also takes a where() clause.

In [39]: stmt = users.update().\
    ...:     where(users.c.name == "jack").\
    ...:     values(name="ed")

In [40]: conn.execute(stmt)
2017-02-26 11:13:37,877 INFO sqlalchemy.engine.base.Engine UPDATE users SET name=? WHERE users.name = ?
2017-02-26 11:13:37,878 INFO sqlalchemy.engine.base.Engine ('ed', 'jack')
2017-02-26 11:13:37,878 INFO sqlalchemy.engine.base.Engine COMMIT
Out[40]: <sqlalchemy.engine.result.ResultProxy at 0x10a271850>

Executing update() in batch naturally requires bindparam().

In [41]: stmt = users.update().\
    ...:     where(users.c.name == bindparam("oldname")).\
    ...:     values(name=bindparam('newname'))

In [42]: conn.execute(stmt, [
    ...:     {"oldname": "jack", "newname": "ed"},
    ...:     {"oldname": "wendy", "newname": "mary"},
    ...:     {"oldname": "jim", "newname": "jake"},
    ...: ])
2017-02-26 11:16:48,668 INFO sqlalchemy.engine.base.Engine UPDATE users SET name=? WHERE users.name = ?
2017-02-26 11:16:48,668 INFO sqlalchemy.engine.base.Engine (('ed', 'jack'), ('mary', 'wendy'), ('jake', 'jim'))
2017-02-26 11:16:48,669 INFO sqlalchemy.engine.base.Engine COMMIT
Out[42]: <sqlalchemy.engine.result.ResultProxy at 0x10a1c3210>
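The log shows one UPDATE statement followed by a tuple of parameter sets, which is the DBAPI's executemany() at work. A rough equivalent, sketched with the standard-library sqlite3 module rather than SQLAlchemy (an assumption), looks like this; the third parameter set matches no row, just as jim does above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, fullname TEXT);
    INSERT INTO users VALUES (1, 'jack', 'Jack Jones'), (2, 'wendy', 'Wendy Williams');
""")

# One statement, prepared once, executed with each parameter set in turn.
conn.executemany(
    "UPDATE users SET name = :newname WHERE name = :oldname",
    [
        {"oldname": "jack", "newname": "ed"},
        {"oldname": "wendy", "newname": "mary"},
        {"oldname": "jim", "newname": "jake"},  # matches nothing, not an error
    ],
)
conn.commit()

names = [r[0] for r in conn.execute("SELECT name FROM users ORDER BY id")]
print(names)
```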

One thing to watch out for here: a bindparam() name can clash with the parameter names that are reserved for the columns.

In [47]: stmt = users.update().\
    ...:     where(users.c.name == bindparam("name")).\
    ...:     values(name='name')

In [48]: conn.execute(stmt, [
    ...:     {"name": "jack"},
    ...:     {"name": "wendy"},
    ...:     {"name": "jim"},
    ...: ])

CompileError: bindparam() name 'name' is reserved for automatic usage in the VALUES or SET clause of this insert/update statement.   Please use a name other than column name when using bindparam() with insert() or update() (for example, 'b_name').

A simple rule that avoids the problem everywhere: never give a bindparam() the same name as a column.

Correlated Updates

A query result can be used directly as the new value in an update.

In [49]: stmt = select([addresses.c.email_address]).\
    ...:     where(addresses.c.user_id == users.c.id).\
    ...:     limit(1)

In [50]: conn.execute(users.update().values(fullname=stmt))
2017-02-26 11:27:18,176 INFO sqlalchemy.engine.base.Engine UPDATE users SET fullname=(SELECT addresses.email_address
FROM addresses
WHERE addresses.user_id = users.id
 LIMIT ? OFFSET ?)
2017-02-26 11:27:18,176 INFO sqlalchemy.engine.base.Engine (1, 0)
2017-02-26 11:27:18,177 INFO sqlalchemy.engine.base.Engine COMMIT
Out[50]: <sqlalchemy.engine.result.ResultProxy at 0x10a28d510>
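The effect of the correlated UPDATE is easier to judge by looking at the rows afterwards. This sketch runs comparable SQL with the standard-library sqlite3 module rather than SQLAlchemy (an assumption). The ORDER BY inside the subquery is my own addition, so that "the first address" is well defined.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, fullname TEXT);
    CREATE TABLE addresses (id INTEGER PRIMARY KEY, user_id INTEGER, email_address TEXT);
    INSERT INTO users VALUES (1, 'jack', 'Jack Jones'), (2, 'wendy', 'Wendy Williams');
    INSERT INTO addresses VALUES
        (1, 1, 'jack@yahoo.com'), (2, 1, 'jack@msn.com'),
        (3, 2, 'www@www.org'), (4, 2, 'wendy@aol.com');
""")

# For each users row, the subquery is evaluated against that row's id.
conn.execute(
    "UPDATE users SET fullname = ("
    " SELECT addresses.email_address FROM addresses"
    " WHERE addresses.user_id = users.id"
    " ORDER BY addresses.id LIMIT 1)"
)
conn.commit()

rows = conn.execute("SELECT name, fullname FROM users ORDER BY id").fetchall()
print(rows)
```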

Multiple Table Updates

The SQLite backend used for these notes does not support this; MySQL does.

stmt = users.update().\
        values(name='ed wood').\
        where(users.c.id == addresses.c.id).\
        where(addresses.c.email_address.startswith('ed%'))
conn.execute(stmt)

MySQL also supports a blunter approach: setting columns of several tables in a single UPDATE.

stmt = users.update().\
        values({
            users.c.name:'ed wood',
            addresses.c.email_address:'ed.wood@foo.com'
        }).\
        where(users.c.id == addresses.c.id).\
        where(addresses.c.email_address.startswith('ed%'))

Deletes

>>> conn.execute(addresses.delete())
DELETE FROM addresses
()
COMMIT
<sqlalchemy.engine.result.ResultProxy object at 0x...>

>>> conn.execute(users.delete().where(users.c.name > 'm'))
DELETE FROM users WHERE users.name > ?
('m',)
COMMIT
<sqlalchemy.engine.result.ResultProxy object at 0x...>

Matched Row Counts

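update() and delete() report how many rows the WHERE clause matched, available as the rowcount attribute of the result. In my view the value is of little use, and it is not always accurate.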
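For reference, this is how the count behaves at the driver level on SQLite, sketched with the standard-library sqlite3 module rather than SQLAlchemy (an assumption). The no-op update in the middle is a made-up case that shows the count reflecting matched rows even when no value changes; other backends may count differently.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, fullname TEXT);
    INSERT INTO users VALUES (1, 'jack', 'Jack Jones'), (2, 'wendy', 'Wendy Williams');
""")

# One row matches the WHERE clause.
cur = conn.execute("UPDATE users SET name = 'ed' WHERE name = :old", {"old": "jack"})
updated = cur.rowcount

# Both rows match, although the assignment leaves every value as it was.
cur = conn.execute("UPDATE users SET name = name WHERE id > 0")
matched_but_unchanged = cur.rowcount

# Only wendy sorts after 'm'.
cur = conn.execute("DELETE FROM users WHERE name > :low", {"low": "m"})
deleted = cur.rowcount
print(updated, matched_but_unchanged, deleted)
```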

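That completes a first pass through the SQL Expression Language. Problems that come up in real use will be recorded in a separate post. From now on, raw SQL no longer has to appear in the code.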

Original post: https://www.cnblogs.com/senjougahara/p/6394037.html