Distributed Monitoring System Development [Day 37]: How Monitoring Data Is Stored (Part 7)

I. How the data is stored
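
In short: every (client, service, series) combination gets its own Redis list, keyed "StatusData_<client_id>_<service_name>_<series>", and every element of a list is a JSON-encoded [data, save_time] pair. A minimal sketch of what that looks like, reusing the key format from the code below (the Redis host and the sample values are just placeholders):

    import json, time
    import redis

    r = redis.Redis(host='192.168.16.126', port=6379, db=0)

    # the 'latest' series keeps raw reported values, one [data, save_time] pair per report
    r.rpush("StatusData_1_cpu_latest",
            json.dumps([{"idle": "95.2", "iowait": "0.3", "system": "1.1", "status": 0}, time.time()]))

    # the aggregated series (10mins/30mins/60mins) keep [avg, max, min, mid] per metric instead
    r.rpush("StatusData_1_cpu_10mins",
            json.dumps([{"idle": [94.8, 96.0, 93.5, 95.0]}, time.time()]))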

II. Directory structure
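
Judging from the imports in the code below, the project layout is roughly the following (the exact placement of some files is inferred, so treat this as a sketch):

    s15CrazyMonitor/
    ├── s15CrazyMonitor/
    │   └── settings.py              # REDIS_CONN, STATUS_DATA_OPTIMIZATION
    └── monitor/
        ├── api_views.py             # client_config, service_report
        ├── serializer.py            # ClientHandler
        └── backends/
            ├── data_optimization.py # DataStore
            └── redis_conn.py        # redis_conn()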

III. Code call relationships
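
The call chain through the code below is roughly:

    client HTTP POST --> api_views.service_report()
        --> data_optimization.DataStore(client_id, service_name, data, REDIS_OBJ)
            __init__() calls process_and_save()
                --> for every series in settings.STATUS_DATA_OPTIMIZATION:
                        interval == 0: rpush the raw data onto StatusData_<id>_<service>_latest
                        interval  > 0: get_data_slice() --> get_optimized_data() --> save_optimized_data()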

IV. Implementation code

1. data_optimization

1. Collecting the data points that meet the filter condition

    def get_data_slice(self,lastest_data_key,optimization_interval):
        '''
        :param optimization_interval: e.g. 600 means get the latest 10 minutes of real data from redis
        :return:
        '''
        all_real_data = self.redis_conn_obj.lrange(lastest_data_key,1,-1)
        #print("get data range of:",lastest_data_key,optimization_interval)
        #print("get data range of:",all_real_data[-1])
        data_set = [] # holds the filtered data points that fall within the optimization interval
        for item in all_real_data:
            #print(json.loads(item))
            data  = json.loads(item.decode())
            if len(data) ==2:
                #print("real data item:",data[0],data[1])
                service_data, last_save_time = data
                #print('time:',time.time(), time.time()- last_save_time, optimization_interval)
                if time.time() - last_save_time <= optimization_interval:# fetch this data point out
                    #print(time.time()- last_save_time, optimization_interval)
                    data_set.append(data)
                else:
                    pass
        #print('data set:--->',data_set)
        return data_set
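
The filter is purely time-based: an element survives only if its save timestamp is no older than optimization_interval seconds. A tiny standalone sketch of the same idea (sample values invented):

    import json, time

    now = time.time()
    stored = [
        json.dumps([{"idle": "95.0"}, now - 900]),   # 15 minutes old -> dropped for a 600 s window
        json.dumps([{"idle": "93.0"}, now - 300]),   # 5 minutes old  -> kept
    ]
    window = 600
    kept = [json.loads(item) for item in stored if now - json.loads(item)[1] <= window]
    print(len(kept))  # -> 1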

2. Optimizing the filtered data

    def process_and_save(self):
        '''
        processing data and save into redis
        :return:
        '''
        print("\033[42;1m---service data-----------------------\033[0m")
        #print( self.client_id,self.service_name,self.data)
        if self.data['status'] ==0:# service data is valid
            for key,data_series_val in settings.STATUS_DATA_OPTIMIZATION.items():
                data_series_optimize_interval,max_data_point = data_series_val
                data_series_key_in_redis = "StatusData_%s_%s_%s" %(self.client_id,self.service_name,key)
                #print(data_series_key_in_redis,data_series_val)
                last_point_from_redis = self.redis_conn_obj.lrange(data_series_key_in_redis,-1,-1)
                if not last_point_from_redis: # this key does not exist in redis yet
                    # executed on the first report for this key: initialize the list;
                    # the first element only records when the data was last saved
                    self.redis_conn_obj.rpush(data_series_key_in_redis,json.dumps([None,time.time()]))

                if data_series_optimize_interval == 0: # this series holds raw data; the latest points are stored as-is
                    self.redis_conn_obj.rpush(data_series_key_in_redis,json.dumps([self.data, time.time()]))
                    # no optimization needed, save directly
                else: # data may need to be optimized
                    #print("*****>>",self.redis_conn_obj.lrange(data_series_key_in_redis,-2,-1))
                    last_point_data, last_point_save_time = \
                        json.loads(self.redis_conn_obj.lrange(data_series_key_in_redis,-1,-1)[0].decode())

                    if time.time() - last_point_save_time >= data_series_optimize_interval: # reached this series' update interval
                        lastest_data_key_in_redis = "StatusData_%s_%s_latest" %(self.client_id,self.service_name)
                        print("calculating data for key:\033[31;1m%s\033[0m" % data_series_key_in_redis)
                        # the data of the last n minutes is fetched into data_set

                        data_set = self.get_data_slice(lastest_data_key_in_redis,data_series_optimize_interval) # the data to be optimized
                        print('--------------------------len dataset :',len(data_set))
                        if len(data_set)>0:
                            # hand data_set to the method below so it can calculate the optimized result
                            optimized_data = self.get_optimized_data(data_series_key_in_redis, data_set)
                            if optimized_data:
                                self.save_optimized_data(data_series_key_in_redis, optimized_data)
                # also make sure the number of points stored in redis does not exceed the limit set in settings
                if self.redis_conn_obj.llen(data_series_key_in_redis) >= max_data_point:
                    self.redis_conn_obj.lpop(data_series_key_in_redis) # drop the oldest data point
                #self.redis_conn_obj.ltrim(data_series_key_in_redis,0,data_series_val[1])
        else:
            print("report data is invalid:",self.data)
            raise ValueError("client report data is invalid")

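With the STATUS_DATA_OPTIMIZATION settings shown later in this post, one cpu report from client 1 walks over four Redis lists. The raw point is appended to the 'latest' list on every report, while each aggregated list is only appended to after its interval has elapsed. The key names are built like this:

    client_id, service_name = 1, "cpu"
    for series in ("latest", "10mins", "30mins", "60mins"):
        print("StatusData_%s_%s_%s" % (client_id, service_name, series))
    # StatusData_1_cpu_latest, StatusData_1_cpu_10mins, StatusData_1_cpu_30mins, StatusData_1_cpu_60mins
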
3. Saving the optimized data to Redis

    def save_optimized_data(self,data_series_key_in_redis, optimized_data):
        '''
        save the optimized data into the db
        :param optimized_data:
        :return:
        '''
        self.redis_conn_obj.rpush(data_series_key_in_redis, json.dumps([optimized_data, time.time()]))

4. Storing temporary data and calculating the maximum, minimum, and average values

    def get_optimized_data(self,data_set_key, raw_service_data):
        '''
        calculate avg, max, min and mid values from the raw service data set
        :param data_set_key: the redis key the optimized data will be saved to
        :param raw_service_data: raw service data list
        :return:
        '''
        #index_init = [avg,max,min,mid]
        print("get_optimized_data:",raw_service_data[0])
        service_data_keys = raw_service_data[0][0].keys() #[iowait, idle, system...]
        first_service_data_point = raw_service_data[0][0] # use this to build up a new empty dict
        #print("--->",service_data_keys)
        optimized_dic = {} # empty dict, the optimized data will be stored here
        if 'data' not in service_data_keys: # no sub-dict inside, works for services like cpu and memory
            for key in service_data_keys:
                optimized_dic[key] = []
            #optimized_dic = optimized_dic.fromkeys(first_service_data_point,[])
            tmp_data_dic = copy.deepcopy(optimized_dic)  # temporarily holds the last n minutes of data, one list per metric
            print("tmp data dic:",tmp_data_dic)
            for service_data_item,last_save_time in raw_service_data: # loop over the last n minutes of data
                #print(service_data_item)
                for service_index,v in service_data_item.items(): # loop over each data point's metrics, e.g. service_index=iowait, v=33
                    #print(service_index,v)
                    try:
                        tmp_data_dic[service_index].append(round(float(v),2)) # append this point's value for the current metric to the temp dict
                    except ValueError as e:
                        pass
                #print(service_data_item,last_save_time)
            # calculate avg, max, ... for every metric in the temp dict, then store the results in optimized_dic
            for service_k,v_list in tmp_data_dic.items():
                print(service_k, v_list)
                avg_res = self.get_average(v_list)
                max_res = self.get_max(v_list)
                min_res = self.get_min(v_list)
                mid_res = self.get_mid(v_list)
                optimized_dic[service_k] = [avg_res,max_res,min_res,mid_res]
                print(service_k, optimized_dic[service_k])

        else: # a sub-dict exists under key 'data', works for services with multiple independent items, e.g. several network cards or disks
            #print("**************>>>",first_service_data_point)
            for service_item_key,v_dic in first_service_data_point['data'].items():
                # service_item_key is e.g. lo, eth0, ...; v_dic = {t_in:333, t_out:3353}
                optimized_dic[service_item_key] = {}
                for k2,v2 in v_dic.items():
                    optimized_dic[service_item_key][k2] = [] #{eth0:{t_in:[],t_out:[]}}

            tmp_data_dic = copy.deepcopy(optimized_dic)
            if tmp_data_dic: # tmp_data_dic might be empty if the client report is broken
                print('tmp data dic:', tmp_data_dic)
                for service_data_item,last_save_time in raw_service_data: # loop over the last n minutes of data
                    for service_index,val_dic in service_data_item['data'].items():
                        #print(service_index,val_dic)
                        # service_index is e.g. eth0, eth1, ...
                        for service_item_sub_key, val in val_dic.items():
                            # service_item_sub_key is e.g. t_in, t_out
                            #if service_index == 'lo':
                            #print(service_index,service_item_sub_key,val)
                            tmp_data_dic[service_index][service_item_sub_key].append(round(float(val),2))
                for service_k,v_dic in tmp_data_dic.items():
                    for service_sub_k,v_list in v_dic.items():
                        print(service_k, service_sub_k, v_list)
                        avg_res = self.get_average(v_list)
                        max_res = self.get_max(v_list)
                        min_res = self.get_min(v_list)
                        mid_res = self.get_mid(v_list)
                        optimized_dic[service_k][service_sub_k] = [avg_res,max_res,min_res,mid_res]
                        print(service_k, service_sub_k, optimized_dic[service_k][service_sub_k])

            else:
                print("\033[41;1mMust be sth wrong with client report data\033[0m")
        print("optimized dic:", optimized_dic)

        return optimized_dic

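To make the two branches concrete, here is roughly what goes in and what comes out (values invented). For a flat service such as cpu, every metric collapses into an [avg, max, min, mid] list; for a service whose report carries a 'data' sub-dict, such as network, the same happens one level deeper, per interface:

    # flat service (e.g. cpu): raw points taken from the 'latest' list ...
    raw_cpu = [
        [{"idle": "95.0", "iowait": "0.5"}, 1534995000.0],
        [{"idle": "93.0", "iowait": "1.5"}, 1534995060.0],
    ]
    # ... become {"idle": [94.0, 95.0, 93.0, 95.0], "iowait": [1.0, 1.5, 0.5, 1.5]}

    # service with sub-items (e.g. network): per-interface dicts under 'data' ...
    raw_net = [
        [{"data": {"eth0": {"t_in": "100", "t_out": "200"}}}, 1534995000.0],
    ]
    # ... become {"eth0": {"t_in": [100.0, 100.0, 100.0, 100.0], "t_out": [200.0, 200.0, 200.0, 200.0]}}
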
5. Getting the average

    def get_average(self,data_set):
        '''
        calc the avg value of the data set
        :param data_set:
        :return:
        '''
        if len(data_set) >0:
            return round(sum(data_set) /len(data_set),2)
        else:
            return 0

6. Getting the maximum

    def get_max(self,data_set):
        '''
        calc the max value of the data set
        :param data_set:
        :return:
        '''
        if len(data_set) >0:
            return max(data_set)
        else:
            return 0

7. Getting the minimum

    def get_min(self,data_set):
        '''
        calc the minimum value of the data set
        :param data_set:
        :return:
        '''
        if len(data_set) >0:
            return min(data_set)
        else:
            return 0

8. Getting the median

    def get_mid(self,data_set):
        '''
        calc the mid value of the data set
        :param data_set:
        :return:
        '''
        data_set.sort()
        #[1,4,99,32,8,9,4,5,9]
        #[1,3,5,7,9,22,54,77]
        if len(data_set) > 0:
            return data_set[int(len(data_set)/2)]
        else:
            return 0
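
Note that get_mid sorts the list in place and, for an even number of points, returns the upper of the two middle values rather than a true median. If an exact median were preferred, the standard library could do it; this is a sketch, not what the code above uses:

    import statistics

    def get_mid(data_set):
        # statistics.median averages the two middle values for even-length input
        return round(statistics.median(data_set), 2) if data_set else 0

    print(get_mid([1, 3, 5, 7]))  # -> 4.0
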
9. Complete code
#from s15CrazyMonitor import settings
from django.conf import settings
import time, json
import copy

class DataStore(object):
    '''
    process the service data reported by the client, do some data optimization and save it into the redis DB
    '''
    def __init__(self, client_id,service_name, data,redis_obj):
        '''
        :param client_id:
        :param service_name:
        :param data: the clean service data reported by the client
        :return:
        '''
        self.client_id = client_id
        self.service_name = service_name
        self.data = data
        self.redis_conn_obj = redis_obj
        self.process_and_save()

    def get_data_slice(self,lastest_data_key,optimization_interval):
        '''
        :param optimization_interval: e.g. 600 means get the latest 10 minutes of real data from redis
        :return:
        '''
        all_real_data = self.redis_conn_obj.lrange(lastest_data_key,1,-1)
        #print("get data range of:",lastest_data_key,optimization_interval)
        #print("get data range of:",all_real_data[-1])
        data_set = [] # holds the filtered data points that fall within the optimization interval
        for item in all_real_data:
            #print(json.loads(item))
            data = json.loads(item.decode())
            if len(data) == 2:
                #print("real data item:",data[0],data[1])
                service_data, last_save_time = data
                #print('time:',time.time(), time.time()- last_save_time, optimization_interval)
                if time.time() - last_save_time <= optimization_interval: # fetch this data point out
                    #print(time.time()- last_save_time, optimization_interval)
                    data_set.append(data)
                else:
                    pass
        #print('data set:--->',data_set)
        return data_set

    def process_and_save(self):
        '''
        process the data and save it into redis
        :return:
        '''
        print("\033[42;1m---service data-----------------------\033[0m")
        #print( self.client_id,self.service_name,self.data)
        if self.data['status'] == 0: # service data is valid
            for key,data_series_val in settings.STATUS_DATA_OPTIMIZATION.items():
                data_series_optimize_interval,max_data_point = data_series_val
                data_series_key_in_redis = "StatusData_%s_%s_%s" %(self.client_id,self.service_name,key)
                #print(data_series_key_in_redis,data_series_val)
                last_point_from_redis = self.redis_conn_obj.lrange(data_series_key_in_redis,-1,-1)
                if not last_point_from_redis: # this key does not exist in redis yet
                    # executed on the first report for this key: initialize the list;
                    # the first element only records when the data was last saved
                    self.redis_conn_obj.rpush(data_series_key_in_redis,json.dumps([None,time.time()]))

                if data_series_optimize_interval == 0: # this series holds raw data; the latest points are stored as-is
                    self.redis_conn_obj.rpush(data_series_key_in_redis,json.dumps([self.data, time.time()]))
                    # no optimization needed, save directly
                else: # data may need to be optimized
                    #print("*****>>",self.redis_conn_obj.lrange(data_series_key_in_redis,-2,-1))
                    last_point_data, last_point_save_time = \
                        json.loads(self.redis_conn_obj.lrange(data_series_key_in_redis,-1,-1)[0].decode())

                    if time.time() - last_point_save_time >= data_series_optimize_interval: # reached this series' update interval
                        lastest_data_key_in_redis = "StatusData_%s_%s_latest" %(self.client_id,self.service_name)
                        print("calculating data for key:\033[31;1m%s\033[0m" % data_series_key_in_redis)
                        # the data of the last n minutes is fetched into data_set

                        data_set = self.get_data_slice(lastest_data_key_in_redis,data_series_optimize_interval) # the data to be optimized
                        print('--------------------------len dataset :',len(data_set))
                        if len(data_set)>0:
                            # hand data_set to the method below so it can calculate the optimized result
                            optimized_data = self.get_optimized_data(data_series_key_in_redis, data_set)
                            if optimized_data:
                                self.save_optimized_data(data_series_key_in_redis, optimized_data)
                # also make sure the number of points stored in redis does not exceed the limit set in settings
                if self.redis_conn_obj.llen(data_series_key_in_redis) >= max_data_point:
                    self.redis_conn_obj.lpop(data_series_key_in_redis) # drop the oldest data point
                #self.redis_conn_obj.ltrim(data_series_key_in_redis,0,data_series_val[1])
        else:
            print("report data is invalid:",self.data)
            raise ValueError("client report data is invalid")

    def save_optimized_data(self,data_series_key_in_redis, optimized_data):
        '''
        save the optimized data into the db
        :param optimized_data:
        :return:
        '''
        self.redis_conn_obj.rpush(data_series_key_in_redis, json.dumps([optimized_data, time.time()]))

    def get_optimized_data(self,data_set_key, raw_service_data):
        '''
        calculate avg, max, min and mid values from the raw service data set
        :param data_set_key: the redis key the optimized data will be saved to
        :param raw_service_data: raw service data list
        :return:
        '''
        #index_init = [avg,max,min,mid]
        print("get_optimized_data:",raw_service_data[0])
        service_data_keys = raw_service_data[0][0].keys() #[iowait, idle, system...]
        first_service_data_point = raw_service_data[0][0] # use this to build up a new empty dict
        #print("--->",service_data_keys)
        optimized_dic = {} # empty dict, the optimized data will be stored here
        if 'data' not in service_data_keys: # no sub-dict inside, works for services like cpu and memory
            for key in service_data_keys:
                optimized_dic[key] = []
            #optimized_dic = optimized_dic.fromkeys(first_service_data_point,[])
            tmp_data_dic = copy.deepcopy(optimized_dic)  # temporarily holds the last n minutes of data, one list per metric
            print("tmp data dic:",tmp_data_dic)
            for service_data_item,last_save_time in raw_service_data: # loop over the last n minutes of data
                #print(service_data_item)
                for service_index,v in service_data_item.items(): # loop over each data point's metrics, e.g. service_index=iowait, v=33
                    #print(service_index,v)
                    try:
                        tmp_data_dic[service_index].append(round(float(v),2)) # append this point's value for the current metric to the temp dict
                    except ValueError as e:
                        pass
                #print(service_data_item,last_save_time)
            # calculate avg, max, ... for every metric in the temp dict, then store the results in optimized_dic
            for service_k,v_list in tmp_data_dic.items():
                print(service_k, v_list)
                avg_res = self.get_average(v_list)
                max_res = self.get_max(v_list)
                min_res = self.get_min(v_list)
                mid_res = self.get_mid(v_list)
                optimized_dic[service_k] = [avg_res,max_res,min_res,mid_res]
                print(service_k, optimized_dic[service_k])

        else: # a sub-dict exists under key 'data', works for services with multiple independent items, e.g. several network cards or disks
            #print("**************>>>",first_service_data_point)
            for service_item_key,v_dic in first_service_data_point['data'].items():
                # service_item_key is e.g. lo, eth0, ...; v_dic = {t_in:333, t_out:3353}
                optimized_dic[service_item_key] = {}
                for k2,v2 in v_dic.items():
                    optimized_dic[service_item_key][k2] = [] #{eth0:{t_in:[],t_out:[]}}

            tmp_data_dic = copy.deepcopy(optimized_dic)
            if tmp_data_dic: # tmp_data_dic might be empty if the client report is broken
                print('tmp data dic:', tmp_data_dic)
                for service_data_item,last_save_time in raw_service_data: # loop over the last n minutes of data
                    for service_index,val_dic in service_data_item['data'].items():
                        #print(service_index,val_dic)
                        # service_index is e.g. eth0, eth1, ...
                        for service_item_sub_key, val in val_dic.items():
                            # service_item_sub_key is e.g. t_in, t_out
                            #if service_index == 'lo':
                            #print(service_index,service_item_sub_key,val)
                            tmp_data_dic[service_index][service_item_sub_key].append(round(float(val),2))
                for service_k,v_dic in tmp_data_dic.items():
                    for service_sub_k,v_list in v_dic.items():
                        print(service_k, service_sub_k, v_list)
                        avg_res = self.get_average(v_list)
                        max_res = self.get_max(v_list)
                        min_res = self.get_min(v_list)
                        mid_res = self.get_mid(v_list)
                        optimized_dic[service_k][service_sub_k] = [avg_res,max_res,min_res,mid_res]
                        print(service_k, service_sub_k, optimized_dic[service_k][service_sub_k])

            else:
                print("\033[41;1mMust be sth wrong with client report data\033[0m")
        print("optimized dic:", optimized_dic)

        return optimized_dic

    def get_average(self,data_set):
        '''
        calc the avg value of the data set
        :param data_set:
        :return:
        '''
        if len(data_set) > 0:
            return round(sum(data_set) / len(data_set),2)
        else:
            return 0

    def get_max(self,data_set):
        '''
        calc the max value of the data set
        :param data_set:
        :return:
        '''
        if len(data_set) > 0:
            return max(data_set)
        else:
            return 0

    def get_min(self,data_set):
        '''
        calc the minimum value of the data set
        :param data_set:
        :return:
        '''
        if len(data_set) > 0:
            return min(data_set)
        else:
            return 0

    def get_mid(self,data_set):
        '''
        calc the mid value of the data set
        :param data_set:
        :return:
        '''
        data_set.sort()
        #[1,4,99,32,8,9,4,5,9]
        #[1,3,5,7,9,22,54,77]
        if len(data_set) > 0:
            return data_set[int(len(data_set)/2)]
        else:
            return 0

2. redis_conn

import redis

def redis_conn(django_settings):
    #print(django_settings.REDIS_CONN)
    pool = redis.ConnectionPool(host=django_settings.REDIS_CONN['HOST'],
                                port=django_settings.REDIS_CONN['PORT'],
                                db=django_settings.REDIS_CONN['DB'])
    r = redis.Redis(connection_pool=pool)
    return  r
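
A redis.ConnectionPool is created here and api_views builds a single module-level REDIS_OBJ with it at import time, so all report requests share one pool. A quick way to check the connection from a Django shell (assuming settings.REDIS_CONN points at a reachable Redis instance):

    # python manage.py shell
    from django.conf import settings
    from monitor.backends import redis_conn

    r = redis_conn.redis_conn(settings)
    print(r.ping())  # True if Redis is reachable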

3. api_views

from django.shortcuts import render,HttpResponse
import json
from django.views.decorators.csrf import csrf_exempt
from monitor.backends import data_optimization
from monitor.backends import redis_conn
from django.conf import settings


REDIS_OBJ = redis_conn.redis_conn(settings)

print(REDIS_OBJ.set("test",32333))


from monitor.serializer import ClientHandler
# Create your views here.


def client_config(request,client_id):

    config_obj = ClientHandler(client_id)
    config = config_obj.fetch_configs()

    if config:
        return HttpResponse(json.dumps(config))

@csrf_exempt
def service_report(request):
    print("client data:",request.POST)

    if request.method == 'POST':
        #REDIS_OBJ.set("test_alex",'hahaha')
        try:
            print('host=%s, service=%s' %(request.POST.get('client_id'),request.POST.get('service_name')))
            data = json.loads(request.POST['data'])
            #print(data)
            #StatusData_1_memory_latest
            client_id = request.POST.get('client_id')
            service_name = request.POST.get('service_name')
            # save the data
            data_saving_obj = data_optimization.DataStore(client_id,service_name,data,REDIS_OBJ)

            #redis_key_format = "StatusData_%s_%s_latest" %(client_id,service_name)
            #data['report_time'] = time.time()
            #REDIS_OBJ.lpush(redis_key_format,json.dumps(data))

        except IndexError as e:
            print('----->err:',e)

    return HttpResponse(json.dumps("---report success---"))

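For reference, a client report would be an HTTP POST along these lines. The URL path is an assumption (urls.py is not shown in this post) and the metric values are invented; what matters is that 'data' is a JSON string containing a 'status' field, since process_and_save only accepts status == 0:

    import json
    import requests

    payload = {
        'client_id': 1,
        'service_name': 'cpu',
        'data': json.dumps({'status': 0, 'idle': '95.0', 'iowait': '0.5', 'system': '1.2'}),
    }
    # the route below is hypothetical; use whatever URL maps to api_views.service_report
    requests.post('http://monitor-server:8000/monitor/report/', data=payload)
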
4. settings

REDIS_CONN = {
    'HOST':'192.168.16.126',
    'PORT':6379,
    'DB':0,
}

STATUS_DATA_OPTIMIZATION = {
    'latest': [0, 20],      # interval 0: store the raw reported data, keep at most 20 points
    '10mins': [600, 4320],  # aggregate every 600 s, keep at most 4320 points (about 30 days)
    '30mins': [1800, 4320], # aggregate every 1800 s, keep at most 4320 points (about 90 days)
    '60mins': [3600, 8760], # aggregate every 3600 s, keep at most 8760 points (about 365 days)
}

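Each entry is [optimization interval in seconds, maximum number of points kept], so interval × max points gives the rough time span each aggregated series covers; run right after the dict above:

    for name, (interval, max_points) in STATUS_DATA_OPTIMIZATION.items():
        if interval:
            print(name, '~', interval * max_points // 86400, 'days')
    # 10mins ~ 30 days, 30mins ~ 90 days, 60mins ~ 365 days
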
V. Test screenshots

0. Get all keys

1. The key list now exists, which shows the data has been written to Redis

2. The cpu series already has 2 data points

3. The console receives the data

4. The first value on the left is deleted and the last value is updated

Updated

5. Common Redis command operations

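The checks shown in these screenshots can be reproduced either in redis-cli or from Python; roughly (the exact key names depend on your client_id and service):

    import redis

    r = redis.Redis(host='192.168.16.126', port=6379, db=0)
    print(r.keys("StatusData_*"))                       # list all monitoring keys
    print(r.llen("StatusData_1_cpu_latest"))            # how many cpu points are stored
    print(r.lrange("StatusData_1_cpu_latest", 0, -1))   # dump the raw cpu series
    print(r.lrange("StatusData_1_cpu_latest", -1, -1))  # just the most recently appended value
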
Original article: https://www.cnblogs.com/luoahong/p/9561866.html