Appium+Python3 并发测试实例

from selenium import webdriver
import yaml
from time import ctime

with open('desired_caps.yaml','r') as f:
    data = yaml.load(f)

desired_list=['127.0.0.1:62001','127.0.0.1:62025']
def appium_devices(udid,port):
    desired_caps={}
    desired_caps['platformName']=data['platformName']
    desired_caps['platformVerion']=data['platformVersion']
    desired_caps['deviceName']=data['deviceName'] #没有实际作用

    desired_caps['udid']=udid

    desired_caps['app']=data['app']
    desired_caps['noReset']=data['noReset']
    desired_caps['appPackage']=data['appPackage']
    desired_caps['appActivity']=data['appActivity']

    print('appium port:%s start run %s at %s'%(port,udid,ctime()))
    driver=webdriver.Remote('http://'+str(data['ip'])+':'+str(port)+'/wd/hub',desired_caps)
    return driver

if __name__ == '__main__':
    appium_devices(desired_list[0],4723)
    appium_devices(desired_list[1],4725)
import subprocess
from time import ctime

def appium_start(host,port):
    bootstrap_port=str(port+1)
    cmd='start /b appium -a '+ host +' -p '+ str(port) +' -bp '+ str(bootstrap_port)

    print('%s at %s'%(cmd,ctime()))
    subprocess.Popen(cmd,shell=True,stdout=open('./appium_log/'+str(port)+'.log','a'),stderr=subprocess.STDOUT)

if __name__ == '__main__':
    host='127.0.0.1'
    # port=4723
    # appium_start(host,port)

    #启动两个进程
    for i in range(2):
        port=4723+2*i
        appium_start(host,port)
import socket
import os

def check_port(host,port):
    '''检测端口是否被占用'''
    #创建socket对象
    s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)

    try:
        s.connect((host,port))
        s.shutdown(2)
    except OSError as msg:
        print('port %s is available'%port)
        print(msg)
        return True
    else:
        print('port %s is already use'%port)
        return False


def relase_port(port):
    '''释放指定端口'''
    #查找指定端口对应的pid
    cmd_find='netstat -aon | findstr %s'%port
    print(cmd_find)

    #返回命令执行后的结果
    result=os.popen(cmd_find).read()
    print(result)

    if str(port) and 'LISTENING' in result:
        #获取端口对应的pid进程
        i=result.index('LISTENING')
        start=i+len('LISTENING')+7
        end=result.index('
')
        pid=result[start,end]

        #关闭被占用端口的pid
        cmd_kill='taskkill -f -pid %s'%pid
        print(cmd_kill)
        os.popen(cmd_kill)
    else:
        print('port %s is available'%port)


if __name__ == '__main__':
    host='127.0.0.1'
    port=4725
    # check_port(host,port)
    relase_port(port)

实例:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019-08-07 12:03
# @Author : zhouyang
# @File : appium_devices_sync.py
'''Appium并发测试
并发启动两台appium服务,再并发启动2台设备测试考研帮APP
2个appium服务,端口如下:
Appium服务端口:4723 ,bp端口:4724
Appium服务端口:4725 ,bp端口:4726
2台设备:
127.0.0.1:62001
127.0.0.1:62025
'''

from appium_sync.mulit_devices import appium_devices
from appium_sync.mulit_appium import appium_start
from appium_sync.check_port import *
import multiprocessing
from time import sleep

desired_list=['127.0.0.1:62001','127.0.0.1:62025']

def start_appium_action(host,port):
    if check_port(host,port):
        appium_start(host,port)
        return True
    else:
        print('appium %s is start fail'%port)
        return False

def start_device_action(udid,port):
    host='127.0.0.1'
    if start_appium_action(host,port):
        appium_devices(udid,port)
    else:
        relase_port(port)

def appium_start_sync():
    print('=======appium_start_sync=======')
    appium_process = []

    for i in range(2):
        host = '127.0.0.1'
        port = 4723 + 2 * i
        appium = multiprocessing.Process(target=start_appium_action, args=(host, port))
        appium_process.append(appium)

    for appium in appium_process:
        appium.start()
    for appium in appium_process:
        appium.join()

    sleep(5)

def devices_start_sync():
    print('======devices_start_sync=======')

    # 创建desired进程组
    desired_process = []
    # 加载进程
    for i in range(len(desired_process)):
        port = 4723 + 2 * i
        deired = multiprocessing.Process(target=start_device_action, args=(desired_process[i], port))
        desired_process.append(deired)

    # 开启进程
    for deired in desired_process:
        deired.start()
    # 关闭进程
    for deired in desired_process:
        deired.join()

    sleep(5)

if __name__ == '__main__':
    appium_start_sync()
    devices_start_sync()
原文地址:https://www.cnblogs.com/xiuxiu123456/p/11322459.html