python并发编程之进程

python并发编程之进程

part 1:

操作系统基础--I/O操作

I/O操作:相对内存来说

  • 输入Input输出Output
  • 输入是怎么输入: 键盘input ead ecv
  • 输出是怎么输出: 显示器打印机播放音乐printwritesend
  • 文件操作: read write
  • 网络操作: send recv recvfrom
  • 函数:print input

计算机的工作分为两个状态

  • CPU工作:做计算(对内存中的数据进行操作)的时候工作
  • CPU不工作:IO操作的时候
  • CPU的工作效率500000条指令/ms

多道操作系统/分时操作系统

多道操作系统:一个程序遇到IO就把CPU让给别人

  • 顺序的一个一个执行的思路变成
  • 共同存在在一台计算机中,其中一个程序执行让出cpu之后,另一个程序能继续使用cpu
  • 来提高cpu的利用率+
  • 单纯的切换会不会占用时间:会
  • 但是多道操作系统的原理整体上还是节省了时间,提高了CPU的利用率
    时空复用的概念

分时操作系统:

  • 单cpu分时操作系统:把时间分成很小很小的段,每一个时间都是一个时间片,
  • 每一个程序轮流执行一个时间片的时间,自己的时间片到了就轮到下一个程序执行--时间片的轮转
  • 没有提高CPU的利用率提高了用户体验
例子:
    老教授24h全是计算没有io
	  先来先服务FCFS
	  研究生5min全是计算没有io
	  短作业优先
    研究生25min全是计算没有io

并发概念

  • 进程:进行中的程序就是一个进程
    • 占用资源需要操作系统调度
    • pid:能够唯一标识一个进程
    • 计算机中最小的资源分配单位
  • 线程: 首先由进程开辟空间等,线程是执行具体的代码
    • 线程是进程中的一个单位,不能脱离进程存在
    • 线程是计算机中能够被CPU调度的最小单位
  • 并发:
    • 多个程序同时执行:只有一个cpu,多个程序轮流在一个cpu上执行
    • 宏观上:多个程序在同时执行
    • 微观上:多个程序轮流在一个cpu上执行本质上还是串行
  • 并行:
    • 多个程序同时执行,并且同时在多个CPU上执行(能不能利用多核)
  • 同步:
    • 在做A事情的时候发起B件事,必须等待B件事情结束之后才能继续做A件事情
  • 异步:
    • 在做A事的时候发起B时间,不需要等待B事件结束就可以继续A事件
  • 阻塞:
    • 如果CPU不工作 input accept recv recv from sleep connect
  • 非阻塞:
    • CPU工作

part 2:

进程的概念

  • 进程的三状态图:
  • windows操作系统下
  • Linux系统操作下:

multiprocessing模块:

  • 为什么要用if__name__=='main'?
  • 能不能给子进程传递参数?能
  • 能不能获取子进程的返回值?不能
  • 能不能同时开启多个子进程?可以
  • 同步阻塞异步非阻塞
  • 同步阻塞:join()
  • 异步非阻塞:start()
from  multiprocessing  import  Process

import  os
import  time
def  func(name,age):
	print(f'{name}start')
	time.sleep(1)
	print(os.getpid(),os.getppid(),name,age)#pid(进程id):processidppid(父进程id):parentprocessid
#print('123')
if  __name__=='__main__':
	#只会在主进程中执行的所有的代码写在name=main下
	print('main:',os.getpid(),os.getppid())
	arg_lst=[('alex',84),('taibai',73),('wusir',96)]
	for  arg  in  arg_lst:
		p=Process(target=func,args=arg)#可以给子进程传递参数
		p.start()#开启子进程,异步非阻塞
	#p=Process(target=func,args=('haha',23))#可以开启多个子进程
#p.start()

join()的用法:

from  multiprocessing  import  Process
import  os
import  time
def  func(name,age):
	print(f'发送一封邮件给{age}的{name}')
	time.sleep(1)
	print('发送完毕')
if  __name__=="__main__":
	arg_lst=[('alex',84),('taibai',73),('wusir',96)]
	p_lst=[]
	for  arg  in  arg_lst:
		p=Process(target=func,args=arg)
		p.start()
		p_lst.append(p)
	for  p  in  p_lst:
		p.join()#阻塞:直到p这个进程执行完毕才继续执行代码
print('所有邮件已经发送完毕')#要求:这句话最后打印,并且要求上面的能够异步执行

多进程之间的数据是否隔离

from  multiprocessing  import  Process
n=0
def  func():
	global  n
	n  +=  1

if  __name__=='__main__':
	p_l=[]
	for  i  in  range(50):
		p=Process(target=func)
		p.start()
		p_l.append(p)
	for  p  in  p_l:
		p.join()
print(n)#0对主进程进行了隔离

使用多进程实现一个并发的socketserver

  1. 服务器端:
import  socket
from  multiprocessing  import  Process

def  talk(conn):
	while  True:
		msg=conn.recv(1024).decode('utf-8')
		ret=msg.upper().encode('utf-8')
		conn.send(ret)
	conn.close()

if  __name__=='__main__':
	sk = socket.socket()
	sk.bind(('127.0.0.1',9001))
	sk.listen()
	while  True:
		conn,addr=sk.accept()
		Process(target=talk,args=(conn,)).start()# args为元组
sk.close()
  1. 客户端:
import  socket
import  time
sk=socket.socket()
sk.connect(('127.0.0.1',9001))
while  True:
	sk.send(b'hello')
	msg = sk.recv(1024).decode('utf-8')
	print(msg)
	time.sleep(0.5)

sk.close()

part 3:

开启进程的例外一种方式

  • 面向对象的方法,通过继承和重写run方法完成了启动子进程
  • 通过重写init和调用父类的inin完成了给子进程传参数
import  os
import  time
from  multiprocessing  import  Process

class  MyProcess(Process):
	def  __init__(self,a,b,c):#如果需要传参数,需要写__init__方法
		self.a=a
		self.b=b
		self.c=c
		super().__init__()#调用上级的__init__方法
	def  run(self):#实现run的同名方法
		time.sleep(1)
		print(os.getppid(),os.getpid(),self.a)


if  __name__=='__main__':
	print('主进程-->',os.getpid())
	for  i  in  range(10):
		p=MyProcess(1,2,3)#传参数
p.start()

Process类的其他方法

name  pid  ident  daemon(在start之前设置)
terminate()   is_alive()


import  os
import  time
from  multiprocessing  import  Process

class  MyProcess(Process):
	def  __init__(self,a,b,c):#如果需要传参数,需要写__init__方法
		self.a=a
		self.b=b
		self.c=c
		super().__init__()#调用上级的__init__方法
	def  run(self):#实现run的同名方法
		time.sleep(1)
		print(os.getppid(),os.getpid(),self.a)


if  __name__=='__main__':
	p=MyProcess(1,2,3)
	p.start()
	print(p.pid,  p.ident)#查看子进程的pid  22672    22672(同)
	print(p.name)#查看子进程的名字
	print(p.is_alive())#查看进程是否活着
	p.terminate()#强制结束一个子进程异步非阻塞
	print(p.is_alive())  #True?因为操作系统响应需要一定时间
	time.sleep(0.01)
print(p.is_alive())

守护进程

  • 守护进程
    • 在start一个进程之前设置daemon=True
    • 守护进程会等待主进程的代码结束就立即结束
      • 为什么守护进程只守护主进程的代码?而不是等主进程结束之后才结束
      • 因为主进程要最后结束,为了给守护进程回收资源
      • 守护进程会等待其他子进程结束么?不会
import  time
from  multiprocessing  import  Process

def  son1():
	while  True:
		print('--->inson1')
		time.sleep(1)


def  son2():#10s
	for  i  in  range(10):
		print('inson2')
		time.sleep(1)


if  __name__=='__main__':#3s
	p1=Process(target=son1)
	p1.daemon=True#表示设置p1是一个守护进程需要在start之前设置
	p1.start()
	p2=Process(target=son2)
	p2.start()
	time.sleep(3)
	print('>>>>inmain')
p2.join()#等待p2结束之后才结束
  • 等待p2结束-->主进程的代码才结束-->守护进程结束
  • 主进程会等待所有的子进程结束,是为了回收子进程的资源
  • 守护进程会等待主进程的代码执行结束之后再结束,而不是等待整个主进程结束
  • 主进程的代码什么时候结束,守护进程就什么时候结束,和其他子进程的执行进度无关

进程同步---Lock 锁 *****

  • 进程之间数据安全的问题

锁:

import  time
from  multiprocessing  import  Lock,Process
def  func(i,lock):
	lock.acquire()#拿钥匙
	print(f'{i}被锁起来的代码')
	#time.sleep(1)
	lock.release()#还钥匙

if  __name__=='__main__':
	lock=Lock()
	for  i  in  range(10):
		p=Process(target=func,args=(i,lock))
p.start()

抢票的例子:加锁

import  json
import  time
from  multiprocessing  import  Process,Lock


def  search(i):
	#查询余票
	with  open('ticket',encoding='utf-8')  as  f:
		ticket=json.load(f)
	print(f'{i}:当前的余票为{ticket["count"]}张')


def  buy_ticket(name):
	#查询余票
	with  open('ticket',encoding='utf-8')  as  f:
		ticket=json.load(f)
	if  ticket['count']>0:
		ticket['count'] -= 1
		print(f'{name}买到票了')
	time.sleep(0.1)
	with  open('ticket',mode='w',encoding='utf-8')  as  f:
		json.dump(ticket,f)

def  get_ticket(i,lock):
	search(i)
	with  lock:#代替acquire和release并在此基础上做一些异常处理,保证即便一个进程的代码出错了,也会归还钥匙
		buy_ticket(i)
	#lock.acquire()
	#buy_ticket(i)
	#lock.release()
if  __name__=='__main__':
	lock=Lock()#互斥锁
	for  i  in  range(10):
		p=Process(target=get_ticket,args=(i,lock))
p.start()

为啥叫互斥锁?

from  multiprocessing  import  Lock#互斥锁不能再同一个进程中连续acquire多次
lock=Lock()
lock.acquire()
print(1)    # 只能打印一个
lock.acquire()
print(2)

进程之间数据隔离---队列

  • 进程之间通信(IPC) Inter Process Communication
    • 生产者消费者模型
    • 基于文件:同一台机器上的多个进程之间的通信
      • Queue队列
        • 基于socket的文件级别的通信来完成数据传递的
    • 基于网络:同一台机器或者多台机器上的多进程间通信
      • 第三方工具(消息中间件)
        • memcache
        • redis
        • rabbitmq
        • kafka
from  multiprocessing  import  Queue, Process

def  pro(q):
	for  i  in  range(11):
		print('--->',q.get())
def  son(q):
	for  i  in  range(10):
		q.put(f'hello{i}')

if  __name__=='__main__':
	q=Queue()
	Process(target=son,args=(q,)).start()
Process(target=pro,args=(q,)).start()
  • 生产者消费者模型背下来
    • 爬虫的时候
    • 分布式操作:celery
  • 本质:就是让生产数据和消费数据的效率达到平衡并且最大化的效率
from  multiprocessing  import  Queue,Process
import  time
import  random

def  consumer(q,name):#消费者:通常取到数据之后还要进行某些操作
	while  True:
		food=q.get()
		if  food:
			print(f'{name}吃了{q.get()}')
		else:
			break


def  producer(q,name,food):#生产者:通常再放数据之前先通过某些代码来获取数据
	for i in range(10):
		foodi = f'{food}{i}'
		print(f'{name}生产了{foodi}')
		time.sleep(random.random())
		q.put(foodi)

if __name__ == '__main__':
	q=Queue()
	c1=Process(target=consumer,args=(q,'alex'))
	p1=Process(target=producer,args=(q,'大壮','苹果'))
	p2=Process(target=producer,args=(q,'b哥','梨子'))
	c1.start()
	p1.start()
	p2.start()
	p1.join()
	p2.join()
q.put(None)#有多少个消费者就put多少个None进来
原文地址:https://www.cnblogs.com/zranguai/p/13822571.html