Locust性能测试 --headless 模式下在 Grafana 展示聚合报告

locust版本:1.1.1

参考地址:https://www.jianshu.com/p/c6802a88beaf

locust 性能脚本  coupon_test.py

from locust import task, between, tag
from locust.contrib.fasthttp import FastHttpUser
from public.common import setup_request, store, coupon # 参数化函数、加密处理
from stats.listener import * # hook函数


class MyUser(FastHttpUser):
    host = ""
    wait_time = between(0, 0)

    @tag("getCustomerCouponList")
    @task
    def get_customer_coupon_list(self):
        data = {"status": 0, "pageSize": 15, "pageIndex": 1}
        req = {"body": data, "params": {}, "headers": {}}
        req = setup_request(req, self.host)
        with self.client.post("/api/coupons/app/getCustomerCouponList", headers=req["headers"],
                              json=req["body"], name="/api/coupons/app/getCustomerCouponList", catch_response=True) as res:
            try:
                response = res.json()
            except Exception as e:
                logger.error("=======/api/coupons/app/getCustomerCouponList=== Exception ====>>>> {} >>>> {}".format(res.text, e))
                res.failure("json error!")
                return
            try:
                if response["success"] and response["message"] == "请求成功":
                    res.success()
                else:
                    logger.error("=======/api/coupons/app/getCustomerCouponList=== 断言 ====>>>> {}".format(response))
                    res.failure("断言失败!")
            except KeyError as e:
                logger.error("=======/api/coupons/app/getCustomerCouponList=== KeyError ====>>>> {} >>>> {}".format(res.text, e))
                res.failure("key error!")
  ......

--headless模式执行命令

locust -f testcasescouponcoupon_test.py --headless -u 10 -r 3 -t 1m  -H https://xxxxx.com --logfile=loglocust.log --loglevel=INFO 1>log
un.log 2>&1 --master

生成run.log文件格式

读取日志文件并写入 influxdb 数据库


#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020-07-23 10:32
# @Author : lixiaofeng
# @Site :
# @File : helper.py
# @Software: PyCharm
import time
import osimport jsonimport re
import platform
from io import StringIO
from public.connect_influxdb import TestInflux

def read_performance(t):
    """
    读取日志中的性能数据
    :param t: 时间 time.time()
    :return:
    """
    t1 = h.getvalue()
    if t1 and t - float(t1[:18]) < 5:  # 5秒写入一次数据库
        return
    h.seek(0)
    h.write(str(t))
    performance_path = os.path.join(BASE_DIR, "log" + pattern + "run.log")
    with io.open(performance_path) as f:
        data_list = f.readlines()
        locust_list = []
        i = 0
        for data in data_list:
            res = re.match(
                r'^s+(?P<method>GET|POST)s+(?P<api>[/w?=-&]+)s+(?P<reqs>d+)s+(?P<fails>[d(.)\%]+)s+(?P<Avg>d+)s+(?P<Min>d+)s+(?P<Max>d+)s+(|)s+(?P<Median>d+)s+(?P<qps>[d(.)\%]+)s+(?P<failures>[d(.)\%]+)$',
                data) # 正则,匹配数据
            if res:
                i += 1
                # print(res.group('method'), res.group('api'), res.group('reqs'), res.group('fails'), res.group('Avg'),
                #       res.group('Min'), res.group('Max'), res.group('Median'), res.group('qps'), res.group('failures'))
                method = res.group('method')
                api = res.group('api')
                reqs = res.group('reqs')
                fails = res.group('fails')
                Avg = res.group('Avg')
                Min = res.group('Min')
                Max = res.group('Max')
                Median = res.group('Median')
                qps = res.group('qps')
                failures = res.group('failures')
                locust_dict = {'Type': method, 'Name': api, 'Requests': reqs, 'Fails': fails, 'Average_ms': Avg,
                               'Min_ms': Min,
                               'Max_ms': Max, 'Median_ms': Median, 'Current_RPS': qps, 'Failures_s': failures}
                locust_list.append(locust_dict)
        influx.post_dump_data(locust_list)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020-07-31 13:21
# @Author : lixiaofeng
# @Site :
# @File : connect_influxdb.py
# @Software: PyCharm
from influxdb import InfluxDBClient


class TestInflux:
    """
    连接 influxdb ,并写入数据
    """

    def __init__(self):
        self.influx_client = InfluxDBClient('localhost', 8086, '', '', 'mydb')
        self.make = False
        for database in self.influx_client.get_list_database():
            if "mydb" not in database["name"]:
                self.make = True
            else:
                # self.influx_client.drop_measurement("locust") # 写入前删除数据
                self.make = False
        if self.make:
            self.influx_client.create_database("mydb")

    def post_dump_data(self, data):
        """
        :param data:  数据 list
        :return:
        """
        if isinstance(data, list):
            for key in data:
                json_body = [
                    {
                        "measurement": "locust",
                        # "tags": key,
                        "fields": key
                    }
                ]
                self.influx_client.write_points(json_body)

在hook函数中对 read_performance 函数进行调用

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020-07-29 11:20
# @Author  : lixiaofeng
# @Site    : 
# @File : listener.py
# @Software: PyCharm


import time
from locust import events
from loguru import logger
from public.helper import read_performance


@events.quitting.add_listener
def process_exit(environment, **kwargs):
    """
    locust 退出时调用
    :param environment:
    :param kwargs:
    :return:
    """
    if environment.stats.total.fail_ratio > 0.01:
        logger.error("Test failed due to failure ratio > 1%")
        environment.process_exit_code = 1
    elif environment.stats.total.avg_response_time > 200:
        logger.error("Test failed due to average response time ratio > 200 ms")
        environment.process_exit_code = 1
    elif environment.stats.total.get_response_time_percentile(0.95) > 800:
        logger.error("Test failed due to 95th percentile response time > 800 ms")
        environment.process_exit_code = 1
    else:
        environment.process_exit_code = 0
    read_performance(time.time())


@events.request_success.add_listener
def success(request_type, name, response_time, response_length, **kwargs):
    """
    接口 请求成功时调用
    :param request_type:
    :param name:
    :param response_time:
    :param response_length:
    :param kwargs:
    :return:
    """
    read_performance(time.time())


@events.request_failure.add_listener
def failure(request_type, name, response_time, response_length, **kwargs):
    """
    接口 请求失败时调用
    :param request_type:
    :param name:
    :param response_time:
    :param response_length:
    :param kwargs:
    :return:
    """
    read_performance(time.time())

influxdb插入的数据

配置Grafana<安装步骤网上有详细教程,不再赘述>

设置influxdb数据源

面板中选择 可视化 table 展示,使用sql查询数据

select Type,"Name",Requests,Fails,Average_ms,Min_ms,Max_ms,Median_ms,Current_RPS,Failures_s from locust order by time desc  limit 3

 和web模式对比

~~~【完】

原文地址:https://www.cnblogs.com/changqing8023/p/13417755.html