叁拾壹

一、线程锁

​ 运行以下代码:

from threading import Thread,Lock

x = 0
mutex = Lock()
def task():
    global x
    for i in range(200000):
        x = x+1

if __name__ == '__main__':
    t1 = Thread(target=task)
    t2 = Thread(target=task)
    t3 = Thread(target=task)
    t1.start()
    t2.start()
    t3.start()

    t1.join()
    t2.join()
    t3.join()
    print(x)
    
440215

​ 正常运行代码应该得到600000,而实际结果为440215。这是因为代码过长,cpu进行了代码的切换,切入的时间可能恰好是运算的过程中,使得一次完整的计算没有完成,便直接运行其他线程。当再次切回这个线程时,程序会接着上一次的结果运行,造成切出去时运算的数据丢失。

​ 如果计算量少,则可能不会出现上述问题,计算量越大,出现的结果偏差可能越大。

​ 因此为了解决数据丢失的问题,需要使用互斥锁。使用.acquire()方法获取对象,使用.release()释放对象,会完整运行完其中的线程后再进行其他线程。与进程一样,将多线程并行改为串行,保证了数据的安全。

from threading import Thread,Lock

x = 0
mutex = Lock()
def task():
    global x
    mutex.acquire()
    for i in range(200000):
        x = x+1
    mutex.release()

if __name__ == '__main__':
    t1 = Thread(target=task)
    t2 = Thread(target=task)
    t3 = Thread(target=task)
    t1.start()
    t2.start()
    t3.start()

    t1.join()
    t2.join()
    t3.join()
    print(x)

二、死锁问题

​ 假设两个线程分别拿到了两个锁,而且并未释放,它们都需要拿到对方的锁进程才可以进行下去。这样双方都拿不到彼此需要的锁,导致程序不能进行下去,这就是锁死问题。如下程序,task2中,time.sleep使线程切换,导致另一个线程也得到一把锁,两个线程相互锁死。

from threading import Thread,Lock
mutex1 = Lock()
mutex2 = Lock()
import time
class MyThreada(Thread):
    def run(self):
        self.task1()
        self.task2()
    def task1(self):
        mutex1.acquire()
        print(f'{self.name} 抢到了 锁1 ')
        mutex2.acquire()
        print(f'{self.name} 抢到了 锁2 ')
        mutex2.release()
        print(f'{self.name} 释放了 锁2 ')
        mutex1.release()
        print(f'{self.name} 释放了 锁1 ')

    def task2(self):
        mutex2.acquire()
        print(f'{self.name} 抢到了 锁2 ')
        time.sleep(0.1)
        mutex1.acquire()
        print(f'{self.name} 抢到了 锁1 ')
        mutex1.release()
        print(f'{self.name} 释放了 锁1 ')
        mutex2.release()
        print(f'{self.name} 释放了 锁2 ')

for i in range(3):
    t = MyThreada()
    t.start()
    
Thread-1 抢到了 锁1 
Thread-1 抢到了 锁2 
Thread-1 释放了 锁2 
Thread-1 释放了 锁1 
Thread-1 抢到了 锁2 
Thread-2 抢到了 锁1 

三、递归锁

​ 可以使用递归锁来解决死锁的问题。在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

​ 这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁。

from threading import Thread,RLock

mutex1 = RLock()
mutex2 = mutex1

import time
class MyThreada(Thread):
    def run(self):
        self.task1()
        self.task2()
    def task1(self):
        mutex1.acquire()
        print(f'{self.name} 抢到了 锁1 ')
        mutex2.acquire()
        print(f'{self.name} 抢到了 锁2 ')
        mutex2.release()
        print(f'{self.name} 释放了 锁2 ')
        mutex1.release()
        print(f'{self.name} 释放了 锁1 ')

    def task2(self):
        mutex2.acquire()
        print(f'{self.name} 抢到了 锁2 ')
        time.sleep(1)
        mutex1.acquire()
        print(f'{self.name} 抢到了 锁1 ')
        mutex1.release()
        print(f'{self.name} 释放了 锁1 ')
        mutex2.release()
        print(f'{self.name} 释放了 锁2 ')

for i in range(3):
    t = MyThreada()
    t.start()

四、信号量

​ Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

from threading import Thread,currentThread,Semaphore
import time

def task():
    sm.acquire()
    print(f'{currentThread().name} 在执行')
    time.sleep(3)
    sm.release()

sm = Semaphore(5)
for i in range(15):
    t = Thread(target=task)
    t.start()
    
Thread-1 在执行
Thread-2 在执行
Thread-3 在执行
Thread-4 在执行
Thread-5 在执行
Thread-7 在执行
Thread-8 在执行
Thread-6 在执行
Thread-9 在执行
Thread-10 在执行
Thread-11 在执行
Thread-12 在执行
Thread-13 在执行
Thread-14 在执行
Thread-15 在执行

五、全局解释器锁GIL

​ C python自带垃圾回收机制,在线程运行时如果垃圾回收机制也同时运行,会导致相互“抢夺”数据,因此GIL锁可以使同一进程同一时间只能运行一个线程,这样就能保证线程的数据安全,但是导致了无法使用多核优势,减低运算效率。

​ 如果一个线程抢掉了GIL,如果遇到io或者执行时间过长(cpu被剥夺),会强行释放掉GIL锁,以便其他的线程抢占GIL。

六、多进程与多线程的适用场景

​ 假设有四个任务要进行处理,那么有两种方案可以选择,开启四个进程和一个进程下开启四个线程。

​ 计算密集型推荐使用多进程;

​ 假设每个人物要进行计算10秒,如果采用多线程,因为同一时刻只能执行一个线程,所以串行4个线程,每个都需要10s,一共40s。

def work1():
    res=0
    for i in range(100000000):
        res*=i

if __name__ == '__main__':
    t_list = []
    start = time.time()
    for i in range(4):
        t = Thread(target=work1)
        # t = Process(target=work1)
        t_list.append(t)
        t.start()
    for t in t_list:
        t.join()
    end = time.time()
    # print('多线程',end-start) # 多线程 15.413789510726929
    print('多进程',end-start) # 多进程 4.711405515670776

​ io密集型推荐使用多线程;

​ 假设4个任务90%时间处于io状态,采用多线程,每个线程io时间都不占用cpu,可以执行其他进程,因此所需时间为10s加运算四个就成时间。而采用多进程,可以实行并行,但是花费时间为10s加一个任务执行时间加开启进程的时间。

def work1():
    x = 1+1
    time.sleep(5)

if __name__ == '__main__':
    t_list = []
    start = time.time()
    for i in range(4):
        t = Thread(target=work1)
        # t = Process(target=work1)
        t_list.append(t)
        t.start()
    for t in t_list:
        t.join()
    end = time.time()
    print('多线程',end-start) #  多线程 5.002625942230225
    # print('多进程',end-start) # 多进程 5.660863399505615
原文地址:https://www.cnblogs.com/tangceng/p/11545366.html