1、demo
public void test(){ //用过设置es服务的ip,端口号和协议建立es客户端工厂 RestClientBuilder builder = RestClient.builder(new HttpHost("10.50.31.110", 9200, "http")); //建立客户端 RestHighLevelClient client = new RestHighLevelClient(builder); //设置es的索引index和类型他type SearchRequest searchRequest = new SearchRequest("data_node"); searchRequest.types("data_node"); //查询工厂 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); //bool工厂,相当于sql中where条件,where之后的条件都写在这里 BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); //queryBuilder.should方法是或的逻辑,满足一个,queryBuilder.must是必须满足 queryBuilder.should(QueryBuilders.termQuery("data.name", "014")); //将where条件放入工厂 searchSourceBuilder.query(queryBuilder); //聚合工厂,相当于sql中的group by,如果es数据中是对象包含对象,就如下写法,以下就是以data对象中的name作为分组 AggregationBuilder termsBuilder = AggregationBuilders.terms("by_data.name").field("data.name"); //将聚合条件放入工厂 searchSourceBuilder.aggregation(termsBuilder); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = null; try { //执行请求 searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } //返回查询聚合后的结果,处理 Aggregations terms= searchResponse.getAggregations(); for (Aggregation a:terms){ ParsedTerms parsedTerms = (ParsedTerms) a; for (Terms.Bucket bucket : parsedTerms.getBuckets()) { log.info(bucket.getKeyAsString() + " " + bucket.getDocCount()); } } //返回查询到的具体数据 SearchHits searchHits = searchResponse.getHits(); for (SearchHit hit : searchHits.getHits()) { log.info(hit.getSourceAsString()); } }
2、范围查询
/** * 查询时间范围内所有接口调用次数 * @param index * @param type * @param startTime * @param endTime * @return * @throws IOException */ public SearchHits getCountByTimeRange(String index, String type, Long startTime, Long endTime) throws IOException { SearchRequest request = new SearchRequest(index); request.types(type); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); queryBuilder.must(QueryBuilders.rangeQuery("data.requestTime").gt(startTime).lt(endTime)); searchSourceBuilder.query(queryBuilder); request.source(searchSourceBuilder); SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT); SearchHits searchHits = searchResponse.getHits(); return searchHits; }
3、聚合设置过滤条件
/** * 聚合中设置过滤条件 * @param index * @param type * @param startTime * @param endTime * @return */ public Aggregations getOnDayInvokeAndError(String index, String type, Long startTime, Long endTime, Integer from, Integer size) throws Exception{ SearchRequest request = new SearchRequest(index); request.types(type); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); queryBuilder.must(QueryBuilders.rangeQuery("data.requestTime").gt(startTime).lt(endTime)) .must(QueryBuilders.termQuery("resource.logsource", "qqzx")); searchSourceBuilder.query(queryBuilder); //以interfaceCode分组,取interfaceName作为别名 AggregationBuilder termsBuilder = AggregationBuilders.terms("by_data.interfaceCode").field("data.interfaceCode").field("resource.interfaceName"); //设置过滤条件 AggregationBuilder filter = AggregationBuilders.filter("当日异常数量", QueryBuilders.boolQuery().must(QueryBuilders.termQuery("resource.status", "1"))); termsBuilder.subAggregation(filter); searchSourceBuilder.aggregation(termsBuilder).from(from).size(size); request.source(searchSourceBuilder); SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT); return searchResponse.getAggregations(); }
4、设置需要查询的字段和排除的字段
/** * * @param index * @param type * @param startTime * @param endTime * @return * @throws Exception */ public Set<String> getActiveInterfaceNotRegister(String index, String type, Long startTime, Long endTime, Page page) throws Exception { SearchRequest request = new SearchRequest(index); request.types(type); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); queryBuilder.must(QueryBuilders.rangeQuery("data.requestTime").gt(startTime).lt(endTime)); AggregationBuilder termsBuilder = AggregationBuilders.terms("by_data.interfaceCode").field("data.interfaceCode"); searchSourceBuilder.aggregation(termsBuilder);
//fetchSource中设置需要查询字段和排除字段 searchSourceBuilder.query(queryBuilder).fetchSource("data.interfaceCode", null).from(page.getOffset()).size(page.getLimit()); request.source(searchSourceBuilder); SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT); SearchHit[] hits = searchResponse.getHits().getHits(); Set<String> result = new HashSet<>(); for (SearchHit hit : hits) { String sourceAsString = hit.getSourceAsString(); JSONObject jsonObject = JSONObject.parseObject(sourceAsString); JSONObject data = jsonObject.getJSONObject("data"); result.add(data.getString("interfaceCode")); } return result; }