Unverified commit 33423938, authored by wu-sheng, committed by GitHub

Support Daily Index Step feature in the OAP (#4368)

* Support Daily Index Step.

* Fix format.

* Fix mischanged ES.
Parent 329f7e15
......@@ -83,6 +83,7 @@ storage:
# trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
# trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
# enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into minute index.
# dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......@@ -106,6 +107,7 @@ storage:
# trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
# trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
# enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into minute index.
# dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......
......@@ -48,6 +48,7 @@ storage:
#trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:""}
#trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into minute index.
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......@@ -108,6 +109,19 @@ storage:
### Data TTL
TTL in ElasticSearch overrides the settings of core, read [ElasticSearch section in TTL document](ttl.md#elasticsearch-6-storage-ttl)
### Daily Index Step
Daily index step (`storage/elasticsearch/dayStep`, default 1) represents the index creation period. Each index holds the metrics of that many consecutive days (the dayStep value).
In most cases, users don't need to change the value manually, because SkyWalking is designed to observe large-scale distributed systems.
But in some specific cases, users want a long TTL, such as more than 60 days, while their ElasticSearch cluster isn't powerful because traffic in the production environment is low.
The value can then be increased to 5 (or more), provided a single index can hold that many days (5 in this case) of metrics and traces.
For example, if dayStep == 11,
1. data in [2000-01-01, 2000-01-11] will be merged into the index-20000101.
1. data in [2000-01-12, 2000-01-22] will be merged into the index-20000112.
NOTICE: TTL deletion is affected by this setting. You should add one extra dayStep to your TTL. For example, if you want TTL == 30 days and dayStep == 10, you actually need to set TTL = 40.
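
The grouping rule and the TTL padding described above can be sketched in a few lines. This is only an illustration, not the code from this commit: it uses `java.time` instead of Joda-Time so it runs without extra dependencies, and the names `DayStepSketch`, `indexDay` and `paddedTtlDays` are hypothetical. Leaving the TTL unchanged when dayStep is 1 is an assumption based on the statement that the default matches previous versions.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical sketch of the dayStep grouping; not the OAP implementation.
final class DayStepSketch {
    // yyyyMMdd, the same literal form used in the index suffix.
    private static final DateTimeFormatter DAY = DateTimeFormatter.BASIC_ISO_DATE;
    // Fixed anchor day, so the grouping does not depend on when the OAP starts.
    private static final LocalDate DAY_ONE = LocalDate.of(2000, 1, 1);

    // Maps a day-level bucket (yyyyMMdd as a long) to the first day of its dayStep group.
    static long indexDay(long dayBucket, int dayStep) {
        if (dayStep <= 1) {
            return dayBucket;
        }
        LocalDate day = LocalDate.parse(Long.toString(dayBucket), DAY);
        long offset = ChronoUnit.DAYS.between(DAY_ONE, day) % dayStep;
        return Long.parseLong(day.minusDays(offset).format(DAY));
    }

    // Assumption: with dayStep > 1 the TTL is padded by one extra dayStep, as in the NOTICE.
    static int paddedTtlDays(int wantedDays, int dayStep) {
        return dayStep > 1 ? wantedDays + dayStep : wantedDays;
    }

    public static void main(String[] args) {
        System.out.println(indexDay(20000105L, 11)); // 20000101
        System.out.println(indexDay(20000115L, 11)); // 20000112
        System.out.println(paddedTtlDays(30, 10));   // 40
    }
}
```

The padding is needed because deletion works on whole indexes: an index is only safe to drop once its newest day is older than the wanted TTL.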
### Advanced Configurations For Elasticsearch Index
You can add advanced configurations in `JSON` format to set `ElasticSearch index settings` by following [ElasticSearch doc](https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html)
......
......@@ -36,5 +36,7 @@ You have following settings in Elasticsearch storage.
```
- `recordDataTTL` affects **Record** data.
- `otherMetricsDataTTL` affects minute/hour/day dimensions of metrics. `minuteMetricsDataTTL`, `hourMetricsDataTTL` and `dayMetricsDataTTL` are still there, but the **Unit** of them changed to **DAY** too. If you want to set them manually, please remove `otherMetricsDataTTL`.
- `otherMetricsDataTTL` affects minute/hour/day dimensions of metrics. `minuteMetricsDataTTL`, `hourMetricsDataTTL` and `dayMetricsDataTTL` are still there, but the **Unit** of them changed to **DAY** too.
If you want to set them manually, please remove `otherMetricsDataTTL`. Since 7.0.0, `enablePackedDownsampling` is activated by default; in that case, only `minuteMetricsDataTTL` works.
There is not much difference between using `otherMetricsDataTTL` or not, unless you turn `enablePackedDownsampling` OFF.
- `monthMetricsDataTTL` affects month dimension of metrics.
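
To make the interaction between `enablePackedDownsampling` and the per-dimension TTLs concrete, here is a minimal sketch of the decision. It follows the rule stated above (hour and day metrics are merged into the minute index, so their own TTLs do not drive deletion). The class `PackedTtlSketch`, its local `Dimension` enum and the method `ownTtlApplies` are hypothetical names for illustration, not OAP classes.

```java
// Hypothetical sketch: which metric dimensions keep their own TTL-driven deletion.
final class PackedTtlSketch {
    // Local stand-in for the metric dimensions named in the TTL document.
    enum Dimension { MINUTE, HOUR, DAY, MONTH }

    // Returns true when the dimension's own TTL setting drives deletion.
    static boolean ownTtlApplies(Dimension dimension, boolean enablePackedDownsampling) {
        if (enablePackedDownsampling
            && (dimension == Dimension.HOUR || dimension == Dimension.DAY)) {
            // Hour and day metrics live in the minute index, so the minute TTL covers them.
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        for (Dimension d : Dimension.values()) {
            System.out.println(d + " packed=" + ownTtlApplies(d, true)
                + " unpacked=" + ownTtlApplies(d, false));
        }
    }
}
```

Month-level data is left out of the packing, which is why `monthMetricsDataTTL` keeps working in both modes.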
......@@ -36,7 +36,10 @@ public class MetricsHolder {
if (aClass.isAnnotationPresent(MetricsFunction.class)) {
MetricsFunction metricsFunction = aClass.getAnnotation(MetricsFunction.class);
REGISTER.put(metricsFunction.functionName(), (Class<? extends org.apache.skywalking.oap.server.core.analysis.metrics.Metrics>) aClass);
REGISTER.put(
metricsFunction.functionName(),
(Class<? extends org.apache.skywalking.oap.server.core.analysis.metrics.Metrics>) aClass
);
}
}
}
......@@ -44,9 +47,10 @@ public class MetricsHolder {
public static Class<? extends org.apache.skywalking.oap.server.core.analysis.metrics.Metrics> find(
String functionName) {
String func = functionName;
Class<? extends org.apache.skywalking.oap.server.core.analysis.metrics.Metrics> metricsClass = REGISTER.get(func);
Class<? extends org.apache.skywalking.oap.server.core.analysis.metrics.Metrics> metricsClass = REGISTER.get(
func);
if (metricsClass == null) {
throw new IllegalArgumentException("Can't find metrics.");
throw new IllegalArgumentException("Can't find metrics, " + func);
}
return metricsClass;
}
......
......@@ -58,7 +58,7 @@ public class GRPCCallback implements AlarmCallback {
@Override
public void doAlarm(List<AlarmMessage> alarmMessage) {
if (alarmSetting.isEmptySetting()) {
if (alarmSetting == null || alarmSetting.isEmptySetting()) {
return;
}
......
......@@ -84,6 +84,7 @@ storage:
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
# enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into minute index.
# dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
# indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# # Those data TTL settings will override the same settings in core module.
......@@ -106,6 +107,7 @@ storage:
#trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
#trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into minute index.
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......
......@@ -18,7 +18,19 @@
package org.apache.skywalking.oap.server.core.analysis;
public enum Downsampling {
None(0, ""), Second(1, "second"), Minute(2, "minute"), Hour(3, "hour"), Day(4, "day"), Month(5, "month");
/**
* None downsampling is for inventory
*/
None(0, ""),
/**
* Second downsampling is not for metrics, but for record, profile and top n. Those are details but don't do
* aggregation, and still merge into day level in the persistence.
*/
Second(1, "second"),
Minute(2, "minute"),
Hour(3, "hour"),
Day(4, "day"),
Month(5, "month");
private final int value;
private final String name;
......
......@@ -65,6 +65,20 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
*/
@Getter
private boolean enablePackedDownsampling = true;
/**
* Since 6.4.0, the indexes of metrics and traces data in minute/hour/month precision are organized in days. ES
* storage creates new indexes every day.
*
* @since 7.0.0 dayStep represents how many days a single index covers. Default is 1, meaning no difference
* from previous versions. But if there isn't much traffic in a single day, users could set the step larger to
* reduce the number of indexes, and keep the TTL longer.
*
* Same as {@link #enablePackedDownsampling}, this config doesn't affect month-level data, because usually no one
* keeps the observability data for several months.
*
*/
@Getter
private int dayStep = 1;
@Setter
private int resultWindowMaxSize = 10000;
@Setter
......
......@@ -64,6 +64,7 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchP
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsInstaller;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.EndpointInventoryCacheEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.NetworkAddressInventoryCacheEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.ServiceInstanceInventoryCacheDAO;
......@@ -113,6 +114,9 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
if (!StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace(config.getNameSpace().toLowerCase());
}
if (config.getDayStep() > 1) {
TimeSeriesUtils.setDAY_STEP(config.getDayStep());
}
elasticSearchClient = new ElasticSearchClient(
config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
.getTrustStorePass(), config.getUser(), config.getPassword(),
......@@ -126,7 +130,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient));
this.registerServiceImplementation(
IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearchClient,
new ElasticsearchStorageTTL()
new ElasticsearchStorageTTL(),
config.isEnablePackedDownsampling()
));
this.registerServiceImplementation(
......
......@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
......@@ -39,12 +40,14 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
private final StorageTTL storageTTL;
private final ModuleDefineHolder moduleDefineHolder;
private final boolean enablePackedDownsampling;
public HistoryDeleteEsDAO(ModuleDefineHolder moduleDefineHolder, ElasticSearchClient client,
StorageTTL storageTTL) {
StorageTTL storageTTL, boolean enablePackedDownsampling) {
super(client);
this.moduleDefineHolder = moduleDefineHolder;
this.storageTTL = storageTTL;
this.enablePackedDownsampling = enablePackedDownsampling;
}
@Override
......@@ -52,13 +55,22 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
ConfigService configService = moduleDefineHolder.find(CoreModule.NAME)
.provider()
.getService(ConfigService.class);
Downsampling downsampling = model.getDownsampling();
if (enablePackedDownsampling) {
if (Downsampling.Hour.equals(downsampling) || Downsampling.Day.equals(downsampling)) {
/**
* Once enablePackedDownsampling, only minute TTL works.
*/
return;
}
}
ElasticSearchClient client = getClient();
TTLCalculator ttlCalculator;
if (model.isRecord()) {
ttlCalculator = storageTTL.recordCalculator();
} else {
ttlCalculator = storageTTL.metricsCalculator(model.getDownsampling());
ttlCalculator = storageTTL.metricsCalculator(downsampling);
}
long timeBefore = ttlCalculator.timeBefore(new DateTime(), configService.getDataTTLConfig());
......@@ -83,7 +95,10 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
} else {
int statusCode = client.delete(model.getName(), timeBucketColumnName, timeBefore);
if (logger.isDebugEnabled()) {
logger.debug("Delete history from {} index, status code {}", client.formatIndexName(model.getName()), statusCode);
logger.debug(
"Delete history from {} index, status code {}", client.formatIndexName(model.getName()),
statusCode
);
}
}
}
......
......@@ -22,6 +22,7 @@ import com.google.gson.Gson;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.Model;
......@@ -32,12 +33,9 @@ import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSear
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
import org.elasticsearch.common.unit.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
public class StorageEsInstaller extends ModelInstaller {
private static final Logger logger = LoggerFactory.getLogger(StorageEsInstaller.class);
private final Gson gson = new Gson();
private final StorageModuleElasticsearchConfig config;
......@@ -54,7 +52,8 @@ public class StorageEsInstaller extends ModelInstaller {
ElasticSearchClient esClient = (ElasticSearchClient) client;
try {
if (model.isCapableOfTimeSeries()) {
return esClient.isExistsTemplate(model.getName()) && esClient.isExistsIndex(model.getName());
String timeSeriesIndexName = TimeSeriesUtils.timeSeries(model);
return esClient.isExistsTemplate(model.getName()) && esClient.isExistsIndex(timeSeriesIndexName);
} else {
return esClient.isExistsIndex(model.getName());
}
......@@ -69,29 +68,30 @@ public class StorageEsInstaller extends ModelInstaller {
Map<String, Object> settings = createSetting(model.isRecord());
Map<String, Object> mapping = createMapping(model);
logger.info("index {}'s columnTypeEsMapping builder str: {}", esClient.formatIndexName(model.getName()), mapping
log.info("index {}'s columnTypeEsMapping builder str: {}", esClient.formatIndexName(model.getName()), mapping
.toString());
try {
if (model.isCapableOfTimeSeries()) {
if (!esClient.isExistsTemplate(model.getName())) {
boolean isAcknowledged = esClient.createTemplate(model.getName(), settings, mapping);
logger.info("create {} index template finished, isAcknowledged: {}", model.getName(), isAcknowledged);
log.info(
"create {} index template finished, isAcknowledged: {}", model.getName(), isAcknowledged);
if (!isAcknowledged) {
throw new StorageException("create " + model.getName() + " index template failure, ");
}
}
if (!esClient.isExistsIndex(model.getName())) {
String timeSeriesIndexName = TimeSeriesUtils.timeSeries(model);
String timeSeriesIndexName = TimeSeriesUtils.timeSeries(model);
if (!esClient.isExistsIndex(timeSeriesIndexName)) {
boolean isAcknowledged = esClient.createIndex(timeSeriesIndexName);
logger.info("create {} index finished, isAcknowledged: {}", timeSeriesIndexName, isAcknowledged);
log.info("create {} index finished, isAcknowledged: {}", timeSeriesIndexName, isAcknowledged);
if (!isAcknowledged) {
throw new StorageException("create " + timeSeriesIndexName + " time series index failure, ");
}
}
} else {
boolean isAcknowledged = esClient.createIndex(model.getName(), settings, mapping);
logger.info("create {} index finished, isAcknowledged: {}", model.getName(), isAcknowledged);
log.info("create {} index finished, isAcknowledged: {}", model.getName(), isAcknowledged);
if (!isAcknowledged) {
throw new StorageException("create " + model.getName() + " index failure, ");
}
......@@ -106,7 +106,8 @@ public class StorageEsInstaller extends ModelInstaller {
setting.put("index.number_of_shards", config.getIndexShardsNumber());
setting.put("index.number_of_replicas", config.getIndexReplicasNumber());
setting.put("index.refresh_interval", record ? TimeValue.timeValueSeconds(10)
.toString() : TimeValue.timeValueSeconds(config.getFlushInterval())
.toString() : TimeValue.timeValueSeconds(
config.getFlushInterval())
.toString());
setting.put("analysis.analyzer.oap_analyzer.type", "stop");
if (!StringUtil.isEmpty(config.getAdvanced())) {
......@@ -150,7 +151,7 @@ public class StorageEsInstaller extends ModelInstaller {
}
}
logger.debug("elasticsearch index template setting: {}", mapping.toString());
log.debug("elasticsearch index template setting: {}", mapping.toString());
return mapping;
}
......
......@@ -17,12 +17,29 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
/**
* TimeSeriesUtils sets up and splits the time suffix of index name.
*/
public class TimeSeriesUtils {
private static DateTimeFormatter TIME_BUCKET_FORMATTER = DateTimeFormat.forPattern("yyyyMMdd");
/**
* We are far from the first day of 2000, so we set it as day one to make sure the index based on {@link
* #DAY_STEP} is consistent no matter when the OAP starts up.
*/
private static final DateTime DAY_ONE = TIME_BUCKET_FORMATTER.parseDateTime("20000101");
@Setter
private static int DAY_STEP = 1;
public static String timeSeries(Model model) {
long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), model.getDownsampling());
......@@ -34,13 +51,17 @@ public class TimeSeriesUtils {
case None:
return modelName;
case Hour:
return modelName + Const.LINE + timeBucket / 100;
return modelName + Const.LINE + compressTimeBucket(timeBucket / 100, DAY_STEP);
case Minute:
return modelName + Const.LINE + timeBucket / 10000;
return modelName + Const.LINE + compressTimeBucket(timeBucket / 10000, DAY_STEP);
case Day:
return modelName + Const.LINE + compressTimeBucket(timeBucket, DAY_STEP);
case Month:
return modelName + Const.LINE + timeBucket;
case Second:
return modelName + Const.LINE + timeBucket / 1000000;
return modelName + Const.LINE + compressTimeBucket(timeBucket / 1000000, DAY_STEP);
default:
return modelName + Const.LINE + timeBucket;
throw new UnexpectedException("Unexpected downsampling value, " + downsampling);
}
}
......@@ -55,4 +76,26 @@ public class TimeSeriesUtils {
static long indexTimeSeries(String indexName) {
return Long.valueOf(indexName.substring(indexName.lastIndexOf(Const.LINE) + 1));
}
/**
* Follow the dayStep to re-format the time bucket literal long value.
*
* Such as, in dayStep == 11,
*
* 20000105 re-formatted time bucket is 20000101, 20000115 re-formatted time bucket is 20000112, 20000123
* re-formatted time bucket is 20000123
*/
static long compressTimeBucket(long timeBucket, int dayStep) {
if (dayStep > 1) {
DateTime time = TIME_BUCKET_FORMATTER.parseDateTime("" + timeBucket);
int days = Days.daysBetween(DAY_ONE, time).getDays();
int groupBucketOffset = days % dayStep;
return Long.parseLong(time.minusDays(groupBucketOffset).toString(TIME_BUCKET_FORMATTER));
} else {
/**
* No calculation required. dayStep is for lower traffic. For a normal configuration, the calculation is pointless.
*/
return timeBucket;
}
}
}
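
The divisions in the `timeSeries` switch above (`/ 100`, `/ 10000`, `/ 1000000`) reduce a time bucket to its day part before `compressTimeBucket` is applied. The sketch below shows that truncation on its own. It assumes the literal layouts `yyyyMMddHHmmss`, `yyyyMMddHHmm`, `yyyyMMddHH` and `yyyyMMdd`, which are inferred from the divisors rather than stated in this diff. `TimeBucketDaySketch`, `Precision` and `toDay` are hypothetical names.

```java
// Hypothetical sketch: truncating a literal time bucket to its yyyyMMdd day part.
final class TimeBucketDaySketch {
    // Local stand-in for the precisions that are organized into day-based indexes.
    enum Precision { SECOND, MINUTE, HOUR, DAY }

    static long toDay(long timeBucket, Precision precision) {
        switch (precision) {
            case SECOND:
                return timeBucket / 1_000_000L; // drop HHmmss
            case MINUTE:
                return timeBucket / 10_000L;    // drop HHmm
            case HOUR:
                return timeBucket / 100L;       // drop HH
            case DAY:
                return timeBucket;              // already yyyyMMdd
            default:
                throw new IllegalArgumentException("Unexpected precision, " + precision);
        }
    }

    public static void main(String[] args) {
        System.out.println(toDay(20000105123045L, Precision.SECOND)); // 20000105
        System.out.println(toDay(200001051230L, Precision.MINUTE));   // 20000105
        System.out.println(toDay(2000010512L, Precision.HOUR));       // 20000105
    }
}
```

Because every precision is first reduced to the same day literal, one dayStep grouping function can serve second, minute, hour and day indexes alike.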
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils.compressTimeBucket;
public class TimeSeriesUtilsTest {
@Test
public void testCompressTimeBucket() {
Assert.assertEquals(20000101L, compressTimeBucket(20000105, 11));
Assert.assertEquals(20000101L, compressTimeBucket(20000111, 11));
Assert.assertEquals(20000112L, compressTimeBucket(20000112, 11));
Assert.assertEquals(20000112L, compressTimeBucket(20000122, 11));
Assert.assertEquals(20000123L, compressTimeBucket(20000123, 11));
Assert.assertEquals(20000123L, compressTimeBucket(20000125, 11));
}
}
......@@ -122,7 +122,8 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockEs77DAOImpl(elasticSearch7Client));
this.registerServiceImplementation(
IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearch7Client,
new ElasticsearchStorageTTL()
new ElasticsearchStorageTTL(),
config.isEnablePackedDownsampling()
));
this.registerServiceImplementation(
......