目录:
- 并发多线程
- 协程
- I/O多路复用(未完成,待续)
一、并发多线程
1、线程简述:
一条流水线的执行过程是一个线程,一条流水线必须属于一个车间,一个车间的运行过程就是一个进程(一个进程内至少一个线程)
进程是资源单位
而线程才是cpu上的执行单位
2、线程的优点:共享资源、创建开销小
3、线程的模块开启方式之threading模块
multiprocess模块的完全模仿了threading模块的接口。执行如下图:
方式一: 函数式调用
1 from threading import Thread 2 import time 3 def sayhi(name): # 定义函数 4 time.sleep(2) 5 print('%s say hello' %name) 6 7 if __name__ == '__main__': 8 t=Thread(target=sayhi,args=('wangshuyang',)) #定义线程类并传参 9 t.start() # 调用 10 print('主线程')
方式二:类式调用
1 #方式二 2 from threading import Thread 3 import time 4 class Sayhi(Thread): # 引用类 5 def __init__(self,name): 6 super().__init__() 7 self.name=name 8 def run(self): 9 time.sleep(2) 10 print('%s say hello' % self.name) 11 12 13 if __name__ == '__main__': 14 t = Sayhi('wangshuyang') #定义类 15 t.start() #调用 16 print('主线程')
4、多进程与多线程对比:
多线程比多进程更快,开销更小
1 from threading import Thread 2 from multiprocessing import Process 3 import os 4 5 def work(): 6 print('hello') 7 8 if __name__ == '__main__': 9 #在主进程下开启线程 10 t=Thread(target=work) 11 t.start() 12 print('主线程/主进程') 13 ''' 14 打印结果: 15 hello 16 主线程/主进程 17 ''' 18 19 #在主进程下开启子进程 20 t=Process(target=work) 21 t.start() 22 print('主线程/主进程') 23 ''' 24 打印结果: 25 主线程/主进程 26 hello 27 '''
pid进程号对比
1 from threading import Thread 2 from multiprocessing import Process 3 import os 4 5 def work(): 6 print('hello',os.getpid()) 7 8 if __name__ == '__main__': 9 #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样 10 t1=Thread(target=work) 11 t2=Thread(target=work) 12 t1.start() 13 t2.start() 14 print('主线程/主进程pid',os.getpid()) 15 16 #part2:开多个进程,每个进程都有不同的pid 17 p1=Process(target=work) 18 p2=Process(target=work) 19 p1.start() 20 p2.start() 21 print('主线程/主进程pid',os.getpid())
5、多线程并发socket示例
1 #_*_coding:utf-8_*_ 2 #!/usr/bin/env python 3 import multiprocessing 4 import threading 5 6 import socket 7 s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) 8 s.bind(('127.0.0.1',8080)) 9 s.listen(5) 10 11 def action(conn): 12 while True: 13 data=conn.recv(1024) 14 print(data) 15 conn.send(data.upper()) 16 17 if __name__ == '__main__': 18 19 while True: 20 conn,addr=s.accept() 21 22 23 p=threading.Thread(target=action,args=(conn,)) 24 p.start() 25 26 服务端
1 #_*_coding:utf-8_*_ 2 #!/usr/bin/env python 3 4 5 import socket 6 7 s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) 8 s.connect(('127.0.0.1',8080)) 9 10 while True: 11 msg=input('>>: ').strip() 12 if not msg:continue 13 14 s.send(msg.encode('utf-8')) 15 data=s.recv(1024) 16 print(data) 17 18 客户端
6、多线程模拟文件操作示例
1 from threading import Thread 2 msg_l=[] 3 format_l=[] 4 def talk(): 5 while True: 6 msg=input('>>: ').strip() 7 if not msg:continue 8 msg_l.append(msg) 9 10 def format_msg(): 11 while True: 12 if msg_l: 13 res=msg_l.pop() 14 format_l.append(res.upper()) 15 16 def save(): 17 while True: 18 if format_l: 19 with open('db.txt','a',encoding='utf-8') as f: 20 res=format_l.pop() 21 f.write('%s ' %res) 22 23 if __name__ == '__main__': 24 t1=Thread(target=talk) 25 t2=Thread(target=format_msg) 26 t3=Thread(target=save) 27 t1.start() 28 t2.start() 29 t3.start()
7、threading模块之调用方法
1)join与setdaemon
与进程的方法都是类似的,其实是multiprocessing模仿threading的接口
join 等待线程执行完成,执行主进程
setdaemon 守护线程,主进程关闭,线程关闭
1 from threading import Thread 2 import time 3 def sayhi(name): 4 time.sleep(2) 5 print('%s say hello' %name) 6 7 if __name__ == '__main__': 8 t=Thread(target=sayhi,args=('egon',)) 9 t.setDaemon(True) 10 t.start() 11 t.join() 12 print('主线程') 13 print(t.is_alive())
2)Thread实例对象的方法
isAlive(): 返回线程是否活动的。
getName(): 返回线程名。
setName(): 设置线程名。
3)其他方法
threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
1 from threading import Thread 2 import threading 3 from multiprocessing import Process 4 import os 5 6 def work(): 7 import time 8 time.sleep(3) 9 print(threading.current_thread().getName()) 10 11 12 if __name__ == '__main__': 13 #在主进程下开启线程 14 t=Thread(target=work) 15 t.start() 16 17 print(threading.current_thread().getName()) 18 print(threading.current_thread()) #主线程 19 print(threading.enumerate()) #连同主线程在内有两个运行的线程 20 print(threading.active_count()) 21 print('主线程/主进程') 22 23 ''' 24 打印结果: 25 MainThread 26 <_MainThread(MainThread, started 140735268892672)> 27 [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>] 28 2 29 主线程/主进程 30 Thread-1 31 '''
8、Python GIL(Global Interpreter Lock)全局解释器锁
在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势。只有cpython中一定会用GIL,Python完全可以不依赖于GIL。
GIL锁原理
ps.额外应该知道的知识
对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用,因为对应的硬盘,在操作系统来看,只有一块。
现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。
9、python 多线程和多进程的应用场景
计算密集型
1 #计算密集型 2 from threading import Thread 3 from multiprocessing import Process 4 import os 5 import time 6 def work(): 7 res=0 8 for i in range(1000000): 9 res+=i 10 11 if __name__ == '__main__': 12 t_l=[] 13 start_time=time.time() 14 # for i in range(300): #串行 15 # work() 16 17 for i in range(300): 18 t=Thread(target=work) #在我的机器上,4核cpu,多线程大概15秒 19 # t=Process(target=work) #在我的机器上,4核cpu,多进程大概10秒 20 t_l.append(t) 21 t.start() 22 23 for i in t_l: 24 i.join() 25 26 stop_time=time.time() 27 print('run time is %s' %(stop_time-start_time)) 28 29 print('主线程')
I/O密集型
1 #I/O密集型 2 from threading import Thread 3 from multiprocessing import Process 4 import time 5 import os 6 def work(): 7 time.sleep(2) #模拟I/O操作,可以打开一个文件来测试I/O,与sleep是一个效果 8 print(os.getpid()) 9 10 if __name__ == '__main__': 11 t_l=[] 12 start_time=time.time() 13 for i in range(1000): 14 t=Thread(target=work) #耗时大概为2秒 15 # t=Process(target=work) #耗时大概为25秒,创建进程的开销远高于线程,而且对于I/O密集型,多cpu根本不管用 16 t_l.append(t) 17 t.start() 18 19 for t in t_l: 20 t.join() 21 stop_time=time.time() 22 print('run time is %s' %(stop_time-start_time))
多线程用于IO密集型,如socket,爬虫,web
多进程用于计算密集型,如金融分析
10、同步锁
多线程或多进程程序访问同一份资源操作时,需要加锁。如下:
1 import time 2 import threading 3 4 def addNum(): 5 global num #在每个线程中都获取这个全局变量 6 #num-=1 7 8 temp=num 9 time.sleep(0.1) 10 num =temp-1 # 对此公共变量进行-1操作 11 12 num = 100 #设定一个共享变量 13 14 thread_list = [] 15 16 for i in range(100): 17 t = threading.Thread(target=addNum) 18 t.start() 19 thread_list.append(t) 20 21 for t in thread_list: #等待所有线程执行完毕 22 t.join() 23 24 print('Result: ', num)
锁通常被用来实现对共享资源的同步访问。为每一个共享资源创建一个Lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果其它线程已经获得了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁:
1 import threading 2 3 R=threading.Lock() 4 5 R.acquire() 6 ''' 7 对公共数据的操作 8 ''' 9 R.release()
GIL VS Lock
机智的同学可能会问到这个问题,就是既然你之前说过了,Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock?
首先我们需要达成共识:锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据,然后,我们可以得出结论:保护不同的数据就应该加不同的锁。
最后,问题就很明朗了,GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock
详细的:
因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake
up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和
py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题,
这可以说是Python早期版本的遗留问题。
11、死锁与递归锁
进程也有死锁与递归锁,在进程那里忘记说了,放到这里一切说了额
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
1 from threading import Thread,Lock 2 import time 3 mutexA=Lock() 4 mutexB=Lock() 5 6 class MyThread(Thread): 7 def run(self): 8 self.func1() 9 self.func2() 10 def func1(self): 11 mutexA.acquire() 12 print('