Python Multithreading

The threading module provides thread-related operations. A thread is the smallest unit of execution within a program.

Classes provided by the threading module:

  Thread, Lock, RLock, Condition, [Bounded]Semaphore, Event, Timer, local.
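Timer and local appear in the list above but are not demonstrated later in this article. The following is a minimal sketch of both. The function names greet and worker are illustrative only. Timer is a Thread subclass that calls a function after a delay. A local object gives every thread its own independent set of attributes.

```python
import threading

results = []

def greet(who):
    results.append("hello " + who)

# Timer: a Thread subclass that calls greet("timer") after 0.1 seconds
t = threading.Timer(0.1, greet, args=("timer",))
t.start()
t.join()                      # wait for the timer thread to fire and finish
print(results)                # ['hello timer']

# local: attributes set on a local object are visible only in the thread that set them
data = threading.local()
data.value = "main"
seen = []

def worker():
    data.value = "worker"     # does not affect the main thread's data.value
    seen.append(data.value)

w = threading.Thread(target=worker)
w.start()
w.join()
print(seen, data.value)       # ['worker'] main
```

A pending Timer can also be stopped with t.cancel() if the call is still waiting to fire.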

The Thread class

Thread is the thread class. You can use it in two ways: pass the function to run as target, or subclass Thread and override run():

# -*- coding: utf-8 -*-
# @Author  : Jack.Ming
import threading
import time

def run(arg):
    time.sleep(2)
    print("Thread-%s" % arg, )

if __name__ == '__main__':
    for i in range(10):
        # Pass the function to run as the target argument of Thread
        thread = threading.Thread(target=run, args=(i,))
        thread.start()
Method 1: create a thread by passing a target function
Thread-3
Thread-2
Thread-1
Thread-0
Thread-4
Thread-7
Thread-6
Thread-5
Thread-9
Thread-8

Process finished with exit code 0
Output
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Jack.Ming
import threading
import time

# Subclass Thread and override run()
class myThread(threading.Thread):
    def __init__(self, num):
        # Note: the parent class initializer must be called explicitly
        threading.Thread.__init__(self)
        self.num = num

    # Define the code that each thread runs
    def run(self):
        time.sleep(1)
        print("Thread-%s" % self.num)

if __name__ == '__main__':
    for i in range(10):
        thread = myThread(i)
        thread.start()
Method 2: define a custom thread class that subclasses Thread and overrides run()
Thread-4
Thread-2
Thread-1
Thread-3
Thread-0
Thread-6
Thread-5
Thread-8
Thread-9
Thread-7

Process finished with exit code 0
Output

Constructor of the Thread class:

Thread(group=None, target=None, name=None,args=(), kwargs=None, *, daemon=None)

  class Thread:
    def __init__(self, group=None, target=None, name=None,args=(), kwargs=None, *, daemon=None):


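  • group: thread group. It is not implemented yet, and the library reference says it must be None.
  • target: the callable that run() invokes. The default, None, means nothing is called.
  • name: the thread name. By default a unique name is generated.
  • args/kwargs: arguments passed to target. args is a tuple of positional arguments, and kwargs is a dict of keyword arguments.
  • daemon: if not None, explicitly sets whether the thread is a daemon thread. If None (the default), the daemon property is inherited from the current thread.
  If a subclass overrides the constructor, it must call the base class constructor (Thread.__init__()) before doing anything else to the thread.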
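The following minimal sketch passes every constructor parameter except group. The function job and the name "worker-1" are illustrative only. The sketch shows where args and kwargs end up, and how a thread can read its own name.

```python
import threading

received = {}

def job(a, b, sep="-"):
    received["value"] = sep.join([a, b])
    received["name"] = threading.current_thread().name  # the name given to the constructor

t = threading.Thread(
    target=job,              # callable invoked by run()
    name="worker-1",         # custom thread name
    args=("x", "y"),         # positional arguments for target (a tuple)
    kwargs={"sep": "+"},     # keyword arguments for target (a dict)
    daemon=False,            # explicitly a non-daemon thread
)
t.start()
t.join()
print(received)  # {'value': 'x+y', 'name': 'worker-1'}
```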

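Thread instance methods:

  • is_alive(): returns whether the thread is alive [bool]. A thread is alive from the moment its run() method starts until run() terminates.
  • getName(): returns the thread name.
  • setName(str): sets the thread name.
  • start(): starts the thread. The thread becomes ready and waits to be scheduled by the CPU.
  • isDaemon(): returns whether the thread is a daemon (background) thread [bool].
  • setDaemon(bool): sets whether the thread is a daemon (background) thread [bool]. It must be called before start().
     Daemon thread [True]: runs alongside the main thread. When the main thread finishes, the program exits without waiting for it, and the daemon thread is stopped whether or not it has finished.
     Non-daemon thread [False, the default for threads created by the main thread]: runs alongside the main thread. When the main thread finishes, the program waits for this thread to complete before exiting.
  • join([timeout]): blocks the calling thread until the thread whose join() was called terminates, or until the optional timeout (in seconds) expires. If you call join() right after start() for each thread in turn, the threads run one after another, which defeats the purpose of multithreading. Start all threads first and join them afterwards.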

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Created by YangYongming at 2018/11/24 14:07
# FileName: 5.py

import threading

def run(arg):
    print(arg)

# Pass the function itself as target; target=run(1) would call run immediately and pass None
thread = threading.Thread(name="yangym", target=run, args=(1,))
print(thread.getName())  # prints yangym
thread.setDaemon(False)
thread.setName("YANGYM")
print(thread.getName())  # prints YANGYM
thread.start()           # the thread prints 1
thread.join()            # wait for the thread to finish
print(thread.is_alive())  # prints False
print(thread.isDaemon())  # prints False
Example
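Since Python 3.10, getName(), setName(), isDaemon() and setDaemon() are deprecated in favor of the name and daemon attributes. They still work but emit a DeprecationWarning. Here is a sketch of the same demonstration written with the attributes:

```python
import threading

def run(arg):
    print(arg)

thread = threading.Thread(name="yangym", target=run, args=(1,))
print(thread.name)         # yangym
thread.daemon = False      # must be set before start()
thread.name = "YANGYM"
print(thread.name)         # YANGYM
thread.start()             # the thread prints 1
thread.join()
print(thread.is_alive())   # False
print(thread.daemon)       # False
```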

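Using setDaemon

With setDaemon(True), the program does not wait for that thread when the main thread finishes. With setDaemon(False) (the default), the program waits for all non-daemon threads to finish before it exits.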

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Jack.Ming
import threading
import time

# Subclass Thread and override run()
class myThread(threading.Thread):
    def __init__(self, num):
        # Note: the parent class initializer must be called explicitly
        threading.Thread.__init__(self)
        self.num = num

    # Define the code that each thread runs
    def run(self):
        time.sleep(1)
        print("Thread-%s" % self.num)

if __name__ == '__main__':
    for i in range(10):
        thread = myThread(i)
        thread.setDaemon(True)
        thread.start()
setDaemon(True)
Process finished with exit code 0
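Output: the program exits as soon as the main thread finishes, and the daemon threads are stopped before they print anything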
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Jack.Ming
import threading
import time

# Subclass Thread and override run()
class myThread(threading.Thread):
    def __init__(self, num):
        # Note: the parent class initializer must be called explicitly
        threading.Thread.__init__(self)
        self.num = num

    # Define the code that each thread runs
    def run(self):
        time.sleep(1)
        print("Thread-%s" % self.num)

if __name__ == '__main__':
    for i in range(10):
        thread = myThread(i)
        thread.setDaemon(False)  # the default
        thread.start()
setDaemon(False)
Thread-2
Thread-3
Thread-1
Thread-0
Thread-9
Thread-6
Thread-4
Thread-7
Thread-8
Thread-5

Process finished with exit code 0
Output: after the main thread finishes, the program waits for all other threads to finish before exiting

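Using join

join() blocks the calling thread until the thread whose join() was called terminates, or until the optional timeout expires. If you call join() immediately after each start(), the threads run one at a time, which defeats the purpose of multithreading.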

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Jack.Ming
import threading
import time

def run(arg):
    time.sleep(2)
    print("Thread-%s Time is: %s" % (arg,time.ctime()) )


if __name__ == '__main__':
    for i in range(10):
        # Pass the function to run as the target argument of Thread
        thread = threading.Thread(target=run, args=(i,))
        thread.start()
        thread.join()
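With join: join() blocks the main thread until the current thread finishes or times out. Because of this, the next thread is created only after the previous one has finished.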
Thread-0 Time is: Sat Mar 10 13:33:24 2018
Thread-1 Time is: Sat Mar 10 13:33:26 2018
Thread-2 Time is: Sat Mar 10 13:33:28 2018
Thread-3 Time is: Sat Mar 10 13:33:30 2018
Thread-4 Time is: Sat Mar 10 13:33:32 2018
Thread-5 Time is: Sat Mar 10 13:33:34 2018
Thread-6 Time is: Sat Mar 10 13:33:36 2018
Thread-7 Time is: Sat Mar 10 13:33:38 2018
Thread-8 Time is: Sat Mar 10 13:33:40 2018
Thread-9 Time is: Sat Mar 10 13:33:42 2018

Process finished with exit code 0
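Output: each thread starts only after the previous one has finished (2 seconds each), so the threads effectively run sequentially, which defeats the purpose of multithreading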
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Jack.Ming
import threading
import time

def run(arg):
    time.sleep(2)
    print("Thread-%s Time is: %s" % (arg,time.ctime()) )

if __name__ == '__main__':
    for i in range(10):
        # Pass the function to run as the target argument of Thread
        thread = threading.Thread(target=run, args=(i,))
        thread.start()
Without join: the threads run concurrently and do not affect each other
Thread-4 Time is: Sat Mar 10 13:34:22 2018
Thread-2 Time is: Sat Mar 10 13:34:22 2018
Thread-1 Time is: Sat Mar 10 13:34:22 2018
Thread-0 Time is: Sat Mar 10 13:34:22 2018
Thread-3 Time is: Sat Mar 10 13:34:22 2018
Thread-6 Time is: Sat Mar 10 13:34:22 2018
Thread-5 Time is: Sat Mar 10 13:34:22 2018
Thread-9 Time is: Sat Mar 10 13:34:22 2018
Thread-8 Time is: Sat Mar 10 13:34:22 2018
Thread-7 Time is: Sat Mar 10 13:34:22 2018

Process finished with exit code 0
Output
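To keep the concurrency and still wait for every thread, the usual pattern is to start all threads first and only then join them. The following is a minimal sketch that uses a shorter 0.2 second sleep as a stand-in for real I/O-bound work. The finished list is illustrative only.

```python
import threading
import time

finished = []                      # names of threads that have completed

def run(arg):
    time.sleep(0.2)                # simulated I/O-bound work
    finished.append("Thread-%s" % arg)

threads = []
start = time.time()
for i in range(10):
    t = threading.Thread(target=run, args=(i,))
    t.start()                      # first start every thread...
    threads.append(t)
for t in threads:
    t.join()                       # ...then wait for each one to finish
elapsed = time.time() - start
print("%d threads finished in %.1f s" % (len(finished), elapsed))
```

All ten sleeps overlap, so the whole run takes roughly 0.2 seconds instead of 10 × 0.2 seconds, and the main thread still continues only after every thread is done.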

Thread locks (Lock, RLock)

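Threads are scheduled unpredictably, and a thread may be switched out after executing only a few instructions. When several threads modify the same data at the same time, the data can become inconsistent (dirty data). Thread locks prevent this by allowing only one thread at a time to perform the protected operation.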

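Lock is a plain mutex that is not owned by any thread. A thread that tries to acquire a Lock it already holds will deadlock. RLock (reentrant lock) is owned by the thread that acquired it. That thread may acquire it again, and must release it the same number of times. Use RLock when the same thread may re-enter a locked section (for example, through recursion or nested calls). Otherwise, Lock is sufficient.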

One lock, one key: when one person has finished with the key, it is passed to the next.
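The following is a minimal sketch of the reentrancy difference described above. The functions outer and inner are illustrative only. The same thread acquires the RLock twice through nested calls. For the plain Lock, only a non-blocking second attempt is made, because a blocking one would hang forever.

```python
import threading

rlock = threading.RLock()

def inner():
    with rlock:                  # same thread acquires the RLock a second time: allowed
        return "re-entered"

def outer():
    with rlock:                  # first acquisition by this thread
        return inner()

print(outer())                   # re-entered

lock = threading.Lock()
lock.acquire()
# A second blocking acquire() from the same thread would deadlock on a plain Lock,
# so a non-blocking attempt is used here; it returns False instead of hanging.
second = lock.acquire(blocking=False)
print(second)                    # False
lock.release()
```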

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Jack.Ming
import threading
import time
num = 0
def run(arg):
    global num
    time.sleep(1)
    num = num + arg
    print(num)

if __name__ == '__main__':
    for i in range(10):
        # Pass the function to run as the target argument of Thread
        thread = threading.Thread(target=run, args=(i,))
        thread.start()
Without a lock: the threads run in arbitrary order and the shared data becomes inconsistent
5
8
9
11
15
15
24
32
39
45

Process finished with exit code 0
Output: the results are not in the expected order
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Jack.Ming
import threading
import time

lock = threading.RLock()
num = 0

def run(arg):
    global num
    with lock:  # acquire the lock; it is released automatically when the block exits
        time.sleep(1)
        num = num + arg
        print(num)

if __name__ == '__main__':
    for i in range(10):
        # Pass the function to run as the target argument of Thread
        thread = threading.Thread(target=run, args=(i,))
        thread.start()
With a lock: the shared data can be modified by only one thread at a time
0
1
3
6
10
15
21
28
36
45

Process finished with exit code 0
Output

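Semaphore

A mutex such as RLock allows only one thread at a time to modify data. A Semaphore allows up to a fixed number of threads at the same time. For example, if a restroom has 3 stalls, at most 3 people can use it at once, and the others must wait until someone comes out.

One lock with n keys: up to n threads can hold a key at the same time. Whenever a key is returned, the next waiting thread can take it.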

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Jack.Ming
import threading
import time

class myThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):
        with semaphore:  # take one of the slots; it is released automatically on exit
            time.sleep(5)
            print("Thread-%s Time is: %s" % (self.num, time.ctime()))

if __name__ == '__main__':
    semaphore = threading.BoundedSemaphore(5)  # at most 5 threads may hold the semaphore at the same time
    for i in range(10):
        thread = myThread(i)
        thread.start()
Example: allow at most 5 threads to execute a given section at the same time
Thread-4 Time is: Sat Mar 10 14:07:00 2018
Thread-3 Time is: Sat Mar 10 14:07:00 2018
Thread-2 Time is: Sat Mar 10 14:07:00 2018
Thread-1 Time is: Sat Mar 10 14:07:00 2018
Thread-0 Time is: Sat Mar 10 14:07:00 2018
Thread-9 Time is: Sat Mar 10 14:07:05 2018
Thread-5 Time is: Sat Mar 10 14:07:05 2018
Thread-7 Time is: Sat Mar 10 14:07:05 2018
Thread-6 Time is: Sat Mar 10 14:07:05 2018
Thread-8 Time is: Sat Mar 10 14:07:05 2018

Process finished with exit code 0
# 5 threads run at a time, 5 seconds apart
Output: note the timestamps
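The following sketch measures the limit instead of reading timestamps. It uses a BoundedSemaphore with 3 slots and 10 threads. The counters active and peak are illustrative only. The sketch also shows what "Bounded" adds: releasing the semaphore more times than it was acquired raises ValueError, whereas a plain Semaphore would silently increase its counter.

```python
import threading
import time

limit = threading.BoundedSemaphore(3)   # at most 3 holders at once
counter_lock = threading.Lock()         # protects active and peak
active = 0
peak = 0

def worker():
    global active, peak
    with limit:                          # acquire() on entry, release() on exit
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.1)                  # simulated work while holding a slot
        with counter_lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("peak concurrency:", peak)        # never more than 3

# All slots have been returned, so one more release() exceeds the initial value
rejected = False
try:
    limit.release()
except ValueError:
    rejected = True
print("extra release rejected:", rejected)  # True
```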

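Event

Python thread events let the main thread control the execution of other threads. An event mainly provides three methods: set, wait and clear.

How events work: an event holds an internal "Flag". If the Flag is False, a thread that calls event.wait() blocks. If the Flag is True, event.wait() no longer blocks.

  • clear: sets the Flag to False (its initial value). While the Flag is False, any thread that reaches wait() blocks.
  • set: sets the Flag to True. All threads waiting in wait() are woken up, and later calls to wait() return immediately.
  • wait: blocks until the Flag is True, or until the optional timeout expires.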
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Jack.Ming
import threading
import time

class myThread(threading.Thread):
    def __init__(self, num, event):
        threading.Thread.__init__(self)
        self.num = num
        self.event = event

    def run(self):
        print("Thread-%s" % self.num)
        self.event.wait()  # every thread blocks here until the flag is set
        print(self.num)

if __name__ == '__main__':
    event_obj = threading.Event()
    for i in range(10):
        thread = myThread(i, event_obj)
        thread.start()
    inp = input("please input:")  # entering "true" resumes all blocked threads
    if inp == "true":
        event_obj.set()
Example
Thread-0
Thread-1
Thread-2
Thread-3
Thread-4
Thread-5
Thread-6
Thread-7
Thread-8
Thread-9
please input:true
0
2
3
1
5
7
8
9
4
6

Process finished with exit code 0
Output
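The example above waits without a timeout, so the threads would block forever if the input were anything other than "true". The following is a minimal sketch of wait() with a timeout, together with clear(). The names ready and log are illustrative only. wait() returns True once the flag is set, or False if the timeout expires first.

```python
import threading
import time

ready = threading.Event()          # the internal flag starts as False
log = []

def worker(n):
    ok = ready.wait(timeout=2)     # True if set() happens within 2 s, otherwise False
    log.append((n, ok))

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

time.sleep(0.1)
print("flag before set:", ready.is_set())   # False
ready.set()                                 # wake all waiting workers
for t in threads:
    t.join()
print(sorted(log))                          # [(0, True), (1, True), (2, True)]

ready.clear()                               # flag back to False
after_clear = ready.wait(timeout=0.1)       # nobody sets it again, so this times out
print("wait after clear:", after_clear)     # False
```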

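The Queue classes

Python's queue module (named Queue in Python 2) provides synchronized, thread-safe queue classes: the FIFO (first in, first out) queue Queue, the LIFO (last in, first out) queue LifoQueue, and the priority queue PriorityQueue.

These queues implement all the required locking internally. You can use them directly from multiple threads, and you can use them to synchronize threads.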
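The following is a small sketch of how the three queue classes differ in retrieval order when the same items are put in. The helper drain is illustrative only.

```python
import queue

fifo = queue.Queue()
lifo = queue.LifoQueue()
prio = queue.PriorityQueue()

for item in [3, 1, 2]:
    fifo.put(item)
    lifo.put(item)
    prio.put(item)            # the lowest value is retrieved first

def drain(q):
    # get_nowait() is safe here because no other thread uses these queues
    return [q.get_nowait() for _ in range(q.qsize())]

fifo_out = drain(fifo)
lifo_out = drain(lifo)
prio_out = drain(prio)
print(fifo_out)  # [3, 1, 2]
print(lifo_out)  # [2, 1, 3]
print(prio_out)  # [1, 2, 3]
```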

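Common methods of queue objects:

  • Queue.qsize(): returns the approximate size of the queue.
  • Queue.empty(): returns True if the queue is empty, otherwise False.
  • Queue.full(): returns True if the queue is full, otherwise False.
  • Queue.full corresponds to maxsize: the queue is full when it holds maxsize items (maxsize is given to the constructor; maxsize <= 0 means unbounded).
  • Queue.get([block[, timeout]]): removes and returns an item. If block is True (the default), it waits up to timeout seconds for an item to become available.
  • Queue.get_nowait(): equivalent to Queue.get(False).
  • Queue.put(item[, block[, timeout]]): puts item into the queue. If the queue is full and block is True (the default), it waits up to timeout seconds for a free slot.
  • Queue.put_nowait(item): equivalent to Queue.put(item, False).
  • Queue.task_done(): called by a consumer after it finishes processing an item obtained with get(), to tell the queue that the task is complete.
  • Queue.join(): blocks until every item that was put into the queue has been retrieved and processed, that is, until task_done() has been called for each item.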
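The following is a minimal sketch of how task_done() and join() work together in a worker pool. The names tasks and worker, and the None sentinel used to stop workers, are illustrative choices and not part of the queue API. Queue handles its own locking, so no extra lock is needed around get() and put().

```python
import queue
import threading

tasks = queue.Queue(maxsize=10)
results = []
results_lock = threading.Lock()    # protects results

def worker():
    while True:
        item = tasks.get()         # blocks until an item is available
        if item is None:           # sentinel: no more work for this worker
            tasks.task_done()
            break
        with results_lock:
            results.append(item * item)
        tasks.task_done()          # mark this item as processed

workers = [threading.Thread(target=worker) for _ in range(3)]
for w in workers:
    w.start()

for n in range(10):
    tasks.put(n)                   # blocks while the queue already holds 10 items
for _ in workers:
    tasks.put(None)                # one sentinel per worker

tasks.join()                       # returns once task_done() was called for every put()
for w in workers:
    w.join()
print(sorted(results))             # squares of 0..9
```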
#!/usr/bin/python3

import queue
import threading
import time

exitFlag = 0

class myThread (threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q
    def run(self):
        print("Starting thread: " + self.name)
        process_data(self.name, self.q)
        print("Exiting thread: " + self.name)

def process_data(threadName, q):
    while not exitFlag:
        queueLock.acquire()
        if not q.empty():
            data = q.get()
            queueLock.release()
            print("%s processing %s" % (threadName, data))
        else:
            queueLock.release()
        time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1

# Create and start the worker threads
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1

# Fill the queue
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()

# Wait until the queue is empty (sleep briefly instead of busy-spinning)
while not workQueue.empty():
    time.sleep(0.1)

# Tell the threads it is time to exit
exitFlag = 1

# Wait for all threads to finish
for t in threads:
    t.join()
print("Exiting main thread")
Example
Starting thread: Thread-1
Starting thread: Thread-2
Starting thread: Thread-3
Thread-3 processing One
Thread-3 processing Two
Thread-1 processing Three
Thread-2 processing Four
Thread-1 processing Five
Exiting thread: Thread-2
Exiting thread: Thread-1
Exiting thread: Thread-3
Exiting main thread
Output

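A simple application

Multithreaded ping:

ping.py: the main program

sfile: the file that lists the IP address ranges

dfile: the file that the addresses responding to ping are saved to

The sfile file stores the list of IP network ranges, one per line, as follows: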

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Created by YangYongming at 2018/11/25 12:26
# FileName: ping.py
import threading
import os
import queue

import IPy  # third-party package (pip install IPy)


class MyThread(threading.Thread):
    def __init__(self, ip_queue):
        threading.Thread.__init__(self)
        self.queue = ip_queue

    def run(self):  # the code each thread runs
        while True:
            host = str(self.queue.get())  # take an IP from the queue (blocks until one is available)
            try:
                # "-n 1" sends one packet on Windows; on Linux/macOS use "-c 1" instead
                with os.popen("ping -n 1 %s" % host) as res:
                    output = res.read()
                if "TTL" in output.upper():  # a reply line contains TTL (Windows) or ttl (Linux)
                    print("%s is Alive" % host)
                    with lock:
                        with open("dfile", "a", encoding="utf-8") as f2:
                            f2.write(host + '\n')
                        aliveip.append(host)
            finally:
                self.queue.task_done()  # always mark the item as processed so queue.join() can return


if __name__ == '__main__':
    lock = threading.RLock()
    ip_queue = queue.Queue(50)  # a queue with capacity 50 (not named "queue", which would shadow the module)
    aliveip = []
    for i in range(40):  # create 40 worker threads
        t = MyThread(ip_queue)
        t.daemon = True  # workers loop forever, so let the program exit once queue.join() returns
        t.start()  # the thread is ready and waits to be scheduled by the CPU
    with open("sfile", "r", encoding="utf-8") as f1:
        for line in f1:
            line = line.strip()
            if not line:
                continue
            for x in IPy.IP(line):
                ip_queue.put(x)  # put each IP into the queue
    ip_queue.join()  # wait until every IP has been processed
Main program

Output:

The addresses that respond to ping are saved to the dfile file, with the following contents:

Original article: https://www.cnblogs.com/ming5218/p/8538825.html