Python基础(十四)-并发编程

一、操作系统

参考文档:https://www.cnblogs.com/yuanchenqi/articles/6248025.html

二、进程与线程

2.1、进程简介

进程:一个程序在一个数据集上的一次动态执行过程,一般由程序、数据集、进程控制块三部分组成

  • 程序:用来描述进程要完成哪些功能以及如何完成
  • 数据集:程序在执行过程中所需要使用的资源
  • 进程控制块:记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系 统感知进程存在的唯一标志

2.2、线程

线程:轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序 计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发 性能。线程没有自己的系统资源。

2.3、进程与线程区别

1)一个程序至少有一个进程,一个进程至少有一个线程.(进程可以理解成线程的容器)

2)进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。

3)线程在执行过程中与进程还是有区别的。每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。

4)进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位.

5)线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈)但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.

  6)一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行.

三、python GIL

In CPython, the global interpreter lock(全局解释器锁), or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

核心意思就是,无论你启多少个线程,你有多少个cpu, Python在执行的时候会在同一时刻只允许一个线程运行

四、python线程与threading模块

4.1、线程调用方式

1)直接调用方式

import threading
import time

def func(num):   #定义每个线程运行的函数
    print("running on num: %s" %num)
    time.sleep(3)

if __name__ == '__main__':
    t1=threading.Thread(target=func,args=(1,))  #生成一个线程实例,注意target=只需要写函数名即可
    t2=threading.Thread(target=func,args=(2,))

    t1.start()  #启动线程
    t2.start()

    print(t1.getName())  #获取线程名称 ==>Thread-1
    print(t2.getName())  #获取线程名称 ==>Thread-2

2)继承式调用

import threading
import time

class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)  #父类初始化
        self.num=num
    def run(self):  #定义每个线程要运行的函数,名称必须是run
        print("running on num: %s" %self.num)
        time.sleep(3)

if __name__ == '__main__':
    t1=MyThread(1)
    t2=MyThread(2)
    t1.start()
    t2.start()
    print("ending..........")

4.2、threading.Thread实例方法

1)join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞

2)setDaemon(True):将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就分兵两路分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法

import threading
from time import ctime,sleep
import time

def ListenMusic(name):

        print ("Begin listening to %s. %s" %(name,ctime()))
        sleep(3)
        print("end listening %s"%ctime())

def RecordBlog(title):

        print ("Begin recording the %s! %s" %(title,ctime()))
        sleep(5)
        print('end recording %s'%ctime())

threads = []   #定义线程列表
t1 = threading.Thread(target=ListenMusic,args=('AA',))
t2 = threading.Thread(target=RecordBlog,args=('BB',))
threads.append(t1)
threads.append(t2)

if __name__ == '__main__':

    for t in threads:
        t.setDaemon(True) #注意:一定在start之前设置
        t.start()
        # t.join()
    t1.join()
    # t1.setDaemon(True)

    print ("all over %s" %ctime())

3)其他方法

run():       #线程被cpu调度后自动执行线程对象的run方法
start():     #启动线程活动。
isAlive():   #返回线程是否活动的。
getName():   #返回线程名。
setName():   #设置线程名。

#threading模块提供的一些方法:
threading.currentThread():   #返回当前的线程变量。
threading.enumerate():       #返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount():     #返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

五、同步锁

5.1、问题引出

import time
import threading

def addNum():
    global num #在每个线程中都获取这个全局变量
    # num-=1     #final num: 0

    temp=num
    #print('--get num:',num )
    time.sleep(0.01)   #相当于io操作,存在线程切换
    num =temp-1       #对此公共变量进行-1操作

num = 100  #设定一个共享变量
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
    t.join()

print('final num:', num )

#结果
#当time.sleep(0.1)  ==>99
#当time.sleep(0.01)  ==>90多随机出现

#原因:当第一个线程拿到num=100时,time.sleep时,线程切换,第二个线程拿到还是100,他不知道第一个线程拿到的num是几
#同理当第二个线程time.sleep,线程切换,第三个拿到还是100

5.2、问题解决

import time
import threading

def addNum():
    global num #在每个线程中都获取这个全局变量
    # num-=1     #final num: 0

    R.acquire()       #获得同步锁,此时第二个线程不能执行
    temp=num
    time.sleep(0.01)   #相当于io操作,存在线程切换
    num =temp-1       #对此公共变量进行-1操作
    R.release()       #释放锁,此时第二个线程可以执行了

num = 100  #设定一个共享变量
thread_list = []
R=threading.Lock()   #定义同步锁
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
    t.join()

print('final num:', num )   #0

六、线程死锁

在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去

import threading,time

class myThread(threading.Thread):
    def doA(self):
        lockA.acquire()    #获得锁A
        print(self.name,"gotlockA",time.ctime())
        time.sleep(3)
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        lockB.release()
        lockA.release()    #释放锁A

    def doB(self):
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        time.sleep(2)
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())   #第一个线程得到了锁B,此时需要获取锁A,而此时锁A已经被第二个线程获取了,造成了死锁
        lockA.release()
        lockB.release()

    def run(self):
        self.doA()
        self.doB()
if __name__=="__main__":

    lockA=threading.Lock()
    lockB=threading.Lock()
    threads=[]
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

#结果
Thread-1 gotlockA Fri Sep 13 21:19:45 2019
Thread-1 gotlockB Fri Sep 13 21:19:48 2019
Thread-1 gotlockB Fri Sep 13 21:19:48 2019
Thread-2 gotlockA Fri Sep 13 21:19:48 2019

七、递归锁

可以使用递归锁来解决死锁问题:lock=threading.RLock()

RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

import threading,time

class myThread(threading.Thread):
    def doA(self):
        lock.acquire()    #获得递归锁counter=1
        print(self.name,"gotlock",time.ctime())
        time.sleep(3)
        lock.acquire()    #获得递归锁counter=2
        print(self.name,"gotlock",time.ctime())
        lock.release()    #释放递归锁counter=1
        lock.release()    #释放递归锁counter=0,此时第二个线程可以抢夺锁了

    def doB(self):
        lock.acquire()
        print(self.name,"gotlock",time.ctime())
        time.sleep(2)
        lock.acquire()
        print(self.name,"gotlock",time.ctime())
        lock.release()
        lock.release()

    def run(self):
        self.doA()
        # time.sleep(0.1)
        self.doB()
if __name__=="__main__":

    # lockA=threading.Lock()
    # lockB=threading.Lock()
    lock=threading.RLock()
    threads=[]
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

#结果
Thread-1 gotlock Fri Sep 13 21:33:55 2019
Thread-1 gotlock Fri Sep 13 21:33:58 2019
Thread-1 gotlock Fri Sep 13 21:33:58 2019
Thread-1 gotlock Fri Sep 13 21:34:00 2019
Thread-3 gotlock Fri Sep 13 21:34:00 2019
Thread-3 gotlock Fri Sep 13 21:34:03 2019
Thread-3 gotlock Fri Sep 13 21:34:03 2019
Thread-3 gotlock Fri Sep 13 21:34:05 2019
Thread-5 gotlock Fri Sep 13 21:34:05 2019
Thread-5 gotlock Fri Sep 13 21:34:08 2019
Thread-5 gotlock Fri Sep 13 21:34:08 2019
Thread-5 gotlock Fri Sep 13 21:34:10 2019
Thread-4 gotlock Fri Sep 13 21:34:10 2019
Thread-4 gotlock Fri Sep 13 21:34:13 2019
Thread-4 gotlock Fri Sep 13 21:34:13 2019
Thread-4 gotlock Fri Sep 13 21:34:15 2019
Thread-2 gotlock Fri Sep 13 21:34:15 2019
Thread-2 gotlock Fri Sep 13 21:34:18 2019
Thread-2 gotlock Fri Sep 13 21:34:18 2019
Thread-2 gotlock Fri Sep 13 21:34:20 2019

八、同步条件(Event)

8.1、event方法

event=threading.Event()   #创建event对象
event.wait()   #线程需等待flag被设置
event.set()   #设置flag
event.clear()  #清除flag

An event is a simple synchronization object;the event represents an internal flag,and threads can wait for the flag to be set, or set or clear the flag themselves.
If the flag is set, the wait method doesn’t do anything. #一旦flag被设置,wait方法不会做任何事,相当于pass
If the flag is cleared, wait will block until it becomes set again. #一旦flag被清除,wait方法会被阻塞,知道flag被设置
#Any number of threads may wait for the same event. #任何数量的线程会等待相同的event

8.2、示例

import threading,time
class Boss(threading.Thread):
    def run(self):
        print("BOSS:今晚大家都要加班到22:00。")
        print(event.isSet())
        event.set()   #设置flag,一旦设置,wait()阻塞解除
        time.sleep(5)
        print("BOSS:<22:00>可以下班了。")
        print(event.isSet())
        event.set()
class Worker(threading.Thread):
    def run(self):
        event.wait()    #此时flag还未被设置,被阻塞
        print("Worker:哎……命苦啊!")
        time.sleep(1)
        event.clear()   #清除flag
        event.wait()
        print("Worker:OhYeah!")
if __name__=="__main__":
    event=threading.Event()   #定义event对象
    threads=[]
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

#++++++++++++++++++++++++++++++++++++++++++++++++++
BOSS:今晚大家都要加班到22:00。
False
Worker:哎……命苦啊!
Worker:哎……命苦啊!
Worker:哎……命苦啊!
Worker:哎……命苦啊!
Worker:哎……命苦啊!
BOSS:<22:00>可以下班了。
False
Worker:OhYeah!
Worker:OhYeah!
Worker:OhYeah!
Worker:OhYeah!
Worker:OhYeah!

九、信号量(Semaphore)

1)信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1

2)计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)

3)BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

import threading,time
class myThread(threading.Thread):
    def run(self):
        if semaphore.acquire():
            print(self.name)
            time.sleep(5)
            semaphore.release()
if __name__=="__main__":
    semaphore=threading.Semaphore(5)  #相当于并发量为5
    thrs=[]
    for i in range(100):
        thrs.append(myThread())
    for t in thrs:
        t.start()

十、多线程利器-队列

10.1、列表:不安全数据结构

import threading,time

li=[1,2,3,4,5]

def pri():
    while li:
        a=li[-1]  #获取列表最后一个元素
        print(a)
        time.sleep(1)
        try:
            li.remove(a)
        except Exception as e:
            print('----',a,e)

t1=threading.Thread(target=pri,args=())
t1.start()
t2=threading.Thread(target=pri,args=())
t2.start()

#结果
5
5
4
---- 5 list.remove(x): x not in list
4
...

10.2、队列:安全的数据结构

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

10.3、队列方法

import queue      #  线程 队列
q=queue.Queue(3)  # FIFO模式(先进先出)

q.put(12)
q.put("hello")
q.put({"name":"AA"})
# q.put_nowait(56)#  q.put(block=False)

print(q.qsize())
print(q.empty())
print(q.full())
# q.put(34,False)  #报错queue.Full

while 1:
    data=q.get()
    print(data)
    print("----------")

#++++++++++++++++++++++++++++++++++++++++++++++
#此包中的常用方法(q = queue.Queue()):
q.qsize()    #返回队列的大小
q.empty()    #如果队列为空,返回True,反之False
q.full()     #如果队列满了,返回True,反之False
q.full       #与 maxsize 大小对应
q.get([block[, timeout]])   #获取队列,timeout等待时间
q.get_nowait()              #相当q.get(False)
q.put(item)                 #写入队列,timeout等待时间
q.put_nowait(item)          #相当q.put(item, False)
q.task_done()               #在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join()                    #实际上意味着等到队列为空,再执行别的操作

10.4、队列模式

Python Queue模块有三种队列及构造函数:

  1. Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize)
  2. LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize)
  3. 还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize)
import queue

#先进后出
# q=queue.LifoQueue()
# q.put(34)
# q.put(56)
# q.put(12)

#优先级
q=queue.PriorityQueue()   #数值越小,优先级越高,越先出来
q.put([5,100])
q.put([7,200])
q.put([3,"hello"])
q.put([4,{"name":"alex"}])

while 1:
  data=q.get()
  print(data)

十一、生产者消费者模型

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
    #q.task_done()
    #q.join()
    print("ok......")
def Consumer(name):
  count = 0
  while count <10:
    time.sleep(random.randrange(4))
    if not q.empty():   #判断队列是否为空
        data = q.get()
        #q.task_done()
        #q.join()
        print(data)
        print('33[32;1mConsumer %s has eat %s baozi...33[0m' %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1

p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
c2 = threading.Thread(target=Consumer, args=('C',))
c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
c2.start()
c3.start()

十二、多进程模块multiprocessing

由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。

multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

12.1、进程调用

1)直接调用

from multiprocessing import Process
import time

def func(name):
    time.sleep(1)
    print("hello",name,time.ctime())

if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p=Process(target=func,args=("AA",))
        p_list.append(p)
        p.start()
    for i in p_list:
        p.join()

    print("ending......")

2)继承式调用

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self):
        super(MyProcess,self).__init__()
        # self.name=name
    def run(self):
        time.sleep(1)
        print("hello",self.name,time.ctime())

if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p=MyProcess()
        p.start()
        p_list.append(p)
    for i in p_list:
        p.join()
    print("ending.....")


hello MyProcess-1 Sat Sep 14 12:46:46 2019
hello MyProcess-2 Sat Sep 14 12:46:46 2019
hello MyProcess-3 Sat Sep 14 12:46:46 2019
ending.....

进程号相关:

from multiprocessing import Process
import os
import time

def info(title):
    print("title:", title)
    print('parent process:', os.getppid())   #父进程pid  ==>pycharm
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main process line')
    time.sleep(1)
    print("------------------")
    p = Process(target=info, args=('yuan',))
    p.start()
    p.join()


title: main process line
parent process: 9488
process id: 7160
------------------
title: yuan
parent process: 7160
process id: 10708

12.2、Process类

1)构造方法

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前还没有实现,库引用中提示必须是None;
  target: 要执行的方法;
  name: 进程名;
  args/kwargs: 要传入方法的参数。

2)实例方法

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():不管任务是否完成,立即停止工作进程

3)属性

  daemon:和线程的setDeamon功能一样

  name:进程名字。

  pid:进程号。

import time
from multiprocessing import Process

class MyProcess(Process):
    def __init__(self,num):
        super(MyProcess,self).__init__()
        self.num=num
    def run(self):
        time.sleep(1)
        print(self.pid)
        print(self.is_alive())
        print(self.num)
        time.sleep(1)

if __name__ == '__main__':
    p_list=[]
    for i in range(10):
        p = MyProcess(i)
        #p.daemon=True
        p_list.append(p)

    for p in p_list:
        p.start()
    # for p in p_list:
    #     p.join()

    print('main process end')

十三、进程间通信

13.1、进程队列Queue

from multiprocessing import Process, Queue
import queue

def f(q,n):
    #q.put([123, 456, 'hello'])
    q.put(n*n+1)
    print("son process",id(q))

if __name__ == '__main__':
    q = Queue()
    print("main process",id(q))

    for i in range(3):
        p = Process(target=f, args=(q,i))   #子进程向队列中村方法数据,主进程取
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())

13.2、管道

from multiprocessing import Process, Pipe
def f(conn):
    conn.send([12, {"name":"yuan"}, 'hello'])
    response=conn.recv()
    print("response",response)
    conn.close()
    print("q_ID2:",id(conn))

if __name__ == '__main__':

    parent_conn, child_conn = Pipe() #双向管道

    print("q_ID1:",id(child_conn))
    p = Process(target=f, args=(child_conn,))  #实例化出一个进程,将child_conn传递给子进程
    p.start()

    print(parent_conn.recv())  
    parent_conn.send("儿子你好!")
    p.join()

#结果
q_ID1: 2387686086080
[12, {'name': 'yuan'}, 'hello']
response 儿子你好!
q_ID2: 1829247810584

13.3、Managers

Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example:

from multiprocessing import Process, Manager
def f(d, l,n):
    d[n] = '1'    #{0:"1"}
    d['2'] = 2    #{0:"1","2":2}
    l.append(n)    #[0,1,2,3,4,   0,1,2,3,4,5,6,7,8,9]
    #print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()   #{}
        l = manager.list(range(5))  #[0,1,2,3,4]

        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d,l,i))  #将字典,列表传递给子进程
            p.start()
            p_list.append(p)

        for res in p_list:
            res.join()

        print(d)
        print(l)

#++++++++++++++++++++++++++++++++++++++++++++++++++++
{0: '1', 1: '1', 2: '1', 3: '1', 4: '1', 5: '1', 6: '1', '2': 2, 8: '1', 9: '1', 7: '1'}
[0, 1, 2, 3, 4, 1, 0, 2, 3, 4, 5, 7, 8, 6, 9]

13.4、进程同步

多个进程共用屏幕,容易发生抢占现象

image

from multiprocessing import Process, Lock
import time

def f(l, i):
        #串行
        l.acquire()
        time.sleep(1)
        print('hello world %s' % i)
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

13.5、进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply
  • apply_async
from  multiprocessing import Process,Pool
import time,os

def Foo(i):
    time.sleep(1)
    print(i)
    print("son",os.getpid())
    return "HELLO %s"%i

def Bar(arg):
    print(arg)
    print("hello")
    print("Bar:",os.getpid())

if __name__ == '__main__':

    pool = Pool(5)
    print("main pid",os.getpid())
    for i in range(100):
        #pool.apply(func=Foo, args=(i,))  #同步接口
        # pool.apply_async(func=Foo, args=(i,))  #异步接口

        #回调函数:  就是某个动作或者函数执行成功后再去执行的函数 ==>Foo执行完去执行Bar
        pool.apply_async(func=Foo, args=(i,),callback=Bar)

    pool.close()
    pool.join()         # join与close调用顺序是固定的

    print('end')

十四、协程

协程,又称微线程,纤程。英文名Coroutine。

优点1: 协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

优点2: 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

14.1、yield简单实现

import time
import queue

def consumer(name):
    print("--->ready to eat baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name,new_baozi))
        #time.sleep(1)

def producer():
    r = con.__next__()  #在yield处hang住
    r = con2.__next__()
    n = 0
    while 1:
        time.sleep(1)
        print("33[32;1m[producer]33[0m is making baozi %s and %s" %(n,n+1) )
        con.send(n)      #将值传递给yield
        con2.send(n+1)
        n +=2

if __name__ == '__main__':
    con = consumer("c1")   #创建生成器
    con2 = consumer("c2")
    producer()

14.2、Greenlet

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator

报错:ImportError: No module named 'greenlet'

from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()
def test2():
    print(56)
    gr1.switch()
    print(78)
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

14.3、Gevent

import gevent
import requests,time
start=time.time()
def f(url):
    print('GET: %s' % url)
    resp =requests.get(url)
    data = resp.text
    print('%d bytes received from %s.' % (len(data), url))

# f('https://www.python.org/')
# f('https://www.yahoo.com/')
# f('https://www.baidu.com/')
# f('https://www.sina.com.cn/')
# f("http://www.xiaohuar.com/hua/")

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://www.baidu.com/'),
        gevent.spawn(f, 'https://www.sina.com.cn/'),
        gevent.spawn(f, 'http://www.xiaohuar.com/hua/'),
])
print("cost time:",time.time()-start)
原文地址:https://www.cnblogs.com/hujinzhong/p/11517535.html