diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java index 7d4c3ab4ddb330f4faed876953cc89f77400084b..e47171a3d3bfbe9a5f5eb2521843a8820d199be7 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java @@ -37,6 +37,7 @@ public class CoreModuleConfig extends ModuleConfig { @Setter private int gRPCPort; @Setter private int maxConcurrentCallsPerConnection; @Setter private int maxMessageSize; + @Setter private boolean enableDatabaseSession; private final List downsampling; /** * The period of doing data persistence. diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index f05b776323bb1a8f885df8b40d9e0c2439d5b81f..fa6c48a70e6da6f57de429a4f0708a5dbd80b877 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -20,65 +20,25 @@ package org.apache.skywalking.oap.server.core; import java.io.IOException; import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule; -import org.apache.skywalking.oap.server.core.analysis.DisableRegister; -import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener; +import org.apache.skywalking.oap.server.core.analysis.*; +import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; import org.apache.skywalking.oap.server.core.annotation.AnnotationScan; -import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer; -import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache; -import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache; -import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache; -import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache; -import org.apache.skywalking.oap.server.core.cluster.ClusterModule; -import org.apache.skywalking.oap.server.core.cluster.ClusterRegister; -import org.apache.skywalking.oap.server.core.cluster.RemoteInstance; -import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService; -import org.apache.skywalking.oap.server.core.config.ConfigService; -import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService; -import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService; -import org.apache.skywalking.oap.server.core.oal.rt.OALEngine; -import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoader; -import org.apache.skywalking.oap.server.core.query.AggregationQueryService; -import org.apache.skywalking.oap.server.core.query.AlarmQueryService; -import org.apache.skywalking.oap.server.core.query.LogQueryService; -import org.apache.skywalking.oap.server.core.query.MetadataQueryService; -import org.apache.skywalking.oap.server.core.query.MetricQueryService; -import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService; -import org.apache.skywalking.oap.server.core.query.TopologyQueryService; -import org.apache.skywalking.oap.server.core.query.TraceQueryService; -import org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister; -import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister; -import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister; -import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister; -import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister; -import org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister; -import org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister; -import org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister; -import org.apache.skywalking.oap.server.core.remote.RemoteSenderService; -import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler; -import org.apache.skywalking.oap.server.core.remote.client.Address; -import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager; +import org.apache.skywalking.oap.server.core.cache.*; +import org.apache.skywalking.oap.server.core.cluster.*; +import org.apache.skywalking.oap.server.core.config.*; +import org.apache.skywalking.oap.server.core.oal.rt.*; +import org.apache.skywalking.oap.server.core.query.*; +import org.apache.skywalking.oap.server.core.register.service.*; +import org.apache.skywalking.oap.server.core.remote.*; +import org.apache.skywalking.oap.server.core.remote.client.*; import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler; -import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; -import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl; -import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister; -import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl; -import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; -import org.apache.skywalking.oap.server.core.source.SourceReceiver; -import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl; +import org.apache.skywalking.oap.server.core.server.*; +import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.core.storage.PersistenceTimer; -import org.apache.skywalking.oap.server.core.storage.model.IModelGetter; -import org.apache.skywalking.oap.server.core.storage.model.IModelOverride; -import org.apache.skywalking.oap.server.core.storage.model.IModelSetter; -import org.apache.skywalking.oap.server.core.storage.model.StorageModels; +import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer; -import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter; -import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter; -import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService; -import org.apache.skywalking.oap.server.library.module.ModuleConfig; -import org.apache.skywalking.oap.server.library.module.ModuleDefine; -import org.apache.skywalking.oap.server.library.module.ModuleProvider; -import org.apache.skywalking.oap.server.library.module.ModuleStartException; -import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; +import org.apache.skywalking.oap.server.core.worker.*; +import org.apache.skywalking.oap.server.library.module.*; import org.apache.skywalking.oap.server.library.server.ServerException; import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer; import org.apache.skywalking.oap.server.library.server.jetty.JettyServer; @@ -96,7 +56,6 @@ public class CoreModuleProvider extends ModuleProvider { private final AnnotationScan annotationScan; private final StorageModels storageModels; private final SourceReceiverImpl receiver; - private StreamAnnotationListener streamAnnotationListener; private OALEngine oalEngine; public CoreModuleProvider() { @@ -120,7 +79,7 @@ public class CoreModuleProvider extends ModuleProvider { } @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException { - streamAnnotationListener = new StreamAnnotationListener(getManager()); + StreamAnnotationListener streamAnnotationListener = new StreamAnnotationListener(getManager()); AnnotationScan scopeScan = new AnnotationScan(); scopeScan.registerListener(new DefaultScopeDefine.Listener()); @@ -200,6 +159,8 @@ public class CoreModuleProvider extends ModuleProvider { this.remoteClientManager = new RemoteClientManager(getManager()); this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager); + + MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession()); } @Override public void start() throws ModuleStartException { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java index 96b59da302c360da13a8f3fe3f09af975877affd..25c312562ba2f57d497fe004024c27ef61639b94 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java @@ -35,6 +35,7 @@ public abstract class Metrics extends StreamData implements StorageData { public static final String ENTITY_ID = "entity_id"; @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket; + @Getter @Setter private long survivalTime = 0L; public abstract String id(); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PxxMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PxxMetrics.java index 1cc88c2a0d1794ce9ffe7907f1152981891f9c02..3754c62d90395d6f7fc89ed267a232d5727d38e9 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PxxMetrics.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PxxMetrics.java @@ -43,7 +43,7 @@ public abstract class PxxMetrics extends Metrics implements IntValueHolder { @Getter @Setter @Column(columnName = DETAIL_GROUP) private IntKeyLongValueArray detailGroup; private final int percentileRank; - private Map detailIndex; + @Getter private Map detailIndex; public PxxMetrics(int percentileRank) { this.percentileRank = percentileRank; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index 34e6436700b9297665609f72185d9192acecab63..8c2865aa17fe7b3a15a6064567ad96ff01da03fb 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -18,6 +18,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker; +import java.io.IOException; import java.util.*; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.*; @@ -40,16 +41,20 @@ public class MetricsPersistentWorker extends PersistenceWorker databaseSession; private final MergeDataCache mergeDataCache; private final IMetricsDAO metricsDAO; private final AbstractWorker nextAlarmWorker; private final AbstractWorker nextExportWorker; private final DataCarrier dataCarrier; + private final boolean enableDatabaseSession; MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, AbstractWorker nextAlarmWorker, - AbstractWorker nextExportWorker) { + AbstractWorker nextExportWorker, boolean enableDatabaseSession) { super(moduleDefineHolder); this.model = model; + this.databaseSession = new HashMap<>(100); + this.enableDatabaseSession = enableDatabaseSession; this.mergeDataCache = new MergeDataCache<>(); this.metricsDAO = metricsDAO; this.nextAlarmWorker = nextAlarmWorker; @@ -83,23 +88,21 @@ public class MetricsPersistentWorker extends PersistenceWorker cache, List prepareRequests) { + @Override public void prepareBatch(Collection lastCollection, List prepareRequests) { long start = System.currentTimeMillis(); - Collection collection = cache.getLast().collection(); - int i = 0; + int batchGetSize = 2000; Metrics[] metrics = null; - for (Metrics data : collection) { + for (Metrics data : lastCollection) { if (Objects.nonNull(nextExportWorker)) { ExportEvent event = new ExportEvent(data, ExportEvent.EventType.INCREMENT); nextExportWorker.in(event); } - int batchGetSize = 2000; int mod = i % batchGetSize; if (mod == 0) { - int residual = collection.size() - i; + int residual = lastCollection.size() - i; if (residual >= batchGetSize) { metrics = new Metrics[batchGetSize]; } else { @@ -110,23 +113,18 @@ public class MetricsPersistentWorker extends PersistenceWorker dbMetricsMap = metricsDAO.get(model, metrics); + syncStorageToCache(metrics); for (Metrics metric : metrics) { - if (dbMetricsMap.containsKey(metric.id())) { - metric.combine(dbMetricsMap.get(metric.id())); - metric.calculate(); - prepareRequests.add(metricsDAO.prepareBatchUpdate(model, metric)); + Metrics cacheMetric = databaseSession.get(metric); + if (cacheMetric != null) { + cacheMetric.combine(metric); + cacheMetric.calculate(); + prepareRequests.add(metricsDAO.prepareBatchUpdate(model, cacheMetric)); + nextWorker(cacheMetric); } else { prepareRequests.add(metricsDAO.prepareBatchInsert(model, metric)); - } - - if (Objects.nonNull(nextAlarmWorker)) { - nextAlarmWorker.in(metric); - } - if (Objects.nonNull(nextExportWorker)) { - ExportEvent event = new ExportEvent(metric, ExportEvent.EventType.TOTAL); - nextExportWorker.in(event); + nextWorker(metric); } } } catch (Throwable t) { @@ -142,6 +140,16 @@ public class MetricsPersistentWorker extends PersistenceWorker notInCacheIds = new ArrayList<>(); + for (Metrics metric : metrics) { + if (!databaseSession.containsKey(metric)) { + notInCacheIds.add(metric.id()); + } + } + + if (notInCacheIds.size() > 0) { + List metricsList = metricsDAO.multiGet(model, notInCacheIds); + for (Metrics metric : metricsList) { + databaseSession.put(metric, metric); + } + } + } + + @Override public void endOfRound(long tookTime) { + if (enableDatabaseSession) { + Iterator iterator = databaseSession.values().iterator(); + while (iterator.hasNext()) { + Metrics metrics = iterator.next(); + metrics.setSurvivalTime(tookTime + metrics.getSurvivalTime()); + if (metrics.getSurvivalTime() > 70000) { + iterator.remove(); + } + } + } + } + private class PersistentConsumer implements IConsumer { private final MetricsPersistentWorker persistent; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java index e81906e4b5417fcc8ead90511d6bf949cb9b6f71..4ee60ab49fe0eaebf7c6d551ecbc99e6bae7d2fb 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java @@ -19,7 +19,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker; import java.util.*; -import lombok.Getter; +import lombok.*; import org.apache.skywalking.oap.server.core.*; import org.apache.skywalking.oap.server.core.analysis.*; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; @@ -39,6 +39,7 @@ public class MetricsStreamProcessor implements StreamProcessor { private Map, MetricsAggregateWorker> entryWorkers = new HashMap<>(); @Getter private List persistentWorkers = new ArrayList<>(); + @Setter @Getter private boolean enableDatabaseSession; public static MetricsStreamProcessor getInstance() { return PROCESSOR; @@ -100,19 +101,18 @@ public class MetricsStreamProcessor implements StreamProcessor { entryWorkers.put(metricsClass, aggregateWorker); } - private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder, - IMetricsDAO metricsDAO, Model model) { + private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) { AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder); ExportWorker exportWorker = new ExportWorker(moduleDefineHolder); - MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker); + MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, enableDatabaseSession); persistentWorkers.add(minutePersistentWorker); return minutePersistentWorker; } private MetricsPersistentWorker worker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) { - MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null); + MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null, enableDatabaseSession); persistentWorkers.add(persistentWorker); return persistentWorker; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java index 166eb9c2a658b71af67a6d88c322dc7d247da1d2..88c9a5b283a99a1157198b206c8a765782b0f424 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java @@ -18,8 +18,8 @@ package org.apache.skywalking.oap.server.core.analysis.worker; -import java.util.List; -import org.apache.skywalking.oap.server.core.analysis.data.Window; +import java.util.*; +import org.apache.skywalking.oap.server.core.analysis.data.*; import org.apache.skywalking.oap.server.core.storage.StorageData; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; @@ -45,6 +45,8 @@ public abstract class PersistenceWorker prepareRequests); + public abstract void prepareBatch(Collection lastCollection, List prepareRequests); public final void buildBatchRequests(List prepareRequests) { try { - while (getCache().getLast().isWriting()) { + SWCollection last = getCache().getLast(); + while (last.isWriting()) { try { Thread.sleep(10); } catch (InterruptedException e) { @@ -69,8 +72,8 @@ public abstract class PersistenceWorker cache, List prepareRequests) { - cache.getLast().collection().forEach(record -> { + @Override public void prepareBatch(Collection lastCollection, List prepareRequests) { + lastCollection.forEach(record -> { try { prepareRequests.add(recordDAO.prepareBatchInsert(model, record)); } catch (Throwable t) { @@ -96,6 +96,12 @@ public class TopNWorker extends PersistenceWorker get(Model model, Metrics[] metrics) throws IOException; + List multiGet(Model model, List ids) throws IOException; InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java index cedd0740cc6cca42457a45e7b87e63784ac06d77..5a1871649a6daaf4c652b8edc1c735c5d04b7bc3 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java @@ -43,6 +43,8 @@ public enum PersistenceTimer { private CounterMetrics errorCounter; private HistogramMetrics prepareLatency; private HistogramMetrics executeLatency; + private long lastTime = System.currentTimeMillis(); + private final List prepareRequests = new ArrayList<>(50000); PersistenceTimer() { this.debug = System.getProperty("debug") != null; @@ -61,7 +63,7 @@ public enum PersistenceTimer { MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); if (!isStarted) { - Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate( + Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay( new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> logger.error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS); @@ -75,10 +77,10 @@ public enum PersistenceTimer { } long startTime = System.currentTimeMillis(); + try { HistogramMetrics.Timer timer = prepareLatency.createTimer(); - List prepareRequests = new LinkedList<>(); try { List persistenceWorkers = new ArrayList<>(); persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers()); @@ -92,6 +94,8 @@ public enum PersistenceTimer { if (worker.flushAndSwitch()) { worker.buildBatchRequests(prepareRequests); } + + worker.endOfRound(System.currentTimeMillis() - lastTime); }); if (debug) { @@ -116,6 +120,9 @@ public enum PersistenceTimer { if (logger.isDebugEnabled()) { logger.debug("Persistence data save finish"); } + + prepareRequests.clear(); + lastTime = System.currentTimeMillis(); } if (debug) { diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml index bc4b74556389421c91ade1aa5fd9c2ed6f1b56c1..e45666ccf689fb649445fcf5e90cd45b35db0bf4 100644 --- a/oap-server/server-starter/src/main/assembly/application.yml +++ b/oap-server/server-starter/src/main/assembly/application.yml @@ -52,9 +52,9 @@ core: gRPCHost: ${SW_CORE_GRPC_HOST:0.0.0.0} gRPCPort: ${SW_CORE_GRPC_PORT:11800} downsampling: - - Hour - - Day - - Month + - Hour + - Day + - Month # Set a timeout on metrics data. After the timeout has expired, the metrics data will automatically be deleted. enableDataKeeperExecutor: ${SW_CORE_ENABLE_DATA_KEEPER_EXECUTOR:true} # Turn it off then automatically metrics data delete will be close. recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:90} # Unit is minute @@ -62,6 +62,9 @@ core: hourMetricsDataTTL: ${SW_CORE_HOUR_METRIC_DATA_TTL:36} # Unit is hour dayMetricsDataTTL: ${SW_CORE_DAY_METRIC_DATA_TTL:45} # Unit is day monthMetricsDataTTL: ${SW_CORE_MONTH_METRIC_DATA_TTL:18} # Unit is month + # Cache metric data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute, + # the metrics may not be accurate within that minute. + enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true} storage: # elasticsearch: # nameSpace: ${SW_NAMESPACE:""} diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml index ec79843f4dcdbc32aded4332e9f4507675457b61..30d21342bd6fd30962a0c0c57789082501b0f0dc 100644 --- a/oap-server/server-starter/src/main/resources/application.yml +++ b/oap-server/server-starter/src/main/resources/application.yml @@ -62,6 +62,9 @@ core: hourMetricsDataTTL: ${SW_CORE_HOUR_METRIC_DATA_TTL:36} # Unit is hour dayMetricsDataTTL: ${SW_CORE_DAY_METRIC_DATA_TTL:45} # Unit is day monthMetricsDataTTL: ${SW_CORE_MONTH_METRIC_DATA_TTL:18} # Unit is month + # Cache metric data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute, + # the metrics may not be accurate within that minute. + enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true} storage: elasticsearch: nameSpace: ${SW_NAMESPACE:""} diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java index e0403c73589a19f68b4771701ae39451e5341fee..7061531863045bda351f9ff26c65ddc412140c49 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java @@ -39,18 +39,13 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO { this.storageBuilder = storageBuilder; } - @Override public Map get(Model model, Metrics[] metrics) throws IOException { - Map result = new HashMap<>(); + @Override public List multiGet(Model model, List ids) throws IOException { + SearchResponse response = getClient().ids(model.getName(), ids.toArray(new String[0])); - String[] ids = new String[metrics.length]; - for (int i = 0; i < metrics.length; i++) { - ids[i] = metrics[i].id(); - } - - SearchResponse response = getClient().ids(model.getName(), ids); + List result = new ArrayList<>((int)response.getHits().totalHits); for (int i = 0; i < response.getHits().totalHits; i++) { Metrics source = storageBuilder.map2Data(response.getHits().getAt(i).getSourceAsMap()); - result.put(source.id(), source); + result.add(source); } return result; } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java index 7f18b4c2f10fdb036808a77b2640ebb7f3a498df..e071c51dc73bcf73e9c8f589ace6cf364ab22c14 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java @@ -39,18 +39,11 @@ public class H2MetricsDAO extends H2SQLExecutor implements IMetricsDAO { this.storageBuilder = storageBuilder; } - @Override public Map get(Model model, Metrics[] metrics) throws IOException { - Map result = new HashMap<>(); - - String[] ids = new String[metrics.length]; - for (int i = 0; i < metrics.length; i++) { - ids[i] = metrics[i].id(); - } - - List storageDataList = getByIDs(h2Client, model.getName(), ids, storageBuilder); - + @Override public List multiGet(Model model, List ids) throws IOException { + List storageDataList = getByIDs(h2Client, model.getName(), ids.toArray(new String[0]), storageBuilder); + List result = new ArrayList<>(storageDataList.size()); for (StorageData storageData : storageDataList) { - result.put(storageData.id(), (Metrics)storageData); + result.add((Metrics)storageData); } return result; }