Python-Basis-18th

周二,晴,记录生活分享点滴

参考博客1:https://www.cnblogs.com/yuanchenqi/articles/5733873.html

参考博客2:https://www.cnblogs.com/alex3714/articles/5230609.html

Python版本:3.5

线程与进程

简述

线程 thread:是操作系统能够进行运算调度的最小单位(一堆指令集合,能够让操作系统工作起来最小的执行器是一个线程)

进程 process:对线程及其他资源的集合

注意:

  1. 开进程的消耗比开线程的消耗大的多
  2. 线程可以共享一个数据,而进程不行
  3. 线程与进程执行的是同样的东西,没有谁快谁慢之说

GIL(Global Interpreter Lock)

GIL 全局解释器锁 在同一时刻,只能有一个线程进入解释器

IO密集型任务函数:只有一个CPU可以处理;计算密集型任务函数:只有一个CPU无法处理

在python里,如果处理的人物是IO密集型的,可以用多线程;如果是计算密集型的,推荐用C。

 

threading模块

线程的两种调用方式

直接调用

# 简要版

import threading

def foo(n):
    pass

t1 = threading.Thread(target = foo, args = (1, ))  # 创建子线程对象
t1.start()  # 使线程执行
# 完整版

import threading
import time
 
def sayhi(num):  # 定义每个线程要运行的函数
 
    print("running on number:%s" %num)
 
    time.sleep(3)  # sleep执行时不占CPU
 
if __name__ == '__main__':
 
    t1 = threading.Thread(target=sayhi,args=(1,))  # 生成一个线程实例
    t2 = threading.Thread(target=sayhi,args=(2,))  # 生成另一个线程实例
 
    t1.start()  # 启动线程
    t2.start()  # 启动另一个线程
 
    print(t1.getName())  # 获取线程名
    print(t2.getName())

继承式调用

# 优先推荐 类 的方法

import threading
import time
 
 
class MyThread(threading.Thread):  # 类继承threading.Thread
    def __init__(self,num):  # 参数用__init__获取
        threading.Thread.__init__(self)
        self.num = num  # self指实例对象
 
    def run(self):  # 定义每个线程要运行的函数  # 方法(对应foo的函数)写在run里面
 
        print("running on number:%s" %self.num)
 
        time.sleep(3)
 
if __name__ == '__main__':
 
    t1 = MyThread(1)  # MyThread表示一个类;MyThread()表示实例化一个类的对象,自动调用__init__方法;MyThread(1)将1传给__init__的参数num
    t2 = MyThread(2)
    t1.start()
    t2.start()

小结:

  1. 直接调用 t1 = threading.Thread(target = foo, args = (1, )) 是通过threading.Thread里面加一个函数名字和参数创建一个线程对象
  2. 继承式调用是创建一个类 class MyThread(threading.Thread): ,这个类继承threading.Thread,参数用__init__获取,方法(对应foo的函数)写在run里面

Join & Daemon

setDaemon(True):

将线程声明为守护线程,必须在start() 方法调用之前设置,和join是相反的。当只需要主线程完成后,不管子线程是否完成,都要和主线程一起退出,需要用setDaemon方法

join():

在子线程完成运行之前,这个子线程的父线程将一直被阻塞。(join是阻塞用的,谁调用join就阻塞谁)

import threading
from time import ctime,sleep
import time

def music(func):
    for i in range(2):
        print ("Begin listening to %s. %s" %(func,ctime()))
        sleep(4)  # 执行时间为4秒*2次=8秒
        print("end listening %s"%ctime())

def move(func):
    for i in range(2):
        print ("Begin watching at the %s! %s" %(func,ctime()))
        sleep(5)  # 执行时间为5秒*2次=10秒
        print('end watching %s'%ctime())

threads = []
t1 = threading.Thread(target=music,args=('七里香',))
threads.append(t1)
t2 = threading.Thread(target=move,args=('阿甘正传',))
threads.append(t2)

if __name__ == '__main__':

    for t in threads:
        # t.setDaemon(True)  # 我们需要的是只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,用setDaemon方法
        t.start()
        # t.join()  # 子线程结束之后再执行主线程这部分的代码  # 先执行的是mucis的8秒进程,在执行move的10秒进程,串行
    # t1.join()  # Python、C认为是for循环的最后一次的赋值,在其他语言是错误的
    t2.join()
    
    print ("all over %s" %ctime())

其他方法

thread 模块提供的其他方法:
# threading.currentThread(): 返回当前的线程变量。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
# 除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:
# run(): 用以表示线程活动的方法。
# start():启动线程活动。
# join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。

同步锁 Lock 

import time
import threading

def addNum():
    global num #在每个线程中都获取这个全局变量
    # num-=1

    temp=num
    print('--get num:',num )
    #time.sleep(0.1)
    num =temp-1 #对此公共变量进行-1操作


num = 100  #设定一个共享变量
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
    t.join()

print('final num:', num )

注意:

  • num-=1时输出结果为0,因为动作太快(完成这个动作在切换的时间内)
  • 如果sleep(1),100个线程每一个一定都没有执行完就进行了切换,sleep等效于IO阻塞,1s之内不会再切换回来,所以最后的结果一定是99。

应用同步锁

同步锁是把计算的部分(涉及到操作公共数据)改成串行的,锁的是线程,其他的逻辑正常

join是指把所有的改成串行的,会把整个线程给停住,延长了运行时间,失去了多线程的意义

import time
import threading

def addNum():
    global num #在每个线程中都获取这个全局变量
    # num-=1
    lock.acquire()  
    temp=num
    print('--get num:',num )
    #time.sleep(0.1)
    num =temp-1 #对此公共变量进行-1操作
    lock.release() 

num = 100  #设定一个共享变量
thread_list = []
lock=threading.Lock() 

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
    t.join()

print('final num:', num )

同步锁与GIL的关系

  • GIL是保证在同一时刻只有一个线程进入解释器里面
  • 同步锁是在CPU运行中进行切换时,锁住一部分使CPU在运行这部分时不切换,防止冲突

线程死锁和递归锁

死锁

在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。

import threading,time

class myThread(threading.Thread):
    def doA(self):
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        time.sleep(3)
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        lockB.release()
        lockA.release()

    def doB(self):
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        time.sleep(2)
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        lockA.release()
        lockB.release()
    def run(self):
        self.doA()
        self.doB()
if __name__=="__main__":

    lockA=threading.Lock()
    lockB=threading.Lock()
    threads=[]
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

递归锁

解决死锁,使用“可重入锁”——递归锁

lockA=threading.Lock()
lockB=threading.Lock()<br>  # --------------<br>lock=threading.RLock()

应用

import time

import threading

class Account:
    def __init__(self, _id, balance):
        self.id = _id
        self.balance = balance
        self.lock = threading.RLock()  # 通过RLock代替lockA、lockB解决死锁

    def withdraw(self, amount):

        with self.lock:
            self.balance -= amount

    def deposit(self, amount):
        with self.lock:
            self.balance += amount


    def drawcash(self, amount):  # lock.acquire中嵌套lock.acquire的场景

        with self.lock:
            interest=0.05
            count=amount+amount*interest

            self.withdraw(count)


def transfer(_from, to, amount):

    # 锁不可以加在这里 因为其他的其它线程执行的其它方法在不加锁的情况下数据同样是不安全的
     _from.withdraw(amount)

     to.deposit(amount)


zhangsan = Account('zhangsan',1000)
lisi = Account('lisi',1000)

t1=threading.Thread(target = transfer, args = (zhangsan,lisi, 100))
t1.start()

t2=threading.Thread(target = transfer, args = (lisi,zhangsan, 200))
t2.start()

t1.join()
t2.join()

print('>>>',zhangsan.balance)
print('>>>',lisi.balance)

信号量 Semaphore

信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数器,每当调用acquire()时-1,调用release()时+1。

信号量与递归锁注意区分:递归锁是一层一层往里面加,信号量是并行的

信号量的应用可以在连接数据库时进行一定数量的限制,限制同时连接数据库的数量

import threading,time
class myThread(threading.Thread):
    def run(self):
        if semaphore.acquire():
            print(self.name)
            time.sleep(3)
            semaphore.release()
if __name__=="__main__":
    semaphore=threading.Semaphore(5) 
    thrs=[]
    for i in range(23):  # 最后出3个
        thrs.append(myThread())
    for t in thrs:
        t.start()

条件变量 condition

锁+线程间的通信:提供RLock()或Lock()的方法,还提供了 wait()、notify()、notifyAll()方法

  • wait():条件不满足时调用,线程会释放锁并进入等待阻塞;
  • notify():条件创造后调用,通知等待池激活一个线程;
  • notifyAll():条件创造后调用,通知等待池激活所有线程。

lock_con=threading.Condition([Lock/Rlock]): 锁是可选选项,不传入锁,对象自动创建一个RLock()。

import threading,time
from random import randint
class Producer(threading.Thread):
    def run(self):
        global L
        while True:
            val=randint(0,100)
            print('生产者',self.name,":Append"+str(val),L)
            if lock_con.acquire():
                L.append(val)
                lock_con.notify()  # 激活Consumer中的wait
                lock_con.release()
            time.sleep(3)
class Consumer(threading.Thread):
    def run(self):
        global L
        while True:
                lock_con.acquire()
                if len(L)==0:
                    lock_con.wait()
                print('消费者',self.name,":Delete"+str(L[0]),L)
                del L[0]
                lock_con.release()
                time.sleep(0.25)

if __name__=="__main__":

    L=[]
    lock_con=threading.Condition()
    threads=[]
    for i in range(5):
        threads.append(Producer())
    threads.append(Consumer())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

同步条件 Event

条件同步event和条件变量同步condition相比少了锁功能。

  • event=threading.Event():内部有一个标志位(条件环境对象,初始值为False;)
  • event.isSet():返回当前的标志位是True还是False(返回event的状态值;)
  • event.wait():判断标志位(如果 event.isSet()==False将阻塞线程,如果 event.isSet()==True将继续向下执行;)
  • event.set(): 将标志位改成True(设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;)
  • event.clear():将标志位改成False(恢复event的状态值为False。)

 例1:加班

import threading,time
class Boss(threading.Thread):
    def run(self):
        print("BOSS:今晚大家都要加班到22:00。")
        event.set()
        time.sleep(5)
        print("BOSS:<22:00>可以下班了。")
        event.set()
class Worker(threading.Thread):
    def run(self):
        event.wait()
        print("Worker:哎……命苦啊!")
        time.sleep(1)
        event.clear()
        event.wait()
        print("Worker:OhYeah!")
if __name__=="__main__":
    event=threading.Event()
    threads=[]
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

例2:红绿灯 

import threading,time
import random
def light():
    if not event.isSet():
        event.set() #wait就不阻塞 #绿灯状态
    count = 0
    while True:
        if count < 10:
            print('33[42;1m--green light on---33[0m')
        elif count <13:
            print('33[43;1m--yellow light on---33[0m')
        elif count <20:
            if event.isSet():
                event.clear()
            print('33[41;1m--red light on---33[0m')
        else:
            count = 0
            event.set() #打开绿灯
        time.sleep(1)
        count +=1
def car(n):
    while 1:
        time.sleep(random.randrange(10))
        if  event.isSet(): #绿灯
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car,args=(i,))
        t.start()

队列 queue(重要)

多线程利器(队列):与字典、列表类似,用于存储数据

import queue

d = queue.Queue(3) # 3表示可以在里面插入3个数据,如果是2,程序会结束,被阻塞住了 # 如果数字为空,默认值为0,无限大

d.put('zhangsan')
d.put('lisi')
d.put('wangwu')

# FIFO(first in first out) 默认为先进先出,可以修改
print(d.get())  # zhangsan
print(d.get())  # lisi
print(d.get())  # wangwu

# 在d = queue.Queue(2),d.put('xxx')有3个的时候,多一个,d.put('xxx', 0) 报错:queue Full;d.put('xxx')程序结束,表示被阻塞
# 在d = queue.Queue(2),print(d.get())有3个的时候,多一个,print(d.get(0)) 报错:queue empty;print(d.get())程序结束,表示被阻塞

例子

import threading,queue
from time import sleep
from random import randint
class Production(threading.Thread):
    def run(self):
        while True:
            r=randint(0,100)
            q.put(r)  # 加锁的目的是生产者避免重复操作
            print("生产出来%s号包子"%r)
            sleep(1)
            
class Proces(threading.Thread):
    def run(self):
        while True:
            re=q.get()  # 加锁的目的是消费者避免重复操作
            print("吃掉%s号包子"%re)
            
if __name__=="__main__":
    q=queue.Queue(10)
    threads=[Production(),Production(),Production(),Proces()]  # 创建4个线程对象
    for t in threads:
        t.start()

注意:列表如果是线程——不安全

import threading,time

li=[1,2,3,4,5]

def pri():
    while li:
        a=li[-1]
        print(a)
        time.sleep(1)
        try:
            li.remove(a)
        except:
            print('----',a)

t1=threading.Thread(target=pri,args=())
t1.start()
t2=threading.Thread(target=pri,args=())
t2.start()

# 两个线程同时取,有可能拿到同一个数,线程不安全
原文地址:https://www.cnblogs.com/chungzhao/p/13064681.html