10-异步爬虫(线程池/asyncio协程)实战案例

异步爬虫:

  • 基于线程池
  • 基于单线程+多任务的异步爬虫

线程池

  • from multiprocessing.dummy import Pool

  • map(callback,alist)

    • 可以使用callback对alist中的每一个元素进行指定形式的异步操作
  • 为了体现效果,我们自己搭建一个web服务

    • Flask的基本使用
    • 环境安装 pip install flask
    • 创建一个py源文件

在这里插入图片描述
在这里插入图片描述

test.html
在这里插入图片描述
在这里插入图片描述
自定义一个简单网页

# -*- coding: utf-8 -*-
from flask import Flask
from flask import render_template
import time

#实例化一个app
app = Flask(__name__)

# 创建视图函数&路由地址
@app.route("/gpc") #路由地址
def index_1(): # 视图函数
    time.sleep(2)#为了表示堵塞效果我们定义一个相应时间
    return render_template("test.html")

@app.route("/una")
def index_2():
    time.sleep(2)
    return render_template("test.html")

@app.route("/python")
def index_3():
    time.sleep(2)
    return render_template("test.html")

if __name__=="__main__":
    #debug=True 表示开启调试模式:服务器端代码被修改后按下保存键会自动重启服务
    app.run(debug=True)

在这里插入图片描述
同步爬虫

import requests
import time

def get_request(url):
    page_text = requests.get(url=url).text
    return len(page_text)

#同步代码总耗时: 6.035839080810547秒
if __name__ == "__main__":
    start = time.time()#开始时间
    urls = [
        "http://127.0.0.1:5000/gpc",
        "http://127.0.0.1:5000/una",
        "http://127.0.0.1:5000/python"]
    for url in urls:
        res = get_request(url)
        print(res)
    print("总耗时:",time.time()-start)

在这里插入图片描述
异步爬虫

import requests
from multiprocessing.dummy import Pool #线程池
import time

def get_request(url):
    page_text = requests.get(url=url).text
    return len(page_text)

urls = [
    "http://127.0.0.1:5000/gpc",
    "http://127.0.0.1:5000/una",
    "http://127.0.0.1:5000/python"]
# 异步代码
if __name__ == "__main__":
    start = time.time()  # 开始时间
    pool = Pool(3)#3表示开启的现场数量
    # get_request作为回调函数,需要基于异步的形式对urls列表中的每一个列表元素进行操作
    # 保证回调函数必须要有一个参数和返回值
    result_list = pool.map(get_request,urls)
    print(result_list)
    print("总耗时:", time.time() - start)
    # 
    # [45532, 45532, 45532]
    # 总耗时: 2.023587703704834

在这里插入图片描述
异步爬虫数据解析
在这里插入图片描述

import requests
from multiprocessing.dummy import Pool#线程池
from lxml import etree
import time

def get_request(url):
    page_text = requests.get(url=url).text
    tree = etree.HTML(page_text)
    analysis = tree.xpath("//div[@id='contson09d31b73b44d']/p[5]//text()")#定义xpath数据解析
    return analysis

urls = [
    "http://127.0.0.1:5000/gpc",
    "http://127.0.0.1:5000/una",
    "http://127.0.0.1:5000/python"]
# 异步代码
if __name__ == "__main__":
    start = time.time()  # 开始时间
    pool = Pool(3)#3表示开启的现场数量
    # get_request作为回调函数,需要基于异步的形式对urls列表中的每一个列表元素进行操作
    # 保证回调函数必须要有一个参数和返回值
    result_list = pool.map(get_request,urls)
    print(result_list)
    print("总耗时:", time.time() - start)
    #
    # [['临别殷勤重寄词,词中有誓两心知。', '七月七日长生殿,夜半无人私语时。', '在天愿作比翼鸟,在地愿为连理枝。', '天长地久有时尽,此恨绵绵无绝期。'],
    #  ['临别殷勤重寄词,词中有誓两心知。', '七月七日长生殿,夜半无人私语时。', '在天愿作比翼鸟,在地愿为连理枝。', '天长地久有时尽,此恨绵绵无绝期。'],
    #  ['临别殷勤重寄词,词中有誓两心知。', '七月七日长生殿,夜半无人私语时。', '在天愿作比翼鸟,在地愿为连理枝。', '天长地久有时尽,此恨绵绵无绝期。']]
    # 总耗时: 2.0326013565063477


在这里插入图片描述

单线程+多任务的异步爬虫

  • 单线程+多任务的异步爬虫
  • pip install asyncio
    • 特殊的函数
      • 如果一个函数的定义被async修饰后,则该函数就变成了一个特殊函数
      • 该特殊函数调用后,函数内部的实现语句不会被【立即】执行
    • 协程对象
      • 对象:通过特殊函数的调用返回一个协程对象
      • 协程对象 == 特殊函数 == 一组指定的操作
      • 协程 == 一组指定的操作
    • 任务对象
      • 任务对象就是一个高级的协程对象。(任务对象就是对协程的进一步封装)
      • 任务对象 == 协程对象 == 特殊函数 == 一组指定的操作
      • 任务对象 == 一组指定的操作
      • 如何创建一个任务对象
        • asyncio.ensure_future(协程对象)
      • 任务对象的高级之处
        • 可以给任务对象绑定一个回调:
          • 给任务对象绑定回调:
            • task.add_done_callback(task_callback())
            • 回调函数的调用时机:
              • 在我们任务执行结束后,才可以调用回调函数
            • 回调函数的参数只可以有一个:表示的就是该回调函数的调用者(任务对象)
            • 使用回调函数的参数调用result()返回的就是任务对象表示的特殊函数return的结果
    • 事件循环对象
      • 对象
      • 作用
        • 可以将多个任务对象注册/装载到事件循环对象中
        • 如果开启了事件循环后,则其内部注册/装载的任务对象表示的指定操作就会被基于异步被执行
      • 事件循环对象创建
        • loop = asyncio.get_event_loop()
      • 注册且启动
        • loop.run_until_complete(task)
    • wait 方法的作用
      • 将任务列表中的任务对象赋予可被挂起的权限,只有任务对象被赋予了可被挂起的权限后,该任务对象才可以被挂起
      • 挂起:表示将当前的任务对象交出cpu的使用权
    • await 关键字
      • 在特殊函数内部,凡是阻塞操作前必须使用await进行修饰,await就可以保证阻塞操作在异步执行的过程中不会被跳过!
  • 注意事项【重要】
    • 在特殊函数内部不可以出现不支持异步模块对应的代码,否则会中断整个异步效果

在这里插入图片描述

单任务操作:

import asyncio
import time


async def get_request(url):
    print("正在请求的url:",url)
    time.sleep(2)
    print("请求结束:",url)
if __name__=="__main__":
    c = get_request("www.1.com") # c就是一个协程对象
    task =asyncio.ensure_future(c)#task就是一个任务对象,它是对协程对象的进一步封装
    loop = asyncio.get_event_loop() #loop就是一个事件循环对象
    loop.run_until_complete(task) # 将任务对象注册到事件循环中且开启事件循环

在这里插入图片描述
绑定回调函数

import asyncio
import time


async def get_request(url):
    print("正在请求的url:",url)
    time.sleep(2)
    print("请求结束:",url)
    return "我是特殊函数的返回值"

#回调函数的封装
def task_callback(t):#参数t:就是该回调函数的调用者(任务对象)
    print("i am task_callback(),参数t:",t)
    print("t.result()返回的是:",t.result())

if __name__=="__main__":
    c = get_request("www.1.com") # c就是一个协程对象

    task =asyncio.ensure_future(c)#task就是一个任务对象,它是对协程对象的进一步封装
    task.add_done_callback(task_callback) #给task绑定一个回调函数

    loop = asyncio.get_event_loop() #loop就是一个事件循环对象

    loop.run_until_complete(task) # 将任务对象注册到事件循环中且开启事件循环

在这里插入图片描述

多任务操作

import asyncio
import time



async def get_request(url):
    print("正在请求的url:",url)
    time.sleep(2)#(堵塞操作2秒)出现了不支持异步模块的对象所以耗时6秒未成功进行异步操作
    print("请求结束:",url)

urls = [
    "http://127.0.0.1:5000/gpc",
    "http://127.0.0.1:5000/una",
    "http://127.0.0.1:5000/python"
]

if __name__=="__main__":
    start = time.time()
    tasks = [] # 多任务列表
    # 1.创建协程对象
    for url in urls:
        c = get_request(url)
        # 2.创建任务对象
        task = asyncio.ensure_future(c)
        tasks.append(task)
    # 3.创建事件循环对象
    loop = asyncio.get_event_loop()
    # 4.将任务对象注册到事件循环中且开启事件循环
    # loop.run_until_complete(tasks)#循环中不能放列表
    loop.run_until_complete(asyncio.wait(tasks))# 必须使用wait方法对tasks封装
    print("总耗时:",time.time()-start)

在这里插入图片描述
多任务异步

import asyncio
import time



async def get_request(url):
    print("正在请求的url:",url)
    await asyncio.sleep(2)#(堵塞操作2秒)支持异步模块的代码
    print("请求结束:",url)

urls = [
    "http://127.0.0.1:5000/gpc",
    "http://127.0.0.1:5000/una",
    "http://127.0.0.1:5000/python"
]

if __name__=="__main__":
    start = time.time()
    tasks = [] # 多任务列表
    # 1.创建协程对象
    for url in urls:
        c = get_request(url)
        # 2.创建任务对象
        task = asyncio.ensure_future(c)
        tasks.append(task)
    # 3.创建事件循环对象
    loop = asyncio.get_event_loop()
    # 4.将任务对象注册到事件循环中且开启事件循环
    # loop.run_until_complete(tasks)#循环中不能放列表
    loop.run_until_complete(asyncio.wait(tasks))# 必须使用wait方法对tasks封装
    print("总耗时:",time.time()-start)


在这里插入图片描述

协程爬虫实例

requests是一个不支持异步的模块
aiohttp

  • aiohttp是一个支持异步的网络请求模块
  • pip install aiohttp
  • 使用代码:
  • 1,首先写出一个大致架构
async def get_request(url):
    with aiohttp.ClientSession() as sess:#实例化一个请求对象
        # get/post(url,headers,params/data,proxy="http://ip:port")
        with sess.get(url=url) as response:# 使用get发起请求,返回一个相应对象
            page_text = response.text#获取字符串形式的相应数据
            return page_text
  • 2,补充细节
    • 在阻塞操作前加上await关键字
    • 在每一个with前加上async关键字
async def get_request(url):
    async with aiohttp.ClientSession() as sess:#实例化一个请求对象
        # get/post(url,headers,params/data,proxy="http://ip:port")
        async with await sess.get(url=url) as response:# 使用get发起请求,返回一个相应对象
            page_text = await response.text#text()获取字符串形式的相应数据  read()获取byte类型的响应数据
            return page_text

完整爬虫异步网站如下:

import asyncio
import aiohttp
import time

urls = [
    "http://127.0.0.1:5000/gpc",
    "http://127.0.0.1:5000/una",
    "http://127.0.0.1:5000/python"
]

async def get_request(url):
    async with aiohttp.ClientSession() as sess:#实例化一个请求对象
        # get/post(url,headers,params/data,proxy="http://ip:port")
        async with await sess.get(url=url) as response:# 使用get发起请求,返回一个相应对象
            page_text = await response.text()#text()获取字符串形式的相应数据  read()获取byte类型的响应数据
            return page_text

if __name__=="__main__":
    start = time.time()
    tasks = [] # 多任务列表
    # 1.创建协程对象
    for url in urls:
        c = get_request(url)
        # 2.创建任务对象
        task = asyncio.ensure_future(c)
        tasks.append(task)
    # 3.创建事件循环对象
    loop = asyncio.get_event_loop()
    # 4.将任务对象注册到事件循环中且开启事件循环
    loop.run_until_complete(asyncio.wait(tasks))# 必须使用wait方法对tasks封装
    print("总耗时:",time.time()-start)


多任务爬虫的数据解析

  • 一定要是用任务对象的回调函数实现数据解析
  • 多任务的架构中数据的爬取是封装在特殊函数中的,我们一定要保证数据请求结束后,在实现数据解析
  • 使用多任务的异步协程爬取数据实现套路:
    • 可以先试用request模块将待请求的数据对应的url封装到urls列表中【同步】
    • 可以使用aiohttp模块将列表中的url进行异步的请求和数据解析【异步】
      代码展示
import asyncio
import aiohttp
import time
from lxml import etree

urls = [
    "http://127.0.0.1:5000/gpc",
    "http://127.0.0.1:5000/una",
    "http://127.0.0.1:5000/python"
]

async def get_request(url):
    async with aiohttp.ClientSession() as sess:#实例化一个请求对象
        # get/post(url,headers,params/data,proxy="http://ip:port")
        async with await sess.get(url=url) as response:# 使用get发起请求,返回一个相应对象
            page_text = await response.text()#text()获取字符串形式的相应数据  read()获取byte类型的响应数据
            return page_text
#解析函数的封装
def parse(t):
    #获取请求到页面源码数据
    page_text = t.result()
    tree = etree.HTML(page_text)
    parse_text = tree.xpath("//div[@id='contson09d31b73b44d']/p[5]//text()")
    print(parse_text)
if __name__=="__main__":
    start = time.time()
    tasks = [] # 多任务列表
    # 1.创建协程对象
    for url in urls:
        c = get_request(url)
        # 2.创建任务对象
        task = asyncio.ensure_future(c)
        task.add_done_callback(parse)
        tasks.append(task)
    # 3.创建事件循环对象
    loop = asyncio.get_event_loop()
    # 4.将任务对象注册到事件循环中且开启事件循环
    loop.run_until_complete(asyncio.wait(tasks))# 必须使用wait方法对tasks封装
    print("总耗时:",time.time()-start)





在这里插入图片描述

原文地址:https://www.cnblogs.com/gemoumou/p/13635335.html