<随便写> 多线程的例子

'''
	一个线程在使用这个共享的时候,其他线程必须等待他结束
	通过"锁"实现,作用就是防止多个线程使用这片内存空间
	进程:程序的一次执行
	线程:cpu运算的基本调度单位
	多线程:大量密集I/O处理,在等待响应的时候,其他线程去工作
	多进程:大量的密集并行计算
	scrapy:异步网络框架(很多协程在处理)
	页码队列--线程取页码爬取(采集线程--网络IO)--数据队列(得到的响应)--线程解析网页(解析线程磁盘IO)--解析后的数据存储
'''
# 请求
import requests
# 队列
from multiprocessing import Queue
# 线程
from threading import Thread
import threading
# 解析
from lxml import etree
# 存储
import json
import time


class ThreadCrawl(Thread):
	def __init__(self, threadName, pageQueue, dataQueue):
		# 调用父类的初始化方法
		super(ThreadCrawl, self).__init__()
		self.threadName = threadName
		self.pageQueue = pageQueue
		self.dataQueue = dataQueue
		self.headers = {"User-Agent":"Mozilla/5.0(Windows NT 10.0;WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.170 Safari/537.36"}

	# thread.start()会执行run方法
	def run(self):
		print("启动"+self.threadName)
		while not CRAWL_EXIT:
			try:
				# 从页码队列取出一个数字,
				# 可选参数block(默认Ture)
				# 1.队列为空,block为Ture,会进入阻塞状态,直到有新的值进入队列
				# 2.如果队列为空.block为False,会弹出Queue.empty()出错
				page = self.pageQueue.get(False)
				url = "https://www.qiushibaike.com/text/page/" + str(page) + "/"
				content = requests.get(url,headers=self.headers).text
				#调用数据队列,将源码放进去
				self.dataQueue.put(content)
			except:
				pass
			print("结束"+self.threadName)

class ThreadParse(Thread):
	def __init__(self,threadName,dataQueue,filename,lock):
		super(ThreadParse,self).__init__()
		self.threadName = threadName
		self.dataQueue = dataQueue
		self.filename = filename
		self.lock = lock

	def run(self):
		while not PARSE_EXIT:
			try:
				html = self.dataQueue.get(False)
				self.parse(html)
			except:
				pass

	def parse(self,html):
		html = etree.HTML(html)
		print(html)

		# with 后面有两个必须执行的操作:__enter__ 和 _exit__
		# 不管里面的操作结果如何,都会执行打开、关闭
		# 打开锁、处理内容、释放锁
		with self.lock:
			# 写入存储的解析后的数据
			self.filename.write(json.dumps(html, ensure_ascii=False).encode("utf-8") + "
")


CRAWL_EXIT = False
PARSE_EXIT = False

def main():
	# 页码队列,可以存储20个值
	pageQueue = Queue(20)
	# 放入1-10数字,先进先出
	for i in range(1, 21):
		pageQueue.put(i)

	# 数据队列,HTML源码,不写参数,默认无限
	dataQueue = Queue()

	# 创建锁
	lock = threading.Lock()

	# 采集线程名字
	crawlList = ["采集线程1号", "采集线程2号", "采集线程3号"]

	# 存储采集线程
	thread_crawl = []
	for threadName in crawlList:
		# 写一个
		thread = ThreadCrawl(threadName, pageQueue, dataQueue)
		thread.start()
		thread_crawl.append(thread)

	filename = open("duanzi.json","a")
	#解析线程名字
	parseList = ["解析线程1号","解析线程2号","解析线程3号"]
	threadparse = []
	for threadName in parseList:
		thread = ThreadParse(threadName,dataQueue,filename,lock)
		thread.start()
		threadparse.append(thread)



	#如果队列不为空,一直在这等待
	while not pageQueue.empty():
		pass
	#如果队列为空,CRAWL_EXIT = True 退出
	global CRAWL_EXIT
	CRAWL_EXIT = True

	#加阻塞,线程做完才能运行主线程
	for thread in thread_crawl:
		thread.join()
		print(thread)

	while not dataQueue.empty():
		pass

	global PARSE_EXIT
	PARSE_EXIT = True

	for thread in threadparse:
		thread.join()
		print(thread)

	with lock:
		# 关闭文件
		filename.close()
	print("谢谢使用")

if __name__ == '__main__':
	main()

  

原文地址:https://www.cnblogs.com/shuimohei/p/10500634.html