python多进程

多进程

  • 进程:正在进行的过程或者说是一个任务,而负责执行任务则是cpu
  • 同一个程序执行两次是两次进程
  • 并发:
  • 并行:基于多核cpu

unix开子进程的拷贝一份父进程的数据

进行的三个状态:运行,阻塞,就绪

同步与异步

  • 同步:提交一个任务,只有等待这个任务结束才会继续提交下一个任务
  • 异步:只管提交任务,不等待这个任务执行完毕就可以做其他事情
  • 阻塞:socket模块中的:recv,accept
  • 非阻塞:正常执行代码

在python中如何开启子进程

  1. multiprocessing模块中的process类
# 方式1
from multiprocessing import Process
import time

def task(name):
	print('%s is running'%name)
	time.sleep(5)
	print('%s is done'%name)

if __name__ =='__main__':
	p = Process(target=task,args=('子进程1',))
	p.start() #仅仅只是给操作系统发送了一个信号
#方式二:自定义类继承自process
from multiprocessing import Process
import time
class MyProcess(Process):
	def __init__(self,name):
		super().__init__()
		self.name = name

	def run(self): #函数名必须是run
		print('%s is running'%self.name)
		time.sleep(5)
		print('%s is done'%self.name)

if __name__ == '__main__':
	p = MyProcess('子进程1')
	p.start()#本质就是在调用p.run
	print('进')

查看进程的pid与ppid(进程id)

  • 补充:
    • windows查看正在执行的进程:tasklist | findstr pycharm
    • Mac os查看正在执行的进程:ps aus|grep pycharm
from multiprocessing import Process
import time
import os


def task(name):
	print('%s is running,parent id is <%s>'%(os.getpid(),os.getppid()))
	time.sleep(5)
	print('%s is done'%os.getpid())

if __name__ =='__main__':
	p = Process(target=task,args=('子进程1',))
	p.start() #仅仅只是给操作系统发送了一个信号
	print('主',os.getpid())

process对象的其他属性或方法

  • join()等子进程结束完毕才会继续运行主进程,主进程结束后所有的僵尸进程结束
  • start()只是向操作系统发送信号,并不只是把进程立马开起来,如果连续有几个start有可能执行的先后顺序会错乱,如果start方法后面立马接一个join,多个子进程会变成串行
  • is_alive()查看子进程时候已经结束
  • terminate()杀死这个子进程
  • pid()查看进程id
  • name()查看这个对象的名字

守护进程

  • 当主进程执行结束子进程跟着结束:将daemon属性设置为true,守护进程不能有子进程。代码执行到程序最后一行代表程序运行结束

互斥锁

  • 把并发变成串行:使用multiprocessing模块下的Lock对象
#牺牲效率实现子进程串行
from multiprocessing import Process,Lock
import time

def tack(name,lock):
	lock.acquire()#加锁
	print('%s,1'%name)
	time.sleep(1)
	print('%s,2'%name)
	time.sleep(1)
	print('%s,3'%name)
	lock.release()#释放锁
if __name__ == '__main__':
	lock = Lock()#实例化锁对象
	for i in range(3):
		p = Process(target=tack,args=('进程%s'%i,lock))#把锁传到子进程中
		p.start()
  • 互斥锁与join的区别:join确实能实现代码串行,但join把整个代码变成串行,但互斥锁可以把部分代码变成串行

事件

  • 一个信号可以使所有的进程都进入阻塞状态,也可以控制所有的进程解除阻塞
  • 一个事件被创建之后,默认是阻塞状态
from multiprocessing import Event
e = Event()# 创建了一事件
print(e.is_set())# 查看一个事件的状态,默认是阻塞
e.set() # 将这个事件状态改为True
print(e.is_set())
e.wait()# 根据e.is_set()的值决定是否阻塞,如果是True就是不阻塞,如果是False就是阻塞状态
e.clear() # 将这个事件状态改为False
  • 事件模拟红绿灯:
import time
import random
from multiprocessing import Process,Event

def light(e):
	while True:
		if e.is_set():
			e.clear()
			print('红灯亮了')
		else:
			e.set()
			print('绿灯亮了')
		time.sleep(5)
def cars(i,e):
	if not e.is_set():
		print('%s 在等待'%i)
		e.wait()
	print('%s 通行了'%i)


if __name__ == '__main__':
	e = Event()
	p1 = Process(target=light,args=(e,))
	p1.start()
	for i in range(20):
		p = Process(target=cars,args=(i,e))
		p.start()
		time.sleep(2)

队列

  • multiprocessing有提供基于ipc通信类(inter-process communication)
from multiprocessing import Queue

q = Queue(4)#指定队列大小,如果不指定大小则大小为无穷尽

q.put('hello')#插入数据到队列中
print(q.full())#判断队列中数据是否满了
q.get()#从队列读取并删除一个数据
print(q.empty())#判断队列中数据是否空了
  • 生产者消费这模型:
    • 生产者:生产数据任务
    • 消费者:消费数据任务
from multiprocessing import Process,Queue
import time
def producer(q):
	for i in range(3):
		res = '包子%s'%i
		time.sleep(0.5)
		q.put(res)
		print('生产者生产了%s'%res)

def consumer(q):
	while True:
		res = q.get()
		if res == None:break
		time.sleep(0.7)
		print('消费者吃了%s'%res)


if __name__ == '__main__':
	q = Queue()
	p1 = Process(target=producer,args=(q,))
	c1 = Process(target=consumer,args=(q,))
	p1.start()
	c1.start()
	p1.join()
	c1.join()
	q.put(None)
  • JoinableQueue:
    • 消费者:每次获取一个数据,处理一个数据,并且发送一个记号:标志一个数据被处理成功,计数器-1
    • 生产者:每一次生产一个数据,且每一次生产的数据放在队列中计数器+1,当生产者全部生产完毕之后发送一个join:已经停止生产数据且要等待之前生产的数据被消费完,当数据都被处理完时,join阻塞结束
from multiprocessing import Process,JoinableQueue
import time
def producer(q):
	for i in range(3):
		res = '包子%s'%i
		time.sleep(0.5)
		q.put(res)
		print('生产者生产了%s'%res)
	q.join() #阻塞,直到一个队列中的所有数据,全部被处理完毕

def consumer(q):
	while True:
		res = q.get()
		time.sleep(0.7)
		print('消费者吃了%s'%res)
		q.task_done() # 处理完一个数据,队列中的计数器-1


if __name__ == '__main__':
	q = JoinableQueue()
	p1 = Process(target=producer,args=(q,))
	c1 = Process(target=consumer,args=(q,))
	p1.start()
	c1.daemon = True #守护进程,主进程中的代码执行完毕后,子进程自动结束
	c1.start()
	p1.join()

管道

  • 在进程之间传递信息(数据)
from multiprocessing import Pipe,Process

def func(conn1,conn2):
	conn1.close()
	while True:
		try:
			msg = conn2.recv()
			print(msg)
		except EOFError:
			print('子进程结束')
			conn2.close()
			break


if __name__ == '__main__':
	conn1,conn2 = Pipe()
	Process(target=func,args=(conn1,conn2)).start()
	conn2.close()
	for i in range(5):
		conn1.send('吃屎了你')
	conn1.close()

进程池

  • 创建进程池的过程
  1. 创建一个属于进程的池子
  2. 这个池子指定能存放多少个进程
  3. 开启进程
from multiprocessing import Pool,Process
import time
import os
def func(i):
	print('%s 进程正在运行'%i,os.getpid())
	time.sleep(2)
	print('%s 进程运行结束'%i,os.getpid())


if __name__ == '__main__':
	pool = Pool(3)
	for i in range(10):
	    # pool.apply(func,args=(i,)) 同步调用
		pool.apply_async(func,args=(i,)) # 异步调用
	pool.close() # 结束进程池接收任务
	pool.join() # 感知进程池中的任务执行结束
	# apply_async必须是与close、join一起使用
  • 进程池的返回值
from multiprocessing import Pool

def func(i):

	return i*i

if __name__ == '__main__':
	p = Pool(5)
	res_lis = []
	for i in range(10):
		res = p.apply_async(func,args=(i,))
		res_lis.append(res) 

	for res in res_lis:
		print(res.get())# res.get 默认是阻塞的,因为需要接收进程处理的结果,之后接收到结果之后才可以继续执行,所以
							# 所以将进程的返回结果放在一个列表中,然后循环这个列表,再执行get就不会阻塞了
	p.close()
	p.join()
  • 进程池的回调函数
from multiprocessing import Pool
import os
def func1(i):
	print('in func1',os.getpid())
	return i*i

def func2(ii):
	print('in func2',os.getpid())
	print(ii)

if __name__ == '__main__':
	'''
	1. func1执行结果当作func2的参数,func1进程执行结束后返回结果,通过callback调用func2,将func1的执行返回结果当作参数传递给func2执行
	2. func1是一个新的进程
	3. func2是在主进程中执行的
	'''
	p = Pool(5)
	p.apply_async(func1,args=(2,),callback=func2) 
	p.close()
	p.join()
原文地址:https://www.cnblogs.com/wualin/p/9956900.html