Python async crawler with aiohttp

Reference article: by the Zhihu crawler expert 陈祥安 (WeChat public account: Python学习开发)

Author's GitHub: https://github.com/cxapython/chat_aio_spider/commits?author=cxapython

To be clear: I paid to read one of his articles, then combined it with the code in his GitHub repo to implement my own business requirements.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
# @Time : 2020/7/13 0013 17:50
# @Author : ypl
# @Site : crawler that fetches the best section/category
# @File : update_.py
# @Software: PyCharm
"""
import re
import requests
from moudle.update_data import r
from script.es_script import es_update_data
from script.retry_helper import aio_retry
from script.script import headers, get_token
import json
import asyncio
from dataclasses import dataclass
import aiohttp
import async_timeout
from contextvars import ContextVar
requests.packages.urllib3.disable_warnings()

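# The semaphore is kept in a ContextVar so that each asyncio.run() / event loop
# picks up its own instance; it is set in start() below.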
sem_count = ContextVar("asyncio.Semaphore")

def save_content_list(data, body_name):
    """
    Update ES based on the data from the previous step
    :return:
    """
    print(data)
    if data:
        body = {
            "query": {
                "term": {
                    "keyword": {
                        "value": data['keyword']
                    }
                }

            },
            "script": {
                "lang": "painless",
                "source": "ctx._source.section=params.section",  # group字段
                "params": {
                    "section": data['section']
                }
            }
        }
        es_update_data(body_name, body)
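
# es_update_data's internals are not shown in the post; presumably it wraps
# Elasticsearch's update_by_query API (an assumption, not the author's actual
# helper), roughly:
#
#     from elasticsearch import Elasticsearch
#     Elasticsearch().update_by_query(index=body_name, body=body)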


@dataclass
class UpdateSection:
    # @dataclass generates __init__ from these fields; the original defined
    # __init__ by hand, which made the decorator a no-op
    cookies: str
    proxy: str = 'http://url:port'  # placeholder proxy address

    def get_cookie(self):
        # keep only the xman_t and cookie2 entries from the raw cookie string
        cookie_str = re.findall(r'xman_t=.*?;', self.cookies)[0] + re.findall(r'(cookie2=.*?);', self.cookies)[0]
        # split('=', 1) so cookie values containing '=' are not truncated
        cookies = {item.split('=', 1)[0]: item.split('=', 1)[1] for item in cookie_str.split(';') if item}
        return cookies

    def _make_url(self, keyword):
        # the real URL template is elided in the post; it interpolated the keyword (and originally a token)
        url = "{}".format(keyword)
        return url

    @aio_retry()  # the author's own retry-with-timeout decorator, defined in retry_helper.py below
    async def fetch(self, url, cookies):

        async with aiohttp.ClientSession() as session:
            async with async_timeout.timeout(5):
                async with session.get(url, headers=headers, proxy=self.proxy, cookies=cookies) as response:
                    # GET and POST take the same arguments here; note the proxy is a string while cookies is a dict
                    return await response.text()  # JSON text if the response is JSON, otherwise the HTML string

    async def main(self, url, data, data_name, body_name, cookies):
        async with sem_count.get():  # acquire the per-run semaphore to cap concurrency
            if data['section'] == '':
                result = await self.fetch(url, cookies)
                result_data = json.loads(result)
                section = result_data['data']['categories'][0]['name']
                await self.parse_json(section, data, data_name, body_name)

    async def parse_json(self, section, data, data_name, body_name):
        """Data-processing step"""
        data_dict = {
            'section': section,
            'keyword': data['keyword']
        }
        r.lrem(data_name, 1, str(data))
        print("deleted successfully")
        save_content_list(data_dict, body_name)  # this helper should really live inside the class; I forgot to move it

    async def start(self, data_name, body_name, status_name, data_list):
        cookies = self.get_cookie()
        sem = asyncio.Semaphore(5)  # cap the number of concurrent requests
        sem_count.set(sem)
        if data_list is None:
            while 1:
                data_list = r.lrange(data_name, 0, 100)
                if not data_list:
                    break
                # build the task list with a list comprehension; eval() turns the stored string back into a dict
                tasks = [asyncio.create_task(
                    self.main(self._make_url(eval(data.decode())['keyword']), eval(data.decode()),
                              data_name, body_name, cookies)) for data in data_list]
                await asyncio.wait(tasks)
            r.set(status_name, 1)
            print('finished')
        else:
            tasks = [asyncio.create_task(
                self.main(self._make_url(data['keyword']), data, data_name, body_name, cookies))
                for data in data_list]
            await asyncio.wait(tasks)

    def run(self, status_name, data_name, body_name, data_list=None):
        # start the program (ignore the parameter order)
        asyncio.run(self.start(data_name, body_name, status_name, data_list))
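For context, a minimal way to drive the class might look like this (the cookie string, Redis key names, and index name below are made up for illustration, not taken from the original project):

if __name__ == '__main__':
    # hypothetical values, for illustration only
    spider = UpdateSection(cookies='xman_t=abc;cookie2=def;')
    spider.run(status_name='section_status', data_name='section_tasks', body_name='my_index')

Next is the retry helper that fetch() relies on: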
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
# @Time : 2020/8/20 0020 11:37
# @Site : 
# @File : retry_helper.py
# @Software: PyCharm
"""
import asyncio
import random
from functools import wraps
from loguru import logger
import traceback


class RetryTimeout(Exception):
    def __init__(self, info):
        self.info = info

    def __str__(self):
        return self.info


def aio_retry(**kwargs):
    max_sleep_time: int = kwargs.pop("max", 0)
    min_sleep_time: int = kwargs.pop("min", 0)
    attempts: int = kwargs.pop("attempts", 3)
    error: bool = kwargs.pop("error", False)

    def retry(func):
        @wraps(func)
        async def decorator(*args, **_kwargs):
            retry_count = 1
            sleep_time = 1
            error_info = ""
            while True:
                if retry_count > attempts:
                    if error:
                        raise RetryTimeout("too many retries")
                    else:
                        logger.error(f"重试{retry_count}次仍然出错,错误内容,{error_info}")
                        break
                try:
                    result = await func(*args, **_kwargs)
                    return result
                except Exception as e:
                    if retry_count == attempts:
                        error_info = f"{traceback.format_exc()}"
                    await asyncio.sleep(sleep_time)
                finally:
                    retry_count += 1
                    if max_sleep_time:
                        sleep_time = random.randint(min_sleep_time, max_sleep_time)

        return decorator

    return retry
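
A quick sketch of how the decorator is used (the decorated coroutine below is hypothetical):

@aio_retry(attempts=5, min=1, max=3, error=True)  # retry up to 5 times, sleep 1-3s between tries, raise on final failure
async def flaky_fetch():
    # stand-in for any coroutine that may fail intermittently
    raise ValueError("simulated failure")

# asyncio.run(flaky_fetch())  # raises RetryTimeout("too many retries") after 5 attempts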
Original post: https://www.cnblogs.com/itBlogToYpl/p/13540064.html