代理池——代理采集,测试,保存和接口使用

代理池的设置主要有四部

  1. 获取代理
  2. 代理测试
  3. 数据存储
  4. API接口

1.1先设置需要获取的代理的网站和解析规则

config.py

# 所有网站得解析方式

parse_list = [
    {
        'urls': ['http://www.66ip.cn/{}.html'.format(n) for n in range(1, 10)], # 66代理
        'pattern': '//div[@id="main"]//div/div/table//tr[position()>1]',
        'position': {'ip': './td[1]', 'port': './td[2]', 'address': './td[3]'},
    },
    {
        'urls':['https://www.kuaidaili.com/free/inha/{}/'.format(n) for n in range(1, 10)], # 快代理
        'pattern': '//div[@id="list"]/table/tbody/tr',
        'position': {'ip': './td[1]', 'port': './td[2]', 'address': './td[5]'},
    },
    {
        'urls':['http://www.xicidaili.com/nn/{}'.format(n) for n in range(1, 5)], # 西刺代理
        'pattern': '//table[@id="ip_list"]/tr[position()>1]',
        'position': {'ip': './td[2]', 'port': './td[3]', 'address': './td[4]/a/'}
    }    
]

DEFAULT_HEADERS = {
    'User-Agent': 'ozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0',
    'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
}

1.2 设置数据解析的方法

Parser.py

# 解析方法
import re 
from lxml import etree


class Parser_response(object):
    # 对传递过来的html代码进行数据解析
    @staticmethod
    def parser_xpath(response, parser):
        '''根据parser规则对response进行解析'''
        etre = etree.HTML(response)
        items = etre.xpath(parser['pattern'])

        proxy_list = []
        for item in items:
            try: # 解析数据
                ip = item.xpath(parser['position']['ip'])[0].text
                port = item.xpath(parser['position']['port'])[0].text
                address = item.xpath(parser['position']['address'])[0].text
            except Exception:
                continue
            else:
                proxy = {'ip': ip, 'port': port, 'address': address} # 设置成字典格式
                proxy_list.append(proxy) # 加入到列表中返回

        return proxy_list

    @staticmethod
    def parser_re():
        pass

1.3 主程序中调用他们proxy_pool.py


from Parser import Parser_response # 解析过程


'''
代理url的请求测试''' class Download_HTML(object): @staticmethod def download(url): try: print('正在请求{}.....'.format(url)) r = requests.get(url, headers=DEFAULT_HEADERS) # 对代理的网站url进行请求 r.encoding = 'utf-8' # 设置编码格式 if not r.ok: # 请求不成功 raise ConnectionError else: return r.text # 成功返回网络代码 except Exception as e: print('请求失败,异常: ', e) ....... def crawl(self, parser): '''对配置文件中的网站进行解析''' Download = Download_HTML() # 实例化代理网站 for url in parser['urls']: # 取出url列表,多页 time.sleep(2) # 延时两秒 response = Download.download(url) # 调用方法获取网页的html代码 if not response: return None # 将html代码和代码解析规则传给相应类的方法,获取返回的代理信息列表(ip,port, address) proxy_list = Parser_response.parser_xpath(response, parser) for proxy in proxy_list: # 遍历 while True: if self.queue1.full(): # 如果队列满了的话 time.sleep(1) else: self.queue1.put(proxy) # 没满就加入到队列queue1中 break

2代理的检测

def detect(queue1, queue2): #queue1是所有的代理数据,queue2是可用的代理的数据
    '''对queue1中的代理进行检测'''
    proxy_list = []
    while True:
        if not queue1.empty(): # 如果队列不为空
            proxy = queue1.get() # 从队列取出数据
            proxy_list.append(proxy) # 再将数据添加到列表中

            if len(proxy_list) > 9: # 如果列表中得数量到10时
                spanws = []
                for proxy in proxy_list: # 遍历列表
                    spanws.append(gevent.spawn(detect_baidu, proxy, queue2)) # 设置协程,传入方法和变量(代理和一个代表可用队列的队列)

                gevent.joinall(spanws) # 开启协程
                proxy_list = [] # 将列表清空

'''代理测试放入可用队列'''
def detect_baidu(proxy, queue2=None):
    '''请求测试'''
    proxies = {
        'http': 'http://{}:{}'.format(proxy['ip'], proxy['port']),
        'https': 'https://{}:{}'.format(proxy['ip'], proxy['port']),
    } # 完善代理的数据
    result = None # 设置初始标志None
    try:
        print(proxies)
        # 用代理进行网络请求以测试代理是否可用
        result = requests.get(url='https://www.baidu.com', headers=DEFAULT_HEADERS, proxies=proxies, timeout=5)
    except Exception:
        print('代理不可用,已丢弃....')
    '''detect_baidu方法被调用了两次,主要看调用时是否传入了queue2这个参数,
        如果加了queue2这个参数,表示调用判断代理可用性决定加入到可用队列queue2中;
        没有带queue2这个参数的话,表示可用代理回测,返回的标志决定了queue2中的代理是否删除
    '''
    # 判断加入到可用队列与否(主要看是否传入了queue2)
    if result and queue2: 
        print('代理{}可用,已添加到可用队列....'.format(proxy))
        queue2.put(proxy) 
    # 判断加入到数据库中与否的前提(主要看是否传入了queue2)
    elif result: 
        return True

 

3 数据的存储

3.1 数据库的方法设置

Mongo_DB.py

import pymongo


class MongoHelper(object):

    def __init__(self):
        self.client = pymongo.MongoClient(host='127.0.0.1', port=27017)
        self.db = self.client['tanzhou_homework']
        self.proxy = self.db['proxy']

    def insert(self, data=None): #
        if data:
            self.proxy.save(data)

    def delete(self, data=None): #
        if data:
            self.proxy.remove(data)

    def update(self, data, conditions): #
        if data and conditions:
            self.proxy.update(data, {'$set': conditions})

    def select(self, data=None): #
        if data:
            items = self.proxy.find(data)
        else:
            data={}
            items = self.proxy.find(data)

            results = []
            for item in items:
                result = (item['ip'], item['port'], item['address'])

                results.append(result)

            return results


if __name__ == '__main__':
    mongo = MongoHelper()
    

3.2 主程序中调用proxy_pool.py

from Mongo_DB import MongoHelper # 数据库操作

mongo = MongoHelper()

def insert_sql(queue2):
    '''读取queue2里面得数据再添加到数据库'''
    while True:
        proxy = queue2.get()
        if proxy:
            mongo.insert(proxy)
            # print('代理{}已经成功添加到数据库'.format(proxy))

4 API接口

Service.py

from random import choice
from flask import Flask
from Mongo_DB import MongoHelper

mongo = MongoHelper()

app = Flask(__name__)

@app.route('/') # 路由根目录r
def index():
    result = mongo.select() # 获取IP列表
    text = choice(result) # 随机选择一个出来
    return '{}:{}'.format(text[0], text[1]) # 返回

def start_service():
    app.run(debug=True, host='0.0.0.0', port=9999)


if __name__ == '__main__':
    start_service()
    

主程序中

from Service import start_service # 接口配置


if __name__ == '__main__':
    p4 = Process(target=start_service) # 接口

 

完整代码如下:

config.py

# 所有网站得解析方式

parse_list = [
    {
        'urls': ['http://www.66ip.cn/{}.html'.format(n) for n in range(1, 10)],
        'pattern': '//div[@id="main"]//div/div/table//tr[position()>1]',
        'position': {'ip': './td[1]', 'port': './td[2]', 'address': './td[3]'},
    },
    {
        'urls':['https://www.kuaidaili.com/free/inha/{}/'.format(n) for n in range(1, 10)],
        'pattern': '//div[@id="list"]/table/tbody/tr',
        'position': {'ip': './td[1]', 'port': './td[2]', 'address': './td[5]'},
    },
    {
        'urls':['http://www.xicidaili.com/nn/{}'.format(n) for n in range(1, 5)],
        'pattern': '//table[@id="ip_list"]/tr[position()>1]',
        'position': {'ip': './td[2]', 'port': './td[3]', 'address': './td[4]/a/'}
    }    
]

DEFAULT_HEADERS = {
    'User-Agent': 'ozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0',
    'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
}

Mongo_DB.py

import pymongo


class MongoHelper(object):

    def __init__(self):
        self.client = pymongo.MongoClient(host='127.0.0.1', port=27017)
        self.db = self.client['tanzhou_homework']
        self.proxy = self.db['proxy']

    def insert(self, data=None): #
        if data:
            self.proxy.save(data)

    def delete(self, data=None): #
        if data:
            self.proxy.remove(data)

    def update(self, data, conditions): #
        if data and conditions:
            self.proxy.update(data, {'$set': conditions})

    def select(self, data=None): #
        if data:
            items = self.proxy.find(data)
        else:
            data={}
            items = self.proxy.find(data)

            results = []
            for item in items:
                result = (item['ip'], item['port'], item['address'])

                results.append(result)

            return results


if __name__ == '__main__':
    mongo = MongoHelper()
    

Parser.py

# 解析方法
import re 
from lxml import etree


class Parser_response(object):
    # 对传递过来的html代码进行数据解析
    @staticmethod
    def parser_xpath(response, parser):
        '''根据parser规则对response进行解析'''
        etre = etree.HTML(response)
        items = etre.xpath(parser['pattern'])

        proxy_list = []
        for item in items:
            try: # 解析数据
                ip = item.xpath(parser['position']['ip'])[0].text
                port = item.xpath(parser['position']['port'])[0].text
                address = item.xpath(parser['position']['address'])[0].text
            except Exception:
                continue
            else:
                proxy = {'ip': ip, 'port': port, 'address': address} # 设置成字典格式
                proxy_list.append(proxy) # 加入到列表中返回

        return proxy_list

    @staticmethod
    def parser_re():
        pass

Service.py

from random import choice
from flask import Flask
from Mongo_DB import MongoHelper

mongo = MongoHelper()

app = Flask(__name__)

@app.route('/') # 路由根目录r
def index():
    result = mongo.select() # 获取IP列表
    text = choice(result) # 随机选择一个出来
    return '{}:{}'.format(text[0], text[1]) # 返回

def start_service():
    app.run(debug=True, host='0.0.0.0', port=9999)


if __name__ == '__main__':
    start_service()
    

proxy_pool.py

import time 
import requests 
from multiprocessing import Process, Queue
import gevent
from gevent import monkey;monkey.patch_all()

from config import parse_list, DEFAULT_HEADERS # 代理网站及相应解析规则,请求头
from Parser import Parser_response  # 解析过程
from Mongo_DB import MongoHelper # 数据库操作
from Service import start_service # 接口配置

mongo = MongoHelper()

'''代理url的请求测试'''
class Download_HTML(object):

    @staticmethod
    def download(url):
        try:
            print('正在请求{}.....'.format(url))
            r = requests.get(url, headers=DEFAULT_HEADERS) # 对代理的网站url进行请求
            r.encoding = 'utf-8' # 设置编码格式
            if not r.ok: # 请求不成功
                raise ConnectionError
            else:
                return r.text # 成功返回网络代码
        except Exception as e:
            print('请求失败,异常: ', e)

'''爬取代理数据并添加到队列中'''
class Proxy_crawl(object):
    porxies_set = set() # 空集合,查重用

    def __init__(self, queue1):
        self.queue1 = queue1

    def open_spider(self):
        while True:
            proxy_list = mongo.select() # 获取数据库中得所有代理
            spawns = []
            for proxy in proxy_list: # 协程,再进行可用队列中回测代理可用性
                spawns.append(gevent.spawn(detece_from_db,proxy,self.porxies_set))
            gevent.joinall(spawns)

            if len(self.porxies_set) < 5: # 当集合数量小于5的时候
                print('当前可用代理数量为{},开始获取代理...'.format(len(self.porxies_set)))
                for parser in parse_list:
                    self.crawl(parser) # 继续爬取
            else:
                print('当前可用代理数量大于阈值....开始休眠')
                time.sleep(300)

    def crawl(self, parser):
        '''对配置文件中的网站进行解析'''
        Download = Download_HTML() # 实例化代理网站
        for url in parser['urls']: # 取出url列表,多页
            time.sleep(2) # 延时两秒
            response = Download.download(url) # 调用方法获取网页的html代码
            if not response:
                return None
            # 将html代码和代码解析规则传给相应类的方法,获取返回的代理信息列表(ip,port, address)
            proxy_list = Parser_response.parser_xpath(response, parser)

            for proxy in proxy_list: # 遍历
                while True:
                    if self.queue1.full(): # 如果队列满了的话
                        time.sleep(1)
                    else:
                        self.queue1.put(proxy) # 没满就加入到队列queue1中
                        break

def detece_from_db(proxy, porxies_set):
    proxy = {'ip':proxy[0], 'port':proxy[1]}
    result = detect_baidu(proxy) # 对数据库中的代理进行回测
    if not result:
        mongo.delete(proxy) # 不可用就删除
        return None
    porxies_set.add('{}:{}'.format(proxy['ip'], proxy['port'])) # 可用就添加到set集合中去

'''设置协程实现高并发'''
def detect(queue1, queue2):
    '''对queue1中的代理进行检测'''
    proxy_list = []
    while True:
        if not queue1.empty(): # 如果队列不为空
            proxy = queue1.get() # 从队列取出数据
            proxy_list.append(proxy) # 再将数据添加到列表中

            if len(proxy_list) > 9: # 如果列表中得数量到10时
                spanws = []
                for proxy in proxy_list: # 遍历列表
                    spanws.append(gevent.spawn(detect_baidu, proxy, queue2)) # 设置协程,传入方法和变量(代理和一个代表可用队列的队列)

                gevent.joinall(spanws) # 开启协程
                proxy_list = [] # 将列表清空

'''代理测试放入可用队列'''
def detect_baidu(proxy, queue2=None):
    '''请求测试'''
    proxies = {
        'http': 'http://{}:{}'.format(proxy['ip'], proxy['port']),
        'https': 'https://{}:{}'.format(proxy['ip'], proxy['port']),
    } # 完善代理的数据
    result = None # 设置初始标志None
    try:
        print(proxies)
        # 用代理进行网络请求以测试代理是否可用
        result = requests.get(url='https://www.baidu.com', headers=DEFAULT_HEADERS, proxies=proxies, timeout=5)
    except Exception:
        print('代理不可用,已丢弃....')
    '''detect_baidu方法被调用了两次,主要看调用时是否传入了queue2这个参数,
        如果加了queue2这个参数,表示调用判断代理可用性决定加入到可用队列queue2中;
        没有带queue2这个参数的话,表示可用代理回测,返回的标志决定了queue2中的代理是否删除
    '''
    # 判断加入到可用队列与否(主要看是否传入了queue2)
    if result and queue2: 
        print('代理{}可用,已添加到可用队列....'.format(proxy))
        queue2.put(proxy) 
    # 判断加入到数据库中与否的前提(主要看是否传入了queue2)
    elif result: 
        return True

'''爬取代理数据的前提调用'''
def start_crawl(queue1):
    p = Proxy_crawl(queue1) # 类的实例化
    p.open_spider()

def insert_sql(queue2):
    '''读取queue2里面得数据再添加到数据库'''
    while True:
        proxy = queue2.get()
        if proxy:
            mongo.insert(proxy)
            # print('代理{}已经成功添加到数据库'.format(proxy))


if __name__ == '__main__':
    q1 = Queue() # 所有代理的队列
    q2 = Queue() # 可用代理的队列
    p1 = Process(target=detect, args=(q1, q2)) # 代理检测
    p2 = Process(target=start_crawl, args=(q1, )) # 下载代理
    p3 = Process(target=insert_sql, args=(q2, )) # 存入数据
    p4 = Process(target=start_service) # 接口
    
    p2.start() # 开启进程
    p1.start()
    p3.start()
    p4.start()
原文地址:https://www.cnblogs.com/pywjh/p/9990149.html