未验证 提交 2df3c683 编写于 作者: E Evan 提交者: GitHub

Super Size Dataset record index es rolling step (#5282)

上级 fdd242ed
...@@ -54,6 +54,7 @@ storage: ...@@ -54,6 +54,7 @@ storage:
trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""} trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""}
trustStorePass: ${SW_STORAGE_ES_SSL_JKS_PASS:""} trustStorePass: ${SW_STORAGE_ES_SSL_JKS_PASS:""}
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index. dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value lt 0
user: ${SW_ES_USER:""} user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""} password: ${SW_ES_PASSWORD:""}
secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool. secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
...@@ -105,6 +106,9 @@ Such as, if dayStep == 11, ...@@ -105,6 +106,9 @@ Such as, if dayStep == 11,
1. data in [2000-01-01, 2000-01-11] will be merged into the index-20000101. 1. data in [2000-01-01, 2000-01-11] will be merged into the index-20000101.
1. data in [2000-01-12, 2000-01-22] will be merged into the index-20000112. 1. data in [2000-01-12, 2000-01-22] will be merged into the index-20000112.
`storage/elasticsearch/superDatasetDayStep` override the `storage/elasticsearch/dayStep` if the value is positive.
This would affect the record related entities, such as the trace segment. In some cases, the size of metrics is much less than the record(trace), this would help the shards balance in the ElasticSearch cluster.
NOTICE, TTL deletion would be affected by these. You should set an extra more dayStep in your TTL. Such as you want to TTL == 30 days and dayStep == 10, you actually need to set TTL = 40; NOTICE, TTL deletion would be affected by these. You should set an extra more dayStep in your TTL. Such as you want to TTL == 30 days and dayStep == 10, you actually need to set TTL = 40;
### Secrets Management File Of ElasticSearch Authentication ### Secrets Management File Of ElasticSearch Authentication
......
...@@ -101,6 +101,7 @@ storage: ...@@ -101,6 +101,7 @@ storage:
password: ${SW_ES_PASSWORD:""} password: ${SW_ES_PASSWORD:""}
secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool. secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index. dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1} # Shard number of new indexes indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1} # Shard number of new indexes
superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # Super data set has been defined in the codes, such as trace segments. This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces. superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # Super data set has been defined in the codes, such as trace segments. This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0} indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
...@@ -119,6 +120,7 @@ storage: ...@@ -119,6 +120,7 @@ storage:
trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""} trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""}
trustStorePass: ${SW_STORAGE_ES_SSL_JKS_PASS:""} trustStorePass: ${SW_STORAGE_ES_SSL_JKS_PASS:""}
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index. dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0
user: ${SW_ES_USER:""} user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""} password: ${SW_ES_PASSWORD:""}
secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool. secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
......
...@@ -80,6 +80,11 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig { ...@@ -80,6 +80,11 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
*/ */
@Getter @Getter
private int dayStep = 1; private int dayStep = 1;
/**
* @since 8.2.0, the record day step is for super size dataset record index rolling when the value of it is greater than 0
*/
@Getter
private int superDatasetDayStep = -1;
@Setter @Setter
private int resultWindowMaxSize = 10000; private int resultWindowMaxSize = 10000;
@Setter @Setter
......
...@@ -114,11 +114,15 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { ...@@ -114,11 +114,15 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
} }
if (config.getDayStep() > 1) { if (config.getDayStep() > 1) {
TimeSeriesUtils.setDAY_STEP(config.getDayStep()); TimeSeriesUtils.setDAY_STEP(config.getDayStep());
TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getDayStep());
}
if (config.getSuperDatasetDayStep() > 0) {
TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getSuperDatasetDayStep());
} }
if (!StringUtil.isEmpty(config.getSecretsManagementFile())) { if (!StringUtil.isEmpty(config.getSecretsManagementFile())) {
MultipleFilesChangeMonitor monitor = new MultipleFilesChangeMonitor( MultipleFilesChangeMonitor monitor = new MultipleFilesChangeMonitor(
10, readableContents -> { 10, readableContents -> {
final byte[] secretsFileContent = readableContents.get(0); final byte[] secretsFileContent = readableContents.get(0);
if (secretsFileContent == null) { if (secretsFileContent == null) {
return; return;
...@@ -146,47 +150,50 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { ...@@ -146,47 +150,50 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
} }
elasticSearchClient = new ElasticSearchClient( elasticSearchClient = new ElasticSearchClient(
config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
.getTrustStorePass(), config.getUser(), config.getPassword(), .getTrustStorePass(), config.getUser(), config.getPassword(),
indexNameConverters(config.getNameSpace()) indexNameConverters(config.getNameSpace())
); );
this.registerServiceImplementation( this.registerServiceImplementation(
IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config
.getFlushInterval(), config.getConcurrentRequests())); .getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient)); this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
this.registerServiceImplementation( this.registerServiceImplementation(
IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearchClient)); IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearchClient));
this.registerServiceImplementation( this.registerServiceImplementation(
INetworkAddressAliasDAO.class, new NetworkAddressAliasEsDAO(elasticSearchClient, config INetworkAddressAliasDAO.class, new NetworkAddressAliasEsDAO(elasticSearchClient, config
.getResultWindowMaxSize())); .getResultWindowMaxSize()));
this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient)); this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEsDAO(elasticSearchClient)); this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation( this.registerServiceImplementation(
ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient, config.getSegmentQueryMaxSize())); ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient, config.getSegmentQueryMaxSize()));
this.registerServiceImplementation( this.registerServiceImplementation(
IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize())); IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient)); this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient)); this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient)); this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEsDAO(elasticSearchClient)); this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation( this.registerServiceImplementation(
IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearchClient, config IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearchClient, config
.getProfileTaskQueryMaxSize())); .getProfileTaskQueryMaxSize()));
this.registerServiceImplementation( this.registerServiceImplementation(
IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearchClient, config IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearchClient, config
.getProfileTaskQueryMaxSize())); .getProfileTaskQueryMaxSize()));
this.registerServiceImplementation( this.registerServiceImplementation(
IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEsDAO(elasticSearchClient, config IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEsDAO(elasticSearchClient, config
.getProfileTaskQueryMaxSize())); .getProfileTaskQueryMaxSize()));
this.registerServiceImplementation( this.registerServiceImplementation(
UITemplateManagementDAO.class, new UITemplateManagementEsDAO(elasticSearchClient)); UITemplateManagementDAO.class, new UITemplateManagementEsDAO(elasticSearchClient));
} }
@Override @Override
public void start() throws ModuleStartException { public void start() throws ModuleStartException {
MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME)
HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_elasticsearch", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); .provider()
.getService(MetricsCreator.class);
HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge(
"storage_elasticsearch", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
elasticSearchClient.registerChecker(healthChecker); elasticSearchClient.registerChecker(healthChecker);
try { try {
elasticSearchClient.connect(); elasticSearchClient.connect();
...@@ -204,7 +211,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { ...@@ -204,7 +211,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
@Override @Override
public String[] requiredModules() { public String[] requiredModules() {
return new String[]{CoreModule.NAME}; return new String[] {CoreModule.NAME};
} }
public static List<IndexNameConverter> indexNameConverters(String namespace) { public static List<IndexNameConverter> indexNameConverters(String namespace) {
......
...@@ -40,13 +40,18 @@ public class TimeSeriesUtils { ...@@ -40,13 +40,18 @@ public class TimeSeriesUtils {
private static final DateTime DAY_ONE = TIME_BUCKET_FORMATTER.parseDateTime("20000101"); private static final DateTime DAY_ONE = TIME_BUCKET_FORMATTER.parseDateTime("20000101");
@Setter @Setter
private static int DAY_STEP = 1; private static int DAY_STEP = 1;
@Setter
private static int SUPER_DATASET_DAY_STEP = 1;
/** /**
* @return formatted latest index name, based on current timestamp. * @return formatted latest index name, based on current timestamp.
*/ */
public static String latestWriteIndexName(Model model) { public static String latestWriteIndexName(Model model) {
long timeBucket; long timeBucket;
if (model.isRecord()) { if (model.isRecord() && model.isSuperDataset()) {
timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), model.getDownsampling());
return model.getName() + Const.LINE + compressTimeBucket(timeBucket / 1000000, SUPER_DATASET_DAY_STEP);
} else if (model.isRecord()) {
timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), model.getDownsampling()); timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), model.getDownsampling());
return model.getName() + Const.LINE + compressTimeBucket(timeBucket / 1000000, DAY_STEP); return model.getName() + Const.LINE + compressTimeBucket(timeBucket / 1000000, DAY_STEP);
} else { } else {
...@@ -61,7 +66,9 @@ public class TimeSeriesUtils { ...@@ -61,7 +66,9 @@ public class TimeSeriesUtils {
static String writeIndexName(Model model, long timeBucket) { static String writeIndexName(Model model, long timeBucket) {
final String modelName = model.getName(); final String modelName = model.getName();
if (model.isRecord()) { if (model.isRecord() && model.isSuperDataset()) {
return modelName + Const.LINE + compressTimeBucket(timeBucket / 1000000, SUPER_DATASET_DAY_STEP);
} else if (model.isRecord()) {
return modelName + Const.LINE + compressTimeBucket(timeBucket / 1000000, DAY_STEP); return modelName + Const.LINE + compressTimeBucket(timeBucket / 1000000, DAY_STEP);
} else { } else {
switch (model.getDownsampling()) { switch (model.getDownsampling()) {
......
...@@ -18,12 +18,37 @@ ...@@ -18,12 +18,37 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.testcontainers.shaded.com.google.common.collect.Lists;
import static org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils.compressTimeBucket; import static org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils.compressTimeBucket;
import static org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils.writeIndexName;
public class TimeSeriesUtilsTest { public class TimeSeriesUtilsTest {
private Model superDatasetModel;
private Model normalRecordModel;
private Model normalMetricsModel;
@Before
public void prepare() {
superDatasetModel = new Model("superDatasetModel", Lists.newArrayList(), Lists.newArrayList(),
0, DownSampling.Minute, true, true
);
normalRecordModel = new Model("normalRecordModel", Lists.newArrayList(), Lists.newArrayList(),
0, DownSampling.Minute, true, false
);
normalMetricsModel = new Model("normalMetricsModel", Lists.newArrayList(), Lists.newArrayList(),
0, DownSampling.Minute, false, false
);
TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(1);
TimeSeriesUtils.setDAY_STEP(3);
}
@Test @Test
public void testCompressTimeBucket() { public void testCompressTimeBucket() {
Assert.assertEquals(20000101L, compressTimeBucket(20000105, 11)); Assert.assertEquals(20000101L, compressTimeBucket(20000105, 11));
...@@ -33,4 +58,19 @@ public class TimeSeriesUtilsTest { ...@@ -33,4 +58,19 @@ public class TimeSeriesUtilsTest {
Assert.assertEquals(20000123L, compressTimeBucket(20000123, 11)); Assert.assertEquals(20000123L, compressTimeBucket(20000123, 11));
Assert.assertEquals(20000123L, compressTimeBucket(20000125, 11)); Assert.assertEquals(20000123L, compressTimeBucket(20000125, 11));
} }
@Test
public void testIndexRolling() {
long secondTimeBucket = 2020_0809_1010_59L;
long minuteTimeBucket = 2020_0809_1010L;
Assert.assertEquals("superDatasetModel-20200809", writeIndexName(superDatasetModel, secondTimeBucket));
Assert.assertEquals("normalRecordModel-20200807", writeIndexName(normalRecordModel, secondTimeBucket));
Assert.assertEquals("normalMetricsModel-20200807", writeIndexName(normalMetricsModel, minuteTimeBucket));
secondTimeBucket += 1000000;
minuteTimeBucket += 10000;
Assert.assertEquals("superDatasetModel-20200810", writeIndexName(superDatasetModel, secondTimeBucket));
Assert.assertEquals("normalRecordModel-20200810", writeIndexName(normalRecordModel, secondTimeBucket));
Assert.assertEquals("normalMetricsModel-20200810", writeIndexName(normalMetricsModel, minuteTimeBucket));
}
} }
...@@ -54,6 +54,7 @@ import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedExcepti ...@@ -54,6 +54,7 @@ import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedExcepti
import org.apache.skywalking.oap.server.library.util.MultipleFilesChangeMonitor; import org.apache.skywalking.oap.server.library.util.MultipleFilesChangeMonitor;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.NetworkAddressAliasEsDAO; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.NetworkAddressAliasEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskLogEsDAO; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskLogEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskQueryEsDAO; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskQueryEsDAO;
...@@ -110,6 +111,13 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider { ...@@ -110,6 +111,13 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
if (!StringUtil.isEmpty(config.getNameSpace())) { if (!StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace(config.getNameSpace().toLowerCase()); config.setNameSpace(config.getNameSpace().toLowerCase());
} }
if (config.getDayStep() > 1) {
TimeSeriesUtils.setDAY_STEP(config.getDayStep());
TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getDayStep());
}
if (config.getSuperDatasetDayStep() > 0) {
TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getSuperDatasetDayStep());
}
if (!StringUtil.isEmpty(config.getSecretsManagementFile())) { if (!StringUtil.isEmpty(config.getSecretsManagementFile())) {
MultipleFilesChangeMonitor monitor = new MultipleFilesChangeMonitor( MultipleFilesChangeMonitor monitor = new MultipleFilesChangeMonitor(
10, readableContents -> { 10, readableContents -> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册