Scrapy-redis之RFPDupeFilter、Queue、Scheduler

scrapy-redis去重应用

 1 # -*- coding: utf-8 -*-
 2 import scrapy
 3 from scrapy.http import Request
 4 
 5 
 6 class ChoutiSpider(scrapy.Spider):
 7     name = 'chouti'
 8     allowed_domains = ['chouti.com']
 9     start_urls = ['http://www.chouti.com/']
10 
11     def start_requests(self):
12         url = "http://dig.chouti.com/"
13         yield Request(url=url, callback=self.parse)
14 
15     def parse(self, response):
16         print('response', response)

自定义中间件,过滤重复URL的爬虫,并且保存redis中

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 
 4 import time
 5 from scrapy.dupefilters import BaseDupeFilter
 6 from scrapy.utils.request import request_fingerprint
 7 import redis
 8 from scrapy_redis.dupefilter import RFPDupeFilter
 9 from scrapy_redis.connection import get_redis_from_settings
10 from scrapy_redis import defaults
11 
12 
13 class DupeFilter(BaseDupeFilter):
14     def __init__(self):
15         self.conn = redis.Redis(host='192.168.1.13', port=3306, password='woshinidaye')
16 
17     def request_seen(self, request):
18         fd = request_fingerprint(request)
19         result = self.conn.sadd('visited_urls', fd)
20         if result == 1:
21             return False
22         return True
23 
24 
25 class RedisDupeFilter(RFPDupeFilter):
26     """
27     改下源码当中存入redis的key值,它源码里边是默认是存的时间戳作为key
28     """
29 
30     @classmethod
31     def from_settings(cls, settings):
32         """Returns an instance from given settings.
33 
34         This uses by default the key ``dupefilter:<timestamp>``. When using the
35         ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
36         it needs to pass the spider name in the key.
37 
38         Parameters
39         ----------
40         settings : scrapy.settings.Settings
41 
42         Returns
43         -------
44         RFPDupeFilter
45             A RFPDupeFilter instance.
46 
47 
48         """
49         server = get_redis_from_settings(settings)
50         # XXX: This creates one-time key. needed to support to use this
51         # class as standalone dupefilter with scrapy's default scheduler
52         # if scrapy passes spider on open() method this wouldn't be needed
53         # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
54         # key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}  时间戳经常变不好取以后我直接定死
55         key = defaults.DUPEFILTER_KEY % {'timestamp': 'woshinidie'}
56         debug = settings.getbool('DUPEFILTER_DEBUG')
57         return cls(server, key=key, debug=debug)

配置文件

 1 # redis去重配置
 2 REDIS_HOST = '192.168.1.13'                           # 主机名
 3 REDIS_PORT = 3306                                     # 端口
 4 REDIS_PARAMS = {'password': 'woshinidaye'}            # Redis连接参数  默认:REDIS_PARAMS = {'socket_timeout': 30,'socket_connect_timeout': 30,'retry_on_timeout': True,'encoding': REDIS_ENCODING,})
 5 # REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient' # 指定连接Redis的Python模块  默认:redis.StrictRedis
 6 REDIS_ENCODING = "utf-8"                              # redis编码类型  默认:'utf-8'
 7 
 8 # REDIS_URL = 'redis://user:pass@hostname:9001'       # 连接URL(优先于以上配置)源码可以看到
 9 DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'
10 # 纯源生的它内部默认是用的以时间戳作为key
11 # DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
12 # 我自定义在源码之上改了保存在redis中的key配置
13 DUPEFILTER_CLASS = 'redisdepth.xxx.RedisDupeFilter'
14 # 自定义redis去重配置
15 # DUPEFILTER_CLASS = 'redisdepth.xxx.DupeFilter'

Scrapy-redis的队列

  包括:先进先出队列,后进先出队列,优先队列

1.先进先出队列

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 
 4 import redis
 5 
 6 
 7 class FifoQueue(object):
 8     def __init__(self):
 9         self.server = redis.Redis(host='192.168.1.13', port=3306, password='woshinidaye')
10 
11     def push(self, request):
12         """Push a request"""
13         self.server.lpush('User', request)
14 
15     def pop(self):
16         """Pop a request"""
17         data = self.server.rpop('User')
18         return data
19 
20 
21 q = FifoQueue()
22 q.push(11)
23 q.push(22)
24 q.push(33)
25 print(q.pop())
26 # 先进先出队列 

 2.后进先出队列

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 
 4 import redis
 5 
 6 
 7 class LifoQueue(object):
 8 
 9     def __init__(self):
10         self.server = redis.Redis(host='192.168.1.13', port=3306, password='woshinidaye')
11 
12     def push(self, request):
13         """Push a request"""
14         self.server.lpush('User', request)
15 
16     def pop(self, timeout=0):
17         """Pop a request"""
18         data = self.server.lpop('User')
19         return data

3.优先队列

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 
 4 import redis
 5 
 6 
 7 class PriorityQueue(object):
 8     """Per-spider priority queue abstraction using redis' sorted set"""
 9 
10     def __init__(self):
11         self.server = redis.Redis(host='192.168.1.13', port=3306, password='woshinidaye')
12 
13     def push(self, request, score):
14         """Push a request"""
15         score = -request.priority
16         # We don't use zadd method as the order of arguments change depending on
17         # whether the class is Redis or StrictRedis, and the option of using
18         # kwargs only accepts strings, not bytes.
19         self.server.execute_command('ZADD', 'xxxx', score, request)
20 
21     def pop(self, timeout=0):
22         """
23         Pop a request
24         timeout not support in this queue class
25         """
26         # use atomic range/remove using multi/exec
27         pipe = self.server.pipeline()
28         pipe.multi()
29         pipe.zrange('xxxx', 0, 0).zremrangebyrank('xxxx', 0, 0)
30         results, count = pipe.execute()
31         if results:
32             return results[0]
33 
34 
35 q = PriorityQueue()
36 
37 q.push('ZH', -99)
38 q.push('SB', -66)
39 q.push('JJ', -33)
40 # 如果优先级从小到大广度优先,从大到小就深度优先
41 print(q.pop())  # 默认取最小的
42 print(q.pop())
43 print(q.pop())

Scheduler源码分析(我在Notepad++写了直接贴过来的)

 1 1.找到from scrapy_redis.scheduler import Scheduler 
 2     -执行Scheduler.from_crawler
 3     -执行Scheduler.from_settings
 4         - 读取配置文件:
 5             SCHEDULER_PERSIST                # 是否在关闭时候保留原来的调度器和去重记录,True=保留,False=清空
 6             SCHEDULER_FLUSH_ON_START        # 是否在开始之前清空 调度器和去重记录,True=清空,False=不清空         
 7             SCHEDULER_IDLE_BEFORE_CLOSE        # 去调度器中获取数据时,如果为空,最多等待时间(最后没数据,未获取到)
 8         - 读取配置文件:
 9             SCHEDULER_QUEUE_KEY                # 调度器中请求存放在redis中的key
10             SCHEDULER_QUEUE_CLASS            # 这里可以选择三种先进先出、后进先出、优先级,默认使用优先级队列(默认),其他:PriorityQueue(有序集合),FifoQueue(列表)、LifoQueue(列表)
11             SCHEDULER_DUPEFILTER_KEY        # 去重规则,在redis中保存时对应的key
12             DUPEFILTER_CLASS                # 这里有两种选择使用默认或者自己定义的
13                 # 内置比如:DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
14                 # 自定义的比如:DUPEFILTER_CLASS = 'redisdepth.xxx.DupeFilter'    这个优先级别高 在源码里边是先判断然后再后续操作    
15             SCHEDULER_SERIALIZER            # 对保存到redis中的数据进行序列化,默认使用pickle
16         - 读取配置文件:redis-server
17             # 源码在connection.py中17行
18             REDIS_HOST = '192.168.1.13'                           # 主机名
19             REDIS_PORT = 3306                                     # 端口
20             REDIS_PARAMS = {'password': 'woshinidaye'}            # Redis连接参数  默认:REDIS_PARAMS = {'socket_timeout': 30,'socket_connect_timeout': 30,'retry_on_timeout': True,'encoding': REDIS_ENCODING,})
21             # REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient' # 指定连接Redis的Python模块  默认:redis.StrictRedis
22             REDIS_ENCODING = "utf-8"                              # redis编码类型  默认:'utf-8'
23             # REDIS_URL = 'redis://user:pass@hostname:9001'       # 连接URL(优先于以上配置)源码可以看到
24 2.爬虫开始执行起始URL
25         - 调用Scheduler.enqueue_request
26         def enqueue_request(self, request):
27             # 请求需要过滤?并且 去重规则是否已经有?(是否已经访问,如果未访问添加到去重记录)request_seen去重规则重要的一个方法
28             if not request.dont_filter and self.df.request_seen(request):
29                 self.df.log(request, self.spider)
30                 # 已经访问过不再进行访问
31                 return False
32             if self.stats:
33                 self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
34             # 未访问过,添加到调度器中把这个请求
35             self.queue.push(request)
36             return True
37 3.下载器去调度中获取任务,去执行任务下载
38         - 调用Scheduler.next_request
39         def next_request(self):
40             block_pop_timeout = self.idle_before_close
41             # 把任务取出来
42             request = self.queue.pop(block_pop_timeout)
43             if request and self.stats:
44             # 此时下载
45                 self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
46             return request
47 
48                 

settings需要的配置

 1 # redis去重配置
 2 REDIS_HOST = '192.168.1.13'                           # 主机名
 3 REDIS_PORT = 3306                                     # 端口
 4 REDIS_PARAMS = {'password': 'woshinidaye'}            # Redis连接参数  默认:REDIS_PARAMS = {'socket_timeout': 30,'socket_connect_timeout': 30,'retry_on_timeout': True,'encoding': REDIS_ENCODING,})
 5 # REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient' # 指定连接Redis的Python模块  默认:redis.StrictRedis
 6 REDIS_ENCODING = "utf-8"                              # redis编码类型  默认:'utf-8'
 7 
 8 # REDIS_URL = 'redis://user:pass@hostname:9001'       # 连接URL(优先于以上配置)源码可以看到
 9 DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'
10 # 纯源生的它内部默认是用的以时间戳作为key
11 # DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
12 # 我自定义在源码之上改了保存在redis中的key配置
13 DUPEFILTER_CLASS = 'redisdepth.xxx.RedisDupeFilter'
14 # 自定义redis去重配置
15 # DUPEFILTER_CLASS = 'redisdepth.xxx.DupeFilter'
16 
17 
18 # #############调度器配置###########################
19 # from scrapy_redis.scheduler import Scheduler
20 
21 SCHEDULER = "scrapy_redis.scheduler.Scheduler"
22 DEPTH_PRIORITY = 1  # 广度优先
23 # DEPTH_PRIORITY = -1 # 深度优先
24 SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'  # 默认使用优先级队列(默认),其他:PriorityQueue(有序集合),FifoQueue(列表)、LifoQueue(列表)
25 # 广度优先
26 # SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.FifoQueue'
27 # 深度优先
28 # SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue'
29 
30 SCHEDULER_QUEUE_KEY = '%(spider)s:requests'         # 调度器中请求存放在redis中的key
31 SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"  # 对保存到redis中的数据进行序列化,默认使用pickle
32 SCHEDULER_PERSIST = True                            # 是否在关闭时候保留原来的调度器和去重记录,True=保留,False=清空
33 SCHEDULER_FLUSH_ON_START = True                     # 是否在开始之前清空 调度器和去重记录,True=清空,False=不清空
34 SCHEDULER_IDLE_BEFORE_CLOSE = 10                    # 去调度器中获取数据时,如果为空,最多等待时间(最后没数据,未获取到)。
35 SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'  # 去重规则,在redis中保存时对应的key
36 SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'    # 去重规则对应处理的类

总结:

深度优先:基于层级先进入到最深层级进行处理全部后再往上层级处理
广度优先:基于从第一层开始,每个层次处理之后进入下一层级处理

先进先出,广度优先 FifoQueue
后进先出,深度优先 LifoQueue
优先级队列:
DEPTH_PRIORITY = 1 # 广度优先
DEPTH_PRIORITY = -1 # 深度优先

调度器 队列 DupeFilter三者关系
  调度器:获取哪个request
  队列: 存放request
  DupeFilter:对访问记录处理

 补充点点

定义持久化,爬虫yield Item对象时执行RedisPipeline,默认是pickle
     
    a. 将item持久化到redis时,指定key和序列化函数
     
        REDIS_ITEMS_KEY = '%(spider)s:items'
        REDIS_ITEMS_SERIALIZER = 'json.dumps'
     
    b. 使用列表保存item数据  

 配置文件大解读

# -*- coding: utf-8 -*-

# Scrapy settings for redisdepth project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
#     https://docs.scrapy.org/en/latest/topics/settings.html
#     https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
#     https://docs.scrapy.org/en/latest/topics/spider-middleware.html

# 爬虫名称
BOT_NAME = 'redisdepth'

# 爬虫应用路径
SPIDER_MODULES = ['redisdepth.spiders']
NEWSPIDER_MODULE = 'redisdepth.spiders'


# Crawl responsibly by identifying yourself (and your website) on the user-agent
# 客服端user-agent请求头
#USER_AGENT = 'redisdepth (+http://www.yourdomain.com)'
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'

# 爬虫君子证书,禁止爬虫设置
# Obey robots.txt rules
# ROBOTSTXT_OBEY = True
ROBOTSTXT_OBEY = False

# Configure maximum concurrent requests performed by Scrapy (default: 16)
# 并发请求数 力度要粗点
#CONCURRENT_REQUESTS = 32

# Configure a delay for requests for the same website (default: 0)
# See https://docs.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
# 延迟下载秒数
#DOWNLOAD_DELAY = 3
# The download delay setting will honor only one of:
# 单域名访问并发数 并且延迟下次秒数也用在每个域名
#CONCURRENT_REQUESTS_PER_DOMAIN = 16
# 单IP访问并发数,如果有值则忽略:CONCURRENT_REQUESTS_PER_DOMAIN,并且延迟下次秒数也应用在每个IP
#CONCURRENT_REQUESTS_PER_IP = 16

# Disable cookies (enabled by default)
# 是否支持cookie,cookiejar进行操作cookie
#COOKIES_ENABLED = False

# Disable Telnet Console (enabled by default)
# Telnet用于查看当前爬虫的信息,操作爬虫等...
#    使用telnet ip port ,然后通过命令操作
# TELNETCONSOLE_ENABLED = True
# TELNETCONSOLE_HOST = '127.0.0.1'
# TELNETCONSOLE_PORT = [6023,]
#TELNETCONSOLE_ENABLED = False

# 默认请求头
# Override the default request headers:
#DEFAULT_REQUEST_HEADERS = {
#   'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
#   'Accept-Language': 'en',
#}

# 爬虫中间件
# Enable or disable spider middlewares
# See https://docs.scrapy.org/en/latest/topics/spider-middleware.html
# SPIDER_MIDDLEWARES = {
#    # 'redisdepth.middlewares.RedisdepthSpiderMiddleware': 543,
#     'redisdepth.sd.Sd1': 666,
#     'redisdepth.sd.Sd2': 667,
#
# }

# 下载中间件
# Enable or disable downloader middlewares
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
# DOWNLOADER_MIDDLEWARES = {
#    # 'redisdepth.middlewares.RedisdepthDownloaderMiddleware': 543,
#    #  'redisdepth.md.Md1': 666,
#    #  'redisdepth.md.Md2': 667
# }

# 自定义扩展,基于信号进行调用
# Enable or disable extensions
# See https://docs.scrapy.org/en/latest/topics/extensions.html
EXTENSIONS = {
   # 'scrapy.extensions.telnet.TelnetConsole': None,
    'redisdepth.ext.MyExtension': 666,
}

# 定义pipeline处理请求
# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
#ITEM_PIPELINES = {
#    'redisdepth.pipelines.RedisdepthPipeline': 300,
#}

"""
 自动限速算法
    from scrapy.contrib.throttle import AutoThrottle
    自动限速设置
    1. 获取最小延迟 DOWNLOAD_DELAY
    2. 获取最大延迟 AUTOTHROTTLE_MAX_DELAY
    3. 设置初始下载延迟 AUTOTHROTTLE_START_DELAY
    4. 当请求下载完成后,获取其"连接"时间 latency,即:请求连接到接受到响应头之间的时间
    5. 用于计算的... AUTOTHROTTLE_TARGET_CONCURRENCY
    target_delay = latency / self.target_concurrency
    new_delay = (slot.delay + target_delay) / 2.0 # 表示上一次的延迟时间
    new_delay = max(target_delay, new_delay)
    new_delay = min(max(self.mindelay, new_delay), self.maxdelay)
    slot.delay = new_delay
"""
# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
# 开始自动限速
#AUTOTHROTTLE_ENABLED = True
# The initial download delay
# 初始下载延迟
#AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
# 最大下载延迟
#AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
# 平均每秒并发数
#AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
# 是否显示
#AUTOTHROTTLE_DEBUG = False

# Enable and configure HTTP caching (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
"""
启用缓存
    目的用于将已经发送的请求或相应缓存下来,以便以后使用
    
    from scrapy.downloadermiddlewares.httpcache import HttpCacheMiddleware
    from scrapy.extensions.httpcache import DummyPolicy
    from scrapy.extensions.httpcache import FilesystemCacheStorage
"""
# 是否启用缓存策略
#HTTPCACHE_ENABLED = True
# 缓存策略:所有请求均缓存,下次在请求直接访问原来的缓存即可
# HTTPCACHE_POLICY = "scrapy.extensions.httpcache.DummyPolicy"
# 缓存策略:根据Http响应头:Cache-Control、Last-Modified 等进行缓存的策略
# HTTPCACHE_POLICY = "scrapy.extensions.httpcache.RFC2616Policy"
# 缓存超时时间
#HTTPCACHE_EXPIRATION_SECS = 0
# 缓存保存路径
#HTTPCACHE_DIR = 'httpcache'
# 缓存忽略的http状态码
#HTTPCACHE_IGNORE_HTTP_CODES = []
# 缓存存储的插件
#HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'
原文地址:https://www.cnblogs.com/Alexephor/p/11446167.html