第十四章 进程和线程

 
一、锁
当线程安全,多线程操作时,内部会让所有线程排队处理。如:list、dict、Queue
当线程不安全时,那么我们就需要:不安全线程 + 锁 =》 排队处理
例如:
a. 创建100个线程,在列表中追加8 : 线程安全
b. 创建100个线程
v = []
- 把自己的添加到列表中
- 在读取列表中的最后一个
(锁住以后可以保证最后一个是自己放进去的那一个)
解锁
1. 锁:Lock(同步锁)
# 一次放一个
# 锁 Lock
import threading
import time
v = []
lock = threading.Lock()
def func(arg):
    lock.acquire()
    v.append(arg)
    time.sleep(0.1)
    m = v[-1]
    print(arg, m)
    lock.release()
for i in range(10):
    t = threading.Thread(target=func, args=(i, ))
    t.start()
    time.sleep(0.5)
    print(v)
>>>
0 0
[0]
1 1
[0, 1]
2 2
[0, 1, 2]
3 3
[0, 1, 2, 3]
4 4
[0, 1, 2, 3, 4]
5 5
[0, 1, 2, 3, 4, 5]
6 6
[0, 1, 2, 3, 4, 5, 6]
7 7
[0, 1, 2, 3, 4, 5, 6, 7]
8 8
[0, 1, 2, 3, 4, 5, 6, 7, 8]
9 9
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Lock多次“上锁”会造成死锁状态,不能继续执行(不会报错)
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
v = []
lock = threading.Lock()
def func(arg):
    lock.acquire()  
    lock.acquire()  # 多次锁  就会死锁
    v.append(arg)
    time.sleep(arg)
    m = v[-1]
    print(arg, m)
    lock.release()
    lock.release()
for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()
2. 锁:RLock(递归锁)
RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
# 锁 RLock
v = []
lock = threading.RLock()
def func(arg):
    lock.acquire()
    lock.acquire()  # 可加多个锁

    v.append(arg)
    time.sleep(0.1)
    m = v[-1]
    print(m, arg)

    lock.release()
    lock.release()  # 用多少锁,就要释放几次
for i in range(10):
    t = threading.Thread(target=func, args=(i, ))
    t.start()
3. 锁:BoundedSemaphore(信号量)
一个工厂函数,返回一个新的有界信号量对象。有界信号量会确保他的值不会超过初始值;如果超出则会抛出ValueError异常。初始值默认为1。
# 一次放n个
v = []
lock = threading.BoundedSemaphore(3)    # 一次放n个
def func(arg):
    lock.acquire()
    print(arg)
    time.sleep(1)
    lock.release()

for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()
4.锁:Condition(条件)
使得线程等待,只有满足某条件时,才释放n个线程
# 一次方指定个,并可循环指定
# notify通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件
v = []
lock = threading.Condition()
i = 0
def func(arg):
    ident = threading.current_thread()
    print(ident)

    lock.acquire()
    lock.wait() # 在此处加锁
    print(arg)
    num = threading.get_ident()
    print(num)
    time.sleep(0.1)
    lock.release()
for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()
# 主线程
while True:
    inp = int(input("请输入:"))
    lock.acquire()
    lock.notify(inp)
    lock.release()

# wait进行条件判断  是否为真
v = []
lock = threading.Condition()
def cond():
    ct = threading.current_thread() # 获取当前线程
    cn = ct.getName() # 获取当前线程名称
    print("进入cond函数,线程%s准备好了!" %cn)
    input("按任意键继续!
")
    return True
def func(arg):
    print("线程进来了")
    lock.wait_for(cond) # 在此处加锁 当函数返回为真 才能解锁
    print(arg)
    time.sleep(1)
for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()
5.锁:Event(事件)
对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。
# 一次放所有
lock = threading.Event()
def func(arg):
    ct = threading.current_thread()  # 获取当前线程
    print(ct.getName())
    lock.wait() # 加锁
    print(arg)
for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()
input(">>>")
lock.set()  # 此处解锁 释放所有


# wait 条件加锁
lock = threading.Event()
def func(arg):
    ct = threading.current_thread()  # 获取当前线程
    print(ct.getName())
    lock.wait() # 加锁
    print(arg)
# 第一次循环还没有将锁解开,只能显示线程名称
for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()
input(">>>")
lock.set()  # 开锁 释放全部
time.sleep(3)   # 睡3秒是为了给时间把上一步骤全部释放完再执行下一步
input("<<<")
# 第二次执行的时候线程名称和结果同时出现,因为在上一步骤中set已经将锁打开
for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()

# 再次加锁
lock = threading.Event()
def func(arg):
    print("线程进来了")
    lock.wait() # 加锁
    print(arg)
for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()
input(">>>")
lock.set()
time.sleep(1)
lock.clear()    # 再次加速
for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()
input(">>>")
lock.set() 
总结:
线程安全,列表和字典线程安全
为什么加锁?
- 非线程安全
- 控制一段代码
二、threading.local(本地线程)
- 不同的线程来了,内部会为不同的线程创建不同的空间用于存储
作用:
内部自动为每个线程维护一个空间(字典),用于当前线程存取属于自己的值。保证线程之间的数据隔离。
v = threading.local()
def func(arg):
    # 内部会为当前吸纳从创建一个空间用于存储, phone = 自己的值
    v.phone = arg
    time.sleep(2)
    print(v.phone, arg) # 去当前线程自己空间取值

for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()
三. 线程池
# 线程池`
from concurrent.futures import ThreadPoolExecutor
import time

def task(a1,a2):
    time.sleep(1)
    print(a1, a2)
# 创建一个线程池(最多3个)
pool = ThreadPoolExecutor(3)
for i in range(40):
    # 去线程池中申请一个线程,让线程执行task函数
    pool.submit(task, i, i+1) 
四. 生产者消费者模型
生产者
队列、栈
消费者
# 生产者消费者模型
import queue
q = queue.Queue()
q.put(3)
q.put(2)
q.put(1)
while 1:
    v1 = q.get()
    print(v1)

q = queue.Queue()   # 线程安全
def producer(id):
    """
    生产者
    :return:
    """
    while True:
        time.sleep(2)
        q.put('包子')
        print("厨师%s生产了一个包子" %id)
for i in range(1,4):
    t = threading.Thread(target=producer, args=(i,))
    t.start()
def consumer(id):
    """
    消费者
    :return:
    """
    while True:
        time.sleep(1)
        q.get()
        print("顾客%s 吃了了一个包子" %id)
for i in range(1, 4):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()
原文地址:https://www.cnblogs.com/jiumo/p/9635997.html