web端自动化——python多线程

Python通过两个标准库thread和threading提供对线程的支持。thread提供了低级别的、原始的线程以及一个简单的锁。threading基于Java的线程模型设计。

锁(Lock)条件变量(Condition)在Java中是对象的基本行为(每一个对象都自带了锁和条件变量),而在 Python中则是独立的对象。

注意:我们应该避免使用thread模块,原因是thread模块不支持守护线程。当主线程退出时,所有的子线程不管它们是否还在工作,都会被强行退出。

threading模块支持守护线程。

1、进程与线程的区别:

2、multiprocessing多进程模块

多进程multiprocessing模块的使用与多线程threading模块的用法类似。multiprocessing提供了本地和远程的并发性,有效地通过全局解释锁(Global InterceptorLock,GIL)来使用进程(而不是线程)。由于GIL的存在,在CPU密集型的程序当中,使用多线程并不能有效地利用多核CPU的优势,因为一个解释器在同一时刻只会有一个线程在执行。所以,multiprocessing模块可以充分利用硬件的多处理器来进行工作。它支持UNlX和Windows系统上的运行。

3、threading 多线程模块

threading模块支持守护线程。

4、Pipe和Queue

multiprocessing提供threading包中没有的IPC(进程间通信),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式(因为它们根据的不是用户进程的资源)。

multiprocessing包中有Pipe类和Queue类来分别支持这两种IPC机制。Pipe和Queue 可以用来传送常见的对象。

①    Pip可以是单向化(half-duplex),也可以是双向(duplex)。我们通过multiprocessing.Pipe (duplex=False)创建单向管道(默认为双向)。一个进程从pipe—端输入对象,然后被pipe 另一端的进程接收。单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。

Pipe.py

import multiprocessing

def proc1(pipe):

    pipe.send('hello')

    print('proc1 rec:', pipe.recv())

def proc2(pipe):

    print('proc2 rec:', pipe.recv())

    pipe.send('hello, too')

if __name__ == "__main__":

    multiprocessing.freeze_support()

    pipe = multiprocessing.Pipe()

    p1 = multiprocessing.Process(target=proc1,args=(pipe[0],))

    p2 = multiprocessing.Process(target=proc2,args=(pipe[1],))

    p1.start()

    p2.start()

    p1.join()

    p2.join()

这里的pipe是双向的。pipe对象建立的时候,返回一个含有两个元素的表,每个元素 代表pipe的一端(Connection对象)。我们对pipe的某一端调用send()方法来传送对象,在另一端使用recv()来接收。

②     Queue类与Pipe相类似,都是先进先出结构。但Queue类允许多个进程放入,多个进程从队列取出对象。Queue类使用Queue (maxsize)创建,maxsize表示队列中可以存放对象的最大数量。

queue.py

import multiprocessing

import os, time

def inputQ(queue):

    #getpid是获得当前进程的进程号。系统每开辟一个新进程就会为他分配一个进程号。

    info = str(os.getpid()) + '(put):'+str(time.time())

    #put方法用以插入数据到队列中

    queue.put(info)

def outputQ(queue, lock):

    #get方法可以从队列读取并且删除一个元素

    info = queue.get()

    lock.acquire()

    print((str(os.getpid()) + '(get):'+info))

    lock.release()

if __name__=='__main__':

    record1 =[]

    record2 =[]

    lock = multiprocessing.Lock()#加锁,为防止散乱的打印

    queue = multiprocessing.Queue(3)

    for i in range(10):

        process = multiprocessing.Process(target=inputQ,args=(queue,))

        process.start()

        record1.append(process)

    for i in range(10):

        process = multiprocessing.Process(target=outputQ,args=(queue,lock))

        process.start()

        record2.append(process)

    for p in record1:

        p.join()

    for p in record2:

        p.join()

    queue.close() #没有更多的对象进来,关闭queue

5、多线程分布式执行测试用例

Selenium Grid只是提供多系统、多浏览器的执行环境,Selenium Grid本身并不提供并行的执行测试用例,这个我们在前面己经反复强调。下面就通过演示使用多线程技术结合Selenium Grid实现分布式并行地执行测试用例。

启动 Selenium Server

在本机打开两个命令提示符窗口。

本机启动一个主hub和一个node节点(端口号别分为4444和5555),本机IP地址为: 172.16.101.239。

>java -jar selenium-server-standalone-2.47.0.jar -role hub

>java -jar selenium-server-standalone-2.47.0.jar -role node -port 5555

启动一个远程的node(设罝端口号为6666),IP地址假设为:172.16.101.238。

>java -jar selenium-server-standalone-2.47.jar -role node -port 6666 -hub http://172.16.101.239:4444/grid/register

运行测试脚本。

grid_thread.py

from threading import Thread

from selenium import webdriver

from time import sleep,ctime

#测试用例

def test_baidu(host,browser):

    print('start:%s' %ctime())

    print(host, browser)

    dc = {'browserName':browser}

    driver = webdriver.Remote(conunand_executor=host,

                              desired_capabilities=dc)

    driver.get('http://www.baidu.com')

    driver.find_element_by_id("kw").send_keys(browser)

    driver.find_element_by_id("su").click()

    sleep(2)

    driver.quit()

if __name__ =='__main__':   

#启动参数(指定运行主机与浏览器)

    lists = {'http://127.0.0.1:4444/wd/hub': 'chrome',

             'http://127.0.0.1:5555/wd/hub': 'internet explorer',

             'http://172.16.101.238:6666/wd/hub':'firefox', # 远程节点

             }

    threads =[]

    files = range(len(lists))

    #创建线程

    for host, browser in lists.items():

            t = Thread(target=test_baidu, args=(host, browser))

            threads.append(t)

    #启动线程

    for i in files:

        threads[i].start()

    for i in files:

        threads [i].join()

    print('end:%s' %ctime())

 6、多线程执行测试用例

from threading import Thread
from selenium import webdriver
from time import ctime,sleep
#测试用例
def test_baidu(browser,search):
print('start:%s' %ctime())
print('browser:%s ,' %browser)
if browser == "chrome":
driver = webdriver.Chrome()
elif browser == "ff":
driver = webdriver.Firefox()
else:
print('browser参数有误,只能为ff、chrome')
driver.get('http://www.baidu.com')
driver.find_element_by_id("kw").send_keys(search)
driver.find_element_by_id('su').click()
sleep(2)
driver.quit()
if __name__ == "__main__":
#启动参数(指浏览器与百度搜索内容)
lists ={'chrome':'threading', 'ff':'python'}
threads =[]
files = range(len(lists))
#创建线程
for browser, search in lists.items():
t = Thread(target=test_baidu,args=(browser,search))
threads.append(t)
#启动线程
for t in files:
threads[t].start()
for t in files:
threads[t].join()
print('end:%s' %ctime())
原文地址:https://www.cnblogs.com/linxiu-0925/p/10077413.html