28 Apr 18 异步+回调 线程queue 线程event 协程(yield,greenlet,gevent)

28 Apr 18
一、异步+回调机制
a、问题引入
问题:
1)任务的返回值不能得到及时的处理,必须等到所有任务都运行完毕才能统一进行处理
2)解析的过程是串行执行的,如果解析一次需要花费2s,解析9次则需要花费18s
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os
import requests
import time
import random
 
def get(url):
    print('%s GET %s' %(os.getpid(),url))
    response=requests.get(url)
    time.sleep(random.randint(1,3))
    if response.status_code == 200:
        return response.text
 
def pasrse(res):
    print('%s 解析结果为:%s' %(os.getpid(),len(res)))
 
if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.python.org',
    ]
    pool=ProcessPoolExecutor(4)
    objs=[]
    for url in urls:
        obj=pool.submit(get,url)
        objs.append(obj)
 
    pool.shutdown(wait=True)
 
    for obj in objs:
        res=obj.result()
        pasrse(res)
 
b、进阶解决方案: 可以解决上述两个问题,但使得获取信息函数set和解析信息函数pasrse耦合到了一起
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
import random
 
def get(url):
    print('%s GET %s' %(os.getpid(),url))
    response=requests.get(url)
    time.sleep(random.randint(1,3))
 
    if response.status_code == 200:
        pasrse(response.text)
 
def pasrse(res):
    print('%s 解析结果为:%s' %(os.getpid(),len(res)))
 
if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.python.org',
 
    ]
    pool=ProcessPoolExecutor(4)
    for url in urls:
        pool.submit(get,url)
 
c1、终极解决方案: 可以解决上述两个问题,同时使获取信息函数set和解析信息函数pasrse解耦合(进程版)
主进程作为回调的执行者
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
import random
 
def get(url):
    print('%s GET %s' %(os.getpid(),url))
    response=requests.get(url)
    time.sleep(random.randint(1,3))
 
    if response.status_code == 200:
        # 干解析的活
        return response.text
 
def pasrse(obj):  #后续回调是obj会将自身传给pasrse,所以pasrse必须有且仅有一个参数
    res=obj.result()
    print('%s 解析结果为:%s' %(os.getpid(),len(res)))
 
if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.python.org',
    ]
 
    pool=ProcessPoolExecutor(4)
    for url in urls:
        obj=pool.submit(get,url)
        obj.add_done_callback(pasrse)
 
    print('主进程',os.getpid())
 
c2、终极解决方案: 可以解决上述两个问题,同时使获取信息函数set和解析信息函数pasrse解耦合(线程版)
哪个子进程空闲就由那个子进程作为回调的执行者
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread
import requests
import os
import time
import random
 
def get(url):
    print('%s GET %s' %(current_thread().name,url))
    response=requests.get(url)
    time.sleep(random.randint(1,3))
 
    if response.status_code == 200:
        # 干解析的活
        return response.text
 
def pasrse(obj):
    res=obj.result()
    print('%s 解析结果为:%s' %(current_thread().name,len(res)))
 
if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.python.org',
    ]
    pool=ThreadPoolExecutor(4)
    for url in urls:
        obj=pool.submit(get,url)
        obj.add_done_callback(pasrse)
 
    print('主线程',current_thread().name)
 
二、线程queue
import queue
 
q=queue.Queue(3) #队列:先进先出
q.put(1)
q.put(2)
q.put(3)
# q.put(4)       #阻塞
print(q.get())
print(q.get())
print(q.get())
 
q=queue.LifoQueue(3) #堆栈:后进先出
q.put('a')
q.put('b')
q.put('c')
print(q.get())
print(q.get())
print(q.get())
 
q=queue.PriorityQueue(3) #优先级队列:可以以小元组的形式往队列里存值,第一个元素代表优先级,数字越小优先级越高
q.put((10,'user1'))
q.put((-3,'user2'))
q.put((-2,'user3'))
print(q.get())
print(q.get())
print(q.get())
 
三、线程event
a、案例一: 等待check重置event内的值后,connect从event.wait()后继续运行
from threading import Event,current_thread,Thread
import time
 
event=Event()   #event内部维护着一个全局变量
 
def check():
    print('%s 正在检测服务是否正常....' %current_thread().name)
    time.sleep(3)
    event.set() #改变event中的全局变量的值
 
def connect():
    print('%s 等待连接...' %current_thread().name)
    event.wait() #等待全局变量的值被重置;如果括号中为1,即只等1秒
    print('%s 开始连接...' % current_thread().name)
 
if __name__ == '__main__':
    t1=Thread(target=connect)
    t2=Thread(target=connect)
    t3=Thread(target=connect)
    c1=Thread(target=check)
    t1.start()
    t2.start()
    t3.start()
c1.start()
 
b、案例二:三次刷尝试后退出
from threading import Event,current_thread,Thread
import time
 
event=Event()
 
def check():
    print('%s 正在检测服务是否正常....' %current_thread().name)
    time.sleep(5)
    event.set()
 
def connect():
    count=1
    while not event.is_set():
        if count ==  4:
            print('尝试的次数过多,请稍后重试')
            return
        print('%s 尝试第%s次连接...' %(current_thread().name,count))
        event.wait(1)
        count+=1
    print('%s 开始连接...' % current_thread().name)
 
if __name__ == '__main__':
    t1=Thread(target=connect)
    t2=Thread(target=connect)
    t3=Thread(target=connect)
    c1=Thread(target=check)
    t1.start()
    t2.start()
    t3.start()
    c1.start()
 
四、协程
1、单线程下实现并发:协程 (为了提高效率;但不是说所有协程都会提升效率)
   并发指的多个任务看起来是同时运行的;并发实现的本质:切换+保存状态
   有效的协程在一定程度‘骗过’了CPU;通过自己内部协调,一遇到IO就切到自己的其他程序中,使得CPU以为这个程序一直在运行,从而使其更有可能处于就绪态或运行态,以更多的占用CPU。
2、实现并发的三种手段:
a)单线程下的并发;由程序自己控制,相对速度快
b)多线程下的并发;由操作系统控制,相对速度较慢
c)多进程下的并发;由操作系统控制,相对速度慢
 
3、基于yield保存状态,实现两个任务直接来回切换,即并发的效果 (但yield不会遇到阻塞自动切程序)
   PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
 
import time
def consumer():
    '''任务1:接收数据,处理数据'''
    while True:
        x=yield
 
def producer():
    '''任务2:生产数据'''
    g=consumer()
    next(g)
    for i in range(10000000):
        g.send(i)
 
start=time.time()
producer() #1.0202116966247559
stop=time.time()
print(stop-start)
 
# 纯计算的任务并发执行
import time
def task1():
    res=1
    for i in range(1000000):
        res+=i
        yield
        time.sleep(10000)  #yield不会自动跳过阻塞
        print('task1')
 
def task2():
    g=task1()
    res=1
    for i in range(1000000):
        res*=i
        next(g)
        print('task2')
 
start=time.time()
task2()
stop=time.time()
print(stop-start)
 
五、单线程下实现遇到IO切换
1、 用greenlet(封装yield,遇到IO不自动切)
from greenlet import greenlet
import time
 
def eat(name):
    print('%s eat 1' %name)
    time.sleep(30)
    g2.switch('alex')  #只在第一次切换时传值
    print('%s eat 2' %name)
    g2.switch()
def play(name):
    print('%s play 1' %name)
    g1.switch()
    print('%s play 2' %name)
 
g1=greenlet(eat)
g2=greenlet(play)
g1.switch('egon')
 
2、 用gevent模块(封装greenlet,不处理的话,遇到自己的IO才主动切)
import gevent
 
def eat(name):
    print('%s eat 1' %name)
    gevent.sleep(5)  #换成time.sleep(5),不会自动切
    print('%s eat 2' %name)
def play(name):
    print('%s play 1' %name)
    gevent.sleep(3)
    print('%s play 2' %name)
 
g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,'alex')
 
# gevent.sleep(100)
# g1.join()
# g2.join()
gevent.joinall([g1,g2])
 
3、 用gevent模块(封装greenlet,处理的话,遇到其他IO也主动切)
from gevent import monkey;monkey.patch_all()
from threading import current_thread
import gevent
import time
 
def eat():
    print('%s eat 1' %current_thread().name)
    time.sleep(5)
    print('%s eat 2' %current_thread().name)
def play():
    print('%s play 1' %current_thread().name)
    time.sleep(3)
    print('%s play 2' %current_thread().name)
 
g1=gevent.spawn(eat)
g2=gevent.spawn(play)
 
# gevent.sleep(100)
# g1.join()
# g2.join()
print(current_thread().name)
gevent.joinall([g1,g2])
原文地址:https://www.cnblogs.com/zhangyaqian/p/py20180428.html