python多线程之Threading

什么是线程?

线程是操作系统内核调度的基本单位,一个进程中包含一个或多个线程,同一个进程内的多个线程资源共享,线程相比进程是“轻”量级的任务,内核进行调度时效率更高。

多线程有什么优势?

  • 多线程可以实现多任务并发执行,简化代码的编写难度,每一个独立的模块都可以设计成一个独立的线程运行

  • 线程间通信比进程间通信难度更小,效率更高,因为资源共享

  • 线程的调度比进程的调度效率高

  • Python 语言内置了多线程功能支持,而不是单纯地作为底层操作系统的调度方式,从而简化了 Python 的多线程编程

Threading库

多线程的启动方式(函数式和类对象式)

import threading
import time

def runOne(info):
    while True:
        print(info)
        time.sleep(1)
    pass

def runTwo(info):
    while True:
        print(info)
        time.sleep(1)
    pass


if __name__ == '__main__':
    t1 = threading.Thread(target = runOne, args = ("task one run",))
    t2 = threading.Thread(target = runTwo, args = ("task two run",))
    t1.start() # 启动t1
    t2.start() # 启动t2
    t1.join()  # 主线程等待t1子线程结束(阻塞)
    t2.join()  # 主线程等待t2子线程结束(阻塞)
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, info):
        super(MyThread, self).__init__()
        self.info = info

    def run(self):
        while True:
            print(self.info)
            time.sleep(1)
        pass

if __name__ == "__main__":
    t1 = MyThread("taskone")
    t2 = MyThread("tasktwo")
    t1.start()  # 启动t1
    t2.start()  # 启动t2
    t1.join()   # 主线程等待t1子线程结束(阻塞)
    t2.join()   # 主线程等待t2子线程结束(阻塞)

守护线程

如果将任务1设置为任务2的守护线程,当任务2结束时,任务1也自动结束。上述例子中如果将子线程设置主线程的守护线程,那么当主线程结束时,子线程也自动结束。

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, info):
        super(MyThread, self).__init__()
        self.info = info

    def run(self):
        print(self.info)
        time.sleep(1)
        print(self.info)
        time.sleep(1)
        pass

if __name__ == "__main__":
    t1 = MyThread("taskone")
    t1.setDaemon(True)  # t1设置为主线程的守护线程
    t1.start()          # 启动t1

主线程等待子线程结束

为了实现子线程结束后,主线程再结束的目的,可以使用join方法,让主线程等待子线程执行。

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, info):
        super(MyThread, self).__init__()
        self.info = info

    def run(self):
        print(self.info)
        time.sleep(1)
        print(self.info)
        time.sleep(1)
        print("Sub tash end")
        pass

if __name__ == "__main__":
    t1 = MyThread("taskone")
    t1.start()  # 启动t1
    t1.join()   # 主线程等待子线程结束
    print("main task end")

多线程之间的通信

互斥锁

由于同一个进程下多个任务可以共享数据,因此都可以访问同一个全局变量,速度很快,但是也有问题。因为线程的调度是内核实现,线程自己不知道自己什么时候被切换,有可能是访问全局变量操作了一半(访问全局变量,看着是一句话,实际上是多条操作)然后被切换了,当下次得到调度时,此时全局变量有可能已经被其他线程修改了,导致再次访问时获取的数据不对,从而引发异常。解决的办法是在访问全局变量的时候,将全局变量锁住,让其它线程访问不了。互斥锁就是用来实现这个功能,加了互斥锁的地方,同一时间永远只有一个线程可以访问这个全局变量,直到该线程访问完毕后,其他任务才能访问。

import threading
import time

num = 0
mutex = threading.Lock()

class MyThread(threading.Thread):
    def run(self):
        global num
        time.sleep(1)

        if mutex.acquire():
            num = num + 1
            msg = self.name + ' set num to ' + str(num)
            print(msg)
            mutex.release()

def test():
    for i in range(5):
        t = MyThread()
        t.start()

如果加了互斥锁,打印就是1 2 3 4 5;如果没加锁,打印就是乱序的。

消息队列

消息队列是一个线程将消息发送给另一个线程的方式,可以把消息列表理解成一个管道,一个线程在管道的一端放东西,一个线程在管道的另一端取东西,东西在管道内一个接一个的流动。通过发消息的方式就可以避免集中访问全局变量的问题,安全性更高。

import threading
import time
import queue

q = queue.Queue()

def runOne():
    while True:
        msg = q.get()
        print(msg)
    pass

def runTwo():
    while True:
        q.put("message")
        time.sleep(1)
    pass


if __name__ == '__main__':
    t1 = threading.Thread(target = runOne)
    t2 = threading.Thread(target = runTwo)
    t1.start() # 启动t1
    t2.start() # 启动t2
    t1.join()  # 主线程等待t1子线程结束(阻塞)
    t2.join()  # 主线程等待t2子线程结束(阻塞)

线程怎么结束?

由于threading模块没有提供停止线程的方法,也就是说线程start之后,就处于失控的状态,只能被动的等待它自己结束,这显然是问题。对于线程的结束,本文提供两种方法:第一种发下消息,让线程自己主动退出;第二种调动自定义接口直接结束线程。

import threading
import time
import queue

q = queue.Queue()

def runOne():
    while True:
        msg = q.get()

        if msg == 'exit':
            break

        print(">> %s" % msg)
    print("task one stop")

def runTwo():
    while True:
        info = input()
        q.put(info)

        if info == 'exit':
            break
    print("task two stop")

if __name__ == '__main__':
    t1 = threading.Thread(target = runOne)
    t2 = threading.Thread(target = runTwo)
    t1.start() # 启动t1
    t2.start() # 启动t2
    t1.join()  # 主线程等待t1子线程结束(阻塞)
    t2.join()  # 主线程等待t2子线程结束(阻塞)
import inspect
import ctypes
import threading
import time

def _async_raise(tid, exctype):
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

# 结束线程的函数
def stop_thread(thread):
    _async_raise(thread.ident, SystemExit)

def runOne(info):
    while True:
        print(info)
        time.sleep(1)
    pass

def runTwo(info):
    while True:
        print(info)
        time.sleep(1)
    pass


if __name__ == '__main__':
    t1 = threading.Thread(target = runOne, args = ("task one run",))
    t2 = threading.Thread(target = runTwo, args = ("task two run",))
    t1.start()
    t2.start()

    time.sleep(5)
    stop_thread(t1) # 停止t1任务

    print('------------------------')

    time.sleep(5)
    stop_thread(t2)  # 停止t1任务

    t1.join()
    t2.join()

GIL(Global Interpreter Lock)全局解释器锁

在非python环境中,单核情况下同时只能有一个任务执行;多核情况下可以支持多个线程同时执行。但是在python中,无论有多少核,同时只能执行一个线程。究其原因,是由于GIL的存在导致的。GIL的全称是Global Interpreter Lock(全局解释器锁),来源是python设计之初的考虑,为了数据安全而设计。某个线程想要执行,必须先拿到GIL,我们可以把GIL看作是“通行证”,并且在一个python进程中,GIL只有一个。拿不到通行证的线程,就不允许进入CPU执行。GIL只在cpython中才有,因为cpython调用的是c语言的原生线程,所以他不能直接操作cpu,只能利用GIL保证同一时间只能有一个线程拿到数据。使用建议:针对多核CPU场景,多进程的执行效率优于多线程,优先使用多进程。

原文地址:https://www.cnblogs.com/chusiyong/p/12898739.html