High-Performance Techniques for Web Crawlers

  Problem: given 10 URLs, download all 10 pages as quickly as possible.

The traditional approach

# Traditional serial approach: fetch the URLs one after another
import requests
import time

urls = ['https://github.com/' for _ in range(10)]
start = time.time()
for url in urls:
    response = requests.get(url)
    # print(response)

spend_time = time.time() - start
print(spend_time)
# 12.493084907531738

I. Concurrency with multiprocessing and multithreading

import time
from concurrent.futures import ProcessPoolExecutor

import requests

start = time.time()


def task(url):
    # Runs in a worker process: the blocking HTTP request happens there.
    response = requests.get(url)
    return response


def done(future, *args, **kwargs):
    # Callback executed in the main process once a submitted future finishes.
    response = future.result()
    print(response.url)


if __name__ == '__main__':
    url_list = ['https://www.douban.com/' for _ in range(100)]
    with ProcessPoolExecutor(max_workers=10) as pool:
        for url in url_list:
            v = pool.submit(task, url)
            v.add_done_callback(done)
    print(time.time() - start)
    # 11.862671136856079

  Elapsed time: 11.862671136856079 seconds

######### Approach 2: thread pool #########
import os
import time

import requests
from concurrent.futures import ThreadPoolExecutor


def task(url):
    # Runs in a worker thread; the blocking request releases the GIL while waiting on IO.
    response = requests.get(url)
    return response


def done(future, *args, **kwargs):
    # Callback executed when a submitted future finishes.
    response = future.result()
    print(response.url)


if __name__ == '__main__':
    start = time.time()
    with ThreadPoolExecutor(max_workers=os.cpu_count()) as pool:
        url_list = ['https://www.douban.com/' for _ in range(100)]
        for url in url_list:
            v = pool.submit(task, url)
            v.add_done_callback(done)
    print(time.time() - start)
    # 8.904985904693604

  Elapsed time: 8.904985904693604 seconds

  Because of the GIL, the recommendation is: use ThreadPoolExecutor for IO-bound tasks and ProcessPoolExecutor for CPU-bound tasks. Callbacks are not the only way to collect results, as shown in the sketch below.
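  A minimal sketch (assuming the same task function and the same douban URL list as above) that gathers results with concurrent.futures.as_completed instead of add_done_callback:

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests


def task(url):
    return requests.get(url)


if __name__ == '__main__':
    start = time.time()
    url_list = ['https://www.douban.com/' for _ in range(100)]
    with ThreadPoolExecutor(max_workers=10) as pool:
        futures = {pool.submit(task, url): url for url in url_list}
        # as_completed yields each future as soon as it finishes,
        # so results are processed in completion order rather than submission order
        for future in as_completed(futures):
            print(futures[future], future.result().status_code)
    print(time.time() - start)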

II. Asynchronous IO based on an event loop

 1. asyncio + aiohttp

import time
import asyncio

import aiohttp
import async_timeout


async def start_request(session, sem, url):
    async with sem:  # the shared semaphore caps the number of in-flight requests
        print(f'make request to {url}')
        async with async_timeout.timeout(60):
            async with session.get(url, ssl=False) as response:
                if response.status == 200:
                    print(response.status)


async def run(urls):
    sem = asyncio.Semaphore(10)  # create the semaphore once and share it across all requests
    conn = aiohttp.TCPConnector(ssl=False,
                                limit=60,  # keep the connection pool modest on Windows (< 500)
                                use_dns_cache=True)
    async with aiohttp.ClientSession(connector=conn) as session:
        datas = await asyncio.gather(*[start_request(session, sem, url) for url in urls], return_exceptions=True)
        for ind, url in enumerate(urls):
            if isinstance(datas[ind], Exception):
                print(f"{ind}, {url}: download failed, please retry")


if __name__ == '__main__':
    start = time.time()
    urls = ['http://www.baidu.com/' for _ in range(100)]  # a list (not a generator) so it can be iterated twice
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(urls))
    print(time.time() - start)
    # 1.4860568046569824
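  On Python 3.7+, the explicit event-loop management at the bottom can be replaced by asyncio.run; a minimal sketch of the same entry point, assuming the run coroutine defined above:

if __name__ == '__main__':
    start = time.time()
    urls = ['http://www.baidu.com/' for _ in range(100)]
    asyncio.run(run(urls))  # creates, runs and closes the event loop in one call
    print(time.time() - start)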

 2. Twisted 

import time

from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.client import getPage  # deprecated in modern Twisted; kept for the original example

start = time.time()


def one_done(content, arg):
    response = content.decode('utf-8')
    # print(response)
    print(arg)


def all_done(arg):
    reactor.stop()
    print(time.time() - start)


@defer.inlineCallbacks
def task(url):
    res = getPage(bytes(url, encoding='utf8'))  # send the HTTP request; returns a Deferred
    res.addCallback(one_done, url)
    yield res


url_list = ('http://www.cnblogs.com' for _ in range(100))

defer_list = []  # a list of Deferreds; each request is already in flight
for url in url_list:
    v = task(url)
    defer_list.append(v)

d = defer.DeferredList(defer_list)
d.addBoth(all_done)

reactor.run()  # event loop: blocks until reactor.stop() is called
# 5.039534091949463
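  Since getPage is deprecated, here is a hedged sketch of the same fan-out using treq (a requests-style wrapper over twisted.web.client.Agent, assuming it is installed) together with task.react, which stops the reactor once the returned Deferred fires:

import time

import treq
from twisted.internet import defer, task


@defer.inlineCallbacks
def fetch(url):
    response = yield treq.get(url)       # Deferred that fires with the response
    body = yield treq.content(response)  # Deferred that fires with the full body
    print(url, len(body))


def main(reactor):
    start = time.time()
    url_list = ['http://www.cnblogs.com' for _ in range(100)]
    d = defer.DeferredList([fetch(url) for url in url_list])
    d.addBoth(lambda _: print(time.time() - start))
    return d


if __name__ == '__main__':
    task.react(main)  # runs the reactor until the Deferred returned by main fires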

 3. tornado

import time

from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop

COUNT = 0
start = time.time()


def handle_response(response):
    global COUNT
    COUNT -= 1
    if response.error:
        print("Error:", response.error)
    else:
        # print(response.body)
        print(response.request)
        # same idea as Twisted: stop the IOLoop once every request has completed
        # ioloop.IOLoop.current().stop()
    if COUNT == 0:
        ioloop.IOLoop.current().stop()


def func():
    url_list = ['http://www.baidu.com' for _ in range(100)]
    global COUNT
    COUNT = len(url_list)
    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)  # callback style; the callback argument was removed in Tornado 6


if __name__ == '__main__':
    ioloop.IOLoop.current().add_callback(func)
    ioloop.IOLoop.current().start()  # event loop: blocks until stop() is called
    print(time.time() - start)
    # 3.0621743202209473
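  On Tornado 6+, where fetch no longer accepts a callback, the same fan-out can be written with coroutines; a minimal sketch using the same Baidu URL list:

import time

from tornado import gen, ioloop
from tornado.httpclient import AsyncHTTPClient


async def fetch(url):
    # fetch returns a Future in Tornado 6, so it can simply be awaited
    response = await AsyncHTTPClient().fetch(url, raise_error=False)
    print(response.code)


async def main():
    url_list = ['http://www.baidu.com' for _ in range(100)]
    await gen.multi([fetch(url) for url in url_list])


if __name__ == '__main__':
    start = time.time()
    ioloop.IOLoop.current().run_sync(main)
    print(time.time() - start)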

All of the above are asynchronous IO request modules provided by the Python standard library or third-party packages. They are simple to use and dramatically improve throughput. Under the hood, an asynchronous IO request boils down to [non-blocking sockets] + [IO multiplexing].

"""
########http请求本质,IO阻塞########
sk = socket.socket()
#1.连接
sk.connect(('www.baidu.com',80,)) #阻塞
print('连接成功了')
#2.连接成功后发送消息
sk.send(b"GET / HTTP/1.0
Host: baidu.com

")

#3.等待服务端响应
data = sk.recv(8096)#阻塞
print(data) #

区分响应头和影响体

#关闭连接
sk.close()
"""
"""
########http请求本质,IO非阻塞########
sk = socket.socket()
sk.setblocking(False)
#1.连接
try:
    sk.connect(('www.baidu.com',80,)) #非阻塞,但会报错
    print('连接成功了')
except BlockingIOError as e:
    print(e)

#2.连接成功后发送消息
sk.send(b"GET / HTTP/1.0
Host: baidu.com

")

#3.等待服务端响应
data = sk.recv(8096)#阻塞
print(data) #

区分响应头和影响体

#关闭连接
sk.close()
"""
The essence of asynchronous non-blocking requests
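For comparison, a compact sketch of the same [non-blocking socket] + [IO multiplexing] idea using the standard-library selectors module (the hosts are placeholders and only the first response chunk is read); the hand-rolled select version follows below.

import selectors
import socket

sel = selectors.DefaultSelector()
hosts = ['www.baidu.com', 'www.cnblogs.com']  # placeholder hosts for illustration

for host in hosts:
    sk = socket.socket()
    sk.setblocking(False)
    try:
        sk.connect((host, 80))
    except BlockingIOError:
        pass  # expected: the connection completes in the background
    # watch for writability (connect finished) and then readability (response arrived)
    sel.register(sk, selectors.EVENT_READ | selectors.EVENT_WRITE, data=host)

pending = len(hosts)
while pending:
    for key, events in sel.select(timeout=1):
        sk, host = key.fileobj, key.data
        if events & selectors.EVENT_WRITE:
            sk.send(("GET / HTTP/1.0\r\nHost: %s\r\n\r\n" % host).encode('utf-8'))
            sel.modify(sk, selectors.EVENT_READ, data=host)  # request sent, now wait to read
        elif events & selectors.EVENT_READ:
            print(host, sk.recv(8096)[:20])  # first chunk of the response
            sel.unregister(sk)
            sk.close()
            pending -= 1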

Custom asynchronous non-blocking IO

import select
import socket


class HttpRequest:
    def __init__(self,sk,host,callback):
        self.socket = sk
        self.host = host
        self.callback = callback

    def fileno(self):
        return self.socket.fileno()


class HttpResponse:
    def __init__(self,recv_data):
        self.recv_data = recv_data
        self.header_dict = {}
        self.body = None

        self.initialize()

    def initialize(self):
        # the blank line (\r\n\r\n) separates the response headers from the body
        headers, body = self.recv_data.split(b'\r\n\r\n', 1)
        self.body = body
        header_list = headers.split(b'\r\n')
        for h in header_list:
            h_str = str(h, encoding='utf-8')
            v = h_str.split(':', 1)
            if len(v) == 2:
                self.header_dict[v[0]] = v[1]


class AsyncRequest:
    def __init__(self):
        self.conn = []        # requests that still have a response pending
        self.connection = []  # requests whose connection has not yet been confirmed

    def add_request(self, host, callback):
        sk = socket.socket()
        sk.setblocking(0)
        try:
            sk.connect((host, 80))
        except BlockingIOError:
            pass  # expected for a non-blocking connect; readiness is detected via select below
        request = HttpRequest(sk, host, callback)
        self.conn.append(request)
        self.connection.append(request)

    def run(self):
        while True:
            # select uses each object's fileno(); writable means connected, readable means data arrived
            rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
            for w in wlist:
                print(w.host, 'connected...')
                # reaching this loop means the socket has finished connecting to the server
                tpl = "GET / HTTP/1.0\r\nHost: %s\r\n\r\n" % (w.host,)
                w.socket.send(bytes(tpl, encoding='utf-8'))
                self.connection.remove(w)
            for r in rlist:
                # r is an HttpRequest
                recv_data = bytes()
                while True:
                    try:
                        chunk = r.socket.recv(8096)
                        if not chunk:
                            break
                        recv_data += chunk
                    except Exception:
                        break
                response = HttpResponse(recv_data)
                r.callback(response)
                r.socket.close()
                self.conn.remove(r)
            if len(self.conn) == 0:
                break


def f1(response):
    print('save to a file', response.header_dict)


def f2(response):
    print('save to the database', response.header_dict)


url_list = [
    {'host':'www.youku.com','callback': f1},
    {'host':'v.qq.com','callback': f2},
    {'host':'www.cnblogs.com','callback': f2},
]


if __name__ == '__main__':
    req = AsyncRequest()
    for item in url_list:
        req.add_request(item['host'], item['callback'])
    req.run()



Original article: https://www.cnblogs.com/zhangyafei/p/10244633.html