多线程

GIL全局解释器锁(global interpreter lock)

GIL使得同一时刻只有一个线程能够调度一个cpu执行字节码

GIL会根据执行的字节码行数以及时间片释放GIL,GIL在遇到io的操作时候会主动释放(线程会被切换)

 对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。

  在多线程环境中,Python 虚拟机按以下方式执行:

  a、设置 GIL;

  b、切换到一个线程去运行;

  c、运行指定数量的字节码指令或者线程主动让出控制(可以调用 time.sleep(0));

  d、把线程设置为睡眠状态;

  e、解锁 GIL;

  d、再次重复以上所有步骤。

threading.Thread

实例化一个Thread类

threading = threading.Thread(target=function,args=(,) )

开启一个线程,同过实例的start方法

threading.start()

主线程结束时,关闭其他线程

threading.setDaemon(True)  # 守护线程

等待子线程执行完毕

theading.join()

面向对象方法

继承threading.Threading方法,重新它的run方法

class Mythreading(threading.Thread):
    def run(self):
        pass
t = Mythreading()
t.start()

Queue实现线程间通信

import threading,requests,queue
from urllib import request
from bs4 import BeautifulSoup
headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36'}
queue1 = queue.Queue(10)
queue2 = queue.Queue(300)
def get_preview_html(queue1):       # 获取图片预览页
    for i in range(6,11):
        url = 'http://pic.netbian.com/4kdongman/index_{}.html'.format(i)
        queue1.put(url)
def get_detail_html(queue1,queue2):
    while not queue1.empty():
        url = queue1.get()
        print(url)
        response = requests.get(url=url,headers=headers)
        response.encoding = 'gbk'
        ht = response.text
        soup = BeautifulSoup(ht, 'lxml')
        tagImg = soup.select('.slist > ul > li > a')
        for j in tagImg:
            imgurl = 'http://pic.netbian.com' + j['href']
            response1 = requests.get(url=imgurl, headers=headers)  # 获取图片url
            response1.encoding = 'gbk'
            ht1 = response1.text
            soup1 = BeautifulSoup(ht1, 'lxml')
            ImgDate = soup1.select('#img > img')[0]
            final_url = 'http://pic.netbian.com' + ImgDate['src']
            imgname = ImgDate['alt'] + '.jpg'
            queue2.put({'final_url':final_url,'imgname':imgname})
def get_picture(queue1,queue2):        # 下载图片
    while not queue2.empty() or not queue1.empty():
        info = queue2.get()
        final_url = info['final_url']
        imgname = info['imgname']
        imgpath = './图片/' + imgname
        request.urlretrieve(final_url, imgpath)
get_preview_html(queue1)
for i in range(2):
    t = threading.Thread(target=get_detail_html,args=(queue1,queue2))
    t.start()
for i in range(2):
    t = threading.Thread(target=get_picture,args=(queue1,queue2))
    t.start()

线程同步问题

当多个线程对同一变量进行操作时,会出现线程不同步的问题

Lock()

Lock(),一旦获取到锁,必须等待锁被释放,线程才可能被切换

Lock()的问题:获取与释放锁消耗性能,会造成死锁

死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。

from threading import Lock
lock = Lock()
lock.acquire()
.........
lock.release()
RLock()可重入锁

在同一个线程中,可以多次获取(aquire),要保证acquire次数与release次数相同 (RuntimeError)

from threading import RLock
lock = RLock()
lock.acquire()
lock.acquire()
.............
lock.release()
lock.release()
条件变量(Condition)

使用Condition对象可以在某些事件触发或者达到特定的条件后才处理数据,Condition除了具有Lock对象的acquire方法和release方法外,还有wait方法,notify方法,notifyAll方法等用于条件处理。
条件变量保持线程同步:threading.Condition()

  • wait():线程挂起,直到收到一个notify通知才会被唤醒继续运行
  • notify():通知其他线程,那些挂起的线程接到这个通知之后会开始运行
  • notifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程(这个一般用得少)

1.在调用with condition(condition.aquire())之后才能调用wait或者notify方法

2.启动顺序很重要,等待唤醒的线程要先启动,否则会接收不到notify()的通知,无法被唤醒

3.condition有两层锁, 一把底层锁会在线程调用了wait方法的时候释放, 上面的锁会在每次调用wait的时候分配一把并放入到condition的等待队列中,等到notify方法的唤醒

import threading
class XiaoAi(threading.Thread):
    def __init__(self,condition):
        super().__init__(name='小爱')
        self.condition =condition
    def run(self):
        with self.condition:    #  <---底层锁 (self.condition.aquire()) 1    
            self.condition.wait() # <--- 释放底层锁 ,分配一把并放入到condition的等待队列 2
            print('{}:在'.format(self.name))
            self.condition.notify()
            self.condition.wait()
            print('{}:好啊'.format(self.name))
            self.condition.notify()
            self.condition.wait()
            print('{}:君住长江尾'.format(self.name))
class TianMao(threading.Thread):
    def __init__(self,condition):
        super().__init__(name='天猫')
        self.condition = condition
    def run(self):
        with self.condition:     #   <--- 获取底层锁  3
            print('{}:小爱同学'.format(self.name)) # 4
            self.condition.notify()  # 5
            self.condition.wait()    # 6
            print('{}:我们来对古诗吧'.format(self.name))
            self.condition.notify()
            self.condition.wait()
            print('{}:我住长江头'.format(self.name))
            self.condition.notify()
condition = threading.Condition()
xiaoai = XiaoAi(condition)
tianmao = TianMao(condition)
xiaoai.start()
tianmao.start()

Semaphore 信号量

threading.Semaphore 控制并发数量

当调用acquire时,减少计数,当调用release时,增加计数,当计数为0时,自动阻塞,等待release被调用。

semaphore = threading.Semaphore(3)
semaphore.aquire()
threading1.start()
semaphore.release()
semaphore.aquire()
threading2.start()
semaphore.release()
.........

模拟控制爬虫数量

import threading
import time
class HtmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem
    def run(self):
        time.sleep(2)
        print(self.url)
        self.sem.release()
class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem
    def run(self):
        for i in range(20):
            self.sem.acquire()
            html_thread = HtmlSpider("https://www.baidu.com/{}".format(i), self.sem)
            html_thread.start()
sem = threading.Semaphore(3)
up = UrlProducer(sem)
up.run()

线程池

线程池的作用,

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

第三:提高线程的可管理性和获取线程的状态。

from concurrent.futures import ThreadPoolExecutor,as_completed,wait
import time
def get_html(t):
    time.sleep(t)
    print(t)
    if t == 2:
        raise RuntimeError
    return t
executor = ThreadPoolExecutor(max_workers=2)
# task = executor.submit(get_html,3)
# task.done()     # 判断是否执行完毕
# task.result()   #  获取返回结果
# task.cancel()   # 可以在线程启动前取消
url = [7,3,1]
'''
for future in executor.map(get_html,url)
    re = future.result()
    print(re)
'''

task_list = [executor.submit(get_html,i) for i in url]
for future in as_completed(task_list): # as_completed会对task_list中的所有线程进行检测,一旦一个线程执行完毕一次,将返回一个Futures
    re = future.result()
    print(re)
#wait(task_list,return_when=ALL_COMPLETED)

原文地址:https://www.cnblogs.com/notfind/p/11861893.html