未验证 提交 14800cbc 编写于 作者: W Wan Kai 提交者: GitHub

Optimize elasticsearch query performance by using `_mGet` and physical index name (#9339)

Optimize ElasticSearch query performance by using `_mGet` and physical index name rather than alias in these scenarios:
(a) Metrics aggregation 
(b) Zipkin query
(c) Metrics query
(d) Log query
上级 63636fbe
......@@ -21,6 +21,7 @@
* Remove legacy OAL `percentile` functions, `p99`, `p95`, `p90`, `p75`, `p50` func(s).
* Revert [#8066](https://github.com/apache/skywalking/pull/8066). Keep all metrics persistent even it is default value.
* Skip loading UI templates if folder is empty or doesn't exist.
* Optimize ElasticSearch query performance by using `_mGet` and physical index name rather than alias in these scenarios, (a) Metrics aggregation (b) Zipkin query (c) Metrics query (d) Log query
* Support the `NETWORK` type of eBPF Profiling task.
* Support `sumHistogram` in `MAL`.
* [Breaking Change] Make the eBPF Profiling task support to the service instance level, index/table `ebpf_profiling_task` is required to be re-created when bump up from previous releases.
......
......@@ -24,6 +24,8 @@ import com.google.common.collect.Iterables;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
......@@ -33,12 +35,13 @@ import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.response.Documents;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.library.elasticsearch.ElasticSearch;
import org.apache.skywalking.library.elasticsearch.ElasticSearchBuilder;
import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
import org.apache.skywalking.library.elasticsearch.bulk.BulkProcessor;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
import org.apache.skywalking.library.elasticsearch.response.Document;
......@@ -300,7 +303,30 @@ public class ElasticSearchClient implements Client, HealthCheckable {
return es.get().documents().exists(indexName, TYPE, id);
}
public SearchResponse ids(String indexName, Iterable<String> ids) {
/**
* Provide to get documents from multi indices by IDs.
* @param indexIds key: indexName, value: ids list
* @return Documents
* @since 9.2.0
*/
public Optional<Documents> ids(Map<String, List<String>> indexIds) {
Map<String, List<String>> map = new HashMap<>();
indexIds.forEach((indexName, ids) -> {
map.put(indexNameConverter.apply(indexName), ids);
});
return es.get().documents().mGet(TYPE, map);
}
/**
* Search by ids with index alias, when can not locate the physical index.
* Otherwise, recommend use method {@link #ids}
* @param indexName Index alias name or physical name
* @param ids ID list
* @return SearchResponse
* @since 9.2.0 this method was ids
*/
public SearchResponse searchIDs(String indexName, Iterable<String> ids) {
indexName = indexNameConverter.apply(indexName);
return es.get().search(Search.builder()
......
......@@ -22,6 +22,7 @@ import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.util.Exceptions;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
......@@ -32,6 +33,7 @@ import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
import org.apache.skywalking.library.elasticsearch.requests.UpdateRequest;
import org.apache.skywalking.library.elasticsearch.response.Document;
import org.apache.skywalking.library.elasticsearch.response.Documents;
@Slf4j
@RequiredArgsConstructor
......@@ -83,6 +85,35 @@ public final class DocumentClient {
return future.get();
}
@SneakyThrows
public Optional<Documents> mGet(String type, Map<String, List<String>> indexIds) {
final CompletableFuture<Optional<Documents>> future =
version.thenCompose(
v -> client.execute(v.requestFactory().document().mget(type, indexIds))
.aggregate().thenApply(response -> {
if (response.status() != HttpStatus.OK) {
throw new RuntimeException(response.contentUtf8());
}
try (final HttpData content = response.content();
final InputStream is = content.toInputStream()) {
return Optional.of(v.codec().decode(is, Documents.class));
} catch (Exception e) {
return Exceptions.throwUnsafely(e);
}
}));
future.whenComplete((result, exception) -> {
if (exception != null) {
log.error("Failed to get doc by indexIds {}", indexIds, exception);
return;
}
if (log.isDebugEnabled()) {
log.debug("Docs by indexIds {}: {}", indexIds, result);
}
});
return future.get();
}
@SneakyThrows
public void index(IndexRequest request, Map<String, Object> params) {
final CompletableFuture<Void> future = version.thenCompose(
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.library.elasticsearch.requests.factory;
import com.linecorp.armeria.common.HttpRequest;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
import org.apache.skywalking.library.elasticsearch.requests.UpdateRequest;
......@@ -39,6 +40,11 @@ public interface DocumentFactory {
*/
HttpRequest mget(String index, String type, Iterable<String> ids);
/**
* Returns a request to get multiple documents of {@code indexIds}.
*/
HttpRequest mget(final String type, final Map<String, List<String>> indexIds);
/**
* Returns a request to index a document with {@link IndexRequest}.
*/
......
......@@ -21,6 +21,9 @@ import com.google.common.collect.ImmutableMap;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpRequestBuilder;
import com.linecorp.armeria.common.MediaType;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
......@@ -91,6 +94,30 @@ final class V6DocumentFactory implements DocumentFactory {
.build();
}
@SneakyThrows
@Override
public HttpRequest mget(final String type, final Map<String, List<String>> indexIds) {
checkArgument(!isNullOrEmpty(type), "type cannot be null or empty");
checkArgument(indexIds != null && !indexIds.isEmpty(), "ids cannot be null or empty");
final List<Map<String, String>> indexIdList = new ArrayList<>();
indexIds.forEach((index, ids) -> {
checkArgument(ids != null && !isEmpty(ids), "ids cannot be null or empty");
ids.forEach(id -> {
indexIdList.add(ImmutableMap.of("_index", index, "_type", type, "_id", id));
});
});
final Map<String, Iterable<Map<String, String>>> m = ImmutableMap.of("docs", indexIdList);
final byte[] content = version.codec().encode(m);
if (log.isDebugEnabled()) {
log.debug("mget indexIds request: {}", new String(content, Charset.defaultCharset()));
}
return HttpRequest.builder()
.get("/_mget")
.content(MediaType.JSON, content)
.build();
}
@SneakyThrows
@Override
public HttpRequest index(IndexRequest request, Map<String, ?> params) {
......
......@@ -21,6 +21,9 @@ import com.google.common.collect.ImmutableMap;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpRequestBuilder;
import com.linecorp.armeria.common.MediaType;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
......@@ -88,6 +91,30 @@ final class V7DocumentFactory implements DocumentFactory {
.build();
}
@SneakyThrows
@Override
public HttpRequest mget(final String type, final Map<String, List<String>> indexIds) {
checkArgument(!isNullOrEmpty(type), "type cannot be null or empty");
checkArgument(indexIds != null && !indexIds.isEmpty(), "ids cannot be null or empty");
final List<Map<String, String>> indexIdList = new ArrayList<>();
indexIds.forEach((index, ids) -> {
checkArgument(ids != null && !isEmpty(ids), "ids cannot be null or empty");
ids.forEach(id -> {
indexIdList.add(ImmutableMap.of("_index", index, "_id", id));
});
});
final Map<String, Iterable<Map<String, String>>> m = ImmutableMap.of("docs", indexIdList);
final byte[] content = version.codec().encode(m);
if (log.isDebugEnabled()) {
log.debug("mget indexIds request: {}", new String(content, Charset.defaultCharset()));
}
return HttpRequest.builder()
.get("/_doc/_mget")
.content(MediaType.JSON, content)
.build();
}
@SneakyThrows
@Override
public HttpRequest index(IndexRequest request, Map<String, ?> params) {
......
......@@ -19,14 +19,18 @@ package org.apache.skywalking.library.elasticsearch;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.library.elasticsearch.client.TemplateClient;
import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.Aggregation;
import org.apache.skywalking.library.elasticsearch.response.Document;
import org.apache.skywalking.library.elasticsearch.response.Documents;
import org.apache.skywalking.library.elasticsearch.response.IndexTemplate;
import org.apache.skywalking.library.elasticsearch.response.Mappings;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
......@@ -273,6 +277,17 @@ public class ITElasticSearchTest {
.get("buckets")
).size()
);
//test mGet
Map<String, List<String>> indexIdsGroup = new HashMap<>();
indexIdsGroup.put("test-index", Arrays.asList("id1", "id2"));
Optional<Documents> documents = client.documents().mGet(type, indexIdsGroup);
Map<String, Map<String, Object>> result = new HashMap<>();
for (final Document document : documents.get()) {
result.put(document.getId(), document.getSource());
}
assertEquals(2, result.get("id1").size());
assertEquals(2, result.get("id2").size());
});
}
}
......@@ -19,11 +19,14 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.elasticsearch.response.Documents;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
......@@ -35,11 +38,8 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.IndicesMetadataCache;
import org.joda.time.DateTime;
import static java.util.stream.Collectors.groupingBy;
@Slf4j
public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
protected final StorageBuilder<Metrics> storageBuilder;
......@@ -52,50 +52,48 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
@Override
public List<Metrics> multiGet(Model model, List<Metrics> metrics) {
Map<String, List<Metrics>> groupIndices
= metrics.stream()
.collect(
groupingBy(metric -> {
if (model.isTimeRelativeID()) {
// Try to use with timestamp index name(write index),
// if latest cache shows this name doesn't exist,
// then fail back to template alias name.
// This should only happen in very rare case, such as this is the time to create new index
// as a new day comes, and the index cache is pseudo real time.
// This case doesn't affect the result, just has lower performance due to using the alias name.
// Another case is that a removed index showing existing also due to latency,
// which could cause multiGet fails
// but this should not happen in the real runtime, TTL timer only removed the oldest indices,
// which should not have an update/insert.
String indexName = TimeSeriesUtils.writeIndexName(model, metric.getTimeBucket());
// Format the name to follow the global physical index naming policy.
if (!IndicesMetadataCache.INSTANCE.isExisting(
getClient().formatIndexName(indexName))) {
indexName = IndexController.INSTANCE.getTableName(model);
}
return indexName;
} else {
// Metadata level metrics, always use alias name, due to the physical index of the records
// can't be located through timestamp.
return IndexController.INSTANCE.getTableName(model);
}
})
);
// The groupIndices mostly include one or two group,
// the current day and the T-1 day(if at the edge between days)
Map<String, List<Metrics>> groupIndices = new HashMap<>();
List<Metrics> result = new ArrayList<>(metrics.size());
groupIndices.forEach((tableName, metricList) -> {
List<String> ids = metricList.stream()
.map(item -> IndexController.INSTANCE.generateDocId(model, item.id()))
.collect(Collectors.toList());
final SearchResponse response = getClient().ids(tableName, ids);
response.getHits().getHits().forEach(hit -> {
Metrics source = storageBuilder.storage2Entity(new HashMapConverter.ToEntity(hit.getSource()));
result.add(source);
if (model.isTimeRelativeID()) {
metrics.forEach(metric -> {
// Try to use with timestamp index name(write index),
String indexName = TimeSeriesUtils.writeIndexName(model, metric.getTimeBucket());
groupIndices.computeIfAbsent(indexName, v -> new ArrayList<>()).add(metric);
});
});
Map<String, List<String>> indexIdsGroup = new HashMap<>();
groupIndices.forEach((tableName, metricList) -> {
List<String> ids = metricList.stream()
.map(item -> IndexController.INSTANCE.generateDocId(model, item.id()))
.collect(Collectors.toList());
indexIdsGroup.put(tableName, ids);
});
if (!indexIdsGroup.isEmpty()) {
final Optional<Documents> response = getClient().ids(indexIdsGroup);
response.ifPresent(documents -> documents.forEach(document -> {
Metrics source = storageBuilder.storage2Entity(new HashMapConverter.ToEntity(document.getSource()));
result.add(source);
}));
}
} else {
metrics.forEach(metric -> {
// Metadata level metrics, always use alias name, due to the physical index of the records
// can't be located through timestamp.
String indexName = IndexController.INSTANCE.getTableName(model);
groupIndices.computeIfAbsent(indexName, v -> new ArrayList<>()).add(metric);
});
groupIndices.forEach((tableName, metricList) -> {
List<String> ids = metricList.stream()
.map(item -> IndexController.INSTANCE.generateDocId(model, item.id()))
.collect(Collectors.toList());
final SearchResponse response = getClient().searchIDs(tableName, ids);
response.getHits().getHits().forEach(hit -> {
Metrics source = storageBuilder.storage2Entity(new HashMapConverter.ToEntity(hit.getSource()));
result.add(source);
});
});
}
return result;
}
......
......@@ -24,7 +24,9 @@ import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.query.enumeration.Step;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.format.DateTimeFormat;
......@@ -80,6 +82,32 @@ public class TimeSeriesUtils {
}
}
public static String queryIndexName(String tableName,
long pointOfTB,
Step step,
boolean isRecord,
boolean isSuperDataSet) {
if (StringUtil.isBlank(tableName) || pointOfTB <= 0) {
throw new IllegalArgumentException(
"Arguments [tableName]: " + tableName + " can not be blank and [pointOfTB]: " + pointOfTB + " can not <= 0");
}
if (isRecord && isSuperDataSet) {
return tableName + Const.LINE + compressTimeBucket(pointOfTB / 1000000, SUPER_DATASET_DAY_STEP);
}
switch (step) {
case DAY:
return tableName + Const.LINE + compressTimeBucket(pointOfTB, DAY_STEP);
case HOUR:
return tableName + Const.LINE + compressTimeBucket(pointOfTB / 100, DAY_STEP);
case MINUTE:
return tableName + Const.LINE + compressTimeBucket(pointOfTB / 10000, DAY_STEP);
case SECOND:
return tableName + Const.LINE + compressTimeBucket(pointOfTB / 1000000, DAY_STEP);
}
throw new UnexpectedException("Failed to get the index name from tableName:" + tableName + ", pointOfTB:" + pointOfTB + ", step:" + step.name());
}
/**
* @return index name based on model definition and given time bucket.
*/
......
......@@ -41,6 +41,7 @@ import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSear
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO {
......@@ -59,8 +60,6 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO
final SearchBuilder search = Search.builder();
final boolean asc = condition.getOrder().equals(Order.ASC);
final String tableName =
IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
if (CollectionUtils.isEmpty(additionalConditions)
&& IndexController.LogicIndicesRegister.isMetricTable(condition.getName())) {
......@@ -110,7 +109,10 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO
.collectMode(TermsAggregationBuilder.CollectMode.BREADTH_FIRST)
.build());
final SearchResponse response = getClient().search(tableName, search.build());
final SearchResponse response = getClient().search(new TimeRangeIndexNameGenerator(
IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName()),
duration.getStartTimeBucketInSec(),
duration.getEndTimeBucketInSec()), search.build());
final List<SelectedRecord> topNList = new ArrayList<>();
final Map<String, Object> idTerms =
......
......@@ -44,6 +44,7 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
import static java.util.Objects.nonNull;
import static org.apache.skywalking.oap.server.library.util.StringUtil.isNotEmpty;
......@@ -71,9 +72,6 @@ public class LogQueryEsDAO extends EsDAO implements ILogQueryDAO {
final List<Tag> tags,
final List<String> keywordsOfContent,
final List<String> excludingKeywordsOfContent) throws IOException {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(LogRecord.INDEX_NAME);
final BoolQueryBuilder query = Query.bool();
if (startSecondTB != 0 && endSecondTB != 0) {
query.must(Query.range(Record.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
......@@ -138,7 +136,11 @@ public class LogQueryEsDAO extends EsDAO implements ILogQueryDAO {
.size(limit)
.from(from);
SearchResponse response = getClient().search(index, search.build());
SearchResponse response = getClient().search(new TimeRangeIndexNameGenerator(
IndexController.LogicIndicesRegister.getPhysicalTableName(LogRecord.INDEX_NAME),
startSecondTB,
endSecondTB
), search.build());
Logs logs = new Logs();
......
......@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder;
......@@ -29,6 +30,8 @@ import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.Aggregation;
import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.TermsAggregationBuilder;
import org.apache.skywalking.library.elasticsearch.response.Document;
import org.apache.skywalking.library.elasticsearch.response.Documents;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHits;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
......@@ -48,6 +51,8 @@ import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils;
public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
......@@ -77,10 +82,10 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
sourceBuilder.aggregation(entityIdAggregation);
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
final SearchResponse response = getClient().search(index, sourceBuilder.build());
final SearchResponse response = getClient().search(new TimeRangeIndexNameGenerator(
IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName()),
duration.getStartTimeBucketInSec(),
duration.getEndTimeBucketInSec()), sourceBuilder.build());
final Map<String, Object> idTerms =
(Map<String, Object>) response.getAggregations().get(Metrics.ENTITY_ID);
......@@ -101,19 +106,23 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
String tableName =
IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
Map<String, List<String>> indexIdsGroup = new HashMap<>();
final List<String> ids = pointOfTimes.stream().map(pointOfTime -> {
String id = pointOfTime.id(condition.getEntity().buildId());
if (IndexController.LogicIndicesRegister.isMetricTable(condition.getName())) {
id = IndexController.INSTANCE.generateDocId(condition.getName(), id);
}
String indexName = TimeSeriesUtils.queryIndexName(
tableName, pointOfTime.getPoint(), duration.getStep(), false, false);
indexIdsGroup.computeIfAbsent(indexName, v -> new ArrayList<>()).add(id);
return id;
}).collect(Collectors.toList());
MetricsValues metricsValues = new MetricsValues();
SearchResponse response = getClient().ids(tableName, ids);
if (!response.getHits().getHits().isEmpty()) {
Map<String, Map<String, Object>> idMap = toMap(response.getHits());
Optional<Documents> response = getClient().ids(indexIdsGroup);
if (response.isPresent()) {
Map<String, Map<String, Object>> idMap = toMap(response.get());
// Label is null, because in readMetricsValues, no label parameter.
IntValues intValues = metricsValues.getValues();
......@@ -146,6 +155,8 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
String tableName =
IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
Map<String, List<String>> indexIdsGroup = new HashMap<>();
boolean aggregationMode = !tableName.equals(condition.getName());
List<String> ids = new ArrayList<>(pointOfTimes.size());
pointOfTimes.forEach(pointOfTime -> {
......@@ -154,16 +165,19 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
id = IndexController.INSTANCE.generateDocId(condition.getName(), id);
}
ids.add(id);
String indexName = TimeSeriesUtils.queryIndexName(
tableName, pointOfTime.getPoint(), duration.getStep(), false, false);
indexIdsGroup.computeIfAbsent(indexName, v -> new ArrayList<>()).add(id);
});
SearchResponse response = getClient().ids(tableName, ids);
Optional<Documents> response = getClient().ids(indexIdsGroup);
Map<String, DataTable> idMap = new HashMap<>();
if (!response.getHits().getHits().isEmpty()) {
for (final SearchHit hit : response.getHits()) {
if (response.isPresent()) {
for (final Document document : response.get()) {
idMap.put(
hit.getId(),
new DataTable((String) hit.getSource().getOrDefault(valueColumnName, ""))
document.getId(),
new DataTable((String) document.getSource().getOrDefault(valueColumnName, ""))
);
}
}
......@@ -177,6 +191,8 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
String tableName =
IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
Map<String, List<String>> indexIdsGroup = new HashMap<>();
boolean aggregationMode = !tableName.equals(condition.getName());
List<String> ids = new ArrayList<>(pointOfTimes.size());
pointOfTimes.forEach(pointOfTime -> {
......@@ -185,15 +201,18 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
id = IndexController.INSTANCE.generateDocId(condition.getName(), id);
}
ids.add(id);
String indexName = TimeSeriesUtils.queryIndexName(
tableName, pointOfTime.getPoint(), duration.getStep(), false, false);
indexIdsGroup.computeIfAbsent(indexName, v -> new ArrayList<>()).add(id);
});
HeatMap heatMap = new HeatMap();
SearchResponse response = getClient().ids(tableName, ids);
if (response.getHits().getHits().isEmpty()) {
Optional<Documents> response = getClient().ids(indexIdsGroup);
if (!response.isPresent()) {
return heatMap;
}
Map<String, Map<String, Object>> idMap = toMap(response.getHits());
Map<String, Map<String, Object>> idMap = toMap(response.get());
final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
for (String id : ids) {
......@@ -276,4 +295,12 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
}
return result;
}
private Map<String, Map<String, Object>> toMap(Documents documents) {
Map<String, Map<String, Object>> result = new HashMap<>();
for (final Document document : documents) {
result.put(document.getId(), document.getSource());
}
return result;
}
}
......@@ -36,6 +36,7 @@ import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.B
import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.TermsAggregationBuilder;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.storage.query.IZipkinQueryDAO;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceRelationTraffic;
......@@ -47,6 +48,7 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
import zipkin2.Endpoint;
import zipkin2.Span;
import zipkin2.storage.QueryRequest;
......@@ -137,7 +139,6 @@ public class ZipkinQueryEsDAO extends EsDAO implements IZipkinQueryDAO {
public List<List<Span>> getTraces(final QueryRequest request) {
final long startTimeMillis = request.endTs() - request.lookback();
final long endTimeMillis = request.endTs();
String index = IndexController.LogicIndicesRegister.getPhysicalTableName(ZipkinSpanRecord.INDEX_NAME);
BoolQueryBuilder query = Query.bool();
if (startTimeMillis > 0 && endTimeMillis > 0) {
query.must(Query.range(ZipkinSpanRecord.TIMESTAMP_MILLIS)
......@@ -182,7 +183,11 @@ public class ZipkinQueryEsDAO extends EsDAO implements IZipkinQueryDAO {
.order(BucketOrder.aggregation(ZipkinSpanRecord.TIMESTAMP_MILLIS, false));
SearchBuilder search = Search.builder().query(query).aggregation(traceIdAggregation);
SearchResponse traceIdResponse = getClient().search(index, search.build());
SearchResponse traceIdResponse = getClient().search(new TimeRangeIndexNameGenerator(
IndexController.LogicIndicesRegister.getPhysicalTableName(ZipkinSpanRecord.INDEX_NAME),
TimeBucket.getRecordTimeBucket(startTimeMillis),
TimeBucket.getRecordTimeBucket(endTimeMillis)
), search.build());
final Map<String, Object> idTerms =
(Map<String, Object>) traceIdResponse.getAggregations().get(ZipkinSpanRecord.TRACE_ID);
final List<Map<String, Object>> buckets =
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import com.google.common.collect.Lists;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.query.enumeration.Step;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.SQLDatabaseModelExtension;
import org.junit.Assert;
......@@ -93,4 +94,28 @@ public class TimeSeriesUtilsTest {
);
}
@Test
public void queryIndexNameTest() {
Assert.assertEquals(
"metrics-apdex-20220710",
TimeSeriesUtils.queryIndexName("metrics-apdex", 20220710111111L, Step.SECOND, false, false)
);
Assert.assertEquals(
"metrics-apdex-20220710",
TimeSeriesUtils.queryIndexName("metrics-apdex", 202207101111L, Step.MINUTE, false, false)
);
Assert.assertEquals(
"metrics-apdex-20220710",
TimeSeriesUtils.queryIndexName("metrics-apdex", 2022071011L, Step.HOUR, false, false)
);
Assert.assertEquals(
"metrics-apdex-20220710",
TimeSeriesUtils.queryIndexName("metrics-apdex", 20220710L, Step.DAY, false, false)
);
Assert.assertEquals(
"metrics-apdex-20220710",
TimeSeriesUtils.queryIndexName("metrics-apdex", 20220710111111L, Step.DAY, true, true)
);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册