From 0145361ad97bccdc5306d079dfc8c7868d2ab0c6 Mon Sep 17 00:00:00 2001 From: peng-yongsheng <8082209@qq.com> Date: Sun, 7 Jan 2018 20:15:20 +0800 Subject: [PATCH] Instance metric pyramid aggregate. --- .../instance/InstanceMetricAlarmGraph.java | 2 +- .../define/graph/MetricWorkerIdDefine.java | 12 +++- .../AnalysisMetricModuleProvider.java | 2 +- .../InstanceDayMetricPersistenceWorker.java} | 18 ++--- .../InstanceDayMetricTransformNode.java | 44 ++++++++++++ .../InstanceHourMetricPersistenceWorker.java | 67 +++++++++++++++++++ .../InstanceHourMetricTransformNode.java | 44 ++++++++++++ .../InstanceMetricAggregationWorker.java | 12 ++-- .../{ => metric}/InstanceMetricGraph.java | 19 ++++-- .../InstanceMetricRemoteWorker.java | 4 +- ...InstanceMinuteMetricPersistenceWorker.java | 67 +++++++++++++++++++ .../InstanceMonthMetricPersistenceWorker.java | 67 +++++++++++++++++++ .../InstanceMonthMetricTransformNode.java | 44 ++++++++++++ .../apm/collector/storage/StorageModule.java | 4 +- .../IInstanceDayMetricPersistenceDAO.java} | 4 +- .../IInstanceHourMetricPersistenceDAO.java | 28 ++++++++ .../IInstanceMinuteMetricPersistenceDAO.java | 28 ++++++++ .../IInstanceMonthMetricPersistenceDAO.java | 28 ++++++++ .../storage/es/DataTTLKeeperTimer.java | 3 +- .../storage/es/StorageModuleEsProvider.java | 6 +- ...stractInstanceMetricEsPersistenceDAO.java} | 25 +++---- .../InstanceDayMetricEsPersistenceDAO.java | 43 ++++++++++++ .../InstanceHourMetricEsPersistenceDAO.java | 42 ++++++++++++ .../InstanceMinuteMetricEsPersistenceDAO.java | 42 ++++++++++++ .../InstanceMonthMetricEsPersistenceDAO.java | 42 ++++++++++++ .../storage/h2/StorageModuleH2Provider.java | 6 +- ...InstanceMinuteMetricH2PersistenceDAO.java} | 8 +-- 27 files changed, 658 insertions(+), 53 deletions(-) rename apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/{InstanceMetricPersistenceWorker.java => metric/InstanceDayMetricPersistenceWorker.java} (76%) create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceDayMetricTransformNode.java create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceHourMetricPersistenceWorker.java create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceHourMetricTransformNode.java rename apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/{ => metric}/InstanceMetricAggregationWorker.java (90%) rename apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/{ => metric}/InstanceMetricGraph.java (76%) rename apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/{ => metric}/InstanceMetricRemoteWorker.java (95%) create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceMinuteMetricPersistenceWorker.java create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceMonthMetricPersistenceWorker.java create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceMonthMetricTransformNode.java rename apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/{IInstanceMetricPersistenceDAO.java => imp/IInstanceDayMetricPersistenceDAO.java} (82%) create mode 100644 apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/imp/IInstanceHourMetricPersistenceDAO.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/imp/IInstanceMinuteMetricPersistenceDAO.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/imp/IInstanceMonthMetricPersistenceDAO.java rename apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/{InstanceMetricEsPersistenceDAO.java => imp/AbstractInstanceMetricEsPersistenceDAO.java} (86%) create mode 100644 apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceDayMetricEsPersistenceDAO.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceHourMetricEsPersistenceDAO.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceMinuteMetricEsPersistenceDAO.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceMonthMetricEsPersistenceDAO.java rename apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/{InstanceMetricH2PersistenceDAO.java => InstanceMinuteMetricH2PersistenceDAO.java} (96%) diff --git a/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/worker/instance/InstanceMetricAlarmGraph.java b/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/worker/instance/InstanceMetricAlarmGraph.java index 5a7bcac109..5954314bb9 100644 --- a/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/worker/instance/InstanceMetricAlarmGraph.java +++ b/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/worker/instance/InstanceMetricAlarmGraph.java @@ -64,7 +64,7 @@ public class InstanceMetricAlarmGraph { private void link(Graph graph) { GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.INSTANCE_METRIC_GRAPH_ID, InstanceMetric.class) - .toFinder().findNode(MetricWorkerIdDefine.INSTANCE_METRIC_PERSISTENCE_WORKER_ID, InstanceMetric.class) + .toFinder().findNode(MetricWorkerIdDefine.INSTANCE_MINUTE_METRIC_PERSISTENCE_WORKER_ID, InstanceMetric.class) .addNext(new NodeProcessor() { @Override public int id() { return AlarmWorkerIdDefine.INSTANCE_METRIC_ALARM_GRAPH_BRIDGE_WORKER_ID; diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricWorkerIdDefine.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricWorkerIdDefine.java index 91233e3860..26466ab695 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricWorkerIdDefine.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricWorkerIdDefine.java @@ -62,9 +62,15 @@ public class MetricWorkerIdDefine { public static final int SERVICE_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4407; public static final int SERVICE_MONTH_METRIC_TRANSFORM_NODE_ID = 4408; - public static final int INSTANCE_METRIC_AGGREGATION_WORKER_ID = 412; - public static final int INSTANCE_METRIC_REMOTE_WORKER_ID = 413; - public static final int INSTANCE_METRIC_PERSISTENCE_WORKER_ID = 414; + public static final int INSTANCE_MINUTE_METRIC_AGGREGATION_WORKER_ID = 4500; + public static final int INSTANCE_MINUTE_METRIC_REMOTE_WORKER_ID = 4501; + public static final int INSTANCE_MINUTE_METRIC_PERSISTENCE_WORKER_ID = 4502; + public static final int INSTANCE_HOUR_METRIC_PERSISTENCE_WORKER_ID = 4503; + public static final int INSTANCE_HOUR_METRIC_TRANSFORM_NODE_ID = 4504; + public static final int INSTANCE_DAY_METRIC_PERSISTENCE_WORKER_ID = 4505; + public static final int INSTANCE_DAY_METRIC_TRANSFORM_NODE_ID = 4506; + public static final int INSTANCE_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4507; + public static final int INSTANCE_MONTH_METRIC_TRANSFORM_NODE_ID = 4508; public static final int APPLICATION_METRIC_AGGREGATION_WORKER_ID = 415; public static final int APPLICATION_METRIC_REMOTE_WORKER_ID = 416; diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java index c262eeb610..55a70f1578 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java @@ -30,7 +30,7 @@ import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.globa import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.GlobalTraceSpanListener; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.InstanceMappingGraph; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.InstanceMappingSpanListener; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.InstanceMetricGraph; +import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.metric.InstanceMetricGraph; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.refmetric.InstanceReferenceMetricGraph; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segment.SegmentCostGraph; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segment.SegmentCostSpanListener; diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceMetricPersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceDayMetricPersistenceWorker.java similarity index 76% rename from apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceMetricPersistenceWorker.java rename to apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceDayMetricPersistenceWorker.java index 7aad31f3b3..e65280be1c 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceMetricPersistenceWorker.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceDayMetricPersistenceWorker.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance; +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.metric; import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker; @@ -24,20 +24,20 @@ import org.apache.skywalking.apm.collector.analysis.worker.model.impl.Persistenc import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.storage.StorageModule; import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceDayMetricPersistenceDAO; import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; /** * @author peng-yongsheng */ -public class InstanceMetricPersistenceWorker extends PersistenceWorker { +public class InstanceDayMetricPersistenceWorker extends PersistenceWorker { - public InstanceMetricPersistenceWorker(ModuleManager moduleManager) { + public InstanceDayMetricPersistenceWorker(ModuleManager moduleManager) { super(moduleManager); } @Override public int id() { - return MetricWorkerIdDefine.INSTANCE_METRIC_PERSISTENCE_WORKER_ID; + return MetricWorkerIdDefine.INSTANCE_DAY_METRIC_PERSISTENCE_WORKER_ID; } @Override protected boolean needMergeDBData() { @@ -46,17 +46,17 @@ public class InstanceMetricPersistenceWorker extends PersistenceWorker persistenceDAO() { - return getModuleManager().find(StorageModule.NAME).getService(IInstanceMetricPersistenceDAO.class); + return getModuleManager().find(StorageModule.NAME).getService(IInstanceDayMetricPersistenceDAO.class); } - public static class Factory extends PersistenceWorkerProvider { + public static class Factory extends PersistenceWorkerProvider { public Factory(ModuleManager moduleManager) { super(moduleManager); } - @Override public InstanceMetricPersistenceWorker workerInstance(ModuleManager moduleManager) { - return new InstanceMetricPersistenceWorker(moduleManager); + @Override public InstanceDayMetricPersistenceWorker workerInstance(ModuleManager moduleManager) { + return new InstanceDayMetricPersistenceWorker(moduleManager); } @Override diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceDayMetricTransformNode.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceDayMetricTransformNode.java new file mode 100644 index 0000000000..8f1ada4e3c --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceDayMetricTransformNode.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.metric; + +import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; +import org.apache.skywalking.apm.collector.core.graph.Next; +import org.apache.skywalking.apm.collector.core.graph.NodeProcessor; +import org.apache.skywalking.apm.collector.core.util.Const; +import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; + +/** + * @author peng-yongsheng + */ +public class InstanceDayMetricTransformNode implements NodeProcessor { + + @Override public int id() { + return MetricWorkerIdDefine.INSTANCE_DAY_METRIC_TRANSFORM_NODE_ID; + } + + @Override public void process(InstanceMetric instanceMetric, Next next) { + long timeBucket = TimeBucketUtils.INSTANCE.minuteToDay(instanceMetric.getTimeBucket()); + instanceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + instanceMetric.getMetricId()); + instanceMetric.setTimeBucket(timeBucket); + + next.execute(instanceMetric); + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceHourMetricPersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceHourMetricPersistenceWorker.java new file mode 100644 index 0000000000..0250bf130f --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceHourMetricPersistenceWorker.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.metric; + +import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; +import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker; +import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorkerProvider; +import org.apache.skywalking.apm.collector.core.module.ModuleManager; +import org.apache.skywalking.apm.collector.storage.StorageModule; +import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceHourMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; + +/** + * @author peng-yongsheng + */ +public class InstanceHourMetricPersistenceWorker extends PersistenceWorker { + + public InstanceHourMetricPersistenceWorker(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public int id() { + return MetricWorkerIdDefine.INSTANCE_HOUR_METRIC_PERSISTENCE_WORKER_ID; + } + + @Override protected boolean needMergeDBData() { + return true; + } + + @SuppressWarnings("unchecked") + @Override protected IPersistenceDAO persistenceDAO() { + return getModuleManager().find(StorageModule.NAME).getService(IInstanceHourMetricPersistenceDAO.class); + } + + public static class Factory extends PersistenceWorkerProvider { + + public Factory(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public InstanceHourMetricPersistenceWorker workerInstance(ModuleManager moduleManager) { + return new InstanceHourMetricPersistenceWorker(moduleManager); + } + + @Override + public int queueSize() { + return 1024; + } + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceHourMetricTransformNode.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceHourMetricTransformNode.java new file mode 100644 index 0000000000..0a4e6a430e --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceHourMetricTransformNode.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.metric; + +import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; +import org.apache.skywalking.apm.collector.core.graph.Next; +import org.apache.skywalking.apm.collector.core.graph.NodeProcessor; +import org.apache.skywalking.apm.collector.core.util.Const; +import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; + +/** + * @author peng-yongsheng + */ +public class InstanceHourMetricTransformNode implements NodeProcessor { + + @Override public int id() { + return MetricWorkerIdDefine.INSTANCE_HOUR_METRIC_TRANSFORM_NODE_ID; + } + + @Override public void process(InstanceMetric instanceMetric, Next next) { + long timeBucket = TimeBucketUtils.INSTANCE.minuteToHour(instanceMetric.getTimeBucket()); + instanceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + instanceMetric.getMetricId()); + instanceMetric.setTimeBucket(timeBucket); + + next.execute(instanceMetric); + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceMetricAggregationWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceMetricAggregationWorker.java similarity index 90% rename from apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceMetricAggregationWorker.java rename to apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceMetricAggregationWorker.java index 237a713ec8..d59e948416 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceMetricAggregationWorker.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceMetricAggregationWorker.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance; +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.metric; import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider; @@ -36,13 +36,17 @@ public class InstanceMetricAggregationWorker extends AggregationWorker graph = GraphManager.INSTANCE.createIfAbsent(MetricGraphIdDefine.INSTANCE_METRIC_GRAPH_ID, InstanceReferenceMetric.class); - graph.addNode(new InstanceMetricAggregationWorker.Factory(moduleManager).create(workerCreateListener)) - .addNext(new InstanceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, MetricGraphIdDefine.INSTANCE_METRIC_GRAPH_ID).create(workerCreateListener)) - .addNext(new InstanceMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener)); + Node remoteNode = graph.addNode(new InstanceMetricAggregationWorker.Factory(moduleManager).create(workerCreateListener)) + .addNext(new InstanceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, MetricGraphIdDefine.INSTANCE_METRIC_GRAPH_ID).create(workerCreateListener)); + remoteNode.addNext(new InstanceMinuteMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener)); + + remoteNode.addNext(new InstanceHourMetricTransformNode()) + .addNext(new InstanceHourMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener)); + + remoteNode.addNext(new InstanceDayMetricTransformNode()) + .addNext(new InstanceDayMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener)); + + remoteNode.addNext(new InstanceMonthMetricTransformNode()) + .addNext(new InstanceMonthMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener)); link(graph); } diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceMetricRemoteWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceMetricRemoteWorker.java similarity index 95% rename from apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceMetricRemoteWorker.java rename to apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceMetricRemoteWorker.java index c91a96f405..57073300c8 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceMetricRemoteWorker.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceMetricRemoteWorker.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance; +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.metric; import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorker; @@ -37,7 +37,7 @@ public class InstanceMetricRemoteWorker extends AbstractRemoteWorker { + + public InstanceMinuteMetricPersistenceWorker(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public int id() { + return MetricWorkerIdDefine.INSTANCE_MINUTE_METRIC_PERSISTENCE_WORKER_ID; + } + + @Override protected boolean needMergeDBData() { + return true; + } + + @SuppressWarnings("unchecked") + @Override protected IPersistenceDAO persistenceDAO() { + return getModuleManager().find(StorageModule.NAME).getService(IInstanceMinuteMetricPersistenceDAO.class); + } + + public static class Factory extends PersistenceWorkerProvider { + + public Factory(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public InstanceMinuteMetricPersistenceWorker workerInstance(ModuleManager moduleManager) { + return new InstanceMinuteMetricPersistenceWorker(moduleManager); + } + + @Override + public int queueSize() { + return 1024; + } + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceMonthMetricPersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceMonthMetricPersistenceWorker.java new file mode 100644 index 0000000000..d11f827761 --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceMonthMetricPersistenceWorker.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.metric; + +import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; +import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker; +import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorkerProvider; +import org.apache.skywalking.apm.collector.core.module.ModuleManager; +import org.apache.skywalking.apm.collector.storage.StorageModule; +import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMonthMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; + +/** + * @author peng-yongsheng + */ +public class InstanceMonthMetricPersistenceWorker extends PersistenceWorker { + + public InstanceMonthMetricPersistenceWorker(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public int id() { + return MetricWorkerIdDefine.INSTANCE_MONTH_METRIC_PERSISTENCE_WORKER_ID; + } + + @Override protected boolean needMergeDBData() { + return true; + } + + @SuppressWarnings("unchecked") + @Override protected IPersistenceDAO persistenceDAO() { + return getModuleManager().find(StorageModule.NAME).getService(IInstanceMonthMetricPersistenceDAO.class); + } + + public static class Factory extends PersistenceWorkerProvider { + + public Factory(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public InstanceMonthMetricPersistenceWorker workerInstance(ModuleManager moduleManager) { + return new InstanceMonthMetricPersistenceWorker(moduleManager); + } + + @Override + public int queueSize() { + return 1024; + } + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceMonthMetricTransformNode.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceMonthMetricTransformNode.java new file mode 100644 index 0000000000..2ff7259224 --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/metric/InstanceMonthMetricTransformNode.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.metric; + +import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; +import org.apache.skywalking.apm.collector.core.graph.Next; +import org.apache.skywalking.apm.collector.core.graph.NodeProcessor; +import org.apache.skywalking.apm.collector.core.util.Const; +import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; + +/** + * @author peng-yongsheng + */ +public class InstanceMonthMetricTransformNode implements NodeProcessor { + + @Override public int id() { + return MetricWorkerIdDefine.INSTANCE_MONTH_METRIC_TRANSFORM_NODE_ID; + } + + @Override public void process(InstanceMetric instanceMetric, Next next) { + long timeBucket = TimeBucketUtils.INSTANCE.minuteToMonth(instanceMetric.getTimeBucket()); + instanceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + instanceMetric.getMetricId()); + instanceMetric.setTimeBucket(timeBucket); + + next.execute(instanceMetric); + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageModule.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageModule.java index 1718e87cbe..ac67b33203 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageModule.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageModule.java @@ -46,7 +46,7 @@ import org.apache.skywalking.apm.collector.storage.dao.IInstanceAlarmPersistence import org.apache.skywalking.apm.collector.storage.dao.IInstanceCacheDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceMappingPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetricPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricUIDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceAlarmListPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceAlarmPersistenceDAO; @@ -130,7 +130,7 @@ public class StorageModule extends Module { classes.add(IServiceMinuteMetricPersistenceDAO.class); classes.add(IServiceReferenceMinuteMetricPersistenceDAO.class); - classes.add(IInstanceMetricPersistenceDAO.class); + classes.add(IInstanceMinuteMetricPersistenceDAO.class); classes.add(IInstanceReferenceMinuteMetricPersistenceDAO.class); classes.add(IInstanceMappingPersistenceDAO.class); classes.add(IInstanceHeartBeatPersistenceDAO.class); diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/IInstanceMetricPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/imp/IInstanceDayMetricPersistenceDAO.java similarity index 82% rename from apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/IInstanceMetricPersistenceDAO.java rename to apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/imp/IInstanceDayMetricPersistenceDAO.java index dc6fa9fb24..490ee6096a 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/IInstanceMetricPersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/imp/IInstanceDayMetricPersistenceDAO.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.collector.storage.dao; +package org.apache.skywalking.apm.collector.storage.dao.imp; import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; @@ -24,5 +24,5 @@ import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric /** * @author peng-yongsheng */ -public interface IInstanceMetricPersistenceDAO extends IPersistenceDAO { +public interface IInstanceDayMetricPersistenceDAO extends IPersistenceDAO { } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/imp/IInstanceHourMetricPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/imp/IInstanceHourMetricPersistenceDAO.java new file mode 100644 index 0000000000..1baa5885f9 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/imp/IInstanceHourMetricPersistenceDAO.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.dao.imp; + +import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; + +/** + * @author peng-yongsheng + */ +public interface IInstanceHourMetricPersistenceDAO extends IPersistenceDAO { +} diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/imp/IInstanceMinuteMetricPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/imp/IInstanceMinuteMetricPersistenceDAO.java new file mode 100644 index 0000000000..b8404f4d16 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/imp/IInstanceMinuteMetricPersistenceDAO.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.dao.imp; + +import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; + +/** + * @author peng-yongsheng + */ +public interface IInstanceMinuteMetricPersistenceDAO extends IPersistenceDAO { +} diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/imp/IInstanceMonthMetricPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/imp/IInstanceMonthMetricPersistenceDAO.java new file mode 100644 index 0000000000..cee6008732 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/imp/IInstanceMonthMetricPersistenceDAO.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.dao.imp; + +import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; + +/** + * @author peng-yongsheng + */ +public interface IInstanceMonthMetricPersistenceDAO extends IPersistenceDAO { +} diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimer.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimer.java index 04266733ad..8887c3482f 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimer.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimer.java @@ -27,6 +27,7 @@ import java.util.Calendar; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.skywalking.apm.collector.storage.dao.armp.IApplicationReferenceMinuteMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetricPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO; /** @@ -88,7 +89,7 @@ public class DataTTLKeeperTimer { IGlobalTracePersistenceDAO globalTracePersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IGlobalTracePersistenceDAO.class); globalTracePersistenceDAO.deleteHistory(startTimestamp, endTimestamp); - IInstanceMetricPersistenceDAO instanceMetricPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IInstanceMetricPersistenceDAO.class); + IInstanceMinuteMetricPersistenceDAO instanceMetricPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IInstanceMinuteMetricPersistenceDAO.class); instanceMetricPersistenceDAO.deleteHistory(startTimestamp, endTimestamp); IApplicationComponentPersistenceDAO applicationComponentPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IApplicationComponentPersistenceDAO.class); diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java index 693f9808b8..11d233082b 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java @@ -55,7 +55,7 @@ import org.apache.skywalking.apm.collector.storage.dao.IInstanceAlarmPersistence import org.apache.skywalking.apm.collector.storage.dao.IInstanceCacheDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceMappingPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetricPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricUIDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceAlarmListPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceAlarmPersistenceDAO; @@ -111,7 +111,7 @@ import org.apache.skywalking.apm.collector.storage.es.dao.InstanceEsRegisterDAO; import org.apache.skywalking.apm.collector.storage.es.dao.InstanceEsUIDAO; import org.apache.skywalking.apm.collector.storage.es.dao.InstanceHeartBeatEsPersistenceDAO; import org.apache.skywalking.apm.collector.storage.es.dao.InstanceMappingEsPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.es.dao.InstanceMetricEsPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.es.dao.imp.InstanceMinuteMetricEsPersistenceDAO; import org.apache.skywalking.apm.collector.storage.es.dao.InstanceMetricEsUIDAO; import org.apache.skywalking.apm.collector.storage.es.dao.InstanceReferenceAlarmEsPersistenceDAO; import org.apache.skywalking.apm.collector.storage.es.dao.InstanceReferenceAlarmListEsPersistenceDAO; @@ -243,7 +243,7 @@ public class StorageModuleEsProvider extends ModuleProvider { this.registerServiceImplementation(IServiceMinuteMetricPersistenceDAO.class, new ServiceMinuteMetricEsPersistenceDAO(elasticSearchClient)); this.registerServiceImplementation(IServiceReferenceMinuteMetricPersistenceDAO.class, new ServiceReferenceMinuteMetricEsPersistenceDAO(elasticSearchClient)); - this.registerServiceImplementation(IInstanceMetricPersistenceDAO.class, new InstanceMetricEsPersistenceDAO(elasticSearchClient)); + this.registerServiceImplementation(IInstanceMinuteMetricPersistenceDAO.class, new InstanceMinuteMetricEsPersistenceDAO(elasticSearchClient)); this.registerServiceImplementation(IInstanceReferenceMinuteMetricPersistenceDAO.class, new InstanceReferenceMinuteMetricEsPersistenceDAO(elasticSearchClient)); this.registerServiceImplementation(IInstanceMappingPersistenceDAO.class, new InstanceMappingEsPersistenceDAO(elasticSearchClient)); this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatEsPersistenceDAO(elasticSearchClient)); diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/InstanceMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/AbstractInstanceMetricEsPersistenceDAO.java similarity index 86% rename from apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/InstanceMetricEsPersistenceDAO.java rename to apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/AbstractInstanceMetricEsPersistenceDAO.java index bc22aed4e1..626545de6d 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/InstanceMetricEsPersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/AbstractInstanceMetricEsPersistenceDAO.java @@ -16,37 +16,31 @@ * */ -package org.apache.skywalking.apm.collector.storage.es.dao; +package org.apache.skywalking.apm.collector.storage.es.dao.imp; import java.util.HashMap; import java.util.Map; import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; -import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO; import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO; import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.update.UpdateRequestBuilder; /** * @author peng-yongsheng */ -public class InstanceMetricEsPersistenceDAO extends AbstractPersistenceEsDAO implements IInstanceMetricPersistenceDAO { +public abstract class AbstractInstanceMetricEsPersistenceDAO extends AbstractPersistenceEsDAO { - public InstanceMetricEsPersistenceDAO(ElasticSearchClient client) { + AbstractInstanceMetricEsPersistenceDAO(ElasticSearchClient client) { super(client); } - @Override protected String tableName() { - return InstanceMetricTable.TABLE; - } - - @Override protected String timeBucketColumnNameForDelete() { + @Override protected final String timeBucketColumnNameForDelete() { return InstanceMetricTable.COLUMN_TIME_BUCKET; } - @Override protected InstanceMetric esDataToStreamData(Map source) { + @Override protected final InstanceMetric esDataToStreamData(Map source) { InstanceMetric instanceMetric = new InstanceMetric(); + instanceMetric.setId((String)source.get(InstanceMetricTable.COLUMN_ID)); instanceMetric.setMetricId((String)source.get(InstanceMetricTable.COLUMN_METRIC_ID)); instanceMetric.setApplicationId((Integer)source.get(InstanceMetricTable.COLUMN_APPLICATION_ID)); @@ -69,11 +63,14 @@ public class InstanceMetricEsPersistenceDAO extends AbstractPersistenceEsDAO esStreamDataToEsData(InstanceMetric streamData) { + @Override protected final Map esStreamDataToEsData(InstanceMetric streamData) { Map source = new HashMap<>(); + source.put(InstanceMetricTable.COLUMN_ID, streamData.getId()); + source.put(InstanceMetricTable.COLUMN_METRIC_ID, streamData.getMetricId()); + source.put(InstanceMetricTable.COLUMN_METRIC_ID, streamData.getMetricId()); source.put(InstanceMetricTable.COLUMN_APPLICATION_ID, streamData.getApplicationId()); diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceDayMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceDayMetricEsPersistenceDAO.java new file mode 100644 index 0000000000..3141571c3d --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceDayMetricEsPersistenceDAO.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.es.dao.imp; + +import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; +import org.apache.skywalking.apm.collector.core.storage.TimePyramid; +import org.apache.skywalking.apm.collector.core.util.Const; +import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceDayMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.update.UpdateRequestBuilder; + +/** + * @author peng-yongsheng + */ +public class InstanceDayMetricEsPersistenceDAO extends AbstractInstanceMetricEsPersistenceDAO implements IInstanceDayMetricPersistenceDAO { + + public InstanceDayMetricEsPersistenceDAO(ElasticSearchClient client) { + super(client); + } + + @Override protected String tableName() { + return InstanceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Day.getName(); + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceHourMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceHourMetricEsPersistenceDAO.java new file mode 100644 index 0000000000..22146814c7 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceHourMetricEsPersistenceDAO.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.es.dao.imp; + +import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; +import org.apache.skywalking.apm.collector.core.storage.TimePyramid; +import org.apache.skywalking.apm.collector.core.util.Const; +import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceHourMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.update.UpdateRequestBuilder; + +/** + * @author peng-yongsheng + */ +public class InstanceHourMetricEsPersistenceDAO extends AbstractInstanceMetricEsPersistenceDAO implements IInstanceHourMetricPersistenceDAO { + + public InstanceHourMetricEsPersistenceDAO(ElasticSearchClient client) { + super(client); + } + + @Override protected String tableName() { + return InstanceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Hour.getName(); + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceMinuteMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceMinuteMetricEsPersistenceDAO.java new file mode 100644 index 0000000000..6f3027678f --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceMinuteMetricEsPersistenceDAO.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.es.dao.imp; + +import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; +import org.apache.skywalking.apm.collector.core.storage.TimePyramid; +import org.apache.skywalking.apm.collector.core.util.Const; +import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.update.UpdateRequestBuilder; + +/** + * @author peng-yongsheng + */ +public class InstanceMinuteMetricEsPersistenceDAO extends AbstractInstanceMetricEsPersistenceDAO implements IInstanceMinuteMetricPersistenceDAO { + + public InstanceMinuteMetricEsPersistenceDAO(ElasticSearchClient client) { + super(client); + } + + @Override protected String tableName() { + return InstanceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Minute.getName(); + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceMonthMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceMonthMetricEsPersistenceDAO.java new file mode 100644 index 0000000000..d34e7ea03e --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/InstanceMonthMetricEsPersistenceDAO.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.es.dao.imp; + +import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; +import org.apache.skywalking.apm.collector.core.storage.TimePyramid; +import org.apache.skywalking.apm.collector.core.util.Const; +import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMonthMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.update.UpdateRequestBuilder; + +/** + * @author peng-yongsheng + */ +public class InstanceMonthMetricEsPersistenceDAO extends AbstractInstanceMetricEsPersistenceDAO implements IInstanceMonthMetricPersistenceDAO { + + public InstanceMonthMetricEsPersistenceDAO(ElasticSearchClient client) { + super(client); + } + + @Override protected String tableName() { + return InstanceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Month.getName(); + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java index d05590c13a..1eb18824eb 100644 --- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java @@ -51,7 +51,7 @@ import org.apache.skywalking.apm.collector.storage.dao.IInstanceAlarmPersistence import org.apache.skywalking.apm.collector.storage.dao.IInstanceCacheDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceMappingPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetricPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricUIDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceAlarmListPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceAlarmPersistenceDAO; @@ -107,7 +107,7 @@ import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceH2RegisterDAO; import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceH2UIDAO; import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceHeartBeatH2PersistenceDAO; import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceMappingH2PersistenceDAO; -import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceMetricH2PersistenceDAO; +import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceMinuteMetricH2PersistenceDAO; import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceMetricH2UIDAO; import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceReferenceAlarmH2PersistenceDAO; import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceReferenceAlarmListH2PersistenceDAO; @@ -221,7 +221,7 @@ public class StorageModuleH2Provider extends ModuleProvider { this.registerServiceImplementation(IServiceMinuteMetricPersistenceDAO.class, new ServiceMinuteMetricH2PersistenceDAO(h2Client)); this.registerServiceImplementation(IServiceReferenceMinuteMetricPersistenceDAO.class, new ServiceReferenceMetricH2PersistenceDAO(h2Client)); - this.registerServiceImplementation(IInstanceMetricPersistenceDAO.class, new InstanceMetricH2PersistenceDAO(h2Client)); + this.registerServiceImplementation(IInstanceMinuteMetricPersistenceDAO.class, new InstanceMinuteMetricH2PersistenceDAO(h2Client)); this.registerServiceImplementation(IInstanceReferenceMinuteMetricPersistenceDAO.class, new InstanceReferenceMetricH2PersistenceDAO(h2Client)); this.registerServiceImplementation(IInstanceMappingPersistenceDAO.class, new InstanceMappingH2PersistenceDAO(h2Client)); this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatH2PersistenceDAO(h2Client)); diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/InstanceMetricH2PersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/InstanceMinuteMetricH2PersistenceDAO.java similarity index 96% rename from apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/InstanceMetricH2PersistenceDAO.java rename to apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/InstanceMinuteMetricH2PersistenceDAO.java index 6b89183f6f..d35f09aaa8 100644 --- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/InstanceMetricH2PersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/InstanceMinuteMetricH2PersistenceDAO.java @@ -26,7 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder; -import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetricPersistenceDAO; import org.apache.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity; import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; import org.apache.skywalking.apm.collector.client.h2.H2Client; @@ -39,12 +39,12 @@ import org.slf4j.LoggerFactory; /** * @author peng-yongsheng, clevertension */ -public class InstanceMetricH2PersistenceDAO extends H2DAO implements IInstanceMetricPersistenceDAO { +public class InstanceMinuteMetricH2PersistenceDAO extends H2DAO implements IInstanceMinuteMetricPersistenceDAO { - private final Logger logger = LoggerFactory.getLogger(InstanceMetricH2PersistenceDAO.class); + private final Logger logger = LoggerFactory.getLogger(InstanceMinuteMetricH2PersistenceDAO.class); private static final String GET_SQL = "select * from {0} where {1} = ?"; - public InstanceMetricH2PersistenceDAO(H2Client client) { + public InstanceMinuteMetricH2PersistenceDAO(H2Client client) { super(client); } -- GitLab