Elasticsearch使用DateHistogram聚合

    date_histogram是按照时间来构建集合(桶)Buckts的,当我们需要按照时间进行做一些数据统计的时候,就可以使用它来进行时间维度上构建指标分析.
    在前面几篇中我们用到的hitogram也是可以处理日期的,但是,它不能自动识别日期,只会把日期看做是数字,在聚合的时候是不知道日历时间间隔.

     // 查询
        "query": {
            "bool": {
                "must": [{
                    "range": {
                        "@timestamp": {
                            "gte": 1533556800000,
                            "lte": 1533806520000
                        }
                    }
                }]
            }
        },
        // 不显示具体的内容
        "size": 0,
        // 聚合
        "aggs": {
            // 自己取的聚合名字
            "group_by_grabTime": {
                // es提供的时间处理函数
                "date_histogram": {
                    // 需要聚合分组的字段名称, 类型需要为date, 格式没有要求
                    "field": "@timestamp",
                    // 按什么时间段聚合, 这里是5分钟, 可用的interval在上面给出
                    "interval": "5m",
                    // 设置时区, 这样就相当于东八区的时间
                    "time_zone":"+08:00",
                    // 返回值格式化,HH大写,不然不能区分上午、下午
                    "format": "yyyy-MM-dd HH",   
                    // 为空的话则填充0
                    "min_doc_count": 0,
                    // 需要填充0的范围
                    "extended_bounds": {
                        "min": 1533556800000,
                        "max": 1533806520000
                    }
                },
                // 聚合
                "aggs": {
                    // 自己取的名称
                    "group_by_status": {
                        // es提供
                        "terms": {
                            // 聚合字段名
                            "field": "LowStatusOfPrice"
                        }
                    }
                }
            }
        }
语法解释

统计实例

    def getapplist(self):
        body = {
            "size": 0,
            "aggs": {
                "res": {
                    "terms": {
                        "field": "appname"
                    }
                }
            }
        }
        res = self.es.search(body=body)
        applist = res["aggregations"]["res"]["buckets"]
        namelist=[]
        for app in applist:
            if app["key"]:
                namelist.append(app["key"])

        data = {'code': 'SUCCESS', 'message': '', 'data': namelist}
        return data
精确查找列表
    def getqpsdata(self,appname,startTime,endTime):
        appname = "dzgzpt-wsys"
        userChoiceTime_start = "2019-04-05 09:27:27.820"
        userChoiceTime_end = "2019-06-27 09:27:41.986"
        interval = tools.set_interval(userChoiceTime_start, userChoiceTime_end)
        body = {
            "size": 0,
            "query": {
                "filtered": {
                    "query":  {
                        "query_string": {
                            "analyze_wildcard": True,
                            "query": "appname:" + appname
                        }
                    },
                    "filter": {
                        "bool": {
                            "must": [
                                {
                                    "range": {
                                        "@timestamp": {
                                            "gte": tools.strtime_to_timestamp(userChoiceTime_start),
                                            "lte": tools.strtime_to_timestamp(userChoiceTime_end)
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
            },
            "aggs": {
                "res": {
                    "date_histogram": {
                        "field": "@timestamp",
                        "interval": "%s" % (interval),
                        "time_zone": "Asia/Shanghai",
                        "min_doc_count": 0,
                        "extended_bounds": {
                            "min": tools.strtime_to_timestamp(userChoiceTime_start),
                            "max": tools.strtime_to_timestamp(userChoiceTime_end)
                        }
                    }
                }
            }
        }

        res = self.es.search(body=body)
        qpslist = res["aggregations"]["res"]["buckets"]
        qlist = []
        for qps in qpslist:
            qlist.append({"time": qps["key_as_string"], "value": qps["doc_count"]})
        data = {'code': 'SUCCESS', 'message': '', 'data': qlist}
        return data
按照时间段和指定字段过滤
  def interfacedata(self,appname,startTime,endTime):

        appname = "dzgzpt-wsys"
        userChoiceTime_start = "2019-04-05 09:27:27.820"
        userChoiceTime_end = "2019-06-27 09:27:41.986"

        body = {
            "size": 0,
            "query": {
                "filtered": {
                    "query": {
                        "query_string": {
                            "analyze_wildcard": True,
                            "query": "appname:" + appname
                        }
                    },
                    "filter": {
                        "bool": {
                            "must": [
                                {
                                    "range": {
                                        "@timestamp": {
                                            "gte": tools.strtime_to_timestamp(userChoiceTime_start),
                                            "lte": tools.strtime_to_timestamp(userChoiceTime_end)
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
            },
            "aggs":  {
                "res": {
                    "terms": {
                        "field": "interface"
                    },
                    "aggs": {
                        "waste_time": {
                            "sum": {
                                "field": "waste_time"
                            }
                        },
                        "visit": {
                            "terms": {
                                "field": "success"
                            }
                        }
                    }
                }
            }
        }
        res = self.es.search(body=body)
        ilist=[]
        for interface in res["aggregations"]["res"]["buckets"]:
            wastetime = str(interface["waste_time"]["value"])
            requests = interface["doc_count"]
            err_express = [i["doc_count"] for i in interface["visit"]["buckets"] if i["key"] == "false"]
            errcount = err_express[0] if len(err_express) > 0 else 0
            ilist.append({"interface": interface["key"], "responseTime": wastetime, "requestCount": requests, "errRequest": errcount})

        data = {'code': 'SUCCESS', 'message': '', 'data': ilist}
        return data
对指定字段进行时间段统计
    def getdelaydata(self,appname,startTime,endTime):
        appname = "dzgzpt-wsys"
        userChoiceTime_start = "2019-04-05 09:27:27.820"
        userChoiceTime_end = "2019-06-27 09:27:41.986"
        interval = tools.set_interval(userChoiceTime_start, userChoiceTime_end)
        body = {
            "size": 0,
            "query": {
                "filtered": {
                    "query": {
                        "query_string": {
                            "analyze_wildcard": True,
                            "query": "appname:" + appname
                        }
                    },
                    "filter": {
                        "bool": {
                            "must": [
                                {
                                    "range": {
                                        "@timestamp": {
                                            "gte": tools.strtime_to_timestamp(userChoiceTime_start),
                                            "lte": tools.strtime_to_timestamp(userChoiceTime_end)
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
            },
            "aggs": {
                "res": {
                    "date_histogram": {
                        "field": "@timestamp",
                        "interval": "%s" % (interval),
                        "time_zone": "Asia/Shanghai",
                        "min_doc_count": 1,
                        "extended_bounds": {
                            "min": tools.strtime_to_timestamp(userChoiceTime_start),
                            "max": tools.strtime_to_timestamp(userChoiceTime_end)
                        }
                    },
                    "aggs":
                        {
                            "avg_wastetime":
                                {
                                    "avg":
                                        {
                                            "field": "waste_time"
                                        }
                                }
                        }
                }
            }
        }
        res = self.es.search(body=body)
        delaylist = res["aggregations"]["res"]["buckets"]
        dlist = []
        for de in delaylist:
            dlist.append({"time": de["key_as_string"], "value": de["avg_wastetime"]['value']})
        data = {'code': 'SUCCESS', 'message': '', 'data': dlist}
        return data
View Code

es使用scroll实现大数据分页

 created() 
    {
      this.getddll_data();
    },
    methods: {
      handleClick() {

      },
      getddll_data()
      {
        return this.$http.getdlList({"pageIndex":this.currentPage},{notify:true})
         .then((data) => 
          {
            this.dyllList=data.data;
            this.total=data.total;
          });
      },
      pageChange(pageIndex) 
      { 
        this.currentPage=pageIndex;
        this.getddll_data();
      }
   }
html
    def callLinkdata(self,jylsh,startTime,endTime,pageIndex):
        body = {
            "query": {
                "constant_score": {
                    "filter": {
                        "term": {
                            "jylsh": "5b64fceb42944a699ed6a140d853f74d"
                        }
                    }
                }
            }
        }

        pageIndex=int(pageIndex)
        #第一(次)页查询没有scroll_id
        if pageIndex == 1:
            res = self.es.search(body=body,scroll='1m',size=10)
            scroll_size = res['hits']['total']
            scroll_id = res['_scroll_id']
        else:
            res = self.es.search(body=body, scroll='1m', size=10)
            scroll_size = res['hits']['total']
            scroll_id = res['_scroll_id']
            while pageIndex -1 > 0:
             res = self.es.scroll(scroll_id=scroll_id, scroll='1m')
             pageIndex-=1

        reslist=[]
        t=1;
        for re in res["hits"]["hits"]:
            re = re["_source"]
            reslist.append({"ID":t,"csrq":re["logtime"],"yymc":re["appname"],"zt":re["success"],"ip":re["hostname"],"type":re["type"],"server":re["serviceId"]})
            t+=1
        data = {'type': 'SUCCESS', 'message': '', 'data': {"total":scroll_size,"data":reslist}}
        return data
后端

实现条件过滤和排序列表

    def logListdata(self,jylsh,startTime,endTime,pageIndex):
        body = {
            "query": {
                "filtered": {
                    "filter": {
                        "bool":{
                            "must":
                             [
                               {"term":{"jylsh": "5b64fceb42944a699ed6a140d853f74d"}},
                               {
                                  "range": {
                                     "@timestamp":
                                      {
                                       "gte": tools.strtime_to_timestamp("2019-05-23 14:19:20.559"),
                                       "lte": tools.strtime_to_timestamp("2019-05-23 14:38:54.716")
                                      }
                                  }
                               }
                             ]
                        }
                    }
                }
            },
            "sort": {"@timestamp": { "order": "desc" }}
        }

        pageIndex=int(pageIndex)
        #第一(次)页查询没有scroll_id
        if pageIndex == 1:
            res = self.es.search(body=body,scroll='1m',size=10)
            scroll_size = res['hits']['total']
            scroll_id = res['_scroll_id']
        else:
            res = self.es.search(body=body, scroll='1m', size=10)
            scroll_size = res['hits']['total']
            scroll_id = res['_scroll_id']
            while pageIndex -1 > 0:
             res = self.es.scroll(scroll_id=scroll_id, scroll='1m')
             pageIndex-=1

        reslist=[]
        t=1;
        for re in res["hits"]["hits"]:
            re = re["_source"]
            reslist.append({"a":re["logtime"],"b":re["appname"],"c":re["hostname"],"d":re["message"]})
            t+=1
        data = {'type': 'SUCCESS', 'message': '', 'data': {"total":scroll_size,"data":reslist}}
        return data
View Code

python处理前后端时间字符串统一格式

2019-06-21T08:23:58.169Z   #前端提交数据
2019-06-21 16:23:58.000     #后端数据格式要求


    #格式化日期字符串
    def formatDate(self,timestr):
        timeStruct = time.strptime(timestr, "%Y-%m-%d %H:%M:%S.%f")
        strTime = time.strftime("%Y-%m-%d %H:%M:%S", timeStruct)
        return strTime

    def formartStartTime(self,startTime):
        startTime=datetime.datetime.strptime(startTime, '%Y-%m-%dT%H:%M:%S.%fZ')+datetime.timedelta(hours=8)
        startTime=startTime.strftime('%Y-%m-%d %H:%M:%S.%f')[:-7]
        startTime=startTime+".000"
        return startTime

    def formartEndTime(self,endTime):
        endTime=datetime.datetime.strptime(endTime, '%Y-%m-%dT%H:%M:%S.%fZ')+datetime.timedelta(hours=8)
        endTime=endTime.strftime('%Y-%m-%d %H:%M:%S.%f')[:-7]
        endTime=endTime+".999"
        return endTime



#字符串转成日期格式
import datetime
detester = ‘2017-01-01'
date = datetime.datetime.strptime(detester,’%Y-%m-%d')


#日期格式转成字符串
import datetime
date = datetime.now()
detester = date.strftime(‘%Y-%m-%d')
View Code

python和vue实现最终分页效果

<template>
  <div class="topology-container">
    <bee-breadcrumb/>
    TraceID:
    <el-input
      v-model="input_TraceID"
      size="medium"
      placeholder="TraceID"
      style="350px;margin-right:15px;margin-bottom:15px">
    </el-input>
     &nbsp;&nbsp;
     <el-date-picker
      v-model="value2"
      type="datetimerange"
      :picker-options="pickerOptions"
      range-separator="至"
      start-placeholder="开始日期"
      end-placeholder="结束日期"
      align="right">
      </el-date-picker>
      &nbsp;&nbsp;
  <el-button type="primary" @click="handleQuery">查询</el-button>
  <el-tabs v-model="activeName" @tab-click="handleClick">
    <el-tab-pane label="调用链路" name="1">
        <bee-table :data="dyllList"
               :column-config="LINK_TABLE_COLUMN"
               :pageIndex="currentPage1"
               :pageSize="pageSize1"
               @size-change="handleSizeChange"
               @current-change="handleCurrentChange"
               :total="total1">
               <template slot="operation" slot-scope="{scope}">
                 <div class="el-progress el-progress--line el-progress--text-inside">
                  <div class="el-progress-bar">
                    <div class="el-progress-bar__outer" style="height:16px !important;">
                     <div class="el-progress-bar__inner" :style="styleObject(scope.row.sjz)">
                    <div class="el-progress-bar__innerText" style='color:black;font-size:15px'>{{scope.row.sjz}}ms</div>
                  </div>
                </div>
               </div>
              </div>
            </template>
    </bee-table>
    </el-tab-pane>
    <el-tab-pane label="日志详情" name="2">
     <bee-table :data="logList"
               :column-config="LOG_TABLE_COLUMN"
               :pageIndex="currentPage2"
               :pageSize="pageSize2"
               @size-change="handleSizeChange2"
               @current-change="handleCurrentChange2"
               :total="total2">
      </bee-table>
    </el-tab-pane>
  </el-tabs>
  </div>
</template>
<script>
  import BeeBreadcrumb from '@/common/component/BeeBreadcrumb'
  import BeeTable from '@/common/component/BeeTable'
  import utils from '@/common/utils'
  import { LINK_TABLE_COLUMN,LOG_TABLE_COLUMN  } from './constants'

  export default {
    components: {
      BeeBreadcrumb,
      BeeTable
    },
    mounted() {
      this.breadcrumbList = [
       
      ];
    },
    created()
    {
    },
    data()
    {
      return {
        total1: 0,         //总记录数
        total2:0,
        currentPage1:1,   //初始页
        currentPage2:1,
        pageSize1:10,    // 每页的数据
        pageSize2:10,
        dyllList: [],
        logList:[],
        tableLoading: false,
        LINK_TABLE_COLUMN,
        LOG_TABLE_COLUMN,
        pickerOptions:utils.setTimeOPtions(),
        value1: [new Date(2010, 10, 10, 10, 10), new Date(2010, 10, 11, 10, 10)],
        value2: '',
        tabname: "1",
        input_TraceID:"",
        startTime:"",
        endTime:""
      }
    },
    methods: {
      styleObject(sjz) {
           var color="#67c23a";
           if (sjz<=19)
           {
             sjz=19;
           }
           else if(sjz>20 && sjz<=60)
           {
             color="#e6a23c";
           }
           else if(sjz>60 && sjz<100)
           {
             color="#f56c6c";
           }
           else if(sjz>=100)
           {
              sjz=100;
              color="#f56c6c";
           }
           return {
              sjz+'%',
             backgroundColor:color
        }
      },
      handleClick(tab, event)
      {
         this.tabname=tab["name"];
      },
      getddll_data()
      {
         return this.$http.getdlList(
          {
            "pageIndex":this.currentPage1,
            "pageSize":this.pageSize1,
            "jylsh":this.input_TraceID,
            "startTime":this.startTime,
            "endTime":this.endTime
           },{notify:true})
         .then((data) => 
          {
            this.dyllList=data.list;
            this.total1=data.total;
          });
      },
      handleCurrentChange(pageIndex) 
      { 
        this.currentPage1=pageIndex;
        this.getddll_data();
      },
      handleSizeChange(pageSize) {
        this.currentPage1 = 1;
        this.pageSize1= pageSize;
        this.getddll_data();
      },
      getlogs_data()
      {
        return this.$http.getlogList(
          {
            "pageIndex":this.currentPage2,
            "pageSize":this.pageSize2,
            "jylsh":this.input_TraceID,
            "startTime":this.startTime,
            "endTime":this.endTime
           },{notify:true})
         .then((data) => 
          {
            this.logList=data.list;
            this.total2=data.total;
          });
      },
      handleCurrentChange2(pageIndex) 
      { 
        this.currentPage2=pageIndex;
        this.getlogs_data();
      },
      handleSizeChange2(pageSize) {
        this.currentPage2 = 1;
        this.pageSize2= pageSize;
        this.getlogs_data();
      },
      handleQuery()
      {
         if(this.input_TraceID=="" || this.value2 == "")
         {
            this.$message({
              showClose: true,
              message: 'TraceID和查询日期不能为空!',
              type: 'warning'
           });
         }
         else
         {
           this.startTime=this.value2[0];
           this.endTime=this.value2[1];
           if(this.tabname=="1")
           {
            this.getddll_data();
           }
           else
           {
            this.getlogs_data();
           }
        }
      }
    },
    computed: {
       activeName: {
          get() {
           return this.$route.query.index || '1';
          },
         set() {}
      }
  }
}
</script>
<style lang="scss" scoped>  
  .topology-container
   {
     margin:8px;
   }
</style>
vue
    #链路调用查询统计
    def callLinkdata(self,jylsh,startTime,endTime,pageIndex,pageSize):

        pageIndex=int(pageIndex)
        pageSize=int(pageSize)
        startTime=self.formartStartTime(startTime)
        endTime=self.formartEndTime(endTime)

        body = {
            "query": {
                "filtered": {
                    "filter": {
                        "bool":{
                            "must":
                             [
                               {"term":{"jylsh": jylsh}},
                               {
                                  "range": {
                                     "@timestamp":
                                      {
                                       "gte": tools.strtime_to_timestamp(startTime),
                                       "lte": tools.strtime_to_timestamp(endTime)
                                      }
                                  }
                               }
                             ]
                        }
                    }
                }
            },
            "sort": {"@timestamp": { "order": "desc" }}
        }

        #第一(次)页查询没有scroll_id
        if pageIndex == 1:
            res = self.es.search(body=body,scroll='1m',size=pageSize)
            scroll_size = res['hits']['total']
            scroll_id = res['_scroll_id']
        else:
            res = self.es.search(body=body, scroll='1m', size=pageSize)
            scroll_size = res['hits']['total']
            scroll_id = res['_scroll_id']
            while pageIndex -1 > 0:
             res = self.es.scroll(scroll_id=scroll_id, scroll='1m')
             pageIndex-=1

        reslist=[]
        for re in res["hits"]["hits"]:
            re = re["_source"]
            reslist.append({"csrq":self.formatDate(re["logtime"]),"yymc":re["appname"],"zt":re["success"],"ip":re["hostname"],"type":re["type"],"server":re["serviceId"],"sjz":re["waste_time"]})
        data = {'code': 'SUCCESS', 'message': '', 'data': {"total":scroll_size,"list":reslist}}
        return data

    #日志详情列表
    def logListdata(self,jylsh,startTime,endTime,pageIndex,pageSize):
        pageIndex = int(pageIndex)
        pageSize = int(pageSize)
        startTime = self.formartStartTime(startTime)
        endTime = self.formartEndTime(endTime)

        body = {
            "query": {
                "filtered": {
                    "filter": {
                        "bool":{
                            "must":
                             [
                               {"term":{"jylsh": jylsh}},
                               {
                                  "range": {
                                     "@timestamp":
                                      {
                                       "gte": tools.strtime_to_timestamp(startTime),
                                       "lte": tools.strtime_to_timestamp(endTime)
                                      }
                                  }
                               }
                             ]
                        }
                    }
                }
            },
            "sort": {"@timestamp": { "order": "desc" }}
        }

        #第一(次)页查询没有scroll_id
        if pageIndex == 1:
            res = self.es.search(body=body,scroll='1m',size=pageSize)
            scroll_size = res['hits']['total']
            scroll_id = res['_scroll_id']
        else:
            res = self.es.search(body=body, scroll='1m', size=pageSize)
            scroll_size = res['hits']['total']
            scroll_id = res['_scroll_id']
            while pageIndex -1 > 0:
             res = self.es.scroll(scroll_id=scroll_id, scroll='1m')
             pageIndex-=1

        reslist=[]
        for re in res["hits"]["hits"]:
            re = re["_source"]
            reslist.append({"addTime":self.formatDate(re["logtime"]),"appname":re["appname"],"hostname":re["hostname"],"message":re["message"]})
        data = {'code': 'SUCCESS', 'message': '', 'data': {"total":scroll_size,"list":reslist}}
        return data


    #格式化日期字符串
    def formatDate(self,timestr):
        timeStruct = time.strptime(timestr, "%Y-%m-%d %H:%M:%S.%f")
        strTime = time.strftime("%Y-%m-%d %H:%M:%S", timeStruct)
        return strTime

    def formartStartTime(self,startTime):
        startTime=datetime.datetime.strptime(startTime, '%Y-%m-%dT%H:%M:%S.%fZ')+datetime.timedelta(hours=8)
        startTime=startTime.strftime('%Y-%m-%d %H:%M:%S.%f')[:-7]
        startTime=startTime+".000"
        return startTime

    def formartEndTime(self,endTime):
        endTime=datetime.datetime.strptime(endTime, '%Y-%m-%dT%H:%M:%S.%fZ')+datetime.timedelta(hours=8)
        endTime=endTime.strftime('%Y-%m-%d %H:%M:%S.%f')[:-7]
        endTime=endTime+".999"
        return endTime
python

处理不同时区的时间格式转换

showDetail(traceID){
         this.$router.push({
          path: '/chain',
          query: {"traceID":traceID,"startTime":utils.converTimeFormat(this.startTtime),"endTime": utils.converTimeFormat(this.endTime)}
        });
      }



   converTimeFormat(d){
        d.setHours(d.getHours()-8);
        const resDate = d.getFullYear() + '-' + this.p((d.getMonth() + 1)) + '-' + this.p(d.getDate());
        const resTime = this.p(d.getHours()) + ':' + this.p(d.getMinutes()) + ':' + this.p(d.getSeconds());
        const datestr = resDate+" "+resTime;
        return datestr;
   },
   p(s){
         return s < 10 ? '0' + s : s;
  }




     this.input_TraceID = this.$route.query.traceID || '';
        const startTime=this.$route.query.startTime || '';
        const endTime=this.$route.query.endTime || '';
        if(this.input_TraceID){
          
         let start = new Date(startTime);
         start.setHours(start.getHours()+8);
         let end =new Date(endTime);
         end.setHours(end.getHours()+8);
         this.value2 = [start,end];

         this.startTime=startTime;
         this.endTime=endTime;
js处理
    def formartStartTime(self,startTime):
        try:
            startTime=datetime.datetime.strptime(startTime, '%Y-%m-%dT%H:%M:%S.%fZ')+datetime.timedelta(hours=8)
        except Exception as e:
            startTime = datetime.datetime.strptime(startTime, '%Y-%m-%d %H:%M:%S')+datetime.timedelta(hours=8)
        # startTime = datetime.datetime.strptime(startTime, '%Y-%m-%dT%H:%M:%S.%fZ') + datetime.timedelta(hours=8)
        startTime=startTime.strftime('%Y-%m-%d %H:%M:%S.%f')[:-7]
        startTime=startTime+".000"
        return startTime
python处理

实现定时查询

[root]# crontab -e
每个整点执行一次

0 */60 * * * cd /root/estasks;sh crontb.sh     


vi  crontb.sh  

#! /bin/bash
python /root/estasks/messageSend.py
python /root/estasks/messageService.py
python /root/estasks/smsService.py

echo $(date "+%Y-%m-%d %H:%M:%S") >> msg.log
      

vi msg.log

2019-09-06 07:00:01
2019-09-06 08:00:02
2019-09-06 09:00:01
2019-09-06 10:00:02
                             
View Code

crontab语法特点

   crontab本身是一个后台进程 它不能执行一个要求打开输入输出终端的命令

   主机上执行
       docker exec -it 934073c77e8e /root/a.sh

   crontab -e
      docker exec 934073c77e8e /root/a.sh
      这里docker命令不需要加-it,因为加-it就要开启了一个终端,而计划任务是无法进入任何终端

  

原文地址:https://www.cnblogs.com/yxh168/p/11022277.html