TCP/IP协议,TCP与平台通信,通讯协议压力测试(python)

最近的项目来了一个需求,要求测试tcp网关通讯协议;

1、液压井盖通过TCP/IP TCP与平台通信;

2、硬件定期发送心跳包(10S)给平台,是平台与硬件保持长连接;

3、每台硬件有一个12字节的唯一编码(字符型);

4、每台设备是1S发送一条报文;

最初使用NetAssist测试功能,模拟硬件设备发送报文,测试硬件设备发过来的状态。

功能测试通过后,新来的压测需求:要求对模拟60个左右的设备每隔一秒发送一条报文到平台,去百度Google搜索TCP压测怎么压测,这类文章博客比较少,试了有个博客自己写的一个软件,发现不能满足需求(都不能安装),其余就是jmeter进行压测,所以就尝试用jmeter进行测试,版本是3.2,按照博客所写进行操作,发现还是不能满足需求,并不能进行60个设备同时1S发送一条报文(加入定时器也不能满足),单个的模拟一个设备可以发送成功。

由于使用工具不能满足需求,只能依靠手写代码来实现了。

common.py     存放公用方法,如进制转化、异或运算等;

manholecover_state.py     结合数据组装报文;

run_test.py     主方法,发送报文,执行脚本;

test.log      日志文件

common.py  

# coding=utf8
# import mysql.connector
import binascii


# 报文头
def reverseStart_field():
    return "F5F5"


# 返回序列
def reverseNumber_field():
    return "0001"


# 补全字节
def zfill(str1, i):
    str1 = str1.zfill(2 * i)  # 补全 字节
    return str1


# int转16进制
def intChange16(str1, i):
    str1 = hex(int(str1))  # 转16进制
    str1 = str1.replace("0x", "")  # 去掉16进制0x
    str1 = str1.zfill(2 * i)  # 补全 字节
    return str1


# 16进制 去掉0x 补全 字节
def strReplace(str1, i):
    str1 = str1.replace("0x", "")  # 去掉16进制0x
    str1 = str1.zfill(2 * i)  # 补全 字节
    return str1


# int转16进制 倒序
def change16Reverse(str1, i):
    # print("==========调用方法change16Reverse")
    str1 = hex(int(str1))  # 转16进制
    str1 = str1.replace("0x", "")  # 去掉16进制0x
    str1 = str1.zfill(2 * i)  # 补全 字节
    # print("==========倒序之前:",str1)
    i = len(str1)
    str2 = ""
    while (i > 0):
        str2 = str2 + (str1[i - 2:i])
        i = i - 2
        # print("==========倒序之后:",str2)
    return str2


# 时间转16进制
def timeChange16(str1):
    arrTime = []
    arrTime = str1.split('-')
    arrChange = []
    strChange = ""
    i = 0
    while (i < len(arrTime)):
        if (i == 0):
            print("----------")
            strChange = change16Reverse(arrTime[i], 2)
        else:
            strTem2 = intChange16(arrTime[i], 1)
            strChange = strChange + strTem2
        i = i + 1
    return strChange + "FF"


# 字符串 转换
def changestr(str2):
    length = len(str2)
    utf8_length = len(str2.encode('utf-8'))  # gbk  8.5    utf-8  9
    length = int((utf8_length - length) / 2 + length)
    str2 = binascii.b2a_hex(str2.encode("gbk"))  # encode("gbk") utf8
    str2 = str(str2)  # 将 byte 类型转换成 str 类型
    str2 = str2.replace("b'", "").replace("'", "")  # 去掉 b'
    str2 = str2.zfill(2 * length)
    return str2


# asc 转换

def changeascii(str1, aa):
    e = 0  # 暂存结果
    for i in str1:
        d = ord(i)  # 单个字符转换成ASCii码
        e = e * 256 + d  # 将单个字符转换成的ASCii码相连
    a = hex(e)
    a = a.replace("0x", "")
    a = a.zfill(2 * aa)
    # print("结果是:%s" % a)
    return a


if __name__ == "__main__":
    message_id = '0x10'
    case_id = "1"

manholecover_state.py

# -*-coding:utf-8-*-

# coding=utf8
import mysql.connector
import common
import logging
import datetime
import pymysql


def parseadd(case):
    config = {
        'host': '127.0.0.1',  # 连接的IP地址
        'user': 'root',
        'password': '123456',
        'port': 3306,
        'database': 'monitor',
        'charset': 'utf8',  # 编码格式,防止查出来的数据中文乱码
    }
    #db1_cursor = mysql.connector.Connect(**config)  # 连接数据库
    db1_cursor = pymysql.Connect(**config)

    cur = db1_cursor.cursor()  # 执行命令,接收结果
    t_str = "select * from monitor.test_manholecover_state_copy where case_id="
    try:
        cur.execute(t_str + str(case))
        t_result = cur.fetchone()
        print(t_result)
        db1_cursor.close()
        print("-----------------------井盖监控点编码", t_result[5])
        strElement0 = common.reverseStart_field()  #
        strElement1 = common.reverseNumber_field()  # 序列
        strElement2 = common.intChange16(t_result[4], 1)  # 应答标志
        # strElement3=common.changeascii(t_result[5],12)#设备编码
        strElement3 = common.changeascii(t_result[5], 12)  # 设备编码
        logging.info("报文发送时间:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " 编码:" + t_result[5])
        strElement4 = common.intChange16(t_result[6], 1)  # 井盖状态
        strElement5 = common.intChange16(t_result[7], 1)  # 正在操作方式
        strElement6 = common.intChange16(t_result[8], 3)  # 告警状态
        strElement6_1 = common.intChange16(t_result[9], 3) # 告警状态
        strElement7 = common.intChange16(t_result[10], 1)  # 控制授权
        strElement8 = common.intChange16(t_result[11], 1)  # 角度状态
        strElement9 = common.zfill(t_result[12], 4)  # 结束标记

        # 拼接字符串
        strElement = [strElement0, strElement1, strElement2, strElement3, strElement4, strElement5, strElement6,
                      strElement6_1, strElement7, strElement8, strElement9]
        strResult = ''.join(strElement)

        # print("=====================")
        print("拼接报文:", strResult)
        return strResult

    except Exception as e:
        print("Error: %s" % e)


if __name__ == "__main__":

    for id in range(1, 68):
        print(parseadd(1))

        # db1_cursor = mysql.connector.connect(host='127.0.0.1', port='3306', user='root', password='123456', database='monitor', charset='utf8')
        # cur = db1_cursor.cursor()
        # case_id='1'
        # #parseadd(case_id)
        # db1_cursor.close()

run_test.py

# coding=utf-8
import datetime
import logging
import threading
from socket import *
from time import sleep
import manholecover_state

# import logging.handlers

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='test.log',
                    filemode='w')
logging.debug('debug message')
logging.info('info message')
logging.warning('warning message')
logging.error('error message')
logging.critical('critical message')
socks = []


def singlesend(case, soc2):
    global lock
    lock.acquire()
    # case_id = 1
    # while case_id<=2:   #循环次数
    print("报文发送时间:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    print("-----------------------井盖监控点编码", case)  # , "上报次数:", case_id)
    send_manholecover_state = manholecover_state.parseadd(case)
    #print("----------------------", send_manholecover_state + '
')

    soc2.send(bytes().fromhex(send_manholecover_state))

    # case_id = case_id + 1
    # print("报文发送时间:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    # sleep(1) #可修改的上报时间间隔
    lock.release()


def message():
    global lock
    lock.acquire()
    t_code = 201820190601
    threads = []
    # while (t_code <= 201820190667):
    for case in range(1, 68):
        t_singlesend = threading.Thread(target=singlesend, args=(str(case), socks[case-1]))
        threads.append(t_singlesend)
        # t_singlesend.setDaemon(True)
        # t_singlesend.start()
        # sleep(1)
        t_code += 1

    for i in threads:
        i.setDaemon(True)
        i.start()
    lock.release()


if __name__ == '__main__':
    lock = threading.Lock()

    end = datetime.datetime.now() + datetime.timedelta(days=2)  # 获取结束时间
    print("报文发送一周结束时间" + end.strftime("%Y-%m-%d %H:%M:%S") + '
')
    for case in range(1, 68):
        soc = socket(AF_INET, SOCK_STREAM)
        soc.settimeout(300)  # 设置超时时间
        soc.connect(('192.168.0.156', 8845))
        socks.append(soc)

    while True:
        message()
        now = datetime.datetime.now()
        sleep(1)
        if now >= end:
            break

'''
threads = []
threads.append(t_singlesend)
for t in threads:
    t.start()
for t in threads:
    t.join()
    logging.info("全部执行完成~:%s" % ctime())
 '''

运行结果:达到需求,每秒发送67条报文到平台

在这种正常场景下进行压测,连续跑了几个小时候,平台就崩掉了,java内存不断的升高;

开发进行不断的性能优化,已经连续跑了三四的情况下,平台已经走向稳定。

原文地址:https://www.cnblogs.com/hemingwei/p/11150917.html