并发下载

并发下载

一.queue的实现

from queue import Queue

queue_object=Queue()

for i in range(4):
    queue_object.put(i)
    
while not queue_object.empty():
    print(queue_object.get())
0
1
2
3
from queue import LifoQueue

lifo_queue=LifoQueue()

for i in range(4):
    lifo_queue.put(i)
    
while not lifo_queue.empty():
    print(lifo_queue.get())
3
2
1
0
from queue import PriorityQueue

class Job(object):
    def __init__(self,level,description):
        self.level=level
        self.description=description
        return
    
    def __lt__(self,other):
        return self.level<other.level
    
priority_queue=PriorityQueue()

priority_queue.put(Job(5,"中级别工作"))
priority_queue.put(Job(10,"低级别工作"))
priority_queue.put(Job(1,"重要工作"))

while not priority_queue.empty():
    next_job=priority_queue.get()
    print("开始工作:",next_job.description)
开始工作: 重要工作
开始工作: 中级别工作
开始工作: 低级别工作

二.三种技术采集和解析数据对比

1.单线程实现

步骤:构建网址—>访问网页并获取源代码—>解析源代码—>转成JSON格式—>存储在本地文件

from lxml import etree
import requests
import json

# 访问网页的请求头
headers={
    "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36",
    "Accept-Language":"zh-CN,zh;q=0.8"
}

# 存储解析后数据的本地文件
local_file=open("duanzi.json","a")

# 解析 html字符串,获取需要信息
def parse_html(html):
    text=etree.HTML(html)
    
    # 返回所有段子的结点位置
    # contains模糊查询,第一个参数是要匹配的标签,第二个参数是标签名的部分内容
    node_list=text.xpath('//li[contains(@id,"qiushi_tag")]')
    
    for node in node_list:
        # 获取用户名
        username=node.xpath('.//span[@class="recmd-name"]/text()')
    
        #图片链接
        image=node.xpath('.//img/@src')[0]
    
        # 段子内容
        content=node.xpath('.//a[@class="recmd-content"]')[0].text
    
        #点赞
        like=node.xpath('.//div[@class="recmd-num"]/span')[0].text
    
        # 评论
        try:
            comments=node.xpath('.//div[@class="recmd-num"]/span')[3].text
        except IndexError:
            comments=0
    
        items={
            "username":username,
            "image":image,
            "content":content,
            "like":like,
            "comments":comments
        }
    
        local_file.write(json.dumps(items,ensure_ascii=False)+"
")
        
def main():
    # 获取1-10页的网页源代码解析
    for page in range(1,11):
        url="https://www.qiushibaike.com/8hr/page/"+str(page)+"/"
        # 爬取网页源代码
        html=requests.get(url,headers=headers).text
        
        parse_html(html)
main()

2.多线程实现

从单线程爬虫的流程可以看出,全部过程只是用了一个线程,先爬取一个网页,对网页内容进行解析,然后存储,完成整套操作后再开始爬取下一个网页,每个网页依次进行,效率非常慢

爬虫的流程简要步骤如下:

  1. 使用一个队列pageQueue保存要访问的网页页码
  2. 同时启动多个采集线程,每个线程都从网页页码队列pageQueue中取出一个要访问的页码,构建网址,访问网址并爬取数据.操作完一个网页后再从网页页码队列中取出下一个页码,依次进行,直到所有的页码都已访问完毕.所有的采集线程保存在列表threadCrawls中
  3. 使用一个队列dataQueue来保存所有的网页源代码,每个线程获取到的数据都放入该队列中
  4. 同时启动多个解析线程,每个线程都从网页源代码队列dataQueue中取出一个网页源代码,并进行解析,获取想要的数据,并转化为JSON格式.解析完成后再取出下一个网页源代码,依次进行,直到所有的源代码都已被取出.将所有的解析线程存储在列表threadParses中
  5. 将解析得到的JSON数据存储在本地文件duanzi.json中
from IPython.display import Image
Image(filename="./data/thread.png",width=500)

output_13_0.png

创建一个ThreadCrawl类,继承自threading.Thread类,用于采集网页信息

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author LQ6H

import requests
import threading
from lxml import etree
import json
from queue import Queue


# 采集网页页码队列是否为空的信号
CRAWL_EXIT=False

class ThreadCrawl(threading.Thread):
    def __init__(self,threadName,pageQueue,dataQueue):
        threading.Thread.__init__(self)
        # 线程名
        self.threadName=threadName
        # 页码队列
        self.pageQueue=pageQueue
        # 数据队列
        self.dataQueue=dataQueue

        self.headers="{'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36'}"

    def run(self):
        print("启动:"+self.threadName)

        while not CRAWL_EXIT:
            try:
                # 从dataQueue中取出1一个页码数字,先进先出
                # 可选参数block,默认值是True
                # 如果队列为空,block为True,会进入阻塞状态,直到队列有新的数据
                # 如果队列为空,block为False,会弹出一个Queue.empty()异常
                page=self.pageQueue.get(False)
                # 构建网页的URL地址
                url="https://www.qiushibaike.com/8hr/page/"+str(page)+"/"
                content=requests.get(url,headers=self.headers).text

                # 将爬取到的网页源代码放入dataQueue队列中
                self.dataQueue.put(content)
            except:
                pass

        print("结束:"+self.threadName)

首先定义一个全局变量CRAWL_EXIT,用于标识pageQueue队列是否为空.当pageQueue不玩空时,线程继续爬取下一个页码;当pageQueue为空时,表明所有的网页都已被爬取完毕,线程就可以退出

队列pageQueue是线程安全的,使用队列来调度线程,保证了每个线程采集的网页地址不重复

创建一个ThreadParse类,继承自threading.Thread,用于解析网页信息

PARSE_EXIT=False
class ThreadParse(threading.Thread):
    def __init__(self,threadName,dataQueue,localFile,lock):
        super(ThreadParse,self).__init__()
        # 线程名
        self.threadName=threadName
        # 数据队列
        self.dataQueue=dataQueue
        # 保存解析后数据的文件名
        self.localFile=localFile
        # 互斥锁
        self.lock=lock

    def run(self):
        print("启动:"+self.threadName)
        while not PARSE_EXIT:
            try:
                html=self.dataQueue.get(False)
                self.parse(html)

            except:
                pass

        print("结束:"+self.threadName)


    def parse(self,html):
        text = etree.HTML(html)
        node_list = text.xpath('//li[contains(@id,"qiushi_tag")]')

        for node in node_list:
            try:

                # 获取用户名
                username = node.xpath('.//span[@class="recmd-name"]/text()')

                # 图片链接
                image = node.xpath('.//img/@src')[0]

                # 段子内容
                content = node.xpath('.//a[@class="recmd-content"]')[0].text

                # 点赞
                like = node.xpath('.//div[@class="recmd-num"]/span')[0].text

                # 评论
                try:
                    comments = node.xpath('.//div[@class="recmd-num"]/span')[3].text
                except IndexError:
                    comments = 0

                items = {
                    "username": username,
                    "image": image,
                    "content": content,
                    "like": like,
                    "comments": comments
                }


                # with后面有两个必须执行的操作:__enter__和__exit__,打开和关闭
                # 不管里面的操作如何,都会直接打开和关闭功能
                # 打开锁,向文件添加内容,释放锁

                with self.lock:
                    # 写入解析后的数据
                    self.localFile.write(json.dumps(items,ensure_ascii=False)+"
")
            except:
                pass

在多线程开发中,为了维护资源的完整性,在访问共享资源时使用共享锁lock.线程获得了锁之后,才可以访问文件localFile,并往里写入数据,写入完毕后,将锁释放,其他线程就可以访问这个文件.同一时刻,只允许一个协程访问该文件

def main():

页码队列,存储10个页码,先进先出

pageQueue=Queue(10)

for i in range(1,11):

pageQueue.put(i)

# 采集结果(网页的HTML源代码)的数据队列,参数为空表示不限制
dataQueue=Queue()
# 以追加的方式打开本地文件
localFile=open("duanzi.json","wb+")
# 互斥锁
lock=threading.Lock()

# 3个采集线程的名字
crawlList=["采集线程1号","采集线程2号","采集线程3号"]
# 创建,启动和存储3个采集线程
threadCrawls=[]

for threadName in crawlList:
    thread=ThreadCrawl(threadName,pageQueue,dataQueue)
    thread.start()
    threadCrawls.append(thread)

# 3个解析线程的名字
parseList=["解析线程1号","解析线程2号","解析线程3号"]
# 创建,启动和存储3个解析线程
threadParses=[]
for threadName in parseList:
    thread=ThreadParse(threadName,dataQueue,localFile,lock)
    thread.start()
    threadParses.append(thread)

while not pageQueue.empty():
    pass

# 如果pageQueue为空,采集线程退出循环
global CRAWL_EXIT
CRAWL_EXIT=True

print("pageQueue为空
")

for thread in threadCrawls:
    # 阻塞子线程
    thread.join()

while not dataQueue.empty():
    pass

print("dataQueue为空")

global PARSE_EXIT
PARSE_EXIT=True

for thread in threadParses:
    thread.join()

with lock:
    # 关闭文件,在关闭之前,内容都在内存里
    localFile.close()

if name==“main”:

main()

3.协程实现

在上面实现的多线程爬虫中,分别开启了3个采集线程爬取网页和3个解析线程来解析网页,提供了程序执行的效率.但是,线程是交由CPU调度的,每个时间片段中只能有一个线程执行.而协程是在一个线程内部执行,一旦遇到了网络I/O阻塞,它就会立刻切换到另一个协程中执行,通过不断的轮询,降低了爬取网页的时间.对于爬虫而言,协程和多线程在效率上没有很大的不同

使用协程来实现爬虫,具体步骤如下:

  1. 定义一个负责爬虫的类,所有的爬虫工作完全交由该类负责
  2. 使用一个队列data_queue保存所有的数据
  3. 创建多个协程任务,每个协程都会使用页码构建完整的网址,访问网址爬取和提取有用的数据,并保存到数据队列中,直到所有网页中的数据提取出来
  4. 将data_queue队列中的数据全部提取出来,保存到本地文件duanzi.txt中
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author LQ6H


import requests
from queue import Queue
import time
from lxml import etree
import gevent

class Spider(object):
    def __init__(self):
        self.headers={
            "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36"
        }

        self.base_url="https://www.qiushibaike.com/8hr/page/"
        # 创建保存数据的队列
        self.data_queue=Queue()
        # 统计数量
        self.count=0

    def send_request(self,url):
        print("[INFO]:正在爬取"+url)
        html=requests.get(url,headers=self.headers).content
        # 每次请求间隔1s
        time.sleep(1)
        self.parse_page(html)

    def parse_page(self,html):
        html_ogj=etree.HTML(html)
        node_list=html_ogj.xpath('//li[contains(@id,"qiushi_tag")]')

        for node in node_list:
            try:

                # 获取用户名
                username = node.xpath('.//span[@class="recmd-name"]/text()')

                # 图片链接
                image = node.xpath('.//img/@src')[0]

                # 段子内容
                content = node.xpath('.//a[@class="recmd-content"]')[0].text

                # 点赞
                like = node.xpath('.//div[@class="recmd-num"]/span')[0].text

                # 评论
                try:
                    comments = node.xpath('.//div[@class="recmd-num"]/span')[3].text
                except IndexError:
                    comments = 0

                items = {
                    "username": username,
                    "image": image,
                    "content": content,
                    "like": like,
                    "comments": comments
                }

                self.count+=1
                self.data_queue.put(items)
            except:
                pass

    def start_work(self):
        job_list=[]
        for page in range(1,11):
            # 构建一个协程任务对象
            url=self.base_url+str(page)+"/"
            job=gevent.spawn(self.send_request,url)

            # 保存所有的协程任务
            job_list.append(job)

        # joinall()接收一个列表,将列表中的所有协程任务添加到任务队列里执行
        gevent.joinall(job_list)

        local_file=open("duanzi.json","wb+")

        while not self.data_queue.empty():
            content=self.data_queue.get()
            result=str(content).encode("utf-8")
            local_file.write(result+b"
")

        local_file.close()
        print(self.count)

if __name__=="__main__":
    spider=Spider()
    spider.start_work()

4.性能分析

使用单线程,多线程和协程实现数据的爬取后,通过计算这3种方式下的耗时情况,比较三种爬虫的效率

首先导入time模块,然后计算main()函数执行之后与之前的时间差,或者计算调用start_work()方法之前与调用之后的时间差

计算main()函数执行前后时间差:

if __name__=="__main__":
    startTime=time.time()
    main()
    print(time.time()-startTime)

计算start_work()方法调用前后时间差:

if __name__=="__main__":
    spider=Spider()
    start=time.time()
    spider.start_work()
    print("[INFO]:Using time%f secend"%(time.time()-start))
原文地址:https://www.cnblogs.com/LQ6H/p/12940570.html