-
初始数据
# Redis数据库地址 REDIS_HOST = '127.0.0.1' # Redis端口 REDIS_PORT = 6379 # Redis密码,如无填None REDIS_PASSWORD = None REDIS_KEY = 'proxies' # 代理分数,最高为100分,最低我0分。初始分数为10分 MAX_SCORE = 100 MIN_SCORE = 0 INITIAL_SCORE = 10 # 有效的状态码 VALID_STATUS_CODES = [200, 302] # 代理池数量界限 POOL_UPPER_THRESHOLD = 50000 # 检查周期 TESTER_CYCLE = 20 # 获取周期 GETTER_CYCLE = 300 # 测试API,建议抓哪个网站测哪个,这里用百度网来测试API TEST_URL = 'http://www.baidu.com' # API配置,API地址、端口 API_HOST = '0.0.0.0' API_PORT = 5555 # 开关 TESTER_ENABLED = True GETTER_ENABLED = True API_ENABLED = True # 最大批测试量 BATCH_TEST_SIZE = 10
-
异常问题
class PoolEmptyError(Exception): def __init__(self): # Exception类,是所有非退出异常的公共基类。 Exception.__init__(self) def __str__(self): # repr函数,返回对象的规范字符串表示形式。 return repr('代理池已经枯竭')
-
存储模块
import redis from proxypool.error import PoolEmptyError from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY from proxypool.setting import MAX_SCORE, MIN_SCORE, INITIAL_SCORE from random import choice import re class RedisClient(object): def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD): """ 初始化 :param host: Redis 地址 :param port: Redis 端口 :param password: Redis密码 """ # 创建链接数据库对象 self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True) def add(self, proxy, score=INITIAL_SCORE): """ 添加代理,设置分数为最高 :param proxy: 代理 :param score: 分数 :return: 添加结果 """ # zscore() 方法,返回 "REDIS_KEY" 中元素为 "proxy" 的分数。 # zadd() 方法,将任意数量的元素名称、分数对设置为键 "REDIS_KEY"。 if not re.match('d+.d+.d+.d+:d+', proxy): print('代理不符合规范', proxy, '丢弃') return if not self.db.zscore(REDIS_KEY, proxy): return self.db.zadd(REDIS_KEY, score, proxy) def random(self): """ 从数据库中随机获取有效代理,首先尝试获取最高分数代理,如果不存在,按照排名获取,否则异常 :return: 随机代理 """ # zrangebyscore() 方法,返回排序集 "REDIS_KEY" 中带分数的值,其中分数范围为 MAX_SCORE ~ MAX_SCORE ,即返回 100 分的代理 # 代理的有效程度以分数来区分,分数高,说明代理越稳定。 # 首先查找 100 分的代理,如果没有,则在 0 ~ 100 之间查找最高分的代理, # 如果还没有,则调用 proxypool.error.PoolEmptyError 抛出异常 result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE) if len(result): return choice(result) else: result = self.db.zrevrange(REDIS_KEY, 0, 100) if len(result): return choice(result) else: raise PoolEmptyError def decrease(self, proxy): """ 代理值减一分,小于最小值则删除 :param proxy: 代理 :return: 修改后的代理分数 """ # zscore() 方法,返回 "REDIS_KEY" 中元素为 "proxy" 的分数。 # zincrby() 方法,按 “-1” 增加排序集 "REDIS_KEY" 中 "proxy" 的分数 # zrem() 方法,从排序集 "REDIS_KEY" 中删除成员 "proxy" score = self.db.zscore(REDIS_KEY, proxy) if score and score > MIN_SCORE: print('代理', proxy, '当前分数', score, '减1') return self.db.zincrby(REDIS_KEY, proxy, -1) else: print('代理', proxy, '当前分数', score, '移除') return self.db.zrem(REDIS_KEY, proxy) def exists(self, proxy): """ 判断是否存在 :param proxy: 代理 :return: 是否存在 """ # zscore() 方法,返回 "REDIS_KEY" 中元素为 "proxy" 的分数。 return not self.db.zscore(REDIS_KEY, proxy) == None def max(self, proxy): """ 将代理设置为MAX_SCORE :param proxy: 代理 :return: 设置结果 """ print('代理', proxy, '可用,设置为', MAX_SCORE) # zadd() 方法,将任意数量的元素名称、分数对设置为键 "REDIS_KEY"。 return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy) def count(self): """ 获取数量 :return: 数量 """ # zcard() 方法,返回排序集 "REDIS_KEY" 中的元素个数 return self.db.zcard(REDIS_KEY) def all(self): """ 获取全部代理 :return: 全部代理列表 """ # zrangebyscore()方法,获取 MIN_SCORE ~ MAX_SCORE 的代理,即所有代理 return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE) def batch(self, start, stop): """ 批量获取 :param start: 开始索引 :param stop: 结束索引 :return: 代理列表 """ # zrangebyscore()方法,获取 start ~ (stop - 1) 的代理 return self.db.zrevrange(REDIS_KEY, start, stop - 1) if __name__ == '__main__': conn = RedisClient() result = conn.batch(680, 688) print(result)
-
爬虫抓取代理
import requests from requests.exceptions import ConnectionError base_headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36', 'Accept-Encoding': 'gzip, deflate, sdch', 'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7' } def get_page(url, options={}): """ 抓取代理 :param url: :param options: :return: """ headers = dict(base_headers, **options) print('正在抓取', url) try: response = requests.get(url, headers=headers) print('抓取成功', url, response.status_code) if response.status_code == 200: return response.text except ConnectionError: print('抓取失败', url) return None
-
获取模块
----获取代理的每个方法统一定义为以 crawl 开头
import json import re from .utils import get_page from pyquery import PyQuery as pq class ProxyMetaclass(type): """ 定义一个类 ProxyMetaclass,Crawl类将设置为元类,元类中实现了 __new__()方法,这个方法有固定几个参数, 第四个参数 attrs 包含了类的一些属性,通过遍历 attrs 这个参数即可获取类的所有方法信息,就同遍历字典一样, 键名对应方法的名,接着判断方法的开关是否 crawl,是则将其加入到 __CrawlFunc__属性中。 这样就将所有以 crawl 开头的方法定义成一个属性,动态获取到所有以 crawl 开头的方法列表。 """ def __new__(cls, name, bases, attrs): """ __new__() 方法与 __init__() 方法类似, __new__ 负责对象的创建 而 __init__ 负责对象的初始化。 """ count = 0 attrs['__CrawlFunc__'] = [] for k, v in attrs.items(): if 'crawl_' in k: attrs['__CrawlFunc__'].append(k) count = count + 1 attrs['__CrawlFuncCount__'] = count return type.__new__(cls, name, bases, attrs) class Crawler(object, metaclass=ProxyMetaclass): def get_proxies(self, callback): """ 将所有以 crawl 开头的方法都调用一遍 :param callback:要被回调的方法 :return: 每个方法返回的代理并组合成列表 """ proxies = [] for proxy in eval("self.{}()".format(callback)): print('成功获取到代理', proxy) proxies.append(proxy) return proxies def crawl_daili66(self, page_count=4): """ 获取代理 "66ip" :param page_count: 页码 :return: 代理 """ start_url = 'http://www.66ip.cn/{}.html' urls = [start_url.format(page) for page in range(1, page_count + 1)] for url in urls: print('Crawling', url) html = get_page(url) # 爬取网页源代码 if html: doc = pq(html) # 对网页源代码进行解析 # 选取 "class=containerbox" 的节点里的 table 节点,再选择其中的带有以 "0" 开头的 gt 属性值的每个 <tr> 元素。 trs = doc('.containerbox table tr:gt(0)').items() for tr in trs: # 选择属于其父元素的第 1 个子元素的每个 <td> 元素。 ip = tr.find('td:nth-child(1)').text() # 选择属于其父元素的第 2 个子元素的每个 <td> 元素。 port = tr.find('td:nth-child(2)').text() yield ':'.join([ip, port]) def crawl_ip3366(self): """ 获取代理 "ip3366" :return:代理的迭代器 """ for page in range(1, 4): start_url = 'http://www.ip3366.net/free/?stype=1&page={}'.format(page) html = get_page(start_url) # 获取网页源代码 ip_address = re.compile('<tr>s*<td>(.*?)</td>s*<td>(.*?)</td>') # 正则表达式对象 # s * 匹配空格,起到换行作用 re_ip_address = ip_address.findall(html) for address, port in re_ip_address: result = address+':' + port yield result.replace(' ', '') # 把 result 中的空格去掉 def crawl_kuaidaili(self): """ 获取代理 "快代理" :return:代理的迭代器 """ for i in range(1, 4): start_url = 'http://www.kuaidaili.com/free/inha/{}/'.format(i) html = get_page(start_url) # 获取网页源代码 if html: ip_address = re.compile('<td data-title="IP">(.*?)</td>') re_ip_address = ip_address.findall(html) # 代理地址 port = re.compile('<td data-title="PORT">(.*?)</td>') re_port = port.findall(html) # 代理端口 for address, port in zip(re_ip_address, re_port): # zip()将对象中对应的元素打包成一个个元组,然后返回由这些元组组成的列表 address_port = address+':'+port yield address_port.replace(' ', '') def crawl_xicidaili(self): """ 获取代理 "西刺代理" :return:代理的迭代器 """ for i in range(1, 3): start_url = 'http://www.xicidaili.com/nn/{}'.format(i) headers = { 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8', 'Cookie': '_free_proxy_session=BAh7B0kiD3Nlc3Npb25faWQGOgZFVEkiJWRjYzc5MmM1MTBiMDMzYTUzNTZjNzA4NjBhNWRjZjliBjsAVEkiEF9jc3JmX3Rva2VuBjsARkkiMUp6S2tXT3g5a0FCT01ndzlmWWZqRVJNek1WanRuUDBCbTJUN21GMTBKd3M9BjsARg%3D%3D--2a69429cb2115c6a0cc9a86e0ebe2800c0d471b3', 'Host': 'www.xicidaili.com', 'Referer': 'http://www.xicidaili.com/nn/3', 'Upgrade-Insecure-Requests': '1', } html = get_page(start_url, options=headers) # 获取网页源代码 if html: find_trs = re.compile('<tr class.*?>(.*?)</tr>', re.S) trs = find_trs.findall(html) for tr in trs: find_ip = re.compile('<td>(d+.d+.d+.d+)</td>') re_ip_address = find_ip.findall(tr) # 代理地址 find_port = re.compile('<td>(d+)</td>') re_port = find_port.findall(tr) # 代理端口 for address,port in zip(re_ip_address, re_port): address_port = address+':'+port yield address_port.replace(' ', '') def crawl_iphai(self): """ 获取代理 "IP海" :return:代理的迭代器 """ start_url = 'http://www.iphai.com/' html = get_page(start_url) # 获取网页源代码 if html: find_tr = re.compile('<tr>(.*?)</tr>', re.S) trs = find_tr.findall(html) # 选择网页源代码中有代理信息的部分源码 for s in range(1, len(trs)): find_ip = re.compile('<td>s+(d+.d+.d+.d+)s+</td>', re.S) re_ip_address = find_ip.findall(trs[s]) # 代理地址 find_port = re.compile('<td>s+(d+)s+</td>', re.S) re_port = find_port.findall(trs[s]) # 代理端口 for address, port in zip(re_ip_address, re_port): # zip()将对象中对应的元素打包成一个个元组,然后返回由这些元组组成的列表 address_port = address+':'+port yield address_port.replace(' ', '') def crawl_data5u(self): """ 获取代理 "无忧代理" :return:代理的迭代器 """ start_url = 'http://www.data5u.com' headers = { 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8', 'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7', 'Cache-Control': 'max-age=0', 'Connection': 'keep-alive', 'Cookie': 'JSESSIONID=47AA0C887112A2D83EE040405F837A86', 'Host': 'www.data5u.com', 'Referer': 'http://www.data5u.com/free/index.shtml', 'Upgrade-Insecure-Requests': '1', 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.108 Safari/537.36', } html = get_page(start_url, options=headers) # 获取网页源代码 if html: ip_address = re.compile('<span><li>(d+.d+.d+.d+)</li>.*?<li class="port.*?>(d+)</li>', re.S) re_ip_address = ip_address.findall(html) # 返回 (代理地址, 代理端口) 组成的列表 for address, port in re_ip_address: result = address + ':' + port yield result.replace(' ', '')
-
代理存储
from proxypool.tester import Tester from proxypool.db import RedisClient from proxypool.crawler import Crawler from proxypool.setting import * import sys class Getter(): def __init__(self): self.redis = RedisClient() self.crawler = Crawler() def is_over_threshold(self): """ 判断是否达到了代理池限制 """ if self.redis.count() >= POOL_UPPER_THRESHOLD: return True else: return False def run(self): print('获取器开始执行') if not self.is_over_threshold(): # 遍历定义的 Crawler 类中的几个 craw 开头的网页代理爬虫,运行着几个爬虫 for callback_label in range(self.crawler.__CrawlFuncCount__): callback = self.crawler.__CrawlFunc__[callback_label] # 获取代理 proxies = self.crawler.get_proxies(callback) sys.stdout.flush() for proxy in proxies: # 将代理逐个加入到数据库中 self.redis.add(proxy)
-
检测模块
import asyncio import aiohttp import time import sys try: from aiohttp import ClientError except: from aiohttp import ClientProxyConnectionError as ProxyConnectionError from proxypool.db import RedisClient from proxypool.setting import * class Tester(object): def __init__(self): self.redis = RedisClient() async def test_single_proxy(self, proxy): """ 测试单个代理 :param proxy: :return: """ conn = aiohttp.TCPConnector(verify_ssl=False) async with aiohttp.ClientSession(connector=conn) as session: try: if isinstance(proxy, bytes): proxy = proxy.decode('utf-8') real_proxy = 'http://' + proxy print('正在测试', proxy) # 测试所选的代理,如果响应状态码在 [200, 302] 里面,说明代理可用,否则代理分数减 1 async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response: if response.status in VALID_STATUS_CODES: self.redis.max(proxy) print('代理可用', proxy) else: self.redis.decrease(proxy) print('请求响应码不合法 ', response.status, 'IP', proxy) except (ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError, AttributeError): self.redis.decrease(proxy) print('代理请求失败', proxy) def run(self): """ 测试主函数 :return: """ print('测试器开始运行') try: # 数据库中代理个数 count = self.redis.count() print('当前剩余', count, '个代理') # 遍历所有代理,间隔为 BATCH_TEST_SIZE = 10 for i in range(0, count, BATCH_TEST_SIZE): # 开始测试的代理,停止测试的代理 start = i stop = min(i + BATCH_TEST_SIZE, count) print('正在测试第', start + 1, '-', stop, '个代理') # 通过 redis.batch() 方法,批量获取代理(10个代理) test_proxies = self.redis.batch(start, stop) # asyncio.get_event_loop() 方法,返回异步事件循环。 loop = asyncio.get_event_loop() # 通过异步处理,测试10个代理, run_until_complete() 方法:运行事件循环,直到完成。 # asyncio.wait(tasks) , 等待 "tasks" 提供的未来和协同工作完成 tasks = [self.test_single_proxy(proxy) for proxy in test_proxies] loop.run_until_complete(asyncio.wait(tasks)) # sys.stdout.flush() , 1、 flush() 刷新缓存区。 2、缓冲区满时,自动刷新。 3、文件关闭或者是程序结束自动刷新。 sys.stdout.flush() time.sleep(5) except Exception as e: print('测试器发生错误', e.args)
-
接口模块
from flask import Flask, g from .db import RedisClient __all__ = ['app'] # __name__ 参数,让flask知道什么是属于自己的应用程序。 app = Flask(__name__) # 获取连接 def get_conn(): # hasattr() 返回对象是否具有具有给定名称的属性。 if not hasattr(g, 'redis'): g.redis = RedisClient() return g.redis # http://localhost:port # http://127.0.0.0:5555 @app.route('/') def index(): # 返回副标题 "Welcome to Proxy Pool System" return '<h2>Welcome to Proxy Pool System</h2>' # http://localhost:port/random # http://127.0.0.0:5555/random @app.route('/random') def get_proxy(): """ Get a proxy :return: 随机代理 """ conn = get_conn() return conn.random() # http://localhost:port/count # http://127.0.0.0:5555/count @app.route('/count') def get_counts(): """ Get the count of proxies :return: 代理池总量 """ conn = get_conn() return str(conn.count()) if __name__ == '__main__': app.run()
-
调度模块
import time # 多处理程序 from multiprocessing import Process from proxypool.api import app from proxypool.getter import Getter from proxypool.tester import Tester from proxypool.db import RedisClient from proxypool.setting import * class Scheduler(): def schedule_tester(self, cycle=TESTER_CYCLE): """ 定时测试代理 cycle = TESTER_CYCLE = 20:检查周期 """ tester = Tester() while True: print('测试器开始运行') tester.run() time.sleep(cycle) def schedule_getter(self, cycle=GETTER_CYCLE): """ 定时获取代理 cycle = GETTER_CYCLE = 300:获取周期 """ getter = Getter() while True: print('开始抓取代理') getter.run() time.sleep(cycle) def schedule_api(self): """ 开启API """ app.run(API_HOST, API_PORT) def run(self): print('代理池开始运行') ''' 开关 TESTER_ENABLED = True:测试开关 GETTER_ENABLED = True:获取开关 API_ENABLED = True:API开关 ''' if TESTER_ENABLED: tester_process = Process(target=self.schedule_tester) tester_process.start() if GETTER_ENABLED: getter_process = Process(target=self.schedule_getter) getter_process.start() if API_ENABLED: api_process = Process(target=self.schedule_api) api_process.start()
-
运行
from proxypool.scheduler import Scheduler import sys import io # 文本 IO 包装:打包流 # sys.stdout.buffer ,标准输出的缓冲区 # 将 sys.stdout.buffer(标准输出的缓冲区 )中的内容打包 sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') def main(): try: s = Scheduler() s.run() except: main() if __name__ == '__main__': main()
-
获取代理----有了上面的代码,可以通过 'http://127.0.0.1:5555/random' 随机从数据库中获取我们爬取到的可用代理
import requests PROXY_POOL_URL = 'http://127.0.0.1:5555/random' def get_proxy(): try: response = requests.get(PROXY_POOL_URL) if response.status_code == 200: return response.text except ConnectionError: return None proxy = get_proxy() proxies = { 'http': 'http://' + proxy, 'https': 'https://' + proxy, } try: response = requests.get('http://httpbin.org/get', proxies=proxies) print(response.text) except requests.exceptions.ConnectionError as e: print('Error', e.args)
-
代码:
链接:https://pan.baidu.com/s/1d4rNtBNNwP4XWYyaVZ8IRA 密码:6vj7