一、线程锁
线程安全,多线程操作时,内部会让所有线程排队处理。如:list/dict/Queue
线程不安全 + 人 => 排队处理。
1. RLock/Lock(一次放一个)
示例: 线程安全
import threading v = [] def func(arg): v.append(arg) # 线程安全 print(v) for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start()
示例: 线程安全+不安全
import threading import time v = [] def func(arg): v.append(arg) # 这里依然线程安全 # 数据被别人修改,这里线程不安全 time.sleep(0.01) # 如果这里的逻辑很长,时间变长,所有人都添加进去,那么取的时候会错乱 m = v[-1] print(arg, m) for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start() # 结果 # 1 9 # 0 9 # 2 9 # 4 9 # 3 9 # 5 9 # 6 9 # 7 9 # 9 9 # 8 9
a. Lock(): 一次放一个(只能锁一次,放一次)
import threading import time v = [] lock = threading.Lock() def func(arg): lock.acquire() v.append(arg) time.sleep(0.01) m = v[-1] print(arg, m) lock.release() for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start()
b. RLock(): 一次放一个(可以锁多次,放多次) -- 常用
import threading import time v = [] lock = threading.RLock() def func(arg): lock.acquire() lock.acquire() v.append(arg) time.sleep(0.01) m = v[-1] print(arg, m) lock.release() lock.release() for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start()
注意:RLock和Lock用法一样,只是Lock只能锁一次解一次,RLock支持锁多次 解多次,以后用RLock。
2. BoundedSemaphore(n): 一次放N个, 信号量
import threading import time lock = threading.BoundedSemaphore(3) # 一次放3个 def func(arg): lock.acquire() print(arg) time.sleep(1) lock.release() for i in range(20): t = threading.Thread(target=func, args=(i,)) t.start()
3. condition(),一次放x个,x是动态的.
方式一: 由用户指定一次放行的数量
import threading import time lock = threading.Condition() def func(arg): print('线程进来了') lock.acquire() lock.wait() # 加锁 print(arg) time.sleep(1) lock.release() for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start() while True: inp = int(input(">>>")) lock.acquire() lock.notify(inp) # 通知放行的个数 lock.release()
方式二: 满足某种条件或状态才放行.
import threading import time lock = threading.Condition() def xxxx(): print("来执行函数了") input(">>>") return True # 返回值 def func(arg): print('线程进来了') lock.wait_for(xxxx) # 收到的值为真,则放行一个 print(arg) time.sleep(1) for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start()
4. Event: 一次放所有
import time import threading lock = threading.Event() def func(arg): print('线程进来了') lock.wait() # 加锁,变红灯 print(arg) for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start() input('>>>') lock.set() # 绿灯 input('>>>') lock.clear() # 再次变红灯 for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start() lock.set() # 绿灯 input('>>>')
总结:
线程安全, 列表和字典线程安全;
为什么要加锁?
- 非线程安全
- 控制一段代码
5. threading.local
作用:内部自动为每个线程维护一个空间(字典),用于当前存取属于自己的值。保证线程之间的数据隔离。
{
线程ID: {...}
线程ID: {...}
线程ID: {...}
线程ID: {...}
}
import time import threading INFO = {} class Local(object): def __getattr__(self, item): ident = threading.get_ident() return INFO[ident][item] def __setattr__(self, key, value): ident = threading.get_ident() if ident in INFO: INFO[ident][key] = value else: INFO[ident] = {key:value} obj = Local() def func(arg): print(threading.get_ident()) obj.phone = arg # obj.phone = arg 调用对象的 __setattr__方法(“phone”,1) time.sleep(2) print(obj.phone, arg) # obj.phone 调用对象的__getattr__方法,得到返回值 for i in range(10): t = threading.Thread(target=func,args=(i,)) t.start()
import threading import time v = threading.local() def func(arg): # 内部护卫当前线程创建一个空间用于存储, phone=自己的值 v.phone = arg time.sleep(2) print(v.phone, arg) # 去当前线程自己空间取值 for i in range(10): t = threading.Thread(target=func,args=(i,)) t.start()
三、线程池
作用:
1、限定线程的个数,不会导致由于线程过多导致系统运行缓慢或崩溃
2、线程池不需要每次都去创建或销毁,通过重复利用已创建的线程降低线程创建和销毁造成的消耗。节约了资源、
3、当任务到达时,任务可以不需要等到线程创建就能立即执行,响应时间更快。
以后写代码不要一个一个创建线程,而是创建一个线程池, 每当有一个新任务时,去线程池中申请线程去执行任务.
from concurrent.futures import ThreadPoolExecutor import time def task(a1, a2): time.sleep(1) print(a1, a2) # 创建了一个线程池(最多5个线程) pool = ThreadPoolExecutor(5) for i in range(40): # 去线程池中申请一个线程,让线程执行task函数 pool.submit(task, i, 8)
四、生产者消费者模型
三部件: 生产者, 消费者, 队列
队列, 先进先出(管道传球)
栈, 后进先出(子弹夹)
问题: 生产者消费者模型解决了什么问题?
答: 不用一直等待的问题
import queue q = queue.Queue() q.put(1) # 先放 1 q.put(2) q.put(3) v1 = q.get() # 先出 1 print(v1)
import time import queue import threading q = queue.Queue() # 线程安全 def producer(id): """ 生产者 :return: """ while True: time.sleep(2) q.put('包子') print('厨师%s 生产了一个包子' %id ) for i in range(1,4): t = threading.Thread(target=producer,args=(i,)) t.start() def consumer(id): """ 消费者 :return: """ while True: time.sleep(1) v1 = q.get() print('顾客 %s 吃了一个包子' % id) for i in range(1,3): t = threading.Thread(target=consumer,args=(i,)) t.start()
五、面向对象补充