aio crawler: crawl, deduplicate, and insert into the database

# aio crawler: crawl, deduplicate, insert into MySQL
import asyncio
import aiohttp
import aiomysql
import re
from pyquery import PyQuery

stopping = False  # flag that stops the consumer loop
start_url = 'http://www.jobbole.com/'

waiting_urls = []
seen_urls = set()
# URL dedup -- a plain set here; a Bloom filter scales better (see the sketch after the listing)

sem = asyncio.Semaphore(3)  # limit the number of concurrent requests

async def fetch(url, session):
    async with sem:
        # await asyncio.sleep(0.5)
        try:
            async with session.get(url) as resp:
                print(resp.status)
                if resp.status in [200, 201]:
                    data = await resp.text()
                    return data
        except Exception as e:
            print(e)


# Parsing is not IO-bound, so a plain (non-async) function is enough
def extract_urls(html):
    urls = []
    if not html:  # fetch() returns None on failure
        return urls
    pq = PyQuery(html)
    for link in pq.items('a'):
        url = link.attr('href')
        if url and url.startswith('http') and url not in seen_urls:
            urls.append(url)
            waiting_urls.append(url)
    return urls


async def init_urls(url, session):
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)

async def article_handler(url, session, pool):
    # Fetch the article page, parse it, and insert the title into MySQL
    html = await fetch(url, session)
    seen_urls.add(url)
    if not html:
        return
    extract_urls(html)
    pq = PyQuery(html)
    title = pq('title').text()
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute('SELECT 42;')  # simple connectivity check
            # Parameterized query avoids quoting/SQL-injection problems
            await cur.execute('insert into aiomysql_test(title) VALUES (%s)', (title,))


async def consumer(pool):
    async with aiohttp.ClientSession() as session:
        while not stopping:
            if len(waiting_urls) == 0:
                await asyncio.sleep(0.5)
                continue
            url = waiting_urls.pop()
            print('start get url:{}'.format(url))
            if re.match(r'http://.*?jobbole\.com/\d+/', url):
                if url not in seen_urls:
                    asyncio.ensure_future(article_handler(url, session, pool))
                    await asyncio.sleep(0.5)
            else:
                if url not in seen_urls:
                    asyncio.ensure_future(init_urls(url, session))


async def main(loop):
    # Wait for the MySQL connection pool to be ready before crawling starts
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='123456',
                                      db='aiomysql_test', loop=loop,
                                      charset='utf8', autocommit=True)
    async with aiohttp.ClientSession() as session:
        html = await fetch(start_url, session)
        seen_urls.add(start_url)
        extract_urls(html)

    asyncio.ensure_future(consumer(pool))

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))
    loop.run_forever()
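
The dedup comment above names a Bloom filter as a lighter-weight replacement for the seen_urls set. Below is a minimal, self-contained sketch; the SimpleBloomFilter class, its sizing parameters, and the md5-based hashing are illustrative assumptions, not part of the original crawler.

# A tiny Bloom filter sketch (assumed helper, not in the original code).
# False positives are possible (a new URL may be reported as seen);
# false negatives are not (a seen URL is never reported as new).
import hashlib

class SimpleBloomFilter:
    def __init__(self, size=2 ** 20, hash_count=5):
        self.size = size                      # number of bits
        self.hash_count = hash_count          # hash functions per item
        self.bits = bytearray(size // 8 + 1)  # bit array backing store

    def _positions(self, item):
        # Derive hash_count bit positions from salted md5 digests of the item
        for seed in range(self.hash_count):
            digest = hashlib.md5('{}:{}'.format(seed, item).encode('utf-8')).hexdigest()
            yield int(digest, 16) % self.size

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))

# Usage sketch: seen_urls = SimpleBloomFilter(); seen_urls.add(url); url in seen_urls

At the scale of a single site like jobbole.com a plain set is fine; the Bloom filter only pays off when the number of URLs no longer fits comfortably in memory.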
Original article: https://www.cnblogs.com/Erick-L/p/8939607.html