未验证 提交 8c31172c 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Refactor session cache in MetricsPersistentWorker. (#10084)

上级 532e42b2
......@@ -6,6 +6,7 @@
* Add `ServerStatusService` in the core module to provide a new way to expose booting status to other modules.
* Adds Micrometer as a new component.(ID=141)
* Refactor session cache in MetricsPersistentWorker.
#### UI
......
......@@ -20,12 +20,9 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.UnexpectedException;
......@@ -37,9 +34,7 @@ import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
......@@ -61,25 +56,13 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
private static long SESSION_TIMEOUT_OFFSITE_COUNTER = 0;
private final Model model;
/**
* The session cache holds the latest metrics in-memory.
* There are two ways to make sure metrics in-cache,
* 1. Metrics is read from the Database through {@link #loadFromStorage(List)}
* 2. The built {@link InsertRequest} executed successfully.
*
* There are two cases to remove metrics from the cache.
* 1. The metrics expired.
* 2. The built {@link UpdateRequest} executed failure, which could be caused
* (1) Database error. (2) No data updated, such as the counter of update statement is 0 in JDBC.
*/
private final Map<Metrics, Metrics> sessionCache;
private final MetricsSessionCache sessionCache;
private final IMetricsDAO metricsDAO;
private final Optional<AbstractWorker<Metrics>> nextAlarmWorker;
private final Optional<AbstractWorker<ExportEvent>> nextExportWorker;
private final DataCarrier<Metrics> dataCarrier;
private final Optional<MetricsTransWorker> transWorker;
private final boolean supportUpdate;
private long sessionTimeout;
/**
* The counter of L2 aggregation.
*/
......@@ -113,17 +96,12 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData()));
this.model = model;
// Due to the cache would be updated depending on final storage implementation,
// the map/cache could be updated concurrently.
// Set to ConcurrentHashMap in order to avoid HashMap deadlock.
// Since 9.3.0
this.sessionCache = new ConcurrentHashMap<>(100);
this.sessionCache = new MetricsSessionCache(storageSessionTimeout);
this.metricsDAO = metricsDAO;
this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
this.nextExportWorker = Optional.ofNullable(nextExportWorker);
this.transWorker = Optional.ofNullable(transWorker);
this.supportUpdate = supportUpdate;
this.sessionTimeout = storageSessionTimeout;
this.persistentCounter = 0;
this.persistentMod = 1;
this.metricsDataTTL = metricsDataTTL;
......@@ -186,7 +164,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
// For a down-sampling metrics, we prolong the session timeout for 4 times, nearly 5 minutes.
// And add offset according to worker creation sequence, to avoid context clear overlap,
// eventually optimize load of IDs reading.
this.sessionTimeout = this.sessionTimeout * 4 + SESSION_TIMEOUT_OFFSITE_COUNTER * 200;
sessionCache.setTimeoutThreshold(storageSessionTimeout * 4 + SESSION_TIMEOUT_OFFSITE_COUNTER * 200);
// The down sampling level worker executes every 4 periods.
this.persistentMod = 4;
}
......@@ -351,7 +329,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
return;
}
metricsDAO.multiGet(model, notInCacheMetrics).forEach(m -> sessionCache.put(m, m));
metricsDAO.multiGet(model, notInCacheMetrics).forEach(m -> sessionCache.put(m));
} catch (final Exception e) {
log.error("Failed to load metrics for merging", e);
}
......@@ -359,15 +337,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
@Override
public void endOfRound() {
Iterator<Metrics> iterator = sessionCache.values().iterator();
long timestamp = System.currentTimeMillis();
while (iterator.hasNext()) {
Metrics metrics = iterator.next();
if (metrics.isExpired(timestamp, sessionTimeout)) {
iterator.remove();
}
}
sessionCache.removeExpired();
}
/**
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.AccessLevel;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
/**
* MetricsSessionCache is a key-value cache to hold hot metric in-memory to reduce payload to pre-read.
*
* There are two ways to make sure metrics in-cache,
* 1. Metrics is read from the Database through {@link MetricsPersistentWorker}.loadFromStorage
* 2. The built {@link InsertRequest} executed successfully.
*
* There are two cases to remove metrics from the cache.
* 1. The metrics expired.
* 2. The built {@link UpdateRequest} executed failure, which could be caused
* (1) Database error. (2) No data updated, such as the counter of update statement is 0 in JDBC.
*
* @since 9.4.0 Created this from MetricsPersistentWorker.sessionCache.
*/
public class MetricsSessionCache {
private final Map<Metrics, Metrics> sessionCache;
@Setter(AccessLevel.PACKAGE)
private long timeoutThreshold;
public MetricsSessionCache(long timeoutThreshold) {
// Due to the cache would be updated depending on final storage implementation,
// the map/cache could be updated concurrently.
// Set to ConcurrentHashMap in order to avoid HashMap deadlock.
// Since 9.3.0
this.sessionCache = new ConcurrentHashMap<>(100);
this.timeoutThreshold = timeoutThreshold;
}
Metrics get(Metrics metrics) {
return sessionCache.get(metrics);
}
public Metrics remove(Metrics metrics) {
return sessionCache.remove(metrics);
}
public void put(Metrics metrics) {
sessionCache.put(metrics, metrics);
}
void removeExpired() {
Iterator<Metrics> iterator = sessionCache.values().iterator();
long timestamp = System.currentTimeMillis();
while (iterator.hasNext()) {
Metrics metrics = iterator.next();
if (metrics.isExpired(timestamp, timeoutThreshold)) {
iterator.remove();
}
}
}
}
......@@ -18,16 +18,16 @@
package org.apache.skywalking.oap.server.core.storage;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsSessionCache;
/**
* SessionCacheCallback provides a bridge for storage implementations
*/
@RequiredArgsConstructor
public class SessionCacheCallback {
private final Map<Metrics, Metrics> sessionCache;
private final MetricsSessionCache sessionCache;
private final Metrics metrics;
/**
* In some cases, this callback could be shared by multiple executions, such as SQLExecutor#additionalSQLs.
......@@ -40,7 +40,7 @@ public class SessionCacheCallback {
if (isFailed) {
return;
}
sessionCache.put(metrics, metrics);
sessionCache.put(metrics);
}
public void onUpdateFailure() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册