提交 4c16930a 编写于 作者: G Gao Hongtao

Add the ID tag to process_traffic for searching

Signed-off-by: NGao Hongtao <hanahmily@gmail.com>
上级 8ea8dbc0
......@@ -52,6 +52,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PR
"name",
})
@SQLDatabase.Sharding(shardingAlgorithm = ShardingAlgorithm.NO_SHARDING)
@BanyanDB.StoreIDTag
public class ProcessTraffic extends Metrics {
public static final String INDEX_NAME = "process_traffic";
public static final String SERVICE_ID = "service_id";
......
......@@ -140,4 +140,13 @@ public @interface BanyanDB {
@Retention(RetentionPolicy.RUNTIME)
@interface MeasureField {
}
/**
* StoreIDTag indicates a metric store its ID as a tag for searching.
* @Since 9.4.0
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@interface StoreIDTag {
}
}
......@@ -37,4 +37,12 @@ public class BanyanDBModelExtension {
@Setter
private String timestampColumn;
/**
* shouldStoreIDTag indicates whether a metric store its ID as a tag.
* The installer will create a virtual string ID tag with a tree index rule.
*/
@Getter
@Setter
private boolean shouldStoreIDTag;
}
......@@ -93,6 +93,10 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
banyanDBModelExtension.setTimestampColumn(timestampColumn);
}
if (aClass.isAnnotationPresent(BanyanDB.StoreIDTag.class)) {
banyanDBModelExtension.setShouldStoreIDTag(true);
}
checker.check(storage.getModelName());
Model model = new Model(
......
......@@ -40,6 +40,9 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.util.ByteUtil;
import java.util.List;
public class BanyanDBConverter {
public static final String ID = "id";
public static class StorageToStream implements Convert2Entity {
private final MetadataRegistry.Schema schema;
private final RowEntity rowEntity;
......@@ -154,6 +157,14 @@ public class BanyanDBConverter {
}
}
public void acceptID(String id) {
try {
this.measureWrite.tag(ID, TagAndValue.stringTagValue(id));
} catch (BanyanDBException ex) {
log.error("fail to add ID tag", ex);
}
}
@Override
public void accept(String fieldName, byte[] fieldValue) {
MetadataRegistry.ColumnSpec columnSpec = this.schema.getSpec(fieldName);
......
......@@ -94,7 +94,7 @@ public enum MetadataRegistry {
// 1) a list of TagFamilySpec,
// 2) a list of IndexRule,
List<TagMetadata> tags = parseTagMetadata(model, schemaBuilder);
List<TagFamilySpec> tagFamilySpecs = schemaMetadata.extractTagFamilySpec(tags);
List<TagFamilySpec> tagFamilySpecs = schemaMetadata.extractTagFamilySpec(tags, false);
// iterate over tagFamilySpecs to save tag names
for (final TagFamilySpec tagFamilySpec : tagFamilySpecs) {
for (final TagFamilySpec.TagSpec tagSpec : tagFamilySpec.tagSpecs()) {
......@@ -138,7 +138,7 @@ public enum MetadataRegistry {
// 1) a list of TagFamilySpec,
// 2) a list of IndexRule,
MeasureMetadata tagsAndFields = parseTagAndFieldMetadata(model, schemaBuilder);
List<TagFamilySpec> tagFamilySpecs = schemaMetadata.extractTagFamilySpec(tagsAndFields.tags);
List<TagFamilySpec> tagFamilySpecs = schemaMetadata.extractTagFamilySpec(tagsAndFields.tags, model.getBanyanDBModelExtension().isShouldStoreIDTag());
// iterate over tagFamilySpecs to save tag names
for (final TagFamilySpec tagFamilySpec : tagFamilySpecs) {
for (final TagFamilySpec.TagSpec tagSpec : tagFamilySpec.tagSpecs()) {
......@@ -150,6 +150,10 @@ public enum MetadataRegistry {
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (model.getBanyanDBModelExtension().isShouldStoreIDTag()) {
indexRules.add(IndexRule.create(BanyanDBConverter.ID, IndexRule.IndexType.TREE, IndexRule.IndexLocation.SERIES));
}
final Measure.Builder builder = Measure.create(schemaMetadata.getGroup(), schemaMetadata.name(),
downSamplingDuration(model.getDownsampling()));
builder.setEntityRelativeTags(shardingColumns);
......@@ -514,14 +518,19 @@ public enum MetadataRegistry {
}
}
private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> tagMetadataList) {
private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> tagMetadataList, boolean shouldAddID) {
final String indexFamily = SchemaMetadata.this.indexFamily();
final String nonIndexFamily = SchemaMetadata.this.nonIndexFamily();
Map<String, List<TagMetadata>> tagMetadataMap = tagMetadataList.stream()
.collect(Collectors.groupingBy(tagMetadata -> tagMetadata.isIndex() ? SchemaMetadata.this.indexFamily() : SchemaMetadata.this.nonIndexFamily()));
.collect(Collectors.groupingBy(tagMetadata -> tagMetadata.isIndex() ? indexFamily : nonIndexFamily));
final List<TagFamilySpec> tagFamilySpecs = new ArrayList<>(tagMetadataMap.size());
for (final Map.Entry<String, List<TagMetadata>> entry : tagMetadataMap.entrySet()) {
final TagFamilySpec.Builder b = TagFamilySpec.create(entry.getKey())
.addTagSpecs(entry.getValue().stream().map(TagMetadata::getTagSpec).collect(Collectors.toList()));
if (shouldAddID && indexFamily.equals(entry.getKey())) {
b.addTagSpec(TagFamilySpec.TagSpec.newStringTag(BanyanDBConverter.ID));
}
tagFamilySpecs.add(b.build());
}
......
......@@ -165,15 +165,15 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
@Override
public ServiceInstance getInstance(String instanceId) throws IOException {
IDManager.ServiceInstanceID.InstanceIDDefinition id = IDManager.ServiceInstanceID.analysisId(instanceId);
MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
INSTANCE_TRAFFIC_COMPACT_TAGS,
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
if (StringUtil.isNotEmpty(instanceId)) {
query.and(id(instanceId));
}
query.and(eq(InstanceTraffic.SERVICE_ID, id.getServiceId()))
.and(eq(InstanceTraffic.NAME, id.getName()));
}
});
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute);
......@@ -327,7 +327,7 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
@Override
protected void apply(MeasureQuery query) {
if (StringUtil.isNotEmpty(processId)) {
query.and(id(processId));
query.and(eq(BanyanDBConverter.ID, processId));
}
}
});
......
......@@ -138,6 +138,9 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp
final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite);
storageBuilder.entity2Storage(metrics, toStorage);
if (model.getBanyanDBModelExtension().isShouldStoreIDTag()) {
toStorage.acceptID(metrics.id().build());
}
return new BanyanDBMeasureInsertRequest(toStorage.obtain(), callback);
}
......@@ -153,6 +156,9 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp
final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite);
storageBuilder.entity2Storage(metrics, toStorage);
if (model.getBanyanDBModelExtension().isShouldStoreIDTag()) {
toStorage.acceptID(metrics.id().build());
}
return new BanyanDBMeasureUpdateRequest(toStorage.obtain());
}
......
......@@ -29,7 +29,6 @@ import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
......@@ -146,10 +145,6 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
return PairQueryCondition.LongQueryCondition.ne(name, value);
}
protected PairQueryCondition<String> id(String value) {
return PairQueryCondition.IDQueryCondition.eq(Metrics.ID, value);
}
protected AbstractQuery.OrderBy desc(String name) {
return new AbstractQuery.OrderBy(name, AbstractQuery.Sort.DESC);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册