高性能异步爬虫

其实爬虫的本质就是client发请求批量获取server的响应数据,如果我们有多个url待爬取,只用一个线程且采用串行的方式执行,那只能等待爬取一个结束后才能继续下一个,效率会非常低。

需要强调的是:对于单线程下串行N个任务,并不完全等同于低效,如果这N个任务都是纯计算的任务,那么该线程对cpu的利用率仍然会很高,

之所以单线程下串行多个爬虫任务低效,是因为爬虫任务是明显的IO密集型(阻塞)程序。那么该如何提高爬取性能呢?

分析处理

同步调用:即提交一个任务后就在原地等待任务结束,等到拿到任务的结果后再继续下一行代码,效率低下。

解决同步调用方案之多线程/多进程(不建议使用)

好处:在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。

弊端:开启多进程或都线程的方式,我们是无法无限制地开启多进程或多线程的:在遇到要同时处理成百上千个的连接请求时,则无论多线程还是多进程都会严重占据系统资源,

降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。

解决同步调用方案之线程/进程池(适当使用)

好处:很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。可以很好的降低系统开销。

弊端:“线程池”和“连接池”技术也只是在一定程度上缓解了频繁创建和销毁线程带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,

“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。

案例:对比同步和使用线程池的执行效率

#同步执行
import time
def sayhello(str):
    print("Hello ",str)
    time.sleep(2)
name_list =['xiaozi','aa','bb','cc']
start_time = time.time()
for i in range(len(name_list)):
    sayhello(name_list[i])
print('%d second'% (time.time()-start_time))



执行结果:

Hello  xiaozi
Hello  aa
Hello  bb
Hello  cc
8 second
import time
from multiprocessing.dummy import Pool
def sayhello(str):
    print("Hello ",str)
    time.sleep(2)
start = time.time()
name_list =['xiaozi','aa','bb','cc']
#实例化线程池对象,开启了4个线程
pool = Pool(4)
pool.map(sayhello,name_list)  # 将列表的每个元素调用sayhello方法
pool.close()
pool.join()
end = time.time()
print(end-start)


执行结果:

Hello  xiaozi
Hello  aa
Hello  bb
Hello  cc
2.0805933475494385

效率可见一斑。

总结对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。

总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。

单线程+异步协程

无论哪种解决方案其实没有解决一个性能相关的问题:IO阻塞,无论是多进程还是多线程,在遇到IO阻塞时都会被操作系统强行剥夺走CPU的执行权限,程序的执行效率因此就降低了下来。

解决这一问题的关键在于,我们自己从应用程序级别检测IO阻塞然后切换到我们自己程序的其他任务执行,这样把我们程序的IO降到最低,我们的程序处于就绪态就会增多,以此来迷惑操作系统,操作系统便以为我们的程序是IO比较少的程序,从而会尽可能多的分配CPU给我们,这样也就达到了提升程序执行效率的目的。

在python3.4之后新增了asyncio模块,可以帮我们检测IO阻塞,然后实现异步IO。注意:asyncio只能发tcp级别的请求,不能发http协议。

什么是异步IO

所谓「异步 IO」,就是你发起一个 IO阻塞 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知。

实现异步IO的方式

单线程+异步协程实现异步IO操作

异步协程的用法

从 Python 3.4 开始,Python 中加入了协程的概念,但这个版本的协程还是以生成器对象为基础的,在 Python 3.5 则增加了 async/await,使得协程的实现更加方便。首先我们需要了解下面几个概念:



event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足某些条件的时候,函数就会被循环执行。程序是按照设定的顺序从头执行到尾,运行的次数也是完全按照设定。
当在编写异步程序时,必然其中有部分程序的运行耗时是比较久的,需要先让出当前程序的控制权,让其在背后运行,让另一部分的程序先运行起来。当背后运行的程序完成后,也需要及时通知主程序已经完成任务可以
进行下一步操作,但这个过程所需的时间是不确定的,需要主程序不断的监听状态,一旦收到了任务完成的消息,就开始进行下一步。loop就是这个持续不断的监视器。 coroutine:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到事件循环中,它会被事件循环调用。我们可以使用 async 关键字来定义一个方法
,这个方法在调用时不会立即被执行,而是返回一个协程对象。 task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。 future:代表将来执行或还没有执行的任务,实际上和 task 没有本质区别。 另外我们还需要了解 async
/await 关键字,它是从 Python 3.5 才出现的,专门用于定义协程。其中,async 定义一个协程,await 用来挂起阻塞方法的执行。

代码示例

import asyncio
async def request(url):
    print('正在请求url',url)
    print('请求成功',url)
    return url

c = request('www.baidu.com') #async修饰的函数 调用后返回一个协程对象

loop = asyncio.get_event_loop() # 创建一个事件循环对象
loop.run_until_complete(c)  # 将协程对象注册到loop中 然后启动loop


# task 和 future对象会记录当前协程的一些状态信息
# task的使用
loop =asyncio.get_event_loop()
task = loop.create_task(c)  # 基于loop创建一个task对象 并将协程c注册到task中
print(task)
loop.run_until_complete(task)  # 将task注册到事件循环中
print(task)


# future的使用
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(c)  # 基于asyncio模块创建future对象
print(task)
loop.run_until_complete(task)
print(task)


# 绑定回调,回调函数就是task执行完毕后执行的函数
def call_back_func(task):
    print(task.result())# result返回的就是task中协程对象对应函数的返回值

loop = asyncio.get_event_loop()
task = loop.create_task(c)
task.add_done_callback(call_back_func)  # 将回调函数绑定到任务对象中
loop.run_until_complete(task)

多任务协程

如果我们想执行多次请求应该怎么办呢?我们可以定义一个 task 列表,然后使用 asyncio 的 wait() 方法即可执行。

注:在异步协程中 如果出现了与同步模块相关的代码 则无法实现异步

import asyncio
import time
async def request(url):
    print('正在请求url',url)
    # 在异步协程中 如果出现了与同步模块相关的代码 则无法实现异步
    # time.sleep(2)
    await asyncio.sleep(2)  # asyncio 遇到io操作必须用await挂起
    print('请求成功',url)
start = time.time()
urls = [
    'www.baidu.com',
    'www.luffycitu.com',
    'www.goubanjia.com'
]
tasks = []
for url in urls:
    c = request(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)
loop = asyncio.get_event_loop()
# 需要将任务列表封装到wait中 固定写法
loop.run_until_complete(asyncio.wait(tasks))
end =time.time()
print(end-start)

aiohttp模块

接下来,咱们就可以尝试的将多任务异步协程应用到爬虫中进行试验,看是否能够实现多任务异步爬虫?为了表现出协程的优势,我们需要先创建一个合适的实验环境,最好的方法就是模拟一个需要等待一定时间才可以获取返回结果的网页,在本地模拟一个慢速服务器,这里我们选用 Flask。

服务器代码如下:

from flask import Flask
import time
app = Flask(__name__)
@app.route('/bobo')
def index_bobo():
    time.sleep(2)
    return 'Hello bobo'
@app.route('/jay')
def index_jay():
    time.sleep(2)
    return 'Hello jay'
@app.route('/tom')
def index_tom():
    time.sleep(2)
    return 'Hello tom'
if __name__ == '__main__':
    app.run(threaded=True)

接下来,将多任务异步协程操作应用在爬虫上:

import requests
import asyncio
import time
start = time.time()
urls = [
    'http://127.0.0.1:5000/bobo','http://127.0.0.1:5000/jay','http://127.0.0.1:5000/tom'
]
async def get_page(url):
    print('正在下载',url)
    response = requests.get(url=url)
    print('下载完毕:',response.text)
tasks = []
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('总耗时:',end-start)

运行结果如下:

正在下载 http://127.0.0.1:5000/bobo
下载完毕: Hello bobo
正在下载 http://127.0.0.1:5000/jay
下载完毕: Hello jay
正在下载 http://127.0.0.1:5000/tom
下载完毕: Hello tom
总耗时: 6.045619249343872

可以发现和正常的请求并没有什么两样,依然还是顺次执行的,耗时 6 秒,平均一个请求耗时 2 秒,说好的异步处理呢?

原因在于requests模块是非异步模块,要想实现真正的异步必须使用基于异步的网络请求模块所以这里就需要 aiohttp 派上用场了。

aiohttp使用

发起请求

async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.baidu.com') as resposne:
            print(await resposne.text())
loop = asyncio.get_event_loop()
tasks = [fetch(),]
loop.run_until_complete(asyncio.wait(tasks))

添加请求参数

params = {'key': 'value', 'page': 10}
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.baidu.com/s',params=params) as resposne:
            print(await resposne.url)
loop = asyncio.get_event_loop()
tasks = [fetch(),]
loop.run_until_complete(asyncio.wait(tasks))

UA伪装

url = 'http://httpbin.org/user-agent'
headers = {'User-Agent': 'test_user_agent'}
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get(url,headers=headers) as resposne:
            print(await resposne.text())
loop = asyncio.get_event_loop()
tasks = [fetch(),]
loop.run_until_complete(asyncio.wait(tasks))

自定义cookies

url = 'http://httpbin.org/cookies'
cookies = {'cookies_name': 'test_cookies'}
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get(url,cookies=cookies) as resposne:
            print(await resposne.text())
loop = asyncio.get_event_loop()
tasks = [fetch(),]
loop.run_until_complete(asyncio.wait(tasks))

post请求参数

url = 'http://httpbin.org'
payload = {'username': 'zhang', 'password': '123456'}
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=payload) as resposne:
            print(await resposne.text())
loop = asyncio.get_event_loop()
tasks = [fetch(), ]
loop.run_until_complete(asyncio.wait(tasks))

设置代理

url = "http://python.org"
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get(url, proxy="http://some.proxy.com") as resposne:
        print(resposne.status)
loop = asyncio.get_event_loop()
tasks = [fetch(), ]
loop.run_until_complete(asyncio.wait(tasks))

异步IO处理

#环境安装:pip install aiohttp
#使用该模块中的ClientSession
import requests
import asyncio
import time
import aiohttp
start = time.time()
urls = [
    'http://127.0.0.1:5000/bobo','http://127.0.0.1:5000/jay','http://127.0.0.1:5000/tom',
    'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',
    'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',
    'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',
]
async def get_page(url):
    async with aiohttp.ClientSession() as session:
        #get()、post():
        #headers,params/data,proxy='http://ip:port'
        async with await session.get(url) as response:
            #text()返回字符串形式的响应数据
            #read()返回的二进制形式的响应数据
            #json()返回的就是json对象
            #注意:获取响应数据操作之前一定要使用await进行手动挂起
            page_text = await response.text()
            print(page_text)
tasks = []
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('总耗时:',end-start)

在这里我们将请求库由 requests 改成了 aiohttp,通过 aiohttp 的 ClientSession 类的 get() 方法进行请求,结果如下:

Hello tom
Hello jay
Hello bobo
Hello bobo
Hello jay
Hello bobo
Hello tom
Hello jay
Hello jay
Hello tom
Hello tom
Hello bobo
总耗时: 2.037203073501587

异步操作的便捷之处在于,当遇到阻塞式操作时,任务被挂起,程序接着去执行其他的任务,而不是傻傻地等着,这样可以充分利用 CPU 时间,而不必把时间浪费在等待 IO 上。

可见,使用了异步协程之后,我们几乎可以在相同的时间内实现成百上千倍次的网络请求,把这个运用在爬虫中,速度提升可谓是非常可观了。

原文地址:https://www.cnblogs.com/sxy-blog/p/13215270.html