Locust 其他协议

Locust 是基于HTTP作为主要目标构建的,但是他同样可以扩展其他的协议,接受请求与获得返回。在编写的客户端的时候,我们就要使用到最常使用的 request_success 和 request_failure 的事件钩子。

对于HTTP(S)以外的协议,在客户端的具体实现上,可通过注册事件的方式,在请求成功时触发events.request_success,在请求失败时触发events.request_failure即可。

然后创建一个继承自Locust类的类,对其设置一个client属性并与我们实现的客户端进行绑定。

后续,我们就可以像使用HttpLocust类一样,测试其它协议类型的系统。

以下为常用的locust种的5种钩子

    1. request_success:当请求完全成功时被触发,钩子函数需要定义如下参数:

      request_type:请求的类型 
      name:请求的URL或者自定义的统计分组名字(如果请求时提供了name参数的话) 
      response_time:请求花费的时间(毫秒为单位) 
      response_length:响应长度

    2. request_failure:当请求失败时触发,钩子函数需要定义如下参数:

      request_type:请求的类型 
      name:请求的URL或者自定义的统计分组名字(如果请求时提供了name参数的话) 
      response_time:请求花费的时间(毫秒为单位) 
      exception:请求失败抛出的异常

    3. locust_error:在Locust实例执行发生异常时触发,钩子函数需要定义如下参数:

      locust_instance:发生异常的Locust类的实例 
      exception:抛出的异常 
      tb:来自sys.exc_info()[2]的Traceback对象

    4. hatch_complete:所有的Locust实例产生完成时触发,钩子函数需要定义如下参数:

      user_count:产生的Locust实例(虚拟用户)的数量。

    5. quitting:Locust进程退出时被触发,钩子函数不需要提供参数。

一个websocket的简单例子

from locust import Locust, events, task, TaskSet
import websocket
import time

class WebSocketClient():
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.ws = websocket.WebSocket()

    def connect(self, url):
        self.ws.connect(url)

class WebSocketLocust(Locust):
    def __init__(self, *args, **kwargs):
        super(WebSocketLocust, self).__init__(*args, **kwargs)
        self.client = WebSocketClient(host='', port=1234)

class UserBehavior(TaskSet):
    def on_start(self):
        self.ws = WebSocketLocust()

    @task
    def connect(self):
        start_time = time.time()
        try:
            self.ws.client.connect("ws://x.x.x.x:x")
        except Exception as e:
            total_time = int((time.time() - start_time) * 1000)
            events.request_failure.fire(request_type="Websockt", name="WSconnect", response_time=total_time, exception=e)
        else:
            total_time = int((time.time()-start_time) * 1000)
            events.request_success.fire(request_type="Websockt", name="WSconnect", response_time=total_time, response_length=0) #hardcode length to 0 size 

class WebSocketUser(WebSocketLocust):
    min_wait = 3000
    max_wait = 6000
    task_set = UserBehavior

1. 先定义一个Websocket协议类,可以封装该协议写一些自己得方法作为一个Client。

2. 重新定义Locust类中得Client,把自己定义得Websocket类赋给Locust中得Client

3. 通过Client调用封装得方法,并使用locust中得钩子方法获取到并发时得响应时间

4. 设置用户操作间隔时间以及测试用例集

原文地址:https://www.cnblogs.com/grandlulu/p/9456934.html