asyncpg:异步操作PostgreSQL

楔子

Python目前已经进化到了3.8版本,对操作数据库也提供了相应的异步支持,下面以PostgreSQL为例,来看看如何使用Python3.8异步操作PostgreSQL数据库中的表。

如果Python想异步操作PostgreSQL的话,需要使用一个模块,叫:asyncpg,直接pip install asyncpg即可。

注意:使用asyncpg,需要你对Python中的异步有一定的了解,并且熟悉标准库asyncio。

简单使用

以一个简单的例子,来看看如何使用asyncpg操作PostgreSQL数据库。

import asyncio
import asyncpg


async def main():
    # 创建连接数据库的驱动,创建连接的时候要使用await
    conn = await asyncpg.connect(host="localhost",
                                 port=5432,
                                 user="postgres",
                                 password="zgghyys123",
                                 database="postgres")
    # 除了上面的方式,还可以使用类似于sqlalchemy的方式创建
    # await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")

    # 调用await conn.fetchrow执行select语句,获取满足条件的单条记录
    # 调用await conn.fetch执行select语句,获取满足条件的全部记录
    row1 = await conn.fetchrow("select * from t1")
    row2 = await conn.fetch("select * from t1")
    
    # 返回的是一个Record对象,这个Record对象等于将返回的记录进行了一个封装
    # 至于怎么用后面会说。
    print(row1)  # <Record pk=1 name='古明地觉'>
    print(row2)
    """
    [<Record pk=1 name='古明地觉'>, <Record pk=2 name='雾雨魔理沙'>, 
     <Record pk=3 name='芙兰朵露'>, <Record pk=4 name='十六夜咲夜'>,
     <Record pk=5 name='风见幽香'>, <Record pk=6 name='博丽灵梦'>, 
     <Record pk=7 name='藤原妹红'>, <Record pk=8 name='琪露诺'>]
    """
    # 关闭连接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

以上我们演示了如何使用asyncpg来获取数据库中的记录,我们看到执行select语句的话,我们可以使用conn.fetchrow(query)来获取满足条件的单条记录,conn.fetchall(query)来获取满足条件的所有记录。

除了conn.fetchone 和 conn.fetchall之外,还有一个没什么卵用的conn.fetchval

import asyncio
import asyncpg


async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    row = await conn.fetchval("select * from t1")
    
    # 这个conn.fetchval表示获取满足条件的第一条记录的第一个值
    # 我们的t1表中有两个字段,分别是pk和name,而第一个字段是pk
    # 因此上面那条语句你可以理解为:select pk from t1 limit 1
    # 就表示获取第一条记录的第一个值,个人觉得没有什么卵用
    print(row)  # 1

    # 关闭连接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

Record

我们说使用conn.fetchone查询得到的是一个Record,使用conn.fetchall 查询得到的是多个Record组成的列表,那么这个Rcord怎么用呢?

首先Record对象是对返回的记录进行了一个封装,如果是sqlalchemy的话,一条记录对应一个元组

import asyncio
import asyncpg


async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    row = await conn.fetchrow("select * from t1")

    print(type(row))  # <class 'asyncpg.Record'>
    print(row)  # <Record pk=1 name='古明地觉'>

    # 这个Record对象可以想象成一个字典
    # 我们说表中有两个字段,分别是pk和name
    print(row["pk"], row["name"])  # 1 古明地觉

    # 除此之外,还可以通过get获取
    print(row.get("pk"), row.get("name"))  # 1 古明地觉

    # 除此之外还可以调用keys、values、items,这个不用我说,都应该知道意味着什么
    # 只不过返回的是一个迭代器
    print(row.keys())  # <tuple_iterator object at 0x000001D6FFDAE610>
    print(row.values())  # <tuple_iterator object at 0x000001D6FFDAE610>
    print(row.items())  # <RecordItemsIterator object at 0x000001D6FFDF20C0>

    # 我们需要转成列表或者元组
    print(list(row.keys()))  # ['pk', 'name']
    print(list(row.values()))  # [1, '古明地觉']
    print(dict(row.items()))  # {'pk': 1, 'name': '古明地觉'}

    # 关闭连接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

可以看到相比sqlalchemy返回的简简单单的元组,asyncpg封装的Record对象具有更强的可定制性和可扩展性,因为在返回的记录中包含了表的字段名。

执行记录的其它几种姿势

我们使用conn.fetchrow或者conn.fetch执行SQL语句的时候,也是可以加上参数的。

import asyncio
import asyncpg


async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    # 和其它驱动类似,使用$n作为占位符
    row = await conn.fetchrow("select * from t1 where name = $1", "风见幽香")

    print(row)  # <Record pk=5 name='风见幽香'>
    # 关闭连接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

我们看到在获取记录的时候使用的都是fetchrow和fetch,但是除了这两个之外,还有conn.execute和conn.executemany

fetchrow和fetch是专门针对select语句的,还有execute和executemany针对的是select语句之外的其它语句

import asyncio
import asyncpg


async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")

    # 插入记录,返回一个字符串,记录最后一条SQL的执行状态
    row = await conn.execute("insert into t1 values(9, '八意永琳')")
    print(row)  # INSERT 0 1

    row = await conn.execute("update t1 set pk = 99 where name = $1", '琪露诺')
    print(row)  # UPDATE 1

    row = await conn.execute("delete from t1 where name in ($1, $2)", '琪露诺', '八意永琳')
    print(row)  # DELETE 2
    # 关闭连接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

如果是executemany的话,可以同时执行多条SQL语句

import asyncio
import asyncpg


async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")

    # executemany:第一条参数是一个模板,第二条命令是包含多个元组的列表
    # 执行多条记录的话,返回的结果为None
    
    row = await conn.executemany("insert into t1 values($1, $2)",
                                 [(8, "琪露诺"), (9, "八意永琳")]
                                 )
    print(row)  # None

    row = await conn.executemany("update t1 set pk = $1 where name = $2",
                                 [(88, "琪露诺"), (99, "八意永琳")]
                                 )
    print(row)  # None
    
    row = await conn.executemany("delete from t1 where pk = $1",
                                 [(88,), (99,)])  # 即使是一个值也要写成元组
    print(row)  # None
    
    # 关闭连接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

除了增删改之外,像什么建表语句、修改表字段、类型等等,都使用execute,执行多条的话使用executemany。至于fetch和fetchrow是专门针对select查询使用的

注意:如果是执行大量SQL的话,那么executemany要比execute快很多,但是executemany不具备事务功能

await conn.executemany("insert into t1 values($1, $2)",
                       [(8, "琪露诺"), (8, "八意永琳"), (9, "zun")]
                       )

我们表中的pk是主键,不可以重复,这里插入三条记录。显然第二条记录中的pk和第一条重复了,执行的时候会报错。但是第一条(8, "琪露诺")是进入到数据库了的,尽管第二条记录在插入的时候执行失败,当然第二条执行失败,那么第二条之后的也就无法执行了。这一点务必注意,如果我们想要实现事务的话该怎么做呢?

import asyncio
import asyncpg


async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")

    # 调用conn.transaction()会开启一个事务,当然这里建议使用异步上下文管理
    async with conn.transaction():
        await conn.executemany("insert into t1 values($1, $2)",
                               [(9, "八意永琳"), (9, "zun")]
                               )

    # 关闭连接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

两条记录主键重复,最终这两条记录都没进入到数据库中。

连接池

asyncpg还提供了连接池,需要的话往池子里面去取即可。

import asyncio
import asyncpg


async def main():
    pool = await asyncpg.create_pool(
        "postgres://postgres:zgghyys123@localhost:5432/postgres",
        min_size=10,  # 连接池初始化时默认的最小连接数, 默认为10
        max_size=10,  # 连接池的最大连接数, 默认为10
        max_queries=5000,  # 每个链接最大查询数量, 超过了就换新的连接, 默认5000
        # 最大不活跃时间, 默认300.0, 超过这个时间的连接就会被关闭,传入0的话则永不关闭
        max_inactive_connection_lifetime=300.0
    )
    # 如果还有其它什么特殊参数,也可以直接往里面传递,因为设置了**connect_kwargs
    # 专门用来设置一些数据库独有的某些属性

    # 从池子中取出一个连接
    async with pool.acquire() as conn:
        async with conn.transaction():
            row = await conn.fetchrow("select '100'::int + 200")
            # 我们看到没有指定名字,随意返回字段名叫做?column?
            # 不要慌,PostgreSQL中返回的也是这个结果
            print(row)  # <Record ?column?=300>

            # 解决办法就是起一个别名
            row = await conn.fetchrow("select '100'::int + 200 as result")
            print(row)  # <Record result=300>

    # 这里就不需要关闭了,因为我们的连接是从池子里面取出的
    # 上下文结束之后就放回到池子里面了
    # await conn.close()


if __name__ == '__main__':
    # 这里就不要使用asyncio.run(main())了
    # 而是创建一个事件循环,然后运行
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

通过以上的例子,我们看到asyncpg还是非常好用的,感觉比psycopg2要强大一些。另外值得一提的是,asyncpg不依赖psycopg2,asyncpg是自己独立实现了连接PostgreSQL的一套驱动,底层不需要依赖psycopg这个模块。

除了asyncpg,异步操作PostgreSQL还有一个模块叫做aiopg,但是功能个人感觉没有asyncpg强大,并且aiopg底层是需要依赖于psycopg2_binary的。不过有一个精简的小型orm叫peewee,这个orm也进行了异步化,叫做peewee_async,它在异步连接PostgreSQL的时候,使用的异步驱动是aiopg,表示asyncpg。

效率对比

我们之所以使用asyncpg,无非是为了效率,那么asyncpg和传统的psycopg2相比,在效率上究竟有多少差距呢?我们来测试一下。

SELECT count(*) FROM interface; -- 8459729

我数据库中有一张表叫做interface,是之前工作的时候从对方接口获取的,我们就用它来进行测试吧。

先使用同步版本的来访问,看看用多长时间。

import time
from sqlalchemy import create_engine, text

engine = create_engine("postgres://postgres:zgghyys123@localhost:5432/postgres")

with engine.begin() as conn:
    start = time.perf_counter()
    for _ in range(20):
        res = conn.execute(
            text('select * from interface where "ProwlerPersonID" = :arg'),
            {"arg": "c9fcbed8-fa47-481a-9d73-5fd1dd344f19"})
        print(f"满足条件的记录有:{len(res.fetchall())}条")
    end = time.perf_counter()
    print("总耗时:", end - start)  # 50.0419027
"""
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
满足条件的记录有:228186条
总耗时: 50.0419027
"""

我们看到记录有20多万条,所以就不打印记录了,因为执行的是同一条SQL,所以结果是一样的,然后我们看到花了50秒钟。

再来看看使用异步版本用多长时间。

import time
import asyncio
import asyncpg


async def run_sql(conn, query_list):
    result = []
    for query in query_list:
        result.append(await conn.fetch(*query))
    await conn.close()
    return [f"满足条件的记录有:{len(_)}条" for _ in result]


async def main():
    async with asyncpg.create_pool("postgres://postgres:zgghyys123@localhost:5432/postgres") as pool:
        query_list = [('select * from interface where "ProwlerPersonID" = $1',
                       "c9fcbed8-fa47-481a-9d73-5fd1dd344f19")
                      for _ in range(20)]

        # 我们要创建5个连接异步访问
        count = len(query_list) // 5
        query_list = [query_list[c * 4: (c + 1) * 4] for c in range(count + 1)]

        tasks = []
        for q in query_list:
            conn = await pool.acquire()
            tasks.append(run_sql(conn, q))
        results = await asyncio.gather(*tasks)
        return results

if __name__ == '__main__':
    start = time.perf_counter()
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main())
    end = time.perf_counter()
    for result in results:
        for _ in result:
            print(_)
    print("总耗时:", end - start)
    """
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    满足条件的记录有:228186条
    总耗时: 9.8730488
    """

我们看到花了将近十秒钟,正好是同步版本的五分之一,因为我们使用了连接池中的五个连接。

注意:如果是sqlalchemy,即便你给每一个SQL开一个连接,如果不使用多线程,只是同步访问的话,那么耗时还是50秒左右,因为它是同步访问的。

而使用异步的模式访问的话,每个连接都可以进行异步访问,那么我们创建的5个连接中,如果一个连接阻塞了,会切换到其它的连接去执行。所以耗时为五分之一,不过这里可能有人会觉得困惑,不知道我上面的代码做了些什么,这里来解释一下。

async def f1():
    for _ in [耗时协程1, 耗时协程2, 耗时协程3]:
        await _
        
def f2():
    for _ in [耗时函数1, 耗时函数2, 耗时函数3]:
        _()

我们上面的两段代码,如果函数和协程里面的代码做了相同的事情的话,那么这两个for循环耗时基本是一致的。首先函数无需解释,关键是协程为什么会这样。

我们await 协程,会等待这个协程完成,对于一个for循环来说,不可能说当前循环还没执行完毕就去执行下一层循环。所以无论是协程还是普通的函数,都要经历三轮循环,所以它们的耗时是基本一致的。

如果想解决这个问题,那么就要使用asyncio.gather(*协程列表),如果是await asyncio.gather(*[耗时协程1, 耗时协程2, 耗时协程3]),那么时间相比普通函数来说,就会有明显的缩短。因为此时这三个协程是同时发出的。

我们上面使用asyncio.gather的目的就在于此,但是问题是我们为什么要创建多个连接、为什么要把20个任务分成4份呢。

首先对于数据库来讲,一个连接只能同时执行一个SQL,如果你使用多线程、但是每个线程用的却是同一个连接的话,你会发现耗时和原来基本没区别。虽然线程阻塞会自动切换,但是你使用的连接已经被别人用了,所以请求同时只能发一个。如果是asyncpg的话,一个连接同时执行多了SQL,那么会直接报错。

import asyncio
import asyncpg


async def main():
    async with asyncpg.create_pool("postgres://postgres:zgghyys123@localhost:5432/postgres") as pool:
        async with pool.acquire() as conn:
            # 将20个请求同时发出,理论上可以,但是对于数据库来说是不同意的
            # 因为我们这里的conn都是同一个连接
            query_list = [conn.fetch('select * from interface where "ProwlerPersonID" = $1',
                                     "c9fcbed8-fa47-481a-9d73-5fd1dd344f19")
                          for _ in range(20)]
            await asyncio.gather(*query_list)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
"""
  File "asyncpgprotocolprotocol.pyx", line 301, in query
  File "asyncpgprotocolprotocol.pyx", line 664, in asyncpg.protocol.protocol.BaseProtocol._check_state
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
"""

阅读报错信息,提示我们:无法执行操作,另一个操作已经在执行了。

说明我们的20个请求都是同一个连接发出的,第一个连接在执行的时候,第二个连接理论上应该处于阻塞状态的,但是这里直接报错了。但不管怎么样,都不是我们想要的。

所以我们只能使用我们上面说的,for循环里面写await 协程的方式,但是这样和同步又没什么区别,不过我们创建5个连接,对5个连接使用asyncio.gather,也就是让这五个连接同时执行。尽管每个连接是同步的,但是这5个连接是异步的,因为它们彼此没有关联,是不同的连接,因此异步耗时为同步的五分之一。

总结

综上所述,asyncpg还是很强大的,合理使用确实能够增加效率。只是一定要注意数据之间的安全性。

原文地址:https://www.cnblogs.com/traditional/p/12290776.html