Python Crawler (6): Performance

In earlier posts we covered the two basic building blocks of a crawler:

  the requests module: for crafting requests and fetching data

  the bs4 module: for parsing and extracting data

When a real requirement comes along, it usually means crawling URLs in bulk. So how do we fetch a batch of URLs efficiently?

Following the natural progression, let's write some example code using the different approaches.

1. Serial

Serial is exactly what it sounds like: a for loop that handles the URLs one at a time:

import requests

def fetch_async(url):
    response = requests.get(url)
    return response


url_list = ['http://www.github.com', 'http://www.bing.com']

for url in url_list:
    fetch_async(url)

Timeline illustration (figure in the original post):

For each URL in the loop, fetch_async sends the request to the target server (the "request" phase in the figure), then the program blocks, waiting for the server's response (the blocking-wait phase); only once the data comes back does it continue processing. Put this way, it is clearly the worst-performing option.
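To make the cost concrete, here is a quick sketch that times the serial loop (the actual numbers depend entirely on the target sites and your network):

import time
import requests


def fetch_async(url):
    return requests.get(url)


url_list = ['http://www.github.com', 'http://www.bing.com']

start = time.time()
for url in url_list:
    fetch_async(url)  # each call blocks until its response has arrived
print('serial total: %.2fs' % (time.time() - start))  # roughly the sum of the individual request times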

2. Parallel: multithreading or multiprocessing

from concurrent.futures import ThreadPoolExecutor  # thread pool executor
import requests


def fetch_async(url):
    response = requests.get(url)
    return response


url_list = ['http://www.github.com', 'http://www.bing.com']
pool = ThreadPoolExecutor(5)  # set the pool size
for url in url_list:
    pool.submit(fetch_async, url)  # submit the function and its argument
pool.shutdown(wait=True)  # blocks until the slowest task finishes, then shuts the pool down

The process-pool version is almost identical:

from concurrent.futures import ProcessPoolExecutor  # process pool executor
import requests

def fetch_async(url):
    response = requests.get(url)
    return response


url_list = ['http://www.github.com', 'http://www.bing.com']
pool = ProcessPoolExecutor(5)  # set the maximum pool size
for url in url_list:
    pool.submit(fetch_async, url)  # submit the target function and its argument to the process pool
pool.shutdown(wait=True)  # blocks until the slowest task finishes, then shuts the pool down

The two approaches above are essentially the same: they use multiple threads or processes to handle requests in parallel, raising the number of concurrent requests per unit of time to improve throughput.
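If you also need the responses back in the calling thread, submit() returns a Future you can collect, for example with as_completed; a minimal sketch reusing the same fetch_async as above:

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests


def fetch_async(url):
    return requests.get(url)


url_list = ['http://www.github.com', 'http://www.bing.com']
with ThreadPoolExecutor(5) as pool:  # the with-block shuts the pool down for us
    futures = [pool.submit(fetch_async, url) for url in url_list]
    for future in as_completed(futures):  # yields each future as soon as it finishes
        print(future.result().url)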

Supplement: multithreading with a callback

from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_async(url):
    response = requests.get(url)
    return response


def callback(future):  # receives the Future holding the pooled function's result
    print(future.result())


url_list = ['http://www.github.com', 'http://www.bing.com']
pool = ThreadPoolExecutor(5)
for url in url_list:
    v = pool.submit(fetch_async, url)
    v.add_done_callback(callback)  # invoked once the thread finishes processing
pool.shutdown(wait=True)

The process pool uses the same callback mechanism:

v.add_done_callback(callback)
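For completeness, a minimal sketch of the full process-pool version with the same callback (the __main__ guard is needed because process pools spawn child processes; note the callback runs in the parent process):

from concurrent.futures import ProcessPoolExecutor
import requests


def fetch_async(url):
    response = requests.get(url)
    return response


def callback(future):
    print(future.result())


if __name__ == '__main__':
    url_list = ['http://www.github.com', 'http://www.bing.com']
    pool = ProcessPoolExecutor(5)
    for url in url_list:
        v = pool.submit(fetch_async, url)
        v.add_done_callback(callback)  # runs in the parent once the task completes
    pool.shutdown(wait=True)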

All of these methods improve request throughput. But as the earlier diagram shows, whether serial or parallel, every single request still hits a blocking wait for the response once it has been sent. Blocking on IO wastes threads and processes, which is why we now turn to asynchronous IO.

Async IO

If async IO is unfamiliar, see this earlier post: http://www.cnblogs.com/wuzdandz/p/7633386.html

Plenty of modules already wrap async IO for you; here are a few of them:

a. asyncio

import asyncio  # part of the standard library since Python 3.4


@asyncio.coroutine
def func1():
    print('before...func1......')
    # asynchronous call
    yield from asyncio.sleep(5)
    print('end...func1......')


tasks = [func1(), func1()]

loop = asyncio.get_event_loop()  # get the event loop
loop.run_until_complete(asyncio.gather(*tasks))  # run the coroutines
loop.close()

One drawback is that asyncio itself does not ship an HTTP client, so let's modify the code to speak HTTP over a raw connection:

import asyncio


@asyncio.coroutine
def fetch_async(host, url='/'):
    print(host, url)
    reader, writer = yield from asyncio.open_connection(host, 80)

    request_header_content = """GET %s HTTP/1.0
Host: %s

""" % (url, host,)  # 定制http请求字串
    request_header_content = bytes(request_header_content, encoding='utf-8')

    writer.write(request_header_content)
    yield from writer.drain()
    text = yield from reader.read()
    print(host, url, text)
    writer.close()

tasks = [
    fetch_async('www.cnblogs.com', '/wuzdandz/'),
    fetch_async('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

b、asyncio + aiohttp

import aiohttp
import asyncio


@asyncio.coroutine
def fetch_async(url):
    print(url)
    response = yield from aiohttp.request('GET', url)
    # data = yield from response.read()
    # print(url, data)
    print(url, response)
    response.close()


tasks = [fetch_async('http://www.google.com/'), fetch_async('http://www.chouti.com/')]

event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()
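Heads-up: the decorator-based style above only works on older Python/aiohttp versions (@asyncio.coroutine was removed in Python 3.11, and aiohttp 3.x expects a ClientSession). A rough modern equivalent, assuming aiohttp 3.x, looks like this:

import asyncio
import aiohttp


async def fetch_async(session, url):
    async with session.get(url) as response:  # the session manages connections for us
        data = await response.read()
        print(url, len(data))


async def main():
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(
            fetch_async(session, 'http://www.cnblogs.com/'),
            fetch_async(session, 'http://www.bing.com/'),
        )


asyncio.run(main())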

c、asyncio + requests

import asyncio
import requests


@asyncio.coroutine
def fetch_async(func, *args):
    loop = asyncio.get_event_loop()
    future = loop.run_in_executor(None, func, *args)
    response = yield from future
    print(response.url, response.content)


tasks = [
    fetch_async(requests.get, 'http://www.cnblogs.com/wuzdandz/'),
    fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

d、gevent + requests

from gevent import monkey

monkey.patch_all()  # patch blocking stdlib calls before importing requests

import gevent
import requests


def fetch_async(method, url, req_kwargs):
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)
    print(response.url, response.content)

# ##### send requests #####
gevent.joinall([
    gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),
])

# ##### send requests (a greenlet pool caps the number of concurrent coroutines) #####
# from gevent.pool import Pool
# pool = Pool(None)  # None means no size limit; pass an int to cap concurrency
# gevent.joinall([
#     pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
#     pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
#     pool.spawn(fetch_async, method='get', url='https://www.github.com/', req_kwargs={}),
# ])

e、grequests

import grequests


request_list = [
    grequests.get('http://httpbin.org/delay/1', timeout=0.001),
    grequests.get('http://fakedomain/'),
    grequests.get('http://httpbin.org/status/500')
]


# ##### run the requests and collect the list of responses #####
# response_list = grequests.map(request_list)
# print(response_list)


# ##### run the requests and collect responses, with an exception handler #####
# def exception_handler(request, exception):
#     print(request,exception)
#     print("Request failed")

# response_list = grequests.map(request_list, exception_handler=exception_handler)
# print(response_list)

f. Twisted example

from twisted.web.client import getPage, defer
from twisted.internet import reactor


def all_done(arg):
    reactor.stop()


def callback(contents):
    print(contents)


deferred_list = []

url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
    deferred = getPage(bytes(url, encoding='utf8'))
    deferred.addCallback(callback)
    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)

reactor.run()
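Note that getPage has long been deprecated in newer Twisted releases; if it is unavailable, a rough equivalent can be built with the third-party treq package (a sketch, assuming treq is installed):

import treq
from twisted.internet import reactor, defer


def all_done(arg):
    reactor.stop()


def callback(contents):
    print(contents)


deferred_list = []
for url in ['http://www.bing.com', 'http://www.baidu.com']:
    d = treq.get(url)            # Deferred that fires with a response object
    d.addCallback(treq.content)  # read the full body (another Deferred)
    d.addCallback(callback)
    deferred_list.append(d)

defer.DeferredList(deferred_list).addBoth(all_done)
reactor.run()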

g. Tornado

from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop


def handle_response(response):
    """
    处理返回值内容(需要维护计数器,来停止IO循环),调用 ioloop.IOLoop.current().stop()
    :param response: 
    :return: 
    """
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)


def func():
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]
    for url in url_list:
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)


ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()
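The docstring above mentions a counter; here is a minimal sketch of what it could look like, reusing the callback-style fetch from the example (so it assumes a Tornado version that still accepts a callback, i.e. before Tornado 6; COUNT is just an illustrative global):

from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado import ioloop

COUNT = 0


def handle_response(response):
    global COUNT
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)
    COUNT -= 1
    if COUNT == 0:  # every request has come back, stop the loop
        ioloop.IOLoop.current().stop()


def func():
    global COUNT
    url_list = ['http://www.baidu.com', 'http://www.bing.com']
    COUNT = len(url_list)
    for url in url_list:
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)


ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()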

h. More Twisted

from twisted.internet import reactor
from twisted.web.client import getPage
import urllib.parse


def one_done(arg):
    print(arg)
    reactor.stop()

post_data = urllib.parse.urlencode({'check_data': 'adf'})
post_data = bytes(post_data, encoding='utf8')
headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
response = getPage(bytes('http://dig.chouti.com/login', encoding='utf8'),
                   method=bytes('POST', encoding='utf8'),
                   postdata=post_data,
                   cookies={},
                   headers=headers)
response.addBoth(one_done)

reactor.run()

All of the above are async IO request options built into Python or provided by third-party modules. At its core, async IO is nothing more than non-blocking sockets plus IO multiplexing. I previously wrote an asynchronous FTP client based on select; now let's build our own asynchronous crawler module.

X. A custom implementation

# Concurrency: a hand-rolled async module -- single-threaded, asynchronous, callback-based
import socket
import select


class HttpRequest(object):
    def __init__(self, sk, item):
        self.sock = sk
        self.item = item

    def fileno(self):
        # select calls this to get the wrapped socket's file descriptor
        return self.sock.fileno()


class AsyncHttp(object):
    def __init__(self):
        self.client_list = []
        self.connections_list = []

    def start(self, item):
        """
        Start a non-blocking connection for one target.
        :param item: dict with 'host', 'url' and 'fn' keys
        :return:
        """
        try:
            sk = socket.socket()
            sk.setblocking(False)  # non-blocking socket
            sk.connect((item['host'], 80))  # raises BlockingIOError immediately; the connect continues in the background
        except BlockingIOError as e:
            pass
        self.client_list.append(HttpRequest(sk, item))
        self.connections_list.append(HttpRequest(sk, item))

    def run(self):
        """
        Poll the sockets in self.client_list and self.connections_list until every
        one has connected, sent its request and received its response.
        :return:
        """
        while True:
            # Before wrapping: select only accepted real socket objects [socket, socket, ...] and called their fileno() itself.
            # Now: it can take any object that implements a fileno() method.
            # r: readable, the socket has response data from the server to read;
            # w: writable, the socket has successfully connected to the server.
            r, w, e = select.select(self.client_list, self.connections_list, [], 0.05)
            for conn in w:  # connection established
                host = conn.item['host']
                url = conn.item['url']
                # hand-craft the request, e.g. 'GET / HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n'
                temp = 'GET %s HTTP/1.0\r\nHost: %s\r\n\r\n' % (url, host)
                conn.sock.sendall(temp.encode('utf-8'))  # send the forged request (headers + body) to the server
                self.connections_list.remove(conn)  # once the request is sent, stop watching this socket for writability
            for conn in r:  # a response is ready to read
                data = conn.sock.recv(8096)
                func = conn.item['fn']
                func(data)  # hand the raw response to the user's callback
                conn.sock.close()
                self.client_list.remove(conn)
            if not self.client_list:
                break


def done(content):
    print(content)


reactor = AsyncHttp()

url_list = [
    {'host': 'www.baidu.com', 'url': '/', 'fn': done},
    {'host': 'www.bing.com', 'url': '/', 'fn': done},
    {'host': 'www.cnblogs.com', 'url': '/wuzdandz/', 'fn': done},
]
for item in url_list:
    reactor.start(item)

reactor.run()

A note on fileno

The select module works with file descriptors. Most IO objects expose a fileno() method; network sockets are file descriptors, and so are terminals (input/output devices). Whatever object you hand over, what the operating system actually monitors is the file descriptor, not the object itself.
So select simply calls fileno() on whatever objects it is given. Before we wrapped anything, we passed socket objects directly and select used their own fileno() method.
Once we wrap the socket in our own class, we have no idea what the descriptor is; that is supplied by the operating system. Our fileno() therefore just forwards the call and returns self.sock.fileno(), the underlying socket's descriptor, acting as a simple middleman.
Original article: https://www.cnblogs.com/wuzdandz/p/9377409.html