# ES全量索引校验-python (full-index verification for Elasticsearch, written in Python)

import unittest
import json
import requests
import ddt
from pymysql import connect
 
 
def getMySqlData():
    """Fetch the rows that parameterise the data-driven index comparison test.

    Runs a fixed query against the ``gb_goods`` database and returns every row;
    each row holds ``good_sn`` followed by ``v_wh_code``.

    Returns:
        The sequence produced by ``cursor.fetchall()``. When connecting or
        querying fails, the error is printed and an empty tuple is returned,
        so callers can still unpack the result (they simply get no rows).
    """
    # NOTE: replace this statement with the SQL of the environment under test.
    # For sharded tables the same query can be formatted once per table suffix
    # and the results concatenated.
    sql = "SELECT gie.good_sn,gie.v_wh_code AS goodsSn FROM goods_info_extend_s_7   gie WHERE gie.goods_status IN (2,4,5) AND gie.platform IN (1,2,4) AND gie.site_code = 'GB' GROUP BY gie.good_sn, gie.v_wh_code;"

    # Pre-bind the result so the final ``return`` is valid even when the
    # database is unreachable.
    result = ()
    try:
        db = connect(host="xx.xx.xx.xx", user="xxx", password="xxx", db="gb_goods", port=3306,
                     charset="utf8")
        try:
            cur = db.cursor()
            try:
                cur.execute(sql)
                result = cur.fetchall()
            finally:
                # Release the cursor whether or not the query succeeded.
                cur.close()
        finally:
            # Release the connection whether or not the query succeeded.
            db.close()
    except Exception as e:
        # Best effort, as before: report the problem and fall through with
        # whatever rows were fetched (none, if the failure came first).
        print(e)
    return result
 
 
@ddt.ddt
class MyTest(unittest.TestCase):
    """Compare every SKU's document between two Elasticsearch indexes.

    One test case is generated per row returned by ``getMySqlData()``. For each
    row the same term query is sent to both indexes, a normalised dict of the
    compared fields is built from the first hit of each response, and the
    goodsId is appended to a text file when the two dicts differ.
    """

    # Search endpoints of the two indexes being compared.
    URL_ES = "http://10.4.4.80:9200/gearbest20200212201449/sku/_search"
    URL_AI = "http://10.4.4.80:9200/GB_daily_full/sku/_search"

    # Seconds to wait for a search response before giving up, so that an
    # unreachable node cannot block the whole run indefinitely.
    _REQUEST_TIMEOUT = 30

    # ``_source`` fields copied verbatim into the comparison dict.
    # Deliberately NOT compared at the moment: skuAttrs, skuDescAttrs,
    # labelFlags, coupons, activityIds, tags, activities.
    _PLAIN_FIELDS = (
        "week2SalesVolume", "payEndTime", "appSwellAmount", "appPriceType",
        "goodsId", "youtube", "week2Sales", "shopPrice",
        "baseScore2", "subTitle", "exposureFlag", "originalUrl", "discount",
        "whCode", "baseScore5", "payStartTime", "recommendedLevel",
        "thumbExtendUrl", "appStatus", "passAvgScore", "stockFlag",
        "brandName", "appDisplayPrice", "centerWord", "goodsTitle",
        "brandCode", "appDeposit", "vWhCode", "catId", "priceRates",
        "totalFavoriteCount", "baseScore4", "appPayEndTime", "urlTitle",
        "firstUpTime",
        "appPayStartTime", "expiredTime", "exposureSalesVolume",
        "appDiscount", "baseScore1",
        "lang", "isCod", "searchWords",
        "mStatus", "dailyRate", "exposureSalesRate", "sortOrder",
        "passTotalNum", "shopCode", "appExpiredTime", "isTort", "baseScore3",
        "priceType", "createTime", "swellAmount", "saleMark", "deposit",
        "goodsModelWord",
        "goodsWebSku", "imgExtendUrl",
        "displayPrice", "goodsWebSpu",
        "isPlatform", "webStatus", "goodsSn", "imgUrl",
        # Mail-exclusive price data.
        "mailPrice", "appMailPriceDiscount", "appMailPriceActive",
        "mailPriceActive", "mailPriceCipherText", "mailPriceDiscount",
    )

    # Fields temporarily pinned to the constant "0" on both sides, which
    # effectively excludes them from the comparison.
    _FIXED_FIELDS = ("grossMargin", "defaultWh", "appDefaultWh")

    # Nested object lists: field -> (keys to keep, first sort key, second sort key).
    _NESTED_FIELDS = (
        ("categories", ["level", "catId", "catName", "isDefault"], "catId", "isDefault"),
        ("shopGroups", ["groupId", "level", "groupName", "path"], "level", "groupId"),
    )

    @ddt.data(*getMySqlData())
    def test_gb(self, data):
        """Query both indexes for one SKU and record its goodsId on mismatch.

        Args:
            data: one database row; ``data[0]`` is the goods SN and ``data[1]``
                the warehouse code.
        """
        goodsSn, whCode = data[0], data[1]
        goodsId = f"{goodsSn}#{whCode}"
        print(goodsId)
        headers = {"Content-Type": "application/json"}
        params = {"query": {"term": {"goodsId": {"value": goodsId}}}}

        es_goodsInfo, _ = self.gb_index(self.URL_ES, params, headers)
        ai_goodsInfo, _ = self.gb_index(self.URL_AI, params, headers)

        # Record the SKU when the two normalised documents differ. The queried
        # goodsId is written rather than the id echoed by the first index,
        # because that echoed value is 0 when the first index has no hit.
        if es_goodsInfo != ai_goodsInfo:
            self.writeGoodsSn(goodsId)

    def writeGoodsSn(self, goodsId):
        """Append ``goodsId`` as one line to the mismatch report file.

        Args:
            goodsId: identifier of the SKU whose documents differ.
        """
        # NOTE(review): this is a drive-relative Windows path (no separator
        # after "D:"); confirm whether "D:\\goodsId_GB.txt" was intended.
        file = r"D:goodsId_GB.txt"
        with open(file, "a", encoding="utf-8") as f:
            f.write(f"{goodsId}\n")
        print("数据写入成功...")

    def func_sort(self, jsonArray, list, sort_key1, sort_key2):
        """Sort a list of dicts and project each one onto a fixed set of keys.

        Args:
            jsonArray: list of dicts; it is sorted IN PLACE.
            list: keys to keep, in the order they are inserted into each
                resulting dict.
            sort_key1: primary sort key.
            sort_key2: secondary sort key, used to break ties on ``sort_key1``.

        Returns:
            A new list with one dict per element of ``jsonArray`` (in sorted
            order) containing only the requested keys.

        Raises:
            KeyError: if an element lacks a sort key or a requested key.
        """
        jsonArray.sort(key=lambda item: (item[sort_key1], item[sort_key2]))
        return [{key: item[key] for key in list} for item in jsonArray]

    def gb_index(self, url, params, headers):
        """Run a search and normalise the first hit for comparison.

        Args:
            url: ``_search`` endpoint of the index to query.
            params: query body; it is serialised to JSON before sending.
            headers: HTTP headers sent with the request.

        Returns:
            ``(goods_info, goodsId)`` where ``goods_info`` is the dict of
            compared fields built from the first hit and ``goodsId`` is that
            hit's ``goodsId``; ``(0, 0)`` when the search returns no hits.

        Raises:
            requests.RequestException: on timeout, connection failure or a
                non-2xx response.
            KeyError: if the first hit lacks one of the compared fields.
        """
        r = requests.post(url, data=json.dumps(params), headers=headers,
                          timeout=self._REQUEST_TIMEOUT)
        # Fail with a clear HTTP error instead of a KeyError on "hits" below.
        r.raise_for_status()
        r_json = r.json()  # parsed once and reused
        print("响应对象:", r_json)

        hits = r_json["hits"]["hits"]
        if not hits:
            print("无sku数据")
            return 0, 0

        source = hits[0]["_source"]
        goods_info = {name: source[name] for name in self._PLAIN_FIELDS}

        for name in self._FIXED_FIELDS:
            goods_info[name] = "0"

        # Nested lists are sorted and reduced to fixed keys so that element
        # order and extra keys do not cause mismatches. Empty (or null) values
        # are kept as they are.
        for name, keys, first_key, second_key in self._NESTED_FIELDS:
            items = source[name]
            if items:
                goods_info[name] = self.func_sort(items, keys, first_key, second_key)
            else:
                goods_info[name] = items

        # Non-positive sales are normalised to 0. A null value is treated the
        # same way instead of raising on the comparison.
        yesterday_sales = source["yesterdaySales"]
        if yesterday_sales is not None and yesterday_sales > 0:
            goods_info["yesterdaySales"] = yesterday_sales
        else:
            goods_info["yesterdaySales"] = 0

        print("goods_info:", goods_info)
        return goods_info, source["goodsId"]
 
 
# When this file is executed directly (not imported), hand control to
# unittest's command-line runner.
if __name__ == '__main__':
    unittest.main()
# 原文地址 (original article): https://www.cnblogs.com/wakey/p/12689398.html