32. Web Scraping, Part 2

Using proxies:

- Proxy:
    A proxy server that accepts requests and forwards them on your behalf; it is passed to requests via the proxies argument (see the sketch after this list).
- Anonymity levels:
    - Elite (高匿): the target site does not know you are using a proxy and does not see your real IP
    - Anonymous (匿名): the target site knows you are using a proxy but does not see your real IP
    - Transparent (透明): the target site knows you are using a proxy and sees your real IP
- Types:
    - Proxy websites:
        - www.goubanjia.com
        - 西刺代理 (xicidaili)
        - 快代理 (kuaidaili)
        - http://zhiliandaili.cn
    - http proxies
    - https proxies
- Cookie handling (covered below)
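
A minimal sketch of how a proxy is handed to requests; the ip:port below is a placeholder, not a live proxy:

import requests

headers = {"User-Agent": "Mozilla/5.0"}      # any normal browser UA string
proxies = {"https": "111.21.23.12:8888"}     # placeholder proxy taken from one of the sites above
# The request goes out through the proxy server instead of directly from this machine
page_text = requests.get("https://www.baidu.com", headers=headers, proxies=proxies).text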

import requests
from lxml import etree
headers={
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.163 Safari/537.36",
}

Proxy pool:

# Proxy pool
import random

ip_pool = [
    {"https": "111.21.23.12:888"},
    {"https": "111.21.23.12:888"},
    {"https": "111.21.23.12:888"},
    {"https": "111.21.23.12:888"},
]
proxies = random.choice(ip_pool)  # pick a random proxy for each request
# Proxy IPs can also be extracted from a paid provider such as 代理精灵
# http://zhiliandaili.cn

Scraping the Xici proxy site:

# Scrape proxies from xicidaili
url = "https://www.xicidaili.com/nn/%d"
proxy_list_http = []
proxy_list_https = []
for page in range(1, 20):
    new_url = url % page
    # verify=False can be added to the get() call to skip certificate verification if needed
    page_text = requests.get(url=new_url, headers=headers).text
    tree = etree.HTML(page_text)
    # Do not include tbody in the xpath, otherwise the expression returns nothing
    tr_list = tree.xpath('//*[@id="ip_list"]//tr')[1:]
    for tr in tr_list:
        ip = tr.xpath("./td[2]/text()")[0]
        port = tr.xpath("./td[3]/text()")[0]
        t_type = tr.xpath("./td[6]/text()")[0].lower()  # the type column is upper-case (HTTP/HTTPS) on the page
        if t_type == "http":
            dic = {
                t_type: ip + ":" + port
            }
            proxy_list_http.append(dic)
        else:
            dic = {
                t_type: ip + ":" + port
            }
            proxy_list_https.append(dic)
print(len(proxy_list_http), len(proxy_list_https))

Checking whether a proxy works:

for proxy in proxy_list_https:  # each entry is already a {"https": "ip:port"} dict
    response = requests.get("https://www.sogou.com", headers=headers, proxies=proxy, timeout=5)
    if response.status_code == 200:  # status_code is an int, not a string
        print("ok:", proxy)

Cookies:

- Cookie handling
    - Manual handling: copy the Cookie from a packet capture and add it to headers
    - Automatic handling: the session object. Create a session object; it sends requests just like the requests module, except that any cookie produced while sending requests through the session is stored in the session object automatically.

The session object:

# session object
session = requests.Session()
login_url = ""  # left blank in the original: the page whose response sets the cookie
session.get(login_url, headers=headers)  # cookies from this request are stored in the session automatically
data_url = ""  # left blank in the original: the page that requires the cookie
response = session.get(data_url, headers=headers)  # this request carries the stored cookie
response.json()

CAPTCHA-solving platforms:

- CAPTCHA-solving (打码) platforms
    - Chaojiying (超级鹰): https://www.chaojiying.com/about.html
        - Register (choose the "user center" identity)
        - Log in:
            - Create a software instance: 899333
            - Download the sample code (a sketch of how that client is typically called follows this list)
    - Yundama (云打码)
    - Damatu (打码兔)
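
A minimal sketch of how the downloaded Chaojiying sample client is typically called; the account, password, software id, image path and code type below are placeholders (PostPic and the "pic_str" field follow the vendor's sample code, which Example 3 further down also uses):

from chaojiying import Chaojiying_Client  # the sample module downloaded from the site

client = Chaojiying_Client('username', 'password', '899333')  # placeholder account / password / software id
with open('./code.png', 'rb') as f:                            # a CAPTCHA image saved beforehand
    im = f.read()
result = client.PostPic(im, 1902)                              # placeholder code type; pick the one matching your CAPTCHA
print(result['pic_str'])                                       # the recognized text (or coordinates)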

Simulated login:

- Simulated login
- Dynamically changing request parameters
    - Usually such dynamic parameters are hidden in the front-end page source (a sketch of extracting one is shown after this list)
- On the gushiwen.org login page the cookie is set by the CAPTCHA-image request, so when simulating login it is best to send every request through a session
- Use a thread pool to improve crawling efficiency
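
A minimal sketch of the usual pattern, assuming a login form that hides a dynamic token in a hidden input; the URL, field names and xpath are placeholders, not the real gushiwen.org ones:

import requests
from lxml import etree

headers = {"User-Agent": "Mozilla/5.0"}
session = requests.Session()

login_page_url = "https://example.com/login"                    # placeholder login page
page_text = session.get(login_page_url, headers=headers).text   # cookies set here stay in the session
tree = etree.HTML(page_text)
token = tree.xpath('//input[@name="__VIEWSTATE"]/@value')[0]    # placeholder name for the hidden dynamic parameter

data = {
    "__VIEWSTATE": token,    # dynamic parameter scraped from the page
    "email": "user",         # placeholder credentials
    "pwd": "password",
}
response = session.post("https://example.com/login/post", headers=headers, data=data)
print(response.status_code)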

Thread pool:

from multiprocessing.dummy import Pool  # thread pool backed by the threading module
from time import sleep
import time

start = time.time()
alist = [
    'www.1.com',
    'www.2.com',
    'www.3.com'
]

def get_request(url):
    print("downloading:", url)
    sleep(1)  # simulate a blocking download
    print("finished:", url)

pool = Pool(3)                 # 3 worker threads
pool.map(get_request, alist)   # blocks until every url has been processed
print("total time:", time.time() - start)

Single thread + multi-task async coroutines:

### Single thread + multi-task async coroutines
- Coroutines
    - import asyncio
    - If a function definition is decorated with async, calling it returns a coroutine object and the function body is not executed immediately
- Task objects
    - A task object is a further wrapper around a coroutine object: task object = high-level coroutine object = the "special" (async) function
    - Task objects must be registered with an event loop object
    - A callback can be bound to a task object
- Event loop
    - Think of it as a container that must hold task objects
    - Once the event loop object is started, it executes the task objects stored inside it asynchronously
- aiohttp:
    - an asynchronous HTTP request module
import asyncio

async def test():
    print("this is a async func")

c = test()
# Wrap the coroutine in a task object
task = asyncio.ensure_future(c)
# Create an event loop object
loop = asyncio.get_event_loop()
loop.run_until_complete(task)

import asyncio
import time
start = time.time()
# Inside the special (async) function you must not call code from modules that
# do not support async (e.g. time.sleep(2)), or the async effect is lost
async def get_request(url):
#     await asyncio.sleep(2)  # use the async-aware sleep instead
    time.sleep(2)  # blocking call: this version runs slower than the awaited one above
    print("downloaded:", url)
urls = [
    "www.1.com",
    "www.2.com",
]
tasks = []
for url in urls:
    c = get_request(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print("total time:", time.time() - start)

Callbacks:

import asyncio
import time
start = time.time()

async def test():
    time.sleep(2)  # blocking sleep, just to make the elapsed time visible
    print("this is a async func")
    return "bobo"

def callback(task):  # callback bound to the task object
    print("i am callback and:", task.result())  # task.result() is the coroutine's return value

c = test()
# Wrap the coroutine in a task object
task = asyncio.ensure_future(c)
task.add_done_callback(callback)
# Create an event loop object and run the task
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print("total time:", time.time() - start)

aiohttp:

# Use aiohttp instead of requests
import aiohttp
import time
import asyncio
urls = [
    "http://127.0.0.1:5000/index1",
    "http://127.0.0.1:5000/index2"
]

async def get_request(url):
    async with aiohttp.ClientSession() as s:  # every with must be async with
        async with await s.get(url) as response:  # await before every blocking operation
            page_text = await response.text()  # text() is a coroutine: it must be called and awaited
    return page_text

tasks = []
for url in urls:
    c = get_request(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)

# loop = asyncio.get_event_loop()
# loop.run_until_complete(asyncio.wait(tasks))

A complete example:

# Full example: a small Flask test server
from flask import Flask
import time

app = Flask(__name__)


@app.route("/index1")
def index1():
    time.sleep(2)
    return "hello"


@app.route("/index2")
def index2():
    time.sleep(2)
    return "hello2"


if __name__ == "__main__":
    app.run(threaded=True)  # the server must also run in threaded mode
-----------------------------------------------------------------------
import aiohttp
import time
import asyncio
start = time.time()

urls = [
    "http://127.0.0.1:5000/index1",
    "http://127.0.0.1:5000/index2"
]


async def get_request(url):
    async with aiohttp.ClientSession() as s:  # every with must be async with
        async with await s.get(url) as response:  # await before every blocking operation
            page_text = await response.text()
            print(page_text)
    return page_text
tasks = []
for url in urls:
    c = get_request(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print("总耗时:", time.time() - start)


Review:

- Cookie handling
    - Manual handling
        - capture the cookie with a packet-capture tool and put it into headers
    - Automatic handling
        - session object
- Proxies
    - Proxy server
        - forwards requests
        - apply the proxy ip:port to the get/post methods via proxies={"https": "ip:port"}
        - proxy pool (a list of proxies)
- CAPTCHA recognition
    - Chaojiying (超级鹰)
- Simulated login
    - CAPTCHA recognition
    - dynamic request parameters
    - cookies
- Single thread + multi-task async coroutines
    - Coroutine
        - if a function's definition is decorated with async, calling the function returns a coroutine object
    - Task object
        - a further wrapper around the coroutine object
            - task.add_done_callback(func): func(task): task.result()
                import asyncio
                import time
                start = time.time()
                async def test():
                    time.sleep(2)
                    print("this is a async func")
                    return "bobo"

                def callback(task): # callback bound to the task object
                    print("i am callback and:", task.result())

                c = test()
                # wrap the coroutine in a task object
                task = asyncio.ensure_future(c)
                task.add_done_callback(callback)
                print("total time:", time.time() - start)

                """Something seems off here: the event loop had already been started when the task was wrapped"""
                # create an event loop object
                # loop = asyncio.get_event_loop()
                # loop.run_until_complete(task)
    - Event loop object
        - The event loop object is what the task objects are loaded into; once started, it processes every task object loaded inside it asynchronously (asyncio.wait is used to suspend the task objects)
    - asyncio.wait
    - Note: code from modules that do not support async must not appear inside the special (async) function, otherwise the whole async effect is broken
- aiohttp: a module that supports asynchronous requests

Review example:

# Review
import aiohttp
import time
import asyncio
from lxml import etree
start = time.time()

urls = [
    "http://127.0.0.1:5000/index1",
    "http://127.0.0.1:5000/index2"
]


async def get_request(url):
    async with aiohttp.ClientSession() as s:  # every with must be async with
#         async with await s.get(url, headers=headers, proxy="http://ip:port") as response:  # aiohttp takes headers= / proxy= keyword arguments
        async with await s.get(url) as response:  # await before every blocking operation
            page_text = await response.text()
            print(page_text)
            return page_text

def parse(task):
    page_text = task.result()  # the coroutine's return value
    tree = etree.HTML(page_text)
    parse_data = tree.xpath("//li/text()")
    print(parse_data)

tasks = []
for url in urls:
    c = get_request(url)  # get the coroutine object
    task = asyncio.ensure_future(c)  # create the task object
    task.add_done_callback(parse)  # bind the callback
    tasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))  # start the event loop; asyncio.wait(tasks) suspends the tasks
print("total time:", time.time() - start)
-------------------------------------------------------------------
from flask import Flask
import time
app = Flask(__name__)
@app.route("/index1")
def index1():
    time.sleep(2)
    return "hello"
@app.route("/index2")
def index2():
    time.sleep(2)
    return "hello2"
if __name__ == "__main__":
    app.run(threaded=True)  # the server must also run in threaded mode

selenium:

### Today's topics
- Using the selenium module in crawlers
    - Concept: a browser-automation module
    - How it relates to crawling
        - conveniently grabs dynamically loaded data (what you see is what you get)
        - implements simulated login
    - Installation: pip install selenium
    - Basic usage:
        - requires the driver program for a particular browser
        - http://chromedriver.storage.googleapis.com/index.html

- pip install selenium
- Coding workflow
    - Import: from selenium import webdriver
    - Instantiate a browser object
    - Define the custom automation actions
import time
from selenium import webdriver
# Download the Chrome driver first; `path` below is wherever you saved chromedriver
# Download address: http://chromedriver.storage.googleapis.com/index.html
brower = webdriver.Chrome(executable_path=path)
brower.get("https://www.baidu.com")  # "kw" and "su" are Baidu's search box and button ids
text_input = brower.find_element_by_id("kw")
text_input.send_keys("人民币")
brower.find_element_by_id("su").click()
time.sleep(2)
# Get the rendered page source
page_text = brower.page_source
brower.quit()

Common operations:

# Usage
import time
from selenium import webdriver
url = "https://qzone.qq.com/"
brower = webdriver.Chrome(executable_path=r"C:\Users\lzh\Downloads\chromedriver.exe")

brower.get(url)
# Execute a JS script (e.g. scroll to the bottom of the page)
# brower.execute_script("window.scrollTo(0, document.body.scrollHeight)")
# Switch into the iframe tag
brower.switch_to.frame("login_frame")
click_btn = brower.find_element_by_id("switcher_plogin").click()
text_input = brower.find_element_by_id("u")
text_input.send_keys("3164626382")
text_input = brower.find_element_by_id("p")
text_input.send_keys("lzh19950326")
click_but = brower.find_element_by_id("login_button").click()
time.sleep(2)
# Get the rendered page source
page_text = brower.page_source  # can be parsed with etree afterwards
brower.quit()

Example 1:

from time import sleep
from lxml import etree
from selenium import webdriver
url = "http://125.35.6.84:81"
bro = webdriver.Chrome(executable_path=r"C:\Users\lzh\Downloads\chromedriver.exe")

bro.get(url)
sleep(1)
page_text = bro.page_source
page_text_list = [page_text]

# Click the "next page" button three times and collect each rendered page
for i in range(3):
    bro.find_element_by_id("pageIto_next").click()
    sleep(1)
    page_text_list.append(bro.page_source)

for page_text in page_text_list:
    tree = etree.HTML(page_text)
    li_list = tree.xpath("//ul[@id='gzlist']/li")
    for li in li_list:
        title = li.xpath('./dl/@title')[0]
        num = li.xpath('./ol/@title')[0]
        print(title + num)

Example 2:

from selenium import webdriver
from time import sleep
from selenium.webdriver import ActionChains
bro = webdriver.Chrome(executable_path=r"C:\Users\lzh\Downloads\chromedriver.exe")
bro.get("https://www.runoob.com/try/try.php?filename=jqueryui-api-droppable")

bro.switch_to.frame("iframeResult")
div_tag = bro.find_element_by_id("draggable")
# Dragging = click-and-hold + move
action = ActionChains(bro)
action.click_and_hold(div_tag)

for i in range(5):
    # perform() makes the queued actions execute immediately
    action.move_by_offset(17, 5).perform()
action.release().perform()  # release the mouse button
sleep(1)
bro.quit()

Example 3:

from time import sleep
from lxml import etree
from PIL import Image
from selenium.webdriver import ActionChains
from chaojiying import Chaojiying_Client
from selenium import webdriver
url = "http://kyfw.12306.cn/otn/login/init"
bro = webdriver.Chrome(executable_path=r"C:\Users\lzh\Downloads\chromedriver.exe")
bro.get(url)
sleep(2)
bro.save_screenshot("main.png")

code_img_tag = bro.find_element_by_xpath('//*[@id="loginForm"]/div/ul[2]/li[4]/div/div/div[3]/img')
location = code_img_tag.location
size = code_img_tag.size

rangle = (int(location["x"]), int(location["y"]), int(location["x"] + size["width"]),
          int(location["y"] + size["height"]))

i = Image.open("./main.png")
frame = i.crop(rangle)
frame.save("code.png")


# Use Chaojiying to recognize the click-CAPTCHA
def get_text(impath, imgType):
    chaojiying = Chaojiying_Client('maxhope', 'maxhope8', '904410')
    im = open(impath, "rb").read()
    return chaojiying.PostPic(im, imgType)["pic_str"]


imgpath = './code.png'
img_type = 9004
# result format: "55,70|267,133", i.e. [[55, 70], [267, 133]]
result = get_text(imgpath, img_type)
print(result)
all_list = []
if '|' in result:
    list_1 = result.split('|')
    count_1 = len(list_1)
    for i in range(count_1):
        xy_list = []
        x = int(list_1[i].split(',')[0])
        y = int(list_1[i].split(',')[1])
        xy_list.append(x)
        xy_list.append(y)
        all_list.append(xy_list)
else:
    x = int(result.split(',')[0])
    y = int(result.split(',')[1])
    xy_list = []
    xy_list.append(x)
    xy_list.append(y)
    all_list.append(xy_list)

print(all_list)
# Click each returned coordinate, offset relative to the CAPTCHA image's top-left corner
for l in all_list:
    x = l[0]
    y = l[1]
    ActionChains(bro).move_to_element_with_offset(code_img_tag, x, y).click().perform()

bro.quit()

Example 4:

import re

import requests
from lxml import etree
headers={
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.163 Safari/537.36",
}

url = "https://www.pearvideo.com/category_1"

page_text = requests.get(url, headers=headers).text

tree = etree.HTML(page_text)

li_list = tree.xpath('//*[@id="categoryList"]/li[1]')
print(len(li_list))
for li in li_list:
    detail_url = "https://www.pearvideo.com/" + li.xpath('./div/a/@href')[0]
    title = li.xpath('./div/a/div[2]/text()')[0] + '.mp4'
    detail_page_text = requests.get(url=detail_url, headers=headers).text

    ex = 'srcUrl="(.*?)",vdoUrl=srcUrl'
    movie_url = re.findall(ex, detail_page_text, re.S)[0]

    content = requests.get(movie_url, headers=headers).content

    with open(title, "wb") as f:
        f.write(content)

Installing and using Fiddler:

'''
# Scraping data from mobile apps
- Fiddler is a packet-capture tool
    - Configuration: enable it to capture https traffic (a quick PC-side check is sketched after this block)
    - Automatic install: Tools -> Options -> HTTPS -> install the certificate
    - Manual install: Tools -> Options -> HTTPS -> Actions -> Trust Root Certificate ->
        Export Root Certificate to Desktop -> install the certificate -> Local Machine -> place all certificates in the following store
        -> Trusted Root Certification Authorities
- http: a protocol for exchanging data between client and server
- https: the secure version of http
- https encryption is based on certificates and keys


- 1. Configure Fiddler's listening port
- 2. Put the phone and the PC running Fiddler on the same network segment
- 3. On the phone, browse to Fiddler's ip+port (e.g. 192.168.31.68:8888) and download the certificate from that page
- 4. Install and trust the certificate on the phone
- 5. Set the phone's network proxy: enable the proxy and point it at the Fiddler PC's ip and port
'''
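
As a quick PC-side sanity check, a script can send its own traffic through Fiddler's proxy port and watch it appear in the capture list; a minimal sketch, assuming Fiddler is listening locally on its default port 8888:

import requests

# Route this request through the local Fiddler instance so it shows up in the capture list
fiddler_proxy = {
    "http": "http://127.0.0.1:8888",
    "https": "http://127.0.0.1:8888",
}
# verify=False because Fiddler re-signs https traffic with its own root certificate
response = requests.get("https://www.baidu.com", proxies=fiddler_proxy, verify=False)
print(response.status_code)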
Original article: https://www.cnblogs.com/liuzhanghao/p/12674695.html