如何将平时所学的代码知识,用在实际测试场景中

需求如下:

 第一个:每30张卡片最多包含2张过去14天没有登录的用户,卡片尽量不要出现在前15张的位置

# 需求:每30张卡片最多包含2张过去14天没有登录的用户,卡片尽量不要出现在前15张的位置
from knight_util.get_db_token import tk_
import requests
from knight_util.util import KnightUtil
from knight_util.write_txt import write_log
from knight_util.get_current_time import current_time
import threading
import json
m1 = KnightUtil().mysql_()
tk = tk_(2397).get('EPAL_WEB')
# 创建线程锁
mutex = threading.Lock()


def get_user_id_list():
    headers = {
        "_tk_": tk,
        "content-type": "application/json",
        "user-agent": "Mozilla / 5.0(Windows NT 10.0;Win64;x64)"
    }
    data = {"limit": 30, "likeUserIds": [], "dislikeUserIds": [], "excludeUserIds": []}
    url = "https://test-community.gamesegirl.com/user/meets/match/v-3.6"
    response = requests.post(url=url, json=data, headers=headers).json()
    # response = json.loads(response_)
    if response['status'] == 'OK':
        response_length = len(response['content']['data'])
        userid_list = [response['content']['data'][i]['userId'] for i in range(response_length)]
        print(f"总共卡片数:{len(userid_list)}")
        print(f"接口返回的所有meet卡片数据{userid_list}")
        return userid_list


def get_t_user_id():
    """获取t_user表里过去14天没有登录的用户id"""
    start_time = current_time("sub_time", 14)
    print(f"查询开始时间:{start_time}")
    end_time = current_time("add_time", 1)
    print(f"查询结束时间:{end_time}")
    # sql = '''SELECT id FROM community.t_user t1 WHERE DATE_FORMAT(t1.last_access_time,'%Y-%m-%d %H:%i:%S') BETWEEN '2020-12-30 00:00:00' AND  '2021-01-13 23:59:59';'''
    sql = '''SELECT id FROM community.t_user t1 WHERE DATE_FORMAT(t1.last_access_time,'%Y-%m-%d %H:%i:%S') BETWEEN '{}' AND  '{}';'''.format(
        start_time, end_time)
    # 锁定
    mutex.acquire()
    m1.execute(sql)
    # 释放
    mutex.release()
    id_list = [data[0] for data in m1.fetchall()]
    print("在去14天登录的用户数据(含今天)")
    print(id_list)
    return id_list


def get_last_access_time(id_):
    """获取最后一次访问时间"""
    sql = '''SELECT t1.last_access_time FROM community.t_user t1 WHERE id={}'''.format(id_)
    mutex.acquire()
    m1.execute(sql)
    mutex.release()
    for data in m1.fetchall():
        # print(data[0])
        return data[0]


def run():
    db_list = get_t_user_id()  # 数据库数据
    api_list = get_user_id_list()  # 接口数据
    # 没在数据库里面返回的数据
    not_in_list = [id_ for id_ in api_list if id_ not in db_list]
    # 在数据库里面返回的数据
    in_list = [id_ for id_ in api_list if id_ in db_list]
    expire_user_dict = {}
    for j in in_list:
        expire_user_dict.update({f"在过去14天登录的用户ID:{j})": f"最后一次访问时间{get_last_access_time(j)}"})
    print(f"过去14天登录过的用户{in_list}")
    print(expire_user_dict)
    write_log(expire_user_dict)
    last_access_time_dict = {}
    for i in not_in_list:
        last_access_time_dict.update({f"在过去14天没有登录的用户ID{i}": f"最后一次访问时间{get_last_access_time(i)}"})
    # 利用索引找到卡片位置,因索引从0开始,所以返回结果需要加1
    for x in not_in_list:
        print(f"卡片出现位置在:{api_list.index(x) + 1}处")
        write_log(f"卡片出现位置在:{api_list.index(x) + 1}处")
    print(not_in_list)
    print(last_access_time_dict)
    write_log(last_access_time_dict)


if __name__ == '__main__':
    run()
    # lock = threading.Lock()
    # for i in range(5):
    #     t1 = threading.Thread(target=run)
    #     # t1.setDaemon(True)
    #     t1.start()
        
需求:每30张卡片最多包含2张欢迎度小于等于10%的用户,尽量不要出现在前15张
# 需求:每30张卡片最多包含2张欢迎度小于等于10%的用户,尽量不要出现在前15张
from knight_util.get_db_token import tk_
import requests
from knight_util.Util import KnightUtil

m1 = KnightUtil().mysql_()
tk = tk_(2397).get('EPAL_WEB')


def get_user_id_list():
    headers = {
        "_tk_": tk,
        "content-type": "application/json",
        "user-agent": "Mozilla / 5.0(Windows NT 10.0;Win64;x64)"
    }
    data = {"limit": 30, "likeUserIds": [], "dislikeUserIds": [], "excludeUserIds": []}
    url = "https://test-community.gamesegirl.com/user/meets/match/v-3.6"
    response = requests.post(url=url, json=data, headers=headers).json()
    if response['status'] == 'OK':
        response_length = len(response['content']['data'])
        userid_list = [response['content']['data'][i]['userId'] for i in range(response_length)]
        print(f"总共卡片数:{len(userid_list)}")
        print(f"接口返回的所有meet卡片数据{userid_list}")
        return userid_list


def get_t_user_id():
    """获取欢迎度小于10%的"""

    sql = '''SELECT user_id,popularity FROM community.user_meets um WHERE um.popularity <=0.10;'''
    m1.execute(sql)
    id_list = [data[0] for data in m1.fetchall()]
    print(f"欢迎度小于10%的所有数据{id_list}")
    return id_list


def get_popularity(user_id):
    sql = '''SELECT popularity FROM community.user_meets WHERE user_id={};'''.format(user_id)
    m1.execute(sql)
    for data in m1.fetchall():
        return data[0]


def get_region_name(user_id):
    sql = '''SELECT country,region_name FROM egirl.t_dict_area_info WHERE user_id={};'''.format(user_id)
    m1.execute(sql)
    for data in m1.fetchall():
        print(data)
        return data


def run():
    db_list = get_t_user_id()  # 数据库数据
    api_list = get_user_id_list()  # 接口数据
    # 欢迎度大于10
    popularity_greater = [id_ for id_ in api_list if id_ not in db_list]
    print(f"接口返回欢迎度大于10%的数据{popularity_greater}")
    # 欢迎度小于10
    popularity_less = [id_ for id_ in api_list if id_ in db_list]
    print(f"接口返回欢迎度小于10%的数据{popularity_less}")
    popularity_dict = {}
    for j in popularity_less:
        popularity_dict.update({f"欢迎度userID是:{j}": get_popularity(j)})
    print(popularity_dict)
    # 利用索引找到卡片位置,因索引从0开始,所以返回结果需要加1
    for x in popularity_less:
        print(f"卡片出现位置在:{api_list.index(x) + 1}处")
    # 查是否同城
    for t in popularity_less:
        get_region_name(t)


if __name__ == '__main__':
    run()
每30张卡片最多包含2张非真人图片,尽量不要出现在前15张
# 每30张卡片最多包含2张非真人图片,尽量不要出现在前15张
from knight_util.get_db_token import tk_
import requests
from knight_util.Util import KnightUtil
from knight_util.write_txt import write_log

m1 = KnightUtil().mysql_()
tk = tk_(2397).get('EPAL_WEB')


def get_user_id_list():
    headers = {
        "_tk_": tk,
        "content-type": "application/json",
        "user-agent": "Mozilla / 5.0(Windows NT 10.0;Win64;x64)"
    }
    data = {"limit": 30, "likeUserIds": [], "dislikeUserIds": [], "excludeUserIds": []}
    url = "https://test-community.gamesegirl.com/user/meets/match/v-3.6"
    response = requests.post(url=url, json=data, headers=headers).json()
    if response['status'] == 'OK':
        response_length = len(response['content']['data'])
        userid_list = []
        for i in range(response_length):
            userid_list.append(response['content']['data'][i]['userId'])
        print(userid_list)
        print(f"总共卡片数:{len(userid_list)}")
        return userid_list


def is_person(user_id):
    """是否真人"""
    sql = f"SELECT real_person_img FROM community.user_meets WHERE user_id={user_id} AND user_img_status='PASS' AND status='ENABLED';"
    m1.execute(sql)
    for data in m1.fetchall():
        return data[0]


def get_real_person_code(user_id):
    """获取是否是真人图片状态码"""
    sql = '''SELECT real_person_img FROM community.user_meets WHERE user_id={} '''.format(user_id)
    m1.execute(sql)
    for data in m1.fetchall():
        return data[0]


def run():
    userid_dict = {}
    id_list = get_user_id_list()
    # 是真人
    real_people = []
    # 不是真人
    not_real_people = []
    for i in id_list:
        if is_person(i) == 0:
            real_people.append(i)
            # userid_dict.update({f"user_id:{i}": f"是真人"})
        elif is_person(i) == 1:
            not_real_people.append(i)
            # userid_dict.update({f"user_id:{i}": f"不是真人"})
    print(f"是真人图片的userid:{real_people}")
    print(f"不是真人图片的userid:{not_real_people}")
    dic = {}
    for i in not_real_people:
        dic.update({i: get_real_person_code(i)})
    print(f"不是真人图片的数据:{dic}")
    for x in not_real_people:
        print(f"卡片出现位置在:{id_list.index(x) + 1}处")


if __name__ == '__main__':
    run()

 需求如图:

代码如下:

# 智能图片正向场景测试
from knight_util.MysqlUtil import MysqlPool
import requests
from multiprocessing import Pool
from knight_util.Util import KnightUtil
from knight_util.sso_token import query_meet_token
import random
from knight_util.write_txt import write_log

mysql = MysqlPool()
redis = KnightUtil().redis_()
# TODO 全局变量user_id
user_id = [3801]
token_list = query_meet_token(user_id[0])


def query_redis_exposureCount():
    """查询曝光度"""
    redis_data = redis.get("meets:image:calculate:{}".format(user_id[0]))
    img_dict = {}
    for i in range(len(eval(redis_data))):
        img_dict.update({eval(redis_data)[i]['userImg']: eval(redis_data)[i]['exposureCount']})
        write_log(eval(redis_data)[i])
    count_list = [j for j in img_dict.values()]
    return sum(count_list)


def batch_like(userid_list=user_id):
    """批量喜欢"""
    # 查计算中的图片
    sql = "SELECT user_img FROM community.user_meets_image WHERE user_id={} AND calculate_status='CALCULATING';".format(
        userid_list[0])
    data = mysql.fetch_all(sql)
    # 计算中的图片
    # user_img_list = [i['user_img'] for i in data]
    user_img_list = ['https://community-oss.epal.gg/data/community/meet/pt/1611841764302.jpeg']
    # 删除数据触发列表
    triggering_condition_list = [20, 80, 130, 180, 230, 270]
    like_num = 0
    while True:
        for tk in token_list:
            like_num += 1
            print(f"第{like_num}次请求开始")
            write_log(f"第{like_num}次请求开始")
            # 喜欢状态
            like_status_list = ["like", "dislike"]
            # like_status_list = ["like", "dislike", "superlike"]
            # 随机喜欢不喜欢
            status = random.choice(like_status_list)
            # 随机正在计算中的图片
            img_url = random.choice(user_img_list)
            # 请求url, token, 参数
            headers = {
                "_tk_": tk
            }
            url = "https://test-community.gamesegirl.com/user/meets/likeOrDislike"
            data1 = {"likeUserIds": userid_list,
                     "dislikeUserIds": [],
                     "likeUserIdAndImg": [{"userId": userid_list[0],
                                           "imgUrl": img_url}],
                     "disLikeUserIdAndImg": []}
            data2 = {"likeUserIds": [],
                     "dislikeUserIds": userid_list,
                     "likeUserIdAndImg": [],
                     "disLikeUserIdAndImg": [{"userId": userid_list[0],
                                              "imgUrl": img_url}]}

            if status == "like":
                # 喜欢
                try:
                    response1 = requests.post(url=url, json=data1, headers=headers).json()
                    print(response1)
                    if response1['status'] == 'OK':
                        print(f"喜欢成功:{img_url}")
                        print(f"曝光总次数:{query_redis_exposureCount()}")

                        # 总曝光数等于300时结束运行
                        if query_redis_exposureCount() == 299:
                            print("结束计算")
                            return
                        # 满足条件时,删除meet关系数据
                        if like_num in triggering_condition_list:
                            del_sql = "DELETE FROM community.user_relation WHERE user_id={} OR to_user_id={};".format(
                                userid_list[0], userid_list[0])
                            print(f"共删除{mysql.delect(del_sql)}条数据")

                except Exception as e:
                    raise e
            # 超级喜欢
            elif status == "superlike":
                try:
                    super_url = "https://test-community.gamesegirl.com/user/meets/super/like"
                    data3 = {"userId": userid_list[0],
                             "imgUrl": img_url}
                    response3 = requests.post(url=super_url, json=data3, headers=headers).json()
                    print(response3)
                    if response3['status'] == 'OK':
                        print(f"超级喜欢成功:{img_url}")
                        print(f"曝光总次数:{query_redis_exposureCount()}")
                        # # 删除超级喜欢数据
                        list_keys = redis.keys("1002:test:user:member:superLike:*")
                        if list_keys:
                            for key in list_keys:
                                print(redis.delete(key))
                                print("删除超级喜欢会员数据成功")
                        # 总曝光数等于298时结束运行
                        if query_redis_exposureCount() == 299:
                            print("结束计算")
                            return
                        # 满足条件时,删除meet关系数据
                        if like_num in triggering_condition_list:
                            del_sql = "DELETE FROM community.user_relation WHERE user_id={} OR to_user_id={};".format(
                                userid_list[0], userid_list[0])
                            print(f"共删除{mysql.delect(del_sql)}条数据")

                except Exception as e:
                    raise e

            else:
                # 不喜欢
                try:
                    response2 = requests.post(url=url, json=data2, headers=headers).json()
                    print(response2)
                    if response2['status'] == 'OK':
                        print(f"不喜欢成功:{img_url}")
                        print(f"曝光总次数:{query_redis_exposureCount()}")
                        # 总曝光数等于298时结束运行
                        if query_redis_exposureCount() == 299:
                            print("结束计算")
                            # print("开始写入最终结果")
                            return
                        # 满足条件时,删除meet关系数据
                        if like_num in triggering_condition_list:
                            del_sql = "DELETE FROM community.user_relation WHERE user_id={} OR to_user_id={};".format(
                                userid_list[0], userid_list[0])
                            print(f"共删除{mysql.delect(del_sql)}条数据")

                except Exception as e:
                    raise e


if __name__ == '__main__':
    pool = Pool(6)
    pool.apply_async(batch_like)
    pool.close()
    pool.join()
    # a = query_redis_exposureCount()
    # print(a)

原文地址:https://www.cnblogs.com/xiamaojjie/p/14291952.html