未验证 提交 77082de7 编写于 作者: Z Zhenxu Ke 提交者: GitHub

Add `sum` function in meter system (#6427)

* Add `sum` function in meter system

* Fix minor potential bug and reformat some codes

* Fix a bug

* Use Set to hold the {aggregate,scope} labels as same labels maybe added by multiple aggregation functions

* Separate downsampling function and revert unnecessary codes
上级 47ec0bb0
......@@ -30,6 +30,7 @@ Release Notes.
* Remove filename suffix in the meter active file config.
* Introduce log analysis language (LAL).
* Fix alarm httpclient connection leak.
* Add `sum` function in meter system.
#### UI
* Update selector scroller to show in all pages.
......
......@@ -174,26 +174,22 @@ Examples:
MAL should instruct meter-system how to do downsampling for metrics. It doesn't only refer to aggregate raw samples to
`minute` level, but also hints data from `minute` to higher levels, for instance, `hour` and `day`.
Down sampling operations are as global function in MAL:
Down sampling function is called `downsampling` in MAL, it accepts the following types:
- avg
- latest (TODO)
- min (TODO)
- max (TODO)
- mean (TODO)
- sum (TODO)
- count (TODO)
- AVG
- SUM
- LATEST
- MIN (TODO)
- MAX (TODO)
- MEAN (TODO)
- COUNT (TODO)
The default one is `avg` if not specific an operation.
The default type is `AVG`.
If user want get latest time from `last_server_state_sync_time_in_seconds`:
If users want to get the latest time from `last_server_state_sync_time_in_seconds`:
```
latest(last_server_state_sync_time_in_seconds.tagEqual('production', 'catalog'))
or
latest last_server_state_sync_time_in_seconds.tagEqual('production', 'catalog')
last_server_state_sync_time_in_seconds.tagEqual('production', 'catalog').downsampling(LATEST)
```
## Metric level function
......
......@@ -22,5 +22,5 @@ package org.apache.skywalking.oap.meter.analyzer.dsl;
* DownsamplingType indicates the downsampling type of meter function
*/
public enum DownsamplingType {
AVG, LATEST
AVG, SUM, LATEST
}
......@@ -23,6 +23,7 @@ import groovy.lang.ExpandoMetaClass;
import groovy.lang.GroovyObjectSupport;
import groovy.util.DelegatingScript;
import java.time.Instant;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
......@@ -92,42 +93,7 @@ public class Expression {
}
private void empower() {
expression.setDelegate(new GroovyObjectSupport() {
public SampleFamily propertyMissing(String metricName) {
ExpressionParsingContext.get().ifPresent(ctx -> {
if (!ctx.samples.contains(metricName)) {
ctx.samples.add(metricName);
}
});
ImmutableMap<String, SampleFamily> sampleFamilies = propertyRepository.get();
if (sampleFamilies == null) {
return SampleFamily.EMPTY;
}
if (sampleFamilies.containsKey(metricName)) {
return sampleFamilies.get(metricName);
}
if (!ExpressionParsingContext.get().isPresent()) {
log.warn("{} referred by \"{}\" doesn't exist in {}", metricName, literal, sampleFamilies.keySet());
}
return SampleFamily.EMPTY;
}
public SampleFamily avg(SampleFamily sf) {
ExpressionParsingContext.get().ifPresent(ctx -> ctx.downsampling = DownsamplingType.AVG);
return sf;
}
public SampleFamily latest(SampleFamily sf) {
ExpressionParsingContext.get().ifPresent(ctx -> ctx.downsampling = DownsamplingType.LATEST);
return sf;
}
public Number time() {
return Instant.now().getEpochSecond();
}
});
expression.setDelegate(new ExpressionDelegate(literal, propertyRepository));
extendNumber(Number.class);
}
......@@ -139,4 +105,39 @@ public class Expression {
expando.registerInstanceMethod("div", new NumberClosure(this, (n, s) -> s.newValue(v -> n.doubleValue() / v)));
expando.initialize();
}
@RequiredArgsConstructor
@SuppressWarnings("unused") // used in MAL expressions
public static class ExpressionDelegate extends GroovyObjectSupport {
public static final DownsamplingType AVG = DownsamplingType.AVG;
public static final DownsamplingType SUM = DownsamplingType.SUM;
public static final DownsamplingType LATEST = DownsamplingType.LATEST;
private final String literal;
private final ThreadLocal<ImmutableMap<String, SampleFamily>> propertyRepository;
public SampleFamily propertyMissing(String metricName) {
ExpressionParsingContext.get().ifPresent(ctx -> {
if (!ctx.samples.contains(metricName)) {
ctx.samples.add(metricName);
}
});
ImmutableMap<String, SampleFamily> sampleFamilies = propertyRepository.get();
if (sampleFamilies == null) {
return SampleFamily.EMPTY;
}
if (sampleFamilies.containsKey(metricName)) {
return sampleFamilies.get(metricName);
}
if (!ExpressionParsingContext.get().isPresent()) {
log.warn("{} referred by \"{}\" doesn't exist in {}", metricName, literal, sampleFamilies.keySet());
}
return SampleFamily.EMPTY;
}
public Number time() {
return Instant.now().getEpochSecond();
}
}
}
......@@ -20,10 +20,12 @@ package org.apache.skywalking.oap.meter.analyzer.dsl;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
......@@ -44,8 +46,8 @@ public class ExpressionParsingContext implements Closeable {
CACHE.set(ExpressionParsingContext.builder()
.samples(Lists.newArrayList())
.downsampling(DownsamplingType.AVG)
.scopeLabels(Lists.newArrayList())
.aggregationLabels(Lists.newArrayList()).build());
.scopeLabels(Sets.newHashSet())
.aggregationLabels(Sets.newHashSet()).build());
}
return CACHE.get();
}
......@@ -62,9 +64,9 @@ public class ExpressionParsingContext implements Closeable {
int[] percentiles;
List<String> aggregationLabels;
Set<String> aggregationLabels;
List<String> scopeLabels;
Set<String> scopeLabels;
DownsamplingType downsampling;
......
......@@ -32,7 +32,7 @@ import org.apache.skywalking.oap.meter.analyzer.dsl.counter.CounterWindow;
/**
* Sample represents the metric data point in a range of time.
*/
@Builder
@Builder(toBuilder = true)
@EqualsAndHashCode
@ToString
@Getter
......@@ -43,11 +43,7 @@ public class Sample {
final long timestamp;
Sample newValue(Function<Double, Double> transform) {
return Sample.builder().name(name)
.timestamp(timestamp)
.labels(labels)
.value(transform.apply(value))
.build();
return toBuilder().value(transform.apply(value)).build();
}
Sample increase(String range, Function2<Double, Long, Double> transform) {
......
......@@ -26,8 +26,6 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AtomicDouble;
import groovy.lang.Closure;
import io.vavr.Function2;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.EqualsAndHashCode;
......@@ -53,6 +51,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.function.UnaryOperator.identity;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
......@@ -180,18 +179,20 @@ public class SampleFamily {
return EMPTY;
}
if (by == null) {
double result = Arrays.stream(samples).mapToDouble(s -> s.value).average().orElse(0.0D);
double result = Arrays.stream(samples).mapToDouble(Sample::getValue).average().orElse(0.0D);
return SampleFamily.build(this.context, newSample(ImmutableMap.of(), samples[0].timestamp, result));
}
return SampleFamily.build(
this.context,
Arrays.stream(samples)
.map(sample -> Tuple.of(by.stream()
.collect(toImmutableMap(labelKey -> labelKey, labelKey -> sample.labels.getOrDefault(labelKey, ""))), sample))
.collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())))
.collect(groupingBy(it -> getLabels(by, it), mapping(identity(), toList())))
.entrySet().stream()
.map(entry -> newSample(entry.getKey(), entry.getValue().get(0).timestamp, entry.getValue().stream()
.mapToDouble(s -> s.value).average().orElse(0.0D)))
.map(entry -> newSample(
entry.getKey(),
entry.getValue().get(0).getTimestamp(),
entry.getValue().stream().mapToDouble(Sample::getValue).average().orElse(0.0D)
))
.toArray(Sample[]::new)
);
}
......@@ -205,14 +206,27 @@ public class SampleFamily {
double result = Arrays.stream(samples).mapToDouble(s -> s.value).reduce(aggregator).orElse(0.0D);
return SampleFamily.build(this.context, newSample(ImmutableMap.of(), samples[0].timestamp, result));
}
return SampleFamily.build(this.context, Arrays.stream(samples)
.map(sample -> Tuple.of(by.stream()
.collect(toImmutableMap(labelKey -> labelKey, labelKey -> sample.labels.getOrDefault(labelKey, ""))), sample))
.collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())))
.entrySet().stream()
.map(entry -> newSample(entry.getKey(), entry.getValue().get(0).timestamp, entry.getValue().stream()
.mapToDouble(s -> s.value).reduce(aggregator).orElse(0.0D)))
.toArray(Sample[]::new));
return SampleFamily.build(
this.context,
Arrays.stream(samples)
.collect(groupingBy(it -> getLabels(by, it), mapping(identity(), toList())))
.entrySet().stream()
.map(entry -> newSample(
entry.getKey(),
entry.getValue().get(0).getTimestamp(),
entry.getValue().stream().mapToDouble(Sample::getValue).reduce(aggregator).orElse(0.0D)
))
.toArray(Sample[]::new)
);
}
private ImmutableMap<String, String> getLabels(final List<String> labelKeys, final Sample sample) {
return labelKeys.stream()
.collect(toImmutableMap(
Function.identity(),
labelKey -> sample.labels.getOrDefault(labelKey, "")
));
}
/* Function */
......@@ -221,10 +235,12 @@ public class SampleFamily {
if (this == EMPTY) {
return EMPTY;
}
return SampleFamily.build(this.context, Arrays.stream(samples).map(sample -> sample
.increase(range, (lowerBoundValue, lowerBoundTime) ->
sample.value - lowerBoundValue))
.toArray(Sample[]::new));
return SampleFamily.build(
this.context,
Arrays.stream(samples)
.map(sample -> sample.increase(range, (lowerBoundValue, unused) -> sample.value - lowerBoundValue))
.toArray(Sample[]::new)
);
}
public SampleFamily rate(String range) {
......@@ -232,22 +248,22 @@ public class SampleFamily {
if (this == EMPTY) {
return EMPTY;
}
return SampleFamily.build(this.context, Arrays.stream(samples).map(sample -> sample
.increase(range, (lowerBoundValue, lowerBoundTime) ->
sample.timestamp - lowerBoundTime < 1L ? 0.0D
: (sample.value - lowerBoundValue) / ((sample.timestamp - lowerBoundTime) / 1000)))
.toArray(Sample[]::new));
return SampleFamily.build(
this.context,
Arrays.stream(samples)
.map(sample -> sample.increase(
range,
(lowerBoundValue, lowerBoundTime) -> {
final long timeDiff = (sample.timestamp - lowerBoundTime) / 1000;
return timeDiff < 1L ? 0.0 : (sample.value - lowerBoundValue) / timeDiff;
}
))
.toArray(Sample[]::new)
);
}
public SampleFamily irate() {
if (this == EMPTY) {
return EMPTY;
}
return SampleFamily.build(this.context, Arrays.stream(samples).map(sample -> sample
.increase("PT1S", (lowerBoundValue, lowerBoundTime) ->
sample.timestamp - lowerBoundTime < 1L ? 0.0D
: (sample.value - lowerBoundValue) / ((sample.timestamp - lowerBoundTime) / 1000)))
.toArray(Sample[]::new));
return rate("PT1S");
}
@SuppressWarnings(value = "unchecked")
......@@ -255,14 +271,21 @@ public class SampleFamily {
if (this == EMPTY) {
return EMPTY;
}
return SampleFamily.build(this.context, Arrays.stream(samples).map(sample -> {
Object delegate = new Object();
Closure<?> c = cl.rehydrate(delegate, sample, delegate);
Map<String, String> arg = Maps.newHashMap(sample.labels);
Object r = c.call(arg);
return newSample(ImmutableMap.copyOf(Optional.ofNullable((r instanceof Map) ? (Map<String, String>) r : null)
.orElse(arg)), sample.timestamp, sample.value);
}).toArray(Sample[]::new));
return SampleFamily.build(
this.context,
Arrays.stream(samples)
.map(sample -> {
Object delegate = new Object();
Closure<?> c = cl.rehydrate(delegate, sample, delegate);
Map<String, String> arg = Maps.newHashMap(sample.labels);
Object r = c.call(arg);
return sample.toBuilder()
.labels(
ImmutableMap.copyOf(Optional.ofNullable((r instanceof Map) ? (Map<String, String>) r : null)
.orElse(arg)))
.build();
}).toArray(Sample[]::new)
);
}
public SampleFamily histogram() {
......@@ -282,20 +305,24 @@ public class SampleFamily {
}
AtomicDouble pre = new AtomicDouble();
AtomicReference<String> preLe = new AtomicReference<>("0");
return SampleFamily.build(this.context, Stream.concat(
Arrays.stream(samples).filter(s -> !s.labels.containsKey(le)),
Arrays.stream(samples)
.filter(s -> s.labels.containsKey(le))
.sorted(Comparator.comparingDouble(s -> Double.parseDouble(s.labels.get(le))))
.map(s -> {
double r = this.context.histogramType == HistogramType.ORDINARY ? s.value : s.value - pre.get();
pre.set(s.value);
ImmutableMap<String, String> ll = ImmutableMap.<String, String>builder()
.putAll(Maps.filterKeys(s.labels, key -> !Objects.equals(key, le)))
.put("le", String.valueOf((long) ((Double.parseDouble(this.context.histogramType == HistogramType.ORDINARY ? s.labels.get(le) : preLe.get())) * scale))).build();
preLe.set(s.labels.get(le));
return newSample(ll, s.timestamp, r);
})).toArray(Sample[]::new));
return SampleFamily.build(
this.context,
Stream.concat(
Arrays.stream(samples).filter(s -> !s.labels.containsKey(le)),
Arrays.stream(samples)
.filter(s -> s.labels.containsKey(le))
.sorted(Comparator.comparingDouble(s -> Double.parseDouble(s.labels.get(le))))
.map(s -> {
double r = this.context.histogramType == HistogramType.ORDINARY ? s.value : s.value - pre.get();
pre.set(s.value);
ImmutableMap<String, String> ll = ImmutableMap.<String, String>builder()
.putAll(Maps.filterKeys(s.labels, key -> !Objects.equals(key, le)))
.put("le", String.valueOf((long) ((Double.parseDouble(this.context.histogramType == HistogramType.ORDINARY ? s.labels.get(le) : preLe.get())) * scale))).build();
preLe.set(s.labels.get(le));
return newSample(ll, s.timestamp, r);
})
).toArray(Sample[]::new)
);
}
public SampleFamily histogram_percentile(List<Integer> percentiles) {
......@@ -359,14 +386,17 @@ public class SampleFamily {
}
private SampleFamily left(List<String> labelKeys) {
return SampleFamily.build(this.context, Arrays.stream(samples)
.map(s -> {
ImmutableMap<String, String> ll = ImmutableMap.<String, String>builder()
.putAll(Maps.filterKeys(s.labels, key -> !labelKeys.contains(key)))
.build();
return newSample(ll, s.timestamp, s.value);
})
.toArray(Sample[]::new));
return SampleFamily.build(
this.context,
Arrays.stream(samples)
.map(s -> {
ImmutableMap<String, String> ll = ImmutableMap.<String, String>builder()
.putAll(Maps.filterKeys(s.labels, key -> !labelKeys.contains(key)))
.build();
return s.toBuilder().labels(ll).build();
})
.toArray(Sample[]::new)
);
}
private SampleFamily match(String[] labels, Function2<String, String, Boolean> op) {
......@@ -375,13 +405,12 @@ public class SampleFamily {
for (int i = 0; i < labels.length; i += 2) {
ll.put(labels[i], labels[i + 1]);
}
Stream<Sample> ss = Arrays.stream(samples).filter(sample ->
ll.entrySet().stream().allMatch(entry -> op.apply(sample.labels.getOrDefault(entry.getKey(), ""), entry.getValue())));
Sample[] sArr = ss.toArray(Sample[]::new);
if (sArr.length < 1) {
return SampleFamily.EMPTY;
}
return SampleFamily.build(this.context, sArr);
Sample[] ss = Arrays.stream(samples)
.filter(sample -> ll.entrySet()
.stream()
.allMatch(entry -> op.apply(sample.labels.getOrDefault(entry.getKey(), ""), entry.getValue())))
.toArray(Sample[]::new);
return ss.length > 0 ? SampleFamily.build(this.context, ss) : EMPTY;
}
SampleFamily newValue(Function<Double, Double> transform) {
......@@ -397,11 +426,12 @@ public class SampleFamily {
private SampleFamily newValue(SampleFamily another, Function2<Double, Double, Double> transform) {
Sample[] ss = Arrays.stream(samples)
.flatMap(cs -> io.vavr.collection.Stream.of(another.samples)
.find(as -> cs.labels.equals(as.labels))
.map(as -> newSample(cs.labels, cs.timestamp, transform.apply(cs.value, as.value)))
.toJavaStream())
.toArray(Sample[]::new);
.flatMap(cs -> io.vavr.collection.Stream.of(another.samples)
.find(as -> cs.labels.equals(as.labels))
.map(as -> cs.toBuilder().value(transform.apply(cs.value, as.value)))
.map(Sample.SampleBuilder::build)
.toJavaStream())
.toArray(Sample[]::new);
return ss.length > 0 ? SampleFamily.build(this.context, ss) : EMPTY;
}
......@@ -423,6 +453,11 @@ public class SampleFamily {
return a.equals(b);
}
public SampleFamily downsampling(final DownsamplingType type) {
ExpressionParsingContext.get().ifPresent(it -> it.downsampling = type);
return this;
}
/**
* The parsing context holds key results more than sample collection.
*/
......
......@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.meter.analyzer.dsl;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
import org.junit.Test;
......@@ -52,37 +52,53 @@ public class ExpressionParsingTest {
@Parameterized.Parameters(name = "{index}: {0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
// {
// "mini",
// "foo.instance(['service'], ['host'])",
// ExpressionParsingContext.builder()
// .downsampling(DownsamplingType.AVG)
// .scopeLabels(Arrays.asList("service", "host"))
// .scopeType(ScopeType.SERVICE_INSTANCE)
// .aggregationLabels(Lists.newArrayList()).build(),
// false,
// },
{
"all",
"latest (foo - 1).tagEqual('bar', '1').sum(['tt']).irate().histogram().histogram_percentile([50,99]).service(['rr'])",
"(foo - 1).tagEqual('bar', '1').sum(['tt']).irate().histogram().histogram_percentile([50,99]).service(['rr']).downsampling(LATEST)",
ExpressionParsingContext.builder()
.samples(Collections.singletonList("foo"))
.scopeType(ScopeType.SERVICE)
.scopeLabels(Collections.singletonList("rr"))
.aggregationLabels(Collections.singletonList("tt"))
.scopeLabels(Sets.newHashSet("rr"))
.aggregationLabels(Sets.newHashSet("tt"))
.downsampling(DownsamplingType.LATEST)
.isHistogram(true)
.percentiles(new int[]{50, 99}).build(),
false,
},
{
"sumThenAvg",
"(foo - 1).tagEqual('bar', '1').sum(['tt']).irate().histogram().histogram_percentile([50,99]).service(['rr']).avg(['tt'])",
ExpressionParsingContext.builder()
.samples(Collections.singletonList("foo"))
.scopeType(ScopeType.SERVICE)
.scopeLabels(Sets.newHashSet("rr"))
.aggregationLabels(Sets.newHashSet("tt"))
.downsampling(DownsamplingType.AVG)
.isHistogram(true)
.percentiles(new int[]{50, 99}).build(),
false,
},
{
"avgThenOthersThenSum",
"(foo - 1).tagEqual('bar', '1').avg(['tt']).irate().histogram().histogram_percentile([50,99]).service(['rr']).sum(['tt']).downsampling(SUM)",
ExpressionParsingContext.builder()
.samples(Collections.singletonList("foo"))
.scopeType(ScopeType.SERVICE)
.scopeLabels(Sets.newHashSet("rr"))
.aggregationLabels(Sets.newHashSet("tt"))
.downsampling(DownsamplingType.SUM)
.isHistogram(true)
.percentiles(new int[]{50, 99}).build(),
false,
},
{
"sameSamples",
"(node_cpu_seconds_total.sum(['node_identifier_host_name']) - node_cpu_seconds_total.tagEqual('mode', 'idle').sum(['node_identifier_host_name'])).service(['node_identifier_host_name']) ",
ExpressionParsingContext.builder()
.samples(Collections.singletonList("node_cpu_seconds_total"))
.scopeType(ScopeType.SERVICE)
.scopeLabels(Collections.singletonList("node_identifier_host_name"))
.aggregationLabels(Lists.newArrayList("node_identifier_host_name" , "node_identifier_host_name"))
.scopeLabels(Sets.newHashSet("node_identifier_host_name"))
.aggregationLabels(Sets.newHashSet("node_identifier_host_name"))
.downsampling(DownsamplingType.AVG)
.isHistogram(false).build(),
false,
......@@ -108,4 +124,4 @@ public class ExpressionParsingTest {
}
assertThat(r, is(want));
}
}
\ No newline at end of file
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter.function.sum;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
import org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction;
import org.apache.skywalking.oap.server.core.analysis.metrics.LongValueHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.Entrance;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.SourceFrom;
import org.apache.skywalking.oap.server.core.query.sql.Function;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
@ToString
@MeterFunction(functionName = "sum")
public abstract class SumFunction extends Metrics implements AcceptableValue<Long>, LongValueHolder {
protected static final String VALUE = "value";
@Setter
@Getter
@Column(columnName = ENTITY_ID, length = 512)
private String entityId;
@Setter
@Getter
@Column(columnName = InstanceTraffic.SERVICE_ID)
private String serviceId;
@Getter
@Setter
@Column(columnName = VALUE, dataType = Column.ValueDataType.COMMON_VALUE, function = Function.Sum)
private long value;
@Entrance
public final void combine(@SourceFrom long value) {
setValue(this.value + value);
}
@Override
public final boolean combine(Metrics metrics) {
final SumFunction sumFunc = (SumFunction) metrics;
combine(sumFunc.getValue());
return true;
}
@Override
public final void calculate() {
}
@Override
public Metrics toHour() {
final SumFunction metrics = (SumFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInHour());
metrics.setServiceId(getServiceId());
metrics.setValue(getValue());
return metrics;
}
@Override
public Metrics toDay() {
final SumFunction metrics = (SumFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInDay());
metrics.setServiceId(getServiceId());
metrics.setValue(getValue());
return metrics;
}
@Override
public int remoteHashCode() {
return getEntityId().hashCode();
}
@Override
public void deserialize(final RemoteData remoteData) {
setValue(remoteData.getDataLongs(0));
setTimeBucket(remoteData.getDataLongs(1));
setEntityId(remoteData.getDataStrings(0));
setServiceId(remoteData.getDataStrings(1));
}
@Override
public RemoteData.Builder serialize() {
final RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.addDataLongs(getValue());
remoteBuilder.addDataLongs(getTimeBucket());
remoteBuilder.addDataStrings(getEntityId());
remoteBuilder.addDataStrings(getServiceId());
return remoteBuilder;
}
@Override
public String id() {
return getTimeBucket() + Const.ID_CONNECTOR + getEntityId();
}
@Override
public void accept(final MeterEntity entity, final Long value) {
setEntityId(entity.id());
setServiceId(entity.serviceId());
setValue(getValue() + value);
}
@Override
public Class<? extends StorageHashMapBuilder<?>> builder() {
return SumStorageBuilder.class;
}
public static class SumStorageBuilder implements StorageHashMapBuilder<SumFunction> {
@Override
public SumFunction storage2Entity(final Map<String, Object> dbMap) {
final SumFunction metrics = new SumFunction() {
@Override
public AcceptableValue<Long> createNew() {
throw new UnexpectedException("createNew should not be called");
}
};
metrics.setValue(((Number) dbMap.get(VALUE)).longValue());
metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
metrics.setEntityId((String) dbMap.get(ENTITY_ID));
return metrics;
}
@Override
public Map<String, Object> entity2Storage(final SumFunction storageData) {
final Map<String, Object> map = new HashMap<>();
map.put(VALUE, storageData.getValue());
map.put(TIME_BUCKET, storageData.getTimeBucket());
map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
map.put(ENTITY_ID, storageData.getEntityId());
return map;
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof SumFunction)) {
return false;
}
final SumFunction function = (SumFunction) o;
return Objects.equals(getEntityId(), function.getEntityId())
&& Objects.equals(getTimeBucket(), function.getTimeBucket());
}
@Override
public int hashCode() {
return Objects.hash(getEntityId(), getTimeBucket());
}
}
......@@ -33,4 +33,4 @@ expSuffix: instance(['service'], ['instance'])
metricPrefix: log
metricsRules:
- name: count_info
exp: log_count.tagEqual('level', 'INFO').sum(['service', 'instance'])
exp: log_count.tagEqual('level', 'INFO').sum(['service', 'instance']).downsampling(SUM)
......@@ -153,9 +153,9 @@ public class LogE2E extends SkyWalkingTestAdapter {
LOGGER.info("{}: {}", metricsName, instanceMetrics);
final AtLeastOneOfMetricsMatcher instanceRespTimeMatcher = new AtLeastOneOfMetricsMatcher();
final MetricsValueMatcher greaterThanZero = new MetricsValueMatcher();
greaterThanZero.setValue("gt 0");
instanceRespTimeMatcher.setValue(greaterThanZero);
final MetricsValueMatcher greaterThanOne = new MetricsValueMatcher();
greaterThanOne.setValue("gt 1");
instanceRespTimeMatcher.setValue(greaterThanOne);
instanceRespTimeMatcher.verify(instanceMetrics.getValues());
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册