python爬虫之多线程爬取

一、什么是多进程?

  像电脑上同时运行多个软件,比如在打开微信的同时,也打开了QQ与钉钉,这就是多进程。

二、什么是多线程?

  一个进程中可以进行多种操作,即在QQ上既可以发送消息也可视频/语音,这就是多线程。

三、主进程/子进程

  主进程下面可能会有好多子进程,即不一定一个运行的软件就是一个进程,他下面可能会有很多个子进程。

四、主线程/子线程

  一个主线程下面可能会有多个子线程。

五、如何创建线程(Thread)

1、面向过程的创建方式

t = threading.Thread(target=s, name='xxx'xxx argxs=(x,y))

target:线程启动之后需要执行的函数

name:线程的名字

threading.current_thread().name:获取线程的name

args:主线程向子线程传递的参数

t.start():启动线程

t.join():让主线程等待子线程结束

#!/usr/local/bin/python3.7

import threading
import time

"""
    一个主线程,两个子线程。
"""

def send_message(x):
    for i in range(1,5):
        print('%s在发%s'%(x,threading.current_thread().name))
        time.sleep(1)

def video(x):
    for i in range(1,5):
        print('%s在%s'%(x, threading.current_thread().name))
        time.sleep(1)

def main():
    # 主线程传递到子线程的参数
    x = '墨子李'
    # 创建发消息的线程
    sthread = threading.Thread(target=send_message, name='发消息', args=(x,))
    # 创建视频的线程
    vthread = threading.Thread(target=video, name='视频', args=(x,))
    # 启动线程
    sthread.start()
    vthread.start()
    # 主线程等待子线程结束之后再结束
    sthread.join()
    vthread.join()
    print('我是主线程')
if __name__ == "__main__":
    main()

2、面向对象的创建方式

#!/usr/local/bin/python3.7

"""
    定义一个类,继承自threading.Thread
"""
import threading
import time

class SendMessage(threading.Thread):
    def __init__(self, name, x):
        super().__init__()
        self.name = name
        self.x = x

    def run(self):
        for i in range(1, self.x):
            print('%s在发送第%s条消息'%(self.name, i))
            time.sleep(1)

class Video(threading.Thread):
    def __init__(self, name, x):
        super().__init__()
        self.name = name
        self.x = x

    def run(self):
        for i in range(1, self.x):
            print('%s视频了%s分钟'%(self.name, i))
            time.sleep(1)



def main():
    # 创建线程
    sthread = SendMessage('墨子李', 5)
    vthread = Video('墨子李', 5)

    # 启动线程
    sthread.start()
    vthread.start()

    # 主线程等待子线程结束
    sthread.join()
    vthread.join()

if __name__ == "__main__":
    main()

 3、线程同步

  线程之间共享全局变量(进程之间不可以),会出现数据混乱的现象,这个时候要使用线程锁来处理这种情况。

  创建锁:s = threading.Lock()

  上锁:s.acquire()

  释放锁:s.release()

4、队列(queue)

  先进先出原则

  创建队列:q = Queue(5)

  给队列添加数据:q.put('xxx')

  q.put('xxx', False) 如果队列满,程序直接报错

  q.put('xxx', True, 3) 如果队列满,程序等待3s再报错

  q.get() 获取数据,如果队列为空卡在这里等待

  q.get(False) 如果队列为空,程序直接报错

  q.get(True, 3) 如果队列为空,程序等待3s报错

  q.empty() 判断队列是否为空

  q.full() 判断队列是否已满

  q.qsize() 获取队列长度

#!/usr/local/bin/python3.7

from queue import Queue

# 创建队列,规定队列长度为5
q = Queue(5)

# 添加数据
q.put('1')
q.put('2')
q.put('3')
q.put('4')
q.put('5')
# 判断队列是否已满,返回true
print(q.full())

# 获取数据
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())


# 队列长度 ,数据取完之后长度为0
print(q.qsize())
# 判断队列是否为空,此时返回True
print(q.empty())

5、多线程爬取 

1、分析

  两类线程:下载、解析

  内容队列:下载线程往队列中put数据,解析线程从队列get数据。

  url队列:下载线程从url队列get数据

  写数据:上锁

2、实例

#!/usr/local/bin/python3.7

"""
@File    :   duoxiancheng.py
@Time    :   2020/06/19
@Author  :   Mozili

"""

import requests
import threading
import queue
import time
from lxml import etree
import json



# 采集线程列表
crawl_thread_list = []
# 解析线程列表
parse_thread_list = []

class CrawlThread(threading.Thread):
    def __init__(self, name, page_queue, data_queue):
        super().__init__()
        self.name = name
        self.page_queue = page_queue
        self.data_queue = data_queue
        self.headers = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1 Safari/605.1.15'
    } 

    def run(self):
        print('%s开始采集数据....'%self.name)
        # 这里需要加循环让线程继续下去,不然执行一次之后就会停止
        while True:
            # 获取页码队列中的数据,拼接url
            url = 'http://www.ifanjian.net/jianwen-{}'
            # 判断队列是否为空
            if not self.page_queue.empty():
                url = url.format(self.page_queue.get())
                # print(url) 
            else:
                # print('数据已取完')
                break
            # 发送请求
            r = requests.get(url, headers=self.headers)
            # print('请求url----------------------------------------', r.url)
            # self.fp1.write(r.text)
            # 将数据添加到数据队列
            self.data_queue.put(r.text)
            # print(self.data_queue.qsize())
            # print(self.data_queue.get())
        print('%s结束采集数据....'%self.name)
              

class ParseThread(threading.Thread):
    def __init__(self, name, data_queue, lock, fp):
        super().__init__()
        self.name = name
        self.data_queue = data_queue
        self.lock = lock
        self.fp = fp
    
    def run(self):
        
        items = []
        print('{}开始解析数据...'.format(self.name))
        while self.data_queue.qsize()!= 0:
            # 从队列中获取数据
            content = self.data_queue.get()
            # 解析数据
            tree = etree.HTML(content)
            time.sleep(1)
            # 获取标题
            title_list = tree.xpath("//ul[@class='cont-list']/li/h2/a/text()")
            # print(title_list)
            # 获取图片链接
            src_list = tree.xpath("//ul[@class='cont-list']/li//p/img/@data-src")
            # print(src_list)
            for title in title_list:
                for src in src_list:
                    data = {
                        '标题':title,
                        '链接':src
                        }
                    items.append(data) 
                    # 加锁
                    self.lock.acquire()
                    self.fp.write(json.dumps(data, ensure_ascii=False)+'
')
                    # 释放锁
                    self.lock.release()
                    break 
        # print(self.data_queue.qsize())
        print('{}结束解析数据...'.format(self.name))
        
def creat_crawl_thread(page_queue, data_queue):

    crawl_name_list = ['采集线程1', '采集线程2', '采集线程3']
    for name in crawl_name_list:
        crawl_thread = CrawlThread(name, page_queue, data_queue)
        crawl_thread_list.append(crawl_thread) 

def creat_parse_thread(data_queue, lock, fp):

    parse_name_list = ['解析线程1', '解析线程2', '解析线程3']
    for name in parse_name_list:
        parse_thread = ParseThread(name, data_queue, lock, fp) 
        parse_thread_list.append(parse_thread)

def main():
    # 创建页码队列
    page_queue = queue.Queue()
    # 给队列添加页码
    for page in range(1, 51):
        page_queue.put(page)
    # 创建数据队列
    data_queue = queue.Queue()
    # 创建锁
    lock = threading.Lock()
    # 创建文件
    fp = open('Reptile/fanjian.json', 'a', encoding='utf8')
    # fp1 = open('Reptile/fanjian.txt', 'a', encoding='utf8')
    # 创建采集数据线程
    creat_crawl_thread(page_queue, data_queue)
    # 创建解析数据线程
    creat_parse_thread(data_queue, lock, fp)
    
    # 启动采集数据线程
    for cthread in crawl_thread_list:
        cthread.start()
     
    time.sleep(3)
    # 启动解析数据线程
    for pthread in parse_thread_list:
        pthread.start()
    
    # 主线程等待子线程结束
    cthread.join()
    pthread.join()
    
    print('主线程结束....')
    # 关闭文件
    # fp.close()

if __name__ == "__main__":
    main()

  

  

原文地址:https://www.cnblogs.com/lxmtx/p/13129064.html