锁 和 线程池

Python中的各种锁:

  全局解释器锁:

    1;什么是全局解释器锁?

      每个CPU在同一时间只能执行一个线程,那么其他线程就必须等待该线程的全局解释器,使用权消失后才能使用全局解释器.即使多个线程直接不会相互影响在同一个进程下也只有一个线程使用CPU,这样的机制称为全局解释器锁(GIL).GIL的设计简化了CPython的实现,使得对象模型包括关键字的内建类型,如:字典,等都是隐含的可以并发访问的,锁住全局解释器,使得比较容易的实现对线程的支持,但也损失了多处理器主机的并行计算能力

    2;全局解释器锁的好处

      避免了大量的加锁解锁

      使数据更加安全,解决多线程间的数据完整性和状态同步

    3;全局解释器的缺点

      多核处理器退化成单核处理器,只能并发不能并行

    4;GIL的作用:

      多线程情况下必须存在资源的竞争,GIL是为了保证在解释器级别的线程唯一使用共享资源(CPU)

 同步锁:

from threading import Thread
import os,time

def work():
    global n
    temp = n
    time.sleep(1)
    n = temp-1
if __name__ = '__main__':
    n = 100
    l =[]
    for i in range(100):
        p = Thread(target = work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n)   #结果可能为99
同步锁~多个线程抢占资源
from threading import Thread,Lock
import os,time

def work():
    global n
    lock.acquire()
    temp = n
    time.sleep(1)
    n = temp -1
    lock.release()
if __name__ = '__main__':
    lock = Lock()
    n = 100
    l =[]
    for i in range(100):
        p = Thread(target = work)
        l.append(p)
        p.start()
    for p in l:
        p.join()
    print(n)  #结果为零,由原来的并发执行变为成串行,牺牲了执行效率保证了数据安全.
同步锁的引用

什么是同步锁?

  同一时刻的一个进程下的一个线程只能使用一个CPU,要确保这个线程下的程序在一段时间内被CPU执行,那么就要用到同步锁.

为什么用同步锁?

  因为有可能当一个线程在使用CPU时,该线程下的程序可能会遇到IO操作,那么CPU就会切换到别的线程上去,这样就有可能会影响到该程序结果的完整性.

怎么使用同步锁?

  只需要在对公共数据的操作前后加上上锁和释放锁的操作即可.

同步锁的作用:

  为了保证解释器级别下的自己编写的程序唯一使用共享资源产生了同步锁.

锁 Lock(一次放一个):

  线程安全,多线程操作时,内部会让所有线程进行排队处理.如:list/dict/Queue

  线程不安全 + 人 ==>排队处理

  需求:

    a:创建100个线程,在列表中追加8

    b:创建100个线程

      v= []

      锁

      -把自己添加到列表中

      -在读取列表的最后一个

      解锁

import threading
import time

v = []
lock = threading.Lock()

def func(arg):
    lock.acquire()
    v.append(arg)
    time.sleep(0.1)
    m = v[-1]
    print(arg,m)
    lock.release()

for i in range(10):
    t = threading.Thread(target = func, args = (i,))
    t.start()

锁 Rlock

import threading
import time

v = []
lock = threading.Rlock() 
def func(arg):
    lock.acquire()
    lock.acquire()
    
    v.append(arg)
    time.sleep(0.1)
    m = v[-1]
    print(arg,m)

    lock.release()
    lock.release()

for i in range(10):
    t = threading.Thread(target = func,args = (i,))
    t.start()

补充:死锁:所谓死锁是指两个或两个以上进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,他们将无法推进下去.此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程.

from threading import Lock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()
死锁

综上两个例子:如果使用Rlock代替Lock 就不会出现死锁

信号量

同进程的一样

Semaphore管理一个内置的计数器,

每当调用acquire()时内置计数器-1;

调用release()时内置计数器+1;

计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release().

实例:(同时只有3个线程可以获得semaphore,即可以限制最大连接数为3):

import time
import threading

lock = threading.BoundedSemaphore(3)
def func(arg):
    lock.acquire()
    print(arg)
    time.sleep(1)
    lock.release()

for i in range(20)
    t = threading.Thread(target = func,args = (i,))
    t.start()
与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量则产生一堆线程/进程
池与信号量

锁 : Condition (一次放x个) 

import time
import threading

lock = threading.Condition()

def func(arg):
    print('线程来了')
    lock.acquire()
    lock.wait()

    print(arg)
    time.sleep(1)
    
    lock.release()

for i in range(10):
    t = threading.Thread(target = func,args =(i,))
    t.start()

while True:
    inp = int(input('>>>'))
    
    lock.acquire()
    lock.notify(inp)
    lock.release()
方式一
import time
import threading

lock = threading.Condition()

def xxxx():
    print("来执行函数")
    input(">>>")
    #ct = threading.current_thread() #获取当前线程
    #ct.getName()
    retrun True

def func(arg):
    print("线程进来了")
    lock.wait_for(xxxx)
    print(arg)
    time.sleep(1)

for i in range(10):
    t = threading.Thread(traget = func,args =(i,))
    t.start()
方式二

锁 : Event (一次放所有)

event.isSet(): 返回event状态值;
event.wait():如果event.isSet() == False将阻塞线程
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态,等待操作系统调度;
event.clear():恢复event的状态值为False
import threading

lock = threading.Event()

def func(arg):
    print("线程来了")
    lock.wait() #加锁 红灯
    print(arg)

for i in range(10):
    t = threading.Thread(target = func,args = (i,))
    t.start()

input(">>>")
lock.set()   #绿灯

lock.clear() #再次变红灯

for i in range(10):
    t = threading.Thread(target = func,args = (i,))
    t.start()

input(">>>")
lock.set()

threading.local()

import time
import threading

v = threading.local()

def func(arg):
    #内部会为当前线程创建一个空间用于存储:phone = 自己的值
    v.phone = arg
    time.sleep(2)
    print(v.phone,arg) #去当前线程自己空间取值

for i in range(10):
    t = threading.Thread(target = func,args = (i,))
    t.start()

线程池:

from concurrent.futures import ThreadPoolExecutor
import time

def task(a1,a2):
    time.sleep(1)
    print(a1,a2)

#创建一个线程池,最多5个线程
pool = ThreadPoolExecutor(5)

for i in range(10):
    #去线程池申请一个线程,让线程执行task函数
    pool.submit(task,i,8)

生产者消费者模型:

queue队列:使用import,用法与进程Queue一样

队列:先进先出

栈:后进先出

import time
import queue
import threading
q = queue.Queue() # 线程安全

def producer(id):
    """
    生产者
    :return:
    """
    while True:
        time.sleep(2)
        q.put('包子')
        print('厨师%s 生产了一个包子' %id )

for i in range(1,4):
    t = threading.Thread(target=producer,args=(i,))
    t.start()


def consumer(id):
    """
    消费者
    :return:
    """
    while True:
        time.sleep(1)
        v1 = q.get()
        print('顾客 %s 吃了一个包子' % id)

for i in range(1,3):
    t = threading.Thread(target=consumer,args=(i,))
    t.start()
队列示例:
原文地址:https://www.cnblogs.com/wangjun187197/p/9629757.html