未验证 提交 217bb624 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Support percentile function in the meter system. (#4725)

* Support percentile function in the meter system.

* Update document.

* Fix formats.
上级 4a8f86f8
......@@ -17,5 +17,8 @@ The values of scope entity name, such as service name, are required when metrics
NOTICE, the metrics must be declared in the bootstrap stage, no runtime changed.
Meter System supports following binding functions
- **avg**, calculate the avg value for every entity in the same metrics name.
- **histogram**. aggregate the counts in the configurable buckets.
- **avg**. Calculate the avg value for every entity in the same metrics name.
- **histogram**. Aggregate the counts in the configurable buckets; the buckets must be assigned in the declaration stage.
- **percentile**. Read [percentile in WIKI](https://en.wikipedia.org/wiki/Percentile). Unlike the OAL, which provides
50/75/90/95/99 by default, the meter system's percentile function accepts several ranks, each of which should be in
the (0, 100) range.
......@@ -139,7 +139,7 @@ public class AnalysisResult {
} else if (columnType.equals(long.class)) {
serializeFields.addLongField(column.getFieldName());
} else if (StorageDataComplexObject.class.isAssignableFrom(columnType)) {
serializeFields.addObjectField(column.getFieldName());
serializeFields.addObjectField(column.getFieldName(), columnType.getName());
} else {
throw new IllegalStateException(
"Unexpected field type [" + columnType.getSimpleName() + "] of persistence column [" + column
......
......@@ -29,23 +29,23 @@ public class PersistenceColumns {
private List<PersistenceField> objectFields = new LinkedList<>();
public void addStringField(String fieldName) {
stringFields.add(new PersistenceField(fieldName));
stringFields.add(new PersistenceField(fieldName, "String"));
}
public void addLongField(String fieldName) {
longFields.add(new PersistenceField(fieldName));
longFields.add(new PersistenceField(fieldName, "long"));
}
public void addDoubleField(String fieldName) {
doubleFields.add(new PersistenceField(fieldName));
doubleFields.add(new PersistenceField(fieldName, "double"));
}
public void addIntField(String fieldName) {
intFields.add(new PersistenceField(fieldName));
intFields.add(new PersistenceField(fieldName, "int"));
}
public void addObjectField(String fieldName) {
objectFields.add(new PersistenceField(fieldName));
public void addObjectField(String fieldName, String fieldType) {
objectFields.add(new PersistenceField(fieldName, fieldType));
}
public List<PersistenceField> getStringFields() {
......
......@@ -29,10 +29,12 @@ public class PersistenceField {
private String fieldName;
private String setter;
private String getter;
private String fieldType;
public PersistenceField(String fieldName) {
public PersistenceField(String fieldName, String fieldType) {
this.fieldName = fieldName;
this.setter = ClassMethodUtil.toSetMethod(fieldName);
this.getter = ClassMethodUtil.toGetMethod(fieldName);
this.fieldType = fieldType;
}
}
......@@ -16,7 +16,7 @@ public void deserialize(org.apache.skywalking.oap.server.core.remote.grpc.proto.
</#list>
<#list serializeFields.objectFields as field>
${field.setter}(new org.apache.skywalking.oap.server.core.analysis.metrics.DataTable(remoteData.getDataObjectStrings(${field?index})));
${field.setter}(new ${field.fieldType}(remoteData.getDataObjectStrings(${field?index})));
</#list>
}
\ No newline at end of file
......@@ -159,13 +159,18 @@ public class MeterSystem implements Service {
boolean foundDataType = false;
String acceptance = null;
for (final Type genericInterface : meterFunction.getGenericInterfaces()) {
ParameterizedType parameterizedType = (ParameterizedType) genericInterface;
if (parameterizedType.getRawType().getTypeName().equals(AcceptableValue.class.getName())) {
Type[] arguments = parameterizedType.getActualTypeArguments();
if (arguments[0].equals(dataType)) {
foundDataType = true;
} else {
acceptance = arguments[0].getTypeName();
if (genericInterface instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType) genericInterface;
if (parameterizedType.getRawType().getTypeName().equals(AcceptableValue.class.getName())) {
Type[] arguments = parameterizedType.getActualTypeArguments();
if (arguments[0].equals(dataType)) {
foundDataType = true;
} else {
acceptance = arguments[0].getTypeName();
}
}
if (foundDataType) {
break;
}
}
}
......
......@@ -27,7 +27,6 @@ import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
......@@ -58,13 +57,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
@Setter
@Column(columnName = DATASET, dataType = Column.ValueDataType.HISTOGRAM, storageOnly = true, defaultValue = 0)
private DataTable dataset = new DataTable(30);
/**
* Service ID is required for sort query.
*/
@Setter
@Getter
@Column(columnName = InstanceTraffic.SERVICE_ID)
private String serviceId;
@Override
public void accept(final MeterEntity entity, final BucketedValues value) {
......@@ -76,7 +68,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
}
this.entityId = entity.id();
this.serviceId = entity.serviceId();
final long[] values = value.getValues();
for (int i = 0; i < values.length; i++) {
......@@ -110,7 +101,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
HistogramFunction metrics = (HistogramFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInHour());
metrics.setServiceId(getServiceId());
metrics.setDataset(getDataset());
return metrics;
}
......@@ -120,7 +110,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
HistogramFunction metrics = (HistogramFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInDay());
metrics.setServiceId(getServiceId());
metrics.setDataset(getDataset());
return metrics;
}
......@@ -135,7 +124,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
this.setTimeBucket(remoteData.getDataLongs(0));
this.setEntityId(remoteData.getDataStrings(0));
this.setServiceId(remoteData.getDataStrings(1));
this.setDataset(new DataTable(remoteData.getDataObjectStrings(0)));
}
......@@ -146,7 +134,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
remoteBuilder.addDataLongs(getTimeBucket());
remoteBuilder.addDataStrings(entityId);
remoteBuilder.addDataStrings(serviceId);
remoteBuilder.addDataObjectStrings(dataset.toStorageData());
......@@ -175,7 +162,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
};
metrics.setDataset(new DataTable((String) dbMap.get(DATASET)));
metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
metrics.setEntityId((String) dbMap.get(ENTITY_ID));
return metrics;
}
......@@ -185,7 +171,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
Map<String, Object> map = new HashMap<>();
map.put(DATASET, storageData.getDataset());
map.put(TIME_BUCKET, storageData.getTimeBucket());
map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
map.put(ENTITY_ID, storageData.getEntityId());
return map;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter.function;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.MultiIntValuesHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.PercentileMetrics;
import org.apache.skywalking.oap.server.core.query.type.Bucket;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
* PercentileFunction is the implementation of {@link PercentileMetrics} in the meter system. The major difference is
* the PercentileFunction accepts the {@link PercentileArgument} as input rather than every single request.
*/
@MeterFunction(functionName = "percentile")
@Slf4j
@EqualsAndHashCode(of = {
"entityId",
"timeBucket"
})
public abstract class PercentileFunction extends Metrics implements AcceptableValue<PercentileFunction.PercentileArgument>, MultiIntValuesHolder {
public static final String DATASET = "dataset";
public static final String RANKS = "ranks";
public static final String VALUE = "value";
@Setter
@Getter
@Column(columnName = ENTITY_ID)
private String entityId;
@Getter
@Setter
@Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
private DataTable percentileValues = new DataTable(10);
@Getter
@Setter
@Column(columnName = DATASET, storageOnly = true)
private DataTable dataset = new DataTable(30);
/**
* Rank
*/
@Getter
@Setter
@Column(columnName = RANKS, storageOnly = true)
private IntList ranks = new IntList(10);
private boolean isCalculated = false;
@Override
public void accept(final MeterEntity entity, final PercentileArgument value) {
if (dataset.size() > 0) {
if (!value.getBucketedValues().isCompatible(dataset)) {
throw new IllegalArgumentException(
"Incompatible BucketedValues [" + value + "] for current PercentileFunction[" + dataset + "]");
}
}
for (final int rank : value.getRanks()) {
if (rank <= 0) {
throw new IllegalArgumentException("Illegal rank value " + rank + ", must be positive");
}
}
if (ranks.size() > 0) {
if (ranks.size() != value.getRanks().length) {
throw new IllegalArgumentException(
"Incompatible ranks size = [" + value.getRanks().length + "] for current PercentileFunction[" + ranks
.size() + "]");
} else {
for (final int rank : value.getRanks()) {
if (!ranks.include(rank)) {
throw new IllegalArgumentException(
"Rank " + rank + " doesn't exist in the previous ranks " + ranks);
}
}
}
} else {
for (final int rank : value.getRanks()) {
ranks.add(rank);
}
}
this.entityId = entity.id();
final long[] values = value.getBucketedValues().getValues();
for (int i = 0; i < values.length; i++) {
final long bucket = value.getBucketedValues().getBuckets()[i];
String bucketName = bucket == Integer.MIN_VALUE ? Bucket.INFINITE_NEGATIVE : String.valueOf(bucket);
final long bucketValue = values[i];
dataset.valueAccumulation(bucketName, bucketValue);
}
this.isCalculated = false;
}
@Override
public void combine(final Metrics metrics) {
PercentileFunction percentile = (PercentileFunction) metrics;
if (!dataset.keysEqual(percentile.getDataset())) {
log.warn("Incompatible input [{}}] for current HistogramFunction[{}], entity {}",
percentile, this, entityId
);
return;
}
if (ranks.size() > 0) {
IntList ranksOfThat = percentile.getRanks();
if (this.ranks.size() != ranks.size()) {
log.warn("Incompatible ranks size = [{}}] for current PercentileFunction[{}]",
ranks.size(), this.ranks.size()
);
return;
} else {
if (!this.ranks.equals(percentile.getRanks())) {
log.warn("Rank {} doesn't exist in the previous ranks {}", percentile.getRanks(), ranks);
return;
}
}
}
this.dataset.append(percentile.dataset);
this.isCalculated = false;
}
@Override
public void calculate() {
if (!isCalculated) {
long total = dataset.sumOfValues();
int[] roofs = new int[ranks.size()];
for (int i = 0; i < ranks.size(); i++) {
roofs[i] = Math.round(total * ranks.get(i) * 1.0f / 100);
}
int count = 0;
final List<String> sortedKeys = dataset.sortedKeys(Comparator.comparingInt(Integer::parseInt));
int loopIndex = 0;
for (String key : sortedKeys) {
final Long value = dataset.get(key);
count += value;
for (int rankIdx = loopIndex; rankIdx < roofs.length; rankIdx++) {
int roof = roofs[rankIdx];
if (count >= roof) {
percentileValues.put(String.valueOf(ranks.get(rankIdx)), Long.parseLong(key));
loopIndex++;
} else {
break;
}
}
}
}
}
@Override
public Metrics toHour() {
PercentileFunction metrics = (PercentileFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInHour());
metrics.setDataset(getDataset());
metrics.setRanks(getRanks());
metrics.setPercentileValues(getPercentileValues());
return metrics;
}
@Override
public Metrics toDay() {
PercentileFunction metrics = (PercentileFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInDay());
metrics.setDataset(getDataset());
metrics.setRanks(getRanks());
metrics.setPercentileValues(getPercentileValues());
return metrics;
}
@Override
public int[] getValues() {
return percentileValues.sortedValues(Comparator.comparingInt(Integer::parseInt))
.stream()
.flatMapToInt(l -> IntStream.of(l.intValue()))
.toArray();
}
@Override
public int remoteHashCode() {
return entityId.hashCode();
}
@Override
public void deserialize(final RemoteData remoteData) {
this.setTimeBucket(remoteData.getDataLongs(0));
this.setEntityId(remoteData.getDataStrings(0));
this.setDataset(new DataTable(remoteData.getDataObjectStrings(0)));
this.setRanks(new IntList(remoteData.getDataObjectStrings(1)));
this.setPercentileValues(new DataTable(remoteData.getDataObjectStrings(2)));
}
@Override
public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.addDataLongs(getTimeBucket());
remoteBuilder.addDataStrings(entityId);
remoteBuilder.addDataObjectStrings(dataset.toStorageData());
remoteBuilder.addDataObjectStrings(ranks.toStorageData());
remoteBuilder.addDataObjectStrings(percentileValues.toStorageData());
return remoteBuilder;
}
@Override
public String id() {
return getTimeBucket() + Const.ID_CONNECTOR + entityId;
}
@Override
public Class<? extends StorageBuilder> builder() {
return PercentileFunctionBuilder.class;
}
@RequiredArgsConstructor
@Getter
public static class PercentileArgument {
private final BucketedValues bucketedValues;
private final int[] ranks;
}
public static class PercentileFunctionBuilder implements StorageBuilder<PercentileFunction> {
@Override
public PercentileFunction map2Data(final Map<String, Object> dbMap) {
PercentileFunction metrics = new PercentileFunction() {
@Override
public AcceptableValue<PercentileArgument> createNew() {
throw new UnexpectedException("createNew should not be called");
}
};
metrics.setDataset(new DataTable((String) dbMap.get(DATASET)));
metrics.setRanks(new IntList((String) dbMap.get(RANKS)));
metrics.setPercentileValues(new DataTable((String) dbMap.get(VALUE)));
metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
metrics.setEntityId((String) dbMap.get(ENTITY_ID));
return metrics;
}
@Override
public Map<String, Object> data2Map(final PercentileFunction storageData) {
Map<String, Object> map = new HashMap<>();
map.put(DATASET, storageData.getDataset());
map.put(RANKS, storageData.getRanks());
map.put(VALUE, storageData.getPercentileValues());
map.put(TIME_BUCKET, storageData.getTimeBucket());
map.put(ENTITY_ID, storageData.getEntityId());
return map;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.metrics;
import java.util.ArrayList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
/**
 * IntList is a serializable array list carrying int values, stored as a single
 * {@link Const#ARRAY_SPLIT}-separated string.
 */
@ToString
@EqualsAndHashCode
public class IntList implements StorageDataComplexObject<IntList> {
    private List<Integer> data;

    public IntList(int initialSize) {
        // Fix: use the diamond operator instead of a raw ArrayList type.
        this.data = new ArrayList<>(initialSize);
    }

    public IntList(String valueString) {
        toObject(valueString);
    }

    public int size() {
        return data.size();
    }

    /**
     * @return true if the given value exists in this list.
     */
    public boolean include(int value) {
        return data.contains(value);
    }

    @Override
    public String toStorageData() {
        StringBuilder builder = new StringBuilder();
        this.data.forEach(element -> {
            if (builder.length() != 0) {
                // Not the first element, so prepend the separator.
                builder.append(Const.ARRAY_SPLIT);
            }
            builder.append(element);
        });
        return builder.toString();
    }

    @Override
    public void toObject(final String data) {
        // Fix: an empty storage string (what toStorageData() produces for an empty list) must
        // deserialize back to an empty list instead of throwing NumberFormatException on "".
        if (data == null || data.isEmpty()) {
            this.data = new ArrayList<>();
            return;
        }
        String[] elements = data.split(Const.ARRAY_PARSER_SPLIT);
        this.data = new ArrayList<>(elements.length);
        for (String element : elements) {
            this.data.add(Integer.parseInt(element));
        }
    }

    @Override
    public void copyFrom(final IntList source) {
        // NOTE(review): this appends to the current content rather than replacing it;
        // callers appear to invoke it on a fresh/empty instance — confirm before changing.
        this.data.addAll(source.data);
    }

    public void add(final int rank) {
        this.data.add(rank);
    }

    public int get(final int idx) {
        return this.data.get(idx);
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter.function;
import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.junit.Assert;
import org.junit.Test;
public class PercentileFunctionTest {
    // Bucket lower bounds shared by the positive test cases.
    private static final int[] BUCKETS = new int[] {
        0,
        50,
        100,
        250
    };
    // Deliberately differs from BUCKETS (51 vs 50) to trigger the incompatibility check.
    private static final int[] BUCKETS_2ND = new int[] {
        0,
        51,
        100,
        250
    };
    // Ranks requested from the percentile function: P50 and P90.
    private static final int[] RANKS = new int[] {
        50,
        90
    };
    @Test
    public void testFunction() {
        PercentileFunctionInst inst = new PercentileFunctionInst();
        // Accept the same sample set twice; counts accumulate per bucket.
        inst.accept(
            MeterEntity.newService("service-test"),
            new PercentileFunction.PercentileArgument(
                new BucketedValues(
                    BUCKETS,
                    new long[] {
                        10,
                        20,
                        30,
                        40
                    }
                ),
                RANKS
            )
        );
        inst.accept(
            MeterEntity.newService("service-test"),
            new PercentileFunction.PercentileArgument(
                new BucketedValues(
                    BUCKETS,
                    new long[] {
                        10,
                        20,
                        30,
                        40
                    }
                ),
                RANKS
            )
        );
        inst.calculate();
        final int[] values = inst.getValues();
        /**
         * Accumulated dataset after the two accepts (total = 200, so the P50 roof is 100
         * and the P90 roof is 180):
         * <pre>
         * 0  , 20
         * 50 , 40
         * 100, 60 <- P50 (cumulative 120 >= 100)
         * 250, 80 <- P90 (cumulative 200 >= 180)
         * </pre>
         */
        Assert.assertArrayEquals(new int[] {
            100,
            250
        }, values);
    }
    @Test(expected = IllegalArgumentException.class)
    public void testIncompatible() {
        PercentileFunctionInst inst = new PercentileFunctionInst();
        inst.accept(
            MeterEntity.newService("service-test"),
            new PercentileFunction.PercentileArgument(
                new BucketedValues(
                    BUCKETS,
                    new long[] {
                        10,
                        20,
                        30,
                        40
                    }
                ),
                RANKS
            )
        );
        // Second accept uses different bucket bounds -> must throw IllegalArgumentException.
        inst.accept(
            MeterEntity.newService("service-test"),
            new PercentileFunction.PercentileArgument(
                new BucketedValues(
                    BUCKETS_2ND,
                    new long[] {
                        10,
                        20,
                        30,
                        40
                    }
                ),
                RANKS
            )
        );
    }
    @Test
    public void testSerialization() {
        PercentileFunctionInst inst = new PercentileFunctionInst();
        inst.accept(
            MeterEntity.newService("service-test"),
            new PercentileFunction.PercentileArgument(
                new BucketedValues(
                    BUCKETS,
                    new long[] {
                        10,
                        20,
                        30,
                        40
                    }
                ),
                RANKS
            )
        );
        PercentileFunctionInst inst2 = new PercentileFunctionInst();
        inst2.deserialize(inst.serialize().build());
        Assert.assertEquals(inst, inst2);
        // PercentileFunction's equals() only covers entityId/timeBucket, so verify the data fields explicitly.
        Assert.assertEquals(inst.getDataset(), inst2.getDataset());
        Assert.assertEquals(inst.getRanks(), inst2.getRanks());
        // calculate() was never invoked, so no percentile values travel over the wire yet.
        Assert.assertEquals(0, inst2.getPercentileValues().size());
    }
    @Test
    public void testBuilder() throws IllegalAccessException, InstantiationException {
        PercentileFunctionInst inst = new PercentileFunctionInst();
        inst.accept(
            MeterEntity.newService("service-test"),
            new PercentileFunction.PercentileArgument(
                new BucketedValues(
                    BUCKETS,
                    new long[] {
                        10,
                        20,
                        30,
                        40
                    }
                ),
                RANKS
            )
        );
        inst.calculate();
        final StorageBuilder storageBuilder = inst.builder().newInstance();
        // Simulate what the storage layer does: convert the complex objects to their string forms.
        final Map map = storageBuilder.data2Map(inst);
        map.put(PercentileFunction.DATASET, ((DataTable) map.get(PercentileFunction.DATASET)).toStorageData());
        map.put(PercentileFunction.VALUE, ((DataTable) map.get(PercentileFunction.VALUE)).toStorageData());
        map.put(PercentileFunction.RANKS, ((IntList) map.get(PercentileFunction.RANKS)).toStorageData());
        final PercentileFunction inst2 = (PercentileFunction) storageBuilder.map2Data(map);
        Assert.assertEquals(inst, inst2);
        // PercentileFunction's equals() only covers entityId/timeBucket, so verify the data fields explicitly.
        Assert.assertEquals(inst.getDataset(), inst2.getDataset());
        Assert.assertEquals(inst.getPercentileValues(), inst2.getPercentileValues());
        Assert.assertEquals(inst.getRanks(), inst2.getRanks());
    }
    // Minimal concrete subclass; PercentileFunction itself is abstract.
    private static class PercentileFunctionInst extends PercentileFunction {
        @Override
        public AcceptableValue<PercentileArgument> createNew() {
            return new PercentileFunctionInst();
        }
    }
}
......@@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
import org.apache.skywalking.oap.server.core.analysis.meter.function.BucketedValues;
import org.apache.skywalking.oap.server.core.analysis.meter.function.PercentileFunction;
import org.apache.skywalking.oap.server.fetcher.prometheus.module.PrometheusFetcherModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
......@@ -64,6 +65,10 @@ public class PrometheusFetcherProvider extends ModuleProvider {
final MeterSystem meterSystem = MeterSystem.meterSystem(getManager());
meterSystem.create("test_long_metrics", "avg", ScopeType.SERVICE, Long.class);
meterSystem.create("test_histogram_metrics", "histogram", ScopeType.SERVICE, BucketedValues.class);
meterSystem.create(
"test_percentile_metrics", "percentile", ScopeType.SERVICE,
PercentileFunction.PercentileArgument.class
);
}
}
......@@ -109,6 +114,38 @@ public class PrometheusFetcherProvider extends ModuleProvider {
}
));
service.doStreamingCalculation(histogramMetrics);
// Percentile Example
final AcceptableValue<PercentileFunction.PercentileArgument> testPercentileMetrics = service.buildMetrics(
"test_percentile_metrics", PercentileFunction.PercentileArgument.class);
testPercentileMetrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
testPercentileMetrics.accept(
MeterEntity.newService("service-test"),
new PercentileFunction.PercentileArgument(
new BucketedValues(
// Buckets
new int[] {
0,
51,
100,
250
},
// Values
new long[] {
10,
20,
30,
40
}
),
// Ranks
new int[] {
50,
90
}
)
);
service.doStreamingCalculation(testPercentileMetrics);
}
}, 2, 2, TimeUnit.SECONDS);
}
......
......@@ -20,8 +20,8 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import com.google.gson.JsonObject;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.storage.model.DataTypeMapping;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
public class ColumnTypeEsMapping implements DataTypeMapping {
......@@ -35,7 +35,7 @@ public class ColumnTypeEsMapping implements DataTypeMapping {
return "double";
} else if (String.class.equals(type)) {
return "keyword";
} else if (DataTable.class.equals(type)) {
} else if (StorageDataComplexObject.class.isAssignableFrom(type)) {
return "text";
} else if (byte[].class.equals(type)) {
return "binary";
......
......@@ -23,12 +23,12 @@ import java.sql.Connection;
import java.sql.SQLException;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.ColumnName;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
......@@ -98,7 +98,7 @@ public class H2TableInstaller extends ModelInstaller {
return "DOUBLE";
} else if (String.class.equals(type)) {
return "VARCHAR(" + column.getLength() + ")";
} else if (DataTable.class.equals(type)) {
} else if (StorageDataComplexObject.class.isAssignableFrom(type)) {
return "VARCHAR(20000)";
} else if (byte[].class.equals(type)) {
return "MEDIUMTEXT";
......
......@@ -22,11 +22,11 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.ExtraQueryIndex;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
......@@ -105,7 +105,7 @@ public class MySQLTableInstaller extends H2TableInstaller {
@Override
protected String getColumnType(final ModelColumn column) {
if (DataTable.class.equals(column.getType())) {
if (StorageDataComplexObject.class.isAssignableFrom(column.getType())) {
return "MEDIUMTEXT";
}
return super.getColumnType(column);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册