Python中syncio和aiohttp

CPython 解释器本身就不是线程安全的,因此有全局解释器锁(GIL),一次只允许使用一个线程执行 Python 字节码。因此,一个 Python 进程通常不能同时使用多个 CPU 核心。然而,标准库中所有执行阻塞型 I/O 操作的函数,在等待操作系统返回结果时都会释放GIL。这意味着在 Python 语言这个层次上可以使用多线程,而 I/O 密集型 Python 程序能从中受益:一个 Python 线程等待网络响应时,阻塞型 I/O 函数会释放 GIL,再运行一个线程。asyncio这个包使用事件循环驱动的协程实现并发。 asyncio 大量使用 yield from 表达式,因此与Python 旧版不兼容。
asyncio 包使用的“协程”是较严格的定义。适合asyncio API 的协程在定义体中必须使用 yield from,而不能使用 yield。此外,适合 asyncio 的协程要由调用方驱动,并由调用方通过 yield from 调用;

先看2个例子:

import threading
import asyncio
 
@asyncio.coroutine
def hello():
    print('Start Hello', threading.currentThread())
    yield from asyncio.sleep(5)
    print('End Hello', threading.currentThread())
 
@asyncio.coroutine
def world():
    print('Start World', threading.currentThread())
    yield from asyncio.sleep(3)
    print('End World', threading.currentThread())
 
# 获取EventLoop:
loop = asyncio.get_event_loop()
tasks = [hello(), world()]
# 执行coroutine
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

@asyncio.coroutine把生成器函数标记为协程类型。
asyncio.sleep(3) 创建一个3秒后完成的协程。
loop.run_until_complete(future),运行直到future完成;如果参数是 coroutine object,则需要使用 ensure_future()函数包装。
loop.close() 关闭事件循环

import asyncio
 
@asyncio.coroutine
def worker(text):
    """ 协程运行的函数 :param text::return: """
    i = 0
    while True:
        print(text, i)
        try:
            yield from asyncio.sleep(.1)
        except asyncio.CancelledError:
            break
        i += 1
 
@asyncio.coroutine
def client(text, io_used):
    work_fu = asyncio.ensure_future(worker(text))
    # 假装等待I/O一段时间
    yield from asyncio.sleep(io_used)
    # 结束运行协程
    work_fu.cancel()
    return 'done'
 
loop = asyncio.get_event_loop()
tasks = [client('xiaozhe', 3), client('zzz', 5)]
result = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('Answer:', result)

asyncio.ensure_future(coro_or_future, *, loop=None):计划安排一个 coroutine object的执行,返回一个 asyncio.Task object。
worker_fu.cancel(): 取消一个协程的执行,抛出CancelledError异常。
asyncio.wait():协程的参数是一个由期物或协程构成的可迭代对象; wait 会分别把各个协程包装进一个 Task 对象。

async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换。
1. 把@asyncio.coroutine替换为async
2. 把yield from替换为await

@asyncio.coroutine
def hello():
  print("Hello world!")
  r = yield from asyncio.sleep(1)
  print("Hello again!")

等价于

async def hello():
  print("Hello world!")
  r = await asyncio.sleep(1)
  print("Hello again!")  

asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。

asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架

客户端:

import aiohttp
import asyncio
import async_timeout
 
 
async def fetch(session, url):
    async with async_timeout.timeout(10):
        async with session.get(url) as response:
            return await response.text()
 
 
async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://python.org')
        print(html)
 
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

服务端:

from aiohttp import web
 
async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)
 
app = web.Application()
app.router.add_get('/', handle)
app.router.add_get('/{name}', handle)
 
web.run_app(app)

运行结果:

 

爬取当当畅销书的图书信息的代码如下:

'''异步方式爬取当当畅销书的图书信息'''
import  os
import time
import aiohttp
import asyncio
import pandas as pd
from bs4 import BeautifulSoup

# table表格用于储存书本信息
table = []


# 获取网页(文本信息)
async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text(encoding='gb18030')


# 解析网页
async def parser(html):
    # 利用BeautifulSoup将获取到的文本解析成HTML
    soup = BeautifulSoup(html, 'lxml')
    # 获取网页中的畅销书信息
    book_list = soup.find('ul', class_='bang_list clearfix bang_list_mode')('li')
    for book in book_list:
        info = book.find_all('div')
        # 获取每本畅销书的排名,名称,评论数,作者,出版社
        rank = info[0].text[0:-1]
        name = info[2].text
        comments = info[3].text.split('')[0]
        author = info[4].text
        date_and_publisher= info[5].text.split()
        publisher = date_and_publisher[1] if len(date_and_publisher)>=2 else ''
        # 将每本畅销书的上述信息加入到table中
        table.append([rank, name, comments, author, publisher])


# 处理网页
async def download(url):
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, url)
        await parser(html)

# 全部网页
urls = ['http://bang.dangdang.com/books/bestsellers/01.00.00.00.00.00-recent7-0-0-1-%d'%i for i in range(1,26)]
# 统计该爬虫的消耗时间
print('#' * 50)
t1 = time.time() # 开始时间

# 利用asyncio模块进行异步IO处理
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(download(url)) for url in urls]
tasks = asyncio.gather(*tasks)
loop.run_until_complete(tasks)

# 将table转化为pandas中的DataFrame并保存为CSV格式的文件
df = pd.DataFrame(table, columns=['rank', 'name', 'comments', 'author', 'publisher'])
df.to_csv('dangdang.csv', index=False)

t2 = time.time() # 结束时间
print('使用aiohttp,总共耗时:%s' % (t2 - t1))
print('#' * 50)
原文地址:https://www.cnblogs.com/majiang/p/9877504.html