asyncio系列之Queue实现

  1 import types
  2 import select
  3 import time
  4 import socket
  5 import functools
  6 import collections
  7 
  8 
  9 class Future:
 10     def __init__(self, *, loop=None):
 11         self._result = None
 12         self._callbacks = []
 13         self._loop = loop
 14 
 15     def set_result(self, result):
 16         self._result = result
 17         callbacks = self._callbacks[:]
 18         self._callbacks = []
 19         for callback in callbacks:
 20             self._loop._ready.append(callback)
 21 
 22     def add_callback(self, callback):
 23         self._callbacks.append(callback)
 24 
 25     def __iter__(self):
 26         print("挂起在yield处")
 27         yield self
 28         print("恢复执行")
 29         return "future"
 30 
 31     __await__ = __iter__
 32 
 33 
 34 class Task:
 35     def __init__(self, cor, *, loop=None):
 36         self.cor = cor
 37         self._loop = loop
 38 
 39     def _step(self):
 40         cor = self.cor
 41         try:
 42             result = cor.send(None)
 43         except StopIteration as e:
 44             self._loop._task_count -= 1
 45             if self._loop._task_count == 0:
 46                 self._loop.close()
 47         except Exception as e:
 48             pass
 49         else:
 50             if isinstance(result, Future):
 51                 result.add_callback(self._wakeup)
 52 
 53     def _wakeup(self):
 54         self._step()
 55 
 56 
 57 class Loop:
 58     def __init__(self):
 59         self._stop = False
 60         self._ready = []
 61         self._scheduled = []
 62         self._time = lambda: time.time()
 63         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 64         sock.setblocking(False)
 65         self._select = functools.partial(select.select, [sock], [], [])
 66         self._task_count = 0
 67 
 68     def create_task(self, cor):
 69         task = Task(cor, loop=self)
 70         self._ready.append(task._step)
 71         self._task_count += 1
 72         return task
 73 
 74     def call_later(self, delay, callback, *args):
 75         callback._when = delay
 76         self._scheduled.append((callback, *args))
 77 
 78     def run_until_complete(self, task):
 79         assert isinstance(task, Task)
 80         timeout = None
 81         while not self._stop:
 82             if self._ready:
 83                 timeout = 0
 84             if self._scheduled:
 85                 callback, *args = self._scheduled.pop()
 86                 timeout = callback._when
 87                 self._ready.append(functools.partial(callback, *args))
 88 
 89                 self._select(timeout)
 90             n = len(self._ready)
 91             for i in range(n):
 92                 step = self._ready.pop()
 93                 step()
 94 
 95     def close(self):
 96         self._stop = True
 97 
 98 
 99 @types.coroutine
100 def _sleep():
101     yield
102 
103 
104 async def sleep(s, result=None):
105     if s <= 0:
106         await _sleep()
107         return result
108     else:
109         future = Future(loop=loop)
110         future._loop.call_later(s, unless_cancelled, future)
111         await future
112         return result
113 
114 
115 def unless_cancelled(future):
116     future.set_result(None)
117 
118 
119 class Queue:
120     def __init__(self, maxsize=0, *, loop=None):
121         self._loop = loop
122         self._maxsize = maxsize
123 
124         # Futures.
125         self._getters = collections.deque()
126         # Futures.
127         self._putters = collections.deque()
128         self._init(maxsize)
129 
130     def _init(self, maxsize):
131         self._queue = collections.deque()
132 
133     def _get(self):
134         return self._queue.popleft()
135 
136     def _put(self, item):
137         self._queue.append(item)
138 
139     def _wakeup_next(self, waiters):
140         while waiters:
141             waiter = waiters.popleft()
142             try:
143                 waiter.set_result(None)
144             except Exception as e:
145                 pass
146             break
147 
148     def qsize(self):
149         return len(self._queue)
150 
151     @property
152     def maxsize(self):
153         return self._maxsize
154 
155     def empty(self):
156         return not self._queue
157 
158     def full(self):
159         if self._maxsize <= 0:
160             return False
161         else:
162             return self.qsize() >= self._maxsize
163 
164     @types.coroutine
165     def put(self, item):
166         while self.full():
167             putter = Future(loop=self._loop)
168             self._putters.append(putter)
169             try:
170                 yield from putter
171             except:
172                 if not self.full():
173                     self._wakeup_next(self._putters)
174                 raise
175         return self.put_nowait(item)
176 
177     def put_nowait(self, item):
178         if self.full():
179             raise
180         self._put(item)
181         self._wakeup_next(self._getters)
182 
183     @types.coroutine
184     def get(self):
185         while self.empty():
186             getter = Future(loop=self._loop)
187             self._getters.append(getter)
188             try:
189                 yield from getter
190             except:
191                 try:
192                     self._getters.remove(getter)
193                 except ValueError:
194                     pass
195 
196                 if not self.empty():
197                     self._wakeup_next(self._getters)
198                 raise
199         return self.get_nowait()
200 
201     def get_nowait(self):
202         if self.empty():
203             raise
204         item = self._get()
205         self._wakeup_next(self._putters)
206         return item
207 
208 
209 async def foo(queue):
210     print(f'enter foo at {time.strftime("%Y-%m-%d %H:%M:%S")}')
211     item = await queue.get()
212     print('foo get ', item)
213     await sleep(2)
214     item = await queue.get()
215     print('foo get ', item)
216     print(f'exit foo at {time.strftime("%Y-%m-%d %H:%M:%S")}')
217 
218 
219 async def goo(queue):
220     print(f'enter goo at {time.strftime("%Y-%m-%d %H:%M:%S")}')
221     print('goo put a')
222     await queue.put('a')
223     # await sleep(2)
224     print('goo put b')
225     await queue.put('b')
226     print(f'exit goo at {time.strftime("%Y-%m-%d %H:%M:%S")}')
227 
228 if __name__ == '__main__':
229     loop = Loop()
230     queue = Queue(maxsize=1, loop=loop)
231     f = foo(queue)
232     g = goo(queue)
233     task1 = loop.create_task(f)
234     task2 = loop.create_task(g)
235     loop.run_until_complete(task1)
原文地址:https://www.cnblogs.com/yejing-snake/p/13560958.html