python3多线程糗事百科案例

Queue(队列对象)

Queue是python中的标准库,可以直接import Queue引用;队列是线程间最常用的交换数据的形式

python下多线程的思考

对于资源,加锁是个重要的环节。因为python原生的list,dict等,都是not thread safe的。而Queue,是线程安全的,因此在满足使用条件下,建议使用队列

  1. 初始化: class Queue.Queue(maxsize) FIFO 先进先出

  2. 包中的常用方法:

    • Queue.qsize() 返回队列的大小

    • Queue.empty() 如果队列为空,返回True,反之False

    • Queue.full() 如果队列满了,返回True,反之False

    • Queue.full 与 maxsize 大小对应

    • Queue.get([block[, timeout]])获取队列,timeout等待时间

  3. 创建一个“队列”对象

    • import queue
    • myqueue = queue.Queue(maxsize = 10)
  4. 将一个值放入队列中

    • myqueue.put(10)
  5. 将一个值从队列中取出

    • myqueue.get()
  1 # 使用了线程库
  2 import threading
  3 # 队列
  4 import queue
  5 # 解析库
  6 from lxml import etree
  7 # 请求处理
  8 import requests
  9 # json处理
 10 import json
 11 import time
 12 
 13 class ThreadCrawl(threading.Thread):
 14     def __init__(self, threadName, pageQueue, dataQueue):
 15         #threading.Thread.__init__(self)
 16         # 调用父类初始化方法
 17         super(ThreadCrawl, self).__init__()
 18         # 线程名
 19         self.threadName = threadName
 20         # 页码队列
 21         self.pageQueue = pageQueue
 22         # 数据队列
 23         self.dataQueue = dataQueue
 24         # 请求报头
 25         self.headers = {"User-Agent" : "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0;"}
 26 
 27     def run(self):
 28         print("启动 " + self.threadName)
 29         while not CRAWL_EXIT:
 30             try:
 31                 # 取出一个数字,先进先出
 32                 # 可选参数block,默认值为True
 33                 #1. 如果对列为空,block为True的话,不会结束,会进入阻塞状态,直到队列有新的数据
 34                 #2. 如果队列为空,block为False的话,就弹出一个Queue.empty()异常,
 35                 page = self.pageQueue.get(False)
 36                 url = "http://www.qiushibaike.com/8hr/page/" + str(page) +"/"
 37                 #print url
 38                 content = requests.get(url, headers = self.headers).text
 39                 time.sleep(1)
 40                 self.dataQueue.put(content)
 41                 #print(len(content))
 42             except:
 43                 pass
 44         print("结束 " + self.threadName)
 45 
 46 class ThreadParse(threading.Thread):
 47     def __init__(self, threadName, dataQueue, filename, lock):
 48         super(ThreadParse, self).__init__()
 49         # 线程名
 50         self.threadName = threadName
 51         # 数据队列
 52         self.dataQueue = dataQueue
 53         # 保存解析后数据的文件名
 54         self.filename = filename
 55         #
 56         self.lock = lock
 57 
 58     def run(self):
 59         print("启动" + self.threadName)
 60         while not PARSE_EXIT:
 61             try:
 62                 html = self.dataQueue.get(False)
 63                 self.parse(html)
 64             except:
 65                 pass
 66         print("退出" + self.threadName)
 67 
 68     def parse(self, html):
 69         # 解析为HTML DOM
 70         html = etree.HTML(html)
 71 
 72         node_list = html.xpath('//div[contains(@id, "qiushi_tag")]')
 73 
 74         for node in node_list:
 75             # xpath返回的列表,这个列表就这一个参数,用索引方式取出来,用户名
 76             username = node.xpath('.//img/@alt')[0]
 77             # 图片连接
 78             image = node.xpath('.//div[@class="thumb"]//@src')#[0]
 79             # 取出标签下的内容,段子内容
 80             content = node.xpath('.//div[@class="content"]/span')[0].text
 81             # 取出标签里包含的内容,点赞
 82             zan = node.xpath('.//i')[0].text
 83             # 评论
 84             comments = node.xpath('.//i')[1].text
 85 
 86             items = {
 87                 "username" : username,
 88                 "image" : image,
 89                 "content" : content,
 90                 "zan" : zan,
 91                 "comments" : comments
 92             }
 93 
 94             # with 后面有两个必须执行的操作:__enter__ 和 _exit__
 95             # 不管里面的操作结果如何,都会执行打开、关闭
 96             # 打开锁、处理内容、释放锁
 97             with self.lock:
 98                 # 写入存储的解析后的数据
 99                 self.filename.write(json.dumps(items, ensure_ascii = False) + "
")
100 
101 CRAWL_EXIT = False
102 PARSE_EXIT = False
103 
104 
105 def main():
106     # 页码的队列,表示20个页面
107     pageQueue = queue.Queue(20)
108     # 放入1~10的数字,先进先出
109     for i in range(1, 21):
110         pageQueue.put(i)
111 
112     # 采集结果(每页的HTML源码)的数据队列,参数为空表示不限制
113     dataQueue = queue.Queue()
114 
115     filename = open("duanzi.json", "a")
116     # 创建锁
117     lock = threading.Lock()
118 
119     # 三个采集线程的名字
120     crawlList = ["采集线程1号", "采集线程2号", "采集线程3号"]
121     # 存储三个采集线程的列表集合
122     threadcrawl = []
123     for threadName in crawlList:
124         thread = ThreadCrawl(threadName, pageQueue, dataQueue)
125         thread.start()
126         threadcrawl.append(thread)
127 
128 
129     # 三个解析线程的名字
130     parseList = ["解析线程1号","解析线程2号","解析线程3号"]
131     # 存储三个解析线程
132     threadparse = []
133     for threadName in parseList:
134         thread = ThreadParse(threadName, dataQueue, filename, lock)
135         thread.start()
136         threadparse.append(thread)
137 
138     # 等待pageQueue队列为空,也就是等待之前的操作执行完毕
139     while not pageQueue.empty():
140         pass
141 
142     # 如果pageQueue为空,采集线程退出循环
143     global CRAWL_EXIT
144     CRAWL_EXIT = True
145 
146     print("pageQueue为空")
147 
148     for thread in threadcrawl:
149         thread.join()
150         print("1")
151 
152     while not dataQueue.empty():
153         pass
154 
155     global PARSE_EXIT
156     PARSE_EXIT = True
157 
158     for thread in threadparse:
159         thread.join()
160         print("2")
161 
162     with lock:
163         # 关闭文件
164         filename.close()
165     print("谢谢使用!")
166 
167 if __name__ == "__main__":
168     main()

控制台输出:

 1 启动 采集线程1号
 2 启动 采集线程2号
 3 启动 采集线程3号
 4 启动解析线程1号
 5 启动解析线程2号
 6 启动解析线程3号
 7 pageQueue为空
 8 结束 采集线程2号
 9 89037
10 结束 采集线程3号
11 89037
12 结束 采集线程1号
13 1
14 1
15 1
16 退出解析线程3号
17 退出解析线程1号
18 2
19 退出解析线程2号
20 2
21 2
22 谢谢使用!
 
原文地址:https://www.cnblogs.com/wanglinjie/p/9201461.html