概念:
进程:运行中的程序 有生命周期 关掉程序就销毁了 进程调度:1)先来先服务算法 2)短作业优先调度算法 3)时间片轮转法 4)多级反馈队列 并发:多个程序一起运行,轮流交替使用资源,独木桥 a先走一段 然后让给b走 并行:指两者同时执行,赛跑,两个人同时往前跑 阻塞:input sleep 文件的输入输出 网络请求 recv accept 同步:一个任务的完成需要依赖另一个任务,等到被依赖的任务完成后,依赖的任务才算完成 等待的过程什么都不干 异步:不需要等待被依赖的任务完成,当等待被依赖的任务完成时,立即执行 等待的过程可以干别的事 同步阻塞:一直等待 异步阻塞:可以干别的事 同步非阻塞:来回切换观察 异步非阻塞:可以干别的事。等待别人通知 多进程 :同时开启多个运行程序
进程:
1.创建进程:
from multiprocessing import Process import os def func(m,n): print(m,n) print('获取子进程的pid:', os.getpid()) print('获取子进程的父进程pid:', os.getppid()) if __name__ == '__main__': p = Process(target=func,args=('参数1','参数2')) #创建一个对象 传递一个参数是需要后面加, p.start() #启动一个子进程 后面的print也同时执行 所以是异步的 print('获取此进程的pid:',os.getpid()) print('获取此进程的父进程pid:', os.getppid())
from multiprocessing import Process class MyProcess(Process): #创建一个类,继承Process类 def __init__(self,args1,args2): super().__init__() self.args1 = args1 self.args2 = args2 def run(self): #必须实现一个run方法,run方法中是在子进程中的代码 print(self.args1) print(self.args2) if __name__ == '__main__': p = MyProcess(1,2) p.start()
2.进程的几个方法:
join
from multiprocessing import Process import time def func(n): print(n) time.sleep(2) if __name__ == '__main__': p = Process(target=func,args=(1,)) p.start() p.join() #join方法 感知子进程的结束,才执行之后的print print('hello')
3.开启多个子进程:
from multiprocessing import Process import time def func(s): print('子进程%s'%s) time.sleep(2) if __name__ == '__main__': for i in range(10): p = Process(target=func,args=(i,)) p.start()
>>基于多进程实现scoket实验(可以重复开启多个client)
server端
import socket from multiprocessing import Process def serve(conn): ret = '你好'.encode('utf-8') conn.send(ret) msg = conn.recv(1024).decode('utf-8') print(msg) conn.close() if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() while True: conn,addr = sk.accept() p = Process(target=serve,args=(conn,)) p.start() sk.close()
client端
import socket sk=socket.socket() sk.connect(('127.0.0.1',8080)) msg = sk.recv(1024).decode('utf-8') print(msg) msg2= input('>>>').encode('utf-8') sk.send(msg2) sk.close()
4. 多进程之间数据隔离问题
from multiprocessing import Process n = 100 def func(): global n n = n-1 return 111 if __name__ == '__main__': n_l = [] for i in range(100): p = Process(target=func) p.start() n_l.append(p) for p in n_l : p.join() print(n) #多进程之间的数据是隔离的不共享的
5.守护进程
#守护进程随着主进程代码执行结束而结束
#守护线程会在主线程结束之后等待子线程结束而结束
from multiprocessing import Process import time def func(): while True: time.sleep(0.2) print('我还活着') if __name__ == '__main__': p = Process(target=func) p.daemon = True #设置子线程为守护进程 p.start() time.sleep(2) print('主进程结束') #守护进程会随着 主进程的代码执行完毕而结束 #在主进程结束一个子进程 p.terminate() 结束一个进程并不是执行方法后立即生效,需要一个操作系统效应的过程 #p.name获取进程的名字
6.进程锁
lock = Lock() # 创造了一把锁
lock.acquire() # 获取了这把锁的钥匙
lock.release() # 归还这把锁的钥匙
import json import time from multiprocessing import Process from multiprocessing import Lock,Manager def show(i): with open('ticket') as f: dic=json.load(f) print("余票 %s"%dic['ticket']) def buy_ticket(i,lock): lock.acquire() with open('ticket') as f: dic = json.load(f) #文件用load 字符串用loads time.sleep(0.1) if dic['ticket']>0: dic['ticket'] -= 1 print("买到票了%s"%i) else: print("票卖完了%s"%i) time.sleep(0.1) with open('ticket','w') as f: json.dump(dic,f) lock.release() if __name__ == '__main__': for i in range(10): p = Process(target=show,args=(i,)) p.start() lock = Lock() for i in range(10): p = Process(target=buy_ticket,args=(i,lock)) p.start() --------------------------------------------------- ticket {"ticket": 1}
7.信号量
from multiprocessing import Process,Semaphore import time def func(i,sem): sem.acquire() print('进程%s'%i) time.sleep(4) sem.release() if __name__ == '__main__': sem = Semaphore(4) #允许开启四个进程每一次 for i in range(10): p = Process(target=func,args=(i,sem)) p.start()
8.事件(红绿灯案例)
''' from multiprocessing import Event e = Event() #创建一个事件 print(e.is_set()) #查看一个事件 e.set() #更改事件的状态为true print(e.is_set()) e.wait() #依据e.is_set的值来决定是否阻塞 print(123) e.clear()#将事件的状态更改为false ''' from multiprocessing import Event,Process import time import random def cars(e,i): if not e.is_set(): print('car %i在等待通行'%i) e.wait() #阻塞 直到一个事件状态改变 else: print('car %i可通行' % i) def light(e): while True: if e.is_set(): print('绿灯亮了') time.sleep(2) e.clear() else: print('红灯亮了') time.sleep(2) e.set() if __name__ == '__main__': e = Event() p = Process(target=light,args=(e,)) p.start() for i in range(20): car = Process(target=cars,args=(e,i)) car.start() time.sleep(random.random())
9.队列
from multiprocessing import Queue,Process def produce(q): q.put('hello') def consume(q): print(q.get()) if __name__ == '__main__': q = Queue() p = Process(target=produce,args=(q,)) p.start() c = Process(target=consume, args=(q,)) c.start()
10.管道
from multiprocessing import Pipe,Process def func(conn1,conn2): conn2.close() while True: try: msg = conn1.recv() print(msg) except EOFError: conn1.close() break if __name__ == '__main__': conn1, conn2 = Pipe() Process(target=func,args=(conn1, conn2)).start() conn1.close() for i in range(20): conn2.send('吃了么') conn2.close()
11.进程池
import os from multiprocessing import Pool import time def func(n): print('start func%s'%n,os.getpid()) time.sleep(1) print('end func%s'%n,os.getpid()) if __name__ == '__main__': p = Pool(5) for i in range(10): p.apply(func,args=(i,)) #同步 ------------------------------------------------------------- import os from multiprocessing import Pool import time def func(n): print('start func%s'%n,os.getpid()) time.sleep(1) print('end func%s'%n,os.getpid()) if __name__ == '__main__': p = Pool(5) for i in range(10): p.apply(func,args=(i,)) #异步 p.close() #结束进程池接受任务 p.join() #感知进程池中的任务执行结束
>>基于进程池实现scoket实验(可以重复开启多个client)
server端
import socket from multiprocessing import Pool def func(conn): conn.send(b'hello') print(conn.recv(1024).decode('utf-8')) conn.close() if __name__ == '__main__': p = Pool(5) sk= socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() while True: conn, addr = sk.accept() p.apply_async(func,args=(conn,)) sk.close()
client端
import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) ret = sk.recv(1024).decode('utf-8') print(ret) msg = input('>>>').encode('utf-8') sk.send(msg) sk.close()
12.进程池回调函数
from multiprocessing import Pool import os def func1(n): print('in func1',os.getpid()) return n*n #子进程 def func2(nn): print('in func2',os.getpid()) print(nn) #主进程 if __name__ == '__main__': print('主进程:',os.getpid()) p = Pool(5) for i in range(10): p.apply_async(func1,args=(10,),callback=func2) #callback回调 p.close() p.join()
13.生产者消费者模型
import time import random from multiprocessing import Process,Queue def produce(q,name,food): for i in range(4): time.sleep(random.randint(1, 3)) f = '%s生产%s%s' % (name, food, i) print(f) q.put(f) def consumer(q,name): while True: food = q.get() if food is None: print("%s获得一个空"%name) break print('%s吃了%s'%(name,food)) if __name__ == '__main__': q = Queue(20) p = Process(target=produce,args=(q,'小白','肉包子')) p2 = Process(target=produce, args=(q, '小黑', '菜包子')) c1 = Process(target=consumer,args=(q,'狗')) c2 = Process(target=consumer, args=(q,'猫')) p.start() p2.start() c1.start() c2.start() p.join() p2.join() q.put(None) q.put(None)
线程:
1.创建线程
from threading import Thread import time def func(n): time.sleep(1) print(n) for i in range(10): t = Thread(target=func, args=(i,)) t.start()
from threading import Thread import time class MyThread(Thread): def __init__(self,arg): super().__init__() self.arg = arg def run(self): time.sleep(1) print(self.arg) t=MyThread(10) t.start()
2.数据共享问题
from threading import Thread import os def func(n): global g g = 0 print(g,os.getpid()) g=100 t_list=[] for i in range(10): t = Thread(target=func, args=(i,)) t.start() t_list.append(t) for t in t_list: t.join() print(g) #多线程的数据是共享的
3.>>基于多线程实现scoket实验(可以重复开启多个client)
server端
import socket from threading import Thread def chat(conn): conn.send(b'hello') msg = conn.recv(1024).decode('utf-8') print(msg) conn.close() sk = socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() while True: conn,addr = sk.accept() Thread(target=chat,args=(conn,)).start() sk.close()
client端
import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) msg = sk.recv(1024).decode('utf-8') print(msg) info = input('>>>').encode('utf-8') sk.send(info) sk.close()
4.守护线程
#守护进程随着主进程代码执行结束而结束
#守护线程会在主线程结束之后等待子线程结束而结束
from threading import Thread import time def func1(): print(666) time.sleep(2) def func2(): print('in func2') time.sleep(2) t = Thread(target=func1,) t.daemon = True t.start() print('主线程') t2 = Thread(target=func2,) t2.start()
5.线程锁
#死锁现象 from threading import Thread,Lock import time noodle_lock = Lock() fork_lock = Lock() def func(name): noodle_lock.acquire() print('%s拿到面条'%name) fork_lock.acquire() print('%s拿到叉子'% name) print('%s吃面'%name) fork_lock.release() noodle_lock.release() def func1(name): fork_lock.acquire() print('%s拿到叉子' % name) time.sleep(5) noodle_lock.acquire() print('%s拿到面条' % name) print('%s吃面' % name) noodle_lock.release() fork_lock.release() Thread(target=func,args=('1号顾客',)).start() Thread(target=func1,args=('2号顾客',)).start() Thread(target=func,args=('3号顾客',)).start() Thread(target=func1,args=('4号顾客',)).start()
from threading import Thread,RLock import time noodle_lock = fork_lock = RLock() def func(name): noodle_lock.acquire() print('%s拿到面条'%name) fork_lock.acquire() print('%s拿到叉子'% name) print('%s吃面'%name) time.sleep(2) fork_lock.release() noodle_lock.release() def func1(name): fork_lock.acquire() print('%s拿到叉子' % name) noodle_lock.acquire() print('%s拿到面条' % name) print('%s吃面' % name) time.sleep(2) noodle_lock.release() fork_lock.release() Thread(target=func,args=('1号顾客',)).start() Thread(target=func1,args=('2号顾客',)).start() Thread(target=func,args=('3号顾客',)).start() Thread(target=func1,args=('4号顾客',)).start()
6.信号量
from threading import Thread,Semaphore import time def func(sem,a,b): sem.acquire() time.sleep(1) print(a+b) sem.release() sem = Semaphore(4) for i in range(10): t = Thread(target=func, args=(sem,i, i + 5)) t.start()
7.事件
#事件被创建的时候是false状态
#flase状态 wait()阻塞
#true状态 wait()非阻塞
#set false状态设置为true
#clear 设置状态为false
#连接数据库 #检测数据库的可连接情况 from threading import Thread,Event import time import random def connect_db(e): count = 0 while count<3: e.wait(0.5) #状态为flase的时候 只等待0.5s就结束 if e.is_set() == True: print('连接成功') break else: count += 1 print('第%s次连接失败'%count) else: raise TimeoutError def check_web(e): time.sleep(random.randint(0,3)) e.set() e = Event() t1 = Thread(target=connect_db,args=(e,)) t2 = Thread(target=check_web,args=(e,)) t1.start() t2.start()
8.条件
#notify(int数据类型) 造钥匙 from threading import Condition,Thread def func(con,i): con.acquire() con.wait() #等钥匙 print('在第%s个循环里'%i) con.release() con = Condition() for i in range(10): Thread(target=func,args=(con,i)).start() while True: num = int(input('>>>')) con.acquire() con.notify(num) #造钥匙 con.release()
9.定时器
from threading import Timer import time def func(): print('时间同步') while True: Timer(5,func).start() time.sleep(5)
10.队列
import queue q = queue.Queue() # 队列 先进先出 q.put(1) q.put(2) print(q.get()) print(q.get()) # q.put_nowait() # q.get_nowait() q = queue.LifoQueue() #栈 先进后出 q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) q = queue.PriorityQueue() #优先级队列 数字越小优先级越高 相同优先级按ascii排 q.put((20,'a')) q.put((10,'b')) q.put((1,'c')) q.put((1,'q')) print(q.get())
11.线程池
from concurrent.futures import ThreadPoolExecutor import time def func(n): time.sleep(2) print(n) return n*n tpool = ThreadPoolExecutor(max_workers=5) t_list = [] for i in range(20): #异步的 t = tpool.submit(func,i) t_list.append(t) tpool.shutdown() #close+join print('主线程') for t in t_list: print('***',t.result())
12.回调函数
from concurrent.futures import ThreadPoolExecutor import time def func(n): time.sleep(2) print(n) return n*n def callback(m): print('结果是%s'%m.result()) tpool = ThreadPoolExecutor(max_workers=5) for i in range(20): tpool.submit(func,i).add_done_callback(callback)
协程
1.概念
#进程 启动多个进程 进程之间是由操作系统负责调用
#线程 启动多个线程 真正被cpu执行的最小单位是线程
#开启一个线程 创建一个线程 寄存器 堆栈
#关闭一个线程
#协程
#本质上是一个线程
#能够在多个任务之间切换来节省一些io时间
#协程中任务之间的切换时间消耗的时间 远远小于进程和线程之间的切换
#实现并发的手段
#进程和线程的任务切换由操作系统完成 #协程任务之间的切换由代码完成,只有遇到协程模块可识别的io操作,程序才会进程任务切换实现并发效果 from gevent import monkey;monkey.patch_all() import time import gevent def task(): time.sleep(1) print(123) def sync(): for i in range(10): task() def asynct(): g_list = [] for i in range(10): g = gevent.spawn(task) g_list.append(g) gevent.joinall(g_list) sync() asynct()
2.基于协程实现socket实验(可以重复开启多个client)
server端
import socket from gevent import monkey;monkey.patch_all() import gevent def func(conn): conn.send(b'hello') msg = conn.recv(1024).decode('utf-8') print(msg) conn.close() sk = socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() while True: conn,addr = sk.accept() gevent.spawn(func,conn) sk.close()
client端
import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) msg = sk.recv(1024).decode('utf-8') print(msg) info = input('>>>').encode('utf-8') sk.send(info) sk.close()