爬虫六 爬虫性能相关

一 背景知识

    爬虫的本质就是一个socket客户端与服务端的通信过程,如果我们有多个url待爬取,采用串行的方式执行,只能等待爬取一个结束后才能继续下一个,效率会非常低。

需要强调的是:串行并不意味着低效,如果串行的都是纯计算的任务,那么cpu的利用率仍然会很高,之所以爬虫程序的串行低效,是因为爬虫程序是明显的IO密集型程序。

二 同步、异步、回调机制

1、同步调用:即提交一个任务后就在原地等待任务结束,等到拿到任务的结果后再继续下一行代码,效率低下


import requests
def get_page(url):
    response=requests.get(url)
    if response.status_code == 200:
        return response.text
urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
for url in urls:
    res=get_page(url) #调用一个任务,就在原地等待任务结束拿到结果后才继续往后执行
    print(len(res))
 

2、一个简单的解决方案:多线程或多进程

#在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。

from multiprocessing import Process
from threading import Thread
import requests
def get_page(url):
    response=requests.get(url)
    if response.status_code == 200:
        return response.text
if __name__ == '__main__':
    urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
  p_list=[] for url in urls: p=Process(target=get_page,args=(url,)) #任务的调用方式是异步调用:提交一个任务,不用等待,直接执行下一行代码
     p_list.append(p) p.start() # t=Thread(target=get_page,args=(url,)) # t.start()
for i in p_list:
     p.join() #默认不添加该方法是直接开启子进程或者子线程执行程序,主线程或主进程不管子程序是否执行完成,直接执行主进程或主线程代码。而添加上该方法后必须等待子进程或子线程程序都执行完成后才能执行主程序 该方案的问题是:#开启多进程或都线程的方式,我们是无法无限制地开启多进程或多线程的:在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率, 而且线程与进程本身也更容易进入假死状态。

3、改进方案:

  1、线程池或进程池+异步调用:提交一个任务后并不会等待任务结束,而是继续下一行代码

#很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、
减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import requests
import os def get_page(url): print('GET : %s' %url) response=requests.get(url) if response.status_code == 200:
      print('子进程',os.getpid())
      
return response.text
     if __name__ == '__main__': p=ProcessPoolExecutor() # p=ThreadPoolExecutor() urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org'] for url in urls: p.submit(get_page,url) p.shutdown(wait=True) #该方法是关闭线程池或者进程池,并且等待所有子进程或子线程完成后再执行后面代码(主程序代码)。
   print('主进程',os.getpid())

   2、“线程池”或“连接池”+异步调用+回调机制

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import requests
import os
def get_page(url):
    print('%s GET : %s' %(os.getpid(),url))    #该进程号是创建的子进程号
    response=requests.get(url)
    if response.status_code == 200:
        return response.text

def parse_page(res):
    res=res.result()
    print('%s parsing' %os.getpid())  #该进程号是主进程号,由于“连接池”维持连接的缓存池,所以尽量重用已有的连接,因为开辟新的线程也会产生损耗

if __name__ == '__main__':
    p=ProcessPoolExecutor()
    # p=ThreadPoolExecutor()

    urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page)  #函数回调,p.submit(get_page,url)的结果会自动发送给回调函数当作回调函数的参数执行
    p.shutdown(wait=True)
   print('主进程',os.getpid()) 改进后方案其实也存在着问题: #“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其   面临的响应规模,并根据响应规模调整“池”的大小。 对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,   多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。

#总结:开辟新的子进程不仅会开辟新的内存空间,还会复制父进程的数据到新的子进程中,而开辟新的子线程只会开辟新的内存空间不会复制父线程的数据,而是共享父线程数据,所以说开辟新进程比开辟新线程的花销要大。

三 高性能

    上述无论哪种解决方案其实没有解决一个性能相关的问题:IO阻塞,无论是多进程还是多线程,在遇到IO阻塞时都会被操作系统强行剥夺走CPU的执行权限,程序的执行效率因此就降低了下来。

    解决这一问题的关键在于,我们自己从应用程序级别检测IO阻塞然后切换到我们自己程序的其他任务执行,这样把我们程序的IO降到最低,我们的程序处于就绪态就会增多,以此来迷惑操作系统,操作系统便以为我们的程序是IO比较少的程序,从而会尽可能多的分配CPU给我们,这样也就达到了提升程序执行效率的目的

    1、在python3.3之后新增了asyncio模块,可以帮我们检测IO(只能是网络IO,比如说get下载网站数据就是网络io),实现应用程序级别的切换


#asyncio模块的基本使用
import asyncio @asyncio.coroutine #增加asyncio装饰器,必须的 def task(task_id,senconds): print('%s is start' %task_id) # yield from asyncio.sleep(senconds) #只能检测网络IO,检测到IO后切换到其他任务执行,sleep方法是asyncio自带的方法 print('%s is end' %task_id) tasks=[task(task_id=1,senconds=3),task(task_id=2,senconds=4)] loop=asyncio.get_event_loop()#实例化产生一个loop(event)对象 loop.run_until_complete(asyncio.gather(*tasks))#为loop对象传入参数,参数也可以是单个值,也可以是个列表。固定写法 loop.close()#关闭

#注释:该方法就相当于在单线程下开启多个携程,相当于串行执行携程程序,执行完程序所花费的时间是网络io阻塞最长的那个时间+计算密集型时间之和
 

    2、但asyncio模块只能发tcp级别的请求,不能发http协议,因此,在我们需要发送http请求的时候,需要我们自定义http报头


#我们爬取一个网页的过程,以https://www.python.org/doc/为例,将关键步骤列举如下
#步骤一:向www.python.org这台主机发送tcp三次握手,是IO阻塞操作
#步骤二:封装http协议的报头
#步骤三:发送http协议的请求包,是IO阻塞操作
#步骤四:接收http协议的响应包,是IO阻塞操作
import asyncio

@asyncio.coroutine
def get_page(host,port=80,url='/'):
    #步骤一(IO阻塞):发起tcp链接,是阻塞操作,因此需要yield from
    recv,send=yield from asyncio.open_connection(host,port)

    #步骤二:封装http协议的报头,因为asyncio模块只能封装并发送tcp包,因此这一步需要我们自己封装http协议的包
    requset_headers="""GET %s HTTP/1.0
Host: %s

""" % (url, host,)
    # requset_headers="""POST %s HTTP/1.0
Host: %s

name=egon&password=123""" % (url, host,)
    requset_headers=requset_headers.encode('utf-8')

    #步骤三(IO阻塞):发送http请求包
    send.write(requset_headers)
    yield from send.drain()

    #步骤四(IO阻塞):接收http协议的响应包
    text=yield from recv.read()#得到的是个btyes形式的数据,需要encode

    #其他处理
    print(host,url,text)
    send.close()
    print('-===>')
    return 1

tasks=[get_page(host='www.python.org',url='/doc'),get_page(host='www.cnblogs.com',url='linhaifeng'),get_page(host='www.openstack.org')]

loop=asyncio.get_event_loop()
results=loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

print('=====>',results) #[1, 1, 1]

    3、自定义http报头多少有点麻烦,于是有了aiohttp模块,专门帮我们封装http报头,然后我们还需要用asyncio检测IO实现切换


import aiohttp
import asyncio

@asyncio.coroutine
def get_page(url):
    print('GET:%s' %url)
  #步骤一:先建立tcp连接(io阻塞) 该步骤封装了上面的发起tcp链接,封装http头部,发起http请求包三个步骤 response=yield from aiohttp.request('GET',url)   #步骤二:接收http协议的响应包(io阻塞) data=yield from response.read()    #其他处理 print(url,data) response.close() return 1 tasks=[ get_page('https://www.python.org/doc'), get_page('https://www.cnblogs.com/linhaifeng'), get_page('https://www.openstack.org') ] loop=asyncio.get_event_loop() results=loop.run_until_complete(asyncio.gather(*tasks)) loop.close() print('=====>',results) #[1, 1, 1]

    4、此外,还可以将requests.get函数传给asyncio,就能够被检测了


import requests
import asyncio

@asyncio.coroutine
def get_page(func,*args):
    print('GET:%s' %args[0])
loop=asyncio.get_event_loop() #实例化产生一个loop(event)对象,有了这个对象就可以提交任务了 furture=loop.run_in_executor(None,func,*args) #参数传给对象下的方法,并且该方法是个网络io操作,封装http请求并且发送tcp包 response=yield from furture #接收http协议的响应包(io阻塞)
print(response.url,len(response.text)) return 1 tasks=[ get_page(requests.get,'https://www.python.org/doc'),#由于是多个get_page所以在每个get_page中只要是遇到网络io就会切到下一个get_page中,由于切的时间很短就可以看成是并发 get_page(requests.get,'https://www.cnblogs.com/linhaifeng'), get_page(requests.get,'https://www.openstack.org') ] loop=asyncio.get_event_loop() results=loop.run_until_complete(asyncio.gather(*tasks)) #固定写法 loop.close() print('=====>',results) #[1, 1, 1]
#总结:单线程下并发多个任务即相当于开启多个携程,但是是串行执行任务,由于切的时间很短就可以看成是并发。

   

  5、还有之前在协程时介绍的gevent模块

from gevent import monkey;monkey.patch_all()#必须为monkey这个模块打补丁
import gevent
import requests

def get_page(url):
    print('GET:%s' %url)
    response=requests.get(url)
    print(url,len(response.text))
    return 1

# g1=gevent.spawn(get_page,'https://www.python.org/doc')               #产生多个携程对象,然后等待所有携程结束后再执行线程代码,比之前的都方便简单
# g2=gevent.spawn(get_page,'https://www.cnblogs.com/linhaifeng')
# g3=gevent.spawn(get_page,'https://www.openstack.org')
# gevent.joinall([g1,g2,g3,])
# print(g1.value,g2.value,g3.value) #拿到返回值

#开启协程池执行程序
from gevent.pool import Pool
pool=Pool(2)
g1=pool.spawn(get_page,'https://www.python.org/doc')
g2=pool.spawn(get_page,'https://www.cnblogs.com/linhaifeng')
g3=pool.spawn(get_page,'https://www.openstack.org')
gevent.joinall([g1,g2,g3,])
print(g1.value,g2.value,g3.value) #拿到返回值

    6、封装了gevent+requests模块的grequests模块


#pip3 install grequests
import grequests
request_list=[
    grequests.get('https://wwww.xxxx.org/doc1'),
    grequests.get('https://www.cnblogs.com/linhaifeng'),
    grequests.get('https://www.openstack.org')
]
##### 方法1:执行并获取响应列表#####
# response_list = grequests.map(request_list)
# print(response_list)

#####方法2:执行并获取响应列表(处理异常) ##### def exception_handler(request, exception): # print(request,exception) print("%s Request failed" %request.url) response_list = grequests.map(request_list, exception_handler=exception_handler) print(response_list)

    7、twisted:是一个网络框架,其中一个功能是发送异步请求,检测IO并自动切换,它比之前的方法封装程度更高是因为twisted为我们提供了一个回调函数的接口

'''
#安装twisted模块的时候可能会遇到的问题,耐心解决就可以了 #问题一:error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": http://landinghub.visualstudio.com/visual-cpp-build-tools https://www.lfd.uci.edu/~gohlke/pythonlibs/#twisted pip3 install C:UsersAdministratorDownloadsTwisted-17.9.0-cp36-cp36m-win_amd64.whl pip3 install twisted #问题二:ModuleNotFoundError: No module named 'win32api' https://sourceforge.net/projects/pywin32/files/pywin32/ #问题三:openssl pip3 install pyopenssl ''' #方法1:twisted基本用法 from twisted.web.client import getPage,defer #getPage就相当于requests.get()函数 from twisted.internet import reactor def all_done(arg): # print(arg) reactor.stop()#该方法是告诉reactor.run()你可以结束你的循环了 def callback(res): print(res) return 1 defer_list=[] urls=[ 'http://www.baidu.com', 'http://www.bing.com', 'https://www.python.org', ] for url in urls: obj=getPage(url.encode('utf-8'),)#一定要编码,提交一个url。得到一个对象 obj.addCallback(callback) #对象执行回调函数 defer_list.append(obj)#将任务添加到列表中,采集所有的任务 defer.DeferredList(defer_list).addBoth(all_done)#采集之前统计的任务列表来判断任务是否都执行完成(需要维护计数器,来停止IO循环),都执行完成后再执行.addBoth(all_done)中的all_done函数。 reactor.run()#由于刚才的提交url是一个异步调用,所以reactor.run()方法只是检测提交的每一个url程序,只要是遇到网络io就会切cpu,依次这样死循环,自己不能终止任务 #方法2:twisted的getPage的详细用法 from twisted.internet import reactor from twisted.web.client import getPage import urllib.parse def one_done(arg): print(arg) reactor.stop() # post_data = urllib.parse.urlencode({'check_data': 'adf'})#先编码,然后再bytes post_data = bytes(post_data, encoding='utf8') headers = {b'Content-Type': b'application/x-www-form-urlencoded'} response = getPage(bytes('http://dig.chouti.com/login', encoding='utf8'), #getpage模块的用法和requests模块的用法功能类似,但是他每一个参数都必须是bytes类型,否则就报错 method=bytes('POST', encoding='utf8'), postdata=post_data, cookies={}, headers=headers) response.addBoth(one_done) reactor.run()
 

   8、tornado

from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop

def handle_response(response):
    """ 处理返回值内容(需要维护计数器,来停止IO循环),调用 ioloop.IOLoop.current().stop()"""
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)

def func():
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]
    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)#发送url请求

ioloop.IOLoop.current().add_callback(func)#发送url请求并执行回调函数
ioloop.IOLoop.current().start()#开始运行

总结:

一、程序的执行方式:

   串行执行
        1、串行执行:
               串行并不意味着效率低,for循环案例,每for循环一次执行的程序是个加法运算,就属于计算密集型。
               程序如果是纯计算的话,串行执行并没有效率问题
        2、串行的问题:
               IO密集型程序串行运行,效率极低,是因为每次循环都会碰到io操作,都会切cpu。
   

   并发执行(并行执行)

     1、并发执行
            开启多进程或多线程并发执行,彼此互不干扰
            2、线程和进程
               python的多线程在IO密集型程序中是有用的(GIL),由于有GIL的存在一个进程只能被一个cpu所执行,所以一个进程下的多个线程,如果一个进程内的某个线程遇到io阻塞就会切到该进程内的下一个线程。
       python的多进程在计算密集型程序中是有优势的,由于cpu会在规定的单位时间内从一个进程切到下一个进程中,所以这个任务是计算密集型就用进程创建该任务是最好不过的了。
      3、 并发的问题:
            不能无限制地开启多线程或多进程,这样电脑cpu会被撑爆的。
        4、解决:进程池或线程池
       思路:将进程或线程控制在一定数量内(计算机可承受的范围)
         进程池或线程池的问题:池的大小需要跟着任务规模的增大而调高,如果任务数过多,池返回会降低效率
      5、 最终:如何解决IO问题成了关键

二、任务的调用方式:
      1、同步调用

      提交完任务后,在原地等待任务执行完毕,拿到返回值,再执行下一行代码。(for循环)

   2、异步调用

      提交完任务后,不等待任务的执行,直接执行下一行代码
      3、异步调用+回调机制:
          提交完任务(捆绑一个回调函数)后,不等待任务的执行,直接执行下一行代码
          然后等到进程有结果直接出发回调函数的执行

三、区别阻塞的概念:
        阻塞指的是进程的一种状态:在进程遇到IO时,会被操作系统剥夺走CPU的执行权限

  最终:需要寻找一种解决方案,需要同时具备以下几点。(就是上面的几种方法)
      1、检测单线程下的IO
      2、遇到IO自动切换(切之前保存状态)

原文地址:https://www.cnblogs.com/xuanan/p/7810365.html