条件锁condition与Queue()

在学习之前你应该先了解锁和队列基础

import queue
import time
import random
import threading
import asyncio
import   logging
# from queue import Empty
logging.basicConfig(level = logging.INFO,format = '%(asctime)s  - %(levelname)s -->%(funcName)s  at line %(lineno)d: 
 %(message)s')
log= logging.getLogger()
# queue.qsize 不可以限制,作为应答式condition
q_init = queue.Queue(5)

async def jobs(item):
    time.sleep(random.randint(1,3))
    status = random.randint(0, 1)
    if status == 0:
        return ("success",item)
    else:
        return ("failed",item)

async def do_work(item):
    logging.info("do something %s,time start %s" % (item, time.asctime()))
    a =await jobs(item)
    return a

def async_runner(checker):
    new_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(new_loop)
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(do_work(checker))
    loop.run_until_complete(asyncio.wait([task]))
    st = task.result()
    return st

def worker_consumer(q_init,cond):
        while True:
                if q_init.empty():
                    break
                if cond.acquire():
                    if not q_init.empty():
                        cond.notify()
                        checker = q_init.get()
                        st = async_runner(checker)
                        if st[0] in ["success", "failed"]:
                            logging.info("%s task finished status is %s" % (st[1], st[0]))
                            logging.info("报告大王刚摘的%s个蟠桃已经吃完了..." % checker)
                            q_init.task_done()
                    cond.release()

def producer(cond,q_init):
        item = 1
        while True:
            if cond.acquire():
                if q_init.qsize() <5:
                    q_init.put(item)
                    logging.info("孩儿们,刚从蟠桃园摘了%s 个蟠桃给你们尝尝..." % item)
                    cond.notify()
                else:
                    cond.wait()
                cond.release()
                item += 1
                if item >= 11:
                   break
        q_init.join()

if __name__ == '__main__':
        # 消费者>生产者
        thread_num=5
        cond=threading.Condition()
        producer = [threading.Thread(target=producer,args=(cond,q_init)) for i in range(1)]
        consumer = [threading.Thread(target=worker_consumer, args=(q_init,cond)) for i in range(thread_num)]
        for p in producer:
                p.start()
        for k in consumer:
                k.start()
        # block main thread of producer
        for pr  in producer:
            pr.join()
        for m in consumer:
            m.join()

结果:

2019-12-21 22:14:23,853  - INFO -->producer  at line 56: 
 孩儿们,刚从蟠桃园摘了1 个蟠桃给你们尝尝...
2019-12-21 22:14:23,853  - INFO -->producer  at line 56: 
 孩儿们,刚从蟠桃园摘了2 个蟠桃给你们尝尝...
2019-12-21 22:14:23,853  - INFO -->producer  at line 56: 
 孩儿们,刚从蟠桃园摘了3 个蟠桃给你们尝尝...
2019-12-21 22:14:23,853  - INFO -->producer  at line 56: 
 孩儿们,刚从蟠桃园摘了4 个蟠桃给你们尝尝...
2019-12-21 22:14:23,853  - INFO -->producer  at line 56: 
 孩儿们,刚从蟠桃园摘了5 个蟠桃给你们尝尝...
2019-12-21 22:14:23,854  - INFO -->do_work  at line 22: 
 do something 1,time start Sat Dec 21 22:14:23 2019
2019-12-21 22:14:24,856  - INFO -->worker_consumer  at line 45: 
 1 task finished status is failed
2019-12-21 22:14:24,857  - INFO -->worker_consumer  at line 46: 
 报告大王刚摘的1个蟠桃已经吃完了...
2019-12-21 22:14:24,857  - INFO -->producer  at line 56: 
 孩儿们,刚从蟠桃园摘了7 个蟠桃给你们尝尝...
2019-12-21 22:14:24,859  - INFO -->do_work  at line 22: 
 do something 2,time start Sat Dec 21 22:14:24 2019
2019-12-21 22:14:27,860  - INFO -->worker_consumer  at line 45: 
 2 task finished status is failed
2019-12-21 22:14:27,860  - INFO -->worker_consumer  at line 46: 
 报告大王刚摘的2个蟠桃已经吃完了...
2019-12-21 22:14:27,861  - INFO -->do_work  at line 22: 
 do something 3,time start Sat Dec 21 22:14:27 2019
2019-12-21 22:14:29,861  - INFO -->worker_consumer  at line 45: 
 3 task finished status is success
2019-12-21 22:14:29,861  - INFO -->worker_consumer  at line 46: 
 报告大王刚摘的3个蟠桃已经吃完了...
2019-12-21 22:14:29,862  - INFO -->do_work  at line 22: 
 do something 4,time start Sat Dec 21 22:14:29 2019
2019-12-21 22:14:31,863  - INFO -->worker_consumer  at line 45: 
 4 task finished status is success
2019-12-21 22:14:31,863  - INFO -->worker_consumer  at line 46: 
 报告大王刚摘的4个蟠桃已经吃完了...
2019-12-21 22:14:31,864  - INFO -->do_work  at line 22: 
 do something 5,time start Sat Dec 21 22:14:31 2019
2019-12-21 22:14:32,865  - INFO -->worker_consumer  at line 45: 
 5 task finished status is failed
2019-12-21 22:14:32,865  - INFO -->worker_consumer  at line 46: 
 报告大王刚摘的5个蟠桃已经吃完了...
2019-12-21 22:14:32,866  - INFO -->do_work  at line 22: 
 do something 7,time start Sat Dec 21 22:14:32 2019
2019-12-21 22:14:34,867  - INFO -->worker_consumer  at line 45: 
 7 task finished status is failed
2019-12-21 22:14:34,867  - INFO -->worker_consumer  at line 46: 
 报告大王刚摘的7个蟠桃已经吃完了...
2019-12-21 22:14:34,867  - INFO -->producer  at line 56: 
 孩儿们,刚从蟠桃园摘了9 个蟠桃给你们尝尝...
2019-12-21 22:14:34,867  - INFO -->producer  at line 56: 
 孩儿们,刚从蟠桃园摘了10 个蟠桃给你们尝尝...
2019-12-21 22:14:34,868  - INFO -->do_work  at line 22: 
 do something 9,time start Sat Dec 21 22:14:34 2019
2019-12-21 22:14:37,869  - INFO -->worker_consumer  at line 45: 
 9 task finished status is failed
2019-12-21 22:14:37,869  - INFO -->worker_consumer  at line 46: 
 报告大王刚摘的9个蟠桃已经吃完了...
2019-12-21 22:14:37,870  - INFO -->do_work  at line 22: 
 do something 10,time start Sat Dec 21 22:14:37 2019
2019-12-21 22:14:38,871  - INFO -->worker_consumer  at line 45: 
 10 task finished status is failed
2019-12-21 22:14:38,871  - INFO -->worker_consumer  at line 46: 
 报告大王刚摘的10个蟠桃已经吃完了...

  


原文地址:https://www.cnblogs.com/SunshineKimi/p/12076618.html