Python中 线程的GIL锁,GIL锁与普通互斥锁的对比,io密集型和计算密集型,死锁问题,递归锁,信号量,Event事件,线程queue,线程池与进程池,多线程爬虫网页,多线程读取文件

昨日回顾

1 生产者消费者
	-在生产者和消费者之间,通过队列,增加缓冲,避免了生产者和消费者之间交互
    -Queue,redis,rabbitmq,kafka
    -解耦合,队列是微服务的基础
2 线程理论,开启
	-进程是资源分配的最小单位,线程是执行的最小单位(cpu调度的最小单位),每个进程中最少一个线程
    -两种方式(跟进程完全类似)
3 join方法
	-等待子线程执行结束,线程对象.join()
4 线程数据共享
	-不同线程,变量是可以共用的,查看和修改(数据错乱)
5 线程对象其他方法
	-name:人为设置,有默认
    -ident:线程号
    -active_count():现在还存活多少线程
    -is_alive():此线程是否还存活
6 线程互斥锁
	-不同线程要修改同一个数据,要加锁
    -让并行变成串行,牺牲了效率,保证了数据安全
    -悲观锁,乐观锁,分布式锁
7 GIL
	-全局解释器锁:在cpython解释器内部有一把大锁,线程要执行,必须获取到这把锁
    -为什么要有它?python的垃圾回收机制是线程不安全的,所有所有线程要抢到GIL才能执行
    -cpython的多线程不是真正的多线程,同一时刻,只有一个线程在执行,不能利用多核优势
    
    -----以下只针对于cpython解释器
    -在单核情况下:
    	-开多线程还是开多进程?不管干什么都是开线程
    -在多核情况下:
    	-如果是计算密集型,需要开进程,能被多个cpu调度执行
        -如果是io密集型,需要开线程,cpu遇到io会切换到其他线程执行

今日内容

1 验证GIL锁的存在方式

1.GIL锁是CPython中独有的

  • GIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。

2.那GIL锁保护的是什么呢?

  • GIL锁保护的是全局的Python解释器,主要目的就是防止被垃圾回收机制给回收线程中的一些赋值操作,我们来举个例子:如果我们在运行一个线程的时候,他在赋值又开始这个值是没有指向的,那我们在指向它的阶段刚刚好垃圾回收机制过来把他清理了!那我们的程序运行肯定是会与我们的需求所不符合了!
  • 所以GIL锁的功能就是防止我们一些操作被垃圾回收机制给删除。

3.那GIL和Lock我们要怎么去理解呢?

  • 首先我们要理解,GIL针对的是CPython解释器,而Lock针对的是我们程序内部的数据处理问题(防止数据修改混乱。)。

总结:

GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理,如下图

img

4.我么你怎么验证GIL锁的存在呢?

通过启动跟自己机器cpu相同的进程和线程数,验证GIL的存在

打开任务管理器,点击性能可以看到cpu的内核数

在这里插入图片描述

代码如下:

from threading import Thread
from multiprocessing import Process
def task():
    while True:
        pass
if __name__ == '__main__':
    for i in range(4):
        # t=Thread(target=task)  # 因为有GIL锁,同一时刻,只有一条线程执行,所以cpu不会满
        t=Process(target=task)   # 由于是多进程,进程中的线程会被cpu调度执行,6个cpu全在工作,就会跑满
        t.start()

结果:

结果可以自行查看,开进程以后打开任务管理器查看CPU的调用是否已经到达了100%

2 GIL与普通互斥锁的区别

1 GIL锁是不能保证数据的安全,普通互斥锁来保证数据安全

二个需要注意的点:
#1.线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来

#2.join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高
from threading import Thread, Lock
import time
mutex = Lock()
money = 100

def task():
    global money
    mutex.acquire()
    temp = money
    time.sleep(1)
    money = temp - 1
    mutex.release()

if __name__ == '__main__':
    ll=[]
    for i in range(10):
        t = Thread(target=task)
        t.start()
        # t.join()  # 为什么不可以在这里加t.join(),应为会变成了串行,不能这么做
        ll.append(t)
    for t in ll:
        t.join()
    print(money)

3 io密集型和计算密集型

-----以下只针对于cpython解释器

  • 单核情况下:
  • 开多线程还是开多进程?不管干什么都是开线程
  • 多核情况下:
  • 如果是计算密集型,需要开进程,能被多个cpu调度执行
  • 如果是io密集型,需要开线程,cpu遇到io会切换到其他线程执行

计算密集型演示代码:

from multiprocessing import Process
from threading import Thread
import time

## 计算密集型
def task():
    count = 0
    for i in range(100000000):
        count += i
if __name__ == '__main__':
    ctime = time.time()
    ll=[]
    for i in range(6):
        # t=Process(target=task) #开进程: 9.717038869857788
        # t.start()
        t=Thread(target=task)  #开线程:35.612813234329224
        t.start()
        ll.append(t)
    for t in ll:
        t.join()
    print(time.time() - ctime)

结论:从上述的结果我们可以看出来,计算密集型的时候,我们最好不要开线程去运行,因为线程耗时会很久我们直接开进程运行,虽然需要cpu大量的支持但是,可以提高用户的体验水平。

io密集型演示代码:(打开文件或者是读取某些在存储在硬盘上的文件信息)

from multiprocessing import Process
from threading import Thread
import time

## io密集型
def task():
    time.sleep(2)
if __name__ == '__main__':
    ctime = time.time()
    ll=[]
    for i in range(500):
        # t=Process(target=task) #开进程: 13.616523027420044
        # t.start()
        t=Thread(target=task)  #开线程:2.101327419281006
        t.start()
        ll.append(t)
    for t in ll:
        t.join()
    print(time.time() - ctime)

结论:从上述的结果我们可以看出来,io集型的时候,我们最好不要开进程去运行,因为进程耗时会很久我们直接开线程运行,既不需要cpu大量的支持,还可以大大提高用户的体验水平。

4 死锁现象(哲学家就餐问题)

单例模式:https://www.cnblogs.com/liuqingzheng/p/10038958.html

  • 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象若无外力作用它们都将无法推进下去此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

  • 死锁现象,张三拿到了A锁,等B锁,李四拿到了B锁,等A锁

死锁演示代码:

from threading import Thread, Lock
import time
mutexA = Lock()
mutexB = Lock()

def eat_apple(name):
    mutexA.acquire()
    print('%s 获取到了a锁' % name)
    mutexB.acquire()
    print('%s 获取到了b锁' % name)
    print('开始吃苹果,并且吃完了')
    mutexB.release()
    print('%s 释放了b锁' % name)
    mutexA.release()
    print('%s 释放了a锁' % name)


def eat_egg(name):
    mutexB.acquire()
    print('%s 获取到了b锁' % name)
    time.sleep(2)
    mutexA.acquire()
    print('%s 获取到了a锁' % name)
    print('开始吃鸡蛋,并且吃完了')
    mutexA.release()
    print('%s 释放了a锁' % name)
    mutexB.release()
    print('%s 释放了b锁' % name)


if __name__ == '__main__':
    ll = ['egon', 'alex', '铁蛋']
    for name in ll:
        t1 = Thread(target=eat_apple, args=(name,))
        t2 = Thread(target=eat_egg, args=(name,))
        t1.start()
        t2.start()

运行结果:

egon 获取到了a锁
egon 获取到了b锁
开始吃苹果,并且吃完了
egon 释放了b锁
egon 释放了a锁
egon 获取到了b锁
alex 获取到了a锁

程序还在继续,没有暂停那这样我们的程序就会一直卡在这里!这就是因为多把锁而引起的死锁问题了。

死锁的解决方法:

使用递归锁

什么是递归锁:

  • 递归锁(可重入),

递归锁的取用方法:

  • 同一个人可以多次acquire,每acquire一次,内部计数器加1,每relaese一次,内部计数器减一

如果可以拿到递归锁:

  • 只要计数器不为0其他人都不能获得这把锁

递归锁名字(RLock(与线程在同一个模块下面))

本质上递归锁就是通一把锁被多次拿到:

就例如这样:

# 普通的互斥锁
# mutexA = Lock()
# mutexB = mutexA (如果这样打,程序绝对是死锁状态,因为普通的锁只能取一次)

# 使用可重入锁解决(同一把锁)
# mutexA = RLock()
# mutexB = mutexA
mutexA = mutexB =RLock()

通过递归锁解决问题:

from threading import Thread, Lock,RLock
import time

mutexA = mutexB =RLock()

def eat_apple(name):
    mutexA.acquire()
    print('%s 获取到了a锁' % name)
    mutexB.acquire()
    print('%s 获取到了b锁' % name)
    print('开始吃苹果,并且吃完了')
    mutexB.release()
    print('%s 释放了b锁' % name)
    mutexA.release()
    print('%s 释放了a锁' % name)


def eat_egg(name):
    mutexB.acquire()
    print('%s 获取到了b锁' % name)
    time.sleep(2)
    mutexA.acquire()
    print('%s 获取到了a锁' % name)
    print('开始吃鸡蛋,并且吃完了')
    mutexA.release()
    print('%s 释放了a锁' % name)
    mutexB.release()
    print('%s 释放了b锁' % name)


if __name__ == '__main__':
    ll = ['egon', 'alex', '铁蛋']
    for name in ll:
        t1 = Thread(target=eat_apple, args=(name,))
        t2 = Thread(target=eat_egg, args=(name,))
        t1.start()
        t2.start()

结果:

很多这里就不拿出来了,可以自行运行程序去看。

5 Semaphore信号量

1 什么是信号量(Semaphore)

Semaphore:信号量可以理解为多把锁同时允许多个线程来更改数据

这里我们需要注意:

我们不会用这个功能去直接多线程修改文件,我们会利用互斥锁协同,

如果是多线程去运行某个不修改值的函数就可以直接用了。


代码如下:

from  threading import Thread,Semaphore
import time
import random
sm=Semaphore(3) # 数字表示可以同时有多少个线程操作

def task(name):
    sm.acquire()
    print('%s 正在蹲坑'%name)
    time.sleep(random.randint(1,5))
    print('%s 蹲坑结束了' % name)
    sm.release()
    
if __name__ == '__main__':
    for i in range(20):
        t=Thread(target=task,args=('屌丝男%s号'%i,))
        t.start()

运行结果:

自行运行去查看结果:

通过结果我们可以得到个结论,线程开启数就是sm=Semaphore(3) 定义的时候所输入的数字。

如果是5 就是5个线程5 个线程的开启调用。

6 Event事件

Event事件应用场景:

  • 一些线程需要等到其他线程执行完成之后才能执行,类似于发射信号
  • 比如一个线程等待另一个线程执行结束再继续执行

from threading import Thread, Event
import time
event = Event()

def girl(name):
    print('%s 现在不单身,正在谈恋爱'%name)
    time.sleep(10)
    print('%s 分手了,给屌丝男发了信号'%name)
    event.set()


def boy(name):
    print('%s 在等着女孩分手'%name)
    event.wait()       # 只要没来信号,就卡在者
    print('女孩分手了,机会来了,冲啊')

if __name__ == '__main__':
    lyf = Thread(target=girl, args=('刘亦菲',))
    lyf.start()

    for i in range(10):
        b = Thread(target=boy, args=('屌丝男%s号' % i,))
        b.start()

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False

7 线程queue

queue队列 :使用import queue,用法与进程Queue一样

进程queue和线程queue不是一个

# from multiprocessing import Queue
# 线程queue
from queue import Queue,LifoQueue,PriorityQueue

线程queue讲解:

  • 线程间通信,因为共享变量会出现数据不安全问题线程queue通信,不需要加锁,内部自带

  • queue是线程安全的

'''
三种线程Queue
    -Queue:队列,先进先出
    -PriorityQueue:优先级队列,谁小谁先出
    -LifoQueue:栈,后进先出
'''
# 如何使用
q=Queue(5)
q.put("lqz")
q.put("egon")
q.put("铁蛋")
q.put("钢弹")
q.put("金蛋")


# q.put("银蛋")
# q.put_nowait("银蛋")
# 取值
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# 卡住
# print(q.get())
# q.get_nowait()
# 是否满,是否空
print(q.full())
print(q.empty())

# LifoQueue
q=LifoQueue(5)
q.put("lqz")
q.put("egon")
q.put("铁蛋")
q.put("钢弹")
q.put("金蛋")
#
# q.put("ddd蛋")
print(q.get())


#PriorityQueue:数字越小,级别越高
q=PriorityQueue(3)
q.put((-10,'金蛋'))
q.put((100,'银蛋'))
q.put((101,'铁蛋'))
# q.put((1010,'铁dd蛋'))  # 不能再放了

print(q.get())
print(q.get())
print(q.get())

8 线程池

1 为什么会出现池?不管是开进程还是开线程,不能无限制开,通过池,假设池子里就有10个,不管再怎么开,永远是这10个

# 如何使用
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(2)
pool.submit(get_pages, url).add_done_callback(call_back)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import Thread
import time
import random

pool = ThreadPoolExecutor(5)  # 数字是池的大小
# pool = ProcessPoolExecutor(5)  # 数字是池的大小


def task(name):
    print('%s任务开始' % name)
    time.sleep(random.randint(1, 4))
    print('任务结束')
    return '%s 返回了'%name


def call_back(f):
    # print(type(f))
    print(f.result())
if __name__ == '__main__':

    # ll=[]
    # for i in range(10):  # 起了10个线程
    #     # t=Thread(target=task)
    #     # t.start()
    #     res = pool.submit(task, '屌丝男%s号' % i)  # 不需要再写在args中了
    #     # res是Future对象
    #     # from  concurrent.futures._base import Future
    #     # print(type(res))
    #     # print(res.result())  # 像join,只要执行result,就会等着结果回来,就变成串行了
    #     ll.append(res)
    #
    # for res in ll:
    #     print(res.result())

    # 终极使用
    for i in range(10):  # 起了10个线程
        # 向线程池中提交一个任务,等任务执行完成,自动回到到call_back函数执行
        pool.submit(task,'屌丝男%s号' % i).add_done_callback(call_back)

9 进程池

# 1 如何使用
from concurrent.futures import ProcessPoolExecutor
pool = ProcessPoolExecutor(2)
pool.submit(get_pages, url).add_done_callback(call_back)

简单爬取网页信息

其他代码:

from concurrent.futures import ThreadPoolExecutor
import requests  # 爬虫会学到的模块
pool = ThreadPoolExecutor(2)

def get_pages(url):
    # https://www.baidu.com
    res = requests.get(url)  # 向这个地址发送请求

    name = url.rsplit('/')[-1] + '.html'
    print(name)  # www.baidu.com.html
    # res.content拿到页面的二进制
    return {'name': name, 'text': res.content}


def call_back(f):
    dic = f.result()
    with open(dic['name'], 'wb') as f:
        f.write(dic['text'])

if __name__ == '__main__':
    ll = ['https://www.baidu.com', 'https://www.mzitu.com', 'https://www.cnblogs.com']
    for url in ll:
        pool.submit(get_pages, url).add_done_callback(call_back)

起两个线程,第一个线程读文件的前半部分,读完发一个信号,另一个进程读后半部分

其他代码:

from threading import Thread, Event
import time
import os

event = Event()
# 获取文件总大小
size = os.path.getsize('a.txt')


def read_first():
    with open('a.txt', 'r', encoding='utf-8') as f:
        n = size // 2  # 取文件一半,整除
        data = f.read(n)
        print(data)
        print('我一半读完了,发了个信号')
        event.set()


def read_last():
    event.wait()  # 等着发信号
    with open('a.txt', 'r', encoding='utf-8') as f:
        n = size // 2  # 取文件一半,整除
        # 光标从文件开头开始,移动了n个字节,移动到文件一半
        f.seek(n, 0)
        data = f.read()
        print(data)


if __name__ == '__main__':
    t1=Thread(target=read_first)
    t1.start()
    t2=Thread(target=read_last)
    t2.start()
努力学习!
原文地址:https://www.cnblogs.com/Orange-YXH/p/13648092.html