Throttling concurrent requests with aiohttp

The script below posts search keywords to an API with aiohttp, caps the number of requests in flight with a semaphore plus a limited_as_completed helper, writes hits to MongoDB, and retries failed requests through a fresh proxy.

# -*- coding: utf-8 -*-
# @Time : 2018/12/26 9:55 PM
# @Author : cxa
# @Software: PyCharm
import asyncio
import aiohttp
from db.mongohelper import save_data
import hashlib
import pathlib
import ujson
from logger.log import crawler
from utils import proxy_helper
from retrying import retry
from itertools import islice

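# Prefer uvloop's faster event loop implementation when it is installed.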
try:
    import uvloop

    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass
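# Cap on concurrently running fetch coroutines; the URL below is a placeholder.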
sem = asyncio.Semaphore(1000)
url = "https://xxx.xxx.com"


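# Pull a proxy from the pool and format it for aiohttp; retried up to 5 times if the pool fails.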
@retry(stop_max_attempt_number=5)
def get_proxy():
    proxy = proxy_helper.get_proxy()
    host = proxy.get('ip')
    port = proxy.get('port')
    ip = f"http://{host}:{port}"
    return ip


async def fetch(item, session, proxy, retry_index=0):
    data = None
    try:
        name = item
        sf = get_md5(name)
        payload = {"kw": name, "signinfo": sf}
        # verify_ssl=False skips certificate checks; newer aiohttp spells this ssl=False.
        async with session.post(url, data=payload, proxy=proxy, verify_ssl=False) as req:
            res_status = req.status
            if res_status == 200:
                data = ujson.loads(await req.text())
                searchdata = data.get("searchResult")
                if searchdata:
                    await save_data(searchdata)
                else:
                    crawler.info(f'<search_name: {name}>, data: {data}')

    except IndexError as e:
        print(f"<Item that caused the error: {item}>, <reason: {e}>")
    except Exception as e:
        data = None
        crawler.error(f"<Error: {url} {str(e)}>")
    # Anything other than a parsed 200 response is retried with a fresh proxy.
    if not data:
        crawler.info(f'<Retry url: {url}>, Retry times: {retry_index + 1}')
        retry_index += 1
        proxy = get_proxy()
        return await fetch(item, session, proxy, retry_index)


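# Wrap fetch in the semaphore so no more than 1000 requests run at once.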
async def bound_fetch(item, session, proxy):
    async with sem:
        await fetch(item, session, proxy)


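# Drain the throttled stream, awaiting each result as it becomes available.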
async def print_when_done(tasks):
    for res in limited_as_completed(tasks, 2000):
        await res


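# One shared ClientSession and one starting proxy for the whole run; tasks are created lazily.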
async def run(data):
    async with aiohttp.ClientSession() as session:
        proxy = get_proxy()
        coros = (asyncio.ensure_future(bound_fetch(item, session, proxy)) for item in data)
        await print_when_done(coros)


def limited_as_completed(coros, limit):
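    """Run at most `limit` coroutines at once, yielding awaitables that resolve
    in completion order; new coroutines are scheduled as earlier ones finish."""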
    futures = [
        asyncio.ensure_future(c)
        for c in islice(coros, 0, limit)
    ]

    async def first_to_finish():
        while True:
            await asyncio.sleep(0.01)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        newf = next(coros)
                        futures.append(
                            asyncio.ensure_future(newf))
                    except StopIteration:
                        # No more coroutines left to schedule; let the pool drain.
                        pass
                    return f.result()

    while len(futures) > 0:
        yield first_to_finish()


def get_use_list():
    # Read one search keyword per line from namelist.txt in the working directory.
    fname = pathlib.Path.cwd() / "namelist.txt"
    with open(fname, encoding='utf-8') as fs:
        data = [i.strip() for i in fs]
    return data


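# Request signature: md5 of the keyword plus a fixed salt, sent as the signinfo field.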
def get_md5(key):
    m = hashlib.md5()
    m.update(f'{key}0jjj890j0369dce05f9'.encode('utf-8'))
    a = m.hexdigest()
    return a


if __name__ == '__main__':
    crawler.info("Start downloading")
    data = get_use_list()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(data))
    loop.close()
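
The throttling pattern is independent of this crawler. Below is a standalone sketch of the same idea; run it with the limited_as_completed helper (and its islice import) copied alongside, and note that the work coroutine, the 10-item range, and the 3-slot limit are made up purely for illustration:

import asyncio


async def work(n):
    # Stand-in task: pretend to do 0.1 s of I/O, then return a result.
    await asyncio.sleep(0.1)
    return n * n


async def demo():
    coros = (work(i) for i in range(10))
    # No more than 3 work() calls run at any moment; results arrive in completion order.
    results = [await f for f in limited_as_completed(coros, 3)]
    print(results)


asyncio.get_event_loop().run_until_complete(demo())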

Original post: https://www.cnblogs.com/c-x-a/p/10190558.html