Python实战笔记(三) 多线程

Python 提供 threading 模块用于控制线程,使我们处理多线程更加方便

1、线程模块的常用属性和方法

  • active_count():返回当前存活的线程对象的数量
  • enumerate():返回当前存活的线程对象的列表
  • current_thread():返回当前线程对象
  • main_thread():返回主线程对象
  • get_ident():返回当前线程的线程标识符
  • stack_size([size]):返回创建线程时使用的堆栈大小
  • TIMEOUT_MAX:指定阻塞函数(如 acquirewaitwait_for 等)timeout 参数的最大值
import threading

thread_number = threading.active_count()
print(thread_number) # 1
curr_thread = threading.main_thread()
main_thread = threading.current_thread()
print(curr_thread == main_thread) # True

2、创建线程对象

threading.Thread(group, target, name, args, kwargs, *, daemon)
  • group:为拓展 ThreadGroup 而保留,无需设置,默认为 None
  • target:调用对象,默认为 None,表示不需要调用任何方法
  • name:线程名称,默认以 Thread-N 的格式命名
  • args:传递给调用对象的参数(位置参数),默认为 ()
  • kwargs:传递给调用对象的参数(关键字参数),默认为 {}
  • daemon:是否设置为守护线程,默认为 None,表示继承调用者的属性

非守护进程和守护进程的区别:

  • 非守护进程:当程序退出时,如果还有非守护线程在运行,程序会等待所有非守护线程结束后才会真正退出
  • 守护线程:当程序退出时,如果还有守护线程运行,程序将会强制结束所有守护线程,导致资源不能正确释放

3、线程对象的常用属性和方法

  • name:线程名称
  • ident:线程标识符
  • daemon:是否为守护线程
  • is_alive():线程是否存活
import threading

main_thread = threading.current_thread()
print(main_thread.name) # MainThread
print(main_thread.ident)
print(main_thread.daemon) # False
print(main_thread.is_alive()) # True
  • start():创建一个新的线程并由新的线程调用方法,不同线程同时执行
import threading
import time

def sleep(wait):
    name = threading.current_thread().name
    print(name, 'Start')
    time.sleep(wait)
    print(name, 'Terminated')

def main():
    name = threading.current_thread().name
    print(name, 'Start')
    worker = threading.Thread(target = sleep, args = (2,))
    worker.start() # 启动工作线程 Thread-1
    print(name, 'Terminated')

if __name__ == '__main__':
    main() # 启动主线程 MainThread

# 执行结果
# MainThread Start
# Thread-1 Start
# MainThread Terminated
# 沉睡 2 秒
# Thread-1 Terminated
  • join():阻塞调用者线程,直至被调用线程结束
import threading
import time

def sleep(wait):
    name = threading.current_thread().name
    print(name, 'Start')
    time.sleep(wait)
    print(name, 'Terminated')

def main():
    name = threading.current_thread().name
    print(name, 'Start')
    worker = threading.Thread(target = sleep, args = (2,))
    worker.start()
    worker.join() # 阻塞线程 MainThread,直至线程 Thread-1 结束
    print(name, 'Terminated')

if __name__ == '__main__':
    main()

# 执行结果
# MainThread Start
# Thread-1 Start
# 沉睡 2 秒
# Thread-1 Terminated
# MainThread Terminated
  • run():不会创建线程,相当于直接调用方法
import threading
import time

def sleep(wait):
    name = threading.current_thread().name
    print(name, 'Start')
    time.sleep(wait)
    print(name, 'Terminated')

def main():
    name = threading.current_thread().name
    print(name, 'Start')
    worker = threading.Thread(target = sleep, args = (2,))
    worker.run() # 相当于直接由主线程 MainThread 调用方法 sleep
    print(name, 'Terminated')

if __name__ == '__main__':
    main()

# 执行结果
# MainThread Start
# MainThread Start
# 沉睡 2 秒
# MainThread Terminated
# MainThread Terminated
  • setDaemon():是否设置为守护线程
import threading
import time

def sleep(wait):
    name = threading.current_thread().name
    print(name, 'Start')
    time.sleep(wait)
    print(name, 'Terminated')

def main():
    name = threading.current_thread().name
    print(name, 'Start')
    worker = threading.Thread(target = sleep, args = (2,))
    worker.setDaemon(True) # 设置工作线程 Thread-1 为守护进程
    worker.start()
    print(name, 'Terminated') # 当程序结束后,会强制终止守护线程

if __name__ == '__main__':
    main()

# 执行结果
# MainThread Start
# Thread-1 Start
# MainThread Terminated

4、线程安全

由于不同线程之间是并行的,如果多个线程同时修改一个数据,那么结果将会是不可预料的

import threading
import time

num = 0

def add(val):
    global num
    time.sleep(1)
    num += val
    print(num)

def main():
    for index in range(1, 9):
        worker = threading.Thread(target = add, args = (index,))
        worker.start()

if __name__ == '__main__':
    main()

程序每次运行的结果都是未知的,所以我们需要采取一些机制,使得线程能够按照期望的方式工作

(1)锁对象:threading.Lockthreading.RLock

一个 threading.Lock 对象只有两种状态,分别是锁定(locked)和非锁定(unlocked)

任意一个线程可以使用 acquire() 方法将锁对象设置为锁定状态(也称为获得锁)

如果此时还有其它线程调用 acquire() 方法,该线程将会被阻塞

直至其它任意线程使用 release() 方法将锁对象设置为非锁定状态(也称为释放锁)

如果在调用 release() 方法时,锁对象处于非锁定状态,则会抛出异常

锁对象状态 调用的方法 结果
unlocked acquire 将锁对象设置为锁定状态
locked acquire 阻塞当前线程
locked release 将锁对象设置为非锁定状态
unlocked release 抛出异常

threading.Lock 有两个常用的方法,分别是 acquire()release()

  • acquire(blocking = True, timeout = -1):获得锁
    • blocking:是否阻塞线程,默认为 True,表示没有获得锁时,将会阻塞当前线程
    • timeout:最长阻塞时间,默认为 -1,表示一直阻塞下去,直至锁被释放
  • release():释放锁
import threading
import time

num = 0
lock = threading.Lock() # 声明锁对象

def add(val):
    lock.acquire() # 修改数据前,将锁对象设置为锁定状态
    global num
    time.sleep(1)
    num += val
    print(num)
    lock.release() # 修改数据后,将锁对象设置为非锁定状态

def main():
    for index in range(1, 8):
        worker = threading.Thread(target = add, args = (index,))
        worker.start()

if __name__ == '__main__':
    main()

threading.RLockthreading.Lock 的功能大致一样,threading.RLock 的特别之处在于:

  • 在同一线程内,多次调用 acquire() 方法,不会阻塞线程
  • 使用多少次 acquire() 方法获得锁,就必须使用多少次 release() 方法才能释放锁
  • 某一线程通过 acquire() 方法获得锁,只能在该线程内通过 release() 方法释放锁

(2)信号量对象:threading.Semaphore

一个 threading.Semaphore 对象在内部维护一个计数器,规定计数器的值不能小于 0

任意一个线程可以使用 acquire() 方法,使得计数器减 1

如果此时计数器已经为 0,那么将会阻塞当前线程,直至计数器大于 0

任意一个线程可以使用 release() 方法,使得计数器加 1

计数器 调用的方法 结果
大于 0 acquire 使计数器减 1
等于 0 acquire 阻塞当前线程
大于等于 0 release 使计数器加 1

threading.Semaphore 有两个常用的方法,分别是 acquire()release()

  • acquire(blocking = True, timeout = -1):使计数器减 1
    • blocking:是否阻塞线程,默认为 True,表示计数器为 0 时,将会阻塞当前线程
    • timeout:最长阻塞时间,默认为 -1,表示一直阻塞下去,直至计数器大于 0
  • release():使计数器加 1
import threading
import time

num = 0
semaphore = threading.Semaphore(1) # 声明信号量,可以指定计数器初始值,默认为 1

def add(val):
    semaphore.acquire() # 使计数器减 1
    global num
    time.sleep(1)
    num += val
    print(num)
    semaphore.release() # 使计数器加 1

def main():
    for index in range(1, 8):
        worker = threading.Thread(target = add, args = (index,))
        worker.start()

if __name__ == '__main__':
    main()

使用信号量还可以使多个线程同时修改一个数据

import threading
import time

semaphore = threading.Semaphore(3)

def run():
    semaphore.acquire()
    time.sleep(1)
    print(threading.current_thread().name, 'Running')
    semaphore.release()
    
def main():
    for _ in range(7):
        worker = threading.Thread(target = run)
        worker.start()

if __name__ == '__main__':
    main()

(3)条件对象:threading.Condition

条件对象在锁对象的基础上封装而成,threading.Condition 常用的方法如下:

  • acquire():获得锁,调用底层(LockRLock)所对应的函数

  • release():释放锁,调用底层(LockRLock)所对应的函数

  • wait(timeout = None)

    在调用该方法后,调用 release() 释放锁,然后阻塞当前线程,等待其它线程调用 notify() 唤醒

    然后在被唤醒后,调用 acquire() 尝试获得锁

    如果有设置 timeout,即使没有其它线程调用 notify() 唤醒当前线程,也会在超时之后自动被唤醒

  • wait_for(predicate, timeout = None)

    在调用该方法后,首先调用 predicate,若返回 True,则继续执行

    若返回 False,调用 release() 释放锁,然后阻塞当前线程,等待其它线程调用 notify() 唤醒

    然后在被唤醒后,也会调用 predicate,若返回 False,将会一直阻塞下去

    若返回 True,调用 acquire() 尝试获得锁

    如果有设置 timeout,即使没有其它线程调用 notify() 唤醒当前线程,也会在超时之后自动被唤醒

  • notify(n = 1):唤醒 n 个线程

  • notify_all():唤醒所有线程

import threading
import time

data = 1
condition = threading.Condition()

def isEven():
    global data
    return data % 2 == 0

def wait():
    condition.acquire() # 要先获得锁,才能释放锁
    print('wait_thread 进入等待')
    condition.wait_for(isEven) # 释放锁,阻塞当前线程,等待唤醒后重新获得锁,继续执行
    print('wait_thread 继续执行')
    condition.release() # 重新获得锁后,记得要释放锁

def wake():
    global data
    condition.acquire() # 要先获得锁,再修改数据
    data = 2
    print('唤醒 wait_thread')
    condition.notify()
    condition.release() # 获得锁后,要释放锁

def main():
    wait_thread = threading.Thread(target = wait)
    wake_thread = threading.Thread(target = wake)
    wait_thread.start()
    time.sleep(1)
    wake_thread.start()

if __name__ == '__main__':
    main()

# 执行结果
# wait_thread 进入等待
# 唤醒 wait_thread
# wait_thread 继续执行

(4)事件对象:threading.Event

一个 threading.Event 对象内部维护一个标记,初始时默认为 False,threading.Event 常用的方法如下:

  • set():将标记设置为 True
  • clear():将标记设置为 False
  • wait():阻塞当前线程,直到标记变为 True
  • is_set():标记是否为 True
import threading
import time

event = threading.Event()

def wait():
    print(threading.current_thread().name, '进入等待')
    event.wait()
    print(threading.current_thread().name, '继续执行')

def wake():
    print('唤醒所有线程')
    event.set()

def main():
    for _ in range(5):
        wait_thread = threading.Thread(target = wait)
        wait_thread.start()
    time.sleep(1)
    wake_thread = threading.Thread(target = wake)
    wake_thread.start()

if __name__ == '__main__':
    main()

# 执行结果
# Thread-1 进入等待
# Thread-2 进入等待
# Thread-3 进入等待
# Thread-4 进入等待
# Thread-5 进入等待
# 唤醒所有线程
# Thread-1 继续执行
# Thread-2 继续执行
# Thread-5 继续执行
# Thread-4 继续执行
# Thread-3 继续执行

【 阅读更多 Python 系列文章,请看 Python学习笔记

原文地址:https://www.cnblogs.com/wsmrzx/p/12275101.html