锁,线程池,生产者消费模式

线程安全

多线程操作时,内部会让所有线程排队处理,必须等前一个处理完,下一个才继续,如:list/dict/Queue

非线程安全

需要人为控制,让其排队处理,避免数据出现混乱

加锁的作用:

  • 非线程安全,数据容易混乱
  • 控制一段代码

Lock

一次放一个,一次只有一个线程通过

import threading
import time

"""
需求:
创建10个线程 
v = []
锁 
- 把自己的添加到列表中。
- 在读取列表的最后一个。
解锁

"""
lst = []
lock = threading.Lock()
def func(arg):
    """
    一次只能进一个线程
    :param arg: 
    :return: 
    """
    lock.acquire()    # 加锁
    lst.append(arg)
    time.sleep(1)
    m = lst[-1]
    print(arg,m)
    lock.release()  # 释放锁
    print(666)

for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()

# 输出结果:
0 0
666
1 1
666
2 2
666
3 3
666
4 4
666
5 5
666
6 6
666
7 7
666
8 8
666
9 9
666
示例

RLock

一次放一个或多个,一般都用RLock,不用Lock

import threading
import time

v = []
lock = threading.RLock()
def func(arg):
    lock.acquire()
    v.append(arg)
    time.sleep(1)
    m = v[-1]
    print(arg,m)

    lock.release()
    


for i in range(10):
    t =threading.Thread(target=func,args=(i,))
    t.start()
示例

BoundedSemaphore(N)

信号量,一次放固定的N个

lock = threading.BoundedSemaphore(2)
def func(arg):
    """
    两个线程,两个线程的走,这个数是定死的
    :param arg: 
    :return: 
    """
    lock.acquire()
    print(arg)
    time.sleep(2)
    lock.release()

for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()
示例

Condition

一次放X(动态的)个

lock = threading.Condition()

def func(arg):
    print('线程进来了')
    lock.acquire()
    # print(111)
    lock.wait() # 此时真正加锁
    print(arg)
    time.sleep(2)
    lock.release()

for i in range(10):
    t =threading.Thread(target=func,args=(i,))
    t.start()

while True:
    inp = int(input('请输入>>>:')) # 输入个数控制线程,直到线程跑完
    lock.acquire()
    lock.notify(inp)
    lock.release()
示例

Event

一次放所有

import time
import threading

lock = threading.Event()


def func(arg):
    print('线程来了')
    lock.wait() # 加锁:红灯
    print(arg)


for i in range(10):
    t =threading.Thread(target=func,args=(i,))
    t.start()

# input(">>>>")
time.sleep(2)
lock.set() # 放行:绿灯
lock.clear() # 再次加锁,变红灯
示例
import time
import threading

lock = threading.Event()


def func(arg):
    print('线程来了')
    lock.wait() # 加锁:红灯
    print(arg)


for i in range(10):
    t =threading.Thread(target=func,args=(i,))
    t.start()

# input(">>>>")
time.sleep(2)
lock.set() # 放行:绿灯
lock.clear() # 再次加锁,变红灯

"""再次运行时,还是放行状态,绿灯,而非加锁状态"""
for i in range(10):
    t =threading.Thread(target=func,args=(i,))
    t.start()

time.sleep(2)
lock.set()
迷惑人的

线程池

限制最大使用线程数,不是越多越好,多了会造成上下文混乱

from concurrent.futures import ThreadPoolExecutor
import time

def task(a1,a2):
    time.sleep(5)
    print(a1,a2)

# 创建了一个线程池(最多5个线程)
pool = ThreadPoolExecutor(5)

for i in range(10):
    # 去线程池中申请一个线程,让线程执行task函数。
    pool.submit(task,i,2)
示例

生产者消费模式

生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据。

作用:解决了不用一直等待的问题

队列:先进先出

补充:栈,先进后出,后进先出

import time
import queue
import threading
q = queue.Queue() # 线程安全

def producer(id):
    """
    生产者
    :return:
    """
    while True:
        time.sleep(2)
        q.put('包子')
        print('厨师%s 生产了一个包子' %id )

for i in range(1,4):
    t = threading.Thread(target=producer,args=(i,))
    t.start()


def consumer(id):
    """
    消费者
    :return:
    """
    while True:
        time.sleep(1)
        v1 = q.get()
        print('顾客 %s 吃了一个包子' % id)

for i in range(1,3):
    t = threading.Thread(target=consumer,args=(i,))
    t.start()

threadinglocal原理

作用:

内部自动为每个线程维护一个空间(字典),用于当前存取属于自己的值。保证线程之间的数据隔离。

import time
import threading

v = threading.local()

def func(arg):
    # 内部会为当前线程创建一个空间用于存储:phone=自己的值
    v.phone = arg
    time.sleep(2)
    print(type(v))
    print(v.phone,arg) # 去当前线程自己空间取值

for i in range(10):
    t =threading.Thread(target=func,args=(i,))
    t.start()
原文地址:https://www.cnblogs.com/--kai/p/9629264.html