未验证 提交 d8a5e8c1 编写于 作者: D Daming 提交者: GitHub

eliminate `now` error at a fixed rate schedule executor (#7145)

上级 3541c325
......@@ -42,6 +42,7 @@ Release Notes.
* Upgrade commons-lang3 to avoid potential NPE in some JDK versions.
* OAL supports generating metrics from events.
* Support endpoint name grouping by OpenAPI definitions.
* Fix CounterWindow increase computing issue.
#### UI
* Fix the date component for log conditions.
......
......@@ -51,4 +51,10 @@ public class Sample {
double nv = transform.apply(i._2, i._1);
return newValue(ignored -> nv);
}
Sample increase(Function2<Double, Long, Double> transform) {
Tuple2<Long, Double> i = CounterWindow.INSTANCE.pop(name, labels, value, timestamp);
double nv = transform.apply(i._2, i._1);
return newValue(ignored -> nv);
}
}
......@@ -288,7 +288,20 @@ public class SampleFamily {
}
public SampleFamily irate() {
return rate("PT1S");
if (this == EMPTY) {
return EMPTY;
}
return SampleFamily.build(
this.context,
Arrays.stream(samples)
.map(sample -> sample.increase(
(lowerBoundValue, lowerBoundTime) -> {
final long timeDiff = (sample.timestamp - lowerBoundTime) / 1000;
return timeDiff < 1L ? 0.0 : (sample.value - lowerBoundValue) / timeDiff;
}
))
.toArray(Sample[]::new)
);
}
@SuppressWarnings(value = "unchecked")
......
......@@ -22,8 +22,8 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import java.util.LinkedList;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
......@@ -42,20 +42,47 @@ public class CounterWindow {
public static final CounterWindow INSTANCE = new CounterWindow();
private final Map<ID, Tuple2<Long, Double>> lastElementMap = Maps.newHashMap();
private final Map<ID, Queue<Tuple2<Long, Double>>> windows = Maps.newHashMap();
public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) {
ID id = new ID(name, labels);
if (!windows.containsKey(id)) {
windows.put(id, new LinkedList<>());
windows.put(id, new PriorityQueue<>());
}
Queue<Tuple2<Long, Double>> window = windows.get(id);
window.offer(Tuple.of(now, value));
Tuple2<Long, Double> ps = window.element();
if ((now - ps._1) >= windowSize) {
window.remove();
long waterLevel = now - windowSize;
Tuple2<Long, Double> peek = window.peek();
if (peek._1 > waterLevel) {
return peek;
}
Tuple2<Long, Double> result = peek;
while (peek._1 < waterLevel) {
result = window.poll();
peek = window.element();
}
// Choose the closed slot to the expected timestamp
if (waterLevel - result._1 <= peek._1 - waterLevel) {
return result;
}
return peek;
}
public Tuple2<Long, Double> pop(String name, ImmutableMap<String, String> labels, Double value, long now) {
ID id = new ID(name, labels);
Tuple2<Long, Double> element = Tuple.of(now, value);
Tuple2<Long, Double> result = lastElementMap.get(id);
lastElementMap.put(id, element);
if (result == null) {
return element;
}
return ps;
return result;
}
public void reset() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.meter.analyzer.dsl.counter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.vavr.Tuple2;
import java.time.Duration;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import static java.time.Instant.parse;
public class CounterWindowTest {
public static List<Tuple2<Long, Double>> parameters() {
return Lists.newArrayList(
new Tuple2<>(parse("2020-09-11T11:11:01.03Z").toEpochMilli(), 10d),
new Tuple2<>(parse("2020-09-11T11:11:15.99Z").toEpochMilli(), 11d),
new Tuple2<>(parse("2020-09-11T11:11:31.00Z").toEpochMilli(), 12d),
new Tuple2<>(parse("2020-09-11T11:11:46.09Z").toEpochMilli(), 13d),
new Tuple2<>(parse("2020-09-11T11:12:00.97Z").toEpochMilli(), 14d),
new Tuple2<>(parse("2020-09-11T11:11:00.97Z").toEpochMilli(), 15d),
new Tuple2<>(parse("2020-09-11T11:12:16.60Z").toEpochMilli(), 16d),
new Tuple2<>(parse("2020-09-11T11:12:31.66Z").toEpochMilli(), 17d)
);
}
@Test
public void testPT15S() {
double[] actuals = parameters().stream().mapToDouble(e -> {
Tuple2<Long, Double> increase = CounterWindow.INSTANCE.increase(
"test", ImmutableMap.<String, String>builder().build(), e._2,
Duration.parse("PT15S").getSeconds() * 1000, e._1
);
return e._2 - increase._2;
}).toArray();
Assert.assertArrayEquals(new double[] {0, 1d, 1d, 1d, 1d, 0d, 2d, 1d}, actuals, 0.d);
}
@Test
public void testPT35S() {
double[] actuals = parameters().stream().mapToDouble(e -> {
Tuple2<Long, Double> increase = CounterWindow.INSTANCE.increase(
"test", ImmutableMap.<String, String>builder().build(), e._2,
Duration.parse("PT35S").getSeconds() * 1000, e._1
);
return e._2 - increase._2;
}).toArray();
Assert.assertArrayEquals(new double[] {0, 1d, 2d, 2d, 2d, 0d, 3d, 3d}, actuals, 0.d);
}
@Test
public void testPT1M() {
double[] actuals = parameters().stream().mapToDouble(e -> {
Tuple2<Long, Double> increase = CounterWindow.INSTANCE.increase(
"test", ImmutableMap.<String, String>builder().build(), e._2,
Duration.parse("PT1M").getSeconds() * 1000, e._1
);
return e._2 - increase._2;
}).toArray();
Assert.assertArrayEquals(new double[] {0, 1d, 2d, 3d, 4d, 0d, 5d, 5d}, actuals, 0.d);
}
@Test
public void testPT2M() {
double[] actuals = parameters().stream().mapToDouble(e -> {
Tuple2<Long, Double> increase = CounterWindow.INSTANCE.increase(
"test", ImmutableMap.<String, String>builder().build(), e._2,
Duration.parse("PT2M").getSeconds() * 1000, e._1
);
return e._2 - increase._2;
}).toArray();
Assert.assertArrayEquals(new double[] {0, 1d, 2d, 3d, 4d, 0d, 1d, 2d}, actuals, 0.d);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册