From 4eb5def126fcd0c0d0f04a47201341a25668c1a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= Date: Fri, 17 May 2019 00:33:29 +0800 Subject: [PATCH] Manage models in one place. (#2695) --- .../oap/server/core/CoreModuleProvider.java | 2 +- .../analysis/StreamAnnotationListener.java | 7 +- .../worker/MetricsStreamProcessor.java | 28 ++++++- .../worker/RecordStreamProcessor.java | 9 ++- .../analysis/worker/TopNStreamProcessor.java | 9 ++- .../worker/InventoryStreamProcessor.java | 8 +- .../core/storage/model/IModelSetter.java | 6 +- .../oap/server/core/storage/model/Model.java | 42 ++++++++--- .../core/storage/model/ModelInstaller.java | 27 ++----- .../{annotation => model}/StorageModels.java | 24 ++++-- .../core/storage/ttl/DataTTLKeeperTimer.java | 75 ++++--------------- .../core/storage/ttl/DayTTLCalculator.java | 31 ++++++++ .../core/storage/ttl/HourTTLCalculator.java | 31 ++++++++ .../core/storage/ttl/MinuteTTLCalculator.java | 31 ++++++++ .../core/storage/ttl/MonthTTLCalculator.java | 31 ++++++++ .../core/storage/ttl/SecondTTLCalculator.java | 31 ++++++++ .../core/storage/ttl/TTLCalculator.java | 29 +++++++ .../jdbc/mysql/MySQLTableInstaller.java | 4 +- 18 files changed, 306 insertions(+), 119 deletions(-) rename oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/{annotation => model}/StorageModels.java (78%) create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index 79879b9fc7..9b443d6300 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -33,7 +33,7 @@ import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHan import org.apache.skywalking.oap.server.core.server.*; import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.core.storage.PersistenceTimer; -import org.apache.skywalking.oap.server.core.storage.annotation.StorageModels; +import org.apache.skywalking.oap.server.core.storage.model.StorageModels; import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer; import org.apache.skywalking.oap.server.core.worker.*; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java index 4a703fccdb..df522612a2 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java @@ -19,11 +19,10 @@ package org.apache.skywalking.oap.server.core.analysis; import java.lang.annotation.Annotation; -import org.apache.skywalking.oap.server.core.*; +import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.analysis.worker.*; import org.apache.skywalking.oap.server.core.annotation.AnnotationListener; import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor; -import org.apache.skywalking.oap.server.core.storage.model.IModelSetter; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** @@ -48,16 +47,12 @@ public class StreamAnnotationListener implements AnnotationListener { if (stream.processor().equals(InventoryStreamProcessor.class)) { InventoryStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass); - moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, false, stream.name(), stream.scopeId(), stream.storage()); } else if (stream.processor().equals(RecordStreamProcessor.class)) { RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass); - moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, false, stream.name(), stream.scopeId(), stream.storage()); } else if (stream.processor().equals(MetricsStreamProcessor.class)) { MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass); - moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, true, stream.name(), stream.scopeId(), stream.storage()); } else if (stream.processor().equals(TopNStreamProcessor.class)) { TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass); - moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, false, stream.name(), stream.scopeId(), stream.storage()); } else { throw new UnexpectedException("Unknown stream processor."); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java index df224d3a1a..b88c08108d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java @@ -23,7 +23,9 @@ import lombok.Getter; import org.apache.skywalking.oap.server.core.*; import org.apache.skywalking.oap.server.core.analysis.*; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService; import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** @@ -60,10 +62,28 @@ public class MetricsStreamProcessor implements StreamProcessor { throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " metrics DAO failure.", e); } - MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, stream.name()); - MetricsPersistentWorker hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, stream.name() + Const.ID_SPLIT + Downsampling.Hour.getName()); - MetricsPersistentWorker dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, stream.name() + Const.ID_SPLIT + Downsampling.Day.getName()); - MetricsPersistentWorker monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, stream.name() + Const.ID_SPLIT + Downsampling.Month.getName()); + IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); + DownsamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class); + + MetricsPersistentWorker hourPersistentWorker = null; + MetricsPersistentWorker dayPersistentWorker = null; + MetricsPersistentWorker monthPersistentWorker = null; + + if (configService.shouldToHour()) { + Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Hour); + hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, model.getName()); + } + if (configService.shouldToDay()) { + Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Day); + dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, model.getName()); + } + if (configService.shouldToMonth()) { + Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Month); + monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model.getName()); + } + + Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Minute); + MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model.getName()); MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker); MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, transWorker, stream.name()); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java index 3dfb2fb8ff..ff2a636bba 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java @@ -20,10 +20,11 @@ package org.apache.skywalking.oap.server.core.analysis.worker; import java.util.*; import lombok.Getter; -import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.*; import org.apache.skywalking.oap.server.core.analysis.*; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** @@ -48,6 +49,7 @@ public class RecordStreamProcessor implements StreamProcessor { @Getter private List persistentWorkers = new ArrayList<>(); + @SuppressWarnings("unchecked") public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class recordClass) { if (DisableRegister.INSTANCE.include(stream.name())) { return; @@ -61,7 +63,10 @@ public class RecordStreamProcessor implements StreamProcessor { throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " record DAO failure.", e); } - RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, stream.name(), 1000, recordDAO); + IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); + Model model = modelSetter.putIfAbsent(recordClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Second); + RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model.getName(), 1000, recordDAO); + persistentWorkers.add(persistentWorker); workers.put(recordClass, persistentWorker); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java index 670ef931f9..4ba101d070 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java @@ -20,11 +20,12 @@ package org.apache.skywalking.oap.server.core.analysis.worker; import java.util.*; import lombok.Getter; -import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.*; import org.apache.skywalking.oap.server.core.analysis.*; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.analysis.topn.TopN; import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** @@ -44,6 +45,7 @@ public class TopNStreamProcessor implements StreamProcessor { return PROCESSOR; } + @SuppressWarnings("unchecked") public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class topNClass) { if (DisableRegister.INSTANCE.include(stream.name())) { return; @@ -57,7 +59,10 @@ public class TopNStreamProcessor implements StreamProcessor { throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " top n record DAO failure.", e); } - TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, stream.name(), 50, recordDAO); + IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); + Model model = modelSetter.putIfAbsent(topNClass, stream.name(), stream.scopeId(), stream.storage()); + + TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model.getName(), 50, recordDAO); persistentWorkers.add(persistentWorker); workers.put(topNClass, persistentWorker); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java index 1ce4c7f400..3111902deb 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java @@ -19,10 +19,11 @@ package org.apache.skywalking.oap.server.core.register.worker; import java.util.*; -import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.*; import org.apache.skywalking.oap.server.core.analysis.*; import org.apache.skywalking.oap.server.core.register.RegisterSource; import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** @@ -42,6 +43,7 @@ public class InventoryStreamProcessor implements StreamProcessor entryWorkers.get(registerSource.getClass()).in(registerSource); } + @SuppressWarnings("unchecked") public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class inventoryClass) { StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class); IRegisterDAO registerDAO; @@ -51,7 +53,9 @@ public class InventoryStreamProcessor implements StreamProcessor throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " register DAO failure.", e); } - RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, stream.name(), registerDAO, stream.scopeId()); + IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); + Model model = modelSetter.putIfAbsent(inventoryClass, stream.name(), stream.scopeId(), stream.storage()); + RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId()); RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, persistentWorker); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java index 9639f11a00..f81d1b4bc1 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java @@ -18,6 +18,7 @@ package org.apache.skywalking.oap.server.core.storage.model; +import org.apache.skywalking.oap.server.core.storage.Downsampling; import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import org.apache.skywalking.oap.server.library.module.Service; @@ -25,5 +26,8 @@ import org.apache.skywalking.oap.server.library.module.Service; * @author peng-yongsheng */ public interface IModelSetter extends Service { - void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage); + + Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage); + + Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage, Downsampling downsampling); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java index 43ce38dbd0..a48c48ca70 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java @@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.core.storage.model; import java.util.List; import lombok.Getter; +import org.apache.skywalking.oap.server.core.*; +import org.apache.skywalking.oap.server.core.storage.Downsampling; +import org.apache.skywalking.oap.server.core.storage.ttl.*; /** * @author peng-yongsheng @@ -27,21 +30,40 @@ import lombok.Getter; @Getter public class Model { private final String name; - private final boolean isMetrics; private final boolean deleteHistory; private final List columns; - private final int sourceScopeId; + private final int scopeId; + private final TTLCalculator ttlCalculator; - public Model(String name, List columns, boolean isMetrics, boolean deleteHistory, - int sourceScopeId) { - this.name = name; + public Model(String name, List columns, boolean deleteHistory, + int scopeId, Downsampling downsampling) { this.columns = columns; - this.isMetrics = isMetrics; this.deleteHistory = deleteHistory; - this.sourceScopeId = sourceScopeId; - } + this.scopeId = scopeId; - public Model copy(String name) { - return new Model(name, columns, isMetrics, deleteHistory, sourceScopeId); + switch (downsampling) { + case Minute: + this.name = name; + this.ttlCalculator = new MinuteTTLCalculator(); + break; + case Hour: + this.name = name + Const.ID_SPLIT + Downsampling.Hour.getName(); + this.ttlCalculator = new HourTTLCalculator(); + break; + case Day: + this.name = name + Const.ID_SPLIT + Downsampling.Day.getName(); + this.ttlCalculator = new DayTTLCalculator(); + break; + case Month: + this.name = name + Const.ID_SPLIT + Downsampling.Month.getName(); + this.ttlCalculator = new MonthTTLCalculator(); + break; + case Second: + this.name = name; + this.ttlCalculator = new SecondTTLCalculator(); + break; + default: + throw new UnexpectedException("Unexpected downsampling setting."); + } } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java index e50e66ef2a..8efdf5b6e8 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java @@ -18,10 +18,9 @@ package org.apache.skywalking.oap.server.core.storage.model; -import java.util.*; +import java.util.List; import org.apache.skywalking.oap.server.core.*; -import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService; -import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.slf4j.*; @@ -41,39 +40,23 @@ public abstract class ModelInstaller { public final void install(Client client) throws StorageException { IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class); - DownsamplingConfigService downsamplingConfigService = moduleManager.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class); List models = modelGetter.getModels(); - List allModels = new ArrayList<>(); - models.forEach(model -> { - if (model.isMetrics()) { - if (downsamplingConfigService.shouldToHour()) { - allModels.add(model.copy(model.getName() + Const.ID_SPLIT + Downsampling.Hour.getName())); - } - if (downsamplingConfigService.shouldToDay()) { - allModels.add(model.copy(model.getName() + Const.ID_SPLIT + Downsampling.Day.getName())); - } - if (downsamplingConfigService.shouldToMonth()) { - allModels.add(model.copy(model.getName() + Const.ID_SPLIT + Downsampling.Month.getName())); - } - } - }); - allModels.addAll(models); - boolean debug = System.getProperty("debug") != null; if (RunningMode.isNoInitMode()) { - for (Model model : allModels) { + for (Model model : models) { while (!isExists(client, model)) { try { logger.info("table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.", model.getName()); Thread.sleep(3000L); } catch (InterruptedException e) { + logger.error(e.getMessage()); } } } } else { - for (Model model : allModels) { + for (Model model : models) { if (!isExists(client, model)) { logger.info("table: {} does not exist", model.getName()); createTable(client, model); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java similarity index 78% rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageModels.java rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java index 7f3cfe45b3..89f8057c2c 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageModels.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java @@ -13,16 +13,16 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ -package org.apache.skywalking.oap.server.core.storage.annotation; +package org.apache.skywalking.oap.server.core.storage.model; import java.lang.reflect.Field; import java.util.*; import lombok.Getter; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; -import org.apache.skywalking.oap.server.core.storage.model.*; +import org.apache.skywalking.oap.server.core.storage.Downsampling; +import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.slf4j.*; /** @@ -38,13 +38,27 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride this.models = new LinkedList<>(); } - @Override public void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage) { + @Override public Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage) { + return putIfAbsent(aClass, modelName, scopeId, storage, Downsampling.Minute); + } + + @Override public Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage, Downsampling downsampling) { // Check this scope id is valid. DefaultScopeDefine.nameOf(scopeId); + + for (Model model : models) { + if (model.getName().equals(modelName)) { + return model; + } + } + List modelColumns = new LinkedList<>(); retrieval(aClass, modelName, modelColumns); - models.add(new Model(modelName, modelColumns, isMetrics, storage.deleteHistory(), scopeId)); + Model model = new Model(modelName, modelColumns, storage.deleteHistory(), scopeId, downsampling); + models.add(model); + + return model; } private void retrieval(Class clazz, String modelName, List modelColumns) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java index ca0eca67e8..794a9cc1c4 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java @@ -20,29 +20,18 @@ package org.apache.skywalking.oap.server.core.storage.ttl; import java.io.IOException; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import lombok.Setter; import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; -import org.apache.skywalking.oap.server.core.Const; -import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.DataTTL; +import org.apache.skywalking.oap.server.core.*; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; -import org.apache.skywalking.oap.server.core.analysis.record.Record; -import org.apache.skywalking.oap.server.core.cluster.ClusterModule; -import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery; -import org.apache.skywalking.oap.server.core.cluster.RemoteInstance; -import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService; -import org.apache.skywalking.oap.server.core.storage.Downsampling; -import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO; -import org.apache.skywalking.oap.server.core.storage.StorageModule; -import org.apache.skywalking.oap.server.core.storage.model.IModelGetter; -import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.core.cluster.*; +import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.slf4j.*; /** * @author peng-yongsheng @@ -72,63 +61,25 @@ public enum DataTTLKeeperTimer { return; } - TimeBuckets timeBuckets = convertTimeBucket(new DateTime()); logger.info("Beginning to remove expired metrics from the storage."); - logger.info("Metrics in minute dimension before {}, are going to be removed.", timeBuckets.minuteTimeBucketBefore); - logger.info("Metrics in hour dimension before {}, are going to be removed.", timeBuckets.hourTimeBucketBefore); - logger.info("Metrics in day dimension before {}, are going to be removed.", timeBuckets.dayTimeBucketBefore); - logger.info("Metrics in month dimension before {}, are going to be removed.", timeBuckets.monthTimeBucketBefore); + + DateTime currentTime = new DateTime(); IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class); - DownsamplingConfigService downsamplingConfigService = moduleManager.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class); List models = modelGetter.getModels(); models.forEach(model -> { - if (model.isMetrics()) { - execute(model, model.getName(), timeBuckets.minuteTimeBucketBefore, Metrics.TIME_BUCKET); - - if (downsamplingConfigService.shouldToHour()) { - execute(model, model.getName() + Const.ID_SPLIT + Downsampling.Hour.getName(), timeBuckets.hourTimeBucketBefore, Metrics.TIME_BUCKET); - } - if (downsamplingConfigService.shouldToDay()) { - execute(model, model.getName() + Const.ID_SPLIT + Downsampling.Day.getName(), timeBuckets.dayTimeBucketBefore, Metrics.TIME_BUCKET); - } - if (downsamplingConfigService.shouldToMonth()) { - execute(model, model.getName() + Const.ID_SPLIT + Downsampling.Month.getName(), timeBuckets.monthTimeBucketBefore, Metrics.TIME_BUCKET); - } - } else { - execute(model, model.getName(), timeBuckets.recordDataTTL, Record.TIME_BUCKET); + if (model.isDeleteHistory()) { + execute(model, model.getTtlCalculator().timeBefore(currentTime, dataTTL)); } }); } - TimeBuckets convertTimeBucket(DateTime currentTime) { - TimeBuckets timeBuckets = new TimeBuckets(); - - timeBuckets.recordDataTTL = Long.valueOf(currentTime.plusMinutes(0 - dataTTL.getRecordDataTTL()).toString("yyyyMMddHHmmss")); - timeBuckets.minuteTimeBucketBefore = Long.valueOf(currentTime.plusMinutes(0 - dataTTL.getMinuteMetricsDataTTL()).toString("yyyyMMddHHmm")); - timeBuckets.hourTimeBucketBefore = Long.valueOf(currentTime.plusHours(0 - dataTTL.getHourMetricsDataTTL()).toString("yyyyMMddHH")); - timeBuckets.dayTimeBucketBefore = Long.valueOf(currentTime.plusDays(0 - dataTTL.getDayMetricsDataTTL()).toString("yyyyMMdd")); - timeBuckets.monthTimeBucketBefore = Long.valueOf(currentTime.plusMonths(0 - dataTTL.getMonthMetricsDataTTL()).toString("yyyyMM")); - - return timeBuckets; - } - - private void execute(Model model, String modelName, long timeBucketBefore, String timeBucketColumnName) { + private void execute(Model model, long timeBucketBefore) { try { - if (model.isDeleteHistory()) { - moduleManager.find(StorageModule.NAME).provider().getService(IHistoryDeleteDAO.class).deleteHistory(modelName, timeBucketColumnName, timeBucketBefore); - } + moduleManager.find(StorageModule.NAME).provider().getService(IHistoryDeleteDAO.class).deleteHistory(model.getName(), Metrics.TIME_BUCKET, timeBucketBefore); } catch (IOException e) { - logger.warn("History of {} delete failure, time bucket {}", modelName, timeBucketBefore); + logger.warn("History of {} delete failure, time bucket {}", model.getName(), timeBucketBefore); logger.error(e.getMessage(), e); } } - - class TimeBuckets { - private long recordDataTTL; - private long minuteTimeBucketBefore; - private long hourTimeBucketBefore; - private long dayTimeBucketBefore; - private long monthTimeBucketBefore; - } } \ No newline at end of file diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java new file mode 100644 index 0000000000..d14ad7715c --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.server.core.storage.ttl; + +import org.apache.skywalking.oap.server.core.DataTTL; +import org.joda.time.DateTime; + +/** + * @author peng-yongsheng + */ +public class DayTTLCalculator implements TTLCalculator { + + @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) { + return Long.valueOf(currentTime.plusDays(0 - dataTTL.getDayMetricsDataTTL()).toString("yyyyMMdd")); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java new file mode 100644 index 0000000000..d041c42197 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.server.core.storage.ttl; + +import org.apache.skywalking.oap.server.core.DataTTL; +import org.joda.time.DateTime; + +/** + * @author peng-yongsheng + */ +public class HourTTLCalculator implements TTLCalculator { + + @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) { + return Long.valueOf(currentTime.plusHours(0 - dataTTL.getHourMetricsDataTTL()).toString("yyyyMMddHH")); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java new file mode 100644 index 0000000000..3d69f72d94 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.server.core.storage.ttl; + +import org.apache.skywalking.oap.server.core.DataTTL; +import org.joda.time.DateTime; + +/** + * @author peng-yongsheng + */ +public class MinuteTTLCalculator implements TTLCalculator { + + @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) { + return Long.valueOf(currentTime.plusMinutes(0 - dataTTL.getMinuteMetricsDataTTL()).toString("yyyyMMddHHmm")); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java new file mode 100644 index 0000000000..a834aa8cc1 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.server.core.storage.ttl; + +import org.apache.skywalking.oap.server.core.DataTTL; +import org.joda.time.DateTime; + +/** + * @author peng-yongsheng + */ +public class MonthTTLCalculator implements TTLCalculator { + + @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) { + return Long.valueOf(currentTime.plusMonths(0 - dataTTL.getMonthMetricsDataTTL()).toString("yyyyMM")); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java new file mode 100644 index 0000000000..d53cf89c61 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.server.core.storage.ttl; + +import org.apache.skywalking.oap.server.core.DataTTL; +import org.joda.time.DateTime; + +/** + * @author peng-yongsheng + */ +public class SecondTTLCalculator implements TTLCalculator { + + @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) { + return Long.valueOf(currentTime.plusMinutes(0 - dataTTL.getRecordDataTTL()).toString("yyyyMMddHHmmss")); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java new file mode 100644 index 0000000000..fb80adde2b --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.server.core.storage.ttl; + +import org.apache.skywalking.oap.server.core.DataTTL; +import org.joda.time.DateTime; + +/** + * @author peng-yongsheng + */ +public interface TTLCalculator { + + long timeBefore(DateTime currentTime, DataTTL dataTTL); +} diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java index 58483284ca..307b3da0ef 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java @@ -79,7 +79,7 @@ public class MySQLTableInstaller extends H2TableInstaller { } else if (Double.class.equals(type) || double.class.equals(type)) { return "DOUBLE"; } else if (String.class.equals(type)) { - if (DefaultScopeDefine.SEGMENT == model.getSourceScopeId()) { + if (DefaultScopeDefine.SEGMENT == model.getScopeId()) { if (name.getName().equals(SegmentRecord.TRACE_ID) || name.getName().equals(SegmentRecord.SEGMENT_ID)) return "VARCHAR(300)"; } @@ -94,7 +94,7 @@ public class MySQLTableInstaller extends H2TableInstaller { } protected void createIndexes(JDBCHikariCPClient client, Model model) throws StorageException { - switch (model.getSourceScopeId()) { + switch (model.getScopeId()) { case SERVICE_INVENTORY: case SERVICE_INSTANCE_INVENTORY: case NETWORK_ADDRESS: -- GitLab