From 3a4ee08e54bd3f08441f1023fd25442d6a2badde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Tue, 20 Jul 2021 07:50:27 +0800 Subject: [PATCH] Enhance persistent session timeout mechanism. (#7334) Fix a bug where the enhanced session could cache the metadata metrics (hot entities) forever. A new timeout mechanism is designed to avoid this specific case. Optimize this timeout mechanism by making it different for ES (one index per day) and non-ES storage implementations. --- CHANGES.md | 3 ++ .../oap/server/core/CoreModuleProvider.java | 10 ++++ .../worker/MetricsPersistentWorker.java | 47 ++++++++++++++++--- .../worker/MetricsStreamProcessor.java | 25 +++++++--- .../oap/server/core/storage/IMetricsDAO.java | 18 +++++++ .../elasticsearch/base/MetricsEsDAO.java | 25 ++++++++++ 6 files changed, 115 insertions(+), 13 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index c1fce5a390..52fd3222cc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -15,6 +15,7 @@ Release Notes. the master branch codes, please don't use in production environments. #### Java Agent + * Supports modifying span attributes in async mode. * Agent supports the collection of JVM arguments and jar dependency information. * [Temporary] Support authentication for log report channel. This feature and grpc channel is going to be removed after @@ -108,6 +109,8 @@ Release Notes. * Optimization: Concurrency mode of execution stage for metrics is removed(added in 8.5.0). Only concurrency of prepare stage is meaningful and kept. * Fix -meters metrics topic isn't created with namespace issue +* Enhance persistent session timeout mechanism. Because the enhanced session could cache the metadata metrics forever, + a new timeout mechanism is designed to avoid this specific case. 
#### UI diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index 31f5112903..1be58a3280 100755 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -290,10 +290,20 @@ public class CoreModuleProvider extends ModuleProvider { this.registerServiceImplementation( UITemplateManagementService.class, new UITemplateManagementService(getManager())); + if (moduleConfig.getMetricsDataTTL() < 2) { + throw new ModuleStartException( + "Metric TTL should be at least 2 days, current value is " + moduleConfig.getMetricsDataTTL()); + } + if (moduleConfig.getRecordDataTTL() < 2) { + throw new ModuleStartException( + "Record TTL should be at least 2 days, current value is " + moduleConfig.getRecordDataTTL()); + } + final MetricsStreamProcessor metricsStreamProcessor = MetricsStreamProcessor.getInstance(); metricsStreamProcessor.setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession()); metricsStreamProcessor.setL1FlushPeriod(moduleConfig.getL1FlushPeriod()); metricsStreamProcessor.setStorageSessionTimeout(moduleConfig.getStorageSessionTimeout()); + metricsStreamProcessor.setMetricsDataTTL(moduleConfig.getMetricsDataTTL()); TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod()); apdexThresholdConfig = new ApdexThresholdConfig(this); ApdexMetrics.setDICT(apdexThresholdConfig); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index 65e31a4c35..172eb048d9 100644 --- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -78,11 +78,15 @@ public class MetricsPersistentWorker extends PersistenceWorker { * every {@link #persistentMod} periods. And minute level workers execute every time. */ private int persistentMod; + /** + * @since 8.7.0 TTL settings from {@link org.apache.skywalking.oap.server.core.CoreModuleConfig#getMetricsDataTTL()} + */ + private int metricsDataTTL; MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, AbstractWorker nextAlarmWorker, AbstractWorker nextExportWorker, MetricsTransWorker transWorker, boolean enableDatabaseSession, boolean supportUpdate, - long storageSessionTimeout) { + long storageSessionTimeout, int metricsDataTTL) { super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData())); this.model = model; this.context = new HashMap<>(100); @@ -95,6 +99,7 @@ public class MetricsPersistentWorker extends PersistenceWorker { this.sessionTimeout = storageSessionTimeout; this.persistentCounter = 0; this.persistentMod = 1; + this.metricsDataTTL = metricsDataTTL; String name = "METRICS_L2_AGGREGATION"; int size = BulkConsumePool.Creator.recommendMaxSize() / 8; @@ -125,11 +130,16 @@ public class MetricsPersistentWorker extends PersistenceWorker { /** * Create the leaf and down-sampling MetricsPersistentWorker, no next step. 
*/ - MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, - boolean enableDatabaseSession, boolean supportUpdate, long storageSessionTimeout) { + MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, + Model model, + IMetricsDAO metricsDAO, + boolean enableDatabaseSession, + boolean supportUpdate, + long storageSessionTimeout, + int metricsDataTTL) { this(moduleDefineHolder, model, metricsDAO, null, null, null, - enableDatabaseSession, supportUpdate, storageSessionTimeout + enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL ); // For a down-sampling metrics, we prolong the session timeout for 4 times, nearly 5 minutes. // And add offset according to worker creation sequence, to avoid context clear overlap, @@ -246,10 +256,33 @@ public class MetricsPersistentWorker extends PersistenceWorker { * Load data from the storage, if {@link #enableDatabaseSession} == true, only load data when the id doesn't exist. */ private void loadFromStorage(List<Metrics> metrics) { + final long currentTimeMillis = System.currentTimeMillis(); try { - List<Metrics> notInCacheMetrics = metrics.stream() - .filter(m -> !context.containsKey(m) || !enableDatabaseSession) - .collect(Collectors.toList()); + List<Metrics> notInCacheMetrics = + metrics.stream() + .filter(m -> { + final Metrics cachedValue = context.get(m); + // Not cached or session disabled, the metric could be tagged `not in cache`. + if (cachedValue == null || !enableDatabaseSession) { + return true; + } + // The metric is in the cache, but still we have to check + // whether the cache is expired due to TTL. + // This is a cache-DB inconsistent case: + // Metrics keep coming due to traffic, but the entity in the + // database has been removed due to TTL. + if (!model.isTimeRelativeID() && supportUpdate) { + // Mostly all updatable metadata level metrics are required to do this check. 
+ + if (metricsDAO.isExpiredCache(model, cachedValue, currentTimeMillis, metricsDataTTL)) { + // The expired metrics should be tagged `not in cache` directly. + return true; + } + } + + return false; + }) + .collect(Collectors.toList()); if (notInCacheMetrics.isEmpty()) { return; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java index 6c323f4744..1fb1d41ae7 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java @@ -87,6 +87,11 @@ public class MetricsStreamProcessor implements StreamProcessor { */ @Setter private long storageSessionTimeout = 70_000; + /** + * @since 8.7.0 TTL settings from {@link org.apache.skywalking.oap.server.core.CoreModuleConfig#getMetricsDataTTL()} + */ + @Setter + private int metricsDataTTL = 3; public static MetricsStreamProcessor getInstance() { return PROCESSOR; @@ -157,12 +162,16 @@ public class MetricsStreamProcessor implements StreamProcessor { if (supportDownSampling) { if (configService.shouldToHour()) { Model model = modelSetter.add( - metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Hour), false); + metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Hour), + false + ); hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate); } if (configService.shouldToDay()) { Model model = modelSetter.add( - metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Day), false); + metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Day), + false 
+ ); dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate); } @@ -171,7 +180,9 @@ public class MetricsStreamProcessor implements StreamProcessor { } Model model = modelSetter.add( - metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Minute), false); + metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Minute), + false + ); MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker( moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate); @@ -197,8 +208,8 @@ public class MetricsStreamProcessor implements StreamProcessor { ExportWorker exportWorker = new ExportWorker(moduleDefineHolder); MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker( - moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, enableDatabaseSession, - supportUpdate, storageSessionTimeout + moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, + enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL ); persistentWorkers.add(minutePersistentWorker); @@ -210,7 +221,9 @@ public class MetricsStreamProcessor implements StreamProcessor { Model model, boolean supportUpdate) { MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker( - moduleDefineHolder, model, metricsDAO, enableDatabaseSession, supportUpdate, storageSessionTimeout); + moduleDefineHolder, model, metricsDAO, + enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL + ); persistentWorkers.add(persistentWorker); return persistentWorker; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java index 2b4c05a7cd..4320a83cd9 100644 --- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java @@ -20,6 +20,8 @@ package org.apache.skywalking.oap.server.core.storage; import java.io.IOException; import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; @@ -54,4 +56,20 @@ public interface IMetricsDAO extends DAO { * executed ASAP. */ UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException; + + /** + * Decide whether a cached metric has outlived the TTL, judged against the given current time. + * + * @param model of the given cached value; supplies the down-sampling used to decode its time bucket + * @param cachedValue the cached metric instance whose time bucket is checked + * @param currentTimeMillis current system time of OAP, in milliseconds. + * @param ttl metrics data TTL in days, from core setting. + * @return true if the metric's timestamp is more than {@code ttl} days before {@code currentTimeMillis}. + */ + default boolean isExpiredCache(Model model, Metrics cachedValue, long currentTimeMillis, int ttl) { + final long metricTimestamp = TimeBucket.getTimestamp( + cachedValue.getTimeBucket(), model.getDownsampling()); + // If the cached metric is older than the TTL indicated. 
+ return currentTimeMillis - metricTimestamp > TimeUnit.DAYS.toMillis(ttl); + } } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java index 0f204c9b80..3574a81fcb 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java @@ -23,7 +23,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.analysis.DownSampling; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder; @@ -34,6 +37,7 @@ import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.IndicesMetadataCache; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.joda.time.DateTime; import static java.util.stream.Collectors.groupingBy; @@ -117,4 +121,25 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO { String id = IndexController.INSTANCE.generateDocId(model, metrics.id()); return getClient().prepareUpdate(modelName, id, builder); } + + @Override + public boolean isExpiredCache(final Model model, + 
final Metrics cachedValue, + final long currentTimeMillis, + final int ttl) { + final long metricTimestamp = TimeBucket.getTimestamp( + cachedValue.getTimeBucket(), model.getDownsampling()); + // Fast fail check. If the metric is younger than (TTL - 1) days in absolute time, + // the cache cannot be expired yet. + if (currentTimeMillis - metricTimestamp < TimeUnit.DAYS.toMillis(ttl - 1)) { + return false; + } + final long deadline = Long.parseLong(new DateTime(currentTimeMillis).plusDays(-ttl).toString("yyyyMMdd")); + final long timeBucket = TimeBucket.getTimeBucket(metricTimestamp, DownSampling.Day); + // Day bucket is derived from the millisecond timestamp; at or before the deadline day means the cached metric is expired. + if (timeBucket <= deadline) { + return true; + } + return false; + } } -- GitLab