/**
* 一个通用的es聚合查询方法
* @param restHighLevelClient
* @param start
* @param end
* @param index
* @param aggregationName
* @param painlessScript
* @param existsQueryParams
* @param notExistsQueryParams
* @return
* @throws IOException
*/
private List<String> queryFromEs(RestHighLevelClient restHighLevelClient, String start, String end, String[] index, String aggregationName, String painlessScript, List<String> existsQueryParams, List<String> notExistsQueryParams) throws IOException {
List<String> resultList = new ArrayList<>();
//创建一个搜索源
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
//SearchRequest按一个或多个索引查询,需要一个SearchSourceBuilder,搜索源提供了搜索选项
SearchRequest searchRequest = new SearchRequest();
//聚合的条件
TermsAggregationBuilder aggregation = AggregationBuilders.terms(aggregationName).script(new Script(painlessScript)).size(Integer.MAX_VALUE).minDocCount(1).shardMinDocCount(0).showTermDocCountError(false);
//添加bool过滤器,进行条件查询
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
//must --时间必须满足
boolQueryBuilder.must(QueryBuilders.rangeQuery("TIME").gte(start).lte(end));
//存在的条件
for (String existsQueryParam : existsQueryParams) {
boolQueryBuilder.must(QueryBuilders.existsQuery(existsQueryParam));
}
//不存在的条件
for (String notExistsQueryParam : notExistsQueryParams) {
boolQueryBuilder.mustNot(QueryBuilders.existsQuery(notExistsQueryParam));
}
//定义sourceBuilder按时间排序,正序,再传入之前的查询条件,from 0 size 0 不查原始数据
sourceBuilder.sort("TIME", SortOrder.ASC).from(0).size(0).query(boolQueryBuilder).aggregation(aggregation);
//定义查询的索引,定义搜索源,即sourceBuilder对象
searchRequest.indices(index);
searchRequest.source(sourceBuilder);
log.info("-query from es index:{}, sql:{}-->", index, sourceBuilder);
//开始搜索,拿到结果
SearchResponse response = null;
response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
//拿到聚合数据
Aggregations aggregations = response.getAggregations();
if (aggregations != null) {
ParsedStringTerms empAccountAggregationsTerms = aggregations.get(aggregationName);
List extends Terms.Bucket> buckets = empAccountAggregationsTerms.getBuckets();
for (Terms.Bucket bucket : buckets) {
log.info("--query from es index results:-->" + bucket.getKey().toString());
resultList.add(bucket.getKey().toString());
}
}
return resultList;
}