第10章 Python 多线程技术简述

在学完第9章Selenium Grid之后,我们了解到Selenium Grid虽然可以分布式执行测试用例,但它并不支持并行。“分布式”和“并行”是两个完全不同的概念,分布式只负责将一个测试用例远程调用到不同的环境下执行;而并行强调“同时”执行多个任务。如何实现并行呢?可以利用编程语言提供的多线程(或多进程)技术来实现并行。
本章将学习Python的多线程与多进程技术,并将其应用到自动化测试用例的执行中。
在使用多线程之前,我们首先要理解进程和线程的概念。

什么是进程?
计算机程序只不过是磁盘中可执行的二进制(或其他类型)数据。它们只有在被读取到内存中、被操作系统调用的时候才开始它们的生命周期。进程是程序的一次执行,每个进程都有自己的地址空间、内存、数据栈,以及其他记录其运行轨迹的辅助数据。操作系统管理在其上面运行的所有进程,并为这些进程公平地分配时间。
什么是线程?
线程(有时被称为轻量级进程)与进程有些相似,不同的是,所有的线程都运行在同一个进程中,共享相同的运行环境。我们可以想象成是在主进程或“主线程”中并行运行的“迷你进程”。

10.1 单线程的时代

在单线程时代,当处理器需要处理多个任务时,必须对这些任务安排执行顺序,并按照这个顺序来执行任务。假如我们创建了两个任务;听音乐(music)和看电影(movie),在单线程中,我们只能按先后顺序来执行这两个任务。下面就通过一个例子来演示。

onethread.py

from time import sleep, ctime


# 听音乐任务
def music():
    print('I was listening to music! %s' % ctime())
    sleep(2)


# 看电影任务
def movie():
    print('I was at the movies! %s' % ctime())
    sleep(5)


if __name__ == '__main__':
    music()
    movie()
    print('all end:', ctime())

分别创建了两个任务music和movie,执行时间分别为2秒和5秒,通过sleep()方法设置休眠时间来模拟任务的运行时间。
运行结果。

D:Python38python.exe F:/python_project/学习验证/onethread.py
I was listening to music! Mon Sep 20 14:53:17 2021
I was at the movies! Mon Sep 20 14:53:19 2021
all end: Mon Sep 20 14:53:24 2021

Process finished with exit code 0

从运行结果可看到,程序从11分04秒开始播放music,11分06秒结束并开始movie的播放,最后,到11分11秒movie播放结束,总耗时7秒。
现在。我们对上面的例子做些调整,使它看起来更加有意思。
首先music和movie作为播放器,在用户使用时,可以根据用户的需求来播放任意的歌曲和影片,并且我们希望播放器能够提供循环播放的功能,尤其对于音乐播放器来说这个很重要,改造后的程序如下。

onethread2.py

from time import sleep, ctime


# 音乐播放器
def music(func, loop):
    for i in range(loop):
        print('I was listening to %s! %s' % (func, ctime()))
        sleep(2)


# 视频播放器
def movie(func, loop):
    for i in range(loop):
        print('I was at the %s! %s' % (func, ctime()))
        sleep(5)


if __name__ == '__main__':
    music('爱情买卖', 2)
    movie('阿凡达', 2)
    print('all end:', ctime())

给music()和movie()两个函数设置参数:播放文件和播放次数。而函数中通过for循环控制播放的次数。再次运行,结果如下。

D:Python38python.exe F:/python_project/学习验证/onethread2.py
I was listening to 爱情买卖! Mon Sep 20 14:56:34 2021
I was listening to 爱情买卖! Mon Sep 20 14:56:36 2021
I was at the 阿凡达! Mon Sep 20 14:56:38 2021
I was at the 阿凡达! Mon Sep 20 14:56:43 2021
all end: Mon Sep 20 14:56:48 2021

Process finished with exit code 0

从运行结果可以看到,程序从42分36秒开始播放music,42分40秒music两轮播放结束并开始播放movie;42分50秒两个任务结束,最终总耗时14秒。

10.2 多线程技术

Python通过两个标准库thread和threading提供对线程的支持。thread提供了低级别的、原始的线程以及一个简单的锁。threading基于Java的线程模型设计。锁(Lock)和条件变量(Condition)在Java中是对象的基本行为(每一个对象都自带了锁和条件变量),而在Python中则是独立的对象。

10.2.1 threading模块

我们应该避免使用thread模块,原因是它不支持守护线程。当主线程退出时,所有的子线程不管它们是否还在工作,都会被强行退出。有时我们并不希望发生这种行为,这时就引入了守护线程的概念。threading模块支持守护线程,所以,我们直接使用threading来改进上面的例子。
threads.py

from time import sleep, ctime
import threading

# 音乐播放器
def music(func, loop):
    for i in range(loop):
        print("I was listening to %s! %s" % (func, ctime()))
        sleep(2)


# 视频播放器
def movie(func, loop):
    for i in range(loop):
        print("I was at the %s! %s" % (func, ctime()))
        sleep(5)

# 创建线程数组
threads = []

# 创建线程t1,并添加到线程数组
t1 = threading.Thread(target=music, args=('爱情买卖',2))
threads.append(t1)

# 创建线程t2,并添加到线程数组
t2 = threading.Thread(target=movie, args=('阿凡达', 2))
threads.append(t2)

if __name__ == '__main__':
    # 启动线程
    for t in threads:
        t.start()
    # 守护线程
    for t in threads:
        t.join()
    print('all end: %s' % ctime())

通过for循环遍历threads数组中所装载的线程;start()开始线程活动,join()等待线程终止。如果不使用join()方法对每个线程做等待终止,那么在线程运行的过程中可能会去执行最后的打印“all end:..”。

class threading.Thread()方法说明:

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={})
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is t he callable obj ect t o be invoked by t he run() m ethod. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If the subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

运行结果。

D:Python38python.exe F:/python_project/学习验证/threads.py
I was listening to 爱情买卖! Mon Sep 20 14:28:52 2021
I was at the 阿凡达! Mon Sep 20 14:28:52 2021
I was listening to 爱情买卖! Mon Sep 20 14:28:54 2021
I was at the 阿凡达! Mon Sep 20 14:28:57 2021
all end: Mon Sep 20 14:29:02 2021

Process finished with exit code 0

从上面的运行结果可以看出,两个子线程(music、movie)同时启动于11分24秒,直到所有线程结束于11分34秒,总耗时10秒。movie的两次电影循环共需要10秒,music的歌曲循环需要4秒,从执行结果可以看出两个线程达到了并行工作。

10.2.2 优化线程的创建

从上面例子中发现线程的创建是颇为麻烦的,每创建一个线程都需要创建一个t(t1、t2、...),当创建的线程较多时这样极其不方便。下面对例子进行改进。

player.py

from time import sleep, ctime
import threading


# 创建超级播放器
def super_player(file_, time):
    for i in range(2):
        print('Start playing: %s! %s' % (file_, ctime()))
        sleep(time)


# 播放的文件与播放时长
lists = {'爱情买卖.mp3': 3, '阿凡达.mp4': 5, '我和你.mp3': 4}

threads = []
files = range(len(lists))

# 创建线程
for file_, time in lists.items():
    t = threading.Thread(target=super_player, args=(file_, time))
    threads.append(t)

if __name__ == '__main__':
    # 启动线程
    for t in files:
        threads[t].start()
    for t in files:
        threads[t].join()

    print('end: %s' % ctime())

有趣的是我们对播放器的功能也做了增强。首先,创建了一个super_player()函数,这个函数可以接收播放文件和播放时长,可以播放任何文件。
然后,我们创建了一个lists字典用于存放播放文件名与时长,通过for循环读取字典,并调用super_player()函数创建字典,接着将创建的字典都追加到threads数组中。
最后,通过循环启动线程数组threads中的线程,运行结果如下。

D:Python38python.exe F:/python_project/学习验证/player.py
Start playing: 爱情买卖.mp3! Mon Sep 20 15:29:55 2021
Start playing: 阿凡达.mp4! Mon Sep 20 15:29:55 2021
Start playing: 我和你.mp3! Mon Sep 20 15:29:55 2021
Start playing: 爱情买卖.mp3! Mon Sep 20 15:29:58 2021
Start playing: 我和你.mp3! Mon Sep 20 15:29:59 2021
Start playing: 阿凡达.mp4! Mon Sep 20 15:30:00 2021
end: Mon Sep 20 15:30:05 2021

Process finished with exit code 0

10.2.3 创建线程类

除直接使用Python所提供的线程类外,我们还可以根据需求自定义自己的线程类。
mythread.py

import threading
from time import sleep, ctime


# 创建线程类
class MyThread(threading.Thread):

    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self)
        self.func = func
        self.args = args
        self.name = name

    def run(self):
        self.func(*self.args)


def super_play(file_, time):
    for i in range(2):
        print('Start playing: %s! %s' % (file_, ctime()))
        sleep(time)


lists = {'爱情买卖.mp3': 3, '阿凡达.mp4': 5, '我和你.mp3': 4}

threads = []
files = range(len(lists))

for file_, time in lists.items():
    t = MyThread(super_play, (file_, time), super_play.__name__)
    threads.append(t)

if __name__ == '__main__':
    # 启动线程
    for i in files:
        threads[i].start()
    for i in files:
        threads[i].join()
    print('end:%s' % ctime())

创建MyThread类,用于继承threading.Thread类。
__init__()类的初始化方法对func、args、name等参数进行初始化。
在Python 2中,apply(func [, args [, kwargs ]])函数的作用是当函数参数已经存在于一个元组或字典中时,apply()间接地调用函数。args是一个包含将要提供给函数的按位置传递的参数的元组。如果省略了args,则任何参数都不会被传递,kwargs是一个包含关键字参数的字典。
Python 3中已经不再支持apply()函数,所以将apply(self.func,self.args),修改为self.func(*self.args)

D:Python38python.exe F:/python_project/学习验证/mythread.py
Start playing: 爱情买卖.mp3! Mon Sep 20 15:34:05 2021
Start playing: 阿凡达.mp4! Mon Sep 20 15:34:05 2021
Start playing: 我和你.mp3! Mon Sep 20 15:34:05 2021
Start playing: 爱情买卖.mp3! Mon Sep 20 15:34:08 2021
Start playing: 我和你.mp3! Mon Sep 20 15:34:09 2021
Start playing: 阿凡达.mp4! Mon Sep 20 15:34:10 2021
end:Mon Sep 20 15:34:15 2021

Process finished with exit code 0

最后,线程的创建与启动与前面的例子相同,唯一的区别是创建线程使用的是MyThread类,线程的入参形式也有所改变。

10.3 多进程技术

10.3.1 multiprocessing模块

多进程multiprocessing模块的使用与多线程threading模块的用法类似。multiprocessing提供了本地和远程的并发性,有效地通过全局解释锁(Global Interceptor Lock, GIL)来使用进程(而不是线程)。由于GIL的存在,在CPU密集型的程序当中,使用多线程并不能有效地利用多核CPU的优势,因为一个解释器在同一时刻只会有一个线程在执行。所以,multiprocessing模块可以充分利用硬件的多处理器来进行工作。它支持UNIX和Windows系统上的运行。
修改多线程的例子,将threading模块中的Thread方法替换为multiprocessing模块的Process就实现了多进程。
process.py

from time import sleep, ctime
import multiprocessing
    
    
def super_player(file_, time):
    for i in range(2):
        print('Start playing: %s! %s' %(file_, ctime()))
        sleep(time)
    
lists = {'爱情买卖.mp3':3,'阿凡达.mp4':5,'我和你.mp3':4}
    
threads = []
files = range(len(lists))
    
# 创建进程
for file_, time in lists.items():
    t = multiprocessing.Process(target=super_player, args=(file_, time))
    threads.append(t)
    
if __name__ == '__main__':
    # 启动进程
    for t in files:
        threads[t].start()
    for t in files:
        threads[t].join()
    print('end:%s' % ctime())

从上面的实例中可以看到,多进程的用法几乎与多线程一样。

我们利用Process对象来创建一个进程。Process对象与Thread对象的用法相同,也有start()、 run()、 join()等方法。
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
target表示调用对象,args表示调用对象的位置参数元组,kwargs表示调用对象的字典,
name为别名,Group实际上不使用。

扩展阅读
在UnixLinux上面创建新的进程使用的是fork。
一个进程,包括代码、数据和分配给进程的资源。fork()函数通过系统调用创建一个与原来进程几乎完全相同的进程,也就是两个进程可以做完全相同的事情,但如果初始参数或者传入的变量不同,则两个进程也可以做不同的事。
这意味着子进程开始执行的时候具有与父进程相同的全部内容。请记住这点,这是我们讨论基于继承的对象共享的基础。所谓基于继承的对象共享,是指在创建子进程之前由父进程初始化的某些对象可以在子进程当中直接访问到。在Windows平台上,因为没有fork语义的系统调用,所以基于继承的共享对象比UnixLinux有更多的限制,最主要的体现就是要求Process的__init__当中的参数必须可以Pickle。
但是,并不是所有的对象都可以通过继承来共享,只有multiprocessing库当中的某些对象才可以。例如Queue、同步对象、共享变量、Manager,等等。
在一个multiprocessing库的典型使用场景下,所有的子进程都是由一个父进程启动起来的,这个父进程称为master进程。这个父进程非常重要,它会管理一系列的对象状态,一旦这个进程退出,子进程很可能会处于一个很不稳定的状态,因为它们共享的状态也许已经被损坏掉了。因此,这个进程最好尽可能做最少的事情,以便保持其稳定性。

10.3.2 Pipe和Queue

multiprocessing提供了threading包中没有的IPC(进程间通信),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式(因为它们占据的不是用户进程的资源)。
multiprocessing包中有Pipe类和Queue类来分别支持这两种IPC机制。Pipe和Queue可以用来传送常见的对象。
① Pip可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道(默认为双向)。一个进程从pipe一端输入对象,然后被pipe另一端的进程接收。单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。

pipe.py

import multiprocessing
    
    
def proc1(pipe):
    pipe.send('hello')
    print('proc1 rec:', pipe.recv())
    
    
def proc2(pipe):
    print('proc2 rec:', pipe.recv())
    pipe.send('hello, too')
    
if __name__ == '__main__':
    multiprocessing.freeze_support()
    pipe = multiprocessing.Pipe()
    
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()
View Code

这里的pipe是双向的。pipe对象建立的时候,返回一个含有两个元素的表,每个元素代表pipe的一端(Connection对象)。我们对pipe的某一端调用send()方法来传送对象,在另一端使用recv()来接收。

② Queue类与Pipe相类似,都是先进先出结构。但Queue类允许多个进程放入,多个进程从队列取出对象。Queue类使用Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。
queue.py

import multiprocessing
import os, time
    
    
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.time())
    queue.put(info)
    
    
def outputQ(queue, lock):
    info = queue.get()
    lock.acquire()
    print((str(os.getpid()) + '(get):' + info))
    lock.release()
    
if __name__ == '__main__':
    
    record1 = []
    record2 = []
    lock = multiprocessing.Lock()    # 加锁,为防止散乱的打印
    queue = multiprocessing.Queue(3)
    
    for i in range(10):
        process = multiprocessing.Process(target=inputQ,args=(queue,))
        process.start()
        record1.append(process)
    
    for i in range(10):
        process = multiprocessing.Process(target=outputQ,args=(queue,lock))
        process.start()
        record2.append(process)
    
    for p in record1:
        p.join()
    
    queue.close()  # 没有更多的对象进来,关闭queue
    
    for p in record2:
        p.join()
View Code

运行结果:
Python Shell

===================== RESTART: D:/thread_test/queue.py ======================
784(get):4988(put):1445747594.7366996
5460(get):2344(put):1445747594.7679493
4752(get):2648(put):1445747594.83045
9172(get):10964(put):1445747594.83045
9660(get):10888(put):1445747594.8460755
6300(get):5292(put):1445747594.9710765
7232(get):6960(put):1445747595.002327
11052(get):7992(put):1445747595.002327
5384(get):260(put):1445747595.0179522
8448(get):6204(put):1445747595.0179522

10.4 应用于自动化测试

因为多进程与多线程技术在编程语言中属于比较高级的应用,对于初学者来说理解起来会有一定的难度,所以我们运用多个实例循序渐进地进行了讲解。下面就将其与自动化测试用例执行结合起来,从而节省用例的总体运行时间。

10.4.1 多线程执行测试用例

这里同样以百度搜索为例,通过不同的浏览器来启动不同的线程。
baidu_thread.py

from threading  import Thread
from selenium import webdriver
from time import ctime, sleep
    
    
# 测试用例
def test_baidu(browser, search):
    print('start:%s' % ctime())
    print('browser:%s ,' % browser)
    if browser == "ie":
        driver = webdriver.Ie()
    elif browser == "chrome":
        driver = webdriver.Chrome()
    elif browser == "ff":
        driver = webdriver.Firefox()
    else:
        print("browser参数有误,只能为ie、ff、chrome")
    
    driver.get('http://www.baidu.com')
    driver.find_element_by_id("kw").send_keys(search)
    driver.find_element_by_id("su").click()
    sleep(2)
    driver.quit()
    
if __name__ == '__main__':
    # 启动参数(指定浏览器与百度搜索内容)
    lists = {'chrome':'threading', 'ie': 'webdriver', 'ff':'python'}
    
    threads = []
    files = range(len(lists))
    # 创建线程
    for browser, search in lists.items():
        t = Thread(target=test_baidu, args=(browser, search))
        threads.append(t)
    
    # 启动线程
    for t in files:
        threads[t].start()
    for t in files:
        threads[t].join()
    
    print('end:%s' % ctime())
View Code

创建lists字典,对浏览器与搜索的内容进行参数化。通过多线程来运行test_baidu()的测试用例,在执行用例前使用多重if来判断通过哪个浏览器运行测试用例,并通过百度搜索相应的关键字。

10.4.2 多线程分布式执行测试用例

Selenium Grid只是提供多系统、多浏览器的执行环境,Selenium Grid本身并不提供并行的执行测试用例,这个我们在前面已经反复强调。下面就通过演示使用多线程技术结合Selenium Grid实现分布式并行地执行测试用例。
启动Selenium Server
在本机打开两个命令提示符窗口。
本机启动一个主hub和一个node节点(端口号别分为4444和5555),本机IP地址为:172.16.10.66。

C:selenium>java -jarselenium-server-standalone-2.47.0.jar -role hub
C:selenium>java -jarselenium-server-standalone-2.47.0.jar -role node -port 5555

启动一个远程node(设置端口号为6666),IP地址为:172.16.10.34。

fnngj@fnngj-VirtualBox:~/selenium$ java -jars elenium-server-standalone-2.47.0ar -role node -port 6666 -hub http://172.16.10.66:4444/grid/register

运行测试脚本。
grid_thread.py

from threading import Thread
from selenium import webdriver
from time import sleep, ctime
    
    
# 测试用例
def test_baidu(host,browser):
    print('start:%s' % ctime())
    print(host, browser)
    dc = {'browserName': browser}
    driver = webdriver.Remote(command_executor=host,
                          desired_capabilities=dc)
    driver.get('http://www.baidu.com')
    driver.find_element_by_id("kw").send_keys(browser)
    driver.find_element_by_id("su").click()
    driver.close()
    
if __name__ == '__main__':
    # 启动参数(指定运行主机与浏览器)
    lists = {'http://127.0.0.1:4444/wd/hub': 'chrome',
             'http://127.0.0.1:5555/wd/hub': 'internet explorer',
             'http://172.16.10.34:6666/wd/hub': 'firefox',  # 远程节点
              }
    threads = []
    files = range(len(lists))
    
    # 创建线程
    for host, browser in lists.items():
        t = Thread(target=test_baidu, args=(host, browser))
        threads.append(t)
    
    # 启动线程
    for i in files:
        threads[i].start()
    for i in files:
        threads[i].join()
    
    print('end:%s' % ctime())

运行结果如下。
Python Shell

===================== RESTART: D:/test/grid_thread.py ======================
start:Sun Jul 19 13:18:25 2015
http://127.0.0.1:5555/wd/hub internet explorer
start:Sun Jul 19 13:18:25 2015
http://127.0.0.1:6666/wd/hub firefox
start:Sun Jul 19 13:18:25 2015
http://127.0.0.1:4444/wd/hub chrome
end:Sun Jul 19 13:18:40 2015
与前一个例子类似,只是这次多线程根据lists字典中节点与浏览器来启动线程数,test_baidu()测试用例根据节点与浏览器来参数化Remote(),从而在不同的节点上运行测试用例。

本章小结

本章我们使用了大量的实例来介绍Python的多线程与多进程技术。在实际应用中,多线程与多进程是非常有用的技术,尤其对于性能要求比较高的应用,因此对于读者来说,理解并掌握多线程技术可有效提高编程能力。
最后比较遗憾的是,unittest单元测试框架本身并不支持多线程技术,它不能像Java的TestNG框架一样通过简单的配置就可以使用多线程技术执行测试用例。

部分内容来自于学习编程期间收集于网络的免费分享资源和工作后购买的付费内容。 如需获取教程配套的资源文件和一对一专属答疑支持,请加vx:kangmf24联系作者。
原文地址:https://www.cnblogs.com/MarlonKang/p/15314309.html