未验证 提交 1652e0c0 编写于 作者: E Evan 提交者: GitHub

Add a SuperDataset tag for custom super size dataset shards config in es storage (#4793)

* Add the SuperDataset tag to support a dedicated shards config for super-size datasets in ES
上级 cffcc58f
......@@ -98,7 +98,8 @@ storage:
password: ${SW_ES_PASSWORD:""}
secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1} # The number of shards for indices that store metrics data, rather than the basic segment records.
superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # Super datasets are defined in the code, such as trace segments. This factor provides more shards for a super dataset: shards number = indexShardsNumber * superDatasetIndexShardsFactor. This factor also affects Zipkin and Jaeger traces.
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
......@@ -119,7 +120,8 @@ storage:
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1} # The number of shards for indices that store metrics data, rather than the basic segment records.
superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # Super datasets are defined in the code, such as trace segments. This factor provides more shards for a super dataset: shards number = indexShardsNumber * superDatasetIndexShardsFactor. This factor also affects Zipkin and Jaeger traces.
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
......
......@@ -33,8 +33,10 @@ import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcess
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@SuperDataset
@Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, builder = SegmentRecord.Builder.class, processor = RecordStreamProcessor.class)
public class SegmentRecord extends Record {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marks a storage entity as a super-size dataset, i.e. an entity whose dataset is much larger than that of ordinary
 * entities. A storage implementation could provide different, specific optimizations for entities carrying this
 * annotation.
 *
 * <p>The annotation is applicable to types only and is retained at runtime, so it can be detected through
 * reflection.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface SuperDataset {
}
......@@ -37,4 +37,6 @@ public class Model {
private final int scopeId;
private final DownSampling downsampling;
private final boolean record;
private final boolean superDataset;
}
......@@ -23,13 +23,16 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.MultipleQueryUnifiedIndex;
import org.apache.skywalking.oap.server.core.storage.annotation.QueryUnifiedIndex;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
/**
......@@ -58,8 +61,9 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
Model model = new Model(
storage.getModelName(), modelColumns, extraQueryIndices, scopeId,
storage.getDownsampling(), record
storage.getDownsampling(), record, isSuperDatasetModel(aClass)
);
this.followColumnNameRules(model);
models.add(model);
......@@ -69,6 +73,10 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
return model;
}
/**
 * Tells whether the given model class has been flagged as a super-size dataset.
 *
 * @param aClass the model class to inspect
 * @return {@code true} when the class carries the {@link SuperDataset} annotation, {@code false} otherwise
 */
private boolean isSuperDatasetModel(Class<?> aClass) {
    // A non-null lookup result means the marker annotation is present on the class.
    final SuperDataset marker = aClass.getAnnotation(SuperDataset.class);
    return marker != null;
}
/**
* CreatingListener listener could react when {@link #add(Class, int, Storage, boolean)} model happens. Also, the
* added models are being notified in this add operation.
......@@ -99,14 +107,17 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
Column column = field.getAnnotation(Column.class);
modelColumns.add(
new ModelColumn(
new ColumnName(modelName, column.columnName()), field.getType(), column.matchQuery(), column
.storageOnly(), column.dataType().isValue(), column.length()));
new ColumnName(modelName, column.columnName()), field.getType(), column.matchQuery(),
column.storageOnly(), column.dataType().isValue(), column.length()
));
if (log.isDebugEnabled()) {
log.debug("The field named {} with the {} type", column.columnName(), field.getType());
}
if (column.dataType().isValue()) {
ValueColumnMetadata.INSTANCE.putIfAbsent(
modelName, column.columnName(), column.dataType(), column.function(), column.defaultValue());
modelName, column.columnName(), column.dataType(), column.function(),
column.defaultValue()
);
}
List<QueryUnifiedIndex> indexDefinitions = new ArrayList<>();
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
@Getter
......@@ -32,7 +33,9 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
@Setter
String protocol = "http";
@Setter
private int indexShardsNumber = 2;
private int indexShardsNumber = 1;
@Setter
private int superDatasetIndexShardsFactor = 5;
@Setter
private int indexReplicasNumber = 0;
@Setter
......@@ -76,7 +79,6 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
* @since 7.0.0 dayStep represents how many days a single index covers. Default is 1, meaning no difference
* with previous versions. But if there isn't much traffic in a single day, the user could set the step larger to
* reduce the number of indexes, and keep the TTL longer.
*
*/
@Getter
private int dayStep = 1;
......
......@@ -19,10 +19,13 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import com.google.gson.Gson;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.Model;
......@@ -41,7 +44,9 @@ public class StorageEsInstaller extends ModelInstaller {
private final StorageModuleElasticsearchConfig config;
protected final ColumnTypeEsMapping columnTypeEsMapping;
public StorageEsInstaller(Client client, ModuleManager moduleManager, final StorageModuleElasticsearchConfig config) {
public StorageEsInstaller(Client client,
ModuleManager moduleManager,
final StorageModuleElasticsearchConfig config) {
super(client, moduleManager);
this.columnTypeEsMapping = new ColumnTypeEsMapping();
this.config = config;
......@@ -62,7 +67,7 @@ public class StorageEsInstaller extends ModelInstaller {
protected void createTable(Model model) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient) client;
Map<String, Object> settings = createSetting(model.isRecord());
Map<String, Object> settings = createSetting(model);
Map<String, Object> mapping = createMapping(model);
log.info("index {}'s columnTypeEsMapping builder str: {}", esClient.formatIndexName(model.getName()), mapping
.toString());
......@@ -89,14 +94,16 @@ public class StorageEsInstaller extends ModelInstaller {
}
}
protected Map<String, Object> createSetting(boolean record) {
protected Map<String, Object> createSetting(Model model) {
Map<String, Object> setting = new HashMap<>();
setting.put("index.number_of_shards", config.getIndexShardsNumber());
setting.put("index.number_of_replicas", config.getIndexReplicasNumber());
setting.put("index.refresh_interval", record ? TimeValue.timeValueSeconds(10)
.toString() : TimeValue.timeValueSeconds(
config.getFlushInterval())
.toString());
setting.put("index.number_of_shards", model.isSuperDataset()
? config.getIndexShardsNumber() * config.getSuperDatasetIndexShardsFactor()
: config.getIndexShardsNumber());
setting.put("index.refresh_interval", model.isRecord()
? TimeValue.timeValueSeconds(10).toString()
: TimeValue.timeValueSeconds(config.getFlushInterval()).toString());
setting.put("analysis.analyzer.oap_analyzer.type", "stop");
if (!StringUtil.isEmpty(config.getAdvanced())) {
Map<String, Object> advancedSettings = gson.fromJson(config.getAdvanced(), Map.class);
......
......@@ -31,8 +31,10 @@ import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcess
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@SuperDataset
@Stream(name = JaegerSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.JAEGER_SPAN, builder = JaegerSpanRecord.Builder.class, processor = RecordStreamProcessor.class)
public class JaegerSpanRecord extends Record {
public static final String INDEX_NAME = "jaeger_span";
......
......@@ -31,8 +31,10 @@ import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcess
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@SuperDataset
@Stream(name = ZipkinSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ZIPKIN_SPAN, builder = ZipkinSpanRecord.Builder.class, processor = RecordStreamProcessor.class)
public class ZipkinSpanRecord extends Record {
public static final String INDEX_NAME = "zipkin_span";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册