<scrapy>scrapy源码剖析

  • 前戏Twisted使用
    • 1.简单实现版本1.0
      • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
        from twisted.internet import reactor
        # 创建socket对象,如果下载完成,自动从事件循环中移除
        from twisted.web.client import getPage
        # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
        from twisted.internet import defer
        
        
        # 1.利用getPage创建socket
        # 2.将socket添加到事件循环
        # 3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环)
        
        # 1.利用getPage创建socket
        def response(content):
        	print(content)
        
        # 2.将socket添加到事件循环
        # 这个装饰器和yield d表示将socket已经添加到事件循环
        @defer.inlineCallbacks
        def task():
        	url = "http://www.baidu.com"
        	# 根据域名找到你的ip,并创建socket,返回值d和defer.Deferred类似,不会自动终止的特殊socket对象
        	d = getPage(url.encode('utf-8'))
        	# 利用socket发请求,请求完成拿到值,执行response函数
        	d.addCallback(response)
        
        	yield d
        
        # 执行task函数
        task()
        # 3.开始事件循环
        reactor.run()
    • 版本2.0
      • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
        from twisted.internet import reactor
        # 创建socket对象,如果下载完成,自动从事件循环中移除
        from twisted.web.client import getPage
        # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
        from twisted.internet import defer
        
        # 版本1.0
        # 1.1.利用getPage创建socket
        # 1.2.将socket添加到事件循环
        # 1.3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环)
        
        # 版本2.0
        # 2.1.解决不能自动终止的问题
        
        # 1.1. 利用getPage创建socket
        def response(content):
        	print(content)
        
        # 1.2.将socket添加到事件循环
        # 这个装饰器和yield d表示将socket已经添加到事件循环
        @defer.inlineCallbacks
        def task():
        	url = "http://www.baidu.com"
        	# 根据域名找到你的ip,并创建socket,返回值d和defer.Deferred类似,不会自动终止的特殊socket对象
        	d = getPage(url.encode('utf-8'))
        	# 利用socket发请求,请求完成拿到值,执行response函数
        	d.addCallback(response)
        
        	yield d
        
        def done(*args,**kwargs):
        	# 终止事件循环
        	reactor.stop()
        
        # 执行task函数
        d = task()
        # 监听d是否完成,需要用列表[d,]加入
        dd = defer.DeferredList([d,])
        # 监听d是否完成,如果完成就会调用addBoth的回调函数
        # 2.1:利用回调函数done终止事件循环
        dd.addBoth(done)
        
        
        # 1.3.开始事件循环
        reactor.run() 
    • 版本3.0
        • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
          from twisted.internet import reactor
          # 创建socket对象,如果下载完成,自动从事件循环中移除
          from twisted.web.client import getPage
          # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
          from twisted.internet import defer
          
          
          # 版本1.0
          # 1.1.利用getPage创建socket
          # 1.2.将socket添加到事件循环
          # 1.3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环)
          
          # 版本2.0
          # 2.1.解决不能自动终止的问题
          
          # 版本3.0
          # 3.1.1.解决并发,异步IO的问题--利用多个socket
          
          # 1.1. 利用getPage创建socket
          def response(content):
          	print(content)
          
          
          # 1.2.将socket添加到事件循环
          # 这个装饰器和yield d表示将socket已经添加到事件循环
          @defer.inlineCallbacks
          def task():
          	url = "http://www.baidu.com"
          	# 根据域名找到你的ip,并创建socket,返回值d和defer.Deferred类似,不会自动终止的特殊socket对象
          	d = getPage(url.encode('utf-8'))
          	# 利用socket发请求,请求完成拿到值,执行response函数
          	d.addCallback(response)
          
          	yield d
          
          
          def done(*args, **kwargs):
          	# 终止事件循环
          	reactor.stop()
          
          
          # 执行task函数
          # d = task()
          
          # 3.1.1.同时监听多个d,利用多个socket,解决并发的问题,异步IO的问题,全部发出去了,等请求回来
          li = []
          for i in range(10):
          	d = task()
          	li.append(d)
          
          dd = defer.DeferredList(li)
          
          # 监听d是否完成,需要用列表[d,]加入
          # dd = defer.DeferredList([d,])
          
          # 监听d是否完成,如果完成就会调用addBoth的回调函数
          # 2.1:利用回调函数done终止事件循环
          dd.addBoth(done)
          
          # 1.3.开始事件循环
          reactor.run()
        版本3.1-另一种方法解决并发问题-以及多爬虫同时爬取的并发问题
        • 有bug是因为_close只有一个,后面会进行封装,不用多关注
        • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
          from twisted.internet import reactor
          # 创建socket对象,如果下载完成,自动从事件循环中移除
          from twisted.web.client import getPage
          # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
          from twisted.internet import defer
          
          # 版本1.0
          # 1.1.利用getPage创建socket
          # 1.2.将socket添加到事件循环
          # 1.3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环)
          
          # 版本2.0
          # 2.1.解决不能自动终止的问题
          
          # 版本3.1
          # 3.1.解决并发,异步IO的问题--利用task中加入特殊socket对象
          # 3.2 加入多个爬虫同时运行的功能--类似scrapy crawl all
          
          
          _close = None
          count = 0
          
          
          # 1.1. 利用getPage创建socket
          def response(content):
          	print(content)
          	global count
          	count += 1
          	if count == 3:
          		# 使特殊socket对象终止
          		_close.callback(None)
          
          
          # 1.2.将socket添加到事件循环
          # 这个装饰器和yield d表示将socket已经添加到事件循环
          @defer.inlineCallbacks
          def task():
          	# 3.1:创建多个socket,因为defer.Deferred()特殊对象,不会自动停止
          	# 设定_close 全局变量,以便能请求全部返还能够手动终止
          	# 利用全局变量count的计数,去控制特殊对象的终止,只有全部终止才会结束
          	global _close
          
          	# 这个相当于scrapy中的start_url
          	url = "http://www.baidu.com"
          	d1 = getPage(url.encode('utf-8'))
          	d1.addCallback(response)
          
          	url = "http://www.cnblogs.com"
          	d2 = getPage(url.encode('utf-8'))
          	d2.addCallback(response)
          
          	url = "http://www.bing.com"
          	d3 = getPage(url.encode('utf-8'))
          	d3.addCallback(response)
          
          	_close = defer.Deferred()
          
          	yield _close
          
          
          def done(*args, **kwargs):
          	# 终止事件循环
          	reactor.stop()
          
          
          # 3.2:同时创建多个task即可实现,scrapy爬虫同时执行,2个爬虫有各自的start_url是并发的
          # 执行task函数
          spider1 = task()
          spider2 = task()
          
          # 监听d是否完成,需要用列表[d,]加入
          dd = defer.DeferredList([spider1,spider2])
          
          # 监听d是否完成,如果完成就会调用addBoth的回调函数
          # 2.1:利用回调函数done终止事件循环
          dd.addBoth(done)
          
          # 1.3.开始事件循环
          reactor.run()
                        
  • scrapy经验 + Twisted功能
    • Low
      • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
        from twisted.internet import reactor
        # 创建socket对象,如果下载完成,自动从事件循环中移除
        from twisted.web.client import getPage
        # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
        from twisted.internet import defer
        import queue
        
        Q = queue.Queue()
        
        
        class Request(object):
        	# 这里的callback = parse
        	def __init__(self, url, callback):
        		self.url = url
        		self.callback = callback
        
        
        class HttpResponse(object):
        
        	def __init__(self, content, request):
        		self.content = content
        		self.request = request
        		self.url = request.url
        		self.text = str(content, encoding='utf-8')
        
        
        class ChoutiSpider(object):
        	name = 'chouti'
        
        	def start_requests(self):
        		start_url = ['http://www.baidu.com', 'http://www.bing.com', ]
        		for url in start_url:
        			# 执行Request函数
        			yield Request(url, self.parse)
        
        	def parse(self, response):
        		# 1.crawling移除
        		# 2.获取parse yield返回值
        		# 3.再次去队列中获取
        		print(response.text)  # 执行HttpResponse()中的方法
        		yield Request('http://www.cnblogs.com', callback=self.parse)
        
        
        class Engine(object):
        	def __init__(self):
        		self._close = None
        		self.spider = None
        		self.max = 5  # 最大并发数
        		self.crawling = []  # 表示正在爬取的爬虫
        
        	def get_response_callback(self, content, request):
        		# getPage的返回值response,传入的req ---url 和 callback
        		self.crawling.remove(request)  # 删除已经下载完成的url callback
        		rep = HttpResponse(content, request)  # 将返回值传递过去
        		# 生成器或空
        		result = request.callback(rep)  # 调用spider中的parse方法 = parse(rep)
        		import types
        		# 判断返回值是不是生成器
        		if isinstance(result, types.GeneratorType):
        			for req in result:
        				Q.put(req)  # 将新请求加入队列
        
        	def _next_request(self):
        		# 判断终止条件
        		if Q.qsize() == 0 and len(self.crawling) == 0:
        			self._close.callback(None)  # 手动停止
        			return
        
        		# 发送过程中,会有最大并发数的限制,循环取url并下载
        		if len(self.crawling) >= self.max:  # 超过最大并发数,直接返回
        			return
        		while len(self.crawling) < self.max:  # 低于最大并发数
        			try:
        				req = Q.get(block=False)  # 取数据,如果为空会报错,加入block不会等队列中的数据
        				self.crawling.append(req)  # 将取到的url加入记录正在爬取数量的列表crawling
        				d = getPage(req.url.encode('utf-8'))  # getPage创建socket对象,发送请求进行下载
        
        				# 5.等页面下载完成执行用户自己定义的回调函数,处理response
        				d.addCallback(self.get_response_callback, req)  # d为请求的结果
        				# 未达到最大并发数,可以再去调度器中获取Request
        				# d.addCallback(self._next_request)  # 上一个方法执行玩,进行递归调用,继续取url
        				d.addCallback(lambda _: reactor.callLater(0, self._next_request))  # 多久后调用
        			except Exception as e:  # 如果队列为空,直接返回,不再循环取
        				return
        
        	# 这个装饰器和yield self._close表示将socket已经添加到事件循环
        	@defer.inlineCallbacks
        	def crawl(self, spider):
        		# 3.将初始Request对象添加到调度器---将初始urL加入队列
        		start_requests = iter(spider.start_requests())  # 迭代器---执行spider中start_request函数
        		while True:
        			try:
        				request = next(start_requests)  # 取迭代器中的下一个值 url和callback
        				Q.put(request)  # 将取到的值放入队列
        			except StopIteration as e:  # 如果队列取完,就跳出循环
        				break
        
        		# 4.反复去调度器中取request并发送请求进行下载,下载完成后执行回调函数
        		# self._next_request()
        		reactor.callLater(0, self._next_request)  # scrapy内部的写法
        
        		self._close = defer.Deferred()  # 特殊socket不会自动结束,只能手动结束
        		yield self._close
        
        
        # 爬虫对象
        spider = ChoutiSpider()
        
        _active = set()
        # 1.创建引擎
        engine = Engine()
        # 2.将爬虫放入引擎进行处理,执行引擎中crawl函数
        d = engine.crawl(spider)
        
        _active.add(d)
        # 监听爬虫d是否完成,如果完成执行addBoth终止socket
        dd = defer.DeferredList(_active)
        # 终止socket
        dd.addBoth(lambda _: reactor.stop())
        
        reactor.run()
          
    • High
      •   
      • engine.py---scrapy主要实现逻辑
        • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
          from twisted.internet import reactor
          # 创建socket对象,如果下载完成,自动从事件循环中移除
          from twisted.web.client import getPage
          # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
          from twisted.internet import defer
          from queue import Queue
          
          '''
          scrapy的分工:
          ExecutionEngine---引擎:作用是帮你做调度,体现是去调度器里拿,或放入调度器
          Crawler---创建引擎和spider对象
          '''
          
          
          class Request(object):
          	'''用于封装用户请求相关信息'''
          
          	def __init__(self, url, callback):
          		# rep的参数
          		self.url = url
          		self.callback = callback
          
          
          class HttpResponse(object):
          	"""封装返回的content和request"""
          	def __init__(self, content, request):
          		self.content = content
          		self.request = request
          		self.url = request.url
          		self.text = str(content, encoding='utf-8')
          
          
          class Scheduler(object):
          	"""任务调度器"""
          
          	def __init__(self):
          		self.q = Queue()  # 创建Q对象
          
          	def open(self):
          		pass
          
          	def next_request(self):
          		# 不断重复取队列,如果队列为空,返回
          		try:
          			req = self.q.get(block=False)
          		except Exception as e:
          			req = None
          		return req
          
          	def enqueue_request(self, req):
          		# url+callback放入队列
          		self.q.put(req)
          
          	def size(self):
          		# 队列大小
          		return self.q.qsize()
          
          
          class ExecutionEngine(object):
          	"""引擎:所有调度"""
          
          	def __init__(self):
          		self._close = None
          		self.scheduler = None
          		self.max = 5
          		self.crawlling = []
          
          	def get_response_callback(self, content, request):
          		# 24.删除已经下载完成的url callback
          		self.crawlling.remove(request)
          		# 25.将content和requset封装成response
          		response = HttpResponse(content, request)
          		# 26.执行解析函数,解析的结果url加入Q,content返回
          		result = request.callback(response)
          		import types
          		if isinstance(result, types.GeneratorType):  # 如果是url
          			for req in result:
          				self.scheduler.enqueue_request(req)  # f放入Q
          
          	def _next_request(self):
          		# 21.被最大并发数限制不停的取数据,并不断往外发
          		if self.scheduler.size() == 0 and len(self.crawlling) == 0:
          			self._close.callback(None)
          			return
          
          		while len(self.crawlling) < self.max:
          			req = self.scheduler.next_request()
          			if not req:
          				return
          			self.crawlling.append(req)  # 放入正在爬取列表
          			# 22. 发请求,此时的d就是下载下来的response
          			d = getPage(req.url.encode('utf-8'))
          			# 23.此时req包括url和callback,执行get_response_callback
          			d.addCallback(self.get_response_callback, req)
          			d.addCallback(lambda _: reactor.callLater(0, self._next_request))  # 在调用一次自己
          
          	@defer.inlineCallbacks
          	def open_spider(self, start_requests):
          		# 实例化调度器
          		self.scheduler = Scheduler()
          		# 17.执行调度器中的open方法
          		yield self.scheduler.open()
          		# 18.将初始Request对象添加到调度器
          		while True:
          			try:
          				req = next(start_requests)
          			except StopIteration as e:
          				break
          			# 19.执行调度器enqueue_request
          			self.scheduler.enqueue_request(req)
          		# yield None
          		# 20.相当于self._next_request()
          		reactor.callLater(0, self._next_request)
          
          	@defer.inlineCallbacks
          	def start(self):
          		self._close = defer.Deferred()
          		yield self._close
          
          
          class Crawler(object):
          	"""用于封装调度器以及引擎的..."""
          
          	def _create_engine(self):
          		# 15.创建一个引擎对象
          		return ExecutionEngine()
          
          	def _create_spider(self, spider_cls_path):
          		"""
          
          		:param spider_cls_path:  spider.chouti.ChoutiSpider
          		:return:
          		"""
          		# 分割:模块路径,爬虫名字
          		module_path, cls_name = spider_cls_path.rsplit('.', maxsplit=1)
          		import importlib
          		# 16.创建一个spider对象
          		m = importlib.import_module(module_path)
          		cls = getattr(m, cls_name)  # 返回m对象的cls_name值,即为爬虫名
          		return cls()  # 返回爬虫对象
          
          	@defer.inlineCallbacks  # 这个装饰器作用是将socket放入事件循环
          	def crawl(self, spider_cls_path):
          		# 10.实例化私有方法_create_engine创建引擎 ---15
          		engine = self._create_engine()
          		# 11.创建spider对象--16
          		spider = self._create_spider(spider_cls_path)
          		# 12.根据返回的spider对象,执行爬虫的start_requests方法,得到初始url的列表---
          		start_requests = iter(spider.start_requests())  # 迭代器
          		# 13.调用引擎的open_spider方法将初始Request对象添加到调度器---17
          		yield engine.open_spider(start_requests)
          		# 14.执行defer.Deferred等待,_close的结束信号
          		yield engine.start()
          
          
          class CrawlerProcess(object):
          	"""开启事件循环"""
          
          	# 初始化
          	def __init__(self):
          		self._active = set()  # 创建一个集合
          
          	def crawl(self, spider_cls_path):
          		"""
          		:param spider_cls_path:
          		:return:
          		"""
          		# 6.创建用于封装调度器和引擎的Crawler类的实例化对象crawler
          		crawler = Crawler()
          		# 7.执行Crawler类中的crawl方法,将爬虫名作为参数传入---10.返回值d
          		d = crawler.crawl(spider_cls_path)
          		# 8.将d加入正在活动的集合
          		self._active.add(d)
          
          	def start(self):
          		# 特殊socket,不会自动结束
          		# 9.等待_active中的d已经全部完成,如果完成就执行addBoth函数终止该特殊socket
          		dd = defer.DeferredList(self._active)
          		dd.addBoth(lambda _: reactor.stop())
          		reactor.run()
          
          
          class Commond(object):
          	"""自定制的命令"""
          
          	def run(self):
          		# 3.创建事件循环CrawlerProcess实例化对象crawl_process
          		crawl_process = CrawlerProcess()
          		# 将需要执行的爬虫名加入列表spider_cls_path_list
          		spider_cls_path_list = ['spider.chouti.ChoutiSpider', 'spider.cnblogs.CnblogsSpider', ]
          		for spider_cls_path in spider_cls_path_list:
          			# 4.通过实例化对象crawl_process调用CrawlerProcess类中的crawl方法,将该列表作为参数传入--6
          			crawl_process.crawl(spider_cls_path)
          		# 5.调用CrawlerProcess类中的start方法---主要通过defer.DeferredList为了限制事件循环的结束 --9
          		crawl_process.start()
          
          
          if __name__ == '__main__':
          	# 1.创建自定制命令Commond类的实例化对象cmd
          	cmd = Commond()
          	# 2.调用Commond类中的run方法---将需要执行的爬虫交给事件循环类CrawlerProcess--3
          	cmd.run()
      • 爬虫1:chouti.py
        • # 需要导入Request
          from TinySpider.engine import Request
          
          class ChoutiSpider(object):
          	name = 'chouti'
          
          	def start_requests(self):
          		start_url = ['http://www.baidu.com', 'http://www.bing.com', ]
          		for url in start_url:
          			# 执行Request函数
          			yield Request(url, self.parse)
          
          	def parse(self, response):
          		# 1.crawling移除
          		# 2.获取parse yield返回值
          		# 3.再次去队列中获取
          		print(response)  # 执行HttpResponse()中的方法
          		yield Request('http://www.cnblogs.com', callback=self.parse)
      • 爬虫2:cnblogs.py
        • # 需要导入Request
          from TinySpider.engine import Request
          
          class CnblogsSpider(object):
          	name = 'cnblogs'
          
          	def start_requests(self):
          		start_url = ['http://www.cnblogs.com',]
          		for url in start_url:
          			# 执行Request函数
          			yield Request(url, self.parse)
          
          	def parse(self, response):
          		# 1.crawling移除
          		# 2.获取parse yield返回值
          		# 3.再次去队列中获取
          		# print(response.text)  # 执行HttpResponse()中的方法
          		print(response)  # 执行HttpResponse()中的方法
          		yield Request('http://www.cnblogs.com', callback=self.parse)  
                    
  • Scrapy源码剖析
    • 大致框架跟High一样              
原文地址:https://www.cnblogs.com/shuimohei/p/13363462.html