未验证 提交 91a82e05 编写于 作者: K Kai 提交者: GitHub

Add storage column attribute `indexOnly` (#8679)

* Add storage column attribute `indexOnly`, support ElasticSearch only index and not store some fields.
* Add `indexOnly=true` to `SegmentRecord.tags`, to reduce unnecessary storage.
上级 4def1591
......@@ -109,6 +109,8 @@ Release Notes.
* [Breaking Change] Remove configuration `restAcceptorPriorityDelta` (env var: `SW_RECEIVER_SHARING_JETTY_DELTA`
, `SW_CORE_REST_JETTY_DELTA`).
* [Breaking Change] Remove configuration `graphql/path` (env var: `SW_QUERY_GRAPHQL_PATH`).
* Add storage column attribute `indexOnly`, support ElasticSearch only index and not store some fields.
* Add `indexOnly=true` to `SegmentRecord.tags`, `AlarmRecord.tags`, `AbstractLogRecord.tags`, to reduce unnecessary storage.
* [Breaking Change] Remove configuration `restMinThreads` (env var: `SW_CORE_REST_JETTY_MIN_THREADS`
, `SW_RECEIVER_SHARING_JETTY_MIN_THREADS`).
* Refactor the core Builder mechanism, new storage plugin could implement their own converter and get rid of hard
......
......@@ -74,7 +74,7 @@ public class AlarmRecord extends Record {
private String alarmMessage;
@Column(columnName = RULE_NAME)
private String ruleName;
@Column(columnName = TAGS)
@Column(columnName = TAGS, indexOnly = true)
private List<String> tagsInString;
@Column(columnName = TAGS_RAW_DATA, storageOnly = true)
private byte[] tagsRawData;
......
......@@ -94,7 +94,7 @@ public abstract class AbstractLogRecord extends Record {
private byte[] tagsRawData;
@Setter
@Getter
@Column(columnName = TAGS)
@Column(columnName = TAGS, indexOnly = true)
private List<String> tagsInString;
/**
......
......@@ -90,7 +90,7 @@ public class SegmentRecord extends Record {
private byte[] dataBinary;
@Setter
@Getter
@Column(columnName = TAGS)
@Column(columnName = TAGS, indexOnly = true)
private List<String> tags;
/**
* Tags raw data is a duplicate field of {@link #tags}. Some storage don't support array values in a single column.
......
......@@ -58,6 +58,14 @@ public @interface Column {
*/
boolean storageOnly() default false;
/**
* The column(field) is just indexed, never stored. Note: this feature only supported by elasticsearch
* and don't support mappings update due to ElasticSearch server's limitation.
*
* NOTICE, metrics should not use this, as the OAP core merges indices of metrics automatically.
*/
boolean indexOnly() default false;
/**
* @return the length of this column, this is only for {@link String} column. The usage of this depends on the
* storage implementation.
......
......@@ -30,6 +30,7 @@ public class ModelColumn {
private final Type genericType;
private final boolean matchQuery;
private final boolean storageOnly;
private final boolean indexOnly;
private final int length;
private final Column.AnalyzerType analyzer;
......@@ -38,6 +39,7 @@ public class ModelColumn {
Type genericType,
boolean matchQuery,
boolean storageOnly,
boolean indexOnly,
boolean isValue,
int length,
Column.AnalyzerType analyzer) {
......@@ -59,5 +61,11 @@ public class ModelColumn {
}
this.storageOnly = storageOnly;
}
if (storageOnly && indexOnly) {
throw new IllegalArgumentException(
"The column " + columnName + " can't be defined as both indexOnly and storageOnly.");
}
this.indexOnly = indexOnly;
}
}
......@@ -128,7 +128,7 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
modelColumns.add(
new ModelColumn(
new ColumnName(modelName, column.columnName()), field.getType(), field.getGenericType(),
column.matchQuery(), column.storageOnly(), column.dataType().isValue(), columnLength,
column.matchQuery(), column.storageOnly(), column.indexOnly(), column.dataType().isValue(), columnLength,
column.analyzer()
));
if (log.isDebugEnabled()) {
......
......@@ -27,14 +27,14 @@ public class ModelColumnTest {
@Test
public void testColumnDefine() {
ModelColumn column = new ModelColumn(new ColumnName("", "abc"), byte[].class, byte[].class, true,
false, true, 0,
false, false, true, 0,
Column.AnalyzerType.OAP_ANALYZER
);
Assert.assertEquals(true, column.isStorageOnly());
Assert.assertEquals("abc", column.getColumnName().getName());
column = new ModelColumn(new ColumnName("", "abc"), DataTable.class, DataTable.class, true,
false, true, 200,
false, false, true, 200,
Column.AnalyzerType.OAP_ANALYZER
);
Assert.assertEquals(true, column.isStorageOnly());
......@@ -42,7 +42,7 @@ public class ModelColumnTest {
Assert.assertEquals(200, column.getLength());
column = new ModelColumn(new ColumnName("", "abc"), String.class, String.class, true,
false, true, 200,
false, false, true, 200,
Column.AnalyzerType.OAP_ANALYZER
);
Assert.assertEquals(false, column.isStorageOnly());
......@@ -52,7 +52,15 @@ public class ModelColumnTest {
@Test(expected = IllegalArgumentException.class)
public void testConflictDefinition() {
ModelColumn column = new ModelColumn(new ColumnName("", "abc"), String.class, String.class,
true, true, true, 200,
true, true, false, true, 200,
Column.AnalyzerType.OAP_ANALYZER
);
}
@Test(expected = IllegalArgumentException.class)
public void testConflictDefinitionIndexOnly() {
ModelColumn column = new ModelColumn(new ColumnName("", "abc"), String.class, String.class,
true, true, true, false, 200,
Column.AnalyzerType.OAP_ANALYZER
);
}
......
......@@ -22,6 +22,8 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import org.apache.skywalking.library.elasticsearch.response.Mappings;
......@@ -39,14 +41,24 @@ final class V6MappingsDeserializer extends JsonDeserializer<Mappings> {
m.entrySet()
.stream()
.filter(it -> it.getValue() instanceof Map)
.filter(it -> ((Map<String, Object>) it.getValue()).containsKey("properties"))
.peek(it -> it.setValue(((Map<?, ?>) it.getValue()).get("properties")))
.filter(it -> ((Map<String, Object>) it.getValue()).containsKey("properties")
|| ((Map<String, Object>) it.getValue()).containsKey("_source"))
.findFirst();
final Optional<Mappings> result = typeMapping.map(it -> {
final Mappings mappings = new Mappings();
mappings.setType(it.getKey());
mappings.setProperties((Map<String, Object>) it.getValue());
Map<String, Object> properties = (Map<String, Object>) ((Map<?, ?>) it.getValue()).get("properties");
Map<String, Object> source = (Map<String, Object>) ((Map<?, ?>) it.getValue()).get("_source");
if (properties != null) {
mappings.setProperties(properties);
}
if (source != null) {
Object excludes = source.get("excludes");
if (excludes != null) {
mappings.getSource().setExcludes(new HashSet<>((ArrayList<String>) excludes));
}
}
return mappings;
});
return result.orElse(null);
......
......@@ -34,6 +34,9 @@ final class V6MappingsSerializer extends JsonSerializer<Mappings> {
gen.writeFieldName(value.getType());
gen.writeStartObject();
{
if (value.getSource() != null && !value.getSource().getExcludes().isEmpty()) {
gen.writeObjectField("_source", value.getSource());
}
gen.writeObjectField("properties", value.getProperties());
}
gen.writeEndObject();
......
......@@ -22,7 +22,8 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import java.io.IOException;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import org.apache.skywalking.library.elasticsearch.response.Mappings;
......@@ -36,12 +37,20 @@ final class V7MappingsDeserializer extends JsonDeserializer<Mappings> {
p.getCodec().readValue(p, new TypeReference<Map<String, Object>>() {
});
final Iterator<Map.Entry<String, Object>> it = m.entrySet().iterator();
if (it.hasNext()) {
final Map.Entry<String, Object> first = it.next();
if (m.size() > 0) {
Map<String, Object> properties = (Map<String, Object>) m.get("properties");
Map<String, Object> source = (Map<String, Object>) m.get("_source");
final Mappings mappings = new Mappings();
mappings.setType("_doc");
mappings.setProperties((Map<String, Object>) first.getValue());
if (properties != null) {
mappings.setProperties(properties);
}
if (source != null) {
Object excludes = source.get("excludes");
if (excludes != null) {
mappings.getSource().setExcludes(new HashSet<>((ArrayList<String>) excludes));
}
}
return mappings;
}
return null;
......
......@@ -19,7 +19,10 @@ package org.apache.skywalking.library.elasticsearch.response;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
......@@ -42,4 +45,16 @@ public final class Mappings {
@Getter
@Setter
private Map<String, Object> properties = new HashMap<>();
@JsonProperty("_source")
@Getter
@Setter
private Source source = new Source();
public static class Source {
@JsonProperty("excludes")
@Getter
@Setter
private Set<String> excludes = new HashSet<>();
}
}
......@@ -45,6 +45,7 @@ import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@RequiredArgsConstructor
......@@ -122,9 +123,14 @@ public class ITElasticSearchTest {
"metric_table", ImmutableMap.of("type", "keyword"),
"service_id", ImmutableMap.of("type", "keyword")
);
final Mappings.Source sourceConf = new Mappings.Source();
sourceConf.getExcludes().add("test");
final Mappings mappings = Mappings.builder()
.type("_doc")
.properties(properties)
.source(sourceConf)
.build();
assertThat(templateClient.createOrUpdate(name, ImmutableMap.of(), mappings, 0))
......@@ -137,6 +143,12 @@ public class ITElasticSearchTest {
.map(IndexTemplate::getMappings)
.map(Mappings::getProperties)
.hasValue(mappings.getProperties());
assertThat(templateClient.get(name))
.isPresent()
.map(IndexTemplate::getMappings)
.map(Mappings::getSource)
.map(Mappings.Source::getExcludes)
.hasValue(mappings.getSource().getExcludes());
}
@Test
......@@ -177,13 +189,18 @@ public class ITElasticSearchTest {
@Test
public void testSearch() {
final String index = "test-index";
final Mappings.Source sourceConf = new Mappings.Source();
sourceConf.getExcludes().add("key3");
assertTrue(
client.index().create(
index,
Mappings.builder()
.type("type")
.properties(ImmutableMap.of("key1", ImmutableMap.of("type", "keyword")))
.properties(ImmutableMap.of("key2", ImmutableMap.of("type", "keyword")))
.properties(ImmutableMap.of("key2", ImmutableMap.of("type", "keyword"),
"key3", ImmutableMap.of("type", "keyword")
))
.source(sourceConf)
.build(),
null
)
......@@ -197,7 +214,7 @@ public class ITElasticSearchTest {
.index(index)
.type(type)
.id("id" + i)
.doc(ImmutableMap.of("key1", "val" + i, "key2", "val" + (i + 1)
.doc(ImmutableMap.of("key1", "val" + i, "key2", "val" + (i + 1), "key3", "val" + (i + 2)
))
.build(), null);
}
......@@ -209,6 +226,15 @@ public class ITElasticSearchTest {
assertEquals(1, response.getHits().getTotal());
assertEquals("val1", response.getHits().iterator().next().getSource().get("key1"));
});
//test indexOnly
await().atMost(Duration.ONE_MINUTE).untilAsserted(() -> {
SearchResponse response = client.search(
Search.builder().query(Query.bool().must(Query.term("key3", "val3"))).build()
);
assertEquals(1, response.getHits().getTotal());
assertEquals("val1", response.getHits().iterator().next().getSource().get("key1"));
assertNull("indexOnly fields should not be stored", response.getHits().iterator().next().getSource().get("key3"));
});
await().atMost(Duration.ONE_MINUTE)
.pollInterval(Duration.FIVE_SECONDS)
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.skywalking.library.elasticsearch.response.Mappings;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
......@@ -36,9 +37,13 @@ public class IndexStructures {
Map<String, Object> properties =
structures.containsKey(tableName) ?
structures.get(tableName).properties : new HashMap<>();
Mappings.Source source =
structures.containsKey(tableName) ?
structures.get(tableName).source : new Mappings.Source();
return Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties)
.source(source)
.build();
}
......@@ -52,8 +57,7 @@ public class IndexStructures {
|| mapping.getProperties().isEmpty()) {
return;
}
Map<String, Object> properties = mapping.getProperties();
Fields fields = new Fields(properties);
Fields fields = new Fields(mapping);
if (structures.containsKey(tableName)) {
structures.get(tableName).appendNewFields(fields);
} else {
......@@ -63,6 +67,7 @@ public class IndexStructures {
/**
* Returns mappings with fields that not exist in the input mappings.
* do not return _source config to avoid index update conflict.
*/
public Mappings diffStructure(String tableName, Mappings mappings) {
if (!structures.containsKey(tableName)) {
......@@ -70,7 +75,7 @@ public class IndexStructures {
}
Map<String, Object> properties = mappings.getProperties();
Map<String, Object> diffProperties =
structures.get(tableName).diffFields(new Fields(properties));
structures.get(tableName).diffFields(new Fields(mappings));
return Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(diffProperties)
......@@ -89,7 +94,7 @@ public class IndexStructures {
}
return structures.containsKey(tableName)
&& structures.get(tableName)
.containsAllFields(new Fields(mappings.getProperties()));
.containsAllFields(new Fields(mappings));
}
/**
......@@ -97,9 +102,11 @@ public class IndexStructures {
*/
public static class Fields {
private final Map<String, Object> properties;
Mappings.Source source;
private Fields(Map<String, Object> properties) {
this.properties = properties;
private Fields(Mappings mapping) {
this.properties = mapping.getProperties();
this.source = mapping.getSource();
}
/**
......@@ -123,6 +130,11 @@ public class IndexStructures {
Map.Entry::getValue
));
properties.putAll(newFields);
if (source != null) {
Set<String> exclude = source.getExcludes();
Set<String> newExclude = fields.source.getExcludes();
exclude.addAll(newExclude);
}
}
/**
......
......@@ -224,6 +224,7 @@ public class StorageEsInstaller extends ModelInstaller {
protected Mappings createMapping(Model model) {
Map<String, Object> properties = new HashMap<>();
Mappings.Source source = new Mappings.Source();
for (ModelColumn columnDefine : model.getColumns()) {
final String type = columnTypeEsMapping.transform(columnDefine.getType(), columnDefine.getGenericType());
if (columnDefine.isMatchQuery()) {
......@@ -247,6 +248,10 @@ public class StorageEsInstaller extends ModelInstaller {
}
properties.put(columnDefine.getColumnName().getName(), column);
}
if (columnDefine.isIndexOnly()) {
source.getExcludes().add(columnDefine.getColumnName().getName());
}
}
if (IndexController.INSTANCE.isMetricModel(model)) {
......@@ -257,6 +262,7 @@ public class StorageEsInstaller extends ModelInstaller {
Mappings mappings = Mappings.builder()
.type("type")
.properties(properties)
.source(source)
.build();
log.debug("elasticsearch index template setting: {}", mappings.toString());
......
......@@ -18,7 +18,10 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import org.apache.skywalking.library.elasticsearch.response.Mappings;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.junit.Assert;
......@@ -48,6 +51,18 @@ public class IndexStructuresTest {
mapping = structures.getMapping("test2");
Assert.assertTrue(mapping.getProperties().isEmpty());
//test with source
IndexStructures structuresSource = new IndexStructures();
Mappings.Source source = new Mappings.Source();
source.getExcludes().add("a");
structuresSource.putStructure(
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties)
.source(source)
.build());
Assert.assertEquals(structuresSource.getMapping("test").getProperties(), properties);
Assert.assertEquals(structuresSource.getMapping("test").getSource().getExcludes(), source.getExcludes());
}
@Test
......@@ -77,6 +92,31 @@ public class IndexStructuresTest {
res.put("c", "d");
res.put("f", "g");
Assert.assertEquals(res, mapping.getProperties());
//test with source
IndexStructures structuresSource = new IndexStructures();
Mappings.Source source = new Mappings.Source();
source.getExcludes().addAll(Arrays.asList("a", "b", "c"));
structuresSource.putStructure(
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties)
.source(source)
.build());
Assert.assertEquals(structuresSource.getMapping("test").getProperties(), properties);
Assert.assertEquals(structuresSource.getMapping("test").getSource().getExcludes(), source.getExcludes());
Mappings.Source source2 = new Mappings.Source();
source.getExcludes().addAll(Arrays.asList("b", "c", "d", "e"));
structuresSource.putStructure(
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties2)
.source(source2)
.build());
Set<String> excludes = new HashSet<>(Arrays.asList("a", "b", "c", "d", "e"));
Assert.assertEquals(structuresSource.getMapping("test").getProperties(), res);
Assert.assertEquals(structuresSource.getMapping("test").getSource().getExcludes(), excludes);
}
@Test
......@@ -110,6 +150,41 @@ public class IndexStructuresTest {
.build()
);
Assert.assertEquals(new HashMap<>(), diffMappings.getProperties());
//test with source
IndexStructures structuresSource = new IndexStructures();
Mappings.Source source = new Mappings.Source();
source.getExcludes().addAll(Arrays.asList("a", "b", "c"));
structuresSource.putStructure(
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties)
.source(source)
.build());
diffMappings = structuresSource.diffStructure(
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties2)
.source(source)
.build());
Assert.assertEquals(res, diffMappings.getProperties());
diffMappings = structuresSource.diffStructure(
"test", Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties2)
.source(source)
.build());
Assert.assertEquals(res, diffMappings.getProperties());
Assert.assertNull("Mapping source should not be return by diffStructure()", diffMappings.getSource());
diffMappings = structuresSource.diffStructure(
"test",
Mappings.builder()
.type(ElasticSearchClient.TYPE)
.properties(properties)
.source(source)
.build()
);
Assert.assertEquals(new HashMap<>(), diffMappings.getProperties());
}
@Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册