多线程

多线程即是同时运行多个子程序,就像爬取10页网页,笨方法就是从第一页爬到第十页这样最少需要十几分钟

而用多线程创建10个线程同时爬十个网页,理论上速度可扩大10倍

例如:

  • import threading,time  
  • class MyThread(threading.Thread):  
  •     def __init__(self,threadname):  
  •         threading.Thread.__init__(self,name=threadname)  
  •   
  •     def run(self):  
  •         for i in xrange(10):  
  •             print self.getName(),i  
  •             time.sleep(1)  
  •   
  • my = MyThread('test')  
  • my.start() 

定义类继承threading.Thread,然后在__init__里首先调用threading.Thread的__init__方法即可创建线程

重写类的run()方法即可,把你要在线程执行时做的事情都放到里面,这个线程就可以工作了

本例中线程的工作就是每隔一秒输出一下,现在线程就处于“ready”状态或者也称为“runnable”状态。为什么不称为“running”状态呢?其实是有原因的。因为我们的计算机一般是不具有真正并行处理能力的。我们所谓的多线程只是把时间分成片段,然后隔一个时间段就让一个线程执行一下,然后进入“sleeping ”状态,然后唤醒另一个在“sleeping”的线程,如此循环runnable->sleeping->runnable... ,只是因为计算机执行速度很快,而时间片段间隔很小,我们感受不到,以为是同时进行的。所以说一个线程在start了之后只是处在了可以运行的状态,他什么时候运行还是由系统来进行调度的。那一个线程什么时候会“dead”呢?一般来说当线程对象的run方法执行结束或者在执行中抛出异常的话,那么这个线程就会结束了。系统会自动对“dead”状态线程进行清理。如果一个线程A在执行的过程中需要等待另一个线程tB执行结束后才能运行的话,那就可以在A在调用B的B.join()方法,另外还可以给join()传入等待的时间。

t1 = MyThread('t1')
print t1.getName(),t1.isDaemon()
t1.setDaemon(True)
print t1.getName(),t1.isDaemon()
t1.start()
print ('main thread exit')

上面的代码,线程对象的setDaemon()方法可以让子线程随着主线程的退出而结束,不过注意的是setDaemon()方法必须在线程对象没有调用start()方法之前调用(默认情况下;在python中,主线程结束后,会默认等待子线程结束后,主线程才退出)。当执行到 print 'main thread exit' 后,主线程就退出了,当然t1这个线程也跟着结束了。但是如果不使用t1线程对象的setDaemon()方法的话,即便主线程结束了,还要等待t1线程自己结束才能退出进程。isDaemon()是用来获得一个线程对象的Daemonflag状态的
获得当前正在运行的线程的引用   running = threading.currentThread()
获得当前所有活动对象(即run方法开始但是未终止的任何线程)的一个列表   threadlist = threading.enumerate()
获得这个列表的长度   threadcount = threading.activeCount()
查看一个线程对象的状态调用这个线程对象的isAlive()方法,返回1代表处于“runnable”状态且没有“dead”     threadflag = threading.isAlive()
 
线程同歩队列
虽然不知道干啥的,好像很牛逼,至少安装这个模块的时候搞得差点死,Python2用的是Queue,而到了Python3 变成了queue而且不用安装,自动带的
网上找了个例子,爬取book.doubai排行榜,结果失败了,就换成了糗事百科,代码:
# -*- coding:utf-8 -*-  
import requests  
from lxml import etree  
import queue  
import threading  
import time  
import json  
  
  
class thread_crawl(threading.Thread):  
    ''''' 
    抓取线程类 
    '''  
  
    def __init__(self, threadID, q):  
        threading.Thread.__init__(self)  
        self.threadID = threadID  
        self.q = q  
  
    def run(self):  
        print ("Starting " + self.threadID)  
        self.qiushi_spider()  
        print( "Exiting ", self.threadID)  
  
    def qiushi_spider(self):  
        # page = 1  
        while True:  
            if self.q.empty():  
                break  
            else:  
                page = self.q.get()  
                print ('qiushi_spider=', self.threadID, ',page=', str(page))  
                url = 'http://www.qiushibaike.com/hot/page/' + str(page) + '/'  
                headers = {  
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36',  
                    'Accept-Language': 'zh-CN,zh;q=0.8'}  
                # 多次尝试失败结束、防止死循环  
                timeout = 4  
                while timeout > 0:  
                    timeout -= 1  
                    try:  
                        content = requests.get(url, headers=headers)  
                        data_queue.put(content.text)  
                        break  
                    except Exception:  
                        print ('qiushi_spider', e)  
                if timeout < 0:  
                    print ('timeout', url)  
  
  
class Thread_Parser(threading.Thread):  
    ''''' 
    页面解析类; 
    '''  
  
    def __init__(self, threadID, queue, lock, f):  
        threading.Thread.__init__(self)  
        self.threadID = threadID  
        self.queue = queue  
        self.lock = lock  
        self.f = f  
  
    def run(self):  
        print ('starting ', self.threadID)  
        global total, exitFlag_Parser  
        while not exitFlag_Parser:  
            try:  
                ''''' 
                调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。 
                如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。 
                如果队列为空且block为False,队列将引发Empty异常。 
                '''  
                item = self.queue.get(False)  
                if not item:  
                    pass  
                self.parse_data(item)  
                self.queue.task_done()  
                print ('Thread_Parser=', self.threadID, ',total=', total)  
            except:  
                pass  
        print ('Exiting ', self.threadID)  
  
    def parse_data(self, item):  
        ''''' 
        解析网页函数 
        :param item: 网页内容 
        :return: 
        '''  
        global total  
        try:  
            html = etree.HTML(item)  
            result = html.xpath('//div[contains(@id,"qiushi_tag")]')  
            for site in result:  
                try:  
                    imgUrl = site.xpath('.//img/@src')[0]  
                    title = site.xpath('.//h2')[0].text  
                    content = site.xpath('.//div[@class="content"]')[0].text.strip()  
                    vote = None  
                    comments = None  
                    try:  
                        vote = site.xpath('.//i')[0].text  
                        comments = site.xpath('.//i')[1].text  
                    except:  
                        pass  
                    result = {  
                        'imgUrl': imgUrl,  
                        'title': title,  
                        'content': content,  
                        'vote': vote,  
                        'comments': comments,  
                    }  
  
                    with self.lock:  
                        # print 'write %s' % json.dumps(result)  
                        self.f.write(json.dumps(result, ensure_ascii=False).encode('utf-8') + "
")  
  
                except Exception:  
                    print ('site in result', e)  
        except Exception:  
            print ('parse_data', e)  
        with self.lock:  
            total += 1  
  
data_queue = queue.Queue()  
exitFlag_Parser = False  
lock = threading.Lock()  
total = 0  
  
def main():  
    output = open('qiushibaike.json', 'a')  
  
    #初始化网页页码page从1-10个页面  
    pageQueue = queue.Queue(50)  
    for page in range(1, 11):  
        pageQueue.put(page)  
  
    #初始化采集线程  
    crawlthreads = []  
    crawlList = ["crawl-1", "crawl-2", "crawl-3"]  
  
    for threadID in crawlList:  
        thread = thread_crawl(threadID, pageQueue)  
        thread.start()  
        crawlthreads.append(thread)  
  
    #初始化解析线程parserList  
    parserthreads = []  
    parserList = ["parser-1", "parser-2", "parser-3"]  
    #分别启动parserList  
    for threadID in parserList:  
        thread = Thread_Parser(threadID, data_queue, lock, output)  
        thread.start()  
        parserthreads.append(thread)  
  
    # 等待队列清空  
    while not pageQueue.empty():  
        pass  
  
    # 等待所有线程完成  
    for t in crawlthreads:  
        t.join()  
  
    while not data_queue.empty():  
        pass  
    # 通知线程是时候退出  
    global exitFlag_Parser  
    exitFlag_Parser = True  
  
    for t in parserthreads:  
        t.join()  
    print ("Exiting Main Thread")  
    with lock:  
        output.close()  
  
  
if __name__ == '__main__':  
    main() 

  

原文地址:https://www.cnblogs.com/tianxxl/p/7674438.html