未验证 提交 9b3ef909 编写于 作者: J Jiajing LU 提交者: GitHub

Remove time_bucket tag for BanyanDB impl (#10202)

上级 ccc13cc1
......@@ -62,6 +62,7 @@
* Support server status watcher for `MetricsPersistentWorker` to check the metrics whether required initialization.
* Fix the meter value are not correct when using `sumPerMinLabeld` or `sumHistogramPercentile` MAL function.
* Fix cannot display attached events when using Zipkin Lens UI query traces.
* Remove `time_bucket` for both Stream and Measure kinds in BanyanDB plugin.
#### UI
......
......@@ -41,7 +41,7 @@ import java.util.List;
import java.util.Set;
public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements IAggregationQueryDAO {
private static final Set<String> TAGS = ImmutableSet.of(Metrics.ENTITY_ID, Metrics.TIME_BUCKET);
private static final Set<String> TAGS = ImmutableSet.of(Metrics.ENTITY_ID);
public BanyanDBAggregationQueryDAO(BanyanDBStorageClient client) {
super(client);
......@@ -56,8 +56,6 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements
@Override
protected void apply(MeasureQuery query) {
query.meanBy(valueColumnName, ImmutableSet.of(Metrics.ENTITY_ID));
query.and(lte(Metrics.TIME_BUCKET, duration.getEndTimeBucket()));
query.and(gte(Metrics.TIME_BUCKET, duration.getStartTimeBucket()));
if (condition.getOrder() == Order.DES) {
query.topN(condition.getTopN(), valueColumnName);
} else {
......
......@@ -29,6 +29,9 @@ import org.apache.skywalking.banyandb.v1.client.StreamWrite;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.Serializable;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
......@@ -48,6 +51,11 @@ public class BanyanDBConverter {
@Override
public Object get(String fieldName) {
if (fieldName.equals(Record.TIME_BUCKET)) {
final String timestampColumnName = schema.getTimestampColumn4Stream();
long timestampMillis = ((Number) this.get(timestampColumnName)).longValue();
return TimeBucket.getTimeBucket(timestampMillis, schema.getMetadata().getDownSampling());
}
MetadataRegistry.ColumnSpec spec = schema.getSpec(fieldName);
if (double.class.equals(spec.getColumnClass())) {
return ByteUtil.bytes2Double(rowEntity.getTagValue(fieldName));
......@@ -70,6 +78,9 @@ public class BanyanDBConverter {
@Override
public void accept(String fieldName, Object fieldValue) {
if (fieldName.equals(Record.TIME_BUCKET)) {
return;
}
if (fieldName.equals(this.schema.getTimestampColumn4Stream())) {
streamWrite.setTimestamp((long) fieldValue);
}
......@@ -125,6 +136,9 @@ public class BanyanDBConverter {
@Override
public void accept(String fieldName, Object fieldValue) {
if (fieldName.equals(Metrics.TIME_BUCKET)) {
return;
}
MetadataRegistry.ColumnSpec columnSpec = this.schema.getSpec(fieldName);
if (columnSpec == null) {
throw new IllegalArgumentException("fail to find tag/field[" + fieldName + "]");
......@@ -229,6 +243,9 @@ public class BanyanDBConverter {
@Override
public Object get(String fieldName) {
if (fieldName.equals(Metrics.TIME_BUCKET)) {
return TimeBucket.getTimeBucket(dataPoint.getTimestamp(), schema.getMetadata().getDownSampling());
}
MetadataRegistry.ColumnSpec spec = schema.getSpec(fieldName);
switch (spec.getColumnType()) {
case TAG:
......
......@@ -40,7 +40,7 @@ import java.util.List;
import java.util.Set;
public class BanyanDBRecordsQueryDAO extends AbstractBanyanDBDAO implements IRecordsQueryDAO {
private static final Set<String> TAGS = ImmutableSet.of(TopN.TIME_BUCKET, TopN.ENTITY_ID, TopN.STATEMENT, TopN.TRACE_ID);
private static final Set<String> TAGS = ImmutableSet.of(TopN.ENTITY_ID, TopN.STATEMENT, TopN.TRACE_ID);
public BanyanDBRecordsQueryDAO(BanyanDBStorageClient client) {
super(client);
......@@ -62,8 +62,6 @@ public class BanyanDBRecordsQueryDAO extends AbstractBanyanDBDAO implements IRec
} else {
query.bottomN(condition.getTopN(), valueColumnName);
}
query.and(gte(TopN.TIME_BUCKET, duration.getStartTimeBucketInSec()));
query.and(lte(TopN.TIME_BUCKET, duration.getEndTimeBucketInSec()));
}
});
......
......@@ -60,6 +60,8 @@ import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.query.enumeration.Step;
import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
......@@ -288,12 +290,18 @@ public enum MetadataRegistry {
* Parse tags' metadata for {@link Stream}
* Every field of a class is registered as a {@link org.apache.skywalking.banyandb.model.v1.BanyandbModel.Tag}
* regardless of its dataType.
*
* @since 9.4.0 Skip {@link Record#TIME_BUCKET}
*/
List<TagMetadata> parseTagMetadata(Model model, Schema.SchemaBuilder builder) {
List<TagMetadata> tagMetadataList = new ArrayList<>();
for (final ModelColumn col : model.getColumns()) {
final String columnStorageName = col.getColumnName().getStorageName();
if (columnStorageName.equals(Record.TIME_BUCKET)) {
continue;
}
final TagFamilySpec.TagSpec tagSpec = parseTagSpec(col);
builder.spec(col.getColumnName().getStorageName(), new ColumnSpec(ColumnType.TAG, col.getType()));
builder.spec(columnStorageName, new ColumnSpec(ColumnType.TAG, col.getType()));
if (col.shouldIndex()) {
// build indexRule
IndexRule indexRule = parseIndexRule(tagSpec.getTagName(), col);
......@@ -310,6 +318,8 @@ public enum MetadataRegistry {
* Parse tags and fields' metadata for {@link Measure}.
* For field whose dataType is not {@link Column.ValueDataType#NOT_VALUE},
* it is registered as {@link org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure.DataPoint.Field}
*
* @since 9.4.0 Skip {@link Metrics#TIME_BUCKET}
*/
List<TagMetadata> parseTagAndFieldMetadata(Model model, Schema.SchemaBuilder builder) {
List<TagMetadata> tagMetadataList = new ArrayList<>();
......@@ -317,12 +327,16 @@ public enum MetadataRegistry {
Optional<ValueColumnMetadata.ValueColumn> valueColumnOpt = ValueColumnMetadata.INSTANCE
.readValueColumnDefinition(model.getName());
for (final ModelColumn col : model.getColumns()) {
if (valueColumnOpt.isPresent() && valueColumnOpt.get().getValueCName().equals(col.getColumnName().getStorageName())) {
builder.spec(col.getColumnName().getStorageName(), new ColumnSpec(ColumnType.FIELD, col.getType()));
final String columnStorageName = col.getColumnName().getStorageName();
if (columnStorageName.equals(Metrics.TIME_BUCKET)) {
continue;
}
if (valueColumnOpt.isPresent() && valueColumnOpt.get().getValueCName().equals(columnStorageName)) {
builder.spec(columnStorageName, new ColumnSpec(ColumnType.FIELD, col.getType()));
continue;
}
final TagFamilySpec.TagSpec tagSpec = parseTagSpec(col);
builder.spec(col.getColumnName().getStorageName(), new ColumnSpec(ColumnType.TAG, col.getType()));
builder.spec(columnStorageName, new ColumnSpec(ColumnType.TAG, col.getType()));
if (col.shouldIndex()) {
// build indexRule
IndexRule indexRule = parseIndexRule(tagSpec.getTagName(), col);
......
......@@ -44,7 +44,7 @@ public class BanyanDBNetworkAddressAliasDAO extends AbstractBanyanDBDAO implemen
private MetadataRegistry.Schema schema;
private static final Set<String> TAGS = ImmutableSet.of(NetworkAddressAlias.ADDRESS,
NetworkAddressAlias.TIME_BUCKET, NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET,
NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET,
NetworkAddressAlias.REPRESENT_SERVICE_ID, NetworkAddressAlias.REPRESENT_SERVICE_INSTANCE_ID);
public BanyanDBNetworkAddressAliasDAO(final BanyanDBStorageClient client) {
......
......@@ -48,7 +48,7 @@ import java.util.Set;
public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarmQueryDAO {
private static final Set<String> TAGS = ImmutableSet.of(AlarmRecord.SCOPE,
AlarmRecord.NAME, AlarmRecord.ID0, AlarmRecord.ID1, AlarmRecord.ALARM_MESSAGE, AlarmRecord.START_TIME,
AlarmRecord.TIME_BUCKET, AlarmRecord.RULE_NAME, AlarmRecord.TAGS, AlarmRecord.TAGS_RAW_DATA);
AlarmRecord.RULE_NAME, AlarmRecord.TAGS, AlarmRecord.TAGS_RAW_DATA);
public BanyanDBAlarmQueryDAO(BanyanDBStorageClient client) {
super(client);
......
......@@ -39,8 +39,7 @@ public class BanyanDBEBPFProfilingDataDAO extends AbstractBanyanDBDAO implements
EBPFProfilingDataRecord.STACK_ID_LIST,
EBPFProfilingDataRecord.TARGET_TYPE,
EBPFProfilingDataRecord.DATA_BINARY,
EBPFProfilingDataRecord.TASK_ID,
EBPFProfilingDataRecord.TIME_BUCKET);
EBPFProfilingDataRecord.TASK_ID);
public BanyanDBEBPFProfilingDataDAO(BanyanDBStorageClient client) {
super(client);
......
......@@ -54,8 +54,7 @@ public class BanyanDBEBPFProfilingTaskDAO extends AbstractBanyanDBDAO implements
EBPFProfilingTaskRecord.FIXED_TRIGGER_DURATION,
EBPFProfilingTaskRecord.TARGET_TYPE,
EBPFProfilingTaskRecord.CREATE_TIME,
EBPFProfilingTaskRecord.LAST_UPDATE_TIME,
EBPFProfilingTaskRecord.TIME_BUCKET);
EBPFProfilingTaskRecord.LAST_UPDATE_TIME);
private static final Gson GSON = new Gson();
......
......@@ -23,7 +23,7 @@ import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.profiling.trace.ProfileTaskRecord;
import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
......@@ -46,8 +46,7 @@ public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO implements
ProfileTaskRecord.DURATION,
ProfileTaskRecord.MIN_DURATION_THRESHOLD,
ProfileTaskRecord.DUMP_PERIOD,
ProfileTaskRecord.MAX_SAMPLING_COUNT,
Metrics.TIME_BUCKET
ProfileTaskRecord.MAX_SAMPLING_COUNT
);
private final int queryMaxSize;
......@@ -70,11 +69,12 @@ public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO implements
query.and(eq(ProfileTaskRecord.ENDPOINT_NAME, endpointName));
}
if (startTimeBucket != null) {
query.and(gte(Metrics.TIME_BUCKET, startTimeBucket));
query.and(gte(ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
}
if (endTimeBucket != null) {
query.and(lte(Metrics.TIME_BUCKET, endTimeBucket));
query.and(lte(ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
}
if (limit != null) {
query.setLimit(limit);
} else {
......
......@@ -49,7 +49,6 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
ProfileThreadSnapshotRecord.SEGMENT_ID,
ProfileThreadSnapshotRecord.DUMP_TIME,
ProfileThreadSnapshotRecord.SEQUENCE,
ProfileThreadSnapshotRecord.TIME_BUCKET,
ProfileThreadSnapshotRecord.STACK_BINARY);
private static final Set<String> TAGS_TRACE = ImmutableSet.of(SegmentRecord.TRACE_ID,
......@@ -67,7 +66,6 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
SegmentRecord.ENDPOINT_ID,
SegmentRecord.LATENCY,
SegmentRecord.START_TIME,
SegmentRecord.TIME_BUCKET,
SegmentRecord.DATA_BINARY);
private final int querySegmentMaxSize;
......
......@@ -67,7 +67,6 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
SegmentRecord.ENDPOINT_ID,
SegmentRecord.LATENCY,
SegmentRecord.START_TIME,
SegmentRecord.TIME_BUCKET,
SegmentRecord.DATA_BINARY);
public BanyanDBTraceQueryDAO(BanyanDBStorageClient client) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册