python asyncio 并发 队列(一)

import asyncio, threading
import aiohttp


class Tasks():

    def __init__(self, max_async, loop=None):

        self.loop = loop or asyncio.get_event_loop()
        self._queue = asyncio.Queue(maxsize=100, loop=self.loop)
        self.lock = threading.Lock()
        self.max_async = max_async
        self.work_list = []

    async def geturldata(self, url):
        print(url)
        try:
            async with aiohttp.ClientSession(loop=self.loop, conn_timeout=1) as s:
                resp = await s.get(url)
                result = await resp.read()
                print(result.decode('utf-8'))
        except Exception as e:
            print(e)

    async def run(self):
        #ensure_future将协程转换成任务,并投递到事件循环
        
        works = [asyncio.ensure_future(self.work(), loop=self.loop) for _ in range(self.max_async)]
        self.work_list.extend(works)
        await self._queue.join()
        print('all tasks done')
        for w in works:
            w.cancel() #任务并不会立刻变为取消的状态,而是要等到下次的事件循环
    async def work(self):
        try:
            while True:
                url = await self._queue.get()

                await self.geturldata(url)
                self._queue.task_done()
        except asyncio.CancelledError: pass
    def addtask(self, item):
        with self.lock:
            self._queue.put_nowait(item)

    @property
    def count(self):
        return self._queue.qsize()
    def printstatus(self):
        for w in self.work_list:
            print(w.done())

t = Tasks(max_async=10)
url = 'https://www.baidu.com'
for u in range(100):
    t.addtask(url)
print(t.count)
loop = asyncio.get_event_loop()
loop.run_until_complete(t.run())
t.printstatus()
loop.close()
原文地址:https://www.cnblogs.com/alplf123/p/8549063.html