未验证 提交 8a807002 编写于 作者: W Wan Kai 提交者: GitHub

Support Elasticsearch column alias for the compatibility between storage...

Support Elasticsearch column alias for the compatibility between storage logicSharding model and no-logicSharding model. (#9442)

## New ElasticSearch storage option explanation in 9.2.0
Since v9.2.0, SkyWalking OAP provides 2 storage options for metrics/meter and records, 
system environment variable is (`SW_STORAGE_ES_LOGIC_SHARDING`):

### No-Sharding Model (OAP default setting, `SW_STORAGE_ES_LOGIC_SHARDING = false`)
1. OAP merges all metrics/meter and records(without super datasets, such as segments) indices into one physical 
index template `metrics-all` and `records-all`.
2. The logic index name would be present in columns `metric_table` and `record_table`.
3. If the logic column name has an alias (configured by `@ElasticSearch.Column()`), the alias would be the real physical column name.

### No-Sharding Model (`SW_STORAGE_ES_LOGIC_SHARDING = true `)
1. OAP shard metrics/meter indices into multi-physical indices as in the previous versions(one index template per metric/meter aggregation function).
2. Records and metrics without configuring aggregation function in `@MetricsFunction` and `@MeterFunction` would not be sharded.
3. The shard template name would be `metrics-aggregation function name` or `meter-aggregation function name` such as `metrics-count`,
and the logic index name would be present in column `metric_table`.
4. The OAP **would not** use the column alias, the logic column name would be the real physical column name.

**Notice**: 
Users still could choose to adjust ElasticSearch's shard number(`SW_STORAGE_ES_INDEX_SHARDS_NUMBER`) to scale out no matter the option.
上级 053ae3f8
## New ElasticSearch storage option explanation in 9.2.0
Since v9.2.0, SkyWalking OAP provide 2 storage options for metrics/meter and records,
system environment variable is (`SW_STORAGE_ES_LOGIC_SHARDING`):
### No-Sharding Model (OAP default setting, `SW_STORAGE_ES_LOGIC_SHARDING = false`)
1. OAP merge all metrics/meter and records(without super datasets, such as segments) indices into one physical
index template `metrics-all` and `records-all`.
2. The logic index name would present in column `metric_table` and `record_table`.
3. If logic column name has alias (configured by `@ElasticSearch.Column()`), the alias would be the real physical column name.
### No-Sharding Model (`SW_STORAGE_ES_LOGIC_SHARDING = true `)
1. OAP shard metrics/meter indices into multi-physical indices as the previous versions(one index template per metric/meter aggregation function).
2. Records and metrics without configure aggregation function in `@MetricsFunction` and `@MeterFunction` would not be sharded.
3. The shard template name would be `metrics-aggregation function name` or `meter-aggregation function name` such as `metrics-count`,
and the logic index name would present in column `metric_table`.
4. The OAP **would not** use the column alias, the logic column name would be the real physical column name.
**Notice**:
Users still could choose to adjust ElasticSearch's shard number(`SW_STORAGE_ES_INDEX_SHARDS_NUMBER`) to scale out no matter the option is.
......@@ -11,6 +11,7 @@ These are known and frequently asked questions about SkyWalking. We welcome you
* [Compiling issues on Mac's M1 chip](How-to-build-with-mac-m1.md)
## Runtime
* [New ElasticSearch storage option explanation in 9.2.0](New-ElasticSearch-storage-option-explanation-in-9.2.0.md)
* [Version 9.x+ upgrade](v9-version-upgrade.md)
* [Elasticsearch exception `type=version_conflict_engine_exception` since 8.7.0](es-version-conflict.md)
* [Version 8.x+ upgrade](v8-version-upgrade.md)
......
......@@ -36,15 +36,17 @@
* Add `VIRTUAL_CACHE` to Layer, to fix conjectured Redis server, which icon can't show on the topology.
* [Breaking Change] Elasticsearch storage merge all metrics/meter and records(without super datasets) indices into one
physical index template `metrics-all` and `records-all` on the default setting.
Provide system environment variable(`SW_STORAGE_ES_LOGIC_SHARDING`) to shard metrics/records indices into
Provide system environment variable(`SW_STORAGE_ES_LOGIC_SHARDING`) to shard metrics/meter indices into
multi-physical indices as the previous versions(one index template per metric/meter aggregation function).
In the current one index mode, users still could choose to adjust ElasticSearch's shard
number(`SW_STORAGE_ES_INDEX_SHARDS_NUMBER`) to scale out.
* [Breaking Change] Many columns of metrics and records model names are changed, The H2/Mysql/Tidb/Postgres storage
users are required to remove all metrics-related and records-related tables for OAP to re-create or use a new database
instance.
More details please refer to [New ElasticSearch storage option explanation in 9.2.0](../FAQ/New-ElasticSearch-storage-option-explanation-in-9.2.0.md)
and [backend-storage.md](../setup/backend/backend-storage.md)
* [Breaking Change] Index/table `ebpf_profiling_schedule` added a new column `ebpf_profiling_schedule_id`,
the H2/Mysql/Tidb/Postgres storage users are required to re-created it when bump up from previous releases.
* Fix Zipkin trace query the max size of spans.
* Add `tls` and `https` component IDs for Network Profiling.
* Support Elasticsearch column alias for the compatibility between storage logicSharding model and no-logicSharding model.
#### UI
......
......@@ -45,7 +45,7 @@ public class EndpointTraffic extends Metrics {
public static final String INDEX_NAME = "endpoint_traffic";
public static final String SERVICE_ID = "service_id";
public static final String NAME = "endpoint_traffic_name";
public static final String NAME = "name";
@Setter
@Getter
......@@ -55,6 +55,7 @@ public class EndpointTraffic extends Metrics {
@Getter
@Column(columnName = NAME)
@ElasticSearch.MatchQuery
@ElasticSearch.Column(columnAlias = "endpoint_traffic_name")
private String name = Const.EMPTY_STRING;
@Override
......
......@@ -31,6 +31,7 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
......@@ -48,7 +49,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SE
public class InstanceTraffic extends Metrics {
public static final String INDEX_NAME = "instance_traffic";
public static final String SERVICE_ID = "service_id";
public static final String NAME = "instance_traffic_name";
public static final String NAME = "name";
public static final String LAST_PING_TIME_BUCKET = "last_ping";
public static final String PROPERTIES = "properties";
......@@ -62,6 +63,7 @@ public class InstanceTraffic extends Metrics {
@Setter
@Getter
@Column(columnName = NAME, storageOnly = true)
@ElasticSearch.Column(columnAlias = "instance_traffic_name")
private String name;
@Setter
......
......@@ -49,7 +49,7 @@ import static org.apache.skywalking.oap.server.core.Const.DOUBLE_COLONS_SPLIT;
public class ServiceTraffic extends Metrics {
public static final String INDEX_NAME = "service_traffic";
public static final String NAME = "service_traffic_name";
public static final String NAME = "name";
public static final String SHORT_NAME = "short_name";
......@@ -63,6 +63,7 @@ public class ServiceTraffic extends Metrics {
@Getter
@Column(columnName = NAME)
@ElasticSearch.MatchQuery
@ElasticSearch.Column(columnAlias = "service_traffic_name")
private String name = Const.EMPTY_STRING;
@Setter
......
......@@ -39,6 +39,7 @@ import org.apache.skywalking.oap.server.core.query.type.Bucket;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
......@@ -52,7 +53,7 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
public abstract class PercentileFunction extends Meter implements AcceptableValue<PercentileFunction.PercentileArgument>, MultiIntValuesHolder {
public static final String DATASET = "dataset";
public static final String RANKS = "ranks";
public static final String VALUE = "datatable_value";
public static final String VALUE = "value";
@Setter
@Getter
......@@ -62,6 +63,7 @@ public abstract class PercentileFunction extends Meter implements AcceptableValu
@Getter
@Setter
@Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
@ElasticSearch.Column(columnAlias = "datatable_value")
private DataTable percentileValues = new DataTable(10);
@Getter
@Setter
......
......@@ -36,6 +36,7 @@ import org.apache.skywalking.oap.server.core.query.type.Bucket;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
......@@ -55,8 +56,8 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
@ToString
public abstract class AvgHistogramFunction extends Meter implements AcceptableValue<BucketedValues> {
public static final String DATASET = "dataset";
protected static final String SUMMATION = "datatable_summation";
protected static final String COUNT = "datatable_count";
protected static final String SUMMATION = "summation";
protected static final String COUNT = "count";
@Setter
@Getter
......@@ -66,10 +67,12 @@ public abstract class AvgHistogramFunction extends Meter implements AcceptableVa
@Getter
@Setter
@Column(columnName = SUMMATION, storageOnly = true)
@ElasticSearch.Column(columnAlias = "datatable_summation")
protected DataTable summation = new DataTable(30);
@Getter
@Setter
@Column(columnName = COUNT, storageOnly = true)
@ElasticSearch.Column(columnAlias = "datatable_count")
protected DataTable count = new DataTable(30);
@Getter
@Setter
......
......@@ -45,6 +45,7 @@ import org.apache.skywalking.oap.server.core.query.type.Bucket;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
......@@ -69,9 +70,9 @@ public abstract class AvgHistogramPercentileFunction extends Meter implements Ac
private static final String DEFAULT_GROUP = "pD";
public static final String DATASET = "dataset";
public static final String RANKS = "ranks";
public static final String VALUE = "datatable_value";
protected static final String SUMMATION = "datatable_summation";
protected static final String COUNT = "datatable_count";
public static final String VALUE = "value";
protected static final String SUMMATION = "summation";
protected static final String COUNT = "count";
@Setter
@Getter
......@@ -81,14 +82,17 @@ public abstract class AvgHistogramPercentileFunction extends Meter implements Ac
@Getter
@Setter
@Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
@ElasticSearch.Column(columnAlias = "datatable_value")
private DataTable percentileValues = new DataTable(10);
@Getter
@Setter
@Column(columnName = SUMMATION, storageOnly = true)
@ElasticSearch.Column(columnAlias = "datatable_summation")
protected DataTable summation = new DataTable(30);
@Getter
@Setter
@Column(columnName = COUNT, storageOnly = true)
@ElasticSearch.Column(columnAlias = "datatable_count")
protected DataTable count = new DataTable(30);
@Getter
@Setter
......
......@@ -36,6 +36,7 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
......@@ -43,9 +44,9 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
@MeterFunction(functionName = "avgLabeled")
@ToString
public abstract class AvgLabeledFunction extends Meter implements AcceptableValue<DataTable>, LabeledValueHolder {
protected static final String SUMMATION = "datatable_summation";
protected static final String COUNT = "datatable_count";
protected static final String VALUE = "datatable_value";
protected static final String SUMMATION = "summation";
protected static final String COUNT = "count";
protected static final String VALUE = "value";
@Setter
@Getter
......@@ -64,14 +65,17 @@ public abstract class AvgLabeledFunction extends Meter implements AcceptableValu
@Getter
@Setter
@Column(columnName = SUMMATION, storageOnly = true)
@ElasticSearch.Column(columnAlias = "datatable_summation")
protected DataTable summation = new DataTable(30);
@Getter
@Setter
@Column(columnName = COUNT, storageOnly = true)
@ElasticSearch.Column(columnAlias = "datatable_count")
protected DataTable count = new DataTable(30);
@Getter
@Setter
@Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
@ElasticSearch.Column(columnAlias = "datatable_value")
private DataTable value = new DataTable(30);
@Override
......
......@@ -39,6 +39,7 @@ import org.apache.skywalking.oap.server.core.query.type.Bucket;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
......@@ -61,8 +62,8 @@ public abstract class SumHistogramPercentileFunction extends Meter implements Ac
private static final String DEFAULT_GROUP = "pD";
public static final String DATASET = "dataset";
public static final String RANKS = "ranks";
public static final String VALUE = "datatable_value";
protected static final String SUMMATION = "datatable_summation";
public static final String VALUE = "value";
protected static final String SUMMATION = "summation";
@Setter
@Getter
......@@ -72,10 +73,12 @@ public abstract class SumHistogramPercentileFunction extends Meter implements Ac
@Getter
@Setter
@Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
@ElasticSearch.Column(columnAlias = "datatable_value")
private DataTable percentileValues = new DataTable(10);
@Getter
@Setter
@Column(columnName = SUMMATION, storageOnly = true)
@ElasticSearch.Column(columnAlias = "datatable_summation")
protected DataTable summation = new DataTable(30);
/**
* Rank
......
......@@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.Metrics
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.SourceFrom;
import org.apache.skywalking.oap.server.core.query.sql.Function;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
/**
* Apdex dissatisfaction levels of Tolerating (apdex_t) and Frustrated (apdex_f) indicate how slow site performance
......@@ -44,7 +45,7 @@ public abstract class ApdexMetrics extends Metrics implements IntValueHolder {
protected static final String S_NUM = "s_num";
// Level: tolerated
protected static final String T_NUM = "t_num";
protected static final String VALUE = "int_value";
protected static final String VALUE = "value";
@Getter
@Setter
......@@ -61,6 +62,7 @@ public abstract class ApdexMetrics extends Metrics implements IntValueHolder {
@Getter
@Setter
@Column(columnName = VALUE, dataType = Column.ValueDataType.COMMON_VALUE, function = Function.Avg)
@ElasticSearch.Column(columnAlias = "int_value")
private int value;
@Entrance
......
......@@ -26,17 +26,19 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.Metrics
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.SourceFrom;
import org.apache.skywalking.oap.server.core.query.sql.Function;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
@MetricsFunction(functionName = "doubleAvg")
public abstract class DoubleAvgMetrics extends Metrics implements DoubleValueHolder {
protected static final String SUMMATION = "double_summation";
protected static final String SUMMATION = "summation";
protected static final String COUNT = "count";
protected static final String VALUE = "double_value";
protected static final String VALUE = "value";
@Getter
@Setter
@Column(columnName = SUMMATION, storageOnly = true)
@ElasticSearch.Column(columnAlias = "double_summation")
private double summation;
@Getter
@Setter
......@@ -45,6 +47,7 @@ public abstract class DoubleAvgMetrics extends Metrics implements DoubleValueHol
@Getter
@Setter
@Column(columnName = VALUE, dataType = Column.ValueDataType.COMMON_VALUE, function = Function.Avg)
@ElasticSearch.Column(columnAlias = "double_value")
private double value;
@Entrance
......
......@@ -28,6 +28,7 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.Entranc
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsFunction;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.SourceFrom;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
/**
* Percentile is a better implementation than deprecated PxxMetrics in older releases.
......@@ -38,7 +39,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
@MetricsFunction(functionName = "percentile")
public abstract class PercentileMetrics extends Metrics implements MultiIntValuesHolder {
protected static final String DATASET = "dataset";
protected static final String VALUE = "datatable_value";
protected static final String VALUE = "value";
protected static final String PRECISION = "precision";
private static final int[] RANKS = {
......@@ -52,6 +53,7 @@ public abstract class PercentileMetrics extends Metrics implements MultiIntValue
@Getter
@Setter
@Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
@ElasticSearch.Column(columnAlias = "datatable_value")
private DataTable percentileValues;
@Getter
@Setter
......
......@@ -68,4 +68,18 @@ public @interface ElasticSearch {
}
}
}
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@interface Column {
/**
* Warning: this is only used to solve the conflict among the existing columns since we need support to merge all metrics
* in one physical index template. When creating a new column, we should avoid the compatibility issue
* between these 2 storage modes rather than use this alias.
*/
@Deprecated
String columnAlias();
}
}
......@@ -36,6 +36,8 @@ public class ElasticSearchExtension {
*/
private final ElasticSearch.MatchQuery.AnalyzerType analyzer;
private final String columnAlias;
public boolean needMatchQuery() {
return analyzer != null;
}
......
......@@ -162,8 +162,10 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
// ElasticSearch extension
final ElasticSearch.MatchQuery elasticSearchAnalyzer = field.getAnnotation(
ElasticSearch.MatchQuery.class);
final ElasticSearch.Column elasticSearchColumn = field.getAnnotation(ElasticSearch.Column.class);
ElasticSearchExtension elasticSearchExtension = new ElasticSearchExtension(
elasticSearchAnalyzer == null ? null : elasticSearchAnalyzer.analyzer()
elasticSearchAnalyzer == null ? null : elasticSearchAnalyzer.analyzer(),
elasticSearchColumn == null ? null : elasticSearchColumn.columnAlias()
);
// BanyanDB extension
......
......@@ -30,7 +30,7 @@ public class ModelColumnTest {
false, false, true, 0,
new SQLDatabaseExtension(),
new ElasticSearchExtension(
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER),
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
new BanyanDBExtension(-1, false, true)
);
Assert.assertEquals(true, column.isStorageOnly());
......@@ -39,7 +39,7 @@ public class ModelColumnTest {
column = new ModelColumn(new ColumnName("", "abc"), DataTable.class, DataTable.class,
false, false, true, 200,
new SQLDatabaseExtension(),
new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER),
new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
new BanyanDBExtension(-1, false, true)
);
Assert.assertEquals(true, column.isStorageOnly());
......@@ -49,7 +49,7 @@ public class ModelColumnTest {
column = new ModelColumn(new ColumnName("", "abc"), String.class, String.class,
false, false, true, 200,
new SQLDatabaseExtension(),
new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER),
new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
new BanyanDBExtension(-1, false, true)
);
Assert.assertEquals(false, column.isStorageOnly());
......@@ -62,7 +62,7 @@ public class ModelColumnTest {
true, false, true, 200,
new SQLDatabaseExtension(),
new ElasticSearchExtension(
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER),
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
new BanyanDBExtension(-1, false, true)
);
}
......@@ -73,7 +73,7 @@ public class ModelColumnTest {
true, true, false, 200,
new SQLDatabaseExtension(),
new ElasticSearchExtension(
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER),
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
new BanyanDBExtension(-1, false, true)
);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
/**
* ElasticSearchConverter same as the HashMapConverter, but translate the column alias automatically.
*/
public class ElasticSearchConverter {
@RequiredArgsConstructor
public static class ToEntity implements Convert2Entity {
private final String modelName;
private final Map<String, Object> source;
@Override
public Object get(final String fieldName) {
return source.get(getPhysicalColumnName(modelName, fieldName));
}
@Override
public byte[] getBytes(final String fieldName) {
final String value = (String) source.get(getPhysicalColumnName(modelName, fieldName));
if (StringUtil.isEmpty(value)) {
return new byte[] {};
}
return Base64.getDecoder().decode(value);
}
}
public static class ToStorage implements Convert2Storage<Map<String, Object>> {
private Map<String, Object> source;
private String modelName;
public ToStorage(String modelName) {
source = new HashMap();
this.modelName = modelName;
}
@Override
public void accept(final String fieldName, final Object fieldValue) {
source.put(getPhysicalColumnName(modelName, fieldName)
, fieldValue);
}
@Override
public void accept(final String fieldName, final byte[] fieldValue) {
if (CollectionUtils.isEmpty(fieldValue)) {
source.put(getPhysicalColumnName(modelName, fieldName), Const.EMPTY_STRING);
} else {
source.put(getPhysicalColumnName(modelName, fieldName), new String(Base64.getEncoder().encode(fieldValue)));
}
}
@Override
public void accept(final String fieldName, final List<String> fieldValue) {
this.accept(getPhysicalColumnName(modelName, fieldName), (Object) fieldValue);
}
@Override
public Object get(final String fieldName) {
return source.get(getPhysicalColumnName(modelName, fieldName));
}
@Override
public Map<String, Object> obtain() {
return source;
}
}
private static String getPhysicalColumnName(String modelName, String fieldName) {
return IndexController.LogicIndicesRegister.getPhysicalColumnName(modelName, fieldName);
}
}
......@@ -23,15 +23,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.FunctionCategory;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
/**
......@@ -45,6 +46,7 @@ public enum IndexController {
* Init in StorageModuleElasticsearchProvider.prepare() and the value from the config.
*/
@Setter
@Getter
private boolean logicSharding = false;
public String getTableName(Model model) {
......@@ -124,6 +126,8 @@ public enum IndexController {
private static final Map<String/*physical index name*/, Map<String/*column name*/, ModelColumn>> PHYSICAL_INDICES_COLUMNS = new HashMap<>();
private static final Map<String/*logic index name*/, Map<String/*column name*/, String/*alias*/>> LOGIC_INDICES_COLUMNS_ALIAS = new HashMap<>();
/**
* The metric table name in aggregation physical storage.
*/
......@@ -141,15 +145,34 @@ public enum IndexController {
public static void registerRelation(Model model, String physicalName) {
LOGIC_INDICES_CATALOG.put(model.getName(), physicalName);
Map<String, ModelColumn> columns = PHYSICAL_INDICES_COLUMNS.computeIfAbsent(
physicalName, v -> new ConcurrentHashMap<>());
model.getColumns().forEach(modelColumn -> {
String columnName = modelColumn.getColumnName().getName();
if (columns.containsKey(columnName)) {
checkModelColumnConflicts(columns.get(columnName), modelColumn, physicalName);
} else {
columns.put(columnName, modelColumn);
}
});
physicalName, v -> new HashMap<>());
if (!IndexController.INSTANCE.logicSharding) {
model.getColumns().forEach(modelColumn -> {
String columnName = modelColumn.getColumnName().getName();
String alias = modelColumn.getElasticSearchExtension().getColumnAlias();
if (alias != null) {
Map<String, String> aliasMap = LOGIC_INDICES_COLUMNS_ALIAS.computeIfAbsent(
model.getName(), v -> new HashMap<>());
aliasMap.put(modelColumn.getColumnName().getName(), alias);
columnName = alias;
}
if (columns.containsKey(columnName)) {
checkModelColumnConflicts(columns.get(columnName), modelColumn, physicalName);
} else {
columns.put(columnName, modelColumn);
}
});
} else {
model.getColumns().forEach(modelColumn -> {
String columnName = modelColumn.getColumnName().getName();
if (columns.containsKey(columnName)) {
checkModelColumnConflicts(columns.get(columnName), modelColumn, physicalName);
} else {
columns.put(columnName, modelColumn);
}
});
}
}
public static boolean isPhysicalTable(String logicName) {
......@@ -161,6 +184,27 @@ public enum IndexController {
return new ArrayList<>(PHYSICAL_INDICES_COLUMNS.get(tableName).values());
}
/**
* Get real physical column name by logic name.
* Warning: This is only used to solve the column has alias.
*/
@Deprecated
public static String getPhysicalColumnName(String modelName, String columnName) {
if (IndexController.INSTANCE.logicSharding) {
return columnName;
}
Map<String, String> aliasMap = LOGIC_INDICES_COLUMNS_ALIAS.get(modelName);
if (CollectionUtils.isEmpty(aliasMap)) {
return columnName;
}
return aliasMap.getOrDefault(columnName, columnName);
}
/**
* Check the columns conflicts when they in one physical index
*/
private static void checkModelColumnConflicts(ModelColumn mc1, ModelColumn mc2, String physicalName) {
if (!(mc1.isIndexOnly() == mc2.isIndexOnly())) {
throw new IllegalArgumentException(mc1.getColumnName() + " and " + mc2.getColumnName() + " isIndexOnly conflict in index: " + physicalName);
......
......@@ -23,7 +23,6 @@ import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
......@@ -44,7 +43,7 @@ public class ManagementEsDAO extends EsDAO implements IManagementDAO {
if (exist) {
return;
}
final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
storageBuilder.entity2Storage(managementData, toStorage);
Map<String, Object> source =
IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
......
......@@ -33,7 +33,6 @@ import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
......@@ -72,7 +71,7 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
if (!indexIdsGroup.isEmpty()) {
final Optional<Documents> response = getClient().ids(indexIdsGroup);
response.ifPresent(documents -> documents.forEach(document -> {
Metrics source = storageBuilder.storage2Entity(new HashMapConverter.ToEntity(document.getSource()));
Metrics source = storageBuilder.storage2Entity(new ElasticSearchConverter.ToEntity(model.getName(), document.getSource()));
result.add(source);
}));
}
......@@ -89,7 +88,7 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
.collect(Collectors.toList());
final SearchResponse response = getClient().searchIDs(tableName, ids);
response.getHits().getHits().forEach(hit -> {
Metrics source = storageBuilder.storage2Entity(new HashMapConverter.ToEntity(hit.getSource()));
Metrics source = storageBuilder.storage2Entity(new ElasticSearchConverter.ToEntity(model.getName(), hit.getSource()));
result.add(source);
});
});
......@@ -99,7 +98,7 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
@Override
public InsertRequest prepareBatchInsert(Model model, Metrics metrics) {
final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
storageBuilder.entity2Storage(metrics, toStorage);
Map<String, Object> builder = IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket());
......@@ -109,7 +108,7 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
@Override
public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) {
final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
storageBuilder.entity2Storage(metrics, toStorage);
Map<String, Object> builder =
IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
......
......@@ -23,7 +23,6 @@ import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
......@@ -41,7 +40,7 @@ public class NoneStreamEsDAO extends EsDAO implements INoneStreamDAO {
@Override
public void insert(Model model, NoneStream noneStream) throws IOException {
final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
storageBuilder.entity2Storage(noneStream, toStorage);
Map<String, Object> builder =
IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
......
......@@ -23,7 +23,6 @@ import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
......@@ -39,7 +38,7 @@ public class RecordEsDAO extends EsDAO implements IRecordDAO {
@Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
storageBuilder.entity2Storage(record, toStorage);
Map<String, Object> builder = IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
String modelName = TimeSeriesUtils.writeIndexName(model, record.getTimeBucket());
......
......@@ -233,13 +233,18 @@ public class StorageEsInstaller extends ModelInstaller {
Mappings.Source source = new Mappings.Source();
for (ModelColumn columnDefine : model.getColumns()) {
final String type = columnTypeEsMapping.transform(columnDefine.getType(), columnDefine.getGenericType());
String columnName = columnDefine.getColumnName().getName();
String alias = columnDefine.getElasticSearchExtension().getColumnAlias();
if (!config.isLogicSharding() && alias != null) {
columnName = alias;
}
if (columnDefine.getElasticSearchExtension().needMatchQuery()) {
String matchCName = MatchCNameBuilder.INSTANCE.build(columnDefine.getColumnName().getName());
String matchCName = MatchCNameBuilder.INSTANCE.build(columnName);
Map<String, Object> originalColumn = new HashMap<>();
originalColumn.put("type", type);
originalColumn.put("copy_to", matchCName);
properties.put(columnDefine.getColumnName().getName(), originalColumn);
properties.put(columnName, originalColumn);
Map<String, Object> matchColumn = new HashMap<>();
matchColumn.put("type", "text");
......@@ -252,11 +257,11 @@ public class StorageEsInstaller extends ModelInstaller {
if (columnDefine.isStorageOnly() && !"binary".equals(type)) {
column.put("index", false);
}
properties.put(columnDefine.getColumnName().getName(), column);
properties.put(columnName, column);
}
if (columnDefine.isIndexOnly()) {
source.getExcludes().add(columnDefine.getColumnName().getName());
source.getExcludes().add(columnName);
}
}
......
......@@ -31,9 +31,9 @@ import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
......@@ -81,7 +81,7 @@ public class NetworkAddressAliasEsDAO extends EsDAO implements INetworkAddressAl
for (SearchHit searchHit : results.getHits()) {
networkAddressAliases.add(
builder.storage2Entity(
new HashMapConverter.ToEntity(searchHit.getSource())));
new ElasticSearchConverter.ToEntity(NetworkAddressAlias.INDEX_NAME, searchHit.getSource())));
}
if (results.getHits().getTotal() < batchSize) {
break;
......
......@@ -35,9 +35,9 @@ import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
import org.apache.skywalking.oap.server.core.query.type.Alarms;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
......@@ -88,7 +88,7 @@ public class AlarmQueryEsDAO extends EsDAO implements IAlarmQueryDAO {
for (SearchHit searchHit : response.getHits().getHits()) {
AlarmRecord.Builder builder = new AlarmRecord.Builder();
AlarmRecord alarmRecord = builder.storage2Entity(new HashMapConverter.ToEntity(searchHit.getSource()));
AlarmRecord alarmRecord = builder.storage2Entity(new ElasticSearchConverter.ToEntity(AlarmRecord.INDEX_NAME, searchHit.getSource()));
AlarmMessage message = new AlarmMessage();
message.setId(String.valueOf(alarmRecord.getId0()));
......
......@@ -27,9 +27,9 @@ import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingDataRecord;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
......@@ -91,7 +91,7 @@ public class EBPFProfilingDataEsDAO extends EsDAO implements IEBPFProfilingDataD
for (SearchHit hit : response.getHits()) {
final Map<String, Object> sourceAsMap = hit.getSource();
final EBPFProfilingDataRecord.Builder builder = new EBPFProfilingDataRecord.Builder();
records.add(builder.storage2Entity(new HashMapConverter.ToEntity(sourceAsMap)));
records.add(builder.storage2Entity(new ElasticSearchConverter.ToEntity(EBPFProfilingDataRecord.INDEX_NAME, sourceAsMap)));
}
return records;
}
......
......@@ -32,11 +32,11 @@ import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilin
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTask;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
......@@ -138,7 +138,7 @@ public class EBPFProfilingTaskEsDAO extends EsDAO implements IEBPFProfilingTaskD
private EBPFProfilingTask parseTask(final SearchHit hit) {
final Map<String, Object> sourceAsMap = hit.getSource();
final EBPFProfilingTaskRecord.Builder builder = new EBPFProfilingTaskRecord.Builder();
final EBPFProfilingTaskRecord record = builder.storage2Entity(new HashMapConverter.ToEntity(sourceAsMap));
final EBPFProfilingTaskRecord record = builder.storage2Entity(new ElasticSearchConverter.ToEntity(EBPFProfilingTaskRecord.INDEX_NAME, sourceAsMap));
final EBPFProfilingTask task = new EBPFProfilingTask();
task.setTaskId(record.getLogicalId());
......@@ -161,4 +161,4 @@ public class EBPFProfilingTaskEsDAO extends EsDAO implements IEBPFProfilingTaskD
task.setLastUpdateTime(record.getLastUpdateTime());
return task;
}
}
\ No newline at end of file
}
......@@ -53,10 +53,10 @@ import org.apache.skywalking.oap.server.core.query.type.Process;
import org.apache.skywalking.oap.server.core.query.type.Service;
import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
......@@ -67,6 +67,8 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
private final int queryMaxSize;
private final int scrollingBatchSize;
private String endpointTrafficNameAlias;
private boolean aliasNameInit = false;
public MetadataQueryEsDAO(
ElasticSearchClient client,
......@@ -194,6 +196,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
@Override
public List<Endpoint> findEndpoint(String keyword, String serviceId, int limit)
throws IOException {
initColumnName();
final String index = IndexController.LogicIndicesRegister.getPhysicalTableName(
EndpointTraffic.INDEX_NAME);
......@@ -202,7 +205,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
.must(Query.term(EndpointTraffic.SERVICE_ID, serviceId));
if (!Strings.isNullOrEmpty(keyword)) {
String matchCName = MatchCNameBuilder.INSTANCE.build(EndpointTraffic.NAME);
String matchCName = MatchCNameBuilder.INSTANCE.build(endpointTrafficNameAlias);
query.must(Query.match(matchCName, keyword));
}
......@@ -219,11 +222,11 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
Map<String, Object> sourceAsMap = searchHit.getSource();
final EndpointTraffic endpointTraffic =
new EndpointTraffic.Builder().storage2Entity(new HashMapConverter.ToEntity(sourceAsMap));
new EndpointTraffic.Builder().storage2Entity(new ElasticSearchConverter.ToEntity(EndpointTraffic.INDEX_NAME, sourceAsMap));
Endpoint endpoint = new Endpoint();
endpoint.setId(endpointTraffic.id());
endpoint.setName((String) sourceAsMap.get(EndpointTraffic.NAME));
endpoint.setName(endpointTraffic.getName());
endpoints.add(endpoint);
}
......@@ -360,7 +363,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
for (SearchHit hit : response.getHits()) {
final Map<String, Object> sourceAsMap = hit.getSource();
final ServiceTraffic.Builder builder = new ServiceTraffic.Builder();
final ServiceTraffic serviceTraffic = builder.storage2Entity(new HashMapConverter.ToEntity(sourceAsMap));
final ServiceTraffic serviceTraffic = builder.storage2Entity(new ElasticSearchConverter.ToEntity(ServiceTraffic.INDEX_NAME, sourceAsMap));
String serviceName = serviceTraffic.getName();
Service service = new Service();
service.setId(serviceTraffic.getServiceId());
......@@ -379,7 +382,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
Map<String, Object> sourceAsMap = searchHit.getSource();
final InstanceTraffic instanceTraffic =
new InstanceTraffic.Builder().storage2Entity(new HashMapConverter.ToEntity(sourceAsMap));
new InstanceTraffic.Builder().storage2Entity(new ElasticSearchConverter.ToEntity(InstanceTraffic.INDEX_NAME, sourceAsMap));
ServiceInstance serviceInstance = new ServiceInstance();
serviceInstance.setId(instanceTraffic.id());
......@@ -411,7 +414,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
Map<String, Object> sourceAsMap = searchHit.getSource();
final ProcessTraffic processTraffic =
new ProcessTraffic.Builder().storage2Entity(new HashMapConverter.ToEntity(sourceAsMap));
new ProcessTraffic.Builder().storage2Entity(new ElasticSearchConverter.ToEntity(ProcessTraffic.INDEX_NAME, sourceAsMap));
Process process = new Process();
process.setId(processTraffic.id());
......@@ -443,4 +446,14 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
}
return processes;
}
/**
* When the index column use an alias, we should get it's real physical column name for query.
*/
private void initColumnName() {
if (!aliasNameInit) {
this.endpointTrafficNameAlias = IndexController.LogicIndicesRegister.getPhysicalColumnName(EndpointTraffic.INDEX_NAME, EndpointTraffic.NAME);
aliasNameInit = true;
}
}
}
......@@ -36,7 +36,6 @@ import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHits;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.metrics.HistogramMetrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.PointOfTime;
import org.apache.skywalking.oap.server.core.query.input.Duration;
......@@ -64,11 +63,12 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
public long readMetricsValue(final MetricsCondition condition,
final String valueColumnName,
final Duration duration) {
final String realValueColumn = IndexController.LogicIndicesRegister.getPhysicalColumnName(condition.getName(), valueColumnName);
final SearchBuilder sourceBuilder = buildQuery(condition, duration);
int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
if (function == Function.Latest) {
return readMetricsValues(condition, valueColumnName, duration)
return readMetricsValues(condition, realValueColumn, duration)
.getValues().latestValue(defaultValue);
}
......@@ -78,7 +78,7 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
.executionHint(TermsAggregationBuilder.ExecutionHint.MAP)
.collectMode(TermsAggregationBuilder.CollectMode.BREADTH_FIRST)
.size(1);
functionAggregation(function, entityIdAggregation, valueColumnName);
functionAggregation(function, entityIdAggregation, realValueColumn);
sourceBuilder.aggregation(entityIdAggregation);
......@@ -93,7 +93,7 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
(List<Map<String, Object>>) idTerms.get("buckets");
for (Map<String, Object> idBucket : buckets) {
final Map<String, Object> agg = (Map<String, Object>) idBucket.get(valueColumnName);
final Map<String, Object> agg = (Map<String, Object>) idBucket.get(realValueColumn);
return ((Number) agg.get("value")).longValue();
}
return defaultValue;
......@@ -103,6 +103,7 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
public MetricsValues readMetricsValues(final MetricsCondition condition,
final String valueColumnName,
final Duration duration) {
final String realValueColumn = IndexController.LogicIndicesRegister.getPhysicalColumnName(condition.getName(), valueColumnName);
String tableName =
IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
......@@ -132,7 +133,7 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
kvInt.setValue(0);
if (idMap.containsKey(id)) {
Map<String, Object> source = idMap.get(id);
kvInt.setValue(((Number) source.getOrDefault(valueColumnName, 0)).longValue());
kvInt.setValue(((Number) source.getOrDefault(realValueColumn, 0)).longValue());
} else {
kvInt.setValue(ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName()));
}
......@@ -152,6 +153,7 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
final String valueColumnName,
final List<String> labels,
final Duration duration) {
final String realValueColumn = IndexController.LogicIndicesRegister.getPhysicalColumnName(condition.getName(), valueColumnName);
final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
String tableName =
IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
......@@ -177,7 +179,7 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
for (final Document document : response.get()) {
idMap.put(
document.getId(),
new DataTable((String) document.getSource().getOrDefault(valueColumnName, ""))
new DataTable((String) document.getSource().getOrDefault(realValueColumn, ""))
);
}
}
......@@ -188,6 +190,7 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
public HeatMap readHeatMap(final MetricsCondition condition,
final String valueColumnName,
final Duration duration) {
final String realValueColumn = IndexController.LogicIndicesRegister.getPhysicalColumnName(condition.getName(), valueColumnName);
final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
String tableName =
IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
......@@ -218,7 +221,7 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
for (String id : ids) {
Map<String, Object> source = idMap.get(id);
if (source != null) {
String value = (String) source.get(HistogramMetrics.DATASET);
String value = (String) source.get(realValueColumn);
heatMap.buildColumn(id, value, defaultValue);
}
}
......
......@@ -39,10 +39,10 @@ import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentReco
import org.apache.skywalking.oap.server.core.profiling.trace.ProfileThreadSnapshotRecord;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
......@@ -170,7 +170,7 @@ public class ProfileThreadSnapshotQueryEsDAO extends EsDAO
List<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence - minSequence);
for (SearchHit searchHit : response.getHits().getHits()) {
ProfileThreadSnapshotRecord record = builder.storage2Entity(
new HashMapConverter.ToEntity(searchHit.getSource()));
new ElasticSearchConverter.ToEntity(ProfileThreadSnapshotRecord.INDEX_NAME, searchHit.getSource()));
result.add(record);
}
......
......@@ -33,9 +33,9 @@ import org.apache.skywalking.library.elasticsearch.response.search.SearchRespons
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
......@@ -103,7 +103,7 @@ public class TagAutoCompleteQueryDAO extends EsDAO implements ITagAutoCompleteQu
Set<String> tagValues = new HashSet<>();
for (SearchHit searchHit : response.getHits().getHits()) {
TagAutocompleteData tag = new TagAutocompleteData.Builder().storage2Entity(
new HashMapConverter.ToEntity(searchHit.getSource()));
new ElasticSearchConverter.ToEntity(TagAutocompleteData.INDEX_NAME, searchHit.getSource()));
tagValues.add(tag.getTagValue());
}
return tagValues;
......
......@@ -40,11 +40,11 @@ import org.apache.skywalking.oap.server.core.query.type.Span;
import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
......@@ -174,7 +174,7 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
List<SegmentRecord> segmentRecords = new ArrayList<>();
for (SearchHit searchHit : response.getHits().getHits()) {
SegmentRecord segmentRecord = new SegmentRecord.Builder().storage2Entity(
new HashMapConverter.ToEntity(searchHit.getSource()));
new ElasticSearchConverter.ToEntity(SegmentRecord.INDEX_NAME, searchHit.getSource()));
segmentRecords.add(segmentRecord);
}
return segmentRecords;
......
......@@ -35,10 +35,10 @@ import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
......@@ -64,7 +64,7 @@ public class UITemplateManagementEsDAO extends EsDAO implements UITemplateManage
UITemplate.Builder builder = new UITemplate.Builder();
SearchHit data = response.getHits().getHits().get(0);
return new DashboardConfiguration().fromEntity(
builder.storage2Entity(new HashMapConverter.ToEntity(data.getSource())));
builder.storage2Entity(new ElasticSearchConverter.ToEntity(UITemplate.INDEX_NAME, data.getSource())));
}
return null;
}
......@@ -93,7 +93,7 @@ public class UITemplateManagementEsDAO extends EsDAO implements UITemplateManage
for (SearchHit searchHit : response.getHits()) {
Map<String, Object> sourceAsMap = searchHit.getSource();
final UITemplate uiTemplate = builder.storage2Entity(new HashMapConverter.ToEntity(sourceAsMap));
final UITemplate uiTemplate = builder.storage2Entity(new ElasticSearchConverter.ToEntity(UITemplate.INDEX_NAME, sourceAsMap));
configs.add(new DashboardConfiguration().fromEntity(uiTemplate));
}
return configs;
......@@ -111,7 +111,7 @@ public class UITemplateManagementEsDAO extends EsDAO implements UITemplateManage
.build();
}
final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(UITemplate.INDEX_NAME);
builder.entity2Storage(uiTemplate, toStorage);
getClient().forceInsert(UITemplate.INDEX_NAME, uiTemplate.id(), toStorage.obtain());
return TemplateChangeStatus.builder().status(true).id(uiTemplate.getTemplateId()).build();
......@@ -134,7 +134,7 @@ public class UITemplateManagementEsDAO extends EsDAO implements UITemplateManage
.message("Can't find the template").build();
}
final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(UITemplate.INDEX_NAME);
builder.entity2Storage(uiTemplate, toStorage);
getClient().forceUpdate(UITemplate.INDEX_NAME, uiTemplate.id(), toStorage.obtain());
return TemplateChangeStatus.builder().status(true).id(setting.getId()).build();
......@@ -151,10 +151,10 @@ public class UITemplateManagementEsDAO extends EsDAO implements UITemplateManage
if (response.isPresent()) {
final UITemplate.Builder builder = new UITemplate.Builder();
final UITemplate uiTemplate = builder.storage2Entity(
new HashMapConverter.ToEntity(response.get().getSource()));
new ElasticSearchConverter.ToEntity(UITemplate.INDEX_NAME, response.get().getSource()));
uiTemplate.setDisabled(BooleanUtils.TRUE);
final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(UITemplate.INDEX_NAME);
builder.entity2Storage(uiTemplate, toStorage);
getClient().forceUpdate(UITemplate.INDEX_NAME, uiTemplate.id(), toStorage.obtain());
return TemplateChangeStatus.builder().status(true).id(id).build();
......
......@@ -38,7 +38,6 @@ import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.storage.query.IZipkinQueryDAO;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceRelationTraffic;
import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceSpanTraffic;
import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceTraffic;
......@@ -46,6 +45,7 @@ import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
......@@ -82,7 +82,7 @@ public class ZipkinQueryEsDAO extends EsDAO implements IZipkinQueryDAO {
for (SearchHit searchHit : response.getHits()) {
Map<String, Object> sourceAsMap = searchHit.getSource();
ZipkinServiceTraffic record = new ZipkinServiceTraffic.Builder().storage2Entity(
new HashMapConverter.ToEntity(sourceAsMap));
new ElasticSearchConverter.ToEntity(ZipkinServiceTraffic.INDEX_NAME, sourceAsMap));
services.add(record.getServiceName());
}
if (services.size() < SCROLLING_BATCH_SIZE) {
......@@ -110,7 +110,7 @@ public class ZipkinQueryEsDAO extends EsDAO implements IZipkinQueryDAO {
for (SearchHit searchHit : response.getHits()) {
Map<String, Object> sourceAsMap = searchHit.getSource();
ZipkinServiceRelationTraffic record = new ZipkinServiceRelationTraffic.Builder().storage2Entity(
new HashMapConverter.ToEntity(sourceAsMap));
new ElasticSearchConverter.ToEntity(ZipkinServiceRelationTraffic.INDEX_NAME, sourceAsMap));
remoteServices.add(record.getRemoteServiceName());
}
return remoteServices;
......@@ -129,7 +129,7 @@ public class ZipkinQueryEsDAO extends EsDAO implements IZipkinQueryDAO {
for (SearchHit searchHit : response.getHits()) {
Map<String, Object> sourceAsMap = searchHit.getSource();
ZipkinServiceSpanTraffic record = new ZipkinServiceSpanTraffic.Builder().storage2Entity(
new HashMapConverter.ToEntity(sourceAsMap));
new ElasticSearchConverter.ToEntity(ZipkinServiceSpanTraffic.INDEX_NAME, sourceAsMap));
spanNames.add(record.getSpanName());
}
return spanNames;
......@@ -151,7 +151,7 @@ public class ZipkinQueryEsDAO extends EsDAO implements IZipkinQueryDAO {
for (SearchHit searchHit : response.getHits()) {
Map<String, Object> sourceAsMap = searchHit.getSource();
ZipkinSpanRecord record = new ZipkinSpanRecord.Builder().storage2Entity(
new HashMapConverter.ToEntity(sourceAsMap));
new ElasticSearchConverter.ToEntity(ZipkinSpanRecord.INDEX_NAME, sourceAsMap));
trace.add(buildSpanFromRecord(record));
}
if (response.getHits().getHits().size() < SCROLLING_BATCH_SIZE) {
......@@ -261,7 +261,7 @@ public class ZipkinQueryEsDAO extends EsDAO implements IZipkinQueryDAO {
for (SearchHit searchHit : response.getHits()) {
Map<String, Object> sourceAsMap = searchHit.getSource();
ZipkinSpanRecord record = new ZipkinSpanRecord.Builder().storage2Entity(
new HashMapConverter.ToEntity(sourceAsMap));
new ElasticSearchConverter.ToEntity(ZipkinSpanRecord.INDEX_NAME, sourceAsMap));
Span span = buildSpanFromRecord(record);
String traceId = span.traceId();
groupedByTraceId.putIfAbsent(traceId, new ArrayList<>());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册