博客链接:
http://www.cnblogs.com/linhaifeng/articles/7429894.html
今日概要:
1 生产者消费者模型(补充) 2 GIL(进程与线程的应用场景) ***** 1每一个cpython进程内都有一个GIL 2、GIL导致同一进程内的多个线程同一时间只能有一个运行 3、之所以有GIL,是因为Cpython的内存管理不是线程安全的 4、对于计算密集型用多进程,多IO密集型用多线程 3 死锁现象与递归锁 * #单线程实现并发 4 协程: 单线程下实现并发 5 IO模型
生产者消费者模型(补充版):
摘录的别人解析的,如下是转载地址。建议看如下转载地址,别人写得更容易理解,我这边仅仅是简单的复制了一下,代码也不完整。
https://www.jianshu.com/p/2d3e6a21f6fe
""" 使用Queue组件实现的缺点就是,实现了多少个消费者consumer进程, 就需要在最后往队列中添加多少个None标识,方便生产完毕结束消费者consumer进程。 否则,p.get() 不到任务会阻塞子进程,因为while循环,直到队列q中有新的任务加进来,才会再次执行。 而我们的生产者只能生产这么多东西,所以相当于程序卡死。 使用JoinableQueue组件,是因为JoinableQueue中有两个方法:task_done()和join() 。 首先说join()和Process中的join()的效果类似,都是阻塞当前进程,防止当前进程结束。 但是JoinableQueue的join()是和task_down()配合使用的。 Process中的join()是等到子进程中的代码执行完毕,就会执行主进程join()下面的代码。 而JoinableQueue中的join()是等到队列中的任务数量为0的时候才会执行q.join()下面的代码, 否则会一直阻塞。task_down()方法是每获取一次队列中的任务,就需要执行一次。 直到队列中的任务数为0的时候,就会执行JoinableQueue的join()后面的方法了。 所以生产者生产完所有的数据后,会一直阻塞着。不让p1和p2进程结束。等到消费者get()一次数据, 就会执行一次task_down()方法,从而队列中的任务数量减1,当数量为0后, 执行JoinableQueue的join()后面代码,从而p1和p2进程结束。 因为p1和p2添加了join()方法,所以当子进程中的consumer方法执行完后,才会往下执行。 从而主进程结束。因为这里把消费者进程c1和c2 设置成了守护进程,主进程结束的同时, c1和c2 进程也会随之结束,进程都结束了。所以消费者consumer方法也会结束。 """ from multiprocessing import Process, JoinableQueue import os def producer(food, q): for i in range(3): res = '%s%s' % (food, i) q.put(res) print('%s make %s' % (os.getpid(), res)) q.join() # join()方法跟下面的消费者函数里面的task_done()方法一起使用 def customer(q): while True: res = q.get() print('%s eat %s' % (os.getpid(), res)) q.task_done() # if __name__ == '__main__': # q = JoinableQueue() # p1 = Process(target=producer, args=('dumpling', q,)) # p2 = Process(target=producer, args=('rice', q,)) # p3 = Process(target=producer, args=('dog_food', q,)) # c1 = Process(target=customer, args=(q,)) # c2 = Process(target=customer, args=(q,)) # # c1.daemon = True # 设置消费者为守护进程 # c2.daemon = True # 设置消费者为守护进程 # # p1.start() # p2.start() # p3.start() # c1.start() # c2.start() # # p1.join() # p2.join() # p3.join() # # producer has done, and 'q.join()' has already got all the data # # print('host', os.getpid())
gil:
# from threading import Thread,Lock # import time # n=100 # def task(): # global n # mutext.acquire() # temp=n # time.sleep(0.1) # n=temp-1 # mutext.release() # if __name__ == '__main__': # t_l=[] # mutext=Lock() # start=time.time() # for i in range(3): # t=Thread(target=task) # t_l.append(t) # t.start() # # for t in t_l: # t.join() # print(time.time()-start) # print(n) # #计算密集型:多进程效率高 from multiprocessing import Process # from threading import Thread # import os,time # def work(): # res=0 # for i in range(10000000): # res*=i # # if __name__ == '__main__': # l=[] # print(os.cpu_count()) #本机为4核 # start=time.time() # for i in range(4): # p=Process(target=work) #耗时run time is 0.8030457496643066 # # p=Thread(target=work) #耗时run time is 2.134121894836426 # l.append(p) # p.start() # # for p in l: # p.join() # # stop=time.time() # # # print('run time is %s' %(stop-start)) from threading import Thread import os,time def work(): time.sleep(2) if __name__ == '__main__': l=[] start=time.time() for i in range(100): # p=Process(target=work) #耗时run time is 4.881279230117798 p=Thread(target=work) #耗时run time is 2.011115074157715 l.append(p) p.start() for p in l: p.join() stop=time.time() # print('run time is %s' %(stop-start))
在计算密集型的程序中使用多进程,
在io密集型的程序中使用多线程
理解:
# gil锁是一个概念性的知识点,它没有类似于class,类; def,函数;等等类似的具有标示性的东西,不具备可视性,但是是一个很重要的知识点需要理解,
# 需要明白它底层是如何执行的,它是已经被封装好的,明白后就能知道我们所学的进程并发和线程并发是在什么情况下使用的.
# 先说结论,计算密集型程序中使用多进程,效率更高;而在io密集型的程序中,使用多线程效率更高(单线程的并发效率更高,即协程.)
# 在cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程进行,无法利用多核优势
# 当我们在执行程序的时候,需要在Python解释器里面去运行程序,就类似于我们学计算机发展史的时候,那个时候工作人员需要去排队抢占计算机硬件资源,
# 轮流拿着自己的代码和写好的的程序去等着,就像现在我们在公司里面等着排队使用打印机的情况类似,那个时候的计算机很罕见,大家需要运行程序的时候
# 要等到轮到自己的时候才可以去把自己的程序放进去执行,那么在我们的系统内部也是这样的,我们的所有python代码都是我们写好后,在pycharm里面放着,
# 真正实现的底层是我们的python解释器在执行,但是解释器就类似于以前的老式计算机,它只能一个一个程序的运行,不能多个同时运行,
# 那么就涉及到先后顺序的问题,这里没有排队的概念,各个程序之间是竞争关系,谁抢到的了资源谁就先运行,要引用我们前几天学过的那个概念,并发的概念,
# 抢占cpu资源,CPU要一直运行下去,它不会一直让你一个程序去占着它,即便你抢到了它的使用权限,即便你的程序是一直在计算中,你的占用时间过长的话,
# 它一样会把你释放掉,把你的gil锁也给释放掉,这里的gil锁是一个什么样的概念呢,它本质就是互斥锁,即都是将并发运行变成串行,
# 以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全.所以我们的gil锁是为了保证数据的安全,这个是很重要的一点,
# 因为你并发的执行效率是很高的,一旦变成了串行就会大大降低,如果不是为了保证数据的安全性,是没有必要去牺牲执行效率的.这个是大前提.
# 分析:
# 我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是:
# 方案一:开启四个进程
# 方案二:一个进程下,开启四个线程
# 单核情况下,分析结果:
# 如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜
# 如果四个任务是I / O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜
# 多核情况下,分析结果:
# 如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜
# 如果四个任务是I / O密集型,再多的核也解决不了I / O问题,方案二胜
# 结论:现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),
# 但是,对于IO密集型的任务效率还是有显著提升的。
# 应用:
#
# 多线程用于IO密集型,如socket,爬虫,web
# 多进程用于计算密集型,如金融分析
死锁现象与递归锁:
from threading import Thread,Lock,RLock import time # mutexA=mutexB=Lock() mutexA=mutexB=RLock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('%s 拿到A锁' %self.name) mutexB.acquire() print('%s 拿到B锁' %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('%s 拿到B锁' % self.name) time.sleep(0.1) mutexA.acquire() print('%s 拿到A锁' % self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t=MyThread() t.start()
协程:
# import time # def consumer(res): # '''任务1:接收数据,处理数据''' # pass # # def producer(): # '''任务2:生产数据''' # res=[] # for i in range(100000000): # res.append(i) # return res # # start=time.time() # #串行执行 # res=producer() # consumer(res) # stop=time.time() # print(stop-start) # import time def consumer(): '''任务1:接收数据,处理数据''' while True: x=yield print('consumer') def producer(): '''任务2:生产数据''' g=consumer() next(g) for i in range(100000000): print('producer') time.sleep(6) g.send(i) start=time.time() #基于yield保存状态,实现两个任务直接来回切换,即并发的效果 #PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的. producer() stop=time.time() print(stop-start) #1.2250702381134033
#pip3 install gevent #1、切换+保存状态 #2 检测IO,实现遇到IO切换 from gevent import monkey;monkey.patch_all() import gevent import time def eat(name): print('%s eat 1' %name) time.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) time.sleep(3) print('%s play 2' %name) g1=gevent.spawn(eat,'alex') g2=gevent.spawn(play,'egon') g1.join() g2.join()