爬虫代理清洗


                                web开发(网络爬虫 -- 免费代理清洗

**************************************************************************************************************************************************************************************************************************************

具体的要求:

 

1、为数据库的连接及读、写、改,构造一个类:

1、可以是sqlite3,也可以是mysql;
2、构造“代理清洗”类时继承上述数据库的类进行使用数据库。

2、将要清洗的代理都放入数据库:

1、将所有要清洗的代理放入数据库;
2、注意代码的“强壮度”---如:放入数据库时智能数据去重复,数据库可自动创建;
3、注意扩展性,清洗不同网站时数据要有标注,请注重数据库设计的原则;
4、定时去收集数据;


3、标注清理出来的可用代理;

1、可实现“断点”检测,评估任务队列;
2、标注可用代理;
3、检测代理是否可用,请用并发来提高效率;
4、定时去检测数据库新加入的代理;

4、数据使用方便;

1、构造返回最新的可用代理的函数方法;

****************************************************************************************************************

 

              代理清洗代码如下

import sqlite3
import glob
import threading

class Sqlite:

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36'}

def __init__(self,
conn_name,
table_create,
):
self.table_create = table_create
self.conn_name = conn_name #数据库名
self.test_table()
self.cursor_lock = threading.Lock() # 游标锁

def test_table(self):
if glob.glob(self.conn_name):
self.conn = sqlite3.connect(self.conn_name, check_same_thread=False) #当你连接数据库时,如果是新建的,这一句就生成了db文件。
self.c = self.conn.cursor() #创建游标
else:
self.conn = sqlite3.connect(self.conn_name, check_same_thread=False) #当你连接数据库时,如果是新建的,这一句就生成了db文件。
self.c = self.conn.cursor()
self.c.execute(self.table_create)
self.conn.commit()

def for_test(self, update_stat): #还原数据库status的状态信息
self.c.execute(update_stat)
self.conn.commit()
print('修改完成')

def get_info_for_db(self, select):
self.cursor_lock.acquire()
self.c.execute(select)
info = self.c.fetchall() # 提出多条指定信息
self.cursor_lock.release()
return info

def update_for_db(self, pk, update):#对数据库进行修改
self.cursor_lock.acquire()
self.c.execute(update, (pk, ))
self.conn.commit()
self.cursor_lock.release()

def write_to_db(self, write, value):#把多条数据写入数据库
self.cursor_lock.acquire()
self.c.execute(write, value)
self.conn.commit()
self.cursor_lock.release()

def insert_for_db(self, insert, info_list):#在数据库中添加多条信息
'''
:param insert: 'INSERT INTO test(name, url, status) VALUES(?,?,?);'
:param info_list: [('B', 'B_url', 'False'), ('C', 'C_url', 'False')........]
'''
self.cursor_lock.acquire()#游标锁
self.c.executemany(insert, info_list) #多条插入数据excutemany,不是excute
self.conn.commit()
self.cursor_lock.release()

*****************************************************************************************************************************************************

import re
import json
import time
import datetime
import collections
import threading
import requests
from pysqlite import Sqlite
'''
title:代理清洗
作用:可用于各个网站的清洗代理
作者:陈文鑫
整体思路:1.导入所有需要使用的模块(python自带,第三方库)
2.继承pysqlite里的增查改的方法
3.找到每个网站的共同特征写一个特征函数,实现传入一个url或正则等返回所有的IP和port
4.讲爬取的数据进行检测,把各种可能出现的异常全部展示出来,然后判断IP的可用性
5.创建一个任务队列,然后将爬取到的所有IP和端口号存取到数据库中,当出现IP重复时提示重复IP,将没有重复IP保存到数据库中
为了提升保存数据的速度,创建了多线程,也提升了保存更多信息的速度
6.使用多线程(my_thread)把保存到库中IP状态为False的一个一个提取出来进行检测,每个检测过程中数据库中显示为:正在检测中..,当最后检测完时
把所有检测完的IP可用的把状态改为IP可用,有问题的改为IP不可用
7.函数main()作用是让程序每个60秒去访问一次网页,把IP爬取下来,进行判读保存数据,最后进行检测,修改起状态信息
'''
class Proxies(Sqlite):

def __init__(self,
conn_name='proxy.db',
table_create='''CREATE TABLE proxies(id integer PRIMARY KEY autoincrement,ip text, port text, status text, load_time real, test_time text DEFAULT NULL, site_id integer DEFAULT NULL)'''):
super().__init__(conn_name, table_create)

#*******************************************特征函数***************************************************************
def get_info(self, url, regex):
'''
:param url: url为代理IP的官网链接 例如:https://www.xicidaili.com/nn
:return: 返回所有的IP和端口号 例如:[('119.101.112.251', '9999'), ('119.101.116.154', '9999')]
'''
r = requests.get(url, headers=self.headers, proxies={'http': '119.101.114.203:9999'})
r.encoding = r.apparent_encoding
info = re.findall(regex, r.text)
return info

#****************************************功能函数***********************************************************************

#########################################修改数据库中status为False#######################################################################
def for_test(self, update_stat="""UPDATE proxies SET status='False';"""):
self.c.execute(update_stat)
self.conn.commit()
print('修改完成')

##########################################测试函数判断IP是否可用#########################################################3
def test_proxy(self, ip, port):#测试当前IP是否可用
proxies = {
'http': 'http://{}:{}'.format(ip, port),
'https': 'http://{}:{}'.format(ip, port)
}
try:
r = requests.get('http://httpbin.org/ip',
headers=self.headers,
proxies=proxies,
timeout=2,#禁止超过2秒
)
ip_now = r.json()['origin']

except json.decoder.JSONDecodeError:#代理
print('JSONDecodeError')
except requests.exceptions.ProxyError:
print('远程代理关闭或无效')
except ConnectionRefusedError:
print("代理服务器异常")
except requests.exceptions.ChunkedEncodingError:
print("强制关闭")
except requests.exceptions.ConnectTimeout:
print("访问超时")
except requests.exceptions.ReadTimeout:
print("读取超时")
except ValueError:
print('值错误')
except TypeError:
print('请稍后清洗,暂无更新')
else:
if ip == ip_now:
print(threading.current_thread().name, ip, port, '代理IP可用')
return 'OK'
else:
print(threading.current_thread().name, ip, port, '可能是透明的代理')

########################################将数据写入到数据库中#####################################################################
def info_to_db(self, url, regex):
self.info = collections.deque(self.get_info(url, regex))#任务队列
while True:
try:
info = self.info.popleft()
except IndexError:
print('已全部写入数据库')
break
else:
self.cursor_lock.acquire()
ip = self.c.execute("""SELECT * FROM proxies WHERE ip='{}' LIMIT 1;""".format(info[0]))
self.cursor_lock.release()
if ip.fetchone():
print(info[0], info[1], '重复啦...')

else:
value = [info[0], info[1], 'False', time.time()]
self.write_to_db("""INSERT INTO proxies(ip, port, status, load_time) VALUES(?,?,?,?);""", value)

def running(self, url, regex):#多线程写入数据库
poll = []
for i in range(10):
poll.append(threading.Thread(target=self.info_to_db, args=(url, regex)))
for i in poll:
i.start()
for i in poll:
i.join()
###################################提取数据库中的IP进行测试#########################################################3
def get_for_db_info(self):
while True:
#提取数据库中的一条信息
info = self.get_info_for_db("""SELECT * FROM proxies WHERE status='False' LIMIT 1;""")
try:
info = info[0]
except IndexError:
return '全部清洗完毕'
else:
self.update_for_db(info[0], """UPDATE proxies SET status='检测中....' WHERE id=?;""")

return info

##############################测试可用的IP对他的状态进行修改#########################################################################3
def test_for_db_info(self):
while True:
info = self.get_for_db_info()
# print(threading.active_count())#测试线程的数量
if info == '全部清洗完毕':
print('---线程---<{}>'.format(threading.current_thread().name), '检测完毕')
break
test_value = self.test_proxy(info[1], info[2])
if test_value == 'OK':
self.update_for_db(info[0], """UPDATE proxies SET status='代理可用' WHERE id=?;""")
else:
self.update_for_db(info[0], """UPDATE proxies SET status='代理不可用' WHERE id=?;""")
self.update_for_db(info[0], '''update proxies set test_time='{}' where id=?;'''.format(datetime.datetime.now()))

#***************************************************************************************************************
#多线程测试数据库
def my_thread(self):
pool = []
for i in range(10):
pool.append(threading.Thread(target=self.test_for_db_info))
for n in pool:
n.start()
for m in pool:
m.join()


def main(self, url, regex):
"""
此函数的作用是定时清洗代理,每隔60秒执行一次
"""
while True:
self.running(url, regex)#先爬取网页上的数据
time.sleep(5)
self.for_test()#睡眠5秒后修改为False
timer = threading.Timer(30, self.my_thread) #30秒后在进行检测数据库的IP
timer.start()
time.sleep(60) #每隔60秒进行一次循环


if __name__ == '__main__':
#西刺代理
xici_url = 'https://www.xicidaili.com/nn/'
xici_regex = '<tr class=".*?">s+?<td class="country">'
'<img src=".*?" alt=".*?" /></td>'
's+?<td>(.*?)</td>s+?<td>(.*?)</td>'
#急速代理
super_url = 'http://www.superfastip.com/welcome/freeip/'
super_regex = '<tr>[sS]*?<td>(.*?)</td>s+?<td>(.*?)</td>s+?'
'<!--<td>.*?</td>-->s+?<td>.*?</td>s+?<td>.*?</td>s+?'
'<td>.*?</td>s+?<td>.*?</td>s+?<td>.*?</td>s+?</tr>'

x = Proxies()
# x.for_test()
# x.my_thread()
x.main(xici_url, xici_regex)
#x.running(super_url, super_regex)

 

 

 

 

 

 

 

 

 

原文地址:https://www.cnblogs.com/cwx-0324/p/10193735.html