未验证 提交 7c84e77b 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Support histogram function in Meter system. (#4719)

* Support histogram function in Meter system.
* Add histogram in the document.
* Support infinite-
上级 3bc6cb27
......@@ -17,4 +17,5 @@ The values of scope entity name, such as service name, are required when metrics
NOTICE, the metrics must be declared in the bootstrap stage, no runtime changed.
Meter System supports following binding functions
- **Avg**, calculate the avg value for every entity in the same metrics name.
- **avg**, calculate the avg value for every entity in the same metrics name.
- **histogram**, aggregate the counts in the configurable buckets.
......@@ -23,6 +23,7 @@ import java.util.List;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
@Getter(AccessLevel.PUBLIC)
@Setter(AccessLevel.PUBLIC)
......@@ -120,31 +121,28 @@ public class AnalysisResult {
serializeFields.addLongField(sourceColumn.getFieldName());
break;
default:
throw new IllegalStateException("Unexpected field type [" + type + "] of source sourceColumn [" + sourceColumn
.getFieldName() + "]");
throw new IllegalStateException(
"Unexpected field type [" + type + "] of source sourceColumn [" + sourceColumn
.getFieldName() + "]");
}
}
for (DataColumn column : persistentFields) {
String type = column.getType().getSimpleName();
switch (type) {
case "int":
serializeFields.addIntField(column.getFieldName());
break;
case "double":
serializeFields.addDoubleField(column.getFieldName());
break;
case "String":
serializeFields.addStringField(column.getFieldName());
break;
case "long":
serializeFields.addLongField(column.getFieldName());
break;
case "DataTable":
serializeFields.addDataTableField(column.getFieldName());
break;
default:
throw new IllegalStateException("Unexpected field type [" + type + "] of persistence column [" + column
final Class<?> columnType = column.getType();
if (columnType.equals(int.class)) {
serializeFields.addIntField(column.getFieldName());
} else if (columnType.equals(double.class)) {
serializeFields.addDoubleField(column.getFieldName());
} else if (columnType.equals(String.class)) {
serializeFields.addStringField(column.getFieldName());
} else if (columnType.equals(long.class)) {
serializeFields.addLongField(column.getFieldName());
} else if (StorageDataComplexObject.class.isAssignableFrom(columnType)) {
serializeFields.addObjectField(column.getFieldName());
} else {
throw new IllegalStateException(
"Unexpected field type [" + columnType.getSimpleName() + "] of persistence column [" + column
.getFieldName() + "]");
}
}
......
......@@ -26,7 +26,7 @@ public class PersistenceColumns {
private List<PersistenceField> longFields = new LinkedList<>();
private List<PersistenceField> doubleFields = new LinkedList<>();
private List<PersistenceField> intFields = new LinkedList<>();
private List<PersistenceField> dataTableFields = new LinkedList<>();
private List<PersistenceField> objectFields = new LinkedList<>();
public void addStringField(String fieldName) {
stringFields.add(new PersistenceField(fieldName));
......@@ -44,8 +44,8 @@ public class PersistenceColumns {
intFields.add(new PersistenceField(fieldName));
}
public void addDataTableField(String fieldName) {
dataTableFields.add(new PersistenceField(fieldName));
public void addObjectField(String fieldName) {
objectFields.add(new PersistenceField(fieldName));
}
public List<PersistenceField> getStringFields() {
......@@ -64,7 +64,7 @@ public class PersistenceColumns {
return intFields;
}
public List<PersistenceField> getDataTableFields() {
return dataTableFields;
public List<PersistenceField> getObjectFields() {
return objectFields;
}
}
......@@ -15,8 +15,8 @@ public void deserialize(org.apache.skywalking.oap.server.core.remote.grpc.proto.
${field.setter}(remoteData.getDataIntegers(${field?index}));
</#list>
<#list serializeFields.dataTableFields as field>
${field.setter}(new org.apache.skywalking.oap.server.core.analysis.metrics.DataTable(remoteData.getDataTableStrings(${field?index})));
<#list serializeFields.objectFields as field>
${field.setter}(new org.apache.skywalking.oap.server.core.analysis.metrics.DataTable(remoteData.getDataObjectStrings(${field?index})));
</#list>
}
\ No newline at end of file
......@@ -16,8 +16,8 @@ org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData.Builder remot
remoteBuilder.addDataIntegers(${field.getter}());
</#list>
<#list serializeFields.dataTableFields as field>
remoteBuilder.addDataTableStrings(${field.getter}().toStorageData());
<#list serializeFields.objectFields as field>
remoteBuilder.addDataObjectStrings(${field.getter}().toStorageData());
</#list>
return remoteBuilder;
......
......@@ -59,6 +59,10 @@ public class MeterEntity {
}
}
public String serviceId() {
return IDManager.ServiceID.buildId(serviceName, true);
}
/**
* Create a service level meter entity.
*/
......
......@@ -40,6 +40,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.StreamDefinition;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
import org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
......@@ -266,6 +267,11 @@ public class MeterSystem implements Service {
* @param acceptableValue should only be created through {@link #create(String, String, ScopeType, Class)}
*/
public void doStreamingCalculation(AcceptableValue acceptableValue) {
final long timeBucket = acceptableValue.getTimeBucket();
if (timeBucket == 0L) {
// Avoid no timestamp data, which could be harmful for the storage.
acceptableValue.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
}
MetricsStreamProcessor.getInstance().in((Metrics) acceptableValue);
}
......
......@@ -38,4 +38,6 @@ public interface AcceptableValue<T> {
Class<? extends StorageBuilder> builder();
void setTimeBucket(long timeBucket);
long getTimeBucket();
}
......@@ -25,6 +25,7 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.metrics.LongAvgMetrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
......@@ -37,17 +38,25 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
"entityId",
"timeBucket"
})
public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long> {
public abstract class AvgFunction extends LongAvgMetrics implements AcceptableValue<Long> {
@Setter
@Getter
@Column(columnName = ENTITY_ID)
private String entityId;
/**
* Service ID is required for sort query.
*/
@Setter
@Getter
@Column(columnName = InstanceTraffic.SERVICE_ID)
private String serviceId;
@Override
public Metrics toHour() {
Avg metrics = (Avg) createNew();
AvgFunction metrics = (AvgFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInHour());
metrics.setServiceId(getServiceId());
metrics.setSummation(getSummation());
metrics.setCount(getCount());
return metrics;
......@@ -55,9 +64,10 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
@Override
public Metrics toDay() {
Avg metrics = (Avg) createNew();
AvgFunction metrics = (AvgFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInDay());
metrics.setServiceId(getServiceId());
metrics.setSummation(getSummation());
metrics.setCount(getCount());
return metrics;
......@@ -75,6 +85,7 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
setTimeBucket(remoteData.getDataLongs(2));
this.entityId = remoteData.getDataStrings(0);
this.serviceId = remoteData.getDataStrings(1);
}
@Override
......@@ -85,6 +96,7 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
remoteBuilder.addDataLongs(getTimeBucket());
remoteBuilder.addDataStrings(entityId);
remoteBuilder.addDataStrings(serviceId);
return remoteBuilder;
}
......@@ -97,6 +109,7 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
@Override
public void accept(final MeterEntity entity, final Long value) {
this.entityId = entity.id();
this.serviceId = entity.serviceId();
this.summation += value;
this.count += 1;
}
......@@ -106,10 +119,10 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
return AvgStorageBuilder.class;
}
public static class AvgStorageBuilder implements StorageBuilder<Avg> {
public static class AvgStorageBuilder implements StorageBuilder<AvgFunction> {
@Override
public Avg map2Data(final Map<String, Object> dbMap) {
Avg metrics = new Avg() {
public AvgFunction map2Data(final Map<String, Object> dbMap) {
AvgFunction metrics = new AvgFunction() {
@Override
public AcceptableValue<Long> createNew() {
throw new UnexpectedException("createNew should not be called");
......@@ -119,17 +132,19 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
metrics.setValue(((Number) dbMap.get(VALUE)).longValue());
metrics.setCount(((Number) dbMap.get(COUNT)).longValue());
metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
metrics.setEntityId((String) dbMap.get(ENTITY_ID));
return metrics;
}
@Override
public Map<String, Object> data2Map(final Avg storageData) {
public Map<String, Object> data2Map(final AvgFunction storageData) {
Map<String, Object> map = new HashMap<>();
map.put(SUMMATION, storageData.getSummation());
map.put(VALUE, storageData.getValue());
map.put(COUNT, storageData.getCount());
map.put(TIME_BUCKET, storageData.getTimeBucket());
map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
map.put(ENTITY_ID, storageData.getEntityId());
return map;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter.function;
import java.util.Arrays;
import java.util.List;
import lombok.Getter;
import lombok.ToString;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.query.type.Bucket;
import org.apache.skywalking.oap.server.core.query.type.HeatMap;
/**
 * BucketedValues represents a value set, which elements are grouped by time bucket.
 */
@ToString
@Getter
public class BucketedValues {
    /**
     * The element in the buckets represent the minimal value of this bucket, the max is defined by the next element.
     * Such as 0, 10, 50, 100 means buckets are [0, 10), [10, 50), [50, 100), [100, infinite+).
     *
     * The {@link Integer#MIN_VALUE} could be the first bucket element to indicate there is no minimal value.
     */
    private int[] buckets;
    /**
     * {@link #buckets} and {@link #values} arrays should have the same length. The element in the values, represents
     * the amount in the same index bucket.
     */
    private long[] values;

    /**
     * @param buckets Read {@link #buckets}
     * @param values  Read {@link #values}
     * @throws IllegalArgumentException if either array is null or empty, or if their lengths differ.
     */
    public BucketedValues(final int[] buckets, final long[] values) {
        if (buckets == null || values == null || buckets.length == 0 || values.length == 0) {
            // The guard rejects empty arrays as well as nulls, so the message says so
            // (it previously claimed only "can't be null").
            throw new IllegalArgumentException("buckets and values can't be null or empty.");
        }
        if (buckets.length != values.length) {
            throw new IllegalArgumentException("The length of buckets and values should be same.");
        }
        this.buckets = buckets;
        this.values = values;
    }

    /**
     * Rebuild the bucket minimums from the dataset keys and compare them with {@link #buckets}.
     *
     * @return true if the bucket is same.
     */
    public boolean isCompatible(DataTable dataset) {
        final List<String> sortedKeys = dataset.sortedKeys(new HeatMap.KeyComparator(true));

        int[] existedBuckets = new int[sortedKeys.size()];
        for (int i = 0; i < sortedKeys.size(); i++) {
            final String key = sortedKeys.get(i);
            if (key.equals(Bucket.INFINITE_NEGATIVE)) {
                // The negative-infinite bucket is persisted under a symbolic key; map it back to MIN_VALUE.
                existedBuckets[i] = Integer.MIN_VALUE;
            } else {
                existedBuckets[i] = Integer.parseInt(key);
            }
        }

        return Arrays.equals(buckets, existedBuckets);
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter.function;
import java.util.HashMap;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.type.Bucket;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
 * Histogram includes data range buckets and the amount matched/grouped in the buckets. This is for original histogram
 * graph visualization.
 */
@MeterFunction(functionName = "histogram")
@Slf4j
@EqualsAndHashCode(of = {
    "entityId",
    "timeBucket"
})
@ToString
public abstract class HistogramFunction extends Metrics implements AcceptableValue<BucketedValues> {
    public static final String DATASET = "dataset";

    @Setter
    @Getter
    @Column(columnName = ENTITY_ID)
    private String entityId;
    // Bucket-key -> accumulated count. Pre-sized to 30 entries.
    @Getter
    @Setter
    @Column(columnName = DATASET, dataType = Column.ValueDataType.HISTOGRAM, storageOnly = true, defaultValue = 0)
    private DataTable dataset = new DataTable(30);
    /**
     * Service ID is required for sort query.
     */
    @Setter
    @Getter
    @Column(columnName = InstanceTraffic.SERVICE_ID)
    private String serviceId;

    /**
     * Accumulate the given bucketed values into {@link #dataset}.
     *
     * @throws IllegalArgumentException if the incoming buckets don't match the already accepted ones.
     */
    @Override
    public void accept(final MeterEntity entity, final BucketedValues value) {
        if (dataset.size() > 0) {
            if (!value.isCompatible(dataset)) {
                throw new IllegalArgumentException(
                    "Incompatible BucketedValues [" + value + "] for current HistogramFunction[" + dataset + "]");
            }
        }

        this.entityId = entity.id();
        this.serviceId = entity.serviceId();

        final long[] values = value.getValues();
        for (int i = 0; i < values.length; i++) {
            final long bucket = value.getBuckets()[i];
            // Integer.MIN_VALUE marks the negative-infinite bucket; it is stored under a symbolic key.
            String bucketName = bucket == Integer.MIN_VALUE ? Bucket.INFINITE_NEGATIVE : String.valueOf(bucket);
            final long bucketValue = values[i];
            dataset.valueAccumulation(bucketName, bucketValue);
        }
    }

    /**
     * Merge another histogram of the same entity into this one. Incompatible bucket sets are
     * logged and skipped rather than failing the whole aggregation.
     */
    @Override
    public void combine(final Metrics metrics) {
        HistogramFunction histogram = (HistogramFunction) metrics;

        if (!dataset.keysEqual(histogram.getDataset())) {
            // Fixed log pattern: it previously contained a stray brace, "[{}}]".
            log.warn("Incompatible input [{}] for current HistogramFunction[{}], entity {}",
                     histogram, this, entityId
            );
            return;
        }
        this.dataset.append(histogram.dataset);
    }

    // The dataset is accumulated eagerly in accept/combine; nothing to finalize.
    @Override
    public void calculate() {
    }

    @Override
    public Metrics toHour() {
        HistogramFunction metrics = (HistogramFunction) createNew();
        metrics.setEntityId(getEntityId());
        metrics.setTimeBucket(toTimeBucketInHour());
        metrics.setServiceId(getServiceId());
        metrics.setDataset(getDataset());
        return metrics;
    }

    @Override
    public Metrics toDay() {
        HistogramFunction metrics = (HistogramFunction) createNew();
        metrics.setEntityId(getEntityId());
        metrics.setTimeBucket(toTimeBucketInDay());
        metrics.setServiceId(getServiceId());
        metrics.setDataset(getDataset());
        return metrics;
    }

    @Override
    public int remoteHashCode() {
        return entityId.hashCode();
    }

    // Field order must mirror serialize(): longs[0]=timeBucket, strings[0]=entityId,
    // strings[1]=serviceId, objectStrings[0]=dataset.
    @Override
    public void deserialize(final RemoteData remoteData) {
        this.setTimeBucket(remoteData.getDataLongs(0));

        this.setEntityId(remoteData.getDataStrings(0));
        this.setServiceId(remoteData.getDataStrings(1));

        this.setDataset(new DataTable(remoteData.getDataObjectStrings(0)));
    }

    @Override
    public RemoteData.Builder serialize() {
        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
        remoteBuilder.addDataLongs(getTimeBucket());

        remoteBuilder.addDataStrings(entityId);
        remoteBuilder.addDataStrings(serviceId);

        remoteBuilder.addDataObjectStrings(dataset.toStorageData());
        return remoteBuilder;
    }

    @Override
    public String id() {
        return getTimeBucket() + Const.ID_CONNECTOR + entityId;
    }

    @Override
    public Class<? extends StorageBuilder> builder() {
        return HistogramFunctionBuilder.class;
    }

    /**
     * Storage mapping between {@link HistogramFunction} and the generic DB row map.
     */
    public static class HistogramFunctionBuilder implements StorageBuilder<HistogramFunction> {

        @Override
        public HistogramFunction map2Data(final Map<String, Object> dbMap) {
            HistogramFunction metrics = new HistogramFunction() {
                @Override
                public AcceptableValue<BucketedValues> createNew() {
                    throw new UnexpectedException("createNew should not be called");
                }
            };
            // The storage layer persists the DataTable as its string form.
            metrics.setDataset(new DataTable((String) dbMap.get(DATASET)));
            metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
            metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
            metrics.setEntityId((String) dbMap.get(ENTITY_ID));
            return metrics;
        }

        @Override
        public Map<String, Object> data2Map(final HistogramFunction storageData) {
            Map<String, Object> map = new HashMap<>();
            map.put(DATASET, storageData.getDataset());
            map.put(TIME_BUCKET, storageData.getTimeBucket());
            map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
            map.put(ENTITY_ID, storageData.getEntityId());
            return map;
        }
    }
}
......@@ -22,12 +22,16 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
/**
* DataTable includes a hashmap to store string key and long value. It enhanced the serialization capability.
*/
@ToString
@EqualsAndHashCode
public class DataTable implements StorageDataComplexObject<DataTable> {
private HashMap<String, Long> data;
......@@ -52,10 +56,33 @@ public class DataTable implements StorageDataComplexObject<DataTable> {
data.put(key, value);
}
/**
* Accumulate the value with existing value in the same given key.
*/
public void valueAccumulation(String key, Long value) {
Long element = data.get(key);
if (element == null) {
element = value;
} else {
element += value;
}
data.put(key, element);
}
/**
* @return the sum of all values.
*/
public long sumOfValues() {
return data.values().stream().mapToLong(element -> element).sum();
}
public boolean keysEqual(DataTable that) {
if (this.data.keySet().size() != that.data.keySet().size()) {
return false;
}
return this.data.keySet().equals(that.data.keySet());
}
public List<String> sortedKeys(Comparator<String> keyComparator) {
return data.keySet().stream().sorted(keyComparator).collect(Collectors.toList());
}
......
......@@ -71,13 +71,7 @@ public abstract class HistogramMetrics extends Metrics {
}
String idx = String.valueOf(index * step);
Long element = dataset.get(idx);
if (element == null) {
element = 1L;
} else {
element++;
}
dataset.put(idx, element);
dataset.valueAccumulation(idx, 1L);
}
@Override
......
......@@ -73,13 +73,7 @@ public abstract class PercentileMetrics extends Metrics implements MultiIntValue
this.precision = precision;
String index = String.valueOf(value / precision);
Long element = dataset.get(index);
if (element == null) {
element = 1L;
} else {
element++;
}
dataset.put(index, element);
dataset.valueAccumulation(index, 1L);
}
@Override
......
......@@ -48,7 +48,7 @@ public class Entity {
* Normal service is the service having installed agent or metrics reported directly. Unnormal service is
* conjectural service, usually detected by the agent.
*/
private boolean isNormal;
private boolean normal;
private String serviceInstanceName;
private String endpointName;
......@@ -57,7 +57,7 @@ public class Entity {
* Normal service is the service having installed agent or metrics reported directly. Unnormal service is
* conjectural service, usually detected by the agent.
*/
private boolean destIsNormal;
private boolean destNormal;
private String destServiceInstanceName;
private String destEndpointName;
......@@ -74,34 +74,34 @@ public class Entity {
// This is unnecessary. Just for making core clear.
return null;
case Service:
return IDManager.ServiceID.buildId(serviceName, isNormal);
return IDManager.ServiceID.buildId(serviceName, normal);
case ServiceInstance:
return IDManager.ServiceInstanceID.buildId(
IDManager.ServiceID.buildId(serviceName, isNormal), serviceInstanceName);
IDManager.ServiceID.buildId(serviceName, normal), serviceInstanceName);
case Endpoint:
return IDManager.EndpointID.buildId(IDManager.ServiceID.buildId(serviceName, isNormal), endpointName);
return IDManager.EndpointID.buildId(IDManager.ServiceID.buildId(serviceName, normal), endpointName);
case ServiceRelation:
return IDManager.ServiceID.buildRelationId(
new IDManager.ServiceID.ServiceRelationDefine(
IDManager.ServiceID.buildId(serviceName, isNormal),
IDManager.ServiceID.buildId(destServiceName, destIsNormal)
IDManager.ServiceID.buildId(serviceName, normal),
IDManager.ServiceID.buildId(destServiceName, destNormal)
)
);
case ServiceInstanceRelation:
return IDManager.ServiceInstanceID.buildRelationId(
new IDManager.ServiceInstanceID.ServiceInstanceRelationDefine(
IDManager.ServiceInstanceID.buildId(
IDManager.ServiceID.buildId(serviceName, isNormal), serviceInstanceName),
IDManager.ServiceID.buildId(serviceName, normal), serviceInstanceName),
IDManager.ServiceInstanceID.buildId(
IDManager.ServiceID.buildId(destServiceName, destIsNormal), destServiceInstanceName)
IDManager.ServiceID.buildId(destServiceName, destNormal), destServiceInstanceName)
)
);
case EndpointRelation:
return IDManager.EndpointID.buildRelationId(
new IDManager.EndpointID.EndpointRelationDefine(
IDManager.ServiceID.buildId(serviceName, isNormal),
IDManager.ServiceID.buildId(serviceName, normal),
endpointName,
IDManager.ServiceID.buildId(destServiceName, destIsNormal),
IDManager.ServiceID.buildId(destServiceName, destNormal),
destEndpointName
)
);
......
......@@ -22,8 +22,8 @@ package org.apache.skywalking.oap.server.core.query.type;
* @since 8.0.0
*/
public class Bucket {
private static final String INFINITE_NEGATIVE = "infinite-";
private static final String INFINITE_POSITIVE = "infinite+";
public static final String INFINITE_NEGATIVE = "infinite-";
public static final String INFINITE_POSITIVE = "infinite+";
/**
* The min value of this bucket representing.
......
......@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
......@@ -48,22 +49,34 @@ public class HeatMap {
public void buildColumn(String id, String rawdata, int defaultValue) {
DataTable dataset = new DataTable(rawdata);
final List<String> sortedKeys = dataset.sortedKeys(
Comparator.comparingInt(Integer::parseInt));
final List<String> sortedKeys = dataset.sortedKeys(new KeyComparator(true));
if (buckets == null) {
buckets = new ArrayList<>(dataset.size());
for (int i = 0; i < sortedKeys.size(); i++) {
final Bucket bucket = new Bucket();
final String minValue = sortedKeys.get(i);
if (Bucket.INFINITE_NEGATIVE.equals(minValue)) {
bucket.infiniteMin();
} else {
bucket.setMin(Integer.parseInt(minValue));
}
if (i == sortedKeys.size() - 1) {
// last element
this.addBucket(
new Bucket().setMin(Integer.parseInt(sortedKeys.get(i))).infiniteMax()
);
bucket.infiniteMax();
} else {
this.addBucket(new Bucket(
Integer.parseInt(sortedKeys.get(i)),
Integer.parseInt(sortedKeys.get(i + 1))
));
final String max = sortedKeys.get(i + 1);
if (Bucket.INFINITE_POSITIVE.equals(max)) {
// If reach the infinite positive before the last element, ignore all other.
// Only for fail safe.
bucket.infiniteMax();
break;
} else {
bucket.setMax(Integer.parseInt(max));
}
}
this.addBucket(bucket);
}
}
......@@ -114,4 +127,26 @@ public class HeatMap {
values.add(value);
}
}
@RequiredArgsConstructor
public static class KeyComparator implements Comparator<String> {
private final boolean asc;
@Override
public int compare(final String key1, final String key2) {
int result;
if (key1.equals(key2)) {
result = 0;
} else if (Bucket.INFINITE_NEGATIVE.equals(key1) || Bucket.INFINITE_POSITIVE.equals(key2)) {
result = -1;
} else if (Bucket.INFINITE_NEGATIVE.equals(key2) || Bucket.INFINITE_POSITIVE.equals(key1)) {
result = 1;
} else {
result = Integer.parseInt(key1) - Integer.parseInt(key2);
}
return asc ? result : 0 - result;
}
}
}
......@@ -36,7 +36,7 @@ message RemoteData {
repeated int64 dataLongs = 2;
repeated double dataDoubles = 3;
repeated int32 dataIntegers = 4;
repeated string dataTableStrings = 5;
repeated string dataObjectStrings = 5;
}
message Empty {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter.function;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.query.type.Bucket;
import org.apache.skywalking.oap.server.core.query.type.HeatMap;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.skywalking.oap.server.core.analysis.meter.function.HistogramFunction.DATASET;
public class HistogramFunctionTest {
    private static final int[] BUCKETS = new int[] {0, 50, 100, 250};
    private static final int[] BUCKETS_2ND = new int[] {0, 51, 100, 250};
    private static final int[] INFINITE_BUCKETS = new int[] {Integer.MIN_VALUE, -5, 0, 10};

    @Test
    public void testFunction() {
        final HistogramFunctionInst function = new HistogramFunctionInst();
        function.accept(
            MeterEntity.newService("service-test"),
            new BucketedValues(BUCKETS, new long[] {0, 4, 10, 10})
        );
        function.accept(
            MeterEntity.newService("service-test"),
            new BucketedValues(BUCKETS, new long[] {1, 2, 3, 4})
        );

        // Both samples should be accumulated per bucket, in ascending bucket order.
        final int[] actualValues = function.getDataset()
                                           .sortedValues(new HeatMap.KeyComparator(true))
                                           .stream()
                                           .mapToInt(Long::intValue)
                                           .toArray();
        Assert.assertArrayEquals(new int[] {1, 6, 13, 14}, actualValues);
    }

    @Test
    public void testFunctionWithInfinite() {
        final HistogramFunctionInst function = new HistogramFunctionInst();
        function.accept(
            MeterEntity.newService("service-test"),
            new BucketedValues(INFINITE_BUCKETS, new long[] {0, 4, 10, 10})
        );
        function.accept(
            MeterEntity.newService("service-test"),
            new BucketedValues(INFINITE_BUCKETS, new long[] {1, 2, 3, 4})
        );

        // The Integer.MIN_VALUE bucket is stored under the symbolic infinite-negative key.
        Assert.assertEquals(1L, function.getDataset().get(Bucket.INFINITE_NEGATIVE).longValue());
    }

    @Test(expected = IllegalArgumentException.class)
    public void testIncompatible() {
        final HistogramFunctionInst function = new HistogramFunctionInst();
        function.accept(
            MeterEntity.newService("service-test"),
            new BucketedValues(BUCKETS, new long[] {0, 4, 10, 10})
        );
        // A second sample with different bucket boundaries must be rejected.
        function.accept(
            MeterEntity.newService("service-test"),
            new BucketedValues(BUCKETS_2ND, new long[] {1, 2, 3, 4})
        );
    }

    @Test
    public void testSerialization() {
        final HistogramFunctionInst original = new HistogramFunctionInst();
        original.accept(
            MeterEntity.newService("service-test"),
            new BucketedValues(BUCKETS, new long[] {1, 4, 10, 10})
        );

        final HistogramFunctionInst restored = new HistogramFunctionInst();
        restored.deserialize(original.serialize().build());

        Assert.assertEquals(original, restored);
        // equals/hashCode only cover entityId and timeBucket, so compare the dataset explicitly.
        Assert.assertEquals(original.getDataset(), restored.getDataset());
    }

    @Test
    public void testSerializationInInfinite() {
        final HistogramFunctionInst original = new HistogramFunctionInst();
        original.accept(
            MeterEntity.newService("service-test"),
            new BucketedValues(INFINITE_BUCKETS, new long[] {1, 4, 10, 10})
        );

        final HistogramFunctionInst restored = new HistogramFunctionInst();
        restored.deserialize(original.serialize().build());

        Assert.assertEquals(original, restored);
        // equals/hashCode only cover entityId and timeBucket, so compare the dataset explicitly.
        Assert.assertEquals(original.getDataset(), restored.getDataset());
    }

    @Test
    public void testBuilder() throws IllegalAccessException, InstantiationException {
        final HistogramFunctionInst original = new HistogramFunctionInst();
        original.accept(
            MeterEntity.newService("service-test"),
            new BucketedValues(BUCKETS, new long[] {1, 4, 10, 10})
        );

        final StorageBuilder storageBuilder = original.builder().newInstance();

        // Simulate the storage layer behavior: the DataTable is persisted as a string.
        final Map map = storageBuilder.data2Map(original);
        map.put(DATASET, ((DataTable) map.get(DATASET)).toStorageData());

        final HistogramFunction restored = (HistogramFunction) storageBuilder.map2Data(map);
        Assert.assertEquals(original, restored);
        // equals/hashCode only cover entityId and timeBucket, so compare the dataset explicitly.
        Assert.assertEquals(original.getDataset(), restored.getDataset());
    }

    /**
     * Concrete subclass for tests; createNew is a factory that the tests never exercise indirectly.
     */
    private static class HistogramFunctionInst extends HistogramFunction {
        @Override
        public AcceptableValue<BucketedValues> createNew() {
            return new HistogramFunctionInst();
        }
    }
}
......@@ -28,11 +28,11 @@ public class DataTableTestCase {
@Before
public void init() {
dataTable = new DataTable();
dataTable.put("5", 500L);
dataTable.put("6", 600L);
dataTable.put("1", 100L);
dataTable.put("2", 200L);
dataTable.put("7", 700L);
dataTable.valueAccumulation("5", 500L);
dataTable.valueAccumulation("6", 600L);
dataTable.valueAccumulation("1", 100L);
dataTable.valueAccumulation("2", 200L);
dataTable.valueAccumulation("7", 700L);
}
@Test
......
......@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
import org.apache.skywalking.oap.server.core.analysis.meter.function.BucketedValues;
import org.apache.skywalking.oap.server.fetcher.prometheus.module.PrometheusFetcherModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
......@@ -62,6 +63,7 @@ public class PrometheusFetcherProvider extends ModuleProvider {
// We should create it based on metrics configuration.
final MeterSystem meterSystem = MeterSystem.meterSystem(getManager());
meterSystem.create("test_long_metrics", "avg", ScopeType.SERVICE, Long.class);
meterSystem.create("test_histogram_metrics", "histogram", ScopeType.SERVICE, BucketedValues.class);
}
}
......@@ -78,10 +80,35 @@ public class PrometheusFetcherProvider extends ModuleProvider {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
final MeterEntity servEntity = MeterEntity.newService("mock_service");
// Long Avg Example
final AcceptableValue<Long> value = service.buildMetrics("test_long_metrics", Long.class);
value.accept(MeterEntity.newService("abc"), 5L);
value.accept(servEntity, 5L);
value.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
service.doStreamingCalculation(value);
// Histogram Example
final AcceptableValue<BucketedValues> histogramMetrics = service.buildMetrics(
"test_histogram_metrics", BucketedValues.class);
value.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
histogramMetrics.accept(servEntity, new BucketedValues(
new int[] {
Integer.MIN_VALUE,
0,
50,
100,
250
},
new long[] {
3,
1,
4,
10,
10
}
));
service.doStreamingCalculation(histogramMetrics);
}
}, 2, 2, TimeUnit.SECONDS);
}
......
Subproject commit c27d39381860e56bb9aec0d4377bb256641fdf26
Subproject commit 9d5bb7c97a5b653babe157e0360fee6bdf30c045
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册