未验证 提交 fd1d2102 编写于 作者: E Evan 提交者: GitHub

[Incompatible Enhancement]New index policy of ElasticSearch storage option (#6499)

上级 03824477
......@@ -5,6 +5,7 @@ Release Notes.
8.5.0
------------------
#### Project
* **Incompatible Change**. Indices and templates of ElasticSearch(6/7, including zipkin-elasticsearch7) storage option have been changed.
* Update frontend-maven-plugin to 1.11.0, for Download node x64 binary on Apple Silicon.
* Add E2E test for VM monitoring that metrics from Prometheus node-exporter.
* Upgrade lombok to 1.18.16.
......@@ -47,6 +48,7 @@ Release Notes.
* Fix wrong metrics name setting in the `self-observability.yml`.
* Add telemetry data about metrics in, metrics scraping and trace in metrics to zipkin receiver.
* Fix tags store of log and trace on h2/mysql/pg storage.
* Merge indices by Metrics Function and Meter Function in Elasticsearch Storage.
#### UI
* Update selector scroller to show in all pages.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis;
import java.lang.annotation.Annotation;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsFunction;
@AllArgsConstructor
@Getter
public enum FunctionCategory {
METER("meter", MeterFunction.class),
METRICS("metrics", MetricsFunction.class);
private final String name;
private final Class<? extends Annotation> annotationClass;
/**
* The unique function name pattern is {function category}-{function name}.
*/
public static String uniqueFunctionName(final Class<?> aClass) {
Annotation annotation = doGetAnnotation(aClass, MeterFunction.class);
if (annotation != null) {
return (METER.getName() + Const.LINE + ((MeterFunction) annotation).functionName()).toLowerCase();
}
annotation = doGetAnnotation(aClass, MetricsFunction.class);
if (annotation != null) {
return (METRICS.getName() + Const.LINE + ((MetricsFunction) annotation).functionName()).toLowerCase();
}
return "";
}
private static Annotation doGetAnnotation(Class<?> clazz, Class<? extends Annotation> annotationClass) {
if (clazz.equals(Object.class)) {
return null;
}
Annotation[] annotations = clazz.getAnnotations();
for (final Annotation annotation : annotations) {
if (annotation.annotationType().equals(annotationClass)) {
return annotation;
}
}
return doGetAnnotation(clazz.getSuperclass(), annotationClass);
}
}
......@@ -37,6 +37,7 @@ public class Model {
private final boolean record;
private final boolean superDataset;
private final boolean isTimeSeries;
private final String aggregationFunctionName;
public Model(final String name,
final List<ModelColumn> columns,
......@@ -44,7 +45,8 @@ public class Model {
final int scopeId,
final DownSampling downsampling,
final boolean record,
final boolean superDataset) {
final boolean superDataset,
final String aggregationFunctionName) {
this.name = name;
this.columns = columns;
this.extraQueryIndices = extraQueryIndices;
......@@ -53,5 +55,6 @@ public class Model {
this.isTimeSeries = !DownSampling.None.equals(downsampling);
this.record = record;
this.superDataset = superDataset;
this.aggregationFunctionName = aggregationFunctionName;
}
}
......@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.FunctionCategory;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
......@@ -60,7 +61,9 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
Model model = new Model(
storage.getModelName(), modelColumns, extraQueryIndices, scopeId,
storage.getDownsampling(), record, isSuperDatasetModel(aClass)
storage.getDownsampling(), record,
isSuperDatasetModel(aClass),
FunctionCategory.uniqueFunctionName(aClass)
);
this.followColumnNameRules(model);
......
......@@ -21,9 +21,11 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch;
import com.google.common.base.Splitter;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyManagementException;
......@@ -38,6 +40,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
import lombok.RequiredArgsConstructor;
......@@ -71,6 +74,8 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
......@@ -84,6 +89,7 @@ import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
......@@ -209,6 +215,51 @@ public class ElasticSearchClient implements Client, HealthCheckable {
return response.isAcknowledged();
}
public boolean updateIndexMapping(String indexName, Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
Map<String, Object> properties = (Map<String, Object>) mapping.get(ElasticSearchClient.TYPE);
PutMappingRequest putMappingRequest = new PutMappingRequest(indexName);
Gson gson = new Gson();
putMappingRequest.type(ElasticSearchClient.TYPE);
putMappingRequest.source(gson.toJson(properties), XContentType.JSON);
PutMappingResponse response = client.indices().putMapping(putMappingRequest);
log.debug("put {} index mapping finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
public Map<String, Object> getIndex(String indexName) throws IOException {
if (StringUtil.isBlank(indexName)) {
return new HashMap<>();
}
indexName = formatIndexName(indexName);
try {
Response response = client.getLowLevelClient()
.performRequest(HttpGet.METHOD_NAME, indexName);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
healthChecker.health();
throw new IOException(
"The response status code of template exists request should be 200, but it is " + statusCode);
}
Type type = new TypeToken<HashMap<String, Object>>() {
}.getType();
Map<String, Object> templates = new Gson().<HashMap<String, Object>>fromJson(
new InputStreamReader(response.getEntity().getContent()),
type
);
return (Map<String, Object>) Optional.ofNullable(templates.get(indexName)).orElse(new HashMap<>());
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) {
return new HashMap<>();
}
healthChecker.unHealth(e);
throw e;
} catch (IOException t) {
healthChecker.unHealth(t);
throw t;
}
}
public boolean createIndex(String indexName, Map<String, Object> settings,
Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
......@@ -286,6 +337,39 @@ public class ElasticSearchClient implements Client, HealthCheckable {
return client.indices().exists(request);
}
public Map<String, Object> getTemplate(String name) throws IOException {
name = formatIndexName(name);
try {
Response response = client.getLowLevelClient()
.performRequest(HttpGet.METHOD_NAME, "_template/" + name);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
healthChecker.health();
throw new IOException(
"The response status code of template exists request should be 200, but it is " + statusCode);
}
Type type = new TypeToken<HashMap<String, Object>>() {
}.getType();
Map<String, Object> templates = new Gson().<HashMap<String, Object>>fromJson(
new InputStreamReader(response.getEntity().getContent()),
type
);
if (templates.containsKey(name)) {
return (Map<String, Object>) templates.get(name);
}
return new HashMap<>();
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) {
return new HashMap<>();
}
healthChecker.unHealth(e);
throw e;
} catch (IOException t) {
healthChecker.unHealth(t);
throw t;
}
}
public boolean isExistsTemplate(String indexName) throws IOException {
indexName = formatIndexName(indexName);
......@@ -302,8 +386,8 @@ public class ElasticSearchClient implements Client, HealthCheckable {
}
}
public boolean createTemplate(String indexName, Map<String, Object> settings,
Map<String, Object> mapping) throws IOException {
public boolean createOrUpdateTemplate(String indexName, Map<String, Object> settings,
Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
String[] patterns = new String[] {indexName + "-*"};
......@@ -327,11 +411,13 @@ public class ElasticSearchClient implements Client, HealthCheckable {
public boolean deleteTemplate(String indexName) throws IOException {
indexName = formatIndexName(indexName);
Response response = client.getLowLevelClient().performRequest(HttpDelete.METHOD_NAME, "/_template/" + indexName);
Response response = client.getLowLevelClient()
.performRequest(HttpDelete.METHOD_NAME, "/_template/" + indexName);
return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
}
public SearchResponse search(IndexNameMaker indexNameMaker, SearchSourceBuilder searchSourceBuilder) throws IOException {
public SearchResponse search(IndexNameMaker indexNameMaker,
SearchSourceBuilder searchSourceBuilder) throws IOException {
String[] indexNames = Arrays.stream(indexNameMaker.make()).map(this::formatIndexName).toArray(String[]::new);
return doSearch(searchSourceBuilder, indexNames);
}
......@@ -342,7 +428,7 @@ public class ElasticSearchClient implements Client, HealthCheckable {
}
protected SearchResponse doSearch(SearchSourceBuilder searchSourceBuilder,
String... indexNames) throws IOException {
String... indexNames) throws IOException {
SearchRequest searchRequest = new SearchRequest(indexNames);
searchRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, false));
searchRequest.types(TYPE);
......
......@@ -57,7 +57,7 @@ public class ITElasticSearchClient {
private final String namespace;
public ITElasticSearchClient() {
namespace = "";
namespace = "default-test-namespace";
}
protected ITElasticSearchClient(String namespace) {
......@@ -176,13 +176,12 @@ public class ITElasticSearchClient {
String indexName = "template_operate";
client.createTemplate(indexName, settings, mapping);
client.createOrUpdateTemplate(indexName, settings, mapping);
Assert.assertTrue(client.isExistsTemplate(indexName));
XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("name", "pengys").endObject();
client.forceInsert(indexName + "-2019", "testid", builder);
JsonObject index = getIndex(indexName + "-2019");
LOGGER.info(index.toString());
......@@ -196,7 +195,6 @@ public class ITElasticSearchClient {
.getAsJsonObject("index")
.get("number_of_replicas")
.getAsInt());
client.deleteTemplate(indexName);
Assert.assertFalse(client.isExistsTemplate(indexName));
}
......@@ -235,7 +233,7 @@ public class ITElasticSearchClient {
column.addProperty("type", "text");
properties.add("name", column);
client.createTemplate(indexName, new HashMap<>(), mapping);
client.createOrUpdateTemplate(indexName, new HashMap<>(), mapping);
XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("name", "pengys").endObject();
client.forceInsert(timeSeriesIndexName, "testid", builder);
......@@ -245,6 +243,7 @@ public class ITElasticSearchClient {
String index = indexes.get(0);
Assert.assertTrue(client.deleteByIndexName(index));
Assert.assertFalse(client.isExistsIndex(timeSeriesIndexName));
client.deleteTemplate(indexName);
}
private JsonObject getIndex(String indexName) throws IOException {
......
......@@ -27,6 +27,7 @@ import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
......@@ -87,6 +88,7 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
/**
* The storage provider for ElasticSearch 6.
*/
@Slf4j
public class StorageModuleElasticsearchProvider extends ModuleProvider {
protected final StorageModuleElasticsearchConfig config;
......@@ -116,7 +118,9 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
if (!StringUtil.isEmpty(config.getNameSpace())) {
if (StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace("sw");
} else {
config.setNameSpace(config.getNameSpace().toLowerCase());
}
if (config.getDayStep() > 1) {
......@@ -161,10 +165,11 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
.getTrustStorePass(), config.getUser(), config.getPassword(),
indexNameConverters(config.getNameSpace())
);
this.registerServiceImplementation(
IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getSyncBulkActions(), config
.getFlushInterval(), config.getConcurrentRequests()));
IBatchDAO.class,
new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getSyncBulkActions(), config
.getFlushInterval(), config.getConcurrentRequests())
);
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
this.registerServiceImplementation(
IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearchClient));
......
......@@ -30,6 +30,7 @@ import org.joda.time.DateTime;
@Slf4j
public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
public HistoryDeleteEsDAO(ElasticSearchClient client) {
super(client);
}
......@@ -50,8 +51,8 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
}
}
deadline = Long.parseLong(new DateTime().plusDays(-ttl).toString("yyyyMMdd"));
List<String> indexes = client.retrievalIndexByAliases(model.getName());
String tableName = IndexController.INSTANCE.getTableName(model);
List<String> indexes = client.retrievalIndexByAliases(tableName);
List<String> prepareDeleteIndexes = new ArrayList<>();
List<String> leftIndices = new ArrayList<>();
......@@ -67,7 +68,7 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
client.deleteByIndexName(prepareDeleteIndex);
}
String latestIndex = TimeSeriesUtils.latestWriteIndexName(model);
String formattedLatestIndex = client.formatIndexName(latestIndex);
String formattedLatestIndex = client.formatIndexName(latestIndex);
if (!leftIndices.contains(formattedLatestIndex)) {
client.createIndex(latestIndex);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.storage.model.Model;
/**
* The metrics data, that generated by OAL or MAL, would be partitioned to storage by the functions of the OAL or MAL.
* And, the other record data would be insulated storage by themselves definitions.
*/
@Slf4j
public enum IndexController {
INSTANCE;
public String getTableName(Model model) {
return isMetricModel(model) ? model.getAggregationFunctionName() : model.getName();
}
/**
* Generate the index doc ID. When a model is the aggregation storage mode, the logicTableName is a part of new ID
* to avoid conflicts.
*/
public String generateDocId(Model model, String originalID) {
if (!isMetricModel(model)) {
return originalID;
}
return this.generateDocId(model.getName(), originalID);
}
/**
* Generate the index doc ID.
*/
public String generateDocId(String logicTableName, String originalID) {
return logicTableName + Const.ID_CONNECTOR + originalID;
}
/**
* Check the mode of the Model definition.
*/
public boolean isMetricModel(Model model) {
return StringUtil.isNotBlank(model.getAggregationFunctionName());
}
/**
* When a model is the metric storage mode, a column named {@link LogicIndicesRegister#METRIC_TABLE_NAME} would be
* append to the physical index. The value of the column is the original table name in other storages, such as the
* OAL name.
*/
public Map<String, Object> appendMetricTableColumn(Model model, Map<String, Object> columns) {
if (!isMetricModel(model)) {
return columns;
}
columns.put(LogicIndicesRegister.METRIC_TABLE_NAME, model.getName());
return columns;
}
public static class LogicIndicesRegister {
/**
* The relations of the logic table and the physical table.
*/
private static final Map<String, String> LOGIC_INDICES_CATALOG = new ConcurrentHashMap<>();
/**
* The metric table name in aggregation physical storage.
*/
public static final String METRIC_TABLE_NAME = "metric_table";
public static String getPhysicalTableName(String logicName) {
return Optional.of(LOGIC_INDICES_CATALOG.get(logicName)).orElse(logicName);
}
public static void registerRelation(String logicName, String physicalName) {
LOGIC_INDICES_CATALOG.put(logicName, physicalName);
}
public static boolean isMetricTable(String logicName) {
return !getPhysicalTableName(logicName).equals(logicName);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
public class IndexStructures {
private final Map<String, Fields> structures;
@Getter
private final PropertiesExtractor extractor;
@Getter
private final PropertiesWrapper wrapper;
public IndexStructures() {
this.structures = new HashMap<>();
this.extractor = doGetPropertiesExtractor();
this.wrapper = doGetPropertiesWrapper();
}
protected PropertiesExtractor doGetPropertiesExtractor() {
return mapping -> (Map<String, Object>) ((Map<String, Object>) mapping.get(
ElasticSearchClient.TYPE)).get("properties");
}
protected PropertiesWrapper doGetPropertiesWrapper() {
return properties -> {
HashMap<String, Object> mappings = new HashMap<>();
HashMap<String, Object> types = new HashMap<>();
mappings.put(ElasticSearchClient.TYPE, types);
types.put("properties", properties);
return mappings;
};
}
public Map<String, Object> getMapping(String tableName) {
return wrapper.wrapper(
structures.containsKey(tableName) ? structures.get(tableName).properties : new HashMap<>());
}
/**
* Add or append field when the current structures don't contain the input structure or having new fields in it.
*/
public void putStructure(String tableName, Map<String, Object> mapping) {
if (Objects.isNull(mapping) || mapping.isEmpty()) {
return;
}
Map<String, Object> properties = this.extractor.extract(mapping);
Fields fields = new Fields(properties);
if (structures.containsKey(tableName)) {
structures.get(tableName).appendNewFields(fields);
} else {
structures.put(tableName, fields);
}
}
/**
* Returns mappings with fields that not exist in the input mappings.
*/
public Map<String, Object> diffStructure(String tableName, Map<String, Object> mappings) {
if (!structures.containsKey(tableName)) {
return new HashMap<>();
}
Map<String, Object> properties = this.extractor.extract(mappings);
Map<String, Object> diffProperties = structures.get(tableName).diffFields(new Fields(properties));
return this.wrapper.wrapper(diffProperties);
}
/**
* Returns true when the current structures already contains the properties of the input mappings.
*/
public boolean containsStructure(String tableName, Map<String, Object> mappings) {
if (Objects.isNull(mappings) || mappings.isEmpty()) {
return true;
}
return structures.containsKey(tableName)
&& structures.get(tableName).containsAllFields(new Fields(this.extractor.extract(mappings)));
}
/**
* The properties of the template or index.
*/
public static class Fields {
private final Map<String, Object> properties;
private Fields(Map<String, Object> properties) {
this.properties = properties;
}
/**
* Returns ture when the input fields have already been stored in the properties.
*/
private boolean containsAllFields(Fields fields) {
return fields.properties.entrySet().stream().allMatch(item -> this.properties.containsKey(item.getKey()));
}
/**
* Append new fields to the properties when have new fields.
*/
private void appendNewFields(Fields fields) {
Map<String, Object> newFields = fields.properties.entrySet()
.stream()
.filter(e -> !this.properties.containsKey(e.getKey()))
.collect(Collectors.toMap(
Map.Entry::getKey, Map.Entry::getValue
));
newFields.forEach(properties::put);
}
/**
* Returns the properties that not exist in the input fields.
*/
private Map<String, Object> diffFields(Fields fields) {
return this.properties.entrySet().stream()
.filter(e -> !fields.properties.containsKey(e.getKey()))
.collect(Collectors.toMap(
Map.Entry::getKey, Map.Entry::getValue
));
}
}
/**
* Extract properties form the mappings.
*/
@FunctionalInterface
public interface PropertiesExtractor {
Map<String, Object> extract(Map<String, Object> mappings);
}
/**
* Wrapper properties to the mappings.
*/
@FunctionalInterface
public interface PropertiesWrapper {
Map<String, Object> wrapper(Map<String, Object> properties);
}
}
......@@ -30,21 +30,22 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
public class ManagementEsDAO extends EsDAO implements IManagementDAO {
private final StorageHashMapBuilder<ManagementData> storageBuilder;
public ManagementEsDAO(ElasticSearchClient client, StorageHashMapBuilder<ManagementData> storageBuilder) {
public ManagementEsDAO(ElasticSearchClient client,
StorageHashMapBuilder<ManagementData> storageBuilder) {
super(client);
this.storageBuilder = storageBuilder;
}
@Override
public void insert(Model model, ManagementData managementData) throws IOException {
String modelName = model.getName();
final String id = managementData.id();
final GetResponse response = getClient().get(modelName, id);
String tableName = IndexController.INSTANCE.getTableName(model);
String docId = IndexController.INSTANCE.generateDocId(model, managementData.id());
final GetResponse response = getClient().get(tableName, docId);
if (response.isExists()) {
return;
}
XContentBuilder builder = map2builder(storageBuilder.entity2Storage(managementData));
getClient().forceInsert(modelName, id, builder);
XContentBuilder builder = map2builder(
IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(managementData)));
getClient().forceInsert(tableName, docId, builder);
}
}
\ No newline at end of file
......@@ -32,19 +32,21 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
protected final StorageHashMapBuilder<Metrics> storageBuilder;
protected MetricsEsDAO(ElasticSearchClient client, StorageHashMapBuilder<Metrics> storageBuilder) {
protected MetricsEsDAO(ElasticSearchClient client,
StorageHashMapBuilder<Metrics> storageBuilder) {
super(client);
this.storageBuilder = storageBuilder;
}
@Override
public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
String[] ids = metrics.stream().map(Metrics::id).toArray(String[]::new);
SearchResponse response = getClient().ids(model.getName(), ids);
String tableName = IndexController.INSTANCE.getTableName(model);
String[] ids = metrics.stream()
.map(item -> IndexController.INSTANCE.generateDocId(model, item.id()))
.toArray(String[]::new);
SearchResponse response = getClient().ids(tableName, ids);
List<Metrics> result = new ArrayList<>(response.getHits().getHits().length);
for (int i = 0; i < response.getHits().getHits().length; i++) {
Metrics source = storageBuilder.storage2Entity(response.getHits().getAt(i).getSourceAsMap());
......@@ -55,15 +57,19 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
@Override
public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
XContentBuilder builder = map2builder(storageBuilder.entity2Storage(metrics));
XContentBuilder builder = map2builder(
IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(metrics)));
String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket());
return getClient().prepareInsert(modelName, metrics.id(), builder);
String id = IndexController.INSTANCE.generateDocId(model, metrics.id());
return getClient().prepareInsert(modelName, id, builder);
}
@Override
public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
XContentBuilder builder = map2builder(storageBuilder.entity2Storage(metrics));
XContentBuilder builder = map2builder(
IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(metrics)));
String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket());
return getClient().prepareUpdate(modelName, metrics.id(), builder);
String id = IndexController.INSTANCE.generateDocId(model, metrics.id());
return getClient().prepareUpdate(modelName, id, builder);
}
}
......@@ -30,18 +30,20 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
* Synchronize storage Elasticsearch implements
*/
public class NoneStreamEsDAO extends EsDAO implements INoneStreamDAO {
private final StorageHashMapBuilder<NoneStream> storageBuilder;
public NoneStreamEsDAO(ElasticSearchClient client, StorageHashMapBuilder<NoneStream> storageBuilder) {
public NoneStreamEsDAO(ElasticSearchClient client,
StorageHashMapBuilder<NoneStream> storageBuilder) {
super(client);
this.storageBuilder = storageBuilder;
}
@Override
public void insert(Model model, NoneStream noneStream) throws IOException {
XContentBuilder builder = map2builder(storageBuilder.entity2Storage(noneStream));
XContentBuilder builder = map2builder(
IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(noneStream)));
String modelName = TimeSeriesUtils.writeIndexName(model, noneStream.getTimeBucket());
getClient().forceInsert(modelName, noneStream.id(), builder);
String id = IndexController.INSTANCE.generateDocId(model, noneStream.id());
getClient().forceInsert(modelName, id, builder);
}
}
......@@ -28,18 +28,20 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
public class RecordEsDAO extends EsDAO implements IRecordDAO {
private final StorageHashMapBuilder<Record> storageBuilder;
public RecordEsDAO(ElasticSearchClient client, StorageHashMapBuilder<Record> storageBuilder) {
public RecordEsDAO(ElasticSearchClient client,
StorageHashMapBuilder<Record> storageBuilder) {
super(client);
this.storageBuilder = storageBuilder;
}
@Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
XContentBuilder builder = map2builder(storageBuilder.entity2Storage(record));
XContentBuilder builder = map2builder(
IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(record)));
String modelName = TimeSeriesUtils.writeIndexName(model, record.getTimeBucket());
return getClient().prepareInsert(modelName, record.id(), builder);
String id = IndexController.INSTANCE.generateDocId(model, record.id());
return getClient().prepareInsert(modelName, id, builder);
}
}
......@@ -26,8 +26,8 @@ import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
......
......@@ -38,28 +38,45 @@ import org.elasticsearch.common.unit.TimeValue;
@Slf4j
public class StorageEsInstaller extends ModelInstaller {
private final Gson gson = new Gson();
private final StorageModuleElasticsearchConfig config;
protected final ColumnTypeEsMapping columnTypeEsMapping;
/**
* The mappings of the template .
*/
private final IndexStructures structures;
public StorageEsInstaller(Client client,
ModuleManager moduleManager,
final StorageModuleElasticsearchConfig config) {
StorageModuleElasticsearchConfig config) throws StorageException {
super(client, moduleManager);
this.columnTypeEsMapping = new ColumnTypeEsMapping();
this.config = config;
this.structures = getStructures();
}
protected IndexStructures getStructures() {
return new IndexStructures();
}
@Override
protected boolean isExists(Model model) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient) client;
String tableName = IndexController.INSTANCE.getTableName(model);
IndexController.LogicIndicesRegister.registerRelation(model.getName(), tableName);
try {
if (model.isTimeSeries()) {
return esClient.isExistsTemplate(model.getName()) && esClient.isExistsIndex(
TimeSeriesUtils.latestWriteIndexName(model));
} else {
return esClient.isExistsIndex(model.getName());
if (!model.isTimeSeries()) {
return esClient.isExistsIndex(tableName);
}
boolean exist = esClient.isExistsTemplate(tableName)
&& esClient.isExistsIndex(TimeSeriesUtils.latestWriteIndexName(model));
if (exist && IndexController.INSTANCE.isMetricModel(model)) {
structures.putStructure(
tableName, (Map<String, Object>) esClient.getTemplate(tableName).get("mappings")
);
exist = structures.containsStructure(tableName, createMapping(model));
}
return exist;
} catch (IOException e) {
throw new StorageException(e.getMessage());
}
......@@ -67,38 +84,72 @@ public class StorageEsInstaller extends ModelInstaller {
@Override
protected void createTable(Model model) throws StorageException {
if (model.isTimeSeries()) {
createTimeSeriesTable(model);
} else {
createNormalTable(model);
}
}
private void createNormalTable(Model model) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient) client;
String tableName = IndexController.INSTANCE.getTableName(model);
try {
if (!esClient.isExistsIndex(tableName)) {
boolean isAcknowledged = esClient.createIndex(tableName);
log.info("create {} index finished, isAcknowledged: {}", tableName, isAcknowledged);
if (!isAcknowledged) {
throw new StorageException("create " + tableName + " time series index failure, ");
}
}
} catch (IOException e) {
throw new StorageException("cannot create the normal index", e);
}
}
private void createTimeSeriesTable(Model model) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient) client;
String tableName = IndexController.INSTANCE.getTableName(model);
Map<String, Object> settings = createSetting(model);
Map<String, Object> mapping = createMapping(model);
log.info("index {}'s columnTypeEsMapping builder str: {}", esClient.formatIndexName(model.getName()), mapping
.toString());
String indexName = TimeSeriesUtils.latestWriteIndexName(model);
try {
String indexName;
if (!model.isTimeSeries()) {
indexName = model.getName();
} else {
if (!esClient.isExistsTemplate(model.getName())) {
boolean isAcknowledged = esClient.createTemplate(model.getName(), settings, mapping);
log.info(
"create {} index template finished, isAcknowledged: {}", model.getName(), isAcknowledged);
if (!isAcknowledged) {
throw new StorageException("create " + model.getName() + " index template failure, ");
}
}
indexName = TimeSeriesUtils.latestWriteIndexName(model);
boolean shouldUpdateTemplate = !esClient.isExistsTemplate(tableName);
if (IndexController.INSTANCE.isMetricModel(model)) {
shouldUpdateTemplate = shouldUpdateTemplate || !structures.containsStructure(tableName, mapping);
}
if (!esClient.isExistsIndex(indexName)) {
boolean isAcknowledged = esClient.createIndex(indexName);
log.info("create {} index finished, isAcknowledged: {}", indexName, isAcknowledged);
if (shouldUpdateTemplate) {
structures.putStructure(tableName, mapping);
boolean isAcknowledged = esClient.createOrUpdateTemplate(
tableName, settings, structures.getMapping(tableName));
log.info("create {} index template finished, isAcknowledged: {}", tableName, isAcknowledged);
if (!isAcknowledged) {
throw new StorageException("create " + indexName + " time series index failure, ");
throw new IOException("create " + tableName + " index template failure, ");
}
}
if (esClient.isExistsIndex(indexName)) {
Map<String, Object> historyMapping = (Map<String, Object>) esClient.getIndex(indexName)
.get("mappings");
Map<String, Object> appendMapping = structures.diffStructure(tableName, historyMapping);
if (!appendMapping.isEmpty()) {
isAcknowledged = esClient.updateIndexMapping(indexName, appendMapping);
log.info("update {} index finished, isAcknowledged: {}, append mappings: {}", indexName,
isAcknowledged, appendMapping.toString()
);
if (!isAcknowledged) {
throw new StorageException("update " + indexName + " time series index failure");
}
}
} else {
isAcknowledged = esClient.createIndex(indexName);
log.info("create {} index finished, isAcknowledged: {}", indexName, isAcknowledged);
if (!isAcknowledged) {
throw new StorageException("create " + indexName + " time series index failure");
}
}
}
} catch (IOException e) {
throw new StorageException(e.getMessage());
throw new StorageException("cannot create " + tableName + " index template", e);
}
}
......@@ -134,14 +185,7 @@ public class StorageEsInstaller extends ModelInstaller {
}
protected Map<String, Object> createMapping(Model model) {
Map<String, Object> mapping = new HashMap<>();
Map<String, Object> type = new HashMap<>();
mapping.put(ElasticSearchClient.TYPE, type);
Map<String, Object> properties = new HashMap<>();
type.put("properties", properties);
for (ModelColumn columnDefine : model.getColumns()) {
if (columnDefine.isMatchQuery()) {
String matchCName = MatchCNameBuilder.INSTANCE.build(columnDefine.getColumnName().getName());
......@@ -167,8 +211,14 @@ public class StorageEsInstaller extends ModelInstaller {
}
}
log.debug("elasticsearch index template setting: {}", mapping.toString());
if (IndexController.INSTANCE.isMetricModel(model)) {
Map<String, Object> column = new HashMap<>();
column.put("type", "keyword");
properties.put(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, column);
}
Map<String, Object> mappings = this.structures.getWrapper().wrapper(properties);
log.debug("elasticsearch index template setting: {}", mappings.toString());
return mapping;
return mappings;
}
}
......@@ -50,15 +50,16 @@ public class TimeSeriesUtils {
*/
public static String latestWriteIndexName(Model model) {
long timeBucket;
String tableName = IndexController.INSTANCE.getTableName(model);
if (model.isRecord() && model.isSuperDataset()) {
timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), model.getDownsampling());
return model.getName() + Const.LINE + compressTimeBucket(timeBucket / 1000000, SUPER_DATASET_DAY_STEP);
return tableName + Const.LINE + compressTimeBucket(timeBucket / 1000000, SUPER_DATASET_DAY_STEP);
} else if (model.isRecord()) {
timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), model.getDownsampling());
return model.getName() + Const.LINE + compressTimeBucket(timeBucket / 1000000, DAY_STEP);
return tableName + Const.LINE + compressTimeBucket(timeBucket / 1000000, DAY_STEP);
} else {
timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute);
return model.getName() + Const.LINE + compressTimeBucket(timeBucket / 10000, DAY_STEP);
return tableName + Const.LINE + compressTimeBucket(timeBucket / 10000, DAY_STEP);
}
}
......@@ -89,24 +90,23 @@ public class TimeSeriesUtils {
* @return index name based on model definition and given time bucket.
*/
static String writeIndexName(Model model, long timeBucket) {
final String modelName = model.getName();
String tableName = IndexController.INSTANCE.getTableName(model);
if (model.isRecord() && model.isSuperDataset()) {
return modelName + Const.LINE + compressTimeBucket(timeBucket / 1000000, SUPER_DATASET_DAY_STEP);
return tableName + Const.LINE + compressTimeBucket(timeBucket / 1000000, SUPER_DATASET_DAY_STEP);
} else if (model.isRecord()) {
return modelName + Const.LINE + compressTimeBucket(timeBucket / 1000000, DAY_STEP);
return tableName + Const.LINE + compressTimeBucket(timeBucket / 1000000, DAY_STEP);
} else {
switch (model.getDownsampling()) {
case None:
return modelName;
return tableName;
case Hour:
return modelName + Const.LINE + compressTimeBucket(timeBucket / 100, DAY_STEP);
return tableName + Const.LINE + compressTimeBucket(timeBucket / 100, DAY_STEP);
case Minute:
return modelName + Const.LINE + compressTimeBucket(timeBucket / 10000, DAY_STEP);
return tableName + Const.LINE + compressTimeBucket(timeBucket / 10000, DAY_STEP);
case Day:
return modelName + Const.LINE + compressTimeBucket(timeBucket, DAY_STEP);
return tableName + Const.LINE + compressTimeBucket(timeBucket, DAY_STEP);
case Second:
return modelName + Const.LINE + compressTimeBucket(timeBucket / 1000000, DAY_STEP);
return tableName + Const.LINE + compressTimeBucket(timeBucket / 1000000, DAY_STEP);
default:
throw new UnexpectedException("Unexpected down sampling value, " + model.getDownsampling());
}
......
......@@ -29,7 +29,9 @@ import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
......@@ -60,17 +62,40 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO
if (condition.getOrder().equals(Order.ASC)) {
asc = true;
}
String tableName = IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
if (additionalConditions != null && additionalConditions.size() > 0) {
if (CollectionUtils.isEmpty(additionalConditions)
&& IndexController.LogicIndicesRegister.isMetricTable(condition.getName())) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
additionalConditions.forEach(additionalCondition -> {
boolQuery.must()
.add(QueryBuilders.termsQuery(additionalCondition.getKey(), additionalCondition.getValue()));
});
boolQuery.must()
.add(QueryBuilders.termQuery(
IndexController.LogicIndicesRegister.METRIC_TABLE_NAME,
condition.getName()
));
boolQuery.must().add(queryBuilder);
sourceBuilder.query(boolQuery);
} else {
} else if (CollectionUtils.isEmpty(additionalConditions)) {
sourceBuilder.query(queryBuilder);
} else if (CollectionUtils.isNotEmpty(additionalConditions)
&& IndexController.LogicIndicesRegister.isMetricTable(condition.getName())) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must()
.add(QueryBuilders.termQuery(
IndexController.LogicIndicesRegister.METRIC_TABLE_NAME,
condition.getName()
));
additionalConditions.forEach(additionalCondition -> boolQuery
.must()
.add(QueryBuilders.termsQuery(additionalCondition.getKey(), additionalCondition.getValue())));
boolQuery.must().add(queryBuilder);
sourceBuilder.query(boolQuery);
} else {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
additionalConditions.forEach(additionalCondition -> boolQuery
.must()
.add(QueryBuilders.termsQuery(additionalCondition.getKey(), additionalCondition.getValue())));
boolQuery.must().add(queryBuilder);
sourceBuilder.query(boolQuery);
}
sourceBuilder.aggregation(
......@@ -81,7 +106,7 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO
.subAggregation(AggregationBuilders.avg(valueColumnName).field(valueColumnName))
);
SearchResponse response = getClient().search(condition.getName(), sourceBuilder);
SearchResponse response = getClient().search(tableName, sourceBuilder);
List<SelectedRecord> topNList = new ArrayList<>();
Terms idTerms = response.getAggregations().get(Metrics.ENTITY_ID);
......
......@@ -22,12 +22,13 @@ import com.google.common.base.Strings;
import java.io.IOException;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
import org.apache.skywalking.oap.server.core.query.type.Alarms;
import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
......@@ -44,7 +45,7 @@ public class AlarmQueryEsDAO extends EsDAO implements IAlarmQueryDAO {
@Override
public Alarms getAlarm(final Integer scopeId, final String keyword, final int limit, final int from,
final long startTB, final long endTB) throws IOException {
final long startTB, final long endTB) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
......@@ -63,7 +64,8 @@ public class AlarmQueryEsDAO extends EsDAO implements IAlarmQueryDAO {
sourceBuilder.size(limit);
sourceBuilder.from(from);
SearchResponse response = getClient().search(AlarmRecord.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(AlarmRecord.INDEX_NAME), sourceBuilder);
Alarms alarms = new Alarms();
alarms.setTotal((int) response.getHits().totalHits);
......
......@@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLogs;
import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
......@@ -81,8 +82,8 @@ public class BrowserLogQueryEsDAO extends EsDAO implements IBrowserLogQueryDAO {
}
sourceBuilder.size(limit);
sourceBuilder.from(from);
SearchResponse response = getClient().search(BrowserErrorLogRecord.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(BrowserErrorLogRecord.INDEX_NAME), sourceBuilder);
BrowserErrorLogs logs = new BrowserErrorLogs();
logs.setTotal((int) response.getHits().totalHits);
......
......@@ -32,6 +32,7 @@ import org.apache.skywalking.oap.server.core.query.type.event.Source;
import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
......@@ -53,7 +54,8 @@ public class ESEventQueryDAO extends EsDAO implements IEventQueryDAO {
public Events queryEvents(final EventQueryCondition condition) throws Exception {
final SearchSourceBuilder sourceBuilder = buildQuery(condition);
final SearchResponse response = getClient().search(Event.INDEX_NAME, sourceBuilder);
final SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(Event.INDEX_NAME), sourceBuilder);
final Events events = new Events();
events.setTotal((int) response.getHits().totalHits);
......@@ -84,7 +86,10 @@ public class ESEventQueryDAO extends EsDAO implements IEventQueryDAO {
mustQueryList.add(QueryBuilders.termQuery(Event.SERVICE_INSTANCE, source.getServiceInstance()));
}
if (!isNullOrEmpty(source.getEndpoint())) {
mustQueryList.add(QueryBuilders.matchPhraseQuery(MatchCNameBuilder.INSTANCE.build(Event.ENDPOINT), source.getEndpoint()));
mustQueryList.add(QueryBuilders.matchPhraseQuery(
MatchCNameBuilder.INSTANCE.build(Event.ENDPOINT),
source.getEndpoint()
));
}
}
......
......@@ -33,6 +33,7 @@ import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Strings;
......@@ -137,7 +138,8 @@ public class LogQueryEsDAO extends EsDAO implements ILogQueryDAO {
sourceBuilder.size(limit);
sourceBuilder.from(from);
SearchResponse response = getClient().search(LogRecord.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(LogRecord.INDEX_NAME), sourceBuilder);
Logs logs = new Logs();
logs.setTotal((int) response.getHits().totalHits);
......
......@@ -40,6 +40,7 @@ import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
......@@ -70,7 +71,8 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(queryMaxSize);
SearchResponse response = getClient().search(ServiceTraffic.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(ServiceTraffic.INDEX_NAME), sourceBuilder);
return buildServices(response);
}
......@@ -85,7 +87,8 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(queryMaxSize);
SearchResponse response = getClient().search(ServiceTraffic.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(ServiceTraffic.INDEX_NAME), sourceBuilder);
return buildServices(response);
}
......@@ -100,7 +103,8 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(queryMaxSize);
SearchResponse response = getClient().search(ServiceTraffic.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(ServiceTraffic.INDEX_NAME), sourceBuilder);
final List<Service> serviceList = buildServices(response);
List<Database> databases = new ArrayList<>();
......@@ -128,7 +132,8 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(queryMaxSize);
SearchResponse response = getClient().search(ServiceTraffic.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(ServiceTraffic.INDEX_NAME), sourceBuilder);
return buildServices(response);
}
......@@ -140,7 +145,8 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
boolQueryBuilder.must().add(QueryBuilders.termQuery(ServiceTraffic.NAME, serviceCode));
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(1);
SearchResponse response = getClient().search(ServiceTraffic.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(ServiceTraffic.INDEX_NAME), sourceBuilder);
final List<Service> services = buildServices(response);
return services.size() > 0 ? services.get(0) : null;
}
......@@ -160,7 +166,8 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(limit);
SearchResponse response = getClient().search(EndpointTraffic.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(EndpointTraffic.INDEX_NAME), sourceBuilder);
List<Endpoint> endpoints = new ArrayList<>();
for (SearchHit searchHit : response.getHits()) {
......@@ -193,7 +200,8 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(queryMaxSize);
SearchResponse response = getClient().search(InstanceTraffic.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(InstanceTraffic.INDEX_NAME), sourceBuilder);
List<ServiceInstance> serviceInstances = new ArrayList<>();
for (SearchHit searchHit : response.getHits()) {
......
......@@ -38,6 +38,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetad
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
......@@ -58,8 +59,8 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
@Override
public long readMetricsValue(final MetricsCondition condition,
final String valueColumnName,
final Duration duration) throws IOException {
final String valueColumnName,
final Duration duration) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
buildQuery(sourceBuilder, condition, duration);
int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
......@@ -75,7 +76,8 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
sourceBuilder.aggregation(entityIdAggregation);
SearchResponse response = getClient().search(condition.getName(), sourceBuilder);
SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName()), sourceBuilder);
Terms idTerms = response.getAggregations().get(Metrics.ENTITY_ID);
for (Terms.Bucket idBucket : idTerms.getBuckets()) {
......@@ -98,13 +100,20 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
public MetricsValues readMetricsValues(final MetricsCondition condition,
final String valueColumnName,
final Duration duration) throws IOException {
String tableName = IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
List<String> ids = new ArrayList<>(pointOfTimes.size());
pointOfTimes.forEach(pointOfTime -> {
ids.add(pointOfTime.id(condition.getEntity().buildId()));
String id = pointOfTime.id(condition.getEntity().buildId());
if (IndexController.LogicIndicesRegister.isMetricTable(condition.getName())) {
id = IndexController.INSTANCE.generateDocId(condition.getName(), id);
}
ids.add(id);
});
SearchResponse response = getClient().ids(condition.getName(), ids.toArray(new String[0]));
SearchResponse response = getClient()
.ids(tableName, ids.toArray(new String[0]));
Map<String, Map<String, Object>> idMap = toMap(response);
MetricsValues metricsValues = new MetricsValues();
......@@ -136,12 +145,18 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
final List<String> labels,
final Duration duration) throws IOException {
final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
String tableName = IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
boolean aggregationMode = !tableName.equals(condition.getName());
List<String> ids = new ArrayList<>(pointOfTimes.size());
pointOfTimes.forEach(pointOfTime -> {
ids.add(pointOfTime.id(condition.getEntity().buildId()));
String id = pointOfTime.id(condition.getEntity().buildId());
if (aggregationMode) {
id = IndexController.INSTANCE.generateDocId(condition.getName(), id);
}
ids.add(id);
});
SearchResponse response = getClient().ids(condition.getName(), ids.toArray(new String[0]));
SearchResponse response = getClient().ids(tableName, ids.toArray(new String[0]));
Map<String, DataTable> idMap = new HashMap<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
......@@ -155,12 +170,18 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
final String valueColumnName,
final Duration duration) throws IOException {
final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
String tableName = IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
boolean aggregationMode = !tableName.equals(condition.getName());
List<String> ids = new ArrayList<>(pointOfTimes.size());
pointOfTimes.forEach(pointOfTime -> {
ids.add(pointOfTime.id(condition.getEntity().buildId()));
String id = pointOfTime.id(condition.getEntity().buildId());
if (aggregationMode) {
id = IndexController.INSTANCE.generateDocId(condition.getName(), id);
}
ids.add(id);
});
SearchResponse response = getClient().ids(condition.getName(), ids.toArray(new String[0]));
SearchResponse response = getClient().ids(tableName, ids.toArray(new String[0]));
Map<String, Map<String, Object>> idMap = toMap(response);
HeatMap heatMap = new HeatMap();
......@@ -206,13 +227,28 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
final String entityId = condition.getEntity().buildId();
if (entityId == null) {
if (entityId == null && IndexController.LogicIndicesRegister.isMetricTable(condition.getName())) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(rangeQueryBuilder);
boolQuery.must().add(QueryBuilders.termQuery(
IndexController.LogicIndicesRegister.METRIC_TABLE_NAME,
condition.getName()
));
} else if (entityId == null) {
sourceBuilder.query(rangeQueryBuilder);
} else if (IndexController.LogicIndicesRegister.isMetricTable(condition.getName())) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(rangeQueryBuilder);
boolQuery.must().add(QueryBuilders.termsQuery(Metrics.ENTITY_ID, entityId));
boolQuery.must().add(QueryBuilders.termQuery(
IndexController.LogicIndicesRegister.METRIC_TABLE_NAME,
condition.getName()
));
sourceBuilder.query(boolQuery);
} else {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(rangeQueryBuilder);
boolQuery.must().add(QueryBuilders.termsQuery(Metrics.ENTITY_ID, entityId));
sourceBuilder.query(boolQuery);
}
sourceBuilder.size(0);
......
......@@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationT
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
......@@ -53,7 +54,8 @@ public class ProfileTaskLogEsDAO extends EsDAO implements IProfileTaskLogQueryDA
sourceBuilder.sort(ProfileTaskLogRecord.OPERATION_TIME, SortOrder.DESC);
sourceBuilder.size(queryMaxSize);
final SearchResponse response = getClient().search(ProfileTaskLogRecord.INDEX_NAME, sourceBuilder);
final SearchResponse response = getClient().search(
IndexController.LogicIndicesRegister.getPhysicalTableName(ProfileTaskLogRecord.INDEX_NAME), sourceBuilder);
final LinkedList<ProfileTaskLog> tasks = new LinkedList<>();
for (SearchHit searchHit : response.getHits().getHits()) {
......
......@@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
......@@ -76,7 +77,8 @@ public class ProfileTaskQueryEsDAO extends EsDAO implements IProfileTaskQueryDAO
sourceBuilder.sort(ProfileTaskRecord.START_TIME, SortOrder.DESC);
final SearchResponse response = getClient().search(ProfileTaskRecord.INDEX_NAME, sourceBuilder);
final SearchResponse response = getClient().search(
IndexController.LogicIndicesRegister.getPhysicalTableName(ProfileTaskRecord.INDEX_NAME), sourceBuilder);
final LinkedList<ProfileTask> tasks = new LinkedList<>();
for (SearchHit searchHit : response.getHits().getHits()) {
......@@ -96,7 +98,8 @@ public class ProfileTaskQueryEsDAO extends EsDAO implements IProfileTaskQueryDAO
sourceBuilder.query(QueryBuilders.idsQuery().addIds(id));
sourceBuilder.size(1);
final SearchResponse response = getClient().search(ProfileTaskRecord.INDEX_NAME, sourceBuilder);
final SearchResponse response = getClient().search(
IndexController.LogicIndicesRegister.getPhysicalTableName(ProfileTaskRecord.INDEX_NAME), sourceBuilder);
if (response.getHits().getHits().length > 0) {
return parseTask(response.getHits().getHits()[0]);
......
......@@ -19,6 +19,12 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
......@@ -27,6 +33,7 @@ import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSear
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
......@@ -38,13 +45,6 @@ import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
public class ProfileThreadSnapshotQueryEsDAO extends EsDAO implements IProfileThreadSnapshotQueryDAO {
private final int querySegmentMaxSize;
......@@ -70,7 +70,10 @@ public class ProfileThreadSnapshotQueryEsDAO extends EsDAO implements IProfileTh
sourceBuilder.size(querySegmentMaxSize);
sourceBuilder.sort(ProfileThreadSnapshotRecord.DUMP_TIME, SortOrder.DESC);
SearchResponse response = getClient().search(ProfileThreadSnapshotRecord.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient().search(
IndexController.LogicIndicesRegister.getPhysicalTableName(ProfileThreadSnapshotRecord.INDEX_NAME),
sourceBuilder
);
final LinkedList<String> segments = new LinkedList<>();
for (SearchHit searchHit : response.getHits().getHits()) {
......@@ -105,7 +108,8 @@ public class ProfileThreadSnapshotQueryEsDAO extends EsDAO implements IProfileTh
basicTrace.getEndpointNames().add((String) searchHit.getSourceAsMap().get(SegmentRecord.ENDPOINT_NAME));
basicTrace.setDuration(((Number) searchHit.getSourceAsMap().get(SegmentRecord.LATENCY)).intValue());
basicTrace.setError(BooleanUtils.valueToBoolean(((Number) searchHit.getSourceAsMap()
.get(SegmentRecord.IS_ERROR)).intValue()));
.get(
SegmentRecord.IS_ERROR)).intValue()));
basicTrace.getTraceIds().add((String) searchHit.getSourceAsMap().get(SegmentRecord.TRACE_ID));
result.add(basicTrace);
......@@ -116,16 +120,24 @@ public class ProfileThreadSnapshotQueryEsDAO extends EsDAO implements IProfileTh
@Override
public int queryMinSequence(String segmentId, long start, long end) throws IOException {
return querySequenceWithAgg(AggregationBuilders.min(ProfileThreadSnapshotRecord.SEQUENCE).field(ProfileThreadSnapshotRecord.SEQUENCE), segmentId, start, end);
return querySequenceWithAgg(
AggregationBuilders.min(ProfileThreadSnapshotRecord.SEQUENCE).field(ProfileThreadSnapshotRecord.SEQUENCE),
segmentId, start, end
);
}
@Override
public int queryMaxSequence(String segmentId, long start, long end) throws IOException {
return querySequenceWithAgg(AggregationBuilders.max(ProfileThreadSnapshotRecord.SEQUENCE).field(ProfileThreadSnapshotRecord.SEQUENCE), segmentId, start, end);
return querySequenceWithAgg(
AggregationBuilders.max(ProfileThreadSnapshotRecord.SEQUENCE).field(ProfileThreadSnapshotRecord.SEQUENCE),
segmentId, start, end
);
}
@Override
public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException {
public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId,
int minSequence,
int maxSequence) throws IOException {
// search traces
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
......@@ -134,10 +146,14 @@ public class ProfileThreadSnapshotQueryEsDAO extends EsDAO implements IProfileTh
List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
mustQueryList.add(QueryBuilders.termQuery(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId));
mustQueryList.add(QueryBuilders.rangeQuery(ProfileThreadSnapshotRecord.SEQUENCE).gte(minSequence).lt(maxSequence));
mustQueryList.add(
QueryBuilders.rangeQuery(ProfileThreadSnapshotRecord.SEQUENCE).gte(minSequence).lt(maxSequence));
sourceBuilder.size(maxSequence - minSequence);
SearchResponse response = getClient().search(ProfileThreadSnapshotRecord.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient().search(
IndexController.LogicIndicesRegister.getPhysicalTableName(ProfileThreadSnapshotRecord.INDEX_NAME),
sourceBuilder
);
List<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence - minSequence);
for (SearchHit searchHit : response.getHits().getHits()) {
......@@ -154,7 +170,8 @@ public class ProfileThreadSnapshotQueryEsDAO extends EsDAO implements IProfileTh
sourceBuilder.query(QueryBuilders.termQuery(SegmentRecord.SEGMENT_ID, segmentId));
sourceBuilder.size(1);
SearchResponse response = getClient().search(SegmentRecord.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient().search(
IndexController.LogicIndicesRegister.getPhysicalTableName(SegmentRecord.INDEX_NAME), sourceBuilder);
if (response.getHits().getHits().length == 0) {
return null;
......@@ -177,7 +194,10 @@ public class ProfileThreadSnapshotQueryEsDAO extends EsDAO implements IProfileTh
return segmentRecord;
}
protected int querySequenceWithAgg(AbstractAggregationBuilder aggregationBuilder, String segmentId, long start, long end) throws IOException {
protected int querySequenceWithAgg(AbstractAggregationBuilder aggregationBuilder,
String segmentId,
long start,
long end) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
......@@ -189,8 +209,12 @@ public class ProfileThreadSnapshotQueryEsDAO extends EsDAO implements IProfileTh
sourceBuilder.size(0);
sourceBuilder.aggregation(aggregationBuilder);
SearchResponse response = getClient().search(ProfileThreadSnapshotRecord.INDEX_NAME, sourceBuilder);
NumericMetricsAggregation.SingleValue agg = response.getAggregations().get(ProfileThreadSnapshotRecord.SEQUENCE);
SearchResponse response = getClient().search(
IndexController.LogicIndicesRegister.getPhysicalTableName(ProfileThreadSnapshotRecord.INDEX_NAME),
sourceBuilder
);
NumericMetricsAggregation.SingleValue agg = response.getAggregations()
.get(ProfileThreadSnapshotRecord.SEQUENCE);
return (int) agg.value();
}
......
......@@ -32,6 +32,7 @@ import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
......@@ -62,7 +63,8 @@ public class TopNRecordsQueryEsDAO extends EsDAO implements ITopNRecordsQueryDAO
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(condition.getTopN())
.sort(valueColumnName, condition.getOrder().equals(Order.DES) ? SortOrder.DESC : SortOrder.ASC);
SearchResponse response = getClient().search(condition.getName(), sourceBuilder);
SearchResponse response = getClient().search(
IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName()), sourceBuilder);
List<SelectedRecord> results = new ArrayList<>(condition.getTopN());
......
......@@ -34,6 +34,7 @@ import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
......@@ -192,7 +193,8 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
.field(ServiceRelationServerSideMetrics.COMPONENT_ID))
.size(1000));
SearchResponse response = getClient().search(indexName, sourceBuilder);
SearchResponse response = getClient().search(
IndexController.LogicIndicesRegister.getPhysicalTableName(indexName), sourceBuilder);
List<Call.CallDetail> calls = new ArrayList<>();
Terms entityTerms = response.getAggregations().get(Metrics.ENTITY_ID);
......@@ -218,7 +220,8 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
.field(ServiceInstanceRelationServerSideMetrics.COMPONENT_ID))
.size(1000));
SearchResponse response = getClient().search(indexName, sourceBuilder);
SearchResponse response = getClient().search(
IndexController.LogicIndicesRegister.getPhysicalTableName(indexName), sourceBuilder);
List<Call.CallDetail> calls = new ArrayList<>();
Terms entityTerms = response.getAggregations().get(Metrics.ENTITY_ID);
......@@ -239,7 +242,8 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
DetectPoint detectPoint) throws IOException {
sourceBuilder.aggregation(AggregationBuilders.terms(Metrics.ENTITY_ID).field(Metrics.ENTITY_ID).size(1000));
SearchResponse response = getClient().search(indexName, sourceBuilder);
SearchResponse response = getClient().search(
IndexController.LogicIndicesRegister.getPhysicalTableName(indexName), sourceBuilder);
List<Call.CallDetail> calls = new ArrayList<>();
Terms entityTerms = response.getAggregations().get(Metrics.ENTITY_ID);
......
......@@ -37,6 +37,7 @@ import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSear
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameMaker;
import org.elasticsearch.action.search.SearchResponse;
......@@ -131,7 +132,11 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
}
sourceBuilder.size(limit);
sourceBuilder.from(from);
SearchResponse response = getClient().search(new TimeRangeIndexNameMaker(SegmentRecord.INDEX_NAME, startSecondTB, endSecondTB), sourceBuilder);
SearchResponse response = getClient().search(
new TimeRangeIndexNameMaker(
IndexController.LogicIndicesRegister.getPhysicalTableName(SegmentRecord.INDEX_NAME), startSecondTB,
endSecondTB
), sourceBuilder);
TraceBrief traceBrief = new TraceBrief();
traceBrief.setTotal((int) response.getHits().totalHits);
......@@ -160,7 +165,8 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
sourceBuilder.query(QueryBuilders.termQuery(SegmentRecord.TRACE_ID, traceId));
sourceBuilder.size(segmentQueryMaxSize);
SearchResponse response = getClient().search(SegmentRecord.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient().search(
IndexController.LogicIndicesRegister.getPhysicalTableName(SegmentRecord.INDEX_NAME), sourceBuilder);
List<SegmentRecord> segmentRecords = new ArrayList<>();
for (SearchHit searchHit : response.getHits().getHits()) {
......
......@@ -31,6 +31,7 @@ import org.apache.skywalking.oap.server.core.storage.management.UITemplateManage
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
......@@ -51,14 +52,19 @@ public class UITemplateManagementEsDAO extends EsDAO implements UITemplateManage
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
if (!includingDisabled) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(UITemplate.DISABLED, BooleanUtils.booleanToValue(includingDisabled)));
boolQueryBuilder.must()
.add(QueryBuilders.termQuery(
UITemplate.DISABLED,
BooleanUtils.booleanToValue(includingDisabled)
));
}
sourceBuilder.query(boolQueryBuilder);
//It is impossible we have 10000+ templates.
sourceBuilder.size(10000);
SearchResponse response = getClient().search(UITemplate.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient().search(
IndexController.LogicIndicesRegister.getPhysicalTableName(UITemplate.INDEX_NAME), sourceBuilder);
List<DashboardConfiguration> configs = new ArrayList<>();
final UITemplate.Builder builder = new UITemplate.Builder();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
public class IndexStructuresTest {
@Test
public void getMapping() {
IndexStructures structures = new IndexStructures();
HashMap<String, Object> properties = new HashMap<>();
properties.put("a", "b");
properties.put("c", "d");
structures.putStructure("test", structures.getWrapper().wrapper(properties));
Map<String, Object> mapping = structures.getMapping("test");
Assert.assertEquals(structures.getExtractor().extract(mapping), properties);
structures.putStructure("test2", structures.getWrapper().wrapper(new HashMap<>()));
mapping = structures.getMapping("test2");
Assert.assertTrue(structures.getExtractor().extract(mapping).isEmpty());
}
@Test
public void resolveStructure() {
IndexStructures structures = new IndexStructures();
HashMap<String, Object> properties = new HashMap<>();
properties.put("a", "b");
properties.put("c", "d");
structures.putStructure("test", structures.getWrapper().wrapper(properties));
Map<String, Object> mapping = structures.getMapping("test");
Assert.assertEquals(properties, structures.getExtractor().extract(mapping));
HashMap<String, Object> properties2 = new HashMap<>();
properties2.put("a", "b");
properties2.put("f", "g");
structures.putStructure("test", structures.getWrapper().wrapper(properties2));
mapping = structures.getMapping("test");
HashMap<String, Object> res = new HashMap<>();
res.put("a", "b");
res.put("c", "d");
res.put("f", "g");
Assert.assertEquals(res, structures.getExtractor().extract(mapping));
}
@Test
public void diffStructure() {
IndexStructures structures = new IndexStructures();
HashMap<String, Object> properties = new HashMap<>();
properties.put("a", "b");
properties.put("c", "d");
properties.put("f", "g");
structures.putStructure("test", structures.getWrapper().wrapper(properties));
HashMap<String, Object> properties2 = new HashMap<>();
properties2.put("a", "b");
Map<String, Object> diffMappings = structures.diffStructure(
"test", structures.getWrapper().wrapper(properties2));
HashMap<String, Object> res = new HashMap<>();
res.put("c", "d");
res.put("f", "g");
Assert.assertEquals(res, structures.getExtractor().extract(diffMappings));
diffMappings = structures.diffStructure("test", structures.getWrapper().wrapper(properties));
Assert.assertEquals(new HashMap<>(), structures.getExtractor().extract(diffMappings));
}
@Test
public void containsStructure() {
IndexStructures structures = new IndexStructures();
HashMap<String, Object> properties = new HashMap<>();
properties.put("a", "b");
properties.put("c", "d");
properties.put("f", "g");
structures.putStructure("test", structures.getWrapper().wrapper(properties));
HashMap<String, Object> properties2 = new HashMap<>();
properties2.put("a", "b");
properties2.put("c", "d");
Assert.assertTrue(structures.containsStructure("test", structures.getWrapper().wrapper(properties2)));
HashMap<String, Object> properties3 = new HashMap<>();
properties3.put("a", "b");
properties3.put("q", "d");
Assert.assertFalse(structures.containsStructure("test", structures.getWrapper().wrapper(properties3)));
}
}
\ No newline at end of file
......@@ -37,13 +37,13 @@ public class TimeSeriesUtilsTest {
@Before
public void prepare() {
superDatasetModel = new Model("superDatasetModel", Lists.newArrayList(), Lists.newArrayList(),
0, DownSampling.Minute, true, true
0, DownSampling.Minute, true, true, ""
);
normalRecordModel = new Model("normalRecordModel", Lists.newArrayList(), Lists.newArrayList(),
0, DownSampling.Minute, true, false
0, DownSampling.Minute, true, false, ""
);
normalMetricsModel = new Model("normalMetricsModel", Lists.newArrayList(), Lists.newArrayList(),
0, DownSampling.Minute, false, false
0, DownSampling.Minute, false, false, ""
);
TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(1);
TimeSeriesUtils.setDAY_STEP(3);
......@@ -63,14 +63,33 @@ public class TimeSeriesUtilsTest {
public void testIndexRolling() {
long secondTimeBucket = 2020_0809_1010_59L;
long minuteTimeBucket = 2020_0809_1010L;
Assert.assertEquals("superDatasetModel-20200809", writeIndexName(superDatasetModel, secondTimeBucket));
Assert.assertEquals("normalRecordModel-20200807", writeIndexName(normalRecordModel, secondTimeBucket));
Assert.assertEquals("normalMetricsModel-20200807", writeIndexName(normalMetricsModel, minuteTimeBucket));
Assert.assertEquals(
"superDatasetModel-20200809",
writeIndexName(superDatasetModel, secondTimeBucket)
);
Assert.assertEquals(
"normalRecordModel-20200807",
writeIndexName(normalRecordModel, secondTimeBucket)
);
Assert.assertEquals(
"normalMetricsModel-20200807",
writeIndexName(normalMetricsModel, minuteTimeBucket)
);
secondTimeBucket += 1000000;
minuteTimeBucket += 10000;
Assert.assertEquals("superDatasetModel-20200810", writeIndexName(superDatasetModel, secondTimeBucket));
Assert.assertEquals("normalRecordModel-20200810", writeIndexName(normalRecordModel, secondTimeBucket));
Assert.assertEquals("normalMetricsModel-20200810", writeIndexName(normalMetricsModel, minuteTimeBucket));
Assert.assertEquals(
"superDatasetModel-20200810",
writeIndexName(superDatasetModel, secondTimeBucket)
);
Assert.assertEquals(
"normalRecordModel-20200810",
writeIndexName(normalRecordModel, secondTimeBucket)
);
Assert.assertEquals(
"normalMetricsModel-20200810",
writeIndexName(normalMetricsModel, minuteTimeBucket)
);
}
}
......@@ -25,6 +25,7 @@ import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
......@@ -86,6 +87,7 @@ import static org.apache.skywalking.oap.server.storage.plugin.elasticsearch.Stor
/**
* The storage provider for ElasticSearch 7.
*/
@Slf4j
public class StorageModuleElasticsearch7Provider extends ModuleProvider {
protected final StorageModuleElasticsearch7Config config;
......@@ -115,7 +117,9 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
if (!StringUtil.isEmpty(config.getNameSpace())) {
if (StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace("sw");
} else {
config.setNameSpace(config.getNameSpace().toLowerCase());
}
if (config.getDayStep() > 1) {
......@@ -159,11 +163,12 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
.getTrustStorePass(), config.getUser(), config.getPassword(),
indexNameConverters(config.getNameSpace())
);
this.registerServiceImplementation(
IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(), config.getSyncBulkActions(),
config.getFlushInterval(), config.getConcurrentRequests()
));
IBatchDAO.class,
new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(), config.getSyncBulkActions(),
config.getFlushInterval(), config.getConcurrentRequests()
)
);
this.registerServiceImplementation(StorageDAO.class, new StorageEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(
IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearch7Client));
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.base;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexStructures;
public class IndexEs7Structures extends IndexStructures {
@Override
protected PropertiesExtractor doGetPropertiesExtractor() {
return mapping -> (Map<String, Object>) mapping.get("properties");
}
@Override
protected PropertiesWrapper doGetPropertiesWrapper() {
return properties -> {
HashMap<String, Object> mappings = new HashMap<>();
mappings.put("properties", properties);
return mappings;
};
}
}
......@@ -17,12 +17,11 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.base;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexStructures;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsInstaller;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.StorageModuleElasticsearch7Config;
......@@ -30,19 +29,12 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.StorageMod
public class StorageEs7Installer extends StorageEsInstaller {
public StorageEs7Installer(final Client client,
final ModuleManager moduleManager,
final StorageModuleElasticsearch7Config config) {
final StorageModuleElasticsearch7Config config) throws StorageException {
super(client, moduleManager, config);
}
@Override
@SuppressWarnings("unchecked")
protected Map<String, Object> createMapping(Model model) {
Map<String, Object> mapping = super.createMapping(model);
Map<String, Object> type = (Map<String, Object>) mapping.remove(ElasticSearchClient.TYPE);
mapping.put("properties", type.get("properties"));
log.debug("elasticsearch index template setting: {}", mapping.toString());
return mapping;
protected IndexStructures getStructures() {
return new IndexEs7Structures();
}
}
......@@ -18,18 +18,26 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.client;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.client.elasticsearch.IndexNameConverter;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
......@@ -37,6 +45,7 @@ import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
......@@ -52,7 +61,10 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
......@@ -60,6 +72,7 @@ import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
......@@ -154,6 +167,86 @@ public class ElasticSearch7Client extends ElasticSearchClient {
return response.isAcknowledged();
}
@Override
public Map<String, Object> getIndex(String indexName) throws IOException {
if (StringUtil.isBlank(indexName)) {
return new HashMap<>();
}
indexName = formatIndexName(indexName);
try {
Response response = client.getLowLevelClient()
.performRequest(new Request(HttpGet.METHOD_NAME, indexName));
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
healthChecker.health();
throw new IOException(
"The response status code of template exists request should be 200, but it is " + statusCode);
}
Type type = new TypeToken<HashMap<String, Object>>() {
}.getType();
Map<String, Object> templates = new Gson().<HashMap<String, Object>>fromJson(
new InputStreamReader(response.getEntity().getContent()),
type
);
return (Map<String, Object>) Optional.ofNullable(templates.get(indexName)).orElse(new HashMap<>());
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) {
return new HashMap<>();
}
healthChecker.unHealth(e);
throw e;
} catch (IOException t) {
healthChecker.unHealth(t);
throw t;
}
}
@Override
public boolean updateIndexMapping(String indexName, final Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
PutMappingRequest putMappingRequest = new PutMappingRequest(indexName);
Gson gson = new Gson();
putMappingRequest.source(gson.toJson(mapping), XContentType.JSON);
putMappingRequest.type("_doc");
AcknowledgedResponse response = client.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT);
log.debug("put {} index mapping finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
@Override
public Map<String, Object> getTemplate(String name) throws IOException {
name = formatIndexName(name);
try {
Response response = client.getLowLevelClient()
.performRequest(new Request(HttpGet.METHOD_NAME, "_template/" + name));
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
healthChecker.health();
throw new IOException(
"The response status code of template exists request should be 200, but it is " + statusCode);
}
Type type = new TypeToken<HashMap<String, Object>>() {
}.getType();
Map<String, Object> templates = new Gson().<HashMap<String, Object>>fromJson(
new InputStreamReader(response.getEntity().getContent()),
type
);
if (templates.containsKey(name)) {
return (Map<String, Object>) templates.get(name);
}
return new HashMap<>();
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) {
return new HashMap<>();
}
healthChecker.unHealth(e);
throw e;
} catch (IOException t) {
healthChecker.unHealth(t);
throw t;
}
}
@Override
public boolean isExistsIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName);
......@@ -171,8 +264,8 @@ public class ElasticSearch7Client extends ElasticSearchClient {
}
@Override
public boolean createTemplate(String indexName, Map<String, Object> settings,
Map<String, Object> mapping) throws IOException {
public boolean createOrUpdateTemplate(String indexName, Map<String, Object> settings,
Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest(indexName).patterns(
......
......@@ -18,32 +18,14 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.dao;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MetricsEsDAO;
import org.elasticsearch.action.search.SearchResponse;
public class MetricsEs7DAO extends MetricsEsDAO {
MetricsEs7DAO(final ElasticSearchClient client, final StorageHashMapBuilder<Metrics> storageBuilder) {
MetricsEs7DAO(ElasticSearchClient client, StorageHashMapBuilder<Metrics> storageBuilder) {
super(client, storageBuilder);
}
@Override
public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
String[] ids = metrics.stream().map(Metrics::id).toArray(String[]::new);
SearchResponse response = getClient().ids(model.getName(), ids);
List<Metrics> result = new ArrayList<>(response.getHits().getHits().length);
for (int i = 0; i < response.getHits().getHits().length; i++) {
Metrics source = storageBuilder.storage2Entity(response.getHits().getAt(i).getSourceAsMap());
result.add(source);
}
return result;
}
}
......@@ -28,6 +28,8 @@ import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AggregationQueryEsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
......@@ -54,7 +56,6 @@ public class AggregationQueryEs7DAO extends AggregationQueryEsDAO {
final Duration duration,
final List<KeyValue> additionalConditions) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
final RangeQueryBuilder queryBuilder = QueryBuilders.rangeQuery(Metrics.TIME_BUCKET)
.lte(duration.getEndTimeBucket())
.gte(duration.getStartTimeBucket());
......@@ -63,17 +64,35 @@ public class AggregationQueryEs7DAO extends AggregationQueryEsDAO {
if (condition.getOrder().equals(Order.ASC)) {
asc = true;
}
String tableName = IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
if (additionalConditions != null && additionalConditions.size() > 0) {
if (CollectionUtils.isEmpty(additionalConditions)
&& IndexController.LogicIndicesRegister.isMetricTable(condition.getName())) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
additionalConditions.forEach(additionalCondition -> {
boolQuery.must()
.add(QueryBuilders.termsQuery(additionalCondition.getKey(), additionalCondition.getValue()));
});
boolQuery.must().add(QueryBuilders.termQuery(
IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, condition.getName()));
boolQuery.must().add(queryBuilder);
sourceBuilder.query(boolQuery);
} else {
} else if (CollectionUtils.isEmpty(additionalConditions)) {
sourceBuilder.query(queryBuilder);
} else if (CollectionUtils.isNotEmpty(additionalConditions)
&& IndexController.LogicIndicesRegister.isMetricTable(condition.getName())) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must()
.add(QueryBuilders.termQuery(
IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, condition.getName()));
additionalConditions.forEach(additionalCondition -> boolQuery
.must()
.add(QueryBuilders.termsQuery(additionalCondition.getKey(), additionalCondition.getValue())));
boolQuery.must().add(queryBuilder);
sourceBuilder.query(boolQuery);
} else {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
additionalConditions.forEach(additionalCondition -> boolQuery
.must()
.add(QueryBuilders.termsQuery(additionalCondition.getKey(), additionalCondition.getValue())));
boolQuery.must().add(queryBuilder);
sourceBuilder.query(boolQuery);
}
sourceBuilder.aggregation(
......@@ -84,7 +103,7 @@ public class AggregationQueryEs7DAO extends AggregationQueryEsDAO {
.subAggregation(AggregationBuilders.avg(valueColumnName).field(valueColumnName))
);
SearchResponse response = getClient().search(condition.getName(), sourceBuilder);
SearchResponse response = getClient().search(tableName, sourceBuilder);
List<SelectedRecord> topNList = new ArrayList<>();
Terms idTerms = response.getAggregations().get(Metrics.ENTITY_ID);
......
......@@ -19,13 +19,16 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
import org.apache.skywalking.oap.server.core.query.type.Alarms;
import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
......@@ -34,9 +37,6 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.Objects;
public class AlarmQueryEs7DAO extends EsDAO implements IAlarmQueryDAO {
public AlarmQueryEs7DAO(ElasticSearchClient client) {
......@@ -44,8 +44,9 @@ public class AlarmQueryEs7DAO extends EsDAO implements IAlarmQueryDAO {
}
@Override
public Alarms getAlarm(final Integer scopeId, final String keyword, final int limit, final int from,
final long startTB, final long endTB) throws IOException {
public Alarms getAlarm(final Integer scopeId, final String keyword,
final int limit, final int from,
final long startTB, final long endTB) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
......@@ -64,7 +65,8 @@ public class AlarmQueryEs7DAO extends EsDAO implements IAlarmQueryDAO {
sourceBuilder.size(limit);
sourceBuilder.from(from);
SearchResponse response = getClient().search(AlarmRecord.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(AlarmRecord.INDEX_NAME), sourceBuilder);
Alarms alarms = new Alarms();
alarms.setTotal((int) response.getHits().getTotalHits().value);
......
......@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.browser.source.BrowserErrorCategory
import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLogs;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.BrowserLogQueryEsDAO;
import org.elasticsearch.action.search.SearchResponse;
......@@ -80,8 +81,8 @@ public class BrowserLogQueryEs7DAO extends BrowserLogQueryEsDAO {
}
sourceBuilder.size(limit);
sourceBuilder.from(from);
SearchResponse response = getClient().search(BrowserErrorLogRecord.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(BrowserErrorLogRecord.INDEX_NAME), sourceBuilder);
BrowserErrorLogs logs = new BrowserErrorLogs();
logs.setTotal((int) response.getHits().getTotalHits().value);
......
......@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.event.Event;
import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
import org.apache.skywalking.oap.server.core.query.type.event.Events;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ESEventQueryDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.builder.SearchSourceBuilder;
......@@ -37,7 +38,8 @@ public class ES7EventQueryDAO extends ESEventQueryDAO {
public Events queryEvents(final EventQueryCondition condition) throws Exception {
final SearchSourceBuilder sourceBuilder = buildQuery(condition);
final SearchResponse response = getClient().search(Event.INDEX_NAME, sourceBuilder);
final SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(Event.INDEX_NAME), sourceBuilder);
final Events events = new Events();
events.setTotal(response.getHits().getTotalHits().value);
......
......@@ -33,6 +33,7 @@ import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Strings;
......@@ -79,7 +80,6 @@ public class LogQueryEs7DAO extends EsDAO implements ILogQueryDAO {
if (startSecondTB != 0 && endSecondTB != 0) {
mustQueryList.add(QueryBuilders.rangeQuery(Record.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
}
if (isNotEmpty(serviceId)) {
mustQueryList.add(QueryBuilders.termQuery(AbstractLogRecord.SERVICE_ID, serviceId));
}
......@@ -138,7 +138,8 @@ public class LogQueryEs7DAO extends EsDAO implements ILogQueryDAO {
sourceBuilder.size(limit);
sourceBuilder.from(from);
SearchResponse response = getClient().search(LogRecord.INDEX_NAME, sourceBuilder);
SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(LogRecord.INDEX_NAME), sourceBuilder);
Logs logs = new Logs();
logs.setTotal((int) response.getHits().getTotalHits().value);
......@@ -146,7 +147,8 @@ public class LogQueryEs7DAO extends EsDAO implements ILogQueryDAO {
for (SearchHit searchHit : response.getHits().getHits()) {
Log log = new Log();
log.setServiceId((String) searchHit.getSourceAsMap().get(AbstractLogRecord.SERVICE_ID));
log.setServiceInstanceId((String) searchHit.getSourceAsMap().get(AbstractLogRecord.SERVICE_INSTANCE_ID));
log.setServiceInstanceId((String) searchHit.getSourceAsMap()
.get(AbstractLogRecord.SERVICE_INSTANCE_ID));
log.setEndpointId((String) searchHit.getSourceAsMap().get(AbstractLogRecord.ENDPOINT_ID));
log.setEndpointName((String) searchHit.getSourceAsMap().get(AbstractLogRecord.ENDPOINT_NAME));
log.setTraceId((String) searchHit.getSourceAsMap().get(AbstractLogRecord.TRACE_ID));
......
......@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.query.input.MetricsCondition;
import org.apache.skywalking.oap.server.core.query.sql.Function;
import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.MetricsQueryEsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.AggregationBuilders;
......@@ -51,20 +52,21 @@ public class MetricsQueryEs7DAO extends MetricsQueryEsDAO {
final Duration duration) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
buildQuery(sourceBuilder, condition, duration);
int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
if (function == Function.Latest) {
return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
}
TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Metrics.ENTITY_ID)
.field(Metrics.ENTITY_ID)
.size(1);
.field(Metrics.ENTITY_ID)
.size(1);
functionAggregation(function, entityIdAggregation, valueColumnName);
sourceBuilder.aggregation(entityIdAggregation);
SearchResponse response = getClient().search(condition.getName(), sourceBuilder);
SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName()), sourceBuilder);
Terms idTerms = response.getAggregations().get(Metrics.ENTITY_ID);
for (Terms.Bucket idBucket : idTerms.getBuckets()) {
......
......@@ -31,6 +31,7 @@ import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameMaker;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO;
......@@ -118,16 +119,14 @@ public class TraceQueryEs7DAO extends TraceQueryEsDAO {
}
if (CollectionUtils.isNotEmpty(tags)) {
BoolQueryBuilder tagMatchQuery = QueryBuilders.boolQuery();
tags.forEach(tag -> {
tagMatchQuery.must(QueryBuilders.termQuery(SegmentRecord.TAGS, tag.toString()));
});
tags.forEach(tag -> tagMatchQuery.must(QueryBuilders.termQuery(SegmentRecord.TAGS, tag.toString())));
mustQueryList.add(tagMatchQuery);
}
sourceBuilder.size(limit);
sourceBuilder.from(from);
SearchResponse response = getClient().search(
new TimeRangeIndexNameMaker(SegmentRecord.INDEX_NAME, startSecondTB, endSecondTB), sourceBuilder);
new TimeRangeIndexNameMaker(
IndexController.LogicIndicesRegister.getPhysicalTableName(SegmentRecord.INDEX_NAME), startSecondTB, endSecondTB), sourceBuilder);
TraceBrief traceBrief = new TraceBrief();
traceBrief.setTotal((int) response.getHits().getTotalHits().value);
......@@ -140,7 +139,8 @@ public class TraceQueryEs7DAO extends TraceQueryEsDAO {
basicTrace.setDuration(((Number) searchHit.getSourceAsMap().get(SegmentRecord.LATENCY)).intValue());
basicTrace.setError(
BooleanUtils.valueToBoolean(
((Number) searchHit.getSourceAsMap().get(SegmentRecord.IS_ERROR)).intValue())
((Number) searchHit.getSourceAsMap().get(SegmentRecord.IS_ERROR)).intValue()
)
);
basicTrace.getTraceIds().add((String) searchHit.getSourceAsMap().get(SegmentRecord.TRACE_ID));
traceBrief.getTraces().add(basicTrace);
......
......@@ -33,7 +33,7 @@
<test.framework.version>7.3.0</test.framework.version>
<elasticsearch.version>7.3.0</elasticsearch.version>
<elasticsearch.version>7.5.0</elasticsearch.version>
<spring-boot-version>2.1.6.RELEASE</spring-boot-version>
</properties>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册