未验证 提交 bee8a86b 编写于 作者: J Jiajing LU 提交者: GitHub

Optimize BanyanDB interval rules (#10128)

* resolve #10104 #10102
上级 3fd5b058
......@@ -36,6 +36,10 @@
* Optimize `flushInterval` of ElasticSearch BulkProcessor to avoid extra periodical flush in the continuous bulk streams.
* An unexpected dot is added when exp is a pure metric name and expPrefix != null.
* Support monitoring MariaDB.
* Remove measure/stream-specific interval settings in BanyanDB.
* Add group-specific settings used to override global configurations (e.g. `segmentIntervalDays`, `blockIntervalHours`) in BanyanDB.
* Use TTL-driven interval settings for the `measure-default` group in BanyanDB.
* Fix the wrong group assignment of non-time-relative metadata in BanyanDB.
#### UI
......
......@@ -235,10 +235,11 @@ storage:
superDatasetShardsFactor: ${SW_STORAGE_BANYANDB_SUPERDATASET_SHARDS_FACTOR:2}
concurrentWriteThreads: ${SW_STORAGE_BANYANDB_CONCURRENT_WRITE_THREADS:15}
profileTaskQueryMaxSize: ${SW_STORAGE_BANYANDB_PROFILE_TASK_QUERY_MAX_SIZE:200} # the max number of fetch task in a request
streamBlockInterval: ${SW_STORAGE_BANYANDB_STREAM_BLOCK_INTERVAL:4} # Unit is hour
streamSegmentInterval: ${SW_STORAGE_BANYANDB_STREAM_SEGMENT_INTERVAL:24} # Unit is hour
measureBlockInterval: ${SW_STORAGE_BANYANDB_MEASURE_BLOCK_INTERVAL:4} # Unit is hour
measureSegmentInterval: ${SW_STORAGE_BANYANDB_MEASURE_SEGMENT_INTERVAL:24} # Unit is hour
blockIntervalHours: ${SW_STORAGE_BANYANDB_BLOCK_INTERVAL_HOURS:24} # Unit is hour
segmentIntervalDays: ${SW_STORAGE_BANYANDB_SEGMENT_INTERVAL_DAYS:1} # Unit is day
superDatasetBlockIntervalHours: ${SW_STORAGE_BANYANDB_SUPER_DATASET_BLOCK_INTERVAL_HOURS:24} # Unit is hour
superDatasetSegmentIntervalDays: ${SW_STORAGE_BANYANDB_SUPER_DATASET_SEGMENT_INTERVAL_DAYS:1} # Unit is day
specificGroupSettings: ${SW_STORAGE_BANYANDB_SPECIFIC_GROUP_SETTINGS:""} # For example, {"group1": {"blockIntervalHours": 4, "segmentIntervalDays": 1}}
agent-analyzer:
selector: ${SW_AGENT_ANALYZER:default}
......
......@@ -40,6 +40,7 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
public BanyanDBIndexInstaller(Client client, ModuleManager moduleManager, BanyanDBStorageConfig config) {
super(client, moduleManager);
this.config = config;
MetadataRegistry.INSTANCE.initializeIntervals(config.getSpecificGroupSettings());
}
@Override
......@@ -60,9 +61,9 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
// then check entity schema
if (metadata.findRemoteSchema(c).isPresent()) {
// register models only locally but not remotely
if (model.isTimeSeries() && model.isRecord()) { // stream
if (model.isRecord()) { // stream
MetadataRegistry.INSTANCE.registerStreamModel(model, config, configService);
} else if (model.isTimeSeries() && !model.isRecord()) { // measure
} else { // measure
MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
}
return true;
......@@ -78,20 +79,18 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
public void createTable(Model model) throws StorageException {
try {
ConfigService configService = moduleManager.find(CoreModule.NAME).provider().getService(ConfigService.class);
if (model.isTimeSeries() && model.isRecord()) { // stream
if (model.isRecord()) { // stream
Stream stream = MetadataRegistry.INSTANCE.registerStreamModel(model, config, configService);
if (stream != null) {
log.info("install stream schema {}", model.getName());
((BanyanDBStorageClient) client).define(stream);
}
} else if (model.isTimeSeries() && !model.isRecord()) { // measure
} else { // measure
Measure measure = MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
if (measure != null) {
log.info("install measure schema {}", model.getName());
((BanyanDBStorageClient) client).define(measure);
}
} else if (!model.isTimeSeries()) {
log.info("skip property index {}", model.getName());
}
} catch (IOException ex) {
throw new StorageException("fail to install schema", ex);
......
......@@ -57,31 +57,43 @@ public class BanyanDBStorageConfig extends ModuleConfig {
*/
private int superDatasetShardsFactor;
/**
* block interval for Stream group.
* Unit is hours.
* Default global block interval for non-super-dataset models.
* Unit is hour.
*
* @since 9.3.0
* @since 9.4.0
*/
private int streamBlockInterval;
private int blockIntervalHours;
/**
* segment interval for Stream group.
* Unit is hours.
* Default global segment interval for non-super-dataset models.
* Unit is day.
*
* @since 9.3.0
* @since 9.4.0
*/
private int streamSegmentInterval;
private int segmentIntervalDays;
/**
* block interval for Measure group.
* Unit is hours.
* Default global block interval for super-dataset models.
* Unit is hour.
*
* @since 9.3.0
* @since 9.4.0
*/
private int measureBlockInterval;
private int superDatasetBlockIntervalHours;
/**
* segment interval for Measure group.
* Unit is hours.
* Default global segment interval for super-dataset models.
* Unit is day.
*
* @since 9.3.0
* @since 9.4.0
*/
private int measureSegmentInterval;
private int superDatasetSegmentIntervalDays;
/**
* Specify the settings for each group individually. All groups created in BanyanDB can
* be found with <a href="https://skywalking.apache.org/docs/skywalking-banyandb/next/crud/group/#list-operation">bydbctl</a>.
* <p>
* NOTE: setting intervals works for all groups except `measure-default`.
* <p>
* NOTE: available groups: `measure-default`, `measure-sampled`, `stream-default`
* and `stream-*` with names of the super dataset as the suffix.
*
* @since 9.4.0
*/
private String specificGroupSettings;
}
......@@ -18,8 +18,12 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonObject;
import io.grpc.Status;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.Comparator;
......@@ -32,11 +36,14 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.Singular;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
......@@ -66,8 +73,11 @@ import org.apache.skywalking.oap.server.library.util.StringUtil;
public enum MetadataRegistry {
INSTANCE;
private static final ObjectMapper MAPPER = new ObjectMapper();
private final Map<String, Schema> registry = new HashMap<>();
private Map<String, GroupSetting> specificGroupSettings = new HashMap<>();
public Stream registerStreamModel(Model model, BanyanDBStorageConfig config, ConfigService configService) {
final SchemaMetadata schemaMetadata = parseMetadata(model, config, configService);
Schema.SchemaBuilder schemaBuilder = Schema.builder().metadata(schemaMetadata);
......@@ -90,7 +100,7 @@ public enum MetadataRegistry {
String timestampColumn4Stream = model.getBanyanDBModelExtension().getTimestampColumn();
if (StringUtil.isBlank(timestampColumn4Stream)) {
throw new IllegalStateException(
"Model[stream." + model.getName() + "] miss defined @BanyanDB.TimestampColumn");
"Model[stream." + model.getName() + "] miss defined @BanyanDB.TimestampColumn");
}
schemaBuilder.timestampColumn4Stream(timestampColumn4Stream);
List<IndexRule> indexRules = tags.stream()
......@@ -336,29 +346,70 @@ public enum MetadataRegistry {
return tagSpec;
}
public void initializeIntervals(String specificGroupSettingsStr) {
if (StringUtil.isBlank(specificGroupSettingsStr)) {
return;
}
try {
specificGroupSettings = MAPPER.readerFor(new TypeReference<Map<String, GroupSetting>>() {
}).readValue(specificGroupSettingsStr);
} catch (IOException ioEx) {
log.warn("fail to parse specificGroupSettings", ioEx);
}
}
public SchemaMetadata parseMetadata(Model model, BanyanDBStorageConfig config, ConfigService configService) {
if (model.isRecord()) {
String group = "stream-default";
String group;
if (model.isRecord()) { // stream
group = "stream-default";
if (model.isSuperDataset()) {
// for superDataset, we should use separate group
group = "stream-" + model.getName();
}
} else if (model.getDownsampling() == DownSampling.Minute && model.isTimeRelativeID()) { // measure
group = "measure-sampled";
} else {
// Solution: 2 * TTL < T * (1 + 0.8)
// e.g. if TTL=7, T=8: a new block/segment will be created at 14.4 days,
// while the first block has been deleted at 2*TTL
final int intervalDays = Double.valueOf(Math.ceil(configService.getMetricsDataTTL() * 2.0 / 1.8)).intValue();
return new SchemaMetadata("measure-default", model.getName(), Kind.MEASURE,
model.getDownsampling(),
config.getMetricsShardsNumber(),
intervalDays * 24,
intervalDays * 24, // use 10-day/240-hour strategy
configService.getMetricsDataTTL());
}
int blockIntervalHrs = config.getBlockIntervalHours();
int segmentIntervalDays = config.getSegmentIntervalDays();
if (model.isSuperDataset()) {
blockIntervalHrs = config.getSuperDatasetBlockIntervalHours();
segmentIntervalDays = config.getSuperDatasetSegmentIntervalDays();
}
GroupSetting groupSetting = this.specificGroupSettings.get(group);
if (groupSetting != null) {
blockIntervalHrs = groupSetting.getBlockIntervalHours();
segmentIntervalDays = groupSetting.getSegmentIntervalDays();
}
if (model.isRecord()) {
return new SchemaMetadata(group,
model.getName(),
Kind.STREAM,
model.getDownsampling(),
config.getRecordShardsNumber() *
(model.isSuperDataset() ? config.getSuperDatasetShardsFactor() : 1),
config.getStreamBlockInterval(),
config.getStreamSegmentInterval(),
blockIntervalHrs,
segmentIntervalDays * 24,
configService.getRecordDataTTL()
);
}
return new SchemaMetadata("measure-default", model.getName(), Kind.MEASURE,
// FIX: address issue #10104
return new SchemaMetadata(group, model.getName(), Kind.MEASURE,
model.getDownsampling(),
config.getMetricsShardsNumber(),
config.getMeasureBlockInterval(),
config.getMeasureSegmentInterval(),
blockIntervalHrs,
segmentIntervalDays * 24,
configService.getMetricsDataTTL());
}
......@@ -542,4 +593,12 @@ public enum MetadataRegistry {
public enum ColumnType {
TAG, FIELD;
}
@Getter
@Setter
@NoArgsConstructor
public static class GroupSetting {
private int blockIntervalHours;
private int segmentIntervalDays;
}
}
......@@ -23,6 +23,6 @@ SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772
SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
SW_BANYANDB_COMMIT=566314a3d72299879233e8b66be218df3b73b6e0
SW_BANYANDB_COMMIT=005b02210caacee0141de8085edebed367ef5a6f
SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册