AggregationServiceImpl.java 13.6 KB
Newer Older
1 2
package com.central.search.service.impl;

3
import cn.hutool.core.util.StrUtil;
4 5 6
import com.central.common.constant.CommonConstant;
import com.central.search.model.AggItemVo;
import com.central.search.service.IAggregationService;
7
import org.elasticsearch.action.search.SearchRequest;
8
import org.elasticsearch.action.search.SearchResponse;
9 10
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
11 12
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
13 14 15
import org.elasticsearch.search.aggregations.bucket.histogram.*;
import org.elasticsearch.search.aggregations.bucket.range.ParsedDateRange;
import org.elasticsearch.search.aggregations.bucket.range.Range;
16
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
17
import org.elasticsearch.search.aggregations.metrics.ParsedCardinality;
18 19
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
20 21
import org.springframework.stereotype.Service;

22
import java.io.IOException;
23
import java.time.LocalDate;
24
import java.time.LocalDateTime;
25 26
import java.time.ZoneId;
import java.time.ZonedDateTime;
27 28 29 30 31 32 33 34 35 36 37
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 聚合分析服务
 *
 * @author zlt
 * @date 2019/5/7
38 39 40
 * <p>
 * Blog: https://zlt2000.gitee.io
 * Github: https://github.com/zlt2000
41 42 43
 */
@Service
public class AggregationServiceImpl implements IAggregationService {
44 45 46 47 48
    private final ElasticsearchRestTemplate elasticsearchRestTemplate;

    public AggregationServiceImpl(ElasticsearchRestTemplate elasticsearchRestTemplate) {
        this.elasticsearchRestTemplate = elasticsearchRestTemplate;
    }
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65

    /**
     * 访问统计聚合查询,需要es里面提供以下结构的数据
     * {
     *    ip, //访问ip
     * 	  browser, //浏览器
     * 	  operatingSystem, //操作系统
     * 	  timestamp //日志时间
     * }
     *
     * @param indexName 索引名
     * @param routing es的路由
     * @return 返回结果样例如下
     * {
     *   "currDate_uv": 219,
     *   "currDate_pv": 2730,
     *   "currWeek_pv": 10309,
zlt2000's avatar
zlt2000 已提交
66
     *   "currHour_uv": 20,
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
     *   "browser_datas": [
     *     {
     *       "name": "CHROME",
     *       "value": 7416
     *     },
     *     {
     *       "name": "SAFARI",
     *       "value": 232
     *     },
     *     ...
     *   ],
     *   "browser_legendData": [
     *     "CHROME",
     *     "SAFARI",
     *     ...
     *   ],
     *   "operatingSystem_datas": [
     *     {
     *       "name": "WINDOWS_10",
     *       "value": 6123
     *     },
     *     {
     *       "name": "MAC_OS_X",
     *       "value": 1455
     *     },
     *     ...
     *   ],
     *   "currMonth_pv": 10311,
     *   "statWeek_uv": [
     *     487,
     *     219,
     *     ...
     *   ],
     *   "operatingSystem_legendData": [
     *     "WINDOWS_10",
     *     "MAC_OS_X",
     *     ...
     *   ],
     *   "statWeek_items": [
     *     "2019-05-08",
     *     "2019-05-09",
     *     ...
     *   ],
     *   "statWeek_pv": [
     *     7567,
     *     2730
     *     ...
     *   ]
     * }
     */
    @Override
118
    public Map<String, Object> requestStatAgg(String indexName, String routing) throws IOException {
119
        ZonedDateTime zonedDateTime = ZonedDateTime.now();
120
        LocalDate localDate = LocalDate.now();
121 122
        LocalDateTime curDateTime = LocalDateTime.now();

123 124 125 126 127 128 129 130 131
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        SearchRequest searchRequest = new SearchRequest(indexName);
        searchRequest.source(searchSourceBuilder).routing(routing);
        searchSourceBuilder.aggregation(
            //聚合查询当天的数据
            AggregationBuilders
                    .dateRange("currDate")
                    .field("timestamp")
                    .addRange(
132
                            zonedDateTime.withHour(0).withMinute(0).withSecond(0).withNano(0), zonedDateTime.plusDays(1)
133 134 135 136 137 138 139 140 141 142 143
                    )
                    .subAggregation(
                            AggregationBuilders
                                    .cardinality("uv")
                                    .field("ip.keyword")
                    )
        ).aggregation(
            //聚合查询7天内的数据
            AggregationBuilders
                    .dateRange("curr24Hour")
                    .field("timestamp")
144
                    .addRange(zonedDateTime.minusDays(1), zonedDateTime)
145 146 147 148 149
                    .subAggregation(
                            //聚合并且按小时分组查询当天内的数据
                            AggregationBuilders
                                    .dateHistogram("statDate")
                                    .field("timestamp")
150
                                    .fixedInterval(new DateHistogramInterval("90m"))
151 152
                                    .format(CommonConstant.DATETIME_FORMAT)
                                    //时区相差8小时
153
                                    .timeZone(ZoneId.of("GMT+8"))
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
                                    .minDocCount(0L)
                                    .extendedBounds(new ExtendedBounds(
                                            curDateTime.minusDays(1).format(DateTimeFormatter.ofPattern(CommonConstant.DATETIME_FORMAT)),
                                            curDateTime.format(DateTimeFormatter.ofPattern(CommonConstant.DATETIME_FORMAT))
                                    ))
                                    .subAggregation(
                                            AggregationBuilders
                                                    .cardinality("uv")
                                                    .field("ip.keyword")
                                    )
                    )
        ).aggregation(
            //聚合查询7天内的数据
            AggregationBuilders
                    .dateRange("currWeek")
                    .field("timestamp")
170
                    .addRange(zonedDateTime.minusDays(7), zonedDateTime)
171 172 173 174 175
                    .subAggregation(
                            //聚合并且按日期分组查询7天内的数据
                            AggregationBuilders
                                    .dateHistogram("statWeek")
                                    .field("timestamp")
176
                                    .calendarInterval(DateHistogramInterval.DAY)
177 178
                                    .format(CommonConstant.DATE_FORMAT)
                                    //时区相差8小时
179
                                    .timeZone(ZoneId.of("GMT+8"))
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
                                    .minDocCount(0L)
                                    .extendedBounds(new ExtendedBounds(
                                            localDate.minusDays(6).format(DateTimeFormatter.ofPattern(CommonConstant.DATE_FORMAT)),
                                            localDate.format(DateTimeFormatter.ofPattern(CommonConstant.DATE_FORMAT))
                                    ))
                                    .subAggregation(
                                            AggregationBuilders
                                                    .cardinality("uv")
                                                    .field("ip.keyword")
                                    )
                    )
        ).aggregation(
            //聚合查询30天内的数据
            AggregationBuilders
                    .dateRange("currMonth")
                    .field("timestamp")
196
                    .addRange(zonedDateTime.minusDays(30), zonedDateTime)
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
        ).aggregation(
            //聚合查询浏览器的数据
            AggregationBuilders
                    .terms("browser")
                    .field("browser.keyword")
        ).aggregation(
            //聚合查询操作系统的数据
            AggregationBuilders
                    .terms("operatingSystem")
                    .field("operatingSystem.keyword")
        ).aggregation(
            //聚合查询1小时内的数据
            AggregationBuilders
                    .dateRange("currHour")
                    .field("timestamp")
                    .addRange(
213
                            zonedDateTime.minusHours(1), zonedDateTime
214 215 216 217 218 219 220 221 222 223
                    )
                    .subAggregation(
                            AggregationBuilders
                                    .cardinality("uv")
                                    .field("ip.keyword")
                    )
        ).size(0);

        RestHighLevelClient client = elasticsearchRestTemplate.getClient();
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
224
        Aggregations aggregations = response.getAggregations();
225
        Map<String, Object> result = new HashMap<>(15);
226 227
        if (aggregations != null) {
            setCurrDate(result, aggregations);
zlt2000's avatar
zlt2000 已提交
228
            setCurr24Hour(result, aggregations);
229 230 231 232
            setCurrWeek(result, aggregations);
            setCurrMonth(result, aggregations);
            setTermsData(result, aggregations, "browser");
            setTermsData(result, aggregations, "operatingSystem");
zlt2000's avatar
zlt2000 已提交
233
            setCurrHour(result, aggregations);
234
        }
235 236 237 238 239 240
        return result;
    }
    /**
     * 赋值当天统计
     */
    private void setCurrDate(Map<String, Object> result, Aggregations aggregations) {
241 242
        ParsedDateRange currDate = aggregations.get("currDate");
        Range.Bucket bucket = currDate.getBuckets().get(0);
243
        ParsedCardinality cardinality = bucket.getAggregations().get("uv");
244 245
        result.put("currDate_pv", bucket.getDocCount());
        result.put("currDate_uv", cardinality.getValue());
zlt2000's avatar
zlt2000 已提交
246 247 248 249 250
    }
    /**
     * 赋值周统计
     */
    private void setCurr24Hour(Map<String, Object> result, Aggregations aggregations) {
251 252
        ParsedDateRange curr24Hour = aggregations.get("curr24Hour");
        Range.Bucket bucket = curr24Hour.getBuckets().get(0);
253 254
        //赋值天趋势统计
        setStatDate(result, bucket.getAggregations());
255 256 257 258 259
    }
    /**
     * 赋值周统计
     */
    private void setCurrWeek(Map<String, Object> result, Aggregations aggregations) {
260 261
        ParsedDateRange currWeek = aggregations.get("currWeek");
        Range.Bucket bucket = currWeek.getBuckets().get(0);
262
        result.put("currWeek_pv", bucket.getDocCount());
zlt2000's avatar
zlt2000 已提交
263 264
        //赋值周趋势统计
        setStatWeek(result, bucket.getAggregations());
265 266 267 268 269
    }
    /**
     * 赋值月统计
     */
    private void setCurrMonth(Map<String, Object> result, Aggregations aggregations) {
270 271
        ParsedDateRange currMonth = aggregations.get("currMonth");
        Range.Bucket bucket = currMonth.getBuckets().get(0);
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
        result.put("currMonth_pv", bucket.getDocCount());
    }
    /**
     * 赋值单字段统计
     */
    private void setTermsData(Map<String, Object> result, Aggregations aggregations, String key) {
        Terms terms = aggregations.get(key);
        List<String> legendData = new ArrayList<>();
        List<AggItemVo> datas = new ArrayList<>();
        for (Terms.Bucket bucket : terms.getBuckets()) {
            legendData.add((String)bucket.getKey());
            AggItemVo item = new AggItemVo();
            item.setName((String)bucket.getKey());
            item.setValue(bucket.getDocCount());
            datas.add(item);
        }
        result.put(key+"_legendData", legendData);
        result.put(key+"_datas", datas);
    }
    /**
     * 赋值周趋势统计
     */
    private void setStatWeek(Map<String, Object> result, Aggregations aggregations) {
295
        ParsedDateHistogram agg = aggregations.get("statWeek");
296 297 298
        List<String> items = new ArrayList<>();
        List<Long> uv = new ArrayList<>();
        List<Long> pv = new ArrayList<>();
299
        ParsedCardinality cardinality;
300
        for (Histogram.Bucket bucket : agg.getBuckets()) {
301 302 303 304 305 306 307 308 309 310
            items.add(bucket.getKeyAsString());
            pv.add(bucket.getDocCount());

            cardinality = bucket.getAggregations().get("uv");
            uv.add(cardinality.getValue());
        }
        result.put("statWeek_items", items);
        result.put("statWeek_uv", uv);
        result.put("statWeek_pv", pv);
    }
zlt2000's avatar
zlt2000 已提交
311 312 313 314
    /**
     * 赋值小时内统计-当前在线数
     */
    private void setCurrHour(Map<String, Object> result, Aggregations aggregations) {
315 316
        ParsedDateRange currDate = aggregations.get("currHour");
        Range.Bucket bucket = currDate.getBuckets().get(0);
317
        ParsedCardinality cardinality = bucket.getAggregations().get("uv");
zlt2000's avatar
zlt2000 已提交
318 319
        result.put("currHour_uv", cardinality.getValue());
    }
320 321 322 323
    /**
     * 赋值天趋势统计
     */
    private void setStatDate(Map<String, Object> result, Aggregations aggregations) {
324
        ParsedDateHistogram agg = aggregations.get("statDate");
325 326 327
        List<String> items = new ArrayList<>();
        List<Long> uv = new ArrayList<>();
        List<Long> pv = new ArrayList<>();
328
        ParsedCardinality cardinality;
329
        for (Histogram.Bucket bucket : agg.getBuckets()) {
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
            items.add(getTimeByDatetimeStr(bucket.getKeyAsString()));
            pv.add(bucket.getDocCount());

            cardinality = bucket.getAggregations().get("uv");
            uv.add(cardinality.getValue());
        }
        result.put("statDate_items", items);
        result.put("statDate_uv", uv);
        result.put("statDate_pv", pv);
    }

    /**
     * 2020-03-10 01:30:00 获取时间值:03-10 01:30
     * @return
     */
    private String getTimeByDatetimeStr(String datetimeStr) {
        if (StrUtil.isNotEmpty(datetimeStr)) {
            return datetimeStr.substring(5, 16);
        }
        return "";
    }
351
}