未验证 提交 d8fce128 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Cache enhancement - don't read new metrics from database in minute (#10085)

上级 8c31172c
......@@ -7,6 +7,20 @@
* Add `ServerStatusService` in the core module to provide a new way to expose booting status to other modules.
* Adds Micrometer as a new component.(ID=141)
* Refactor session cache in MetricsPersistentWorker.
* Cache enhancement - don't read new metrics from database in minute dimensionality.
```
// When
// (1) the time bucket of the server's latest stability status is provided
// 1.1 the OAP has booted successfully
// 1.2 the current dimensionality is in minute.
// (2) the metrics are from the time after the timeOfLatestStabilitySts
// (3) the metrics don't exist in the cache
// the kernel should NOT try to load it from the database.
//
// Note on condition (2):
// for the exact minute in which the OAP booted successfully, the metrics are still expected to be
// loaded from the database when they don't exist in the cache.
```
#### UI
......
......@@ -25,11 +25,15 @@ import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.status.ServerStatusService;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.model.Model;
......@@ -89,6 +93,16 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
* @since 8.7.0 TTL settings from {@link org.apache.skywalking.oap.server.core.CoreModuleConfig#getMetricsDataTTL()}
*/
private int metricsDataTTL;
/**
* @since 9.4.0
*/
private final ServerStatusService serverStatusService;
/**
* The time bucket, in minute dimensionality, of the system's latest stability status, or 0 when it has not been set.
*
* @since 9.4.0
*/
private long timeOfLatestStabilitySts = 0;
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
......@@ -145,6 +159,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
new MetricsTag.Keys("status"), new MetricsTag.Values("cached")
);
SESSION_TIMEOUT_OFFSITE_COUNTER++;
serverStatusService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ServerStatusService.class);
}
/**
......@@ -191,6 +206,11 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
return Collections.emptyList();
}
if (model.getDownsampling().equals(DownSampling.Minute)) {
timeOfLatestStabilitySts = TimeBucket.getMinuteTimeBucket(
serverStatusService.getBootingStatus().getUptime());
}
/*
* Hard coded the max size. This only affect the multiIDRead if the data doesn't hit the cache.
*/
......@@ -299,7 +319,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
List<Metrics> notInCacheMetrics =
metrics.stream()
.filter(m -> {
final Metrics cachedValue = sessionCache.get(m);
final Metrics cachedValue = requireInitialization(m);
// the metric is tagged `not in cache`.
if (cachedValue == null) {
return true;
......@@ -340,6 +360,41 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
sessionCache.removeExpired();
}
/**
 * Decide which value the streaming process should treat as the cached one for the given metrics,
 * and thereby whether the worker has to go further and load it from the database.
 *
 * @param metrics the metrics in the streaming process.
 * @return the entry held by the session cache when there is one; the input metrics itself when the
 *         database read should be skipped; otherwise null, which the caller treats as `not in cache`.
 */
private Metrics requireInitialization(Metrics metrics) {
    final Metrics inSession = sessionCache.get(metrics);
    if (inSession == null) {
        // The kernel should NOT try to load the metrics from the database when all of these hold:
        // (1) the time bucket of the server's latest stability status is provided (greater than 0), i.e.
        //     1.1 the OAP has booted successfully
        //     1.2 the current dimensionality is in minute.
        // (2) the metrics are from the time after the timeOfLatestStabilitySts
        // (3) the metrics don't exist in the cache (already guaranteed inside this branch)
        //
        // About condition (2): the comparison is strict, so for the exact minute of the successful boot
        // the metrics are still expected to be loaded from the database when missing in the cache.
        final boolean stabilityTimeProvided = timeOfLatestStabilitySts > 0;
        if (stabilityTimeProvided && metrics.getTimeBucket() > timeOfLatestStabilitySts) {
            // Hand the input back as if it were cached, to avoid reading from database.
            return metrics;
        }
        return null;
    }
    // Every cached metrics has been written at least once.
    return inSession;
}
/**
* Metrics queue processor, merge the received metrics if existing one with same ID(s) and time bucket.
*
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.status;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
......@@ -35,6 +36,7 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
@RequiredArgsConstructor
public class ServerStatusService implements Service {
private final ModuleManager manager;
@Getter
private BootingStatus bootingStatus = new BootingStatus();
public void bootedNow(long uptime) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册