from multiprocessing import Process

1.确认多个子进程执行完毕

import demo
from multiprocessing import Process

def target(i):
   print(i)

if __name__ == '__main__':
   p_l = []
   for i in range(5):
       p = Process(target=target,args=(i,))
       p.start()
       p_l.append(p)
   # p.join() # 阻塞 主进程 直到子进程执行完毕
   for p in p_l:p.join()
   print('子进程已经都执行完毕了')

2.守护进程

# 有一个参数可以把一个子进程设置为一个守护进程
import time
from multiprocessing import Process

def son1(a,b):
   while True:
       print('is alive')
       time.sleep(0.5)

def son2():
   for i in range(5):
       print('in son2')
       time.sleep(1)

if __name__ == '__main__':
   p = Process(target=son1,args=(1,2))
   p.daemon = True
   p.start()      # 把p子进程设置成了一个守护进程
   p2 = Process(target=son2)
   p2.start()
   time.sleep(2)

# 守护进程是随着主进程的代码结束而结束的,注意只是代码跑完,守护进程就结束,例如如上主进程sleep两秒,子进程每次sleep半秒,那么子进程运行四次后停止。
   # 生产者消费者模型的时候
   # 和守护线程做对比的时候
# 所有的子进程都必须在主进程结束之前结束,由主进程来负责回收资源

3.Process对象的其他方法

import time
from multiprocessing import Process

def son1():
   while True:
       print('is alive')
       time.sleep(0.5)

if __name__ == '__main__':
   p = Process(target=son1)
   p.start()      # 异步 非阻塞
   print(p.is_alive())   # True False
   time.sleep(1)
   p.terminate()   # 异步的 非阻塞   # 手动退出 和 start对应
   print(p.is_alive())   # 进程还活着 因为操作系统还没来得及关闭进程
   time.sleep(0.01)
   print(p.is_alive())   # 操作系统已经响应了我们要关闭进程的需求,再去检测的时候,得到的结果是进程已经结束了

# 什么是异步非阻塞?
   # terminate

4.面向对象的方式开启进程及总结

import os
import time
from multiprocessing import Process

class MyProcecss2(Process):
   def run(self):
       while True:
           print('is alive')
           time.sleep(0.5)

class MyProcecss1(Process):
   def __init__(self,x,y):
       self.x = x
       self.y = y
       super().__init__()
   def run(self):
       print(self.x,self.y,os.getpid())
       for i in range(5):
           print('in son2')
           time.sleep(1)

if __name__ == '__main__':
   mp = MyProcecss1(1,2)
   mp.daemon = True
   mp.start()
   print(mp.is_alive())
   mp.terminate()
   # mp2 = MyProcecss2()
   # mp2.start()
   # print('main :',os.getpid())
   # time.sleep(1)
# Process类
# 开启进程的方式
   # 面向函数
       # def 函数名:要在子进程中执行的代码
       # p = Process(target= 函数名,args=(参数1,))
   # 面向对象
       # class 类名(Process):
           # def __init__(self,参数1,参数2):   # 如果子进程不需要参数可以不写
               # self.a = 参数1
               # self.b = 参数2
               # super().__init__()
           # def run(self):
               # 要在子进程中执行的代码
       # p = 类名(参数1,参数2)
   # Process提供的操作进程的方法
       # p.start() 开启进程     异步非阻塞
       # p.terminate() 结束进程 异步非阻塞

       # p.join()     同步阻塞
       # p.isalive() 获取当前进程的状态

       # daemon = True 设置为守护进程,守护进程永远在主进程的代码结束之后自动结束

5.锁

# 并发 能够做的事儿
# 1.实现能够响应多个client端的server
# 2.抢票系统
import time
import json
from multiprocessing import Process,Lock

def search_ticket(user):
   with open('ticket_count') as f:
       dic = json.load(f)
       print('%s查询结果 : %s张余票'%(user,dic['count']))

def buy_ticket(user,lock):
   # with lock:
   # lock.acquire()   # 给这段代码加上一把锁
       time.sleep(0.02)
       with open('ticket_count') as f:
           dic = json.load(f)
       if dic['count'] > 0:
           print('%s买到票了'%(user))
           dic['count'] -= 1
       else:
           print('%s没买到票' % (user))
       time.sleep(0.02)
       with open('ticket_count','w') as f:
           json.dump(dic,f)
   # lock.release()   # 给这段代码解锁

def task(user, lock):
   search_ticket(user)
   with lock:
       buy_ticket(user, lock)

if __name__ == '__main__':
   lock = Lock()
   for i in range(10):
       p = Process(target=task,args=('user%s'%i,lock))
       p.start()


# 1.如果在一个并发的场景下,涉及到某部分内容
   # 是需要修改一些所有进程共享数据资源
   # 需要加锁来维护数据的安全
# 2.在数据安全的基础上,才考虑效率问题
# 3.同步存在的意义
   # 数据的安全性

# 在主进程中实例化 lock = Lock()
# 把这把锁传递给子进程
# 在子进程中 对需要加锁的代码 进行 with lock:
   # with lock相当于lock.acquire()和lock.release()
# 在进程中需要加锁的场景
   # 共享的数据资源(文件、数据库)
   # 对资源进行修改、删除操作
# 加锁之后能够保证数据的安全性 但是也降低了程序的执行效率

6.进程之间的通信

# 进程之间的数据隔离
from multiprocessing import Process
n = 100
def func():
   global n
   n -= 1

if __name__ == '__main__':
   p_l = []
   for i in range(10):
       p = Process(target=func)
       p.start()
       p_l.append(p)
   for p in p_l:p.join()
   print(n)


# 通信
# 进程之间的通信 - IPC(inter process communication)
from multiprocessing import Queue,Process
# 先进先出
def func(exp,q):
   ret = eval(exp)
   q.put({ret,2,3})
   q.put(ret*2)
   q.put(ret*4)

if __name__ == '__main__':
   q = Queue()
   Process(target=func,args=('1+2+3',q)).start()
   print(q.get())
   print(q.get())
   print(q.get())

# Queue基于 天生就是数据安全的
#     文件家族的socket pickle lock
# pipe 管道(不安全的) = 文件家族的socket pickle
# 队列 = 管道 + 锁
from multiprocessing import Pipe
pip = Pipe()
pip.send()
pip.recv()
import queue

from multiprocessing import Queue
q = Queue(5)
q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5)   # 当队列为满的时候再向队列中放数据 队列会阻塞
print('5555555')
try:
   q.put_nowait(6)  # 当队列为满的时候再向队列中放数据 会报错并且会丢失数据
except queue.Full:   # 模块queue 中的 Full
   pass
print('6666666')

print(q.get())
print(q.get())
print(q.get())   # 在队列为空的时候会发生阻塞
print(q.get())   # 在队列为空的时候会发生阻塞
print(q.get())   # 在队列为空的时候会发生阻塞 等待下一个数据被放进队列中再去取
try:
   print(q.get_nowait())   # 在队列为空的时候 直接报错
except queue.Empty:pass

 

原文地址:https://www.cnblogs.com/usherwang/p/13028859.html