未验证 提交 3a4ee08e 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Enhance persistent session timeout mechanism. (#7334)

Fix a bug where the enhanced session could cache the metadata metrics (hot entities) forever. A new timeout mechanism is designed to avoid this specific case.

Optimize this timeout mechanism by making it behave differently for ES (one index per day) and non-ES storage implementations.
上级 4003c4b7
......@@ -15,6 +15,7 @@ Release Notes.
the master branch codes, please don't use in production environments.
#### Java Agent
* Supports modifying span attributes in async mode.
* Agent supports the collection of JVM arguments and jar dependency information.
* [Temporary] Support authentication for log report channel. This feature and grpc channel is going to be removed after
......@@ -108,6 +109,8 @@ Release Notes.
* Optimization: Concurrency mode of execution stage for metrics is removed(added in 8.5.0). Only concurrency of prepare
stage is meaningful and kept.
* Fix -meters metrics topic isn't created with namespace issue
* Enhance persistent session timeout mechanism. Because the enhanced session could cache the metadata metrics forever,
a new timeout mechanism is designed to avoid this specific case.
#### UI
......
......@@ -290,10 +290,20 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(
UITemplateManagementService.class, new UITemplateManagementService(getManager()));
if (moduleConfig.getMetricsDataTTL() < 2) {
throw new ModuleStartException(
"Metric TTL should be at least 2 days, current value is " + moduleConfig.getMetricsDataTTL());
}
if (moduleConfig.getRecordDataTTL() < 2) {
throw new ModuleStartException(
"Record TTL should be at least 2 days, current value is " + moduleConfig.getRecordDataTTL());
}
final MetricsStreamProcessor metricsStreamProcessor = MetricsStreamProcessor.getInstance();
metricsStreamProcessor.setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
metricsStreamProcessor.setL1FlushPeriod(moduleConfig.getL1FlushPeriod());
metricsStreamProcessor.setStorageSessionTimeout(moduleConfig.getStorageSessionTimeout());
metricsStreamProcessor.setMetricsDataTTL(moduleConfig.getMetricsDataTTL());
TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod());
apdexThresholdConfig = new ApdexThresholdConfig(this);
ApdexMetrics.setDICT(apdexThresholdConfig);
......
......@@ -78,11 +78,15 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
* every {@link #persistentMod} periods. And minute level workers execute every time.
*/
private int persistentMod;
/**
* @since 8.7.0 TTL settings from {@link org.apache.skywalking.oap.server.core.CoreModuleConfig#getMetricsDataTTL()}
*/
private int metricsDataTTL;
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
MetricsTransWorker transWorker, boolean enableDatabaseSession, boolean supportUpdate,
long storageSessionTimeout) {
long storageSessionTimeout, int metricsDataTTL) {
super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData()));
this.model = model;
this.context = new HashMap<>(100);
......@@ -95,6 +99,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
this.sessionTimeout = storageSessionTimeout;
this.persistentCounter = 0;
this.persistentMod = 1;
this.metricsDataTTL = metricsDataTTL;
String name = "METRICS_L2_AGGREGATION";
int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
......@@ -125,11 +130,16 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
/**
* Create the leaf and down-sampling MetricsPersistentWorker, no next step.
*/
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
boolean enableDatabaseSession, boolean supportUpdate, long storageSessionTimeout) {
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder,
Model model,
IMetricsDAO metricsDAO,
boolean enableDatabaseSession,
boolean supportUpdate,
long storageSessionTimeout,
int metricsDataTTL) {
this(moduleDefineHolder, model, metricsDAO,
null, null, null,
enableDatabaseSession, supportUpdate, storageSessionTimeout
enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL
);
// For a down-sampling metrics, we prolong the session timeout for 4 times, nearly 5 minutes.
// And add offset according to worker creation sequence, to avoid context clear overlap,
......@@ -246,10 +256,33 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
* Load data from the storage, if {@link #enableDatabaseSession} == true, only load data when the id doesn't exist.
*/
private void loadFromStorage(List<Metrics> metrics) {
final long currentTimeMillis = System.currentTimeMillis();
try {
List<Metrics> notInCacheMetrics = metrics.stream()
.filter(m -> !context.containsKey(m) || !enableDatabaseSession)
.collect(Collectors.toList());
List<Metrics> notInCacheMetrics =
metrics.stream()
.filter(m -> {
final Metrics cachedValue = context.get(m);
// Not cached or session disabled, the metric could be tagged `not in cache`.
if (cachedValue == null || !enableDatabaseSession) {
return true;
}
// The metric is in the cache, but still we have to check
// whether the cache is expired due to TTL.
// This is a cache-DB inconsistent case:
// Metrics keep coming due to traffic, but the entity in the
// database has been removed due to TTL.
if (!model.isTimeRelativeID() && supportUpdate) {
// Mostly all updatable metadata level metrics are required to do this check.
if (metricsDAO.isExpiredCache(model, cachedValue, currentTimeMillis, metricsDataTTL)) {
// The expired metrics should be tagged `not in cache` directly.
return true;
}
}
return false;
})
.collect(Collectors.toList());
if (notInCacheMetrics.isEmpty()) {
return;
}
......
......@@ -87,6 +87,11 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
*/
@Setter
private long storageSessionTimeout = 70_000;
/**
* @since 8.7.0 TTL settings from {@link org.apache.skywalking.oap.server.core.CoreModuleConfig#getMetricsDataTTL()}
*/
@Setter
private int metricsDataTTL = 3;
public static MetricsStreamProcessor getInstance() {
return PROCESSOR;
......@@ -157,12 +162,16 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
if (supportDownSampling) {
if (configService.shouldToHour()) {
Model model = modelSetter.add(
metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Hour), false);
metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Hour),
false
);
hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
}
if (configService.shouldToDay()) {
Model model = modelSetter.add(
metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Day), false);
metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Day),
false
);
dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
}
......@@ -171,7 +180,9 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
}
Model model = modelSetter.add(
metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Minute), false);
metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Minute),
false
);
MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(
moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);
......@@ -197,8 +208,8 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(
moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, enableDatabaseSession,
supportUpdate, storageSessionTimeout
moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker,
enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL
);
persistentWorkers.add(minutePersistentWorker);
......@@ -210,7 +221,9 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
Model model,
boolean supportUpdate) {
MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(
moduleDefineHolder, model, metricsDAO, enableDatabaseSession, supportUpdate, storageSessionTimeout);
moduleDefineHolder, model, metricsDAO,
enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL
);
persistentWorkers.add(persistentWorker);
return persistentWorker;
......
......@@ -20,6 +20,8 @@ package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
......@@ -54,4 +56,20 @@ public interface IMetricsDAO extends DAO {
* executed ASAP.
*/
UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException;
/**
 * Tell whether a cached metric has outlived the configured TTL.
 *
 * <p>The time bucket of the cached metric is converted to a timestamp using the model's down-sampling,
 * and the distance between that timestamp and the given current time is compared with the TTL in days.
 *
 * @param model             the model of the given cached value; its down-sampling is used for the conversion
 * @param cachedValue       the cached metric instance
 * @param currentTimeMillis the current system time of OAP, in milliseconds
 * @param ttl               the TTL in days, from the core setting
 * @return true if the age of the cached metric is strictly greater than the TTL
 */
default boolean isExpiredCache(Model model, Metrics cachedValue, long currentTimeMillis, int ttl) {
    final long cachedAtMillis = TimeBucket.getTimestamp(cachedValue.getTimeBucket(), model.getDownsampling());
    final long ageInMillis = currentTimeMillis - cachedAtMillis;
    final long ttlInMillis = TimeUnit.DAYS.toMillis(ttl);
    // Expired only when the cached metric is strictly older than the TTL window.
    return ageInMillis > ttlInMillis;
}
}
......@@ -23,7 +23,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
......@@ -34,6 +37,7 @@ import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.IndicesMetadataCache;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.joda.time.DateTime;
import static java.util.stream.Collectors.groupingBy;
......@@ -117,4 +121,25 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
String id = IndexController.INSTANCE.generateDocId(model, metrics.id());
return getClient().prepareUpdate(modelName, id, builder);
}
/**
 * Tell whether a cached metric should be treated as expired, comparing at day granularity.
 *
 * <p>The check runs in two steps:
 * <ol>
 *   <li>Fast fail: if the age of the metric is still less than {@code ttl - 1} days, it is not expired.</li>
 *   <li>Otherwise the day bucket of the metric is compared with the day bucket of
 *       {@code currentTimeMillis - ttl days}; the metric is expired when its day is earlier than or equal to
 *       that deadline.</li>
 * </ol>
 *
 * @param model             the model of the given cached value; its down-sampling is used to convert the
 *                          metric's time bucket into a timestamp
 * @param cachedValue       the cached metric instance
 * @param currentTimeMillis the current system time of OAP, in milliseconds
 * @param ttl               the TTL in days, from the core setting
 * @return true if the cached metric is expired
 */
@Override
public boolean isExpiredCache(final Model model,
                              final Metrics cachedValue,
                              final long currentTimeMillis,
                              final int ttl) {
    final long metricTimestamp = TimeBucket.getTimestamp(
        cachedValue.getTimeBucket(), model.getDownsampling());
    // Fast fail check. If the duration is still less than TTL - 1 days(absolute)
    // the cache should not be expired.
    if (currentTimeMillis - metricTimestamp < TimeUnit.DAYS.toMillis(ttl - 1)) {
        return false;
    }
    // Day bucket (yyyyMMdd as a number) of the moment `ttl` days before the current time.
    final long deadline = Long.parseLong(new DateTime(currentTimeMillis).plusDays(-ttl).toString("yyyyMMdd"));
    // Derive the metric's day bucket from its timestamp in milliseconds, not from the raw time bucket value.
    // NOTE(review): TimeBucket#getTimeBucket is expected to take epoch milliseconds (the inverse of
    // TimeBucket#getTimestamp used above); feeding it a time bucket would yield a meaningless, very old day
    // and make every metric that passes the fast-fail check look expired. Confirm against TimeBucket.
    final long timeBucket = TimeBucket.getTimeBucket(metricTimestamp, DownSampling.Day);
    // If time bucket is earlier or equals(mostly) the deadline, then the cached metric is expired.
    return timeBucket <= deadline;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册