Build Your Own IP Proxy Pool for Large-Scale Scraping

Any crawler will sooner or later run into sites that block its IP, so we need proxies: by routing requests through proxy IPs, we keep our real address hidden.
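If you have not used proxies with requests before: the proxies argument maps a URL scheme to a proxy address, and the request is routed through that proxy so the target server sees the proxy's IP instead of yours. A minimal sketch (the address below is a made-up placeholder, not a working proxy):

import requests

# Made-up placeholder address purely for illustration -- substitute a
# proxy harvested by the pool built below.
proxies = {'http': 'http://1.2.3.4:8080'}

# Requests to http:// URLs are routed through the proxy; the target
# site sees 1.2.3.4 instead of our real address.
resp = requests.get('http://example.com', proxies=proxies, timeout=10)
print(resp.status_code)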

import requests
import pymongo
from lxml.html import etree


class SelfIpProxy():

    def __init__(self):  # all configuration lives here
        self.depth = 1
        self.timeout = 10
        self.collection = pymongo.MongoClient()['Proxies']['free2']
        # One test URL per scheme: an http site and an https site.
        self.url = {'http': "http://19ncc.medmeeting.org/cn",
                    'https': "https://www.baidu.com"}
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
            'Referer': 'https://www.xicidaili.com/nn/2'}

    def get_ip(self):  # scrape proxy candidates from the listing site
        urls = [
            'https://www.xicidaili.com/nn/{}'.format(i) for i in range(1, self.depth + 1)]
        for url in urls:
            html = requests.get(url, headers=self.headers, timeout=30)
            html.encoding = 'utf-8'
            e_html = etree.HTML(html.text)
            ips = e_html.xpath('//table[@id="ip_list"]/tr/td[2]/text()')
            ports = e_html.xpath('//table[@id="ip_list"]/tr/td[3]/text()')
            modes = e_html.xpath('//table[@id="ip_list"]/tr/td[6]/text()')
            for ip, port, mode in zip(ips, ports, modes):
                item = dict()
                item[mode.lower()] = '{}://{}:{}'.format(mode.lower(), ip, port)
                yield item

    def store_ip(self):
        for i in self.get_ip():
            self.collection.insert_one(i)

    def check_ip(self):
        count = 0
        # Open the cursor with no_cursor_timeout=True so the server does not
        # reap it mid-iteration (pymongo.errors.CursorNotFound); it must then
        # be closed manually.
        demo = self.collection.find({}, {'_id': 0}, no_cursor_timeout=True)
        for ip in demo:
            count += 1
            print('Testing proxy #{}'.format(count))
            for key, value in ip.items():
                try:
                    html = requests.get(self.url[key],
                                        headers=self.headers, proxies={key: value}, timeout=self.timeout)
                    html.encoding = 'utf-8'
                    html.raise_for_status()
                    print('************ Proxy passed: {} ************'.format(value))
                except BaseException:
                    print('Proxy failed: {}'.format(value))
                    self.collection.delete_one(ip)
        demo.close()  # close the cursor manually

    def anti_duplicate(self):  # remove duplicate documents
        demo = self.collection.find({}, {'_id': 0}, no_cursor_timeout=True)
        l = []
        for i in demo:
            if i not in l:
                l.append(i)
        demo.close()
        self.collection.drop()
        for i in l:
            self.collection.insert_one(i)


if __name__ == '__main__':
    # All settings are in the class's __init__() method.
    my_ip = SelfIpProxy()
    my_ip.store_ip()   # scrape proxies into MongoDB -- fast enough without threads
    my_ip.check_ip()   # validate each stored proxy
    # my_ip.anti_duplicate()  # remove duplicates

The result: the vast majority of the proxies are unusable, with only a handful surviving:

Testing proxy #318
Proxy failed: https://114.239.255.179:9999
Testing proxy #319
Proxy failed: https://222.189.246.79:9999
Testing proxy #320
Proxy failed: https://163.204.240.117:9999
Testing proxy #321
Proxy failed: http://120.83.99.253:9999
Testing proxy #322
Proxy passed: http://59.57.148.10:9999
Testing proxy #323
Proxy failed: http://182.35.81.209:9999
Testing proxy #324
Proxy failed: http://112.87.69.236:9999
Testing proxy #325
Proxy failed: http://120.83.108.41:9999
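Once check_ip has pruned the dead entries, whatever survives in Proxies.free2 can be drawn at random for real scraping. A minimal consumer sketch, using the same collection names as the code above:

import random
import pymongo
import requests

collection = pymongo.MongoClient()['Proxies']['free2']

# Load every surviving proxy document and pick one at random.
alive = list(collection.find({}, {'_id': 0}))
proxy = random.choice(alive)  # e.g. {'http': 'http://59.57.148.10:9999'}

resp = requests.get('http://example.com', proxies=proxy, timeout=10)
print(resp.status_code)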

Next I switched to multiprocessing. I originally wanted to put the multiprocessing function inside the class as well, but for some reason calling it from main produced no output at all (my guess at the cause, with a diagnostic sketch, follows the listing below), so I had no choice but to write a standalone check_ip function outside the class. The full code:

import requests
import pymongo
from lxml.html import etree
from multiprocessing import Pool


class SelfIpProxy():

    def __init__(self):  # all configuration lives here
        self.depth = 10
        self.collection = pymongo.MongoClient()['Proxies']['free2']
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
            'Referer': 'https://www.xicidaili.com/nn/2'}

    def get_ip(self):  # scrape proxy candidates from the listing site
        urls = [
            'https://www.xicidaili.com/nn/{}'.format(i) for i in range(1, self.depth + 1)]
        for url in urls:
            html = requests.get(url, headers=self.headers, timeout=30)
            html.encoding = 'utf-8'
            e_html = etree.HTML(html.text)
            ips = e_html.xpath('//table[@id="ip_list"]/tr/td[2]/text()')
            ports = e_html.xpath('//table[@id="ip_list"]/tr/td[3]/text()')
            modes = e_html.xpath('//table[@id="ip_list"]/tr/td[6]/text()')
            for ip, port, mode in zip(ips, ports, modes):
                item = dict()
                item[mode.lower()] = '{}://{}:{}'.format(mode.lower(), ip, port)
                yield item

    def store_ip(self):
        for i in self.get_ip():
            self.collection.insert_one(i)

    def anti_duplicate(self):  # remove duplicate documents
        demo = self.collection.find({}, {'_id': 0}, no_cursor_timeout=True)
        l = []
        for i in demo:
            if i not in l:
                l.append(i)
        demo.close()
        self.collection.drop()
        for i in l:
            self.collection.insert_one(i)


def check_ip(proxy):  # module-level so it can be dispatched to the pool
    url = {'http': "http://www.baidu.com", 'https': "https://www.baidu.com"}
    for key, value in proxy.items():
        try:
            html = requests.get(url[key], proxies={key: value}, timeout=10)
            html.encoding = 'utf-8'
            html.raise_for_status()
            print('*************************** Proxy passed: {} ***************************'.format(value))
            # Each worker opens its own MongoClient; survivors go to 'checked'.
            pymongo.MongoClient()['Proxies']['checked'].insert_one(proxy)
        except BaseException:
            print('Proxy failed: {}'.format(value))


if __name__ == '__main__':
    # All settings are in the class's __init__() method.
    my_ip = SelfIpProxy()
    my_ip.store_ip()  # scrape proxies into MongoDB -- fast enough without threads

    proxies = []  # collect the stored proxies into a list for the workers
    # no_cursor_timeout=True keeps the server from reaping the cursor during
    # a long iteration; close it manually when done.
    demo = my_ip.collection.find({}, {'_id': 0}, no_cursor_timeout=True)
    for i in demo:
        proxies.append(i)
    my_ip.collection.drop()
    demo.close()

    pool = Pool(8)  # fan the validation out across 8 worker processes
    for i in range(len(proxies)):
        pool.apply_async(check_ip, args=(proxies[i],))
    pool.close()
    pool.join()
    # my_ip.anti_duplicate()  # remove duplicates
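On the earlier mystery of the in-class version producing no output: this is only a guess, but apply_async swallows any exception raised while dispatching or running a task unless you call .get() on the AsyncResult it returns, and dispatching a bound method means pickling self along with it, including the MongoClient in self.collection, which is not picklable. That would fail silently under apply_async. A self-contained sketch showing how .get() surfaces the hidden traceback:

from multiprocessing import Pool

def worker(x):
    # Deliberately blow up to show that apply_async hides the traceback.
    raise ValueError('boom: {}'.format(x))

if __name__ == '__main__':
    pool = Pool(2)
    results = [pool.apply_async(worker, args=(i,)) for i in range(4)]
    pool.close()
    pool.join()
    # Without this loop the script exits silently; .get() re-raises the
    # worker's exception in the parent process, revealing what went wrong.
    for r in results:
        r.get()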

The effect of multiprocessing: validating about 1,000 proxies used to take roughly four hours; now it finishes in about half an hour. In the end, 42 of them passed.
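A side note on the commented-out anti_duplicate: its `if i not in l` test scans a Python list, which is O(n) per document, so deduplication slows down quadratically as the pool grows. Keying a set on each document's items keeps the same drop-and-reinsert approach with O(1) membership checks. A minimal sketch against the same collection:

import pymongo

collection = pymongo.MongoClient()['Proxies']['free2']

seen = set()
unique_docs = []
cursor = collection.find({}, {'_id': 0}, no_cursor_timeout=True)
for doc in cursor:
    key = tuple(sorted(doc.items()))  # hashable fingerprint of the document
    if key not in seen:
        seen.add(key)
        unique_docs.append(doc)
cursor.close()

collection.drop()
if unique_docs:
    collection.insert_many(unique_docs)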

Original post: https://www.cnblogs.com/jiguangdongtaiip/p/13542256.html