py自动化框架(二)

import unittest
import HTMLTestRunner
from BeautifulReport import BeautifulReport
class MyTest(unittest.TestCase):
def setUp(self):
#每条用例执行之前会执行setup
print('这是setup')
def tearDown(self):
#每条用例执行之后都会执行teardown
print('这是teardown')

@classmethod
def setUpClass(cls):
#这个类里面的用例执行之前,最先执行它,最前面
print('这是setupclass')

@classmethod
def tearDownClass(cls):
#这个类里面所有的用例执行完之后执行它,最后面
print('tearDownClass')


def test_reg(self):
'''注册'''
print('reg')
self.assertEqual(1,2,msg='token不对')
def test_login(self):
'''登录'''
print('login')
self.assertEqual(1,1)
def test_buy(self):
self.assertEqual(1,2,msg='购买失败')
def test_z(self):
self.assertIn(1,[1,23])
def test_assert(self):
res = False
self.assertFalse(res,)
print('test_assert')


#下面这一坨是产生不好看的报告
# f = open('report.html','wb')
# runner = HTMLTestRunner.HTMLTestRunner(f,title='nhytest',
# description='xxx接口测试')
# sutie = unittest.makeSuite(MyTest)#变成测试集合
# runner.run(sutie)


suite = unittest.makeSuite(MyTest)#变成测试集合
report = BeautifulReport(suite)
report.report(filename='bfreport.html',description='接口测试报告')

# 测试用例
# 测试集合 testsuite 多个用例放在一起
# testrunner 运行用例的
# testloader 查找测试用例



import unittest
from parameterized import parameterized
def login(username,password):
print(username,password)
print('=============')
return 1

class MyTest(unittest.TestCase):
@parameterized.expand(
[
['xiaodong','123','success'],
['yangfan','456','fail'],
['hailong','1273','success'],
['liurongxin','1273','success'],
]
)
def test_login(self,username,passwd,check):
'正常登陆'
res = login(username,passwd)
self.assertEqual(check,res)

unittest.main()

自动化测试有两种驱动: 1、数据驱动 ,数据从excel/txt中来     

                                         2、代码驱动,有业务流程

atp只能实现数据驱动

utp能实现数据驱动和代码驱动

注册---登录---抽奖  接口测试(代码驱动)

test_cj.py

import unittest
from conf.setting import default_host
from core.my_requests import MyRequest
from core.tools import mysql,get_real_value
from core.tools import r as redis

class Cj(unittest.TestCase):

password = 'aA123456'
username = 'testhhh'

def reg(self):
'''注册接口'''
url = '/api/user/user_reg'
new_url = default_host + url
data = {'username':self.username,'pwd':self.password,
'cpwd':self.password}
r = MyRequest(new_url,data)
result = r.post()
self.assertEqual(1,result.get('status'),msg='注册接口不通,%s'%result.get('data'))
#校验接口是不是通的
error_code = result.get('data').get('error_code')
self.assertEqual(0,error_code,msg='注册失败,%s'%result.get('data'))
sql ='select * from app_myuser where username = "%s";'%self.username
sql_res = mysql.execute_one(sql)#执行sql
self.assertIsNotNone(sql_res) #判断数据库返回的是否为空

def login(self):
'''登录接口'''
url = default_host + '/api/user/login'
data = {'username':self.username,'passwd':self.password}
r = MyRequest(url,data)#发请求
result = r.post()
self.assertEqual(1,result.get('status'),msg='登录接口不通,%s'%result.get('data'))
sign = get_real_value('sign',result)#从返回值里面取到sign的值
self.assertIsNotNone(sign,msg='登录失败:%s'%result)#校验sign 校验的msg都是在失败的时候才会打印
userid = get_real_value('userId',result)#从返回值里面取到userid的值
return userid,sign

def choujiang(self):
'''抽奖接口'''
url = default_host+'/api/product/choice'
userid,sign = self.login()
data = {'userid':userid,'sign':sign}
r = MyRequest(url,data)
result = r.get()
self.assertEqual(1, result.get('status'), msg='抽奖接口不通,%s' % result.get('data'))
redis_key ='choujiang:%s'%self.username
count = redis.get(redis_key) #操作redis,取key
self.assertEqual('1',count,'抽奖次数错误%s'%result) #redis中取出的count值是一个字符串
sql='select count(*) as cishu from app_record where user_id = %s ;'%userid
cishu = mysql.execute_one(sql).get('cishu') #执行sql execute_one(sql)返回的是一个字典,再从字典中取出cishu
# {'cishu':1}
self.assertEqual(1,cishu,'抽奖记录没有落到数据库里面!') #校验抽奖次数和数据库中是否对的上

def test_choujiang(self):
'''抽奖流程测试'''
self.reg()
self.choujiang()

@classmethod
def tearDownClass(cls):
##数据清除的工作 保证回归测试时 username能重复使用
sql='delete from app_myuser where username="%s" ;'%cls.username
mysql.execute_one(sql)
key='choujiang:%s'%cls.username
redis.delete(key)
print('测试数据清理完成。。')


# 有很多表要清除时
#1、先把以前的数据库备份
# mysqldump -uroot -p123456 -h192.168.1.13 db>/xxx/xx/a.sql
# 2、执行自动化case
# 3、把数据库恢复回来
# sql.excute('RENAME database main TO main_20181204;')
# sql.excute('create database main;')
# os.system('mysql -uroot -p123456 -h192.168.1.13 db>/xxx/xx/a.sql')

setting.py

import os
BAE_PATH = os.path.dirname(
os.path.dirname(os.path.abspath(__file__))
) #atp的目录

LOG_PATH = os.path.join(BAE_PATH,'logs') #log目录
CASE_PATH = os.path.join(BAE_PATH,'cases') #case目录
REPORT_PATH = os.path.join(BAE_PATH,'report') #report目录
SQL_PATH = os.path.join(BAE_PATH,'sql_file') #存放sql的目录
DATA_PATH = os.path.join(BAE_PATH,'data') #存参数化文件的地方


MAIL_INFO = {
'user':'xxx@qq.com',
'password':'rtcxxxqrdgjcd',
'host':'smtp.qq.com',
'smtp_ssl':True,#发件箱是qq邮箱的话,改成True
}

TO = ['lihui@meizu.com']

HOSTS = {
'QA':'http://api.nnzhp.cn',#测试环境
'DEV':'http://dev.nnzhp.cn',#开发环境
'PRE':'http://pre.nnzhp.cn' #预生产环境
}


mysql_info = {
'host':'118.24.3.40',
'port':3306,
'user':'besttest',
'password':'HK139bc',
'db':'main',
'charset':'utf8',
'autocommit':True
}
#mysql 配置信息
redis_info = {
'host': '118.24.3.40',
'password': 'HK139bc&*',
'port': 6379,
'db': 0,
'decode_responses': True
}

default_host = HOSTS.get('QA') #默认测试环境的地址

my_requests.py

import requests
import nnlog
import os
from conf.setting import LOG_PATH
class MyRequest:
log_file_name = os.path.join(LOG_PATH,'MyRequest.log')#日子文件名
time_out = 10 #请求超时时间
def __init__(self,url,data=None,headers=None,file=None):
self.url = url
self.data = data
self.headers = headers
self.file = file
def post(self):
try:
req = requests.post(self.url,data=self.data,headers=self.headers,
files=self.file,timeout=self.time_out)
except Exception as e:
res = {"status":0,"data":e.args} #0代表请求失败
else:
try:
res = {"status":1,"data":req.json()} #1代表返回的json
except Exception as e:
res = {"status":2,"data":req.text} #2代表返回不是json
log_str = 'url: %s 请求方式:post data:%s ,返回数据:%s'%(self.url,self.data,res)
self.write_log(log_str)
return res

def get(self):
try:
req = requests.get(self.url,params=self.data,headers=self.headers,timeout=self.time_out)
except Exception as e:
res = {"status":0,"data":e.args} #0代表请求失败
else:
try:
res = {"status":1,"data":req.json()} #1代表返回的json

except Exception as e:
res = {"status":2,"data":req.text} #2代表返回不是json
log_str = 'url: %s get请求 data:%s ,返回数据:%s'%(self.url,self.data,res)
self.write_log(log_str)
return res

@classmethod
def write_log(cls,content):
log = nnlog.Logger(cls.log_file_name)
log.debug(content)

op_data.py

import os
from conf.setting import mysql_info,SQL_PATH
from core.tools import mysql
import datetime

def get_mysql_info(): #从配置文件中取出数据库相关信息
user = mysql_info.get('user')
password = mysql_info.get('password')
host = mysql_info.get('host')
db = mysql_info.get('db')
port = mysql_info.get('port')
return user,password,host,db,port

def bak_db(): #备份数据库
user, password, host, db, port = get_mysql_info()
sql_filename = datetime.datetime.now().strftime('%Y%m%d%H%M%S')+'.sql' #sql文件名称
sql_filename = os.path.join(SQL_PATH,sql_filename)
command = 'mysqldump -u{user} -p{pwd} -P{port} -h{host} {db} > {file}'.format(
user=user,pwd=password,port=port,host=host,db=db,file=sql_filename
)#执行备份数据库命令 mysqldump 最好写成绝对路径(找到mysqldump命令的位置)
os.system(command) #os.system()执行操作系统命令
print('数据备份完成!')
return sql_filename #返回备份的数据库文件,恢复时还要用到

def recover_db(sql_filename):
user, password, host, db, port = get_mysql_info()
new_db = db+'_'+datetime.datetime.now().strftime('%Y%m%d%H%M%S')
sql='RENAME database %s TO %s; '%(db,new_db)#给原来的数据库改名
mysql.execute_one(sql)
print('数据库改名')
sql2='create database %s charset utf8;'%db
mysql.execute_one(sql2)
print('新的数据库已经创建%s'%db)
command = 'mysql -u{user} -p{pwd} -P{port} -h{host} {db} < {file}'.format(
user=user, pwd=password, port=port, host=host, db=db, file=sql_filename
) #恢复数据库
os.system(command)
print('数据库恢复完成!')

parse_param_file.py

from conf.setting import DATA_PATH
import os
import xlrd

def textFileToList(file,seq=','): #默认以逗号分隔
file=os.path.join(DATA_PATH,file)
print(file)
case_data = []
with open(file,encoding='utf-8') as f:
for line in f:
new_line = line.strip() #去掉空格 /n
if new_line: #可能存在空行,去掉空行
case_data.append(new_line.split(seq))
return case_data

def excelToList(file):
file = os.path.join(DATA_PATH, file)
workbook = xlrd.open_workbook(file)
sheet = workbook.sheet_by_index(0)

case_data = []
for i in range(1,sheet.nrows):
row_data = sheet.row_values(i)[4:8]
case_data.append(row_data)

return case_data

tools.py

import pymysql
from conf.setting import mysql_info,redis_info
import redis
import jsonpath
import datetime
import os,yagmail
from conf import setting

class MySQL:
def __init__(self):
self.conn = pymysql.connect(**mysql_info)
self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
#初始化的时候就连接数据库
def execute_many(self,sql):
self.cur.execute(sql)
res = self.cur.fetchall()
if res:
return res
def execute_one(self,sql):
self.cur.execute(sql)
res = self.cur.fetchone()
if res: #如果res不为NONE,再返回
return res

def __del__(self):
self.cur.close()
self.conn.close()
print('连接已经关闭')

def get_real_value(key,response):
#从字典里面获取key对应的value
res = jsonpath.jsonpath(response,'$..%s'%key)
#$..%s这个是jsonpath这个模块的用法
if res:
return res[0]

def get_redis():
return redis.Redis(**redis_info)

def make_today_dir():
#创建当天的文件夹,返回绝对路径
today = str(datetime.date.today())
abs_path = os.path.join(setting.REPORT_PATH,today)
#拼成当天的绝对路径
if os.path.exists(abs_path):
pass
else:
os.mkdir(abs_path)
return abs_path

def send_mail(content,file_path=None):
#发邮件,传入邮件正文和附件
m = yagmail.SMTP(**setting.MAIL_INFO,)
subject = '接口测试报告_%s'%str(datetime.datetime.today())
m.send(subject=subject,to=setting.TO,contents=content,attachments=file_path)


mysql = MySQL()

r = get_redis()

start.py

import unittest
from BeautifulReport import BeautifulReport as bf
from conf.setting import CASE_PATH
from core.tools import make_today_dir,send_mail
from core.op_data import bak_db,recover_db
import os
import datetime

content = '''
各位好!
本次测试结果:总共运行%s条用例,通过%s条,失败%s条。详细信息见附件。
'''

def run_case():
# sql_file = bak_db() #备份数据库函数
suite = unittest.TestSuite() #建测试集合
cases = unittest.defaultTestLoader.discover(CASE_PATH,'test*.py')
#去某个目录下找测试用例
for case in cases:
suite.addTest(case) #循环把每个文件里面的case加入到测试集合里面
report = bf(suite) #运行测试用例

path = make_today_dir() #创建今天的文件夹,存放报告
file_name = 'report_%s.html'%datetime.datetime.now().strftime('%H%M%S')#生成新的文件名
report.report(filename=file_name,description='接口测试',log_path=path)#生成报告

new_content = content%(report.success_count+report.failure_count,report.success_count,report.failure_count)
abs_path = os.path.join(path,file_name)
send_mail(new_content,abs_path)
# recover_db(sql_file)#恢复数据库

#产生报告
run_case()




原文地址:https://www.cnblogs.com/mengmeng1011/p/12984371.html