Python_生产者消费者模型、管道、数据共享、进程池

1、生产者消费者模型

   生产者 —— 生产数据的人

  消费者 —— 消费数据的人

  生产者消费者模型:供销数据不平衡的现象。

 1 import time
 2 import random
 3 from multiprocessing import Process, Queue
 4 
 5 def consumer(q):
 6     while True:
 7         obj = q.get()
 8         print(f'消费了一个数据{obj}')
 9         time.sleep(random.randint(1, 3))
10 
11 if __name__ == "__main__":
12     q = Queue()
13     Process(target=consumer, args=(q,)).start()
14     for i in range(10):
15         time.sleep(random.randint(1, 5))
16         q.put(f"food{i}")
17         print(f'生产了一个数据food{i}')
生产者消费者速度不一致
D:Python36python.exe E:/Python/草稿纸.py
生产了一个数据food0
消费了一个数据food0
生产了一个数据food1
消费了一个数据food1
生产了一个数据food2
消费了一个数据food2
生产了一个数据food3
消费了一个数据food3
生产了一个数据food4
消费了一个数据food4
生产了一个数据food5
消费了一个数据food5
生产了一个数据food6
生产了一个数据food7
消费了一个数据food6
生产了一个数据food8
生产了一个数据food9
消费了一个数据food7
消费了一个数据food8
消费了一个数据food9
结果(阻塞)
 1 import time
 2 import random
 3 from multiprocessing import Process, Queue
 4 
 5 def consumer(name, q):
 6     while True:
 7         obj = q.get()
 8         print(f'{name}吃了一个{obj}')
 9         time.sleep(random.randint(1, 3))
10 
11 def producer(name, food, q):
12     for i in range(10):
13         time.sleep(random.randint(1, 5))
14         q.put(f'{name}生产的{food}{i}')
15         print(f'{name}生产了一个数据{food}{i}')
16 
17 if __name__ == '__main__':
18     q = Queue()
19     Process(target=consumer, args=('alex', q)).start()
20     Process(target=producer, args=('yuan', '泔水', q)).start()
21     Process(target=producer, args=('egon', '骨头', q)).start()
供大于求
D:Python36python.exe E:/Python/草稿纸.py
egon生产了一个数据骨头0
alex吃了一个egon生产的骨头0
yuan生产了一个数据泔水0
egon生产了一个数据骨头1
alex吃了一个yuan生产的泔水0
alex吃了一个egon生产的骨头1
yuan生产了一个数据泔水1
egon生产了一个数据骨头2
alex吃了一个yuan生产的泔水1
egon生产了一个数据骨头3
yuan生产了一个数据泔水2
alex吃了一个egon生产的骨头2
alex吃了一个egon生产的骨头3
egon生产了一个数据骨头4
alex吃了一个yuan生产的泔水2
yuan生产了一个数据泔水3
alex吃了一个egon生产的骨头4
alex吃了一个yuan生产的泔水3
yuan生产了一个数据泔水4
egon生产了一个数据骨头5
yuan生产了一个数据泔水5
alex吃了一个yuan生产的泔水4
yuan生产了一个数据泔水6
egon生产了一个数据骨头6
alex吃了一个egon生产的骨头5
yuan生产了一个数据泔水7
alex吃了一个yuan生产的泔水5
yuan生产了一个数据泔水8
alex吃了一个yuan生产的泔水6
egon生产了一个数据骨头7
alex吃了一个egon生产的骨头6
yuan生产了一个数据泔水9
alex吃了一个yuan生产的泔水7
egon生产了一个数据骨头8
alex吃了一个yuan生产的泔水8
egon生产了一个数据骨头9
alex吃了一个egon生产的骨头7
alex吃了一个yuan生产的泔水9
alex吃了一个egon生产的骨头8
alex吃了一个egon生产的骨头9
结果(consumer结束不了)

  

 1 import time
 2 import random
 3 from multiprocessing import Process, Queue
 4 
 5 
 6 def consumer(name, q):
 7     while True:
 8         obj = q.get()  # 阻塞
 9         if obj is None:
10             break
11         print(f'{name}吃了一个{obj}')
12         time.sleep(random.randint(1, 3))
13 
14 
15 def producer(name, food, q):
16     for i in range(10):
17         time.sleep(random.randint(1, 5))
18         q.put(f'{name}生产的{food}{i}')
19         print(f'{name}生产了一个{food}{i}')
20 
21 
22 if __name__ == '__main__':
23     q = Queue()
24     Process(target=consumer, args=('alex', q)).start()
25     Process(target=consumer, args=('wusir', q)).start()
26     p1 = Process(target=producer, args=('yuan', '泔水', q))
27     p1.start()
28     p2 = Process(target=producer, args=('egon', '骨头', q))
29     p2.start()
30     p1.join()
31     p2.join()
32     q.put(None)
33     q.put(None)
数据被消费完就结束进程
D:Python36python.exe E:/Python/草稿纸.py
egon生产了一个骨头0
alex吃了一个egon生产的骨头0
yuan生产了一个泔水0
wusir吃了一个yuan生产的泔水0
yuan生产了一个泔水1
alex吃了一个yuan生产的泔水1
egon生产了一个骨头1
yuan生产了一个泔水2
alex吃了一个egon生产的骨头1
wusir吃了一个yuan生产的泔水2
egon生产了一个骨头2
alex吃了一个egon生产的骨头2
yuan生产了一个泔水3
wusir吃了一个yuan生产的泔水3
egon生产了一个骨头3
wusir吃了一个egon生产的骨头3
yuan生产了一个泔水4
wusir吃了一个yuan生产的泔水4
egon生产了一个骨头4
alex吃了一个egon生产的骨头4
yuan生产了一个泔水5
wusir吃了一个yuan生产的泔水5
egon生产了一个骨头5
alex吃了一个egon生产的骨头5
egon生产了一个骨头6
wusir吃了一个egon生产的骨头6
yuan生产了一个泔水6
alex吃了一个yuan生产的泔水6
yuan生产了一个泔水7
alex吃了一个yuan生产的泔水7
egon生产了一个骨头7
wusir吃了一个egon生产的骨头7
yuan生产了一个泔水8
alex吃了一个yuan生产的泔水8
yuan生产了一个泔水9
wusir吃了一个yuan生产的泔水9
egon生产了一个骨头8
alex吃了一个egon生产的骨头8
egon生产了一个骨头9
wusir吃了一个egon生产的骨头9

Process finished with exit code 0
结果

  

  q.join()  对这个队列进行阻塞,这个队列中的所有值被取走,且执行了task_done.

 1 import time
 2 from multiprocessing import Process, JoinableQueue
 3 
 4 def consumer(q):
 5     while True:
 6         print(q.get())
 7         time.sleep(0.3)
 8         q.task_done()   # 通知队列一个数据已经被处理完了
 9 
10 if __name__ == "__main__":
11     q = JoinableQueue()
12     c = Process(target=consumer, args=(q,))
13     c.daemon = True
14     c.start()
15     for i in range(10):
16         q.put(i)    # 10个数据
17     q.join()    # join表示所有的数据都被取走且被处理完才结束阻塞
18     print('所有数据都被处理完了')
JoinableQueue
D:Python36python.exe E:/Python/草稿纸.py
0
1
2
3
4
5
6
7
8
9
所有数据都被处理完了

Process finished with exit code 0
结果

  

 1 import time
 2 import random
 3 from multiprocessing import Process, JoinableQueue
 4 
 5 def consumer(name, q):
 6     while True:
 7         obj = q.get()   # 阻塞
 8         print(f'{name}吃了一个{obj}')
 9         time.sleep(random.randint(1, 3))
10         q.task_done()
11 
12 def producer(name, food, q):
13     for i in range(10):
14         time.sleep(random.randint(1, 5))
15         q.put(f'{name}生产的{food}{i}')
16         print(f'{name}生产了一个{food}{i}')
17 
18 if __name__ == "__main__":
19     q = JoinableQueue()
20     c1 = Process(target=consumer, args=('alex', q))
21     c2 = Process(target=consumer, args=('wusir', q))
22     c1.daemon = True
23     c2.daemon = True
24     p1 = Process(target=producer, args=('yuan', '泔水', q))
25     p2 = Process(target=producer, args=('egon', '骨头', q))
26     c1.start()
27     c2.start()
28     p1.start()
29     p2.start()
30     p1.join()
31     p2.join()
32     q.join()
协调生产者与消费者之间的供给关系
D:Python36python.exe E:/Python/草稿纸.py
egon生产了一个骨头0
alex吃了一个egon生产的骨头0
yuan生产了一个泔水0
wusir吃了一个yuan生产的泔水0
egon生产了一个骨头1
wusir吃了一个egon生产的骨头1
egon生产了一个骨头2
alex吃了一个egon生产的骨头2
yuan生产了一个泔水1
wusir吃了一个yuan生产的泔水1
egon生产了一个骨头3
alex吃了一个egon生产的骨头3
yuan生产了一个泔水2
wusir吃了一个yuan生产的泔水2
egon生产了一个骨头4
alex吃了一个egon生产的骨头4
yuan生产了一个泔水3
wusir吃了一个yuan生产的泔水3
egon生产了一个骨头5
alex吃了一个egon生产的骨头5
yuan生产了一个泔水4
wusir吃了一个yuan生产的泔水4
yuan生产了一个泔水5
egon生产了一个骨头6
alex吃了一个yuan生产的泔水5
wusir吃了一个egon生产的骨头6
egon生产了一个骨头7
alex吃了一个egon生产的骨头7
yuan生产了一个泔水6
wusir吃了一个yuan生产的泔水6
yuan生产了一个泔水7
alex吃了一个yuan生产的泔水7
egon生产了一个骨头8
wusir吃了一个egon生产的骨头8
yuan生产了一个泔水8
alex吃了一个yuan生产的泔水8
egon生产了一个骨头9
wusir吃了一个egon生产的骨头9
yuan生产了一个泔水9
alex吃了一个yuan生产的泔水9

Process finished with exit code 0
结果

  

  队列:

    维护了一个先进先出的顺序.

    且保证了数据在进程之间的安全.

2、管道

   管道在数据管理上是不安全的

  队列的实现机制就是 管道 + 锁

1 from multiprocessing import Pipe, Process
2 
3 # 左发右
4 lp, rp = Pipe()
5 lp.send([1, 2, 3])
6 print(rp.recv())
7 # 右发左
8 rp.send('aa')
9 print(lp.recv())
管道通信
D:Python36python.exe E:/Python/草稿纸.py
[1, 2, 3]
aa

Process finished with exit code 0
结果
 1 from multiprocessing import Pipe, Process
 2 
 3 def consumer(lp, rp):
 4     lp.close()
 5     while True:
 6         try:
 7             print(rp.recv())
 8         except EOFError:
 9             break
10 
11 if __name__ == '__main__':
12     lp, rp = Pipe()
13     Process(target=consumer, args=(lp, rp)).start()
14     Process(target=consumer, args=(lp, rp)).start()
15     Process(target=consumer, args=(lp, rp)).start()
16     Process(target=consumer, args=(lp, rp)).start()
17     Process(target=consumer, args=(lp, rp)).start()
18     rp.close()
19     for i in range(500):
20         lp.send(f'food{i}')
21     lp.close()
示例
D:Python36python.exe E:/Python/草稿纸.py
food0
food1
food2
food3
food4
food5
food6
food7
food8
food9
food10
food11
food12
food13
food14
food15
food16
food17
food18
food19
food20
food21
food22
food23
food24
food25
food26
food27
food28
food29
food30
food31
food32
food33
food34
food35
food36
food37
food38
food39
food40
food41
food42
food43
food44
food45
food46
food47
food48
food49
food50
food51
food52
food53
food54
food55
food56
food57
food58
food59
food60
food61
food62
food63
food64
food65
food66
food67
food68
food69
food70
food71
food72
food73
food74
food75
food76
food77
food78
food79
food80
food81
food82
food83
food84
food85
food86
food87
food88
food89
food90
food91
food92
food93
food94
food95
food96
food97
food98
food99
food100
food101
food102
food103
food104
food105
food106
food107
food108
food109
food110
food111
food112
food113
food114
food115
food116
food117
food118
food119
food120
food121
food122
food123
food124
food125
food126
food127
food128
food129
food130
food131
food132
food133
food134
food135
food136
food137
food138
food139
food140
food141
food142
food143
food144
food145
food146
food147
food148
food149
food150
food151
food152
food153
food154
food155
food156
food157
food158
food159
food160
food161
food162
food163
food164
food165
food166
food167
food168
food169
food170
food171
food172
food173
food174
food175
food176
food177
food178
food179
food180
food181
food182
food183
food184
food185
food186
food187
food188
food189
food190
food191
food192
food193
food194
food195
food196
food197
food198
food199
food200
food201
food202
food203
food204
food205
food206
food207
food208
food209
food210
food211
food212
food213
food214
food215
food216
food217
food218
food219
food220
food221
food222
food223
food224
food225
food226
food227
food228
food229
food230
food231
food232
food233
food234
food235
food236
food237
food238
food239
food240
food241
food242
food243
food244
food245
food246
food247
food248
food249
food250
food251
food252
food253
food254
food255
food256
food257
food258
food259
food260
food261
food262
food263
food264
food265
food266
food267
food268
food269
food270
food271
food272
food273
food274
food275
food276
food277
food278
food279
food280
food281
food282
food283
food284
food285
food286
food287
food288
food289
food290
food291
food292
food293
food294
food295
food296
food297
food298
food299
food300
food301
food302
food303
food304
food305
food306
food307
food308
food309
food310
food311
food312
food313
food314
food315
food316
food317
food318
food319
food320
food321
food322
food323
food324
food325
food326
food327
food328
food329
food330
food331
food332
food333
food334
food335
food336
food337
food338
food339
food340
food341
food342
food343
food344
food345
food346
food347
food348
food349
food350
food351
food352
food353
food354
food355
food356
food357
food358
food359
food360
food361
food362
food363
food364
food365
food366
food367
food368
food369
food370
food371
food372
food373
food374
food375
food376
food377
food378
food379
food380
food381
food382
food383
food384
food385
food386
food387
food388
food389
food390
food391
food392
food393
food394
food395
food396
food397
food398
food399
food400
food401
food402
food403
food404
food405
food406
food407
food408
food409
food410
food411
food412
food413
food414
food415
food416
food417
food418
food419
food420
food421
food422
food423
food424
food425
food426
food427
food428
food429
food430
food431
food432
food433
food434
food435
food436
food437
food438
food439
food440
food441
food442
food443
food444
food445
food446
food447
food448
food449
food450
food451
food452
food453
food454
food455
food456
food457
food458
food459
food460
food461
food462
food463
food464
food465
food466
food467
food468
food469
food470
food471
food472
food474
food475
food473
food476
food477
food478
food479
food481
food482
food483
food484
food485
food486
food487
food488
food489
food490
food491
food492
food493
food494
food495
food496
food497
food498
food499
food480

Process finished with exit code 0
结果

3、数据共享

   普通模式:

 1 from multiprocessing import Manager, Process, Lock
 2 
 3 
 4 def work(d, lock):
 5     with lock:  # 不加锁而操作共享的数据,肯定会出现数据错乱
 6         d['count'] -= 1
 7 
 8 
 9 if __name__ == '__main__':
10     lock = Lock()
11     m = Manager()
12     dic = m.dict({'count': 100})
13     p_l = []
14     for i in range(100):
15         p = Process(target=work, args=(dic, lock))
16         p_l.append(p)
17         p.start()
18     for p in p_l:
19         p.join()
20     print(dic)
示例
D:Python36python.exe E:/Python/草稿纸.py
{'count': 0}

Process finished with exit code 0
结果

  上下文管理模式:

 1 from multiprocessing import Manager, Process, Lock
 2 
 3 
 4 def work(d, lock):
 5     with lock:  # 不加锁而操作共享的数据,肯定会出现数据错乱
 6         d['count'] -= 1
 7 
 8 
 9 if __name__ == '__main__':
10     lock = Lock()
11     with Manager() as m:
12         dic = m.dict({'count': 100})
13         p_l = []
14         for i in range(100):
15             p = Process(target=work, args=(dic, lock))
16             p_l.append(p)
17             p.start()
18         for p in p_l:
19             p.join()
20         print(dic)
示例
D:Python36python.exe E:/Python/草稿纸.py
{'count': 0}

Process finished with exit code 0
结果

4、进程池

  多进程和进程池的对比:

    对于纯计算型的代码,使用进程池更好 ——真理

    对于高IO的代码,直接使用多进程更好 —— 相对论

  结论:进程池比起多进程来说,节省了开启进程回收进程的时间,给操作系统调度进程降低了难度

  使用进程池提交任务:

    apply  # 同步提交任务,没有多进程的优势

    apply_async  # 异步提交任务,常用,可以通过get方法获取返回值

    close  # 关闭进程池,阻止往池中添加新的任务

    join  # join依赖close,一个进程必须先close再join

  map  # 接收一个任务函数,和一个iterable。节省了for循环和close、join,是一种简便的写法。

  apply_async和map相比,操作复杂,但是可以通过get方法获取返回值。

 1 import os
 2 def wahaha(num):
 3     print('',os.getpid())
 4     return num ** num
 5 
 6 def call(argv): # 回调函数用的是主进程的资源
 7     print(os.getpid())
 8     print(argv)
 9 
10 if __name__ == '__main__':
11     print('', os.getpid())
12     p = Pool(5)
13     p.apply_async(func=wahaha, args=(50,), callback=call)
14     p.close()
15     p.join()
进程池
D:Python36python.exe E:/Python/草稿纸.py
主 1363613636
13636
8881784197001252323389053344726562500000000000000000000000000000000000000000000000000

Process finished with exit code 0
结果
原文地址:https://www.cnblogs.com/ZN-225/p/9179112.html