VII. Python: unittest, File Parsing, Sending DingTalk Messages, and an Automated Testing Example

(I) unittest fixtures

1. Runs once, before any test case in the class

@classmethod

def setUpClass(cls):

2. Runs once, after all test cases in the class have finished

@classmethod

def tearDownClass(cls):

3. Runs before every test case

def setUp(self):

4. Runs after every test case

def tearDown(self):

5. Example:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName  :用例前置条件.py
# @Time      :20201028 0028 22:22
# @Author    :Krystal
# @Desc      :Testcase

import unittest

class Test(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Runs once, before any test case in the class
        print('SetUpClass')

    @classmethod
    def tearDownClass(cls):
        # Runs once, after all test cases in the class have finished
        print('tearDownClass')

    def tearDown(self):
        # Runs after every test case
        print('tearDown')

    def setUp(self):
        # Runs before every test case
        print('setUp')

    def testa(self):
        print('testa')

    def testz(self):
        print('testz')

    def testb(self):
        print('testb')

    def testc(self):
        print('testc')


if __name__ == "__main__":
    unittest.main()

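Running the file prints SetUpClass once, then setUp, the test name, and tearDown for each test in alphabetical order of method name (testa, testb, testc, testz), and finally tearDownClass once.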
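To check this ordering without reading console output, the hooks can record themselves in a list. This is only a minimal sketch: OrderDemo, calls, and run_order_demo are illustrative names that do not appear in the example above.

```python
import io
import unittest

calls = []  # records the order in which hooks and tests run


class OrderDemo(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        calls.append('setUpClass')

    @classmethod
    def tearDownClass(cls):
        calls.append('tearDownClass')

    def setUp(self):
        calls.append('setUp')

    def tearDown(self):
        calls.append('tearDown')

    # Defined out of order on purpose: the loader sorts test names.
    def testz(self):
        calls.append('testz')

    def testa(self):
        calls.append('testa')


def run_order_demo():
    """Run OrderDemo quietly and return the recorded call order."""
    calls.clear()
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(OrderDemo)
    unittest.TextTestRunner(stream=io.StringIO()).run(suite)
    return list(calls)


print(run_order_demo())
```

The class-level hooks wrap the whole run, while setUp and tearDown wrap each individual test.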

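(II) Configuration files: .ini, .yaml, .yml

1. An .ini file is an initialization file ("ini" is short for "initialization"). Values that are likely to change should go into a configuration file wherever possible, so that they are easy to edit.

(1) Configuration file: config.ini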

[redis]
host = 127.0.0.1
password = 123456
port = 6379

[mysql]
host = 127.0.0.1
password = 123456
port = 3306
user = root
db = jxz

[server]
host = http://127.0.0.1:8000

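(2) Parse the configuration file

There are two ways to check whether the requested section exists (both fragments sit inside the function shown in step (3)):

# 1. Membership test: c.sections() returns the names of all sections
        if node in c.sections():
            result = dict(c[node])
            return result

# 2. try/except: c[node] raises KeyError when the section is missing
        try:
            result = dict(c[node])
        except KeyError:
            print("The requested section does not exist!")
        else:
            return result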
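Both checks can be tried without a file on disk by feeding configparser a string. The sketch below is illustrative only: SAMPLE, get_section, and get_section_try are made-up names, and has_section is used in place of the membership test on sections().

```python
import configparser

SAMPLE = """
[redis]
host = 127.0.0.1
port = 6379
"""


def get_section(parser, node):
    """Approach 1: ask the parser whether the section exists."""
    if parser.has_section(node):
        return dict(parser[node])
    return None


def get_section_try(parser, node):
    """Approach 2: index the parser and catch the KeyError."""
    try:
        return dict(parser[node])
    except KeyError:
        return None


parser = configparser.ConfigParser()
parser.read_string(SAMPLE)

print(get_section(parser, 'redis'))      # the [redis] section as a dict
print(get_section_try(parser, 'mysql'))  # None, the section is missing
```

Catching KeyError specifically, rather than Exception, keeps unrelated errors visible.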

(3) Complete code: 解析配置文件.py

import configparser
import os

# with open('config.ini',encoding='utf-8') as fr:
#     c = configparser.ConfigParser()
#     c.read_file(fr)
#     result = dict(c['server'])
#     print(result)

# Return one section of the ini file as a dict, or None if the section is missing
def parse_ini(node,file_path='config.ini'):
    if not os.path.exists(file_path):
        raise Exception('ini file does not exist!')

    with open(file_path, encoding='utf-8') as fr:
        c = configparser.ConfigParser()
        c.read_file(fr)

        # 1. Membership test: c.sections() returns the names of all sections
        if node in c.sections():
            result = dict(c[node])
            return result

        # 2. try/except alternative
        # try:
        #     result = dict(c[node])
        # except KeyError:
        #     print("The requested section does not exist!")
        # else:
        #     return result

        # Reached only when the section is missing
        print("The requested section does not exist!")

if __name__ == "__main__":
    redis_info = parse_ini('redis')
    print(redis_info)
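One thing to keep in mind with dict(c[node]) is that every value comes back as a string, including the port. The sketch below shows the typed getters of configparser as one possible way to convert values; the timeout option is hypothetical and is not in config.ini.

```python
import configparser

parser = configparser.ConfigParser()
parser.read_string("""
[redis]
host = 127.0.0.1
password = 123456
port = 6379
""")

as_dict = dict(parser['redis'])
port_text = as_dict['port']                   # a str
port_number = parser.getint('redis', 'port')  # an int

# 'timeout' is a hypothetical option; fallback is returned when it is absent.
timeout = parser.getint('redis', 'timeout', fallback=5)

print(type(port_text).__name__, type(port_number).__name__, timeout)
```

An alternative is to convert at the point of use with int(...).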

2. YAML configuration files (.yaml and .yml): used to store test case data

(1) data.yaml

name : 1
port : 3306
names :
      - body
      - eyes
      - hair

(2) Parsing a YAML file requires installing the PyYAML module first:

pip install pyyaml

(3)解析yaml.py

import yaml

with open('data.yaml',encoding='utf-8') as fr:
    print(yaml.load(fr,Loader=yaml.SafeLoader))

if __name__ == "__main__":
    pass

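Note: calling yaml.load() without a Loader argument triggers the following warning in PyYAML 5.x (PyYAML 6.0 and later raise a TypeError instead):

YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.
print(yaml.load(fr))

Pass a Loader explicitly to fix it:

yaml.load(fr,Loader=yaml.SafeLoader)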

(4) The same logic wrapped in a function:

import yaml

def load_yaml(file_path):
    with open(file_path,encoding='utf-8') as fr:
        return yaml.load(fr,Loader=yaml.SafeLoader)

if __name__ == "__main__":
    result = load_yaml('data.yaml')
    print(result)

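(III) Sending DingTalk messages: signing and signature verification

1. Add a robot: Group Settings > Smart Group Assistant > Add Robot

(1) When adding the robot, choose one of the security settings: custom keywords | signature | IP address

(2) With the custom-keywords setting, messages can be sent with Postman; see the DingTalk help documentation: https://ding-doc.dingtalk.com/doc#/serverapi2/qf2nxq/d535db33

The message then shows up in the DingTalk group.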

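2. Send DingTalk messages using the signature security setting

(1) Signing: use timestamp + "\n" + secret as the string to sign, compute the signature with the HmacSHA256 algorithm, Base64-encode the result, and finally URL-encode it to get the final signature (using the UTF-8 character set).

(2) URL encoding and decoding (this is an encoding, not encryption)

from urllib.parse import quote,unquote
print(quote('hello-123_你好'))
secret = 'hello-123_%E4%BD%A0%E5%A5%BD'
print(unquote(secret))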

Output:

hello-123_%E4%BD%A0%E5%A5%BD
hello-123_你好
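Base64 output can contain +, / and =, so it helps to see how the quoting functions treat them. The short sketch below uses a made-up sample string. By default quote leaves / untouched, while safe='' or quote_plus encodes it as well.

```python
from urllib.parse import quote, quote_plus, unquote

sample = 'a+b/c='  # characters that can occur in Base64 output

default_quoted = quote(sample)          # '/' is in the default safe set
strict_quoted = quote(sample, safe='')  # encode '/' too
plus_quoted = quote_plus(sample)        # also encodes '/'

print(default_quoted)  # a%2Bb/c%3D
print(strict_quoted)   # a%2Bb%2Fc%3D
print(plus_quoted)     # a%2Bb%2Fc%3D
print(unquote(strict_quoted) == sample)  # True
```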

(3) Generate the signature, then send the message with Postman

import time
import hashlib
import base64
import hmac
from urllib.parse import quote
timestamp = int(time.time() * 1000)  # current time in milliseconds
secret='SECf2f4947ab1160ccdb6040a88edb91be0427835976aa6dee6eee0969XXXXXXXX'

# String to sign: timestamp, newline, secret
sign_before = '%s\n%s' % (timestamp,secret)
hsha256 = hmac.new(secret.encode(),sign_before.encode(),hashlib.sha256)

# sha256 = hashlib.sha256(sign_before.encode())
sign_sha256 = hsha256.digest()
sign_b64 =base64.b64encode(sign_sha256)
sign = quote(sign_b64)  # URL-encode the Base64 signature
print(timestamp,sign)

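The script prints the timestamp and the signature. In Postman, append them to the webhook URL as the timestamp and sign query parameters, then send the message.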
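The signing steps can be collected into one function that takes the timestamp as an argument, which makes the result repeatable and easy to check. This is a sketch under stated assumptions: compute_sign, demo_secret, and demo_timestamp are illustrative names and values, and safe='' is used so that / is percent-encoded too, which differs slightly from the plain quote call in the script above.

```python
import base64
import hashlib
import hmac
from urllib.parse import quote, unquote


def compute_sign(timestamp, secret):
    """HMAC-SHA256 over 'timestamp\\nsecret', then Base64, then URL-encode."""
    string_to_sign = '%s\n%s' % (timestamp, secret)
    digest = hmac.new(secret.encode('utf-8'),
                      string_to_sign.encode('utf-8'),
                      hashlib.sha256).digest()
    return quote(base64.b64encode(digest), safe='')


demo_secret = 'SECdemo-not-a-real-secret'  # hypothetical value
demo_timestamp = 1603894920000             # fixed so the result is repeatable

sign = compute_sign(demo_timestamp, demo_secret)
raw = base64.b64decode(unquote(sign))
print(len(raw))  # 32, the size of a SHA-256 digest in bytes
```

Passing the timestamp in, instead of reading the clock inside the function, is what makes the function testable.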

(4) Send the DingTalk message directly from code

import hmac
import time
import hashlib
import base64
from urllib.parse import quote
import requests

url = "https://oapi.dingtalk.com/robot/send?access_token=bd413385218506104a1903badd88016ba9ae9b7a1738bde9f7a3573aXXXXXXXX"
def create_sign():
    secret = 'SECf2f4947ab1160ccdb6040a88edb91be0427835976aa6dee6eee09691xxxxxxxx'
    timestamp = int(time.time() * 1000)
    sign_before = '%s\n%s' % (timestamp,secret)  # string to sign: timestamp, newline, secret
    hsha256 = hmac.new(secret.encode(),sign_before.encode(),hashlib.sha256)
    sign_sha256 = hsha256.digest()
    sign_b64 = base64.b64encode(sign_sha256)
    sign = quote(sign_b64)
    return {"timestamp":timestamp,"sign":sign}

def send_msg_dingding(msg="happy everyday!"):
    data = {
        "msgtype": "text",
        "text": {
            "content": msg
        },
        "at": {
            "atMobiles": [
                "1312007xxxx"
            ],
            "isAtAll": False
        }
    }

    sign = create_sign()
    r = requests.post(url,params = sign,json=data)
    print(r.json())

if __name__ == "__main__":
    send_msg_dingding("Make the most of every day!")

On success, the printed response has errcode 0.
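One detail worth checking when the signature travels in a params dict: HTTP clients that build the query string themselves usually percent-encode each value, so a value that was already passed through quote gets encoded a second time. The sketch below illustrates the effect with urlencode from the standard library; raw_sign is a made-up stand-in for a Base64 signature, TOKEN is a placeholder, and whether the server tolerates a doubly encoded value is not something the source states.

```python
from urllib.parse import parse_qs, quote, urlencode

raw_sign = 'ab+cd/ef='  # stand-in for a Base64 signature

params_raw = {'access_token': 'TOKEN', 'timestamp': 1603894920000,
              'sign': raw_sign}
params_quoted = dict(params_raw, sign=quote(raw_sign, safe=''))

once = urlencode(params_raw)      # the value is encoded exactly once
twice = urlencode(params_quoted)  # the '%' signs are encoded again as %25


def decoded_sign(query):
    """Decode the query string once, as a server normally would."""
    return parse_qs(query)['sign'][0]


print(decoded_sign(once) == raw_sign)   # True
print(decoded_sign(twice) == raw_sign)  # False
```

If a signature mismatch shows up, passing the raw Base64 string and letting the client do the encoding is one thing to try.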

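(IV) Writing automated test cases: Rainbow

1. Rainbow: main architecture design

(1) Create the project directories. The paths in settings.py and the imports in the listings in this section use the following: common, config, biz/cases, biz/data, biz/flow, biz/support, logs, and report.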

(2)config_parse.py

import os
import configparser
import yaml
from common.log import Log
from config.settings import CONFIG_FILE,CASE_DATA_PATH

def parse_ini(node,file_path=CONFIG_FILE):
    # Return one section of the ini file as a dict, or None if the section is missing
    if not os.path.exists(file_path):
        Log.error("Configuration file does not exist, file path: {}",file_path)
        raise Exception('ini file does not exist!')

    with open(file_path, encoding='utf-8') as fr:
        c = configparser.ConfigParser()
        c.read_file(fr)

        if node in c.sections():
            result = dict(c[node])
            return result
        Log.warning("Section [{}] does not exist in the configuration file",node)

def load_yaml(file_name):
    # Load a YAML data file from CASE_DATA_PATH
    file_path = os.path.join(CASE_DATA_PATH,file_name)
    if not os.path.exists(file_path):
        Log.error("Test data file does not exist, file path: {}",file_path)
        raise Exception('yaml file does not exist!')

    with open(file_path,encoding='utf-8') as fr:
        return yaml.load(fr,Loader = yaml.SafeLoader)

if __name__ == "__main__":
    print(parse_ini("mysql"))

(3)log.py

from loguru import logger
import sys
from config.settings import LOG_FILE,LOG_LEVEL
class Log:
    logger.remove()
    fmt = '[{time}][{level}][{file.path}:line:{line}:function_name:{function}] || msg={message}'
    # level file function module time message
    logger.add(sys.stdout,level=LOG_LEVEL,format=fmt)
    logger.add(LOG_FILE,level=LOG_LEVEL,format=fmt,encoding='utf-8',enqueue=True,rotation='1 day',retention='10 days')
    debug = logger.debug
    info = logger.info
    warning = logger.warning
    error = logger.error


if __name__ == "__main__":
    Log.info("Log test")

(4)operate_db.py

import pymysql
import traceback
from common.log import Log

class MySQL:
    def __init__(self,host,user,password,db,charset='utf8',autocommit=True,port=3306):
        port = int(port)  # values read from config.ini are strings
        Log.info("Connecting to MySQL")
        self.conn = pymysql.connect(user=user,host=host,password=password,port=port,
                                    db=db,charset=charset,autocommit=autocommit)
        self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)  # rows come back as dicts

    def __del__(self):
        self.__close()

    def execute(self,sql):
        try:
            self.cursor.execute(sql)
        except Exception:
            Log.error('SQL execution failed, the SQL statement is {}',sql)
            Log.error(traceback.format_exc())

    def fetchall(self,sql):
        self.execute(sql)
        return self.cursor.fetchall()

    def fetchone(self,sql):
        self.execute(sql)
        return self.cursor.fetchone()

    def bak_db(self):
        pass

    def __close(self):
        self.cursor.close()
        self.conn.close()

if __name__ == "__main__":
    pass
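The wrapper above takes a finished SQL string, so callers end up formatting values into the statement themselves. A common alternative is to pass the values separately and let the driver bind them. The sketch below demonstrates the idea with sqlite3 from the standard library purely so that it runs without a MySQL server. It is a swapped-in stand-in, not the pymysql code of this project, and fetch_user is an illustrative name. pymysql's cursor.execute also accepts an args parameter, with %s as the placeholder instead of ?.

```python
import sqlite3

# In-memory database standing in for the real MySQL instance.
conn = sqlite3.connect(':memory:')
conn.row_factory = sqlite3.Row  # rows support access by column name
conn.execute('CREATE TABLE app_myuser (id INTEGER PRIMARY KEY, username TEXT)')
conn.execute('INSERT INTO app_myuser (username) VALUES (?)', ('alice',))


def fetch_user(connection, username):
    """Look up one user; the value is bound by the driver, not formatted in."""
    row = connection.execute(
        'SELECT id, username FROM app_myuser WHERE username = ?',
        (username,),
    ).fetchone()
    return dict(row) if row is not None else None


print(fetch_user(conn, 'alice'))         # {'id': 1, 'username': 'alice'}
print(fetch_user(conn, "x' OR '1'='1"))  # None, the input is treated as data
```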

(5)config.ini

[mysql]
host=118.24.3.xx
user=jxz
password=123456
db=jxz
charset=utf8

[mysql2]
host=118.24.3.xxx
user=jxz
password=123456
db=jxz
charset=utf8
port =3306


[redis]
host=118.24.3.40
password=xxxx
port=6379

[dingding]
url = https://oapi.dingtalk.com/robot/send
secret = SECf2f4947ab1160xxxxxxxxxxedb91be0427835976aa6dee6eee096xxxxxxxxxx
access_token = bd4133852xxxxxxxxxxxx3badd88016ba9ae9b7a1738bde9f7a3573axxxxxxx
at = 1312007xxxx

[mail]
host=smtp.qq.com
user=127xxxx070@qq.com
password=1962xxxxzh
to=krystal_xiao@126.com
asc=12xxxxx767@qq.com

(6)settings.py

import os

BASE_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

LOG_FILE = os.path.join(BASE_PATH,'logs','rainbow.log') # log file

REPORT_PATH = os.path.join(BASE_PATH,'report') # directory where reports are stored

CASE_PATH = os.path.join(BASE_PATH,'biz','cases') # directory of test cases

CASE_DATA_PATH = os.path.join(BASE_PATH,'biz','data') # directory of test data

CONFIG_FILE = os.path.join(BASE_PATH,'config','config.ini') # path of the configuration file

LOG_LEVEL = 'INFO' # default log level



if __name__ == "__main__":
    pass

(7)biz.support.urls.py

from urllib.parse import urljoin
from common.config_parse import parse_ini


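# Requires a [server] section in config.ini whose host includes the scheme, e.g. http://127.0.0.1:8000
host = parse_ini('server').get('host')

class ServerUrl:
    login_url =  urljoin(host,'/api/user/login') # login URL
    register_url =  urljoin(host,'/api/user/user_reg') # registration URL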



if __name__ == "__main__":
    pass
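urljoin resolves the second argument against the first in the way a browser resolves a link, so the result depends on whether the path starts with a slash and on any path already present in the base. A short sketch follows, assuming a base that includes the http:// scheme. The /v1/ prefix is hypothetical and only there to show the difference.

```python
from urllib.parse import urljoin

base = 'http://127.0.0.1:8000'

plain = urljoin(base, '/api/user/login')
absolute_path = urljoin(base + '/v1/', '/api/user/login')  # replaces /v1/
relative_path = urljoin(base + '/v1/', 'api/user/login')   # appended to /v1/

print(plain)          # http://127.0.0.1:8000/api/user/login
print(absolute_path)  # http://127.0.0.1:8000/api/user/login
print(relative_path)  # http://127.0.0.1:8000/v1/api/user/login
```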

2. Rainbow: MySQL connection

(1) Create a new folder.

 (2)utils.py

import redis
import time
import hmac
import hashlib
import base64
from urllib.parse import quote
from common.config_parse import parse_ini
from common.operate_db import MySQL

ddconfig = parse_ini('dingding') # read the DingTalk section of the config file
secret = ddconfig.get("secret")

mysql_conn_mapper = {} # connection cache, e.g. {'mysqlNone': <MySQL>, 'mysql2None': <MySQL>}
redis_mapper = {} # Redis client cache, keyed the same way
def get_mysql(node='mysql',db=None):
    key = '%s%s'%(node,db)
    if key not in mysql_conn_mapper:
        mysql_info = parse_ini(node)
        if db:
            mysql_info['db']=db
        mysql = MySQL(**mysql_info)
        mysql_conn_mapper[key] = mysql
    else:
        mysql = mysql_conn_mapper[key]
    return mysql

def get_redis(node='redis',db=None):
    key = '%s%s'%(node,db)
    if key not in redis_mapper:
        redis_info = parse_ini(node)
        if db:
            redis_info['db'] = db
        r = redis.Redis(**redis_info)
        redis_mapper[key] = r
    else:
        r = redis_mapper[key]
    return r

def create_sign():
    # Signs with the module-level secret read from the [dingding] section above
    timestamp = int(time.time() * 1000)
    sign_before = '%s\n%s' % (timestamp,secret)
    hsha256 = hmac.new(secret.encode(),sign_before.encode(),hashlib.sha256)
    sign_sha256 = hsha256.digest()
    sign_b64 = base64.b64encode(sign_sha256)
    sign = quote(sign_b64)
    return {"timestamp":timestamp,"sign":sign}


if __name__ == "__main__":
    c = get_mysql()

3. NbDict

(1) Create a new directory.

 (2)custom_class.py

class NbDict(dict):
    # dict subclass that also allows attribute access: d.key instead of d["key"]

    def __getattr__(self, item): # e.g. item == "login_time"
        value = self.get(item) # a missing key yields None instead of AttributeError
        if isinstance(value,dict):
            value = NbDict(value)

        elif isinstance(value,(list,tuple)):
            value = list(value)
            for index,obj in enumerate(value):
                if isinstance(obj,dict):
                    value[index] = NbDict(obj)
        return value

if __name__ == "__main__":
    d = {"login_time":1}
    d1 = NbDict(d)
    print(d1.login_time2) # no such key, so this prints None
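A slightly larger example makes the nesting behaviour easier to see. The sketch repeats a compact version of the class so that it runs on its own. The response dict is invented for illustration and only loosely modelled on the login response used later.

```python
class NbDict(dict):
    """Compact restatement of the class above, for a standalone demo."""

    def __getattr__(self, item):
        value = self.get(item)
        if isinstance(value, dict):
            value = NbDict(value)
        elif isinstance(value, (list, tuple)):
            value = [NbDict(obj) if isinstance(obj, dict) else obj
                     for obj in value]
        return value


# Invented sample data
response = NbDict({
    "error_code": 0,
    "login_info": {"userId": 7, "roles": [{"name": "admin"}]},
    "items": [1, 2],
})

print(response.error_code)                # 0
print(response.login_info.userId)         # 7
print(response.login_info.roles[0].name)  # admin
print(response.missing)                   # None

# __getattr__ only runs when normal lookup fails, so a key that shares its
# name with a dict method still needs item access.
print(callable(response.items), response["items"])  # True [1, 2]
```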

4. Rainbow: wrapping an HTTP request class

(1) Create a new directory.

(2)http_request.py

import requests
import traceback
from common.log import Log
from common.utils import create_sign
from common.custom_class import NbDict


class Requests:
    def __init__(self,url,params=None,data=None,headers=None,json=None,files=None):
        self.url = url
        self.params = params
        self.data = data
        self.headers = headers
        self.json = json
        self.files = files

    def _get_response(self):
        # Return the parsed JSON wrapped in NbDict, or the raw text if the body is not JSON
        try:
            result = self.req.json()
        except Exception:
            return self.req.text
        else:
            result = NbDict(result)
        return result

    def get(self):
        Log.info("Sending GET request")
        Log.info("url: [{}], params: [{}], headers: {}",self.url,self.params,self.headers)
        try:
            self.req = requests.get(self.url,params=self.params,headers=self.headers,verify=False)
        except Exception:
            Log.error("HTTP request failed, error details: {}",traceback.format_exc())
            raise Exception("The API could not be reached")
        else:
            return self._get_response()

    def post(self):
        Log.info("Sending POST request")
        Log.info("url: [{}], params: [{}], headers: {}, data: {}, json: {}, files: {}",
                 self.url, self.params, self.headers,self.data,self.json,self.files)
        try:
            self.req = requests.post(self.url,params=self.params,
                          data=self.data,json=self.json,
                          files=self.files, # {"key":open("f.py",'rb')}
                          headers=self.headers,verify=False)
        except Exception:
            Log.error("HTTP request failed, error details: {}", traceback.format_exc())
            raise Exception("The API could not be reached")
        else:
            return self._get_response()

if __name__ == "__main__":
    url = "https://oapi.dingtalk.com/robot/send?access_token=bd41xxxxxxxxxxx104a190xxxxx88016ba9ae9b7a1738bde9f7a3573axxxxxxxx"
    sign = create_sign()
    msg = "Good Evening!"
    data = {
        "msgtype": "text",
        "text": {
            "content": msg
        },
        "at": {
            "atMobiles": [
                "13xxxxx1212"
            ],
            "isAtAll": False
        }
    }
    r = Requests(url,params=sign,json=data)
    result = r.post()
    print(result)

5. Rainbow: wrapping the message-sending functions

(1) Create a new directory.

(2) Code: send_msg.py

import yamail
from common.http_request import Requests
from common.utils import parse_ini,create_sign
ddconfig = parse_ini('dingding') # DingTalk settings
mail_config =parse_ini('mail') # mail settings
url = ddconfig.get('url') # DingTalk webhook URL
access_token = ddconfig.get('access_token') # access_token
at= ddconfig.get('at','').split(',') # mobile numbers to @ when sending a DingTalk message

def send_dingding(msg):
    data = {
        "msgtype": "text",
        "text": {
            "content": msg
        },
        "at": {
            "atMobiles": at,
            "isAtAll": False
        }
    }
    sign = create_sign()
    sign['access_token'] = access_token
    r = Requests(url,params=sign,json=data)
    r.post()

def send_mail(subject,contents,attachments=None):
    smtp = yamail.SMTP(
        host=mail_config.get("host"),
        user = mail_config.get("user"),
        password =mail_config.get("password")
    )
    smtp.send(to=mail_config.get("to",'').split(','), # recipients
            subject = subject, # mail subject
            cc =mail_config.get("xxx",'').split(','), # cc recipients
            contents=contents, # mail body
            attachments=attachments # attachment; pass a list for several attachments
    )
    smtp.close()

if __name__ == "__main__":
    send_dingding("good evening,guys!")

6. Rainbow: finishing up

(1) Create a user.py file in the flow folder.

Code: user.py

from common.http_request import Requests
from biz.support.urls import ServerUrl

class UserRequest:

    @classmethod
    def login(cls,username,password):
        '''
        Call the login API
        :param username: user name
        :param password: password
        :return:
        '''
        data = {
            'username':username,
            'passwd':password
        }
        req = Requests(ServerUrl.login_url,data=data)
        return req.post()

    @classmethod
    def register(cls,username,password,cpassword):
        '''
        Register a user
        :param username: user name
        :param password: password
        :param cpassword: password confirmation
        :return:
        '''
        data = {
            'username': username,
            'pwd': password,
            'cpwd':cpassword
        }
        req = Requests(ServerUrl.register_url,data=data)
        return req.post()

if __name__ == "__main__":
    result = UserRequest.login('niuhanyang','aAxxxxxx')
    print(result)


(2) Create a login_data.yaml file in the data folder.

The YAML file contains:

username : niuhanyang
password : xxxxxxx

(3) In the cases folder, create three files: base_case.py, test_login.py, and test_open_acc.py

Code:

base_case.py

import unittest
from common.config_parse import load_yaml

class BaseCase(unittest.TestCase):
    data_file_name = None

    @property
    def file_data(self):
        data = load_yaml(self.data_file_name)
        return data

    @classmethod
    def get_token(cls,username):
        pass


if __name__ == "__main__":
    pass

test_login.py:

import unittest
from biz.flow.user import UserRequest
from common.utils import get_redis,get_mysql
from common.config_parse import load_yaml
from biz.cases.base_case import BaseCase

class TestLogin(BaseCase):
    '''Test cases for the login API'''
    data_file_name = 'login_data.yaml'

    @classmethod
    def setUpClass(cls):
        cls.redis = get_redis()
        cls.mysql = get_mysql()
        # cls.file_data = load_yaml('login_data.yaml')

    def test_normal(self):
        '''Normal login'''
        username = self.file_data.get('username')
        password = self.file_data.get('password')

        ret = UserRequest.login(username,password)
        self.assertEqual(0,ret.error_code,'The returned error code is not 0')
        self.assertIsNotNone(ret.login_info.login_time,msg='login_time is empty')
        redis_key = 'session:%s' % username
        # redis-py returns bytes unless the client was created with decode_responses=True
        sessionid = self.redis.get(redis_key)
        sql = 'select id from app_myuser where username = "%s";' % username
        db_result = self.mysql.fetchone(sql)
        user_id = db_result.get('id')
        self.assertEqual(sessionid,ret.login_info.sign,msg="The returned session does not match the one in Redis")
        self.assertEqual(user_id,ret.login_info.userId,msg="The returned userId does not match the one in the database")



if __name__ == "__main__":
    pass

test_open_acc.py:

import unittest
from common.config_parse import load_yaml

class BaseCase(unittest.TestCase):
    data_file_name = None

    @property
    def file_data(self):
        data = load_yaml(self.data_file_name)
        return data

    @classmethod
    def get_token(cls,username):
        pass


if __name__ == "__main__":
    pass

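(4) Run run.py

import unittest
import os,time,sys

BASE_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0,BASE_PATH)  # make the project root importable

# dd_template and mail_template must be defined in config/settings.py; each takes three values (total, passed, failed)
from config.settings import CASE_PATH,REPORT_PATH,dd_template,mail_template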
from common import send_msg
from common.HTMLTestRunner import HTMLTestRunner

def run():
    test_suite = unittest.defaultTestLoader.discover(CASE_PATH,'test*.py')
    file_name = 'report_%s.html' % time.strftime('%Y%m%d%H%M%S')
    file_abs = os.path.join(REPORT_PATH,file_name)
    with open(file_abs,'wb') as fw:
        runner = HTMLTestRunner(stream=fw,title='Test report title',description='Description')
        case_result = runner.run(test_suite)
    # The report file is closed (and flushed) here, before it is attached to the mail
    all_count = case_result.failure_count + case_result.success_count
    dd_msg = dd_template % (all_count,case_result.success_count,case_result.failure_count)
    mail_msg = mail_template % (all_count, case_result.success_count, case_result.failure_count)
    send_msg.send_dingding(dd_msg)
    subject = 'Pegasus automated test report-%s' % time.strftime('%Y-%m-%d %H:%M:%S')
    send_msg.send_mail(subject,mail_msg,file_abs)

if __name__ == '__main__':
    run()
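The summary above counts only successes and failures. With the standard library runner, tests that raise an unexpected exception are reported separately as errors, so a summary may want to include them. The sketch below uses unittest.TextTestRunner and the standard TestResult attributes (testsRun, failures, errors, skipped), not the HTMLTestRunner used in run.py. make_sample_case and summarize are illustrative names.

```python
import io
import unittest


def make_sample_case():
    """Build a TestCase with one passing, one failing, and one erroring test.

    The class is created inside a function so that tools which collect
    tests at import time do not pick it up.
    """
    class Sample(unittest.TestCase):
        def test_pass(self):
            self.assertEqual(1, 1)

        def test_fail(self):
            self.assertEqual(1, 2)

        def test_error(self):
            raise RuntimeError('boom')
    return Sample


def summarize(result):
    """Derive counts from a standard unittest.TestResult."""
    failed = len(result.failures)
    errored = len(result.errors)
    skipped = len(result.skipped)
    passed = result.testsRun - failed - errored - skipped
    return {'total': result.testsRun, 'passed': passed,
            'failed': failed, 'errored': errored}


suite = unittest.defaultTestLoader.loadTestsFromTestCase(make_sample_case())
result = unittest.TextTestRunner(stream=io.StringIO()).run(suite)
summary = summarize(result)
print(summary)  # {'total': 3, 'passed': 1, 'failed': 1, 'errored': 1}
```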


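7. Automated test workflow: run.py discovers the test*.py files under CASE_PATH, runs them with HTMLTestRunner, writes the HTML report to REPORT_PATH, then sends the pass/fail summary to DingTalk and emails the report as an attachment.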

Original article: https://www.cnblogs.com/krystal-xiao/p/13894163.html