进程
进程 : 什么是进程? 是操作系统的发展过程中,为了提高CPU的利用率,在操作系统同时运行多个程序的时候,为了数据的安全代码不混乱而被创造出来的一个概念 每一个程序运行起来都至少是一个进程. 进程是计算机中最小的资源分配单位 进程被操作系统调度的,有很多相关的算法 - 早期的操作系统 进程之间是数据隔离的 进程的三状态 就绪 运行 阻塞 同步异步 同步 : 一个任务的执行依赖另一个事务的结束 join lock 异步 : 一个任务的执行不依赖另一个事务的结束 start terminate 阻塞非阻塞 阻塞 : accept recv recvfrom queue.get join 非阻塞 : setblocking = False 并发并行 并行是特殊的并发 并行就是 同一时刻 两个以上的程序同时在cpu上执行 并发就是 同一时段 两个以上的程序看起来在同时执行 IO概念 : 文件操作 数据库操作 网络传输 用户输入输出 Input 得到bytes/str Output 发送数据/输出数据 因为进程与进程之间本质上是异步且数据隔离 守护进程 : 子进程等待主进程的代码结束就结束了 同步控制 join 锁 - 互斥锁 : 多个进程同时对一个数据进行操作的时候 操作同一个文件/数据库/管道/Manager.dict 信号量 事件 数据共享 - 数据不安全 Manager IPC-进程之间通信 管道 队列 - 生产者消费者模型(为了解决数据的生产和处理的效率问题) 第三方工具(消息队列,消息中间件) 进程池 解决大量任务 开启多个进程的开销过大的问题 节省资源,提高并发效率的 一般开进程数 cpu_count * 1 or 2
Python并不支持真正意义上的多线程。Python中提供了多线程模块,但如果想通过多线程提高代码的速度,并不推荐使用多线程模块。Python中有一个全局锁Global Interpreter Lock(GIL),全局锁会确保任何时候多个线程中只有一个会被执行。线程的执行速度非常快,会误以为线程是并行执行的,但实际上都是轮流执行。经过GIL处理后,会增加线程执行的开销。
全局锁 GIL(Global interpreter lock) 并不是 Python 的特性,而是在实现 Python 解析器(CPython)时所引入的一个概念。Python有CPython,PyPy,Psyco 等不同的 Python 执行环境,其中 JPython 没有GIL。CPython 是大部分环境下默认的 Python 执行环境,GIL 并不是 Python 的特性,Python 完全可以不依赖于 GIL。
GIL 限制了同一时刻只能有一个线程运行,无法发挥多核 CPU 的优势。GIL 本质是互斥锁,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。在一个 Python 的进程内,不仅有主线程或者由主线程开启的其它线程,还有解释器开启的垃圾回收等解释器级别的线程。进程内,所有数据都是共享的,代码作为一种数据也会被所有线程共享,多个线程先访问到解释器的代码,即拿到执行权限,然后将 target 的代码交给解释器的代码去执行,解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,因此为了保证数据安全需要加锁处理,即 GIL。
由于GIL 的存在,同一时刻同一进程中只有一个线程被执行。多核 CPU可以并行完成计算,因此多核可以提升计算性能,但 CPU 一旦遇到 I/O 阻塞,仍然需要等待,所以多核CPU对 I/O 密集型任务提升不明显。根据执行任务是计算密集型还是I/O 密集型,不同场景使用不同的方法,对于计算密集型任务,多进程占优势,对于 I/O 密集型任务,多线程占优势
所有的程序 - 任务 所有的任务 - 进程 进程 ? 进行中的程序 PID进程ID 进程是计算机中资源分配的最小单位 进程间数据隔离 要通信的话运用socket
进程调度:--先来先服务调度算法,短作业优先,时间片轮转法,多级反馈机制
并发 资源有限的情况下,AB程序交替使用cpu目的是提高效率 并行 同一时刻都在执行 多核 (是并发里的一种特殊情况) 更苛刻的条件 同步: 程序顺序执行,多个任务之间串行执行 (洗衣完--做饭完--洗碗) 异步: 多个任务同时运行 (在同一时间内洗衣做饭洗碗) 阻塞: 程序由于不符合某个条件或者等待某个条件满足 而在某一个地方进入等待状态 非阻塞: 程序正常执行
同步阻塞
一件事儿一件事儿的做
中间还要被阻塞
同步非阻塞 : 费力不讨好
一件事儿一件事儿的做
但是不阻塞
异步阻塞
同时进行的
每一件事儿都会遇到阻塞事件
异步非阻塞
几个事情同时进行
每一件事都不阻塞
import os,time print(os.getpid()) #获取当前进程号 print(os.getppid()) #获取当前父进程id time.sleep(5) print(os.getpid())
import multiprocessing #是个包 from multiprocessing import Process import os def son_process(): # 这个函数的代码实在子进程执行的 print('执行我啦',os.getpid(),os.getppid()) print() if __name__ == '__main__': print('1-->',os.getpid()) #主进程 p = Process(target=son_process) #实例化 p.start() #进程开始 下面是打印结果 # 1--> 6416 # 执行我啦 7388 6416
import time from multiprocessing import Process # 通过并发实现一个有并发效果的socket server # 1.开启了一个子进程就已经实现了并发: 父进程(主进程)和子进程并发(同时执行) def son_process(): print('son start') time.sleep(1) print('son end') if __name__ == '__main__': p = Process(target=son_process) #创建子进程 p.start() #通知操作系统开启一个子进程 os响应需求 分配资源 执行进程中的代码 print('主进程')
import time from multiprocessing import Process # 通过并发实现一个有并发效果的socket server # 1.开启了一个子进程就已经实现了并发: 父进程(主进程)和子进程并发(同时执行) def son_process(): print('son start') time.sleep(1) print('son end') if __name__ == '__main__': p = Process(target=son_process) #创建子进程 p.start() #通知操作系统开启一个子进程 os响应需求 分配资源 执行进程中的代码 for i in range(5): print('主进程') time.sleep(0.3)
import time from multiprocessing import Process def son_process(): print('son start') time.sleep(1) print('son end') if __name__ == '__main__': for i in range(3): p = Process(target=son_process) p.start()
import time from multiprocessing import Process def son_process(i): print('son start',i) time.sleep(1) print('son end',i) if __name__ == '__main__': for i in range(10): p = Process(target=son_process,args=(i,)) #args必须元组 p.start() # 通知操作系统 start并不意味着子进程已经开始了
import time from multiprocessing import Process def son_process(i): print('son start',i) time.sleep(1) print('son end',i) if __name__ == '__main__': for i in range(10): p = Process(target=son_process,args=(i,)) #子进程不支持返回值 p.start() print('主进程的代码执行完毕') # 主进程会等待子进程结束之后才结束 # 为什么? # 父进程负责创建子进程,也负责回收子进程的资源
- server.py- import socket,time from multiprocessing import Process def talk(conn): conn, addr = sk.accept() print(conn) while True: msg = conn.recv(1024).decode() time.sleep(10) conn.send(msg.upper().encode()) if __name__ == '__main__': # 这句话下面的所有代码都只在主进程中执行 sk = socket.socket() sk.bind(('127.0.0.1',9000)) sk.listen() while True: conn,addr = sk.accept() Process(target=talk,args=(sk,)).start() # 卡 大量的while True 并且代码中并没有太多的其他操作 # 如果我们使用socketserver,不会这么卡 # 多进程确实可以帮助我们实现并发效果,但是还不够完美 # 操作系统没开启一个进程要消耗大量的资源 # 操作系统要负责调度进程 进程越多 调度起来就越吃力 - clitent.py- import socket sk = socket.socket() sk.connect(('127.0.0.1',9000)) while True: sk.send(b'hello') print(sk.recv(1024))
def son_process(i): while True: print('son start',i) time.sleep(0.5) print('son end',i) if __name__ == '__main__': p = Process(target=son_process, args=(1,)) p.start() # 开启一个子进程,异步的 print('主进程的代码执行完毕') print(p.is_alive()) # 子进程还活着 p.terminate() # 结束一个子进程,异步的 print(p.is_alive()) # 子进程还在活着 time.sleep(0.1) print(p.is_alive()) # False
n = [100] def sub_n(): # 减法 global n # 子进程对于主进程中的全局变量的修改是不生效的 n.append(1) print('子进程n : ',n) #子进程n : [100, 1] if __name__ == '__main__': p = Process(target = sub_n) p.start() p.join() # 阻塞 直到子进程p结束 print('主进程n : ',n) #主进程n : [100]
# 主进程里的print('主进程n : ',n)这句话在十个子进程执行完毕之后才执行 n = [100] import random def sub_n(): global n # 子进程对于主进程中的全局变量的修改是不生效的 time.sleep(random.random()) n.append(1) print('子进程n : ',n) if __name__ == '__main__': p_lst = [] #进程添加进列表 for i in range(10): p = Process(target = sub_n) #创建 p.start() #通知os 开启 p_lst.append(p) for p in p_lst:p.join() # 阻塞 只有一个条件是能够让我继续执行 这个条件就是子进程结束 print('主进程n : ',n)
n = [100] def sub_n(): global n # 子进程对于主进程中的全局变量的修改是不生效的 n.append(1) print('子进程n : ',n) time.sleep(10) print('子进程结束') if __name__ == '__main__': p = Process(target = sub_n) p.start() p.join(timeout = 5) # 如果不设置超时时间 join会阻塞直到子进程p结束 # # timeout超时 # # 如果设置的超时时间,那么意味着如果不足5s子进程结束了,程序结束阻塞 # # 如果超过5s还没有结束,那么也结束阻塞 print('主进程n : ',n) p.terminate() # 也可以强制结束一个子进程
# 设置子进程为守护进程,守护进程会随着主进程代码的结束而结束 # 由于主进程要负责给所有的子进程收尸,所以主进程必须是最后结束,守护进程只能在主进程的代码结束之后就认为主进程结束了 # 守护进程在主进程的代码结束之后就结束了,不会等待其他子进程结束 # # 希望守护进程必须等待所有的子进程结束之后才结束 # ???? # import time # from multiprocessing import Process # def alive(): # while True: # print('连接监控程序,并且发送报活信息') # time.sleep(0.6) # # def func(): # '主进程中的核心代码' # while True: # print('选择的项目') # time.sleep(1) # print('根据用户的选择做一些事儿') # # if __name__ == '__main__': # p = Process(target=alive) # p.daemon = True # 设置子进程为守护进程,守护进程会随着主进程代码的结束而结束 # p.start() # p = Process(target=func) # p.start() # p.join() # 在主进程中等待子进程结束,守护进程就可以帮助守护其他子进程了 # 守护进程 # 1.守护进程会等待主进程的代码结束而结束,不会等待其他子进程的结束 # 2.要想守护进程等待其他子进程,只需要在主进程中加上join
for i in range(5): pass print(i) # i=4 lst = [] for i in range(5): p = Process() lst.append(p) p.start() for p in lst: p.join() p.terminate()
import os from multiprocessing import Process class MyProcess(Process): #继承 def __init__(self,参数): super().__init__() #父类 初始化 self.一个属性 = 参数 def run(self): print('子进程中要执行的代码') if __name__ == '__main__': conn = '一个链接' mp = MyProcess(conn) mp.start()
当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。 接下来,我们以 模拟抢票 为例,来看看数据安全的重要性。 import json import time from multiprocessing import Process,Lock def search(name): '''查询余票的功能''' with open('ticket') as f: # 'r' dic = json.load(f) # 读取 dict print(name , dic['count']) def buy(name): # 买票 with open('ticket') as f: dic = json.load(f) time.sleep(0.1) if dic['count'] > 0: print(name,'买到票了') dic['count'] -= 1 time.sleep(0.1) with open('ticket','w') as f: json.dump(dic,f) # 写进去 def get_ticket(name,lock): #整个操作 search(name) # 先查 lock.acquire() # 只有第一个到达的进程才能获取锁,剩下的其他人都需要在这里阻塞 上锁 buy(name) # 再买 lock.release() # 有一个人还锁,会有一个人再结束阻塞拿到钥匙 还锁 if __name__ == '__main__': lock = Lock() # 实例化锁 for i in range(10): # 10个进程 p = Process(target=get_ticket,args=('name%s'%i,lock)) # 创建 p.start() # 通知os 开启 # tips : ticket 里面的数据结构 {"count": 0} # 模拟过程描述: # 第一个来的人 取钥匙 开门 进门 关门 带着钥匙反锁 # 第一个拿到钥匙的人 开门 出门 锁门 挂钥匙
进程 状态码 Z/z 僵尸进程 linux命令 主进程中控制子进程的方法: p = Process(target,args) #创建这一刻 根本没有通知操作系统 p.start() #通知os 开启子进程 异步非阻塞 p.terminate() #通知os,关闭子进程,异步非阻塞 p.is_alive() # 查看子进程是否还活着 p.join(timeout=10) # 阻塞 直到子进程结束 超时时间理解 # 守护进程 # 守护进程是一个子进程 # 守护进程会在主进程代码结束之后才结束 # 为什么会这样? # 由于主进程必须要回收所有的子进程的资源 # 所以主进程必须在子进程结束之后才能结束 # 而守护进程就是为了守护主进程存在的 # 不能守护到主进程结束,就只能退而求其次,守护到代码结束了 # 守护到主进程的代码结束,意味着如果有其他子进程没有结束,守护进程无法继续守护 # 解决方案 : 在主进程中加入对其他子进程的join操作,来保证守护进程可以守护所有主进程和子进程的执行 # 如何设置守护进程 # 子进程对象.daemon = True 这句话写在start之前 # 锁 # 为什么要用锁? # 由于多个进程的并发,导致很多数据的操作都在同时进行 # 所以就有可能产生多个进程同时操作 : 文件数据库 中的数据 # 导致数据不安全 # 所以给某一段修改数据的程序加上锁,就可以控制这段代码永远不会被多个进程同时执行 # 保证了数据的安全 # Lock 锁(互斥锁) # 锁实际上是把你的某一段程序变成同步的了,降低了程序运行的速度,为了保证数据的安全性 # 没有数据安全的效率都是耍流氓
# 对于锁 保证一段代码同一时刻只能有一个进程执行 # 对于信号量 保证一段代码同一时刻只能有n个进程执行 # 流量控制 import time import random from multiprocessing import Process,Semaphore def ktv(name,sem): sem.acquire() #拿锁 print("%s走进了ktv"%name) time.sleep(random.randint(5,10)) print("%s走出了ktv" % name) sem.release() #还锁 if __name__ == '__main__': sem = Semaphore(4) # 同时只能有4个进程执行 for i in range(25): p = Process(target=ktv,args = ('name%s'%i,sem)) p.start() # 信号量原理 : 锁 + 计数器实现的 # 普通的锁 acquire 1次 # 信号量 acquire 多次 # count计数 # count = 4 # acquire count -= 1 # 当count减到0的时候 就阻塞 # release count + = 1 # 只要count不为0,你就可以继续acquire
# from multiprocessing import Event # Event 事件类 # e = Event() # e 事件对象 # 事件本身就带着标识 : False # wait 阻塞 # 它的阻塞条件是 对象标识为False # 结束阻塞条件是 对象标识为True # 对象的标识相关的 : # set 将对象的标识设置为True # clear 将对象的标识设置为False # is_set 查看对象的标识是否为True import time import random from multiprocessing import Event,Process def traffic_light(e): print('