爬虫——多线程糗事百科案例

案例:多线程爬虫

目标:爬取糗事百科段子,待爬取页面URL:http://www.qiushibaike.com/8hr/page/1

要求:

  1. 使用requests获取页面信息,用XPATH/re 做数据提取
  2. 获取每个帖子里的 用户头像链接、用户主页、用户名、用户性别、用户年龄、段子内容、点赞次数、评论次数
  3. 保存到本地json文件内
  4. 采用多线程

queue(队列对象)

queue是python中的标准库,可以直接import queue引用,队列是线程间最常用的交换数据的形式

python下多线程:

对于资源,加锁是个重要的环节。因为python原生的list, dict等,都是not thread safe的。而queue,是thread safe(线程案例)的,因此在满足使用条件下,建议使用队列

  1. 初始化:class queue.Queue(maxsize) FIFO(先进先出)
  2. 常用方法:
    1. queue.Queue.qsize() 返回队列的大小
    2. queue.Queue.empty() 如果队列为空,返回True,反之返回False
    3. queue.Queue.full() 如果队列满了,返回True,反之返回False
    4. queue.Queue.get([block[, timeout]]) 从队列中取出一个值,timeout为等待时间
  3. 创建一个“队列”对象
    • import queue
    • myqueue = queue.Queue(maxsize = 10)
  4. 将一个值放入队列中
    • myqueue.put(10)
  5. 将一个值从队列中取出
    • myqueue.get()
#!/usr/bin/python3
# -*- coding:utf-8 -*-
__author__ = 'mayi'

"""
案例:多线程爬虫
目标:爬取糗事百科段子,待爬取页面首页URL:http://www.qiushibaike.com/8hr/page/1
要求:
    1.使用requests获取页面信息,用XPATH/re 做数据提取
    2.获取每个帖子里的 用户头像链接、用户主页、用户名、用户性别、用户年龄、段子内容、点赞次数、评论次数
    3.保存到json文件内
    4.采用多线程
"""

import requests
from lxml import etree
from queue import Queue
import threading
import time
import json

# 数据队列
data_queue = Queue()
exitFlag_Parser = False
# 锁
lock = threading.Lock()

class ThreadCrawl(threading.Thread):
    """
    爬取线程类
    """
    def __init__(self, thread_name, page_queue):
        threading.Thread.__init__(self)
        self.thread_name = thread_name
        self.page_queue = page_queue
        self.url = "http://www.qiushibaike.com/8hr/page/"
        self.header = {'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.71 Safari/537.36'}

    def run(self):
        print(self.thread_name + " Starting...")
        self.qiushi_spider()
        print(self.thread_name + " Exiting...")

    def qiushi_spider(self):
        global data_queue
        while True:
            # page队列为空时,循环结束
            if self.page_queue.empty():
                break
            else:
                page = self.page_queue.get()
                full_url = self.url + str(page) + "/"
                print(full_url)

            # 多次尝试发送请求失败后结束、防止死循环
            timeout = 5
            while timeout:
                try:
                    # 防止访问太快
                    time.sleep(1)
                    content = requests.get(full_url, headers = self.header)
                    data_queue.put(content.text)
                    break
                except Exception as e:
                    print(e)
                    timeout -= 1
                    time.sleep(1)


class ThreadParser(threading.Thread):
    """
    页面解析类
    """
    def __init__(self, thread_name, file_name):
        threading.Thread.__init__(self)
        self.thread_name = thread_name
        self.file_name = file_name

    def run(self):
        # 开始
        print(self.thread_name + " Starting...")
        global data_queue, exitFlag_Parser
        while not exitFlag_Parser:
            try:
                item = data_queue.get(block = False)
                if item:
                    self.parse_data(item)
                    data_queue.task_done()
            except:
                pass
        # 结束
        print(self.thread_name + " Exiting...")

    def parse_data(self, item):
        """
        解析网页函数
        :param item: 网页内容
        """
        global lock
        try:
            html = etree.HTML(item)
            # id = qiushi_tag_119336220:id均包含:qiushi_tag_
            result = html.xpath('//div[contains(@id,"qiushi_tag_")]')
            for res in result:
                try:
                    # 用户头像链接、用户主页、用户名、用户性别、用户年龄、段子内容、点赞次数、评论次数
                    # 用户头像链接
                    head_url = res.xpath('.//img/@src')[0]
                    # 用户主页
                    home_url = "http://www.qiushibaike.com" + res.xpath('.//a/@href')[0]
                    # 用户名
                    user_name = res.xpath('.//h2')[0].text
                    # 用户性别:匿名用户,匹配不到性别
                    article_gender = res.xpath('./div/div/@class')
                    if article_gender:
                        gender = article_gender[0].split()[-1].replace("Icon", "")
                    else:
                        gender = ""
                    # 用户年龄:匿名用户,匹配不到年龄
                    article_age = res.xpath('./div/div')
                    if article_age:
                        age = article_age[0].text
                    else:
                        age = 0
                    # 段子内容
                    content = res.xpath('.//div[@class="content"]/span')[0].text.strip()
                    # 点赞次数
                    stats_vote = res.xpath('.//span[@class="stats-vote"]//i[@class="number"]')
                    if stats_vote:
                        stats_vote = stats_vote[0].text.strip()
                    else:
                        stats_vote = "0"
                    # 评论次数
                    stats_comments = res.xpath('.//span[@class="stats-comments"]//i[@class="number"]')
                    if stats_comments:
                        stats_comments = stats_comments[0].text.strip()
                    else:
                        stats_comments = "0"

                    record = {
                        "head_url": head_url,
                        "home_url": home_url,
                        "user_name": user_name,
                        "gender": gender,
                        "age": age,
                        "content": content,
                        "stats_vote": stats_vote,
                        "stats_comments": stats_comments
                    }
                    with lock:
                        self.file_name.write(json.dumps(record, ensure_ascii = False) + ",")

                except Exception as e:
                    print(e)

        except Exception as e:
            print(e)

def main():
    """
    主函数
    :return:
    """
    # 采集的数据存储在本地磁盘的文件名
    file_name = open("糗事百科.json", "a", encoding = "utf-8")
    # 待采集的起始页码
    start_page = int(input("请输入起始页码:"))
    # 待采集的终止页码
    end_page = int(input("请输入终止页码:"))

    # 定义一个page队列
    pageQueue = Queue()
    for page in range(start_page, end_page + 1):
        # 页码入队列
        pageQueue.put(page)

    # 初始化采集线程
    crawl_threads = []
    crawl_list = ["采集线程1", "采集线程2", "采集线程3"]

    for thread_name in crawl_list:
        thread = ThreadCrawl(thread_name, pageQueue)
        thread.start()
        crawl_threads.append(thread)

    # 初始化解析线程
    parser_threads = []
    parser_list = ["解析线程1", "解析线程2", "解析线程3"]
    for thread_name in parser_list:
        thread = ThreadParser(thread_name, file_name)
        thread.start()
        parser_threads.append(thread)

    # 等待列队被清空
    while not pageQueue.empty():
        pass

    # 等待所有线程处理完成
    for thread in crawl_threads:
        thread.join()

    # 等待队列被清空
    while not data_queue.empty():
        pass

    # 通知线程退出
    global exitFlag_Parser
    exitFlag_Parser = True

    for thread in parser_threads:
        thread.join()

    with lock:
        file_name.close()

if __name__ == '__main__':
    # 运行主函数
    main()

  

原文地址:https://www.cnblogs.com/mayi0312/p/7229869.html