针对 WebSocket 协议的 Locust 压测脚本实现(基于 Locust 1.0 以上版本)

针对 WebSocket 协议的 Locust 压测脚本实现(基于 Locust 1.0 以上版本)

 

Locust 默认支持 HTTP 协议(默认通过 HttpUser 类),我们也可以自行实现任意协议的 Client 对它 User 类进行继承(HttpUser 也是继承自 User)并增加所需要的方法,这样也就实现了任意协议的压测。

针对 WebSocket 协议的 Locust 压测脚本实现无非就是三个步骤

  1. 编写一个 WebSocket Client,也就是定义一个 Class,实现 WS连接初始化、事件订阅、消息接收 所需要的方法
  2. 使用 WebSocket Client 继承 User 类,产生 WebsocketUser
  3. 依据测试用例编写压测脚本,使用 WebsocketUser内预定义的方法 实现并发的连接、事件订阅、消息接收

脚本实现参考

from locust import User, task, events, constant
import time
import websocket
import ssl
import json
import jsonpath

def eventType_success(eventType, recvText, total_time):
    events.request_success.fire(request_type="[RECV]",
                                name=eventType,
                                response_time=total_time,
                                response_length=len(recvText))

class WebSocketClient(object):
    
    _locust_environment = None
    
    def __init__(self, host):
        self.host = host
        # 针对 WSS 关闭 SSL 校验警报
        self.ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
        
    def connect(self, burl):
        start_time = time.time()
        try:
            self.conn = self.ws.connect(url=burl)
        except websocket.WebSocketConnectionClosedException as e:
            total_time = int((time.time() - start_time) * 1000)
            events.request_failure.fire(
                request_type="[Connect]", name='Connection is already closed', response_time=total_time, exception=e)
        except websocket.WebSocketTimeoutException as e:
            total_time = int((time.time() - start_time) * 1000)
            events.request_failure.fire(
                request_type="[Connect]", name='TimeOut', response_time=total_time, exception=e)
        else:
            total_time = int((time.time() - start_time) * 1000)
            events.request_success.fire(
                request_type="[Connect]", name='WebSocket', response_time=total_time, response_length=0)
        return self.conn
        
    def recv(self):
        return self.ws.recv()
        
    def send(self, msg):
        self.ws.send(msg)
        
class WebsocketUser(User):
    abstract = True
    def __init__(self, *args, **kwargs):
        super(WebsocketUser, self).__init__(*args, **kwargs)
        self.client = WebSocketClient(self.host)
        self.client._locust_environment = self.environment
        
class ApiUser(WebsocketUser):
    host = "wss://ws.xxxxx.com/"
    wait_time = constant(0)
    
    @task(1)
    def pft(self):
        # wss 地址
        self.url = 'wss://ws.xxxxx.com/ws?appid=futures&uid=10000000'
        self.data = {}
        self.client.connect(self.url)
        
        # 发送的订阅请求
        sendMsg = '{"appid":"futures","cover":0,"event":[
            {"type":"exchange_rate","toggle":1,"expireTime":86400},
            {"type":"accountInfo_USDT","toggle":1,"expireTime":86400},
            {"type":"ticker_BTC/USDT","toggle":1,"expireTime":86400}]}'
        self.client.send(sendMsg)
        
        while True:
            # 消息接收计时
            start_time = time.time()
            recv = self.client.recv()
            total_time = int((time.time() - start_time) * 1000)
            
            # 为每个推送过来的事件进行归类和独立计算性能指标
            try:
                recv_j = json.loads(recv)
                eventType_s = jsonpath.jsonpath(recv_j, expr='$.eventType')
                eventType_success(eventType_s[0], recv, total_time)
            except websocket.WebSocketConnectionClosedException as e:
                events.request_failure.fire(request_type="[ERROR] WebSocketConnectionClosedException",
                                            name='Connection is already closed.',
                                            response_time=total_time,
                                            exception=e)
            except:
                print(recv)
                # 正常 OK 响应,或者其它心跳响应加入进来避免当作异常处理
                if 'ok' in recv:
                    eventType_success('ok''ok', total_time)

class WebSocketClient

  1. 实现了 WebSocket 的所有行为方法,包括连接初始化、消息发送(订阅)、消息接收
  2. 对连接过程中的异常进行捕获统计,记录异常响应的时间,便于后续测试分析
  3. 这段脚本基本拷贝就能用:)

class WebsocketUser

  1. 继承 Locust 的 user 成为 WebsocketUser

class ApiUser

  1. 在这里加载 WebsocketUser,初始化的 user,发送订阅请求、并在一个死循环内接收消息推送内容
  2. 对接收的消息内容(json格式)进行解析,最终可以在 WEB UI 看到各种推送事件的推送统计
  3. 对接收推送过程中的异常进行捕获,记录异常响应的时间,便于后续测试分析

也可以在死循环内加入心跳发送,但建议建议按照规则发送,避免过于频繁,此处略

压测过程

 
 
原文地址:https://www.cnblogs.com/xiao-xue-di/p/14859101.html