doc

      今天已是学习Python的第十一天,来干一碗鸡汤继续今天的内容,今天的鸡汤是:超越别人对你的期望。本篇博客主要介绍以下几点内容:

  • 线程的基本使用;

  • 线程的锁机制;

  • 生产者消费之模型(队列);

  • 如何自定义线程池;

  • 进程的基本使用;

  • 进程的锁机制;

  • 进程之间如何实现数据共享;

  • 进程池;

  • 协程的基本使用。

一、线程

1、创建线程

   上篇博客已经介绍过如何创建多线程的程序,在这里在复习一下如何创建线程过程以及线程的一些方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading
 
class MyThread(threading.Thread):         #首先继承线程类
     
    def  __init__(self,func,args):
        self.func = func
        self.args = args
        super(MyThread,self).__init__()   #执行父类的所有构造方法
                                           
    def run(self):                        #因为会在创建线程后自动触发run方法,我们自定义run方法,让线程来执行此方法
        self.func(self.args)
 
def f1(args):
    print(args)
 
obj = MyThread(f1,123)
obj.start()                        #开启线程
 
#结果输出:
123

线程的方法:

  • start:线程准备就绪,等待CPU调度;

  • setName:为线程设置名称;

  • getName:获取线程名称;

  • setDaemon(布尔值):设置为主线程是否等待子线程执行(默认False);

        如果是将setDaemon设置成True,主线程执行过程中,子线程也在进行,主线程执行完毕后,子线程不论成功与否,均停止主线程不会等子线程;

        如果值为False,主线程执行过程中,子线程也在执行,主线程执行完毕后,等待子线程也执行完成后,程序停止。

  • join(秒):表示主线程到此,会等待子线程执行,参数表示主线程在此最多等待N秒后,继续往下执行;

  • run:线程被CPU调度后自动执行线程对象的run方法。

2、线程的锁机制

    下面我们来介绍一下线程的锁机制,由于线程之间是进行随机调度,并且每个线程可能只执行N条操作,当多个线程同时修改同一条数据时可能会出现脏数据,所以出现了线程锁。在python中分为三种线程锁:互斥锁(lock,Rlock)、信号量(Semaphore)、事件(event),还有一个条件(Condition)配合线程锁来使用,下面分别介绍这几种锁:

   (1)、互斥锁(lock,Rlock)

 我们先看一下不加线程锁的程序的执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import threading
import time
 
NUM = 10
 
def func(i):
    global NUM
    NUM -=1
    time.sleep(1)
    print(NUM)
 
for i in range(10):                #创建10个线程,去执行上面的函数
    t = threading.Thread(target=func,args=(i,))
    t.start()
 
#因为没有线程锁,10个线程同时去修改上面的NUM,导致出现脏数据,结果:
0
0
0
0
0
0
0
0
0
0

当我们加上线程锁后,效果就会避免上面现象的发生:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import threading
import time
 
NUM = 10
 
def func(i,l):
    global NUM
    l.acquire()   #加锁,
    NUM -=1
    time.sleep(2)
    print(NUM,i)
    l.release()   #开锁
 
# lock = threading.Lock()    #只能锁一次,一般不推荐使用
lock = threading.RLock()     #推荐使用Rlock,可以在程序中锁一次或多次,一次性只能允许一个线程操作
 
for i in range(10):
    t = threading.Thread(target=func,args=(i,lock,))
    t.start()
#结果:
9 0
8 1
7 2
6 3
5 4
4 5
3 6
2 7
1 8
0 9

     (2)、信号量(Semaphore)

    上面我们介绍了互斥锁,我们发现,互斥锁同时只能允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据,比如肯德基有3个购餐的窗口,那最多只允许3个人购买,后面的人只能等前面的人买完才能购买。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import threading
import time
 
NUM = 10
 
def func(i,l):
    global NUM
    l.acquire()   #加锁,
    NUM -=1
    time.sleep(1)
    print(NUM,i)
    l.release()   #开锁
 
lock = threading.BoundedSemaphore(5)    #一次可以允许多个线程更改数据
 
for i in range(10):
    t = threading.Thread(target=func,args=(i,lock,))
    t.start()
 
#结果5个线程同时修改数据:
5 0
4 1
3 3
3 2
1 4
0 6
0 5
0 7
0 9
0 8

     (3)、事件(event)

    Python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法:set、wait、clear。

    事件处理的机制:全局定义了一个"Flag",如果"Flag"值为Flase,那么当程序执行event.wait方法时就会阻塞,如果"Flag"值为True,那么event.wait方法时便不再阻塞。

  • event.clear:将"Flag"设置成False,(加锁);

  • event.set:将"Flag"设置成True,(解锁)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import threading
 
def func(i,e):
    print(i)
    e.wait()           #检测是什么状态,如果是锁状态,会在此等待,如果无锁状态,直接执行下面操作,默认是锁状态
    print(i+100)
 
event = threading.Event()
 
for i in range(10):
    t = threading.Thread(target=func,args=(i,event,))
    t.start()
 
event.clear()          #主动设置成锁状态
inp = input(">>>:")
if inp =='1':
    event.set()        #解锁
 
#结果:
0
1
2
3
4
5
6
7
8
9
>>>:1
100
102
103
104
105
107
108
109
101
106

     (4)、条件(Condition)

    使得线程等待,只有满足条件的时候,才释放N个线程去更改数据,下面通过两种方法来演示加条件的线程锁操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import threading
 
def func(i,con):
    print(i)
    con.acquire()
    con.wait()             #代码执行到这会阻塞,当主线程条件成立后,才会继续往下执行
    print(i+100)
    con.release()
 
c = threading.Condition()  #创建条件,满足这个条件会执行线程
for i in range(10):
    t = threading.Thread(target=func,args=(i,c,))
    t.start()
 
while True:
    inp = input('>>>:'#获取用户输入,输入几,允许几个线程操作
    if inp =='q':
        break
    c.acquire()
    c.notify(int(inp))    #notify:通知其他线程,那些挂起的线程接到这个通知之后会开始运行。通常三个方法放一起,代码格式规定
    c.release()
#结果:
0
1
2
3
4
5
6
7
8
9
>>>:2
>>>:100
101
3
>>>:103
102
104
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import threading
 
def condition():
    ret = False
    r = input('>>>:'#获取用户输入,如果是true,就允许一个线程执行
    if  r == 'true':
        ret = True
    else:
        ret = False
    return  ret
 
def func(i,con):
    print(i)
    con.acquire()
    con.wait_for(condition)
    print(i+100)
    con.release()
 
c = threading.Condition()
for i in range(10):
    t = threading.Thread(target=func,args=(i,c,))
    t.start()
 
#结果:
>>>:1
2
3
4
5
6
7
8
9
true
100
>>>:

     (5)、Timer

Timer:定时器,指定N秒之后执行某操作。

1
2
3
4
5
6
7
8
from threading import Timer
 
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)   #线程等待1秒,执行后面的函数
t.start()

3、生产者消费者模型(队列)

   Queue模块实现了多生产者、多消费者队列,它特别适用于多线程编程。Queue类中实现了所有需要的锁语义,Queue模块实现了四种类型的队列:

  • queue.Queue先进先出队列(FIFO),第一加入队列的任务,被第一个取出;

  • queue.LifoQueue后进先出队列(LIFO),最后加入队列的任务,被第一个取出

  • queue.PriorityQueue:优先级队列,保持队列数据有序,是根据权重判断取出顺序,最小值被先取出。

  • queue.deque:双向队列,一种支持向两端高效地插入数据、支持随机访问的容器

下面通过例子来详细介绍一下先进先出队列的使用方法:

queue.Queue(先进先出):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import queue
q = queue.Queue(2)    #队列最大支持两个链接
 
q.put(11)             #向队列中放入元素
q.put(12)
print(q.qsize())      #输出队列的的大小
 
print(q.get())        #移除列队元素并将元素返回
print(q.get())
 
#结果:
2     #表示队列中有两个元素
11
12
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import queue
q = queue.Queue(2)                #队列最大支持两个链接
      
q.put(11)                         #向队列中放入元素
q.put(12)
print(q.empty())                  #判断队列是否为空
 
#q.put(22)                        #如果队列里满了,会在此阻塞,因为队列最大支持两个链接
#q.put(22,timeout=2)              #如果我们使用这种方式会在这阻塞2秒然后报错
q.put(33,block=False,timeout=2)   #block= False 设置程序不阻塞,直接报错
 
print(q.get())
print(q.get())
# print(q.get())        #同样在移除元素的时候也有相同的方法,可以设置超时时间
print(q.get(timeout=2))
 
#结果,报错:
  File "E:/project/Day11/线程/s1.py", line 51, in <module>
    q.put(33,block=False,timeout=2)
  File "C:UsersHenryAppDataLocalProgramsPythonPython35libqueue.py", line 130, in put
    raise Full
queue.Full
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import queue
 
q = queue.Queue(5)
q.put(123)
q.put(456)
print(q.get()) 
q.task_done()   #在完成一项工作后,会向队列发送一个确认信号,知道取完数据后,join才会终止程序,要么join会一直阻塞
print(q.get())
q.task_done()
q.join()        #实际上意味着等到队列为空,再执行别的操作
 
#结果:
123
456

通过上面的例子,我们总结一下queue队列提供的公共方法:

  • Queue.put:向队列中放入元素,block是否阻塞(默认True),timeout阻塞时的超时时间;

  • Queue.get:移除队列中的元素,block是否阻塞,timeout阻塞时超时时间;

  • queue.Queue(Maxsize):Maxsize,设置队列支持最大的个数;

  • Queue.qsize:队列的真实个数;

  • Queue.join,Queue.task_done:阻塞进程,当队列中任务执行完毕后,不再阻塞;

  • Queue.empty:判断队列是否为空。

queue.LifoQueue(后进先出):

1
2
3
4
5
6
7
8
import  queue
q = queue.LifoQueue()       #后进先出
q.put(123)
q.put(456)
print(q.get())
 
#结果:
456

queue.PriorityQueue(优先级队列):

1
2
3
4
5
6
7
8
q = queue.PriorityQueue()   #根据优先级处理
q.put((1,"jack1"))    #在优先级相同的情况下,后根据顺序输出
q.put((2,"jack2"))
q.put((3,"jack3"))
print(q.get())
 
#结果:
(1, 'jack1')

queue.deque(高性能双向队列):

1
2
3
4
5
6
7
8
9
10
11
12
import queue
 
q= queue.deque()          #双向队列
q.append((123))
q.append(234)
q.appendleft(456)         #从左边去一个值
print(q.pop())
print(q.popleft())
 
#结果:
234
456

为什么说它是高性能的队列我们来对比双向队列、普通队列和列表的处理速度我们一起来看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time
import queue
import collections
 
q = collections.deque()
t0 = time.clock()
for i in range(1000000):
    q.append(1)
for i in range(1000000):
    q.popleft()
print('deque', time.clock() - t0)
 
q = queue.Queue(2000000)
t0 = time.clock()
for i in range(1000000):
    q.put(1)
for i in range(1000000):
    q.get()
print('Queue', time.clock() - t0)
 
q = []
t0 = time.clock()
for i in range(1000000):
    q.append(i)
 
for i in range(1000000):
    q.insert(0,i)
 
print('list ', time.clock() - t0)
 
#结果:
deque 1.2658434773287475
Queue 36.728385720614725
list  #这个结果忽略吧,太长时间了....

下面结合上面的知识来写一个生产者消费者模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#生产者消费者模型,解耦的意思就是两个程序之间,互相没有关联了,互不影响。
import queue
import threading
import time
q = queue.Queue(20)      #队列里最多存放20个元素
 
def productor(arg):            #生成者,创建30个线程来请求吃包子,往队列里添加请求元素

    q.put(str(arg) + '- 包子'
 
for i in range(30):
    t = threading.Thread(target=productor,args=(i,))
    t.start()
 
def consumer(arg):       #消费者,接收到队列请求以后开始生产包子,来消费队列里的请求

    while True:
        print(arg,q.get())
        time.sleep(2)
 
for j in range(3):
    t = threading.Thread(target=consumer,args=(j,))
    t.start()

4、自定义线程池

    在使用多线程处理任务也不是线程越多越好,由于在切换线程的时候,需要切换上下文环境,依然会造成CPU的大量开销。为了解决这个问题,就引出了线程池的概念。预先创建一个批线程,然过来的任务立刻能够使用,使用完以后自动释放来去处理新的任务,在Python中,没有内置的较好的线程池模块,需要自己实现或使用第三方模块,下面我们尝试来自定义一个线程池:

初级版:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import queue
import threading
import time
 
class ThreadPool:
    def __init__(self,maxsize=5):
        self.maxsize = maxsize            #线程池大小为5
        self._q = queue.Queue(maxsize)
        for i in range(maxsize):
            self._q.put(threading.Thread) #先往队列里插入的线程池大小的元素,元素为Threading.Thread类,等待处理请求
    def get_thread(self):
        return self._q.get()
 
    def add_thread(self):
        self._q.put(threading.Thread)
 
pool = ThreadPool(5)             #创建线程池
 
def task(arg,p):
    """
    在队列里添加一个元素
    :param arg: 循环的数值
    :param p: 线程池的对象
    :return:
    """
    print(arg)
    time.sleep(1)
    p.add_thread()
 
for i in range(100):
    t = pool.get_thread()         #get,threading.Thread类去消费队列里的一个线程
    obj = t(target=task,args=(i,pool,))  
    obj.start()

进阶版:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import queue
import threading
import contextlib
import time
 
StopEvent = object()
 
 
class ThreadPool(object):
 
    def __init__(self, max_num, max_task_num = None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []
 
    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)
 
    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.start()
 
    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread
        self.generate_list.append(current_thread)
 
        event = self.q.get()
        while event != StopEvent:
 
            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None
 
            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass
 
            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:
 
            self.generate_list.remove(current_thread)
 
    def close(self):
        """
        执行完所有的任务后,所有线程停止
        """
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1
 
    def terminate(self):
        """
        无论是否还有任务,终止线程
        """
        self.terminal = True
 
        while self.generate_list:
            self.q.put(StopEvent)
 
        self.q.empty()
 
    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用于记录线程中正在等待的线程数
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)
 
 
 
# How to use
 
 
pool = ThreadPool(5)
 
def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass
 
 
def action(i):
    print(i)
 
for i in range(30):
    ret = pool.run(action, (i,), callback)
 
time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()

 二、进程

1、创建进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#Author:HaiFeng Di
 
from multiprocessing import Process   #所有进程相关的模块都在multiprocessing模块中调用
import threading
import time
 
def foo(i):
    print('say hi:',i)
 
if __name__=='__main__':      #注意:进程在windows创建需要加上__name__函数,Linux环境下不用
    for i in range(10):       #创建10个进程
        p = Process(target=foo,args=(i,))
        p.start()
#结果:
say hi: 4
say hi: 3
say hi: 1
say hi: 2
say hi: 7
say hi: 0
say hi: 5
say hi: 8
say hi: 9
say hi: 6

注意:由于进程之间的数据需要各自持有一份,所以创建进程需要非常大的开销。

2、进程间数据共享

 进程各自持有一份数据,默认是无法共享数据的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
from multiprocessing import Process
 
li = []
 
def foo(i):
    li.append(i)
    print('say hi',li)
 
if __name__=='__main__':
 
    for i in range(10):
        p = Process(target=foo,args=(i,))
        p.start()
 
    print('ending',li)
 
#结果:
ending []
say hi [5]
say hi [9]
say hi [2]
say hi [1]
say hi [3]
say hi [6]
say hi [7]
say hi [0]
say hi [8]
say hi [4]

通过上面的例子可以看出,每个进程都有自己的一份数据,没有共享数据,在Python中我们通常通过调用第三方模块的方式来实现进程之间的数据共享,主要是调用multiprocessing的Queues、Array、Manager这三个模块。下面我们通过例子类看一下具体用法:

 方法一:通过调用queues共享数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
 
def foo(i,arg):
    arg.put(i)
    print('say hi',i,arg.qsize())
 
if __name__=='__main__':
    li = queues.Queue(20,ctx=multiprocessing)
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
#结果:
say hi 4 5
say hi 2 6
say hi 3 6
say hi 6 6
say hi 0 6
say hi 1 6
say hi 7 8
say hi 5 8
say hi 9 9
say hi 8 10

方法二:通过调用数组Array来共享数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process
from multiprocessing import Array
import multiprocessing
 
def foo(i,arg):
    arg[i] = i + 100
    for item in arg:
        print(item)
    print('============')
 
if  __name__=='__main__':
    li = Array('i',10)
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()

    当看到这端代码时有个疑问就是Array数组中的'i',是什么?在Array类在实例化的时候必须指定数组的数据类型和数组的大小,具体数据类型的对照请参考下面的对应关系:

1
2
3
4
5
6
'c': ctypes.c_char,    'u': ctypes.c_wchar,
'b': ctypes.c_byte,    'B': ctypes.c_ubyte,
'h': ctypes.c_short,   'H': ctypes.c_ushort,
'i': ctypes.c_int,     'I': ctypes.c_uint,
'l': ctypes.c_long,    'L': ctypes.c_ulong,
'f': ctypes.c_float,   'd': ctypes.c_double

方法三:通过调用Manager字典来共享数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from multiprocessing import Process
from multiprocessing import Manager
import multiprocessing
 
def foo(i,arg):
    arg[i] = i + 100
    print(arg.values())
if __name__=='__main__':
    obj = Manager()
    li = obj.dict()
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
        p.join()
    import time
    time.sleep(0.1)
#结果:
[100]
[100, 101]
[100, 101, 102]
[100, 101, 102, 103]
[100, 101, 102, 103, 104]
[100, 101, 102, 103, 104, 105]
[100, 101, 102, 103, 104, 105, 106]
[100, 101, 102, 103, 104, 105, 106, 107]
[100, 101, 102, 103, 104, 105, 106, 107, 108]
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]

3、进程锁

    为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。在multprocessing里也有与线程一样支持Rlock,Lock,Event,Condition,Semaphore几种锁,用法也相同,我们来看一下进程数的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
from multiprocessing import Process, Array, RLock
 
def Foo(lock,temp,i):
    """
    将第0个数加100
    """
    lock.acquire()
    temp[0] = 100+i
    for item in temp:
        print(i,'----->',item)
    lock.release()
 
lock = RLock()
temp = Array('i', [11, 22, 33, 44])
 
for i in range(20):
    p = Process(target=Foo,args=(lock,temp,i,))
    p.start()
#结果:
[100]
[100, 101]
[100, 101, 102]
[100, 101, 102, 103]
[100, 101, 102, 103, 104]
[100, 101, 102, 103, 104, 105]
[100, 101, 102, 103, 104, 105, 106]
[100, 101, 102, 103, 104, 105, 106, 107]
[100, 101, 102, 103, 104, 105, 106, 107, 108]
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]

4、进程池

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可使用的进程,那么程序就会等待,知道进程池中有可用进程为止。

进程池中有两个方法:

  • apply:子进程串行执行任务,达不到并发的效果;

  • apply_async:apply的异步版本,支持并发,推荐使用这个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Pool
import time
 
def f1(arg):
    time.sleep(1)
    print(arg)
 
if __name__=='__main__':
    pool = Pool(5)
    for i in range(30):
        #pool.apply(func=f1,args=(i,))      #子进程串行执行任务,达不到并发
        pool.apply_async(func=f1,args=(i,)) #支持并发
 
    pool.close()         #等待所有进程结束后,才关闭进程池
    # time.sleep(2)  
    # pool.terminate()   #立即关闭进程池
    pool.join()          #主进程等待所有子进程执行完毕,必须在close或terminate之后

总结一句话:IO密集型使用多线程,计算密集型使用多进程。 

三、协程

 线程和进程的操作是由程序触发系统接口,最后的执行者是系统,而协程的操作则是程序员自己。

 协程的原理:利用一个线程,分解一个线程成为多个"微线程",程序级别。

 协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定摸个代码块执行顺序。

 协程的适用场景:当程序存在大量不需要CPU的操作时(IO),适用于协程。

 使用协协程需要调用两个模块:greenlet模块(底层)、gevent模块(高性能)。

 使用greenlet模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from greenlet import greenlet
  
def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()
  
  
def test2():
    print(56)
    gr1.switch()
    print(78)
  
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()     #greenlet通过switch方法在不同任务之间进行切换
  
#结果:
12
56
34
78

使用gevent模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import gevent
 
def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')
 
def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')
 
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
 
#结果:
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

 下面的例子我们去分别请求多个网站,遇到IO操作来实现自动切换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from gevent import monkey; monkey.patch_all()
import gevent
import requests
 
def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text
    print('%d bytes received from %s.' % (len(data), url))
 
gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
])
 
#结果:
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
47394 bytes received from https://www.python.org/.
25534 bytes received from https://github.com/.
449991 bytes received from https://www.yahoo.com/.

​   今天的内容就到这里了,例子中少了不少的注释,还有一些自己不太理解,见谅。



原文地址:https://www.cnblogs.com/phennry/p/5693630.html