十二、并发编程之多进程
http://www.cnblogs.com/linhaifeng/articles/6817679.html
理论:http://www.cnblogs.com/linhaifeng/articles/7430066.html
链接:http://www.cnblogs.com/linhaifeng/articles/7428874.html
1、必备理论基础
1.1 操作系统的作用
1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口
2:管理、调度进程,并且将多个进程对硬件的竞争变得有序
1.2 多道技术
1.产生背景:针对单核,实现并发
ps:
现在的主机一般是多核,那么每个核都会利用多道技术
有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个cpu中的任意一个,具体由操作系统调度算法决定。
2.空间上的复用:如内存中同时有多道程序
3.时间上的复用:复用一个cpu的时间片
强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样才能保证下次切换回来时,能基于上次切走的位置继续运行
2、多进程
2.1 什么是进程
进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu。
举例(单核+多道,实现多个进程的并发执行):
egon在一个时间段内有很多任务要做:python备课的任务,写书的任务,交女朋友的任务,王者荣耀上分的任务,但egon同一时刻只能做一个任务(cpu同一时间只能干一个活),如何才能玩出多个任务并发执行的效果?
egon备一会课,再去跟李杰的女朋友聊聊天,再去打一会王者荣耀....这就保证了每个任务都在进行中。
2.2 进程与程序的区别
程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。
2.3 并发与并行
2.3.1 并发
并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发,(并行也属于并发)
2.3.2 并行
并行:同时运行,只有具备多个cpu才能实现并行
2.4 阻塞与非阻塞
2.4.1 阻塞
#阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。函数只有在得到结果之后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。
#举例:
#1. 同步调用:apply一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止,但并未阻塞住(即便是被抢走cpu的执行权限,那也是处于就绪态);
#2. 阻塞调用:当socket工作在阻塞模式的时候,如果没有数据的情况下调用recv函数,则当前线程就会被挂起,直到有数据为止。
2.4.2 非阻塞
#非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程。
2.4.3 小结
阻塞与非阻塞针对的是进程或线程:阻塞是当请求不能满足的时候就将进程挂起,而非阻塞则不会阻塞当前进程
2.5 进程的状态
一个进程由三种状态:
2.6 进程的创建
2.6.1 进程的创建
1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)
2. 一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)
3. 用户的交互式请求,而创建一个新进程(如用户双击暴风影音)
4. 一个批处理作业的初始化(只在大型机的批处理系统中应用)
进程都是由操作系统开启的,开进程时先给操作系统发信号,再由操作系统开启进程
2.6.2 创建的子进程UNIX和windows区别
1.相同的是:进程创建后,父进程和子进程有各自不同的地址空间(多道技术要求物理层面实现进程之间内存的隔离),任何一个进程的在其地址空间中的修改都不会影响到另外一个进程。
2.不同的是:在UNIX中,子进程的初始地址空间是父进程的一个副本,提示:子进程和父进程是可以有只读的共享内存区的。但是对于windows系统来说,从一开始父进程与子进程的地址空间就是不同的。
linux子进程和父进程的初始状态一样
windows子进程和父进程的初始状态就不同
2.7 终止进程
1. 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)
2. 出错退出(自愿,python a.py中a.py不存在)
3. 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)
4. 被其他进程杀死(非自愿,如kill -9)
3、开启子进程multiprocessing
3.1 方式一
定义一个函数
from multiprocessing import Process
import time
def work(name):
print('%s is piaoing' %name)
time.sleep(3)
print('%s piao end' %name)
if __name__ == '__main__': #在windows系统上必须要在__main__下调用
# Process(target=work,kwargs={'name':'alex'})
p=Process(target=work,args=('alex',)) #target函数名,args参数
p.start()
print('主')
子进程结束后,子进程的资源由父进程回收掉,所以主进程要在子进程结束后再终止,如果子进程没有终止而主进程突然被终止,那么子进程的资源无法回收,会成为僵尸进程。
3.2 方式二
from multiprocessing import Process
import time
class Work(Process):
def __init__(self,name):
super().__init__() #重用父类的方法
self.name=name
def run(self): #类下面的run方法是固定的
print('%s is piaoing' %self.name)
time.sleep(2)
print('%s piao end' %self.name)
if __name__ == '__main__':
p=Work('wupeiqi')
p.start()
print('主')
3.3 开启多个子进程
3.3.1 例一
from multiprocessing import Process
import time,random
def work(name):
print('%s is piaoing' %name)
time.sleep(random.randint(1,3))
print('%s piao end' %name)
if __name__ == '__main__':
p1=Process(target=work,args=('alex',))
p2=Process(target=work,args=('wupeiqi',))
p3=Process(target=work,args=('yuanhao',))
p1.start()
p2.start()
p3.start()
print('主')
3.3.2 os.getpid()
os.getpid() 查看进程的id号
os.getppid() 查看进程的父进程的id号
from multiprocessing import Process
import time,random,os
def work():
print('子进程的pid:%s,父进程的pid:%s' %(os.getpid(),os.getppid()))
time.sleep(3)
if __name__ == '__main__':
p1=Process(target=work)
p2=Process(target=work)
p3=Process(target=work)
p1.start()
p2.start()
p3.start()
print('主',os.getpid(),os.getppid())
主进程的父进程是pycharm的进程号
3.4 进程之间内存空间隔离
from multiprocessing import Process
n=100
def work():
global n
n=0
print('子',n)
if __name__ == '__main__':
p=Process(target=work)
p.start()
print('主',n)
4、套接字的并发
服务端:
import socket
from multiprocessing import Process
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
phone.bind(('127.0.0.1',8012))
phone.listen(5)
print('starting...')
def talk(conn):
print(phone)
while True: #通信循环
try:
data=conn.recv(1024) #最大收1024
print(data)
if not data:break #针对linux
conn.send(data.upper())
except Exception:
break
conn.close()
if __name__ == '__main__':
while True:
conn,addr=phone.accept()
print('IP:%s,PORT:%s' %(addr[0],addr[1]))
p=Process(target=talk,args=(conn,))
p.start()
print('===?>')
phone.close()
客户端:
import socket
#1、买手机
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
#2、打电话
phone.connect(('127.0.0.1',8012))
#3、发收消息
while True:
msg=input('>>: ').strip()
if not msg:continue
phone.send(msg.encode('utf-8'))
data=phone.recv(1024)
print(data.decode('utf-8'))
#4、挂电话
phone.close()
服务端开的进程数最好最多开的数目和cup的核数一样多
os.cpu_count() 查看cpu核数
5、join()方法
from multiprocessing import Process
import time
def work(name,n):
print('%s is piaoing' %name)
time.sleep(n)
print('%s piao end' %name)
if __name__ == '__main__':
start_time=time.time()
p1=Process(target=work,args=('alex',1))
p2=Process(target=work,args=('wupeiqi',2))
p3=Process(target=work,args=('yuanhao',3))
# p1.start()
# p2.start()
# p3.start()
# p3.join() #主进程等,等待子进程结束后,主进程再执行后面的代码
# p2.join() #主进程等,等待子进程结束后,主进程再执行后面的代码
# p1.join() #主进程等,等待子进程结束后,主进程再执行后面的代码
p_l=[p1,p2,p3]
for p in p_l:
p.start()
for p in p_l:
p.join()#主进程等,等待子进程结束后,主进程再执行后面的代码
stop_time=time.time()
print('主',(stop_time-start_time))
6、terminate()和is_alive()(了解)
terminate() 关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活,如果被关闭的进程有子进程,这个方法并不会把子进程也关闭,所以这个方法不要用
is_alive() 查看进程是否存活,True为存活,False为不存活
from multiprocessing import Process
import time
def work(name,n):
print('%s is piaoing' %name)
time.sleep(n)
print('%s piao end' %name)
if __name__ == '__main__':
p1=Process(target=work,args=('alex',1))
p1.start()
p1.terminate()
time.sleep(1)
print(p1.is_alive())
print('主')
7、name()和pid()(了解)
name()获取进程名
pid()获取进程pid不要用,一般用os.getpid()
8、socketserver
实现ftp server端和client端的交互
服务端:
import socketserver
class MyServer(socketserver.BaseRequestHandler):
def handle(self):
conn = self.request
conn.sendall(bytes('欢迎致电 10086,请输入1xxx,0转人工服务.',encoding='utf-8'))
Flag = True
while Flag:
data = conn.recv(1024).decode('utf-8')
if data == 'exit':
Flag = False
elif data == '0':
conn.sendall(bytes('通过可能会被录音.balabala一大推',encoding='utf-8'))
else:
conn.sendall(bytes('请重新输入.',encoding='utf-8'))
if __name__ == '__main__':
server = socketserver.ThreadingTCPServer(('127.0.0.1',8008),MyServer)
server.serve_forever()
客户端:
import socket
ip_port = ('127.0.0.1',8008)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)
while True:
data = sk.recv(1024).decode('utf-8')
print('receive:',data)
inp = input('please input:')
sk.sendall(bytes(inp,encoding='utf-8'))
if inp == 'exit':
break
sk.close()
9、守护进程
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
#主进程代码运行完毕,守护进程就会结束
from multiprocessing import Process
import time
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
if __name__ == '__main__':
p1=Process(target=foo)
p2=Process(target=bar)
p1.daemon=True
p1.start()
p2.start()
print("main-------")
10、互斥锁
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,
竞争带来的结果就是错乱,如何控制,就是加锁处理
10.1 多个进程共享同一打印终端
10.1.1 不加锁
并发运行,效率高,但竞争同一打印终端,带来了打印错乱
from multiprocessing import Process
import os,time
def work():
print('%s is running' %os.getpid())
time.sleep(2)
print('%s is done' %os.getpid())
if __name__ == '__main__':
for i in range(3):
p=Process(target=work)
p.start()
10.1.2 加锁
由并发变成了串行,牺牲了运行效率,但避免了竞争
from multiprocessing import Process,Lock
import os,time
def work(lock):
lock.acquire()
print('%s is running' %os.getpid())
time.sleep(2)
print('%s is done' %os.getpid())
lock.release()
if __name__ == '__main__':
lock=Lock()
for i in range(3):
p=Process(target=work,args=(lock,))
p.start()
10.2 多个进程共享同一文件
文件当数据库,模拟抢票
10.2.1 不加锁
#文件db的内容为:{"count":1}
#注意一定要用双引号,不然json无法识别
from multiprocessing import Process,Lock
import time,json,random
def search():
dic=json.load(open('db.txt'))
print('