package com.central.search.service.impl;
import cn.hutool.core.util.StrUtil;
import com.central.common.constant.CommonConstant;
import com.central.search.model.AggItemVo;
import com.central.search.service.IAggregationService;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.*;
import org.elasticsearch.search.aggregations.bucket.range.ParsedDateRange;
import org.elasticsearch.search.aggregations.bucket.range.Range;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.ParsedCardinality;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 聚合分析服务
*
* @author zlt
* @date 2019/5/7
*
* Blog: https://zlt2000.gitee.io
* Github: https://github.com/zlt2000
*/
@Service
public class AggregationServiceImpl implements IAggregationService {
private final ElasticsearchRestTemplate elasticsearchRestTemplate;
public AggregationServiceImpl(ElasticsearchRestTemplate elasticsearchRestTemplate) {
this.elasticsearchRestTemplate = elasticsearchRestTemplate;
}
/**
* 访问统计聚合查询,需要es里面提供以下结构的数据
* {
* ip, //访问ip
* browser, //浏览器
* operatingSystem, //操作系统
* timestamp //日志时间
* }
*
* @param indexName 索引名
* @param routing es的路由
* @return 返回结果样例如下
* {
* "currDate_uv": 219,
* "currDate_pv": 2730,
* "currWeek_pv": 10309,
* "currHour_uv": 20,
* "browser_datas": [
* {
* "name": "CHROME",
* "value": 7416
* },
* {
* "name": "SAFARI",
* "value": 232
* },
* ...
* ],
* "browser_legendData": [
* "CHROME",
* "SAFARI",
* ...
* ],
* "operatingSystem_datas": [
* {
* "name": "WINDOWS_10",
* "value": 6123
* },
* {
* "name": "MAC_OS_X",
* "value": 1455
* },
* ...
* ],
* "currMonth_pv": 10311,
* "statWeek_uv": [
* 487,
* 219,
* ...
* ],
* "operatingSystem_legendData": [
* "WINDOWS_10",
* "MAC_OS_X",
* ...
* ],
* "statWeek_items": [
* "2019-05-08",
* "2019-05-09",
* ...
* ],
* "statWeek_pv": [
* 7567,
* 2730
* ...
* ]
* }
*/
@Override
public Map requestStatAgg(String indexName, String routing) throws IOException {
ZonedDateTime zonedDateTime = ZonedDateTime.now();
LocalDate localDate = LocalDate.now();
LocalDateTime curDateTime = LocalDateTime.now();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder).routing(routing);
searchSourceBuilder.aggregation(
//聚合查询当天的数据
AggregationBuilders
.dateRange("currDate")
.field("timestamp")
.addRange(
zonedDateTime.withHour(0).withMinute(0).withSecond(0).withNano(0), zonedDateTime.plusDays(1)
)
.subAggregation(
AggregationBuilders
.cardinality("uv")
.field("ip.keyword")
)
).aggregation(
//聚合查询7天内的数据
AggregationBuilders
.dateRange("curr24Hour")
.field("timestamp")
.addRange(zonedDateTime.minusDays(1), zonedDateTime)
.subAggregation(
//聚合并且按小时分组查询当天内的数据
AggregationBuilders
.dateHistogram("statDate")
.field("timestamp")
.fixedInterval(new DateHistogramInterval("90m"))
.format(CommonConstant.DATETIME_FORMAT)
//时区相差8小时
.timeZone(ZoneId.of(CommonConstant.TIME_ZONE_GMT8))
.minDocCount(0L)
.extendedBounds(new LongBounds(
curDateTime.minusDays(1).format(DateTimeFormatter.ofPattern(CommonConstant.DATETIME_FORMAT)),
curDateTime.format(DateTimeFormatter.ofPattern(CommonConstant.DATETIME_FORMAT))
))
.subAggregation(
AggregationBuilders
.cardinality("uv")
.field("ip.keyword")
)
)
).aggregation(
//聚合查询7天内的数据
AggregationBuilders
.dateRange("currWeek")
.field("timestamp")
.addRange(zonedDateTime.minusDays(7), zonedDateTime)
.subAggregation(
//聚合并且按日期分组查询7天内的数据
AggregationBuilders
.dateHistogram("statWeek")
.field("timestamp")
.calendarInterval(DateHistogramInterval.DAY)
.format(CommonConstant.DATE_FORMAT)
//时区相差8小时
.timeZone(ZoneId.of(CommonConstant.TIME_ZONE_GMT8))
.minDocCount(0L)
.extendedBounds(new LongBounds(
localDate.minusDays(6).format(DateTimeFormatter.ofPattern(CommonConstant.DATE_FORMAT)),
localDate.format(DateTimeFormatter.ofPattern(CommonConstant.DATE_FORMAT))
))
.subAggregation(
AggregationBuilders
.cardinality("uv")
.field("ip.keyword")
)
)
).aggregation(
//聚合查询30天内的数据
AggregationBuilders
.dateRange("currMonth")
.field("timestamp")
.addRange(zonedDateTime.minusDays(30), zonedDateTime)
).aggregation(
//聚合查询浏览器的数据
AggregationBuilders
.terms("browser")
.field("browser.keyword")
).aggregation(
//聚合查询操作系统的数据
AggregationBuilders
.terms("operatingSystem")
.field("operatingSystem.keyword")
).aggregation(
//聚合查询1小时内的数据
AggregationBuilders
.dateRange("currHour")
.field("timestamp")
.addRange(
zonedDateTime.minusHours(1), zonedDateTime
)
.subAggregation(
AggregationBuilders
.cardinality("uv")
.field("ip.keyword")
)
).size(0);
RestHighLevelClient client = elasticsearchRestTemplate.getClient();
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
Aggregations aggregations = response.getAggregations();
Map result = new HashMap<>(15);
if (aggregations != null) {
setCurrDate(result, aggregations);
setCurr24Hour(result, aggregations);
setCurrWeek(result, aggregations);
setCurrMonth(result, aggregations);
setTermsData(result, aggregations, "browser");
setTermsData(result, aggregations, "operatingSystem");
setCurrHour(result, aggregations);
}
return result;
}
/**
* 赋值当天统计
*/
private void setCurrDate(Map result, Aggregations aggregations) {
ParsedDateRange currDate = aggregations.get("currDate");
Range.Bucket bucket = currDate.getBuckets().get(0);
ParsedCardinality cardinality = bucket.getAggregations().get("uv");
result.put("currDate_pv", bucket.getDocCount());
result.put("currDate_uv", cardinality.getValue());
}
/**
* 赋值周统计
*/
private void setCurr24Hour(Map result, Aggregations aggregations) {
ParsedDateRange curr24Hour = aggregations.get("curr24Hour");
Range.Bucket bucket = curr24Hour.getBuckets().get(0);
//赋值天趋势统计
setStatDate(result, bucket.getAggregations());
}
/**
* 赋值周统计
*/
private void setCurrWeek(Map result, Aggregations aggregations) {
ParsedDateRange currWeek = aggregations.get("currWeek");
Range.Bucket bucket = currWeek.getBuckets().get(0);
result.put("currWeek_pv", bucket.getDocCount());
//赋值周趋势统计
setStatWeek(result, bucket.getAggregations());
}
/**
* 赋值月统计
*/
private void setCurrMonth(Map result, Aggregations aggregations) {
ParsedDateRange currMonth = aggregations.get("currMonth");
Range.Bucket bucket = currMonth.getBuckets().get(0);
result.put("currMonth_pv", bucket.getDocCount());
}
/**
* 赋值单字段统计
*/
private void setTermsData(Map result, Aggregations aggregations, String key) {
Terms terms = aggregations.get(key);
List legendData = new ArrayList<>();
List datas = new ArrayList<>();
for (Terms.Bucket bucket : terms.getBuckets()) {
legendData.add((String)bucket.getKey());
AggItemVo item = new AggItemVo();
item.setName((String)bucket.getKey());
item.setValue(bucket.getDocCount());
datas.add(item);
}
result.put(key+"_legendData", legendData);
result.put(key+"_datas", datas);
}
/**
* 赋值周趋势统计
*/
private void setStatWeek(Map result, Aggregations aggregations) {
ParsedDateHistogram agg = aggregations.get("statWeek");
List items = new ArrayList<>();
List uv = new ArrayList<>();
List pv = new ArrayList<>();
ParsedCardinality cardinality;
for (Histogram.Bucket bucket : agg.getBuckets()) {
items.add(bucket.getKeyAsString());
pv.add(bucket.getDocCount());
cardinality = bucket.getAggregations().get("uv");
uv.add(cardinality.getValue());
}
result.put("statWeek_items", items);
result.put("statWeek_uv", uv);
result.put("statWeek_pv", pv);
}
/**
* 赋值小时内统计-当前在线数
*/
private void setCurrHour(Map result, Aggregations aggregations) {
ParsedDateRange currDate = aggregations.get("currHour");
Range.Bucket bucket = currDate.getBuckets().get(0);
ParsedCardinality cardinality = bucket.getAggregations().get("uv");
result.put("currHour_uv", cardinality.getValue());
}
/**
* 赋值天趋势统计
*/
private void setStatDate(Map result, Aggregations aggregations) {
ParsedDateHistogram agg = aggregations.get("statDate");
List items = new ArrayList<>();
List uv = new ArrayList<>();
List pv = new ArrayList<>();
ParsedCardinality cardinality;
for (Histogram.Bucket bucket : agg.getBuckets()) {
items.add(getTimeByDatetimeStr(bucket.getKeyAsString()));
pv.add(bucket.getDocCount());
cardinality = bucket.getAggregations().get("uv");
uv.add(cardinality.getValue());
}
result.put("statDate_items", items);
result.put("statDate_uv", uv);
result.put("statDate_pv", pv);
}
/**
* 2020-03-10 01:30:00 获取时间值:03-10 01:30
* @return
*/
private String getTimeByDatetimeStr(String datetimeStr) {
if (StrUtil.isNotEmpty(datetimeStr)) {
return datetimeStr.substring(5, 16);
}
return "";
}
}