未验证 提交 52598d77 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

use `core/persistentPeriod` + 5s as `refresh_interval` for all indices (#10127)

上级 210c3749
......@@ -30,6 +30,8 @@
* Fix the missing assignment of `LastUpdateTimestamp`, which caused the metrics session to expire.
* Rename MAL rule `spring-sleuth.yaml` to `spring-micrometer.yaml`.
* Fix memory leak in Zipkin API.
* Decouple the `refresh_interval` of ElasticSearch indices from the `elasticsearch/flushInterval` config. Now,
`core/persistentPeriod` + 5s is used as the `refresh_interval` for all indices instead.
#### UI
......
......@@ -105,7 +105,7 @@ The Configuration Vocabulary lists all available configurations provided by `app
| - | - | superDatasetIndexReplicasNumber | Represents the replicas number in the super size dataset record index. | SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER | 0 |
| - | - | indexTemplateOrder | The order of index template. | SW_STORAGE_ES_INDEX_TEMPLATE_ORDER | 0 |
| - | - | bulkActions | Async bulk size of the record data batch execution. | SW_STORAGE_ES_BULK_ACTIONS | 5000 |
| - | - | flushInterval | Period of flush (in seconds). Does not matter whether `bulkActions` is reached or not. INT(flushInterval * 2/3) is used for index refresh period. | SW_STORAGE_ES_FLUSH_INTERVAL | 15 (index refresh period = 10) |
| - | - | flushInterval | Period of flush (in seconds). Does not matter whether `bulkActions` is reached or not. | SW_STORAGE_ES_FLUSH_INTERVAL | 15 |
| - | - | concurrentRequests | The number of concurrent requests allowed to be executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS | 2 |
| - | - | resultWindowMaxSize | The maximum size of dataset when the OAP loads cache, such as network aliases. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000 |
| - | - | metadataQueryMaxSize | The maximum size of metadata per query. | SW_STORAGE_ES_QUERY_MAX_SIZE | 10000 |
......
......@@ -58,7 +58,7 @@ public class CoreModuleConfig extends ModuleConfig {
* The period of doing data persistence. Unit is second.
*/
@Setter
private long persistentPeriod = 25;
private int persistentPeriod = 25;
private boolean enableDataKeeperExecutor = true;
......
......@@ -31,6 +31,7 @@ public class ConfigService implements Service {
private final String searchableAlarmTags;
private final int metricsDataTTL;
private final int recordDataTTL;
private final int persistentPeriod;
public ConfigService(CoreModuleConfig moduleConfig) {
this.gRPCHost = moduleConfig.getGRPCHost();
......@@ -40,5 +41,6 @@ public class ConfigService implements Service {
this.searchableAlarmTags = moduleConfig.getSearchableAlarmTags();
this.metricsDataTTL = moduleConfig.getMetricsDataTTL();
this.recordDataTTL = moduleConfig.getRecordDataTTL();
this.persistentPeriod = moduleConfig.getPersistentPeriod();
}
}
......@@ -156,8 +156,7 @@ storage:
superDatasetIndexReplicasNumber: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represent the replicas number in the super size dataset record index, the default value is 0.
indexTemplateOrder: ${SW_STORAGE_ES_INDEX_TEMPLATE_ORDER:0} # the order of index template
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:5000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
# flush the bulk every 10 seconds whatever the number of requests
# INT(flushInterval * 2/3) would be used for index refresh period.
# flush the bulk every 15 seconds whatever the number of requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:15}
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
......
......@@ -90,11 +90,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
private int bulkActions = 5000;
/**
* Period of flush, no matter `bulkActions` reached or not.
* INT(flushInterval * 2/3) would be used for index refresh period.
* Unit is second.
*
* @since 8.7.0 increase to 15s from 10s
* @since 8.7.0 use INT(flushInterval * 2/3) as ElasticSearch index refresh interval. Default is 10s.
*/
private int flushInterval = 15;
private int concurrentRequests = 2;
......
......@@ -23,6 +23,7 @@ import java.util.Properties;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
......@@ -31,7 +32,6 @@ import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
......@@ -46,9 +46,9 @@ import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ISpanAttachedEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IZipkinQueryDAO;
......@@ -79,10 +79,10 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.Metri
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskLogEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileThreadSnapshotQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.RecordsQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ServiceLabelEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.SpanAttachedEventEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.RecordsQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.UITemplateManagementEsDAO;
......@@ -100,7 +100,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
protected StorageModuleElasticsearchConfig config;
protected ElasticSearchClient elasticSearchClient;
protected ModelInstaller modelInstaller;
protected StorageEsInstaller modelInstaller;
@Override
public String name() {
......@@ -197,7 +197,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(
ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient, config.getSegmentQueryMaxSize()));
this.registerServiceImplementation(IBrowserLogQueryDAO.class, new BrowserLogQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient, config));
this.registerServiceImplementation(
IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient, config));
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IRecordsQueryDAO.class, new RecordsQueryEsDAO(elasticSearchClient));
......@@ -216,14 +217,22 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(IEventQueryDAO.class, new ESEventQueryDAO(elasticSearchClient));
this.registerServiceImplementation(IEBPFProfilingTaskDAO.class,
new EBPFProfilingTaskEsDAO(elasticSearchClient, config));
this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class,
new EBPFProfilingScheduleEsDAO(elasticSearchClient, config));
this.registerServiceImplementation(IEBPFProfilingDataDAO.class,
new EBPFProfilingDataEsDAO(elasticSearchClient, config));
this.registerServiceImplementation(IServiceLabelDAO.class,
new ServiceLabelEsDAO(elasticSearchClient, config));
this.registerServiceImplementation(
IEBPFProfilingTaskDAO.class,
new EBPFProfilingTaskEsDAO(elasticSearchClient, config)
);
this.registerServiceImplementation(
IEBPFProfilingScheduleDAO.class,
new EBPFProfilingScheduleEsDAO(elasticSearchClient, config)
);
this.registerServiceImplementation(
IEBPFProfilingDataDAO.class,
new EBPFProfilingDataEsDAO(elasticSearchClient, config)
);
this.registerServiceImplementation(
IServiceLabelDAO.class,
new ServiceLabelEsDAO(elasticSearchClient, config)
);
this.registerServiceImplementation(
ITagAutoCompleteQueryDAO.class, new TagAutoCompleteQueryDAO(elasticSearchClient));
this.registerServiceImplementation(
......@@ -243,9 +252,17 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
elasticSearchClient.registerChecker(healthChecker);
try {
elasticSearchClient.connect();
final ConfigService service = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class);
// Add 5s to make sure OAP has completed at least one persistence cycle between two index refreshes.
// By default, the persistent period is 25 seconds, so ElasticSearch refreshes every 30 seconds.
modelInstaller.setIndexRefreshInterval(service.getPersistentPeriod() + 5);
modelInstaller.start();
getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(modelInstaller);
getManager().find(CoreModule.NAME)
.provider()
.getService(ModelCreator.class)
.addModelListener(modelInstaller);
} catch (Exception e) {
throw new ModuleStartException(e.getMessage(), e);
}
......
......@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.elasticsearch.response.Index;
import org.apache.skywalking.library.elasticsearch.response.IndexTemplate;
......@@ -49,6 +50,8 @@ public class StorageEsInstaller extends ModelInstaller {
private final StorageModuleElasticsearchConfig config;
protected final ColumnTypeEsMapping columnTypeEsMapping;
private final Map<String, Map<String, Object>> specificIndexesSettings;
@Setter
private int indexRefreshInterval = 30;
/**
* The mappings of the template.
......@@ -236,18 +239,6 @@ public class StorageEsInstaller extends ModelInstaller {
indexSettings.put("number_of_shards", model.isSuperDataset()
? Integer.toString(config.getIndexShardsNumber() * config.getSuperDatasetIndexShardsFactor())
: Integer.toString(config.getIndexShardsNumber()));
// Set the index refresh period as INT(flushInterval * 2/3). At the edge case,
// in low traffic(traffic < bulkActions in the whole period), there is a possible case, 2 period bulks are included in
// one index refresh rebuild operation, which could cause version conflicts. And this case can't be fixed
// through `core/persistentPeriod` as the bulk fresh is not controlled by the persistent timer anymore.
int indexRefreshInterval = config.getFlushInterval() * 2 / 3;
if (indexRefreshInterval < 5) {
// The refresh interval should not be less than 5 seconds (the recommended default value = 10s),
// and the bulk flush interval should not be set less than 8s (the recommended default value = 15s).
// This is a precaution case which makes ElasticSearch server has reasonable refresh interval,
// even this value is set too small by end user manually.
indexRefreshInterval = 5;
}
indexSettings.put("refresh_interval", indexRefreshInterval + "s");
indexSettings.put("analysis", getAnalyzerSetting(model));
if (!StringUtil.isEmpty(config.getAdvanced())) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册