未验证 提交 48a09853 编写于 作者: W Wan Kai 提交者: GitHub

Elasticsearch storage: support specify the settings...

Elasticsearch storage: support specify the settings `(number_of_shards/number_of_replicas)` for each index individually. (#9914)

* Elasticsearch storage: Provide system environment variable(`SW_STORAGE_ES_SPECIFIC_INDEX_SETTINGS`) and support specify the settings `(number_of_shards/number_of_replicas)` for each index individually.
* Elasticsearch storage: Support update index settings `(number_of_shards/number_of_replicas)` for the index template after re-configured.
上级 2e313495
......@@ -96,6 +96,8 @@
* Optimize data binary parse methods in *LogQueryDAO
* Support different indexType
* Support configuration for TTL and (block|segment) intervals
* Elasticsearch storage: Provide system environment variable(`SW_STORAGE_ES_SPECIFIC_INDEX_SETTINGS`) and support specify the settings `(number_of_shards/number_of_replicas)` for each index individually.
* Elasticsearch storage: Support update index settings `(number_of_shards/number_of_replicas)` for the index template after rebooting.
* Optimize MQ Topology analysis. Use entry span's peer from the consumer side as source service when no producer instrumentation(no cross-process reference).
* Refactor JDBC storage implementations to reuse logics.
* Fix `ClassCastException` in `LoggingConfigWatcher`.
......
......@@ -80,9 +80,12 @@ storage:
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1} # Shard number of new indexes
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:1} # Replicas number of new indexes
# Specify the settings for each index individually.
# If configured, this setting has the highest priority and overrides the generic settings.
specificIndexSettings: ${SW_STORAGE_ES_SPECIFIC_INDEX_SETTINGS:""}
# Super data set has been defined in the codes, such as trace segments.The following 3 config would be improve es performance when storage super size data in es.
superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0
superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.
superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin traces.
superDatasetIndexReplicasNumber: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represent the replicas number in the super size dataset record index, the default value is 0.
indexTemplateOrder: ${SW_STORAGE_ES_INDEX_TEMPLATE_ORDER:0} # the order of index template
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
......@@ -151,7 +154,33 @@ Once it is changed manually or through a 3rd party tool, such as [Vault](https:/
the storage provider will use the new username, password, and JKS password to establish the connection and close the old one. If the information exists in the file,
the `user/password` will be overridden.
### Advanced Configurations For Elasticsearch Index
### Index Settings
The following settings control the number of shards and replicas for new and existing index templates. The update only got applied after OAP reboots.
```yaml
storage:
elasticsearch:
# ......
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:1}
specificIndexSettings: ${SW_STORAGE_ES_SPECIFIC_INDEX_SETTINGS:""}
superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5}
superDatasetIndexReplicasNumber: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0}
```
The following table shows the relationship between those config items and Elasticsearch `index number_of_shards/number_of_replicas`.
And also you can [specify the settings for each index individually.](#specify-settings-for-each-elasticsearch-index-individually)
| index | number_of_shards | number_of_replicas |
|--------------------------------------|------------------|----------------------|
| sw_ui_template | indexShardsNumber | indexReplicasNumber |
| sw_metrics-all-`${day-format}` | indexShardsNumber | indexReplicasNumber |
| sw_log-`${day-format}` | indexShardsNumber * superDatasetIndexShardsFactor | superDatasetIndexReplicasNumber |
| sw_segment-`${day-format}` | indexShardsNumber * superDatasetIndexShardsFactor | superDatasetIndexReplicasNumber |
| sw_browser_error_log-`${day-format}` | indexShardsNumber * superDatasetIndexShardsFactor | superDatasetIndexReplicasNumber |
| sw_zipkin_span-`${day-format}` | indexShardsNumber * superDatasetIndexShardsFactor | superDatasetIndexReplicasNumber |
| sw_records-all-`${day-format}` | indexShardsNumber | indexReplicasNumber |
#### Advanced Configurations For Elasticsearch Index
You can add advanced configurations in `JSON` format to set `ElasticSearch index settings` by following [ElasticSearch doc](https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html)
For example, set [translog](https://www.elastic.co/guide/en/elasticsearch/reference/master/index-modules-translog.html) settings:
......@@ -163,6 +192,39 @@ storage:
advanced: ${SW_STORAGE_ES_ADVANCED:"{\"index.translog.durability\":\"request\",\"index.translog.sync_interval\":\"5s\"}"}
```
#### Specify Settings For Each Elasticsearch Index Individually
You can specify the settings for one or more indexes individually by using `SW_STORAGE_ES_SPECIFIC_INDEX_SETTINGS`.
**NOTE:**
Supported settings:
- number_of_shards
- number_of_replicas
**NOTE:** These settings have the highest priority and will override the existing
generic settings mentioned in [index settings doc](#index-settings).
The settings are in `JSON` format. The index name here is logic entity name, which should exclude the `${SW_NAMESPACE}` which is `sw` by default, e.g.
```json
{
"metrics-all":{
"number_of_shards":"3",
"number_of_replicas":"2"
},
"segment":{
"number_of_shards":"6",
"number_of_replicas":"1"
}
}
```
This configuration in the YAML file is like this,
```yaml
storage:
elasticsearch:
# ......
specificIndexSettings: ${SW_STORAGE_ES_SPECIFIC_INDEX_SETTINGS:"{\"metrics-all\":{\"number_of_shards\":\"3\",\"number_of_replicas\":\"2\"},\"segment\":{\"number_of_shards\":\"6\",\"number_of_replicas\":\"1\"}}"}
```
### Recommended ElasticSearch server-side configurations
You could add the following configuration to `elasticsearch.yml`, and set the value based on your environment.
......
......@@ -100,6 +100,7 @@ The Configuration Vocabulary lists all available configurations provided by `app
| - | - | dayStep | Represents the number of days in the one-minute/hour/day index. | SW_STORAGE_DAY_STEP | 1 |
| - | - | indexShardsNumber | Shard number of new indexes. | SW_STORAGE_ES_INDEX_SHARDS_NUMBER | 1 |
| - | - | indexReplicasNumber | Replicas number of new indexes. | SW_STORAGE_ES_INDEX_REPLICAS_NUMBER | 0 |
| - | - | specificIndexSettings | Specify the settings for each index individually. If configured, this setting has the highest priority and overrides the generic settings. | SW_STORAGE_ES_SPECIFIC_INDEX_SETTINGS | - |
| - | - | superDatasetDayStep | Represents the number of days in the super size dataset record index. Default value is the same as dayStep when the value is less than 0. | SW_SUPERDATASET_STORAGE_DAY_STEP | -1 |
| - | - | superDatasetIndexShardsFactor | Super dataset is defined in the code (e.g. trace segments). This factor provides more shards for the super dataset: shards number = indexShardsNumber * superDatasetIndexShardsFactor. This factor also affects Zipkin and Jaeger traces. | SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR | 5 |
| - | - | superDatasetIndexReplicasNumber | Represents the replicas number in the super size dataset record index. | SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER | 0 |
......
......@@ -150,9 +150,12 @@ storage:
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1} # Shard number of new indexes
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:1} # Replicas number of new indexes
# Specify the settings for each index individually.
# If configured, this setting has the highest priority and overrides the generic settings.
specificIndexSettings: ${SW_STORAGE_ES_SPECIFIC_INDEX_SETTINGS:""}
# Super data set has been defined in the codes, such as trace segments.The following 3 config would be improve es performance when storage super size data in es.
superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0
superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.
superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin traces.
superDatasetIndexReplicasNumber: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represent the replicas number in the super size dataset record index, the default value is 0.
indexTemplateOrder: ${SW_STORAGE_ES_INDEX_TEMPLATE_ORDER:0} # the order of index template
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:5000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
......
......@@ -58,6 +58,14 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
private int dayStep = 1;
private int indexReplicasNumber = 0;
private int indexShardsNumber = 1;
/**
* @since 9.3.0, Specify the settings for each index individually.
* Use JSON format and the index name in the config should exclude the `${SW_NAMESPACE}` e.g.
* {"metrics-all":{"number_of_shards":"3","number_of_replicas":"2"},"segment":{"number_of_shards":"6","number_of_replicas":"1"}}
* If configured, this setting has the highest priority and overrides the generic settings.
*/
private String specificIndexSettings;
/**
* @since 8.2.0, the record day step is for super size dataset record index rolling when the value of it is greater
* than 0
......
......@@ -22,23 +22,27 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import org.apache.skywalking.library.elasticsearch.response.Mappings;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
public class IndexStructures {
private final Map<String, Fields> structures;
private final Map<String, Fields> mappingStructures;
private final Map<String, UpdatableIndexSettings> indexSettingStructures;
public IndexStructures() {
this.structures = new HashMap<>();
this.mappingStructures = new HashMap<>();
this.indexSettingStructures = new HashMap<>();
}
public Mappings getMapping(String tableName) {
Map<String, Object> properties =
structures.containsKey(tableName) ?
structures.get(tableName).properties : new HashMap<>();
mappingStructures.containsKey(tableName) ?
mappingStructures.get(tableName).properties : new HashMap<>();
Mappings.Source source =
structures.containsKey(tableName) ?
structures.get(tableName).source : new Mappings.Source();
mappingStructures.containsKey(tableName) ?
mappingStructures.get(tableName).source : new Mappings.Source();
return Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties)
......@@ -46,21 +50,29 @@ public class IndexStructures {
.build();
}
public UpdatableIndexSettings getUpdatableIndexSettings(String tableName) {
return indexSettingStructures.get(tableName);
}
/**
* Add or append field when the current structures don't contain the input structure or having
* Add or append mapping/settings when the current structures don't contain the input structure or having
* new fields in it.
*/
public void putStructure(String tableName, Mappings mapping) {
public void putStructure(String tableName, Mappings mapping, Map<String, Object> settings) {
if (CollectionUtils.isNotEmpty(settings) && Objects.nonNull(settings.get("index"))) {
this.indexSettingStructures.putIfAbsent(tableName, new UpdatableIndexSettings(
(Map<String, Object>) settings.get("index")));
}
if (Objects.isNull(mapping)
|| Objects.isNull(mapping.getProperties())
|| mapping.getProperties().isEmpty()) {
return;
}
Fields fields = new Fields(mapping);
if (structures.containsKey(tableName)) {
structures.get(tableName).appendNewFields(fields);
if (mappingStructures.containsKey(tableName)) {
mappingStructures.get(tableName).appendNewFields(fields);
} else {
structures.put(tableName, fields);
mappingStructures.put(tableName, fields);
}
}
......@@ -69,13 +81,13 @@ public class IndexStructures {
* The input mappings should be history mapping from current index.
* Do not return _source config to avoid current index update conflict.
*/
public Mappings diffStructure(String tableName, Mappings mappings) {
if (!structures.containsKey(tableName)) {
public Mappings diffMappings(String tableName, Mappings mappings) {
if (!mappingStructures.containsKey(tableName)) {
return new Mappings();
}
Map<String, Object> properties = mappings.getProperties();
Map<String, Object> diffProperties =
structures.get(tableName).diffFields(new Fields(mappings));
mappingStructures.get(tableName).diffFields(new Fields(mappings));
return Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(diffProperties)
......@@ -86,19 +98,33 @@ public class IndexStructures {
* Returns true when the current structures already contains the properties of the input
* mappings.
*/
public boolean containsStructure(String tableName, Mappings mappings) {
public boolean containsMapping(String tableName, Mappings mappings) {
if (Objects.isNull(mappings) ||
Objects.isNull(mappings.getProperties()) ||
mappings.getProperties().isEmpty()) {
CollectionUtils.isEmpty(mappings.getProperties())) {
return true;
}
return structures.containsKey(tableName)
&& structures.get(tableName)
.containsAllFields(new Fields(mappings));
return mappingStructures.containsKey(tableName)
&& mappingStructures.get(tableName)
.containsAllFields(new Fields(mappings));
}
/**
* The properties of the template or index.
* Returns true when the current index setting equals the input.
*/
public boolean compareIndexSetting(String tableName, Map<String, Object> settings) {
if ((CollectionUtils.isEmpty(settings) || CollectionUtils.isEmpty((Map) settings.get("index")))
&& Objects.isNull(indexSettingStructures.get(tableName))) {
return true;
}
return indexSettingStructures.containsKey(tableName)
&& indexSettingStructures.get(tableName).
equals(new UpdatableIndexSettings((Map<String, Object>) settings.get("index")));
}
/**
* The mapping properties of the template or index.
*/
public static class Fields {
private final Map<String, Object> properties;
......@@ -146,4 +172,18 @@ public class IndexStructures {
));
}
}
/**
* The index settings structure which only include needs to compare and update fields
*/
@EqualsAndHashCode
public static class UpdatableIndexSettings {
private final int replicas;
private final int shards;
public UpdatableIndexSettings(Map<String, Object> indexSettings) {
this.replicas = Integer.parseInt((String) indexSettings.getOrDefault("number_of_replicas", "0"));
this.shards = Integer.parseInt((String) indexSettings.getOrDefault("number_of_shards", "0"));
}
}
}
......@@ -18,8 +18,10 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.gson.Gson;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -35,6 +37,7 @@ import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
......@@ -43,6 +46,7 @@ public class StorageEsInstaller extends ModelInstaller {
private final Gson gson = new Gson();
private final StorageModuleElasticsearchConfig config;
protected final ColumnTypeEsMapping columnTypeEsMapping;
private final Map<String, Map<String, Object>> specificIndexesSettings;
/**
* The mappings of the template .
......@@ -56,6 +60,13 @@ public class StorageEsInstaller extends ModelInstaller {
this.columnTypeEsMapping = new ColumnTypeEsMapping();
this.config = config;
this.structures = getStructures();
if (StringUtil.isNotEmpty(config.getSpecificIndexSettings())) {
this.specificIndexesSettings = gson.fromJson(
config.getSpecificIndexSettings(), new TypeReference<Map<String, Map<String, Object>>>() {
}.getType());
} else {
this.specificIndexesSettings = Collections.emptyMap();
}
}
protected IndexStructures getStructures() {
......@@ -63,18 +74,18 @@ public class StorageEsInstaller extends ModelInstaller {
}
@Override
public boolean isExists(Model model) {
public boolean isExists(Model model) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient) client;
String tableName = IndexController.INSTANCE.getTableName(model);
IndexController.LogicIndicesRegister.registerRelation(model, tableName);
if (!model.isTimeSeries()) {
boolean exist = esClient.isExistsIndex(tableName);
if (exist) {
Mappings historyMapping = esClient.getIndex(tableName)
.map(Index::getMappings)
.orElseGet(Mappings::new);
structures.putStructure(tableName, historyMapping);
exist = structures.containsStructure(tableName, createMapping(model));
Optional<Index> index = esClient.getIndex(tableName);
Mappings historyMapping = index.map(Index::getMappings).orElseGet(Mappings::new);
structures.putStructure(tableName, historyMapping, index.map(Index::getSettings).orElseGet(HashMap::new));
exist = structures.containsMapping(tableName, createMapping(model))
&& structures.compareIndexSetting(tableName, createSetting(model));
}
return exist;
}
......@@ -91,9 +102,10 @@ public class StorageEsInstaller extends ModelInstaller {
if (exist) {
structures.putStructure(
tableName, template.get().getMappings()
tableName, template.get().getMappings(), template.get().getSettings()
);
exist = structures.containsStructure(tableName, createMapping(model));
exist = structures.containsMapping(tableName, createMapping(model))
&& structures.compareIndexSetting(tableName, createSetting(model));
}
return exist;
}
......@@ -111,28 +123,36 @@ public class StorageEsInstaller extends ModelInstaller {
ElasticSearchClient esClient = (ElasticSearchClient) client;
String tableName = IndexController.INSTANCE.getTableName(model);
Mappings mapping = createMapping(model);
Map<String, Object> settings = createSetting(model);
if (!esClient.isExistsIndex(tableName)) {
Map<String, Object> settings = createSetting(model);
boolean isAcknowledged = esClient.createIndex(tableName, mapping, settings);
log.info("create {} index finished, isAcknowledged: {}", tableName, isAcknowledged);
if (!isAcknowledged) {
throw new StorageException("create " + tableName + " index failure, ");
throw new StorageException("create " + tableName + " index failure");
}
} else {
Mappings historyMapping = esClient.getIndex(tableName)
.map(Index::getMappings)
.orElseGet(Mappings::new);
structures.putStructure(tableName, mapping);
Mappings appendMapping = structures.diffStructure(tableName, historyMapping);
structures.putStructure(tableName, mapping, settings);
Mappings appendMapping = structures.diffMappings(tableName, historyMapping);
//update mapping
if (appendMapping.getProperties() != null && !appendMapping.getProperties().isEmpty()) {
boolean isAcknowledged = esClient.updateIndexMapping(tableName, appendMapping);
log.info("update {} index finished, isAcknowledged: {}, append mappings: {}", tableName,
log.info("update {} index mapping finished, isAcknowledged: {}, append mapping: {}", tableName,
isAcknowledged, appendMapping
);
if (!isAcknowledged) {
throw new StorageException("update " + tableName + " index failure");
throw new StorageException("update " + tableName + " index mapping failure");
}
}
//needs to update settings
if (!structures.compareIndexSetting(tableName, settings)) {
log.warn(
"index {} settings configuration has been updated to {}, please remove it before OAP starts",
tableName, settings
);
}
}
}
......@@ -144,9 +164,11 @@ public class StorageEsInstaller extends ModelInstaller {
String indexName = TimeSeriesUtils.latestWriteIndexName(model);
try {
boolean shouldUpdateTemplate = !esClient.isExistsTemplate(tableName);
shouldUpdateTemplate = shouldUpdateTemplate || !structures.containsStructure(tableName, mapping);
shouldUpdateTemplate = shouldUpdateTemplate
|| !structures.containsMapping(tableName, mapping)
|| !structures.compareIndexSetting(tableName, settings);
if (shouldUpdateTemplate) {
structures.putStructure(tableName, mapping);
structures.putStructure(tableName, mapping, settings);
boolean isAcknowledged = esClient.createOrUpdateTemplate(
tableName, settings, structures.getMapping(tableName), config.getIndexTemplateOrder());
log.info("create {} index template finished, isAcknowledged: {}", tableName, isAcknowledged);
......@@ -159,7 +181,8 @@ public class StorageEsInstaller extends ModelInstaller {
Mappings historyMapping = esClient.getIndex(indexName)
.map(Index::getMappings)
.orElseGet(Mappings::new);
Mappings appendMapping = structures.diffStructure(tableName, historyMapping);
Mappings appendMapping = structures.diffMappings(tableName, historyMapping);
//update mapping
if (appendMapping.getProperties() != null && !appendMapping.getProperties().isEmpty()) {
boolean isAcknowledged = esClient.updateIndexMapping(indexName, appendMapping);
log.info("update {} index finished, isAcknowledged: {}, append mappings: {}", indexName,
......@@ -169,6 +192,14 @@ public class StorageEsInstaller extends ModelInstaller {
throw new StorageException("update " + indexName + " time series index failure");
}
}
//needs to update settings
if (!structures.compareIndexSetting(tableName, settings)) {
log.info(
"index template {} settings configuration has been updated to {}, it will applied on new index",
tableName, settings
);
}
} else {
boolean isAcknowledged = esClient.createIndex(indexName);
log.info("create {} index finished, isAcknowledged: {}", indexName, isAcknowledged);
......@@ -183,13 +214,14 @@ public class StorageEsInstaller extends ModelInstaller {
protected Map<String, Object> createSetting(Model model) throws StorageException {
Map<String, Object> setting = new HashMap<>();
setting.put("index.number_of_replicas", model.isSuperDataset()
? config.getSuperDatasetIndexReplicasNumber()
: config.getIndexReplicasNumber());
setting.put("index.number_of_shards", model.isSuperDataset()
? config.getIndexShardsNumber() * config.getSuperDatasetIndexShardsFactor()
: config.getIndexShardsNumber());
Map<String, Object> indexSettings = new HashMap<>();
setting.put("index", indexSettings);
indexSettings.put("number_of_replicas", model.isSuperDataset()
? Integer.toString(config.getSuperDatasetIndexReplicasNumber())
: Integer.toString(config.getIndexReplicasNumber()));
indexSettings.put("number_of_shards", model.isSuperDataset()
? Integer.toString(config.getIndexShardsNumber() * config.getSuperDatasetIndexShardsFactor())
: Integer.toString(config.getIndexShardsNumber()));
// Set the index refresh period as INT(flushInterval * 2/3). At the edge case,
// in low traffic(traffic < bulkActions in the whole period), there is a possible case, 2 period bulks are included in
// one index refresh rebuild operation, which could cause version conflicts. And this case can't be fixed
......@@ -202,13 +234,20 @@ public class StorageEsInstaller extends ModelInstaller {
// even this value is set too small by end user manually.
indexRefreshInterval = 5;
}
setting.put("index.refresh_interval", indexRefreshInterval + "s");
indexSettings.put("refresh_interval", indexRefreshInterval + "s");
List<ModelColumn> columns = IndexController.LogicIndicesRegister.getPhysicalTableColumns(model);
setting.put("analysis", getAnalyzerSetting(columns));
indexSettings.put("analysis", getAnalyzerSetting(columns));
if (!StringUtil.isEmpty(config.getAdvanced())) {
Map<String, Object> advancedSettings = gson.fromJson(config.getAdvanced(), Map.class);
setting.putAll(advancedSettings);
}
//Set the config for the specific index, if has been configured.
Map<String, Object> specificSettings = this.specificIndexesSettings.get(IndexController.INSTANCE.getTableName(model));
if (!CollectionUtils.isEmpty(specificSettings)) {
indexSettings.putAll(specificSettings);
}
return setting;
}
......
......@@ -38,7 +38,7 @@ public class IndexStructuresTest {
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties)
.build());
.build(), new HashMap<>());
Mappings mapping = structures.getMapping("test");
Assert.assertEquals(mapping.getProperties(), properties);
......@@ -46,7 +46,7 @@ public class IndexStructuresTest {
"test2", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(new HashMap<>())
.build());
.build(), new HashMap<>());
mapping = structures.getMapping("test2");
Assert.assertTrue(mapping.getProperties().isEmpty());
......@@ -59,7 +59,7 @@ public class IndexStructuresTest {
.type(ElasticSearchClient.TYPE)
.properties(properties)
.source(source)
.build());
.build(), new HashMap<>());
Assert.assertEquals(properties, structuresSource.getMapping("test").getProperties());
Assert.assertEquals(source.getExcludes(), structuresSource.getMapping("test").getSource().getExcludes());
}
......@@ -74,7 +74,7 @@ public class IndexStructuresTest {
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties)
.build());
.build(), new HashMap<>());
Mappings mapping = structures.getMapping("test");
Assert.assertEquals(properties, mapping.getProperties());
HashMap<String, Object> properties2 = new HashMap<>();
......@@ -84,7 +84,7 @@ public class IndexStructuresTest {
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties2)
.build());
.build(), new HashMap<>());
mapping = structures.getMapping("test");
HashMap<String, Object> res = new HashMap<>();
res.put("a", "b");
......@@ -101,7 +101,7 @@ public class IndexStructuresTest {
.type(ElasticSearchClient.TYPE)
.properties(properties)
.source(source)
.build());
.build(), new HashMap<>());
Assert.assertEquals(properties, structuresSource.getMapping("test").getProperties());
Assert.assertEquals(source.getExcludes(), structuresSource.getMapping("test").getSource().getExcludes());
......@@ -111,14 +111,14 @@ public class IndexStructuresTest {
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties2)
.source(source2)
.build());
.source(source)
.build(), new HashMap<>());
Assert.assertEquals(res, structuresSource.getMapping("test").getProperties());
Assert.assertEquals(new HashSet<>(), structuresSource.getMapping("test").getSource().getExcludes());
Assert.assertEquals(new HashSet<>(Arrays.asList("a", "b", "c", "d", "e")), structuresSource.getMapping("test").getSource().getExcludes());
}
@Test
public void diffStructure() {
public void diffMapping() {
IndexStructures structures = new IndexStructures();
HashMap<String, Object> properties = new HashMap<>();
properties.put("a", "b");
......@@ -128,10 +128,10 @@ public class IndexStructuresTest {
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties)
.build());
.build(), new HashMap<>());
HashMap<String, Object> properties2 = new HashMap<>();
properties2.put("a", "b");
Mappings diffMappings = structures.diffStructure(
Mappings diffMappings = structures.diffMappings(
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties2)
......@@ -140,7 +140,7 @@ public class IndexStructuresTest {
res.put("c", "d");
res.put("f", "g");
Assert.assertEquals(res, diffMappings.getProperties());
diffMappings = structures.diffStructure(
diffMappings = structures.diffMappings(
"test",
Mappings.builder()
.type(ElasticSearchClient.TYPE)
......@@ -158,15 +158,15 @@ public class IndexStructuresTest {
.type(ElasticSearchClient.TYPE)
.properties(properties)
.source(source)
.build());
diffMappings = structuresSource.diffStructure(
.build(), new HashMap<>());
diffMappings = structuresSource.diffMappings(
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties2)
.source(source)
.build());
Assert.assertEquals(res, diffMappings.getProperties());
diffMappings = structuresSource.diffStructure(
diffMappings = structuresSource.diffMappings(
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties2)
......@@ -174,7 +174,7 @@ public class IndexStructuresTest {
.build());
Assert.assertEquals(res, diffMappings.getProperties());
Assert.assertNull("Mapping source should not be return by diffStructure()", diffMappings.getSource());
diffMappings = structuresSource.diffStructure(
diffMappings = structuresSource.diffMappings(
"test",
Mappings.builder()
.type(ElasticSearchClient.TYPE)
......@@ -186,7 +186,7 @@ public class IndexStructuresTest {
}
@Test
public void containsStructure() {
public void containsMapping() {
IndexStructures structures = new IndexStructures();
HashMap<String, Object> properties = new HashMap<>();
properties.put("a", "b");
......@@ -196,12 +196,12 @@ public class IndexStructuresTest {
.type(ElasticSearchClient.TYPE)
.properties(properties)
.source(new Mappings.Source())
.build());
.build(), new HashMap<>());
HashMap<String, Object> properties2 = new HashMap<>();
properties2.put("a", "b");
properties2.put("c", "d");
Assert.assertTrue(structures.containsStructure(
Assert.assertTrue(structures.containsMapping(
"test",
Mappings.builder()
.type(ElasticSearchClient.TYPE)
......@@ -213,7 +213,7 @@ public class IndexStructuresTest {
HashMap<String, Object> properties3 = new HashMap<>();
properties3.put("a", "b");
properties3.put("q", "d");
Assert.assertFalse(structures.containsStructure(
Assert.assertFalse(structures.containsMapping(
"test",
Mappings.builder()
.type(ElasticSearchClient.TYPE)
......@@ -221,4 +221,35 @@ public class IndexStructuresTest {
.build()
));
}
@Test
public void compareIndexSetting() {
IndexStructures structures = new IndexStructures();
HashMap<String, Object> settings = new HashMap<>();
HashMap<String, Object> indexSettings = new HashMap<>();
settings.put("index", indexSettings);
indexSettings.put("number_of_replicas", "1");
indexSettings.put("number_of_shards", "1");
structures.putStructure("test", new Mappings(), settings);
HashMap<String, Object> settings2 = new HashMap<>();
HashMap<String, Object> indexSettings2 = new HashMap<>();
settings2.put("index", indexSettings2);
indexSettings2.put("number_of_replicas", "1");
indexSettings2.put("number_of_shards", "1");
Assert.assertTrue(structures.compareIndexSetting(
"test",
settings2
));
HashMap<String, Object> settings3 = new HashMap<>();
HashMap<String, Object> indexSettings3 = new HashMap<>();
settings3.put("index", indexSettings3);
indexSettings3.put("number_of_replicas", "1");
indexSettings3.put("number_of_shards", "2");
Assert.assertFalse(structures.compareIndexSetting(
"test",
settings3
));
}
}
......@@ -248,18 +248,18 @@ public class MockEsInstallTest {
//clone it since the items will be changed after combine
Mappings hisMappingsClone = cloneMappings(this.hisMappings);
//put the current mappings
structures.putStructure(this.name, this.hisMappings);
structures.putStructure(this.name, this.hisMappings, new HashMap<>());
//if current mappings already contains new mappings items
Assert.assertEquals(this.contains, structures.containsStructure(this.name, this.newMappings));
Assert.assertEquals(this.contains, structures.containsMapping(this.name, this.newMappings));
//put the new mappings and combine
structures.putStructure(this.name, this.newMappings);
structures.putStructure(this.name, this.newMappings, new HashMap<>());
Mappings mappings = structures.getMapping(this.name);
Assert.assertEquals(this.combineResult, mapper.writeValueAsString(mappings));
//diff the hisMapping and new, if has new item will update current index
structures.putStructure(this.name, this.newMappings);
Mappings diff = structures.diffStructure(this.name, hisMappingsClone);
structures.putStructure(this.name, this.newMappings, new HashMap<>());
Mappings diff = structures.diffMappings(this.name, hisMappingsClone);
Assert.assertEquals(this.diffResult, mapper.writeValueAsString(diff));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册