From 14800cbc57ae82dcad32b5edc34c17a3724caaa4 Mon Sep 17 00:00:00 2001 From: Wan Kai Date: Thu, 14 Jul 2022 10:40:21 +0800 Subject: [PATCH] Optimize elasticsearch query performance by using `_mGet` and physical index name (#9339) Optimize ElasticSearch query performance by using `_mGet` and physical index name rather than alias in these scenarios: (a) Metrics aggregation (b) Zipkin query (c) Metrics query (d) Log query --- docs/en/changes/changes.md | 1 + .../elasticsearch/ElasticSearchClient.java | 30 ++++++- .../elasticsearch/client/DocumentClient.java | 31 +++++++ .../requests/factory/DocumentFactory.java | 6 ++ .../factory/v6/V6DocumentFactory.java | 27 ++++++ .../factory/v7/V7DocumentFactory.java | 27 ++++++ .../elasticsearch/ITElasticSearchTest.java | 15 ++++ .../elasticsearch/base/MetricsEsDAO.java | 86 +++++++++---------- .../elasticsearch/base/TimeSeriesUtils.java | 28 ++++++ .../query/AggregationQueryEsDAO.java | 8 +- .../elasticsearch/query/LogQueryEsDAO.java | 10 ++- .../query/MetricsQueryEsDAO.java | 59 +++++++++---- .../query/zipkin/ZipkinQueryEsDAO.java | 9 +- .../base/TimeSeriesUtilsTest.java | 25 ++++++ 14 files changed, 291 insertions(+), 71 deletions(-) diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index a36927785a..65749a99e6 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -21,6 +21,7 @@ * Remove legacy OAL `percentile` functions, `p99`, `p95`, `p90`, `p75`, `p50` func(s). * Revert [#8066](https://github.com/apache/skywalking/pull/8066). Keep all metrics persistent even it is default value. * Skip loading UI templates if folder is empty or doesn't exist. +* Optimize ElasticSearch query performance by using `_mGet` and physical index name rather than alias in these scenarios, (a) Metrics aggregation (b) Zipkin query (c) Metrics query (d) Log query * Support the `NETWORK` type of eBPF Profiling task. * Support `sumHistogram` in `MAL`. * [Breaking Change] Make the eBPF Profiling task support to the service instance level, index/table `ebpf_profiling_task` is required to be re-created when bump up from previous releases. diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java index 44440d67fc..d80f8353dd 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java @@ -24,6 +24,8 @@ import com.google.common.collect.Iterables; import java.time.Duration; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -33,12 +35,13 @@ import java.util.function.Supplier; import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.library.elasticsearch.requests.search.Query; +import org.apache.skywalking.library.elasticsearch.response.Documents; import org.apache.skywalking.oap.server.library.util.StringUtil; import org.apache.skywalking.library.elasticsearch.ElasticSearch; import org.apache.skywalking.library.elasticsearch.ElasticSearchBuilder; import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion; import org.apache.skywalking.library.elasticsearch.bulk.BulkProcessor; -import org.apache.skywalking.library.elasticsearch.requests.search.Query; import org.apache.skywalking.library.elasticsearch.requests.search.Search; import org.apache.skywalking.library.elasticsearch.requests.search.SearchParams; import org.apache.skywalking.library.elasticsearch.response.Document; @@ -300,7 +303,30 @@ public class ElasticSearchClient implements Client, HealthCheckable { return es.get().documents().exists(indexName, TYPE, id); } - public SearchResponse ids(String indexName, Iterable ids) { + + /** + * Provide to get documents from multi indices by IDs. + * @param indexIds key: indexName, value: ids list + * @return Documents + * @since 9.2.0 + */ + public Optional ids(Map> indexIds) { + Map> map = new HashMap<>(); + indexIds.forEach((indexName, ids) -> { + map.put(indexNameConverter.apply(indexName), ids); + }); + return es.get().documents().mGet(TYPE, map); + } + + /** + * Search by ids with index alias, when can not locate the physical index. + * Otherwise, recommend use method {@link #ids} + * @param indexName Index alias name or physical name + * @param ids ID list + * @return SearchResponse + * @since 9.2.0 this method was ids + */ + public SearchResponse searchIDs(String indexName, Iterable ids) { indexName = indexNameConverter.apply(indexName); return es.get().search(Search.builder() diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java index f8023f6ebc..d20e6f6392 100644 --- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java @@ -22,6 +22,7 @@ import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.util.Exceptions; import java.io.InputStream; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -32,6 +33,7 @@ import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion; import org.apache.skywalking.library.elasticsearch.requests.IndexRequest; import org.apache.skywalking.library.elasticsearch.requests.UpdateRequest; import org.apache.skywalking.library.elasticsearch.response.Document; +import org.apache.skywalking.library.elasticsearch.response.Documents; @Slf4j @RequiredArgsConstructor @@ -83,6 +85,35 @@ public final class DocumentClient { return future.get(); } + @SneakyThrows + public Optional mGet(String type, Map> indexIds) { + final CompletableFuture> future = + version.thenCompose( + v -> client.execute(v.requestFactory().document().mget(type, indexIds)) + .aggregate().thenApply(response -> { + if (response.status() != HttpStatus.OK) { + throw new RuntimeException(response.contentUtf8()); + } + + try (final HttpData content = response.content(); + final InputStream is = content.toInputStream()) { + return Optional.of(v.codec().decode(is, Documents.class)); + } catch (Exception e) { + return Exceptions.throwUnsafely(e); + } + })); + future.whenComplete((result, exception) -> { + if (exception != null) { + log.error("Failed to get doc by indexIds {}", indexIds, exception); + return; + } + if (log.isDebugEnabled()) { + log.debug("Docs by indexIds {}: {}", indexIds, result); + } + }); + return future.get(); + } + @SneakyThrows public void index(IndexRequest request, Map params) { final CompletableFuture future = version.thenCompose( diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/DocumentFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/DocumentFactory.java index ae77b977c8..946918fd5c 100644 --- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/DocumentFactory.java +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/DocumentFactory.java @@ -18,6 +18,7 @@ package org.apache.skywalking.library.elasticsearch.requests.factory; import com.linecorp.armeria.common.HttpRequest; +import java.util.List; import java.util.Map; import org.apache.skywalking.library.elasticsearch.requests.IndexRequest; import org.apache.skywalking.library.elasticsearch.requests.UpdateRequest; @@ -39,6 +40,11 @@ public interface DocumentFactory { */ HttpRequest mget(String index, String type, Iterable ids); + /** + * Returns a request to get multiple documents of {@code indexIds}. + */ + HttpRequest mget(final String type, final Map> indexIds); + /** * Returns a request to index a document with {@link IndexRequest}. */ diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6DocumentFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6DocumentFactory.java index bfb1880d91..71fe55d7bc 100644 --- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6DocumentFactory.java +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6DocumentFactory.java @@ -21,6 +21,9 @@ import com.google.common.collect.ImmutableMap; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpRequestBuilder; import com.linecorp.armeria.common.MediaType; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -91,6 +94,30 @@ final class V6DocumentFactory implements DocumentFactory { .build(); } + @SneakyThrows + @Override + public HttpRequest mget(final String type, final Map> indexIds) { + checkArgument(!isNullOrEmpty(type), "type cannot be null or empty"); + checkArgument(indexIds != null && !indexIds.isEmpty(), "ids cannot be null or empty"); + final List> indexIdList = new ArrayList<>(); + indexIds.forEach((index, ids) -> { + checkArgument(ids != null && !isEmpty(ids), "ids cannot be null or empty"); + ids.forEach(id -> { + indexIdList.add(ImmutableMap.of("_index", index, "_type", type, "_id", id)); + }); + }); + final Map>> m = ImmutableMap.of("docs", indexIdList); + final byte[] content = version.codec().encode(m); + if (log.isDebugEnabled()) { + log.debug("mget indexIds request: {}", new String(content, Charset.defaultCharset())); + } + + return HttpRequest.builder() + .get("/_mget") + .content(MediaType.JSON, content) + .build(); + } + @SneakyThrows @Override public HttpRequest index(IndexRequest request, Map params) { diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v7/V7DocumentFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v7/V7DocumentFactory.java index 49d058d3fc..0c720d2044 100644 --- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v7/V7DocumentFactory.java +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v7/V7DocumentFactory.java @@ -21,6 +21,9 @@ import com.google.common.collect.ImmutableMap; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpRequestBuilder; import com.linecorp.armeria.common.MediaType; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -88,6 +91,30 @@ final class V7DocumentFactory implements DocumentFactory { .build(); } + @SneakyThrows + @Override + public HttpRequest mget(final String type, final Map> indexIds) { + checkArgument(!isNullOrEmpty(type), "type cannot be null or empty"); + checkArgument(indexIds != null && !indexIds.isEmpty(), "ids cannot be null or empty"); + final List> indexIdList = new ArrayList<>(); + indexIds.forEach((index, ids) -> { + checkArgument(ids != null && !isEmpty(ids), "ids cannot be null or empty"); + ids.forEach(id -> { + indexIdList.add(ImmutableMap.of("_index", index, "_id", id)); + }); + }); + final Map>> m = ImmutableMap.of("docs", indexIdList); + final byte[] content = version.codec().encode(m); + if (log.isDebugEnabled()) { + log.debug("mget indexIds request: {}", new String(content, Charset.defaultCharset())); + } + + return HttpRequest.builder() + .get("/_doc/_mget") + .content(MediaType.JSON, content) + .build(); + } + @SneakyThrows @Override public HttpRequest index(IndexRequest request, Map params) { diff --git a/oap-server/server-library/library-elasticsearch-client/src/test/java/org/apache/skywalking/library/elasticsearch/ITElasticSearchTest.java b/oap-server/server-library/library-elasticsearch-client/src/test/java/org/apache/skywalking/library/elasticsearch/ITElasticSearchTest.java index b38e95f5bc..53ef4af118 100644 --- a/oap-server/server-library/library-elasticsearch-client/src/test/java/org/apache/skywalking/library/elasticsearch/ITElasticSearchTest.java +++ b/oap-server/server-library/library-elasticsearch-client/src/test/java/org/apache/skywalking/library/elasticsearch/ITElasticSearchTest.java @@ -19,14 +19,18 @@ package org.apache.skywalking.library.elasticsearch; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import lombok.RequiredArgsConstructor; import org.apache.skywalking.library.elasticsearch.client.TemplateClient; import org.apache.skywalking.library.elasticsearch.requests.IndexRequest; import org.apache.skywalking.library.elasticsearch.requests.search.Query; import org.apache.skywalking.library.elasticsearch.requests.search.Search; import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.Aggregation; +import org.apache.skywalking.library.elasticsearch.response.Document; +import org.apache.skywalking.library.elasticsearch.response.Documents; import org.apache.skywalking.library.elasticsearch.response.IndexTemplate; import org.apache.skywalking.library.elasticsearch.response.Mappings; import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse; @@ -273,6 +277,17 @@ public class ITElasticSearchTest { .get("buckets") ).size() ); + + //test mGet + Map> indexIdsGroup = new HashMap<>(); + indexIdsGroup.put("test-index", Arrays.asList("id1", "id2")); + Optional documents = client.documents().mGet(type, indexIdsGroup); + Map> result = new HashMap<>(); + for (final Document document : documents.get()) { + result.put(document.getId(), document.getSource()); + } + assertEquals(2, result.get("id1").size()); + assertEquals(2, result.get("id2").size()); }); } } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java index e0de22bac0..98f11abbda 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java @@ -19,11 +19,14 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.library.elasticsearch.response.Documents; import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse; import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; @@ -35,11 +38,8 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; -import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.IndicesMetadataCache; import org.joda.time.DateTime; -import static java.util.stream.Collectors.groupingBy; - @Slf4j public class MetricsEsDAO extends EsDAO implements IMetricsDAO { protected final StorageBuilder storageBuilder; @@ -52,50 +52,48 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO { @Override public List multiGet(Model model, List metrics) { - Map> groupIndices - = metrics.stream() - .collect( - groupingBy(metric -> { - if (model.isTimeRelativeID()) { - // Try to use with timestamp index name(write index), - // if latest cache shows this name doesn't exist, - // then fail back to template alias name. - // This should only happen in very rare case, such as this is the time to create new index - // as a new day comes, and the index cache is pseudo real time. - // This case doesn't affect the result, just has lower performance due to using the alias name. - // Another case is that a removed index showing existing also due to latency, - // which could cause multiGet fails - // but this should not happen in the real runtime, TTL timer only removed the oldest indices, - // which should not have an update/insert. - String indexName = TimeSeriesUtils.writeIndexName(model, metric.getTimeBucket()); - // Format the name to follow the global physical index naming policy. - if (!IndicesMetadataCache.INSTANCE.isExisting( - getClient().formatIndexName(indexName))) { - indexName = IndexController.INSTANCE.getTableName(model); - } - return indexName; - } else { - // Metadata level metrics, always use alias name, due to the physical index of the records - // can't be located through timestamp. - return IndexController.INSTANCE.getTableName(model); - } - }) - ); - - // The groupIndices mostly include one or two group, - // the current day and the T-1 day(if at the edge between days) + Map> groupIndices = new HashMap<>(); List result = new ArrayList<>(metrics.size()); - groupIndices.forEach((tableName, metricList) -> { - List ids = metricList.stream() - .map(item -> IndexController.INSTANCE.generateDocId(model, item.id())) - .collect(Collectors.toList()); - final SearchResponse response = getClient().ids(tableName, ids); - response.getHits().getHits().forEach(hit -> { - Metrics source = storageBuilder.storage2Entity(new HashMapConverter.ToEntity(hit.getSource())); - result.add(source); + + if (model.isTimeRelativeID()) { + metrics.forEach(metric -> { + // Try to use with timestamp index name(write index), + String indexName = TimeSeriesUtils.writeIndexName(model, metric.getTimeBucket()); + groupIndices.computeIfAbsent(indexName, v -> new ArrayList<>()).add(metric); }); - }); + Map> indexIdsGroup = new HashMap<>(); + groupIndices.forEach((tableName, metricList) -> { + List ids = metricList.stream() + .map(item -> IndexController.INSTANCE.generateDocId(model, item.id())) + .collect(Collectors.toList()); + indexIdsGroup.put(tableName, ids); + }); + if (!indexIdsGroup.isEmpty()) { + final Optional response = getClient().ids(indexIdsGroup); + response.ifPresent(documents -> documents.forEach(document -> { + Metrics source = storageBuilder.storage2Entity(new HashMapConverter.ToEntity(document.getSource())); + result.add(source); + })); + } + } else { + metrics.forEach(metric -> { + // Metadata level metrics, always use alias name, due to the physical index of the records + // can't be located through timestamp. + String indexName = IndexController.INSTANCE.getTableName(model); + groupIndices.computeIfAbsent(indexName, v -> new ArrayList<>()).add(metric); + }); + groupIndices.forEach((tableName, metricList) -> { + List ids = metricList.stream() + .map(item -> IndexController.INSTANCE.generateDocId(model, item.id())) + .collect(Collectors.toList()); + final SearchResponse response = getClient().searchIDs(tableName, ids); + response.getHits().getHits().forEach(hit -> { + Metrics source = storageBuilder.storage2Entity(new HashMapConverter.ToEntity(hit.getSource())); + result.add(source); + }); + }); + } return result; } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java index 175b944b7b..3e9e8e9b29 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java @@ -24,7 +24,9 @@ import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.core.query.enumeration.Step; import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.library.util.StringUtil; import org.joda.time.DateTime; import org.joda.time.Days; import org.joda.time.format.DateTimeFormat; @@ -80,6 +82,32 @@ public class TimeSeriesUtils { } } + public static String queryIndexName(String tableName, + long pointOfTB, + Step step, + boolean isRecord, + boolean isSuperDataSet) { + if (StringUtil.isBlank(tableName) || pointOfTB <= 0) { + throw new IllegalArgumentException( + "Arguments [tableName]: " + tableName + " can not be blank and [pointOfTB]: " + pointOfTB + " can not <= 0"); + } + if (isRecord && isSuperDataSet) { + return tableName + Const.LINE + compressTimeBucket(pointOfTB / 1000000, SUPER_DATASET_DAY_STEP); + } + switch (step) { + case DAY: + return tableName + Const.LINE + compressTimeBucket(pointOfTB, DAY_STEP); + case HOUR: + return tableName + Const.LINE + compressTimeBucket(pointOfTB / 100, DAY_STEP); + case MINUTE: + return tableName + Const.LINE + compressTimeBucket(pointOfTB / 10000, DAY_STEP); + case SECOND: + return tableName + Const.LINE + compressTimeBucket(pointOfTB / 1000000, DAY_STEP); + } + + throw new UnexpectedException("Failed to get the index name from tableName:" + tableName + ", pointOfTB:" + pointOfTB + ", step:" + step.name()); + } + /** * @return index name based on model definition and given time bucket. */ diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java index bb1a13e42d..d99d4b0d8d 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java @@ -41,6 +41,7 @@ import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSear import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator; public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO { @@ -59,8 +60,6 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO final SearchBuilder search = Search.builder(); final boolean asc = condition.getOrder().equals(Order.ASC); - final String tableName = - IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName()); if (CollectionUtils.isEmpty(additionalConditions) && IndexController.LogicIndicesRegister.isMetricTable(condition.getName())) { @@ -110,7 +109,10 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO .collectMode(TermsAggregationBuilder.CollectMode.BREADTH_FIRST) .build()); - final SearchResponse response = getClient().search(tableName, search.build()); + final SearchResponse response = getClient().search(new TimeRangeIndexNameGenerator( + IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName()), + duration.getStartTimeBucketInSec(), + duration.getEndTimeBucketInSec()), search.build()); final List topNList = new ArrayList<>(); final Map idTerms = diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/LogQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/LogQueryEsDAO.java index a5d586fb83..c972897e74 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/LogQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/LogQueryEsDAO.java @@ -44,6 +44,7 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator; import static java.util.Objects.nonNull; import static org.apache.skywalking.oap.server.library.util.StringUtil.isNotEmpty; @@ -71,9 +72,6 @@ public class LogQueryEsDAO extends EsDAO implements ILogQueryDAO { final List tags, final List keywordsOfContent, final List excludingKeywordsOfContent) throws IOException { - final String index = - IndexController.LogicIndicesRegister.getPhysicalTableName(LogRecord.INDEX_NAME); - final BoolQueryBuilder query = Query.bool(); if (startSecondTB != 0 && endSecondTB != 0) { query.must(Query.range(Record.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB)); @@ -138,7 +136,11 @@ public class LogQueryEsDAO extends EsDAO implements ILogQueryDAO { .size(limit) .from(from); - SearchResponse response = getClient().search(index, search.build()); + SearchResponse response = getClient().search(new TimeRangeIndexNameGenerator( + IndexController.LogicIndicesRegister.getPhysicalTableName(LogRecord.INDEX_NAME), + startSecondTB, + endSecondTB + ), search.build()); Logs logs = new Logs(); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java index 118fcae216..31cb215e76 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import org.apache.skywalking.library.elasticsearch.requests.search.Query; import org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder; @@ -29,6 +30,8 @@ import org.apache.skywalking.library.elasticsearch.requests.search.Search; import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder; import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.Aggregation; import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.TermsAggregationBuilder; +import org.apache.skywalking.library.elasticsearch.response.Document; +import org.apache.skywalking.library.elasticsearch.response.Documents; import org.apache.skywalking.library.elasticsearch.response.search.SearchHit; import org.apache.skywalking.library.elasticsearch.response.search.SearchHits; import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse; @@ -48,6 +51,8 @@ import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils; public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO { @@ -77,10 +82,10 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO { sourceBuilder.aggregation(entityIdAggregation); - final String index = - IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName()); - - final SearchResponse response = getClient().search(index, sourceBuilder.build()); + final SearchResponse response = getClient().search(new TimeRangeIndexNameGenerator( + IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName()), + duration.getStartTimeBucketInSec(), + duration.getEndTimeBucketInSec()), sourceBuilder.build()); final Map idTerms = (Map) response.getAggregations().get(Metrics.ENTITY_ID); @@ -101,19 +106,23 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO { String tableName = IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName()); final List pointOfTimes = duration.assembleDurationPoints(); + Map> indexIdsGroup = new HashMap<>(); + final List ids = pointOfTimes.stream().map(pointOfTime -> { String id = pointOfTime.id(condition.getEntity().buildId()); if (IndexController.LogicIndicesRegister.isMetricTable(condition.getName())) { id = IndexController.INSTANCE.generateDocId(condition.getName(), id); } + String indexName = TimeSeriesUtils.queryIndexName( + tableName, pointOfTime.getPoint(), duration.getStep(), false, false); + indexIdsGroup.computeIfAbsent(indexName, v -> new ArrayList<>()).add(id); return id; }).collect(Collectors.toList()); MetricsValues metricsValues = new MetricsValues(); - - SearchResponse response = getClient().ids(tableName, ids); - if (!response.getHits().getHits().isEmpty()) { - Map> idMap = toMap(response.getHits()); + Optional response = getClient().ids(indexIdsGroup); + if (response.isPresent()) { + Map> idMap = toMap(response.get()); // Label is null, because in readMetricsValues, no label parameter. IntValues intValues = metricsValues.getValues(); @@ -146,6 +155,8 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO { final List pointOfTimes = duration.assembleDurationPoints(); String tableName = IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName()); + Map> indexIdsGroup = new HashMap<>(); + boolean aggregationMode = !tableName.equals(condition.getName()); List ids = new ArrayList<>(pointOfTimes.size()); pointOfTimes.forEach(pointOfTime -> { @@ -154,16 +165,19 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO { id = IndexController.INSTANCE.generateDocId(condition.getName(), id); } ids.add(id); + String indexName = TimeSeriesUtils.queryIndexName( + tableName, pointOfTime.getPoint(), duration.getStep(), false, false); + indexIdsGroup.computeIfAbsent(indexName, v -> new ArrayList<>()).add(id); }); - SearchResponse response = getClient().ids(tableName, ids); + Optional response = getClient().ids(indexIdsGroup); Map idMap = new HashMap<>(); - if (!response.getHits().getHits().isEmpty()) { - for (final SearchHit hit : response.getHits()) { + if (response.isPresent()) { + for (final Document document : response.get()) { idMap.put( - hit.getId(), - new DataTable((String) hit.getSource().getOrDefault(valueColumnName, "")) + document.getId(), + new DataTable((String) document.getSource().getOrDefault(valueColumnName, "")) ); } } @@ -177,6 +191,8 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO { final List pointOfTimes = duration.assembleDurationPoints(); String tableName = IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName()); + Map> indexIdsGroup = new HashMap<>(); + boolean aggregationMode = !tableName.equals(condition.getName()); List ids = new ArrayList<>(pointOfTimes.size()); pointOfTimes.forEach(pointOfTime -> { @@ -185,15 +201,18 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO { id = IndexController.INSTANCE.generateDocId(condition.getName(), id); } ids.add(id); + String indexName = TimeSeriesUtils.queryIndexName( + tableName, pointOfTime.getPoint(), duration.getStep(), false, false); + indexIdsGroup.computeIfAbsent(indexName, v -> new ArrayList<>()).add(id); }); HeatMap heatMap = new HeatMap(); - SearchResponse response = getClient().ids(tableName, ids); - if (response.getHits().getHits().isEmpty()) { + Optional response = getClient().ids(indexIdsGroup); + if (!response.isPresent()) { return heatMap; } - Map> idMap = toMap(response.getHits()); + Map> idMap = toMap(response.get()); final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName()); for (String id : ids) { @@ -276,4 +295,12 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO { } return result; } + + private Map> toMap(Documents documents) { + Map> result = new HashMap<>(); + for (final Document document : documents) { + result.put(document.getId(), document.getSource()); + } + return result; + } } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java index 4f71f5d066..8c8f2f196c 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java @@ -36,6 +36,7 @@ import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.B import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.TermsAggregationBuilder; import org.apache.skywalking.library.elasticsearch.response.search.SearchHit; import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.storage.query.IZipkinQueryDAO; import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter; import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceRelationTraffic; @@ -47,6 +48,7 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.library.util.StringUtil; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator; import zipkin2.Endpoint; import zipkin2.Span; import zipkin2.storage.QueryRequest; @@ -137,7 +139,6 @@ public class ZipkinQueryEsDAO extends EsDAO implements IZipkinQueryDAO { public List> getTraces(final QueryRequest request) { final long startTimeMillis = request.endTs() - request.lookback(); final long endTimeMillis = request.endTs(); - String index = IndexController.LogicIndicesRegister.getPhysicalTableName(ZipkinSpanRecord.INDEX_NAME); BoolQueryBuilder query = Query.bool(); if (startTimeMillis > 0 && endTimeMillis > 0) { query.must(Query.range(ZipkinSpanRecord.TIMESTAMP_MILLIS) @@ -182,7 +183,11 @@ public class ZipkinQueryEsDAO extends EsDAO implements IZipkinQueryDAO { .order(BucketOrder.aggregation(ZipkinSpanRecord.TIMESTAMP_MILLIS, false)); SearchBuilder search = Search.builder().query(query).aggregation(traceIdAggregation); - SearchResponse traceIdResponse = getClient().search(index, search.build()); + SearchResponse traceIdResponse = getClient().search(new TimeRangeIndexNameGenerator( + IndexController.LogicIndicesRegister.getPhysicalTableName(ZipkinSpanRecord.INDEX_NAME), + TimeBucket.getRecordTimeBucket(startTimeMillis), + TimeBucket.getRecordTimeBucket(endTimeMillis) + ), search.build()); final Map idTerms = (Map) traceIdResponse.getAggregations().get(ZipkinSpanRecord.TRACE_ID); final List> buckets = diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java index f1e8b8dc08..25a0d735d5 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java @@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; import com.google.common.collect.Lists; import org.apache.skywalking.oap.server.core.analysis.DownSampling; +import org.apache.skywalking.oap.server.core.query.enumeration.Step; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.SQLDatabaseModelExtension; import org.junit.Assert; @@ -93,4 +94,28 @@ public class TimeSeriesUtilsTest { ); } + @Test + public void queryIndexNameTest() { + Assert.assertEquals( + "metrics-apdex-20220710", + TimeSeriesUtils.queryIndexName("metrics-apdex", 20220710111111L, Step.SECOND, false, false) + ); + Assert.assertEquals( + "metrics-apdex-20220710", + TimeSeriesUtils.queryIndexName("metrics-apdex", 202207101111L, Step.MINUTE, false, false) + ); + Assert.assertEquals( + "metrics-apdex-20220710", + TimeSeriesUtils.queryIndexName("metrics-apdex", 2022071011L, Step.HOUR, false, false) + ); + Assert.assertEquals( + "metrics-apdex-20220710", + TimeSeriesUtils.queryIndexName("metrics-apdex", 20220710L, Step.DAY, false, false) + ); + Assert.assertEquals( + "metrics-apdex-20220710", + TimeSeriesUtils.queryIndexName("metrics-apdex", 20220710111111L, Step.DAY, true, true) + ); + } + } -- GitLab