From 801ea93651bca2fdcc19f4232aae7795fa3e0cd9 Mon Sep 17 00:00:00 2001 From: peng-yongsheng <8082209@qq.com> Date: Sun, 7 Jan 2018 18:32:43 +0800 Subject: [PATCH] Instance reference metric pyramid aggregate. --- .../InstanceReferenceMetricAlarmGraph.java | 2 +- .../define/graph/MetricWorkerIdDefine.java | 12 +- .../AnalysisMetricModuleProvider.java | 2 +- .../ApplicationReferenceMetricGraph.java | 2 +- .../worker/instance/InstanceMetricGraph.java | 2 +- ...eReferenceDayMetricPersistenceWorker.java} | 18 +-- ...stanceReferenceDayMetricTransformNode.java | 44 +++++ ...eReferenceHourMetricPersistenceWorker.java | 67 ++++++++ ...tanceReferenceHourMetricTransformNode.java | 44 +++++ ...tanceReferenceMetricAggregationWorker.java | 14 +- .../InstanceReferenceMetricGraph.java | 20 ++- .../InstanceReferenceMetricRemoteWorker.java | 4 +- ...eferenceMinuteMetricPersistenceWorker.java | 67 ++++++++ ...ReferenceMonthMetricPersistenceWorker.java | 67 ++++++++ ...anceReferenceMonthMetricTransformNode.java | 44 +++++ ...erviceReferenceDayMetricTransformNode.java | 2 + ...rviceReferenceHourMetricTransformNode.java | 2 + ...viceReferenceMonthMetricTransformNode.java | 2 + .../apm/collector/storage/StorageModule.java | 4 +- .../storage/base/dao/IPersistenceDAO.java | 6 +- ...anceReferenceDayMetricPersistenceDAO.java} | 4 +- ...anceReferenceHourMetricPersistenceDAO.java | 28 ++++ ...ceReferenceMinuteMetricPersistenceDAO.java | 28 ++++ ...nceReferenceMonthMetricPersistenceDAO.java | 28 ++++ ...rviceReferenceDayMetricPersistenceDAO.java | 2 +- ...viceReferenceHourMetricPersistenceDAO.java | 2 +- ...ceReferenceMinuteMetricPersistenceDAO.java | 2 +- ...iceReferenceMonthMetricPersistenceDAO.java | 2 +- .../instance/InstanceReferenceMetric.java | 25 ++- .../storage/es/StorageModuleEsProvider.java | 6 +- ...stanceReferenceMetricEsPersistenceDAO.java | 150 ------------------ ...stanceReferenceMetricEsPersistenceDAO.java | 101 ++++++++++++ ...nceReferenceDayMetricEsPersistenceDAO.java | 46 ++++++ ...ceReferenceHourMetricEsPersistenceDAO.java | 46 ++++++ ...ReferenceMinuteMetricEsPersistenceDAO.java | 42 +++++ ...eReferenceMonthMetricEsPersistenceDAO.java | 46 ++++++ .../storage/h2/StorageModuleH2Provider.java | 4 +- ...stanceReferenceMetricH2PersistenceDAO.java | 4 +- 38 files changed, 792 insertions(+), 199 deletions(-) rename apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/{InstanceReferencePersistenceWorker.java => refmetric/InstanceReferenceDayMetricPersistenceWorker.java} (73%) create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceDayMetricTransformNode.java create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceHourMetricPersistenceWorker.java create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceHourMetricTransformNode.java rename apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/{ => refmetric}/InstanceReferenceMetricAggregationWorker.java (93%) rename apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/{ => refmetric}/InstanceReferenceMetricGraph.java (74%) rename apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/{ => refmetric}/InstanceReferenceMetricRemoteWorker.java (95%) create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMinuteMetricPersistenceWorker.java create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMonthMetricPersistenceWorker.java create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMonthMetricTransformNode.java rename apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/{IInstanceReferenceMetricPersistenceDAO.java => irmp/IInstanceReferenceDayMetricPersistenceDAO.java} (81%) create mode 100644 apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/irmp/IInstanceReferenceHourMetricPersistenceDAO.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/irmp/IInstanceReferenceMinuteMetricPersistenceDAO.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/irmp/IInstanceReferenceMonthMetricPersistenceDAO.java delete mode 100644 apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/InstanceReferenceMetricEsPersistenceDAO.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/AbstractInstanceReferenceMetricEsPersistenceDAO.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceDayMetricEsPersistenceDAO.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceHourMetricEsPersistenceDAO.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceMinuteMetricEsPersistenceDAO.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceMonthMetricEsPersistenceDAO.java diff --git a/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/worker/instance/InstanceReferenceMetricAlarmGraph.java b/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/worker/instance/InstanceReferenceMetricAlarmGraph.java index 5ace500a37..7ef12e1a68 100644 --- a/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/worker/instance/InstanceReferenceMetricAlarmGraph.java +++ b/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/worker/instance/InstanceReferenceMetricAlarmGraph.java @@ -64,7 +64,7 @@ public class InstanceReferenceMetricAlarmGraph { private void link(Graph graph) { GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.INSTANCE_REFERENCE_METRIC_GRAPH_ID, InstanceReferenceMetric.class) - .toFinder().findNode(MetricWorkerIdDefine.INSTANCE_REFERENCE_METRIC_PERSISTENCE_WORKER_ID, InstanceReferenceMetric.class) + .toFinder().findNode(MetricWorkerIdDefine.INSTANCE_REFERENCE_MINUTE_METRIC_PERSISTENCE_WORKER_ID, InstanceReferenceMetric.class) .addNext(new NodeProcessor() { @Override public int id() { return AlarmWorkerIdDefine.INSTANCE_REFERENCE_METRIC_ALARM_GRAPH_BRIDGE_WORKER_ID; diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricWorkerIdDefine.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricWorkerIdDefine.java index 2af213fddd..b63eaa1534 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricWorkerIdDefine.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricWorkerIdDefine.java @@ -32,9 +32,15 @@ public class MetricWorkerIdDefine { public static final int SERVICE_REFERENCE_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4107; public static final int SERVICE_REFERENCE_MONTH_METRIC_TRANSFORM_NODE_ID = 4108; - public static final int INSTANCE_REFERENCE_METRIC_AGGREGATION_WORKER_ID = 403; - public static final int INSTANCE_REFERENCE_METRIC_REMOTE_WORKER_ID = 404; - public static final int INSTANCE_REFERENCE_METRIC_PERSISTENCE_WORKER_ID = 405; + public static final int INSTANCE_REFERENCE_MINUTE_METRIC_AGGREGATION_WORKER_ID = 4200; + public static final int INSTANCE_REFERENCE_MINUTE_METRIC_REMOTE_WORKER_ID = 4201; + public static final int INSTANCE_REFERENCE_MINUTE_METRIC_PERSISTENCE_WORKER_ID = 4202; + public static final int INSTANCE_REFERENCE_HOUR_METRIC_PERSISTENCE_WORKER_ID = 4203; + public static final int INSTANCE_REFERENCE_HOUR_METRIC_TRANSFORM_NODE_ID = 4204; + public static final int INSTANCE_REFERENCE_DAY_METRIC_PERSISTENCE_WORKER_ID = 4205; + public static final int INSTANCE_REFERENCE_DAY_METRIC_TRANSFORM_NODE_ID = 4206; + public static final int INSTANCE_REFERENCE_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4207; + public static final int INSTANCE_REFERENCE_MONTH_METRIC_TRANSFORM_NODE_ID = 4208; public static final int APPLICATION_REFERENCE_METRIC_AGGREGATION_WORKER_ID = 406; public static final int APPLICATION_REFERENCE_METRIC_REMOTE_WORKER_ID = 407; diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java index 632aa5763e..fd457336e3 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java @@ -31,7 +31,7 @@ import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.globa import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.InstanceMappingGraph; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.InstanceMappingSpanListener; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.InstanceMetricGraph; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.InstanceReferenceMetricGraph; +import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.refmetric.InstanceReferenceMetricGraph; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segment.SegmentCostGraph; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segment.SegmentCostSpanListener; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.ServiceEntryGraph; diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/ApplicationReferenceMetricGraph.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/ApplicationReferenceMetricGraph.java index 50c7a5a36a..92df5164d3 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/ApplicationReferenceMetricGraph.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/ApplicationReferenceMetricGraph.java @@ -57,7 +57,7 @@ public class ApplicationReferenceMetricGraph { private void link(Graph graph) { GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.INSTANCE_REFERENCE_METRIC_GRAPH_ID, InstanceReferenceMetric.class) - .toFinder().findNode(MetricWorkerIdDefine.INSTANCE_REFERENCE_METRIC_AGGREGATION_WORKER_ID, InstanceReferenceMetric.class) + .toFinder().findNode(MetricWorkerIdDefine.INSTANCE_REFERENCE_MINUTE_METRIC_AGGREGATION_WORKER_ID, InstanceReferenceMetric.class) .addNext(new NodeProcessor() { @Override public int id() { return MetricWorkerIdDefine.APPLICATION_REFERENCE_GRAPH_BRIDGE_WORKER_ID; diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceMetricGraph.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceMetricGraph.java index 5ca0d9042d..b37c7aaed6 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceMetricGraph.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceMetricGraph.java @@ -57,7 +57,7 @@ public class InstanceMetricGraph { private void link(Graph graph) { GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.INSTANCE_REFERENCE_METRIC_GRAPH_ID, InstanceReferenceMetric.class) - .toFinder().findNode(MetricWorkerIdDefine.INSTANCE_REFERENCE_METRIC_AGGREGATION_WORKER_ID, InstanceReferenceMetric.class) + .toFinder().findNode(MetricWorkerIdDefine.INSTANCE_REFERENCE_MINUTE_METRIC_AGGREGATION_WORKER_ID, InstanceReferenceMetric.class) .addNext(new NodeProcessor() { @Override public int id() { diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceReferencePersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceDayMetricPersistenceWorker.java similarity index 73% rename from apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceReferencePersistenceWorker.java rename to apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceDayMetricPersistenceWorker.java index 4d0eba8b42..2cbd7c101b 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceReferencePersistenceWorker.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceDayMetricPersistenceWorker.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance; +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.refmetric; import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker; @@ -24,39 +24,39 @@ import org.apache.skywalking.apm.collector.analysis.worker.model.impl.Persistenc import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.storage.StorageModule; import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceDayMetricPersistenceDAO; import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; /** * @author peng-yongsheng */ -public class InstanceReferencePersistenceWorker extends PersistenceWorker { +public class InstanceReferenceDayMetricPersistenceWorker extends PersistenceWorker { - public InstanceReferencePersistenceWorker(ModuleManager moduleManager) { + public InstanceReferenceDayMetricPersistenceWorker(ModuleManager moduleManager) { super(moduleManager); } @Override public int id() { - return MetricWorkerIdDefine.INSTANCE_REFERENCE_METRIC_PERSISTENCE_WORKER_ID; + return MetricWorkerIdDefine.INSTANCE_REFERENCE_DAY_METRIC_PERSISTENCE_WORKER_ID; } @SuppressWarnings("unchecked") @Override protected IPersistenceDAO persistenceDAO() { - return getModuleManager().find(StorageModule.NAME).getService(IInstanceReferenceMetricPersistenceDAO.class); + return getModuleManager().find(StorageModule.NAME).getService(IInstanceReferenceDayMetricPersistenceDAO.class); } @Override protected boolean needMergeDBData() { return true; } - public static class Factory extends PersistenceWorkerProvider { + public static class Factory extends PersistenceWorkerProvider { public Factory(ModuleManager moduleManager) { super(moduleManager); } - @Override public InstanceReferencePersistenceWorker workerInstance(ModuleManager moduleManager) { - return new InstanceReferencePersistenceWorker(moduleManager); + @Override public InstanceReferenceDayMetricPersistenceWorker workerInstance(ModuleManager moduleManager) { + return new InstanceReferenceDayMetricPersistenceWorker(moduleManager); } @Override diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceDayMetricTransformNode.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceDayMetricTransformNode.java new file mode 100644 index 0000000000..c949d675d3 --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceDayMetricTransformNode.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.refmetric; + +import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; +import org.apache.skywalking.apm.collector.core.graph.Next; +import org.apache.skywalking.apm.collector.core.graph.NodeProcessor; +import org.apache.skywalking.apm.collector.core.util.Const; +import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; + +/** + * @author peng-yongsheng + */ +public class InstanceReferenceDayMetricTransformNode implements NodeProcessor { + + @Override public int id() { + return MetricWorkerIdDefine.INSTANCE_REFERENCE_DAY_METRIC_TRANSFORM_NODE_ID; + } + + @Override public void process(InstanceReferenceMetric instanceReferenceMetric, Next next) { + long timeBucket = TimeBucketUtils.INSTANCE.minuteToDay(instanceReferenceMetric.getTimeBucket()); + instanceReferenceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + instanceReferenceMetric.getMetricId()); + instanceReferenceMetric.setTimeBucket(timeBucket); + + next.execute(instanceReferenceMetric); + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceHourMetricPersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceHourMetricPersistenceWorker.java new file mode 100644 index 0000000000..f5b26726a5 --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceHourMetricPersistenceWorker.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.refmetric; + +import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; +import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker; +import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorkerProvider; +import org.apache.skywalking.apm.collector.core.module.ModuleManager; +import org.apache.skywalking.apm.collector.storage.StorageModule; +import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceHourMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; + +/** + * @author peng-yongsheng + */ +public class InstanceReferenceHourMetricPersistenceWorker extends PersistenceWorker { + + public InstanceReferenceHourMetricPersistenceWorker(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public int id() { + return MetricWorkerIdDefine.INSTANCE_REFERENCE_HOUR_METRIC_PERSISTENCE_WORKER_ID; + } + + @SuppressWarnings("unchecked") + @Override protected IPersistenceDAO persistenceDAO() { + return getModuleManager().find(StorageModule.NAME).getService(IInstanceReferenceHourMetricPersistenceDAO.class); + } + + @Override protected boolean needMergeDBData() { + return true; + } + + public static class Factory extends PersistenceWorkerProvider { + + public Factory(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public InstanceReferenceHourMetricPersistenceWorker workerInstance(ModuleManager moduleManager) { + return new InstanceReferenceHourMetricPersistenceWorker(moduleManager); + } + + @Override + public int queueSize() { + return 1024; + } + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceHourMetricTransformNode.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceHourMetricTransformNode.java new file mode 100644 index 0000000000..c76789afed --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceHourMetricTransformNode.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.refmetric; + +import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; +import org.apache.skywalking.apm.collector.core.graph.Next; +import org.apache.skywalking.apm.collector.core.graph.NodeProcessor; +import org.apache.skywalking.apm.collector.core.util.Const; +import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; + +/** + * @author peng-yongsheng + */ +public class InstanceReferenceHourMetricTransformNode implements NodeProcessor { + + @Override public int id() { + return MetricWorkerIdDefine.INSTANCE_REFERENCE_HOUR_METRIC_TRANSFORM_NODE_ID; + } + + @Override public void process(InstanceReferenceMetric instanceReferenceMetric, Next next) { + long timeBucket = TimeBucketUtils.INSTANCE.minuteToHour(instanceReferenceMetric.getTimeBucket()); + instanceReferenceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + instanceReferenceMetric.getMetricId()); + instanceReferenceMetric.setTimeBucket(timeBucket); + + next.execute(instanceReferenceMetric); + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceReferenceMetricAggregationWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMetricAggregationWorker.java similarity index 93% rename from apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceReferenceMetricAggregationWorker.java rename to apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMetricAggregationWorker.java index 138e904345..37b3555faa 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceReferenceMetricAggregationWorker.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMetricAggregationWorker.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance; +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.refmetric; import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider; @@ -36,16 +36,20 @@ public class InstanceReferenceMetricAggregationWorker extends AggregationWorker< } @Override public int id() { - return MetricWorkerIdDefine.INSTANCE_REFERENCE_METRIC_AGGREGATION_WORKER_ID; + return MetricWorkerIdDefine.INSTANCE_REFERENCE_MINUTE_METRIC_AGGREGATION_WORKER_ID; } @Override protected InstanceReferenceMetric transform(ServiceReferenceMetric serviceReferenceMetric) { - String id = serviceReferenceMetric.getTimeBucket() - + Const.ID_SPLIT + serviceReferenceMetric.getFrontInstanceId() + String metricId = serviceReferenceMetric.getFrontInstanceId() + Const.ID_SPLIT + serviceReferenceMetric.getBehindInstanceId() + Const.ID_SPLIT + serviceReferenceMetric.getSourceValue(); - InstanceReferenceMetric instanceReferenceMetric = new InstanceReferenceMetric(id); + String id = serviceReferenceMetric.getTimeBucket() + + Const.ID_SPLIT + metricId; + + InstanceReferenceMetric instanceReferenceMetric = new InstanceReferenceMetric(); + instanceReferenceMetric.setId(id); + instanceReferenceMetric.setMetricId(metricId); instanceReferenceMetric.setFrontApplicationId(serviceReferenceMetric.getFrontApplicationId()); instanceReferenceMetric.setFrontInstanceId(serviceReferenceMetric.getFrontInstanceId()); instanceReferenceMetric.setBehindApplicationId(serviceReferenceMetric.getBehindApplicationId()); diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceReferenceMetricGraph.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMetricGraph.java similarity index 74% rename from apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceReferenceMetricGraph.java rename to apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMetricGraph.java index ba22c83dc4..18ff59c473 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceReferenceMetricGraph.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMetricGraph.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance; +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.refmetric; import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine; import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; @@ -24,10 +24,12 @@ import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCrea import org.apache.skywalking.apm.collector.core.graph.Graph; import org.apache.skywalking.apm.collector.core.graph.GraphManager; import org.apache.skywalking.apm.collector.core.graph.Next; +import org.apache.skywalking.apm.collector.core.graph.Node; import org.apache.skywalking.apm.collector.core.graph.NodeProcessor; import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.remote.RemoteModule; import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric; /** @@ -48,9 +50,19 @@ public class InstanceReferenceMetricGraph { Graph graph = GraphManager.INSTANCE.createIfAbsent(MetricGraphIdDefine.INSTANCE_REFERENCE_METRIC_GRAPH_ID, ServiceReferenceMetric.class); - graph.addNode(new InstanceReferenceMetricAggregationWorker.Factory(moduleManager).create(workerCreateListener)) - .addNext(new InstanceReferenceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, MetricGraphIdDefine.INSTANCE_REFERENCE_METRIC_GRAPH_ID).create(workerCreateListener)) - .addNext(new InstanceReferencePersistenceWorker.Factory(moduleManager).create(workerCreateListener)); + Node remoteNode = graph.addNode(new InstanceReferenceMetricAggregationWorker.Factory(moduleManager).create(workerCreateListener)) + .addNext(new InstanceReferenceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, MetricGraphIdDefine.INSTANCE_REFERENCE_METRIC_GRAPH_ID).create(workerCreateListener)); + + remoteNode.addNext(new InstanceReferenceMinuteMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener)); + + remoteNode.addNext(new InstanceReferenceHourMetricTransformNode()) + .addNext(new InstanceReferenceHourMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener)); + + remoteNode.addNext(new InstanceReferenceDayMetricTransformNode()) + .addNext(new InstanceReferenceDayMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener)); + + remoteNode.addNext(new InstanceReferenceMonthMetricTransformNode()) + .addNext(new InstanceReferenceMonthMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener)); link(graph); } diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceReferenceMetricRemoteWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMetricRemoteWorker.java similarity index 95% rename from apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceReferenceMetricRemoteWorker.java rename to apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMetricRemoteWorker.java index dce688579b..f740d16dd1 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/InstanceReferenceMetricRemoteWorker.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMetricRemoteWorker.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance; +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.refmetric; import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorker; @@ -37,7 +37,7 @@ public class InstanceReferenceMetricRemoteWorker extends AbstractRemoteWorker { + + public InstanceReferenceMinuteMetricPersistenceWorker(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public int id() { + return MetricWorkerIdDefine.INSTANCE_REFERENCE_MINUTE_METRIC_PERSISTENCE_WORKER_ID; + } + + @SuppressWarnings("unchecked") + @Override protected IPersistenceDAO persistenceDAO() { + return getModuleManager().find(StorageModule.NAME).getService(IInstanceReferenceMinuteMetricPersistenceDAO.class); + } + + @Override protected boolean needMergeDBData() { + return true; + } + + public static class Factory extends PersistenceWorkerProvider { + + public Factory(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public InstanceReferenceMinuteMetricPersistenceWorker workerInstance(ModuleManager moduleManager) { + return new InstanceReferenceMinuteMetricPersistenceWorker(moduleManager); + } + + @Override + public int queueSize() { + return 1024; + } + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMonthMetricPersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMonthMetricPersistenceWorker.java new file mode 100644 index 0000000000..03972af004 --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMonthMetricPersistenceWorker.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.refmetric; + +import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; +import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker; +import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorkerProvider; +import org.apache.skywalking.apm.collector.core.module.ModuleManager; +import org.apache.skywalking.apm.collector.storage.StorageModule; +import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceMonthMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; + +/** + * @author peng-yongsheng + */ +public class InstanceReferenceMonthMetricPersistenceWorker extends PersistenceWorker { + + public InstanceReferenceMonthMetricPersistenceWorker(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public int id() { + return MetricWorkerIdDefine.INSTANCE_REFERENCE_MONTH_METRIC_PERSISTENCE_WORKER_ID; + } + + @SuppressWarnings("unchecked") + @Override protected IPersistenceDAO persistenceDAO() { + return getModuleManager().find(StorageModule.NAME).getService(IInstanceReferenceMonthMetricPersistenceDAO.class); + } + + @Override protected boolean needMergeDBData() { + return true; + } + + public static class Factory extends PersistenceWorkerProvider { + + public Factory(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public InstanceReferenceMonthMetricPersistenceWorker workerInstance(ModuleManager moduleManager) { + return new InstanceReferenceMonthMetricPersistenceWorker(moduleManager); + } + + @Override + public int queueSize() { + return 1024; + } + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMonthMetricTransformNode.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMonthMetricTransformNode.java new file mode 100644 index 0000000000..176af34414 --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/refmetric/InstanceReferenceMonthMetricTransformNode.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.refmetric; + +import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; +import org.apache.skywalking.apm.collector.core.graph.Next; +import org.apache.skywalking.apm.collector.core.graph.NodeProcessor; +import org.apache.skywalking.apm.collector.core.util.Const; +import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; + +/** + * @author peng-yongsheng + */ +public class InstanceReferenceMonthMetricTransformNode implements NodeProcessor { + + @Override public int id() { + return MetricWorkerIdDefine.INSTANCE_REFERENCE_MONTH_METRIC_TRANSFORM_NODE_ID; + } + + @Override public void process(InstanceReferenceMetric instanceReferenceMetric, Next next) { + long timeBucket = TimeBucketUtils.INSTANCE.minuteToMonth(instanceReferenceMetric.getTimeBucket()); + instanceReferenceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + instanceReferenceMetric.getMetricId()); + instanceReferenceMetric.setTimeBucket(timeBucket); + + next.execute(instanceReferenceMetric); + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/refmetric/ServiceReferenceDayMetricTransformNode.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/refmetric/ServiceReferenceDayMetricTransformNode.java index 105d4ef331..6cf44bbabf 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/refmetric/ServiceReferenceDayMetricTransformNode.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/refmetric/ServiceReferenceDayMetricTransformNode.java @@ -38,5 +38,7 @@ public class ServiceReferenceDayMetricTransformNode implements NodeProcessor extends DAO { +public interface IPersistenceDAO extends DAO { STREAM_DATA get(String id); - Insert prepareBatchInsert(STREAM_DATA data); + INSERT prepareBatchInsert(STREAM_DATA data); - Update prepareBatchUpdate(STREAM_DATA data); + UPDATE prepareBatchUpdate(STREAM_DATA data); void deleteHistory(Long startTimestamp, Long endTimestamp); } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/IInstanceReferenceMetricPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/irmp/IInstanceReferenceDayMetricPersistenceDAO.java similarity index 81% rename from apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/IInstanceReferenceMetricPersistenceDAO.java rename to apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/irmp/IInstanceReferenceDayMetricPersistenceDAO.java index 339ae78eca..1d2cead681 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/IInstanceReferenceMetricPersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/irmp/IInstanceReferenceDayMetricPersistenceDAO.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.collector.storage.dao; +package org.apache.skywalking.apm.collector.storage.dao.irmp; import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; @@ -24,5 +24,5 @@ import org.apache.skywalking.apm.collector.storage.table.instance.InstanceRefere /** * @author peng-yongsheng */ -public interface IInstanceReferenceMetricPersistenceDAO extends IPersistenceDAO { +public interface IInstanceReferenceDayMetricPersistenceDAO extends IPersistenceDAO { } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/irmp/IInstanceReferenceHourMetricPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/irmp/IInstanceReferenceHourMetricPersistenceDAO.java new file mode 100644 index 0000000000..bd6f8838ea --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/irmp/IInstanceReferenceHourMetricPersistenceDAO.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.dao.irmp; + +import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; + +/** + * @author peng-yongsheng + */ +public interface IInstanceReferenceHourMetricPersistenceDAO extends IPersistenceDAO { +} diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/irmp/IInstanceReferenceMinuteMetricPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/irmp/IInstanceReferenceMinuteMetricPersistenceDAO.java new file mode 100644 index 0000000000..b7507cddc8 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/irmp/IInstanceReferenceMinuteMetricPersistenceDAO.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.dao.irmp; + +import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; + +/** + * @author peng-yongsheng + */ +public interface IInstanceReferenceMinuteMetricPersistenceDAO extends IPersistenceDAO { +} diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/irmp/IInstanceReferenceMonthMetricPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/irmp/IInstanceReferenceMonthMetricPersistenceDAO.java new file mode 100644 index 0000000000..cae66c8456 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/irmp/IInstanceReferenceMonthMetricPersistenceDAO.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.dao.irmp; + +import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; + +/** + * @author peng-yongsheng + */ +public interface IInstanceReferenceMonthMetricPersistenceDAO extends IPersistenceDAO { +} diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceDayMetricPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceDayMetricPersistenceDAO.java index 88e537c8fb..590f7191d5 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceDayMetricPersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceDayMetricPersistenceDAO.java @@ -24,5 +24,5 @@ import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenc /** * @author peng-yongsheng */ -public interface IServiceReferenceDayMetricPersistenceDAO extends IPersistenceDAO { +public interface IServiceReferenceDayMetricPersistenceDAO extends IPersistenceDAO { } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceHourMetricPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceHourMetricPersistenceDAO.java index feee7e24b1..d7329d1aae 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceHourMetricPersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceHourMetricPersistenceDAO.java @@ -24,5 +24,5 @@ import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenc /** * @author peng-yongsheng */ -public interface IServiceReferenceHourMetricPersistenceDAO extends IPersistenceDAO { +public interface IServiceReferenceHourMetricPersistenceDAO extends IPersistenceDAO { } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceMinuteMetricPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceMinuteMetricPersistenceDAO.java index 35a25a49c8..69021ecf9d 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceMinuteMetricPersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceMinuteMetricPersistenceDAO.java @@ -24,5 +24,5 @@ import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenc /** * @author peng-yongsheng */ -public interface IServiceReferenceMinuteMetricPersistenceDAO extends IPersistenceDAO { +public interface IServiceReferenceMinuteMetricPersistenceDAO extends IPersistenceDAO { } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceMonthMetricPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceMonthMetricPersistenceDAO.java index 28a49eb91b..e3e7eaf73f 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceMonthMetricPersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/srmp/IServiceReferenceMonthMetricPersistenceDAO.java @@ -24,5 +24,5 @@ import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenc /** * @author peng-yongsheng */ -public interface IServiceReferenceMonthMetricPersistenceDAO extends IPersistenceDAO { +public interface IServiceReferenceMonthMetricPersistenceDAO extends IPersistenceDAO { } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java index 2b6018cf4d..c05e1a784d 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java @@ -19,7 +19,7 @@ package org.apache.skywalking.apm.collector.storage.table.instance; import org.apache.skywalking.apm.collector.core.data.Column; -import org.apache.skywalking.apm.collector.core.data.AbstractData; +import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.AddOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; import org.apache.skywalking.apm.collector.storage.table.Metric; @@ -27,10 +27,11 @@ import org.apache.skywalking.apm.collector.storage.table.Metric; /** * @author peng-yongsheng */ -public class InstanceReferenceMetric extends AbstractData implements Metric { +public class InstanceReferenceMetric extends StreamData implements Metric { private static final Column[] STRING_COLUMNS = { new Column(InstanceReferenceMetricTable.COLUMN_ID, new NonOperation()), + new Column(InstanceReferenceMetricTable.COLUMN_METRIC_ID, new NonOperation()), }; private static final Column[] LONG_COLUMNS = { @@ -64,8 +65,24 @@ public class InstanceReferenceMetric extends AbstractData implements Metric { private static final Column[] BYTE_COLUMNS = {}; - public InstanceReferenceMetric(String id) { - super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS); + public InstanceReferenceMetric() { + super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS); + } + + @Override public String getId() { + return getDataString(0); + } + + @Override public void setId(String id) { + setDataString(0, id); + } + + @Override public String getMetricId() { + return getDataString(1); + } + + @Override public void setMetricId(String metricId) { + setDataString(1, metricId); } @Override diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java index b4ab873487..92004984fc 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java @@ -59,7 +59,7 @@ import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenc import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricUIDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceAlarmListPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceAlarmPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceMinuteMetricPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceUIDAO; import org.apache.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO; @@ -115,7 +115,7 @@ import org.apache.skywalking.apm.collector.storage.es.dao.InstanceMetricEsPersis import org.apache.skywalking.apm.collector.storage.es.dao.InstanceMetricEsUIDAO; import org.apache.skywalking.apm.collector.storage.es.dao.InstanceReferenceAlarmEsPersistenceDAO; import org.apache.skywalking.apm.collector.storage.es.dao.InstanceReferenceAlarmListEsPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.es.dao.InstanceReferenceMetricEsPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.es.dao.irmp.InstanceReferenceMinuteMetricEsPersistenceDAO; import org.apache.skywalking.apm.collector.storage.es.dao.MemoryMetricEsPersistenceDAO; import org.apache.skywalking.apm.collector.storage.es.dao.MemoryMetricEsUIDAO; import org.apache.skywalking.apm.collector.storage.es.dao.MemoryPoolMetricEsPersistenceDAO; @@ -244,7 +244,7 @@ public class StorageModuleEsProvider extends ModuleProvider { this.registerServiceImplementation(IServiceReferenceMinuteMetricPersistenceDAO.class, new ServiceReferenceMinuteMetricEsPersistenceDAO(elasticSearchClient)); this.registerServiceImplementation(IInstanceMetricPersistenceDAO.class, new InstanceMetricEsPersistenceDAO(elasticSearchClient)); - this.registerServiceImplementation(IInstanceReferenceMetricPersistenceDAO.class, new InstanceReferenceMetricEsPersistenceDAO(elasticSearchClient)); + this.registerServiceImplementation(IInstanceReferenceMinuteMetricPersistenceDAO.class, new InstanceReferenceMinuteMetricEsPersistenceDAO(elasticSearchClient)); this.registerServiceImplementation(IInstanceMappingPersistenceDAO.class, new InstanceMappingEsPersistenceDAO(elasticSearchClient)); this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatEsPersistenceDAO(elasticSearchClient)); } diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/InstanceReferenceMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/InstanceReferenceMetricEsPersistenceDAO.java deleted file mode 100644 index b75d4d6499..0000000000 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/InstanceReferenceMetricEsPersistenceDAO.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.apm.collector.storage.es.dao; - -import java.util.HashMap; -import java.util.Map; -import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; -import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils; -import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO; -import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; -import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetricTable; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.update.UpdateRequestBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.reindex.BulkByScrollResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author peng-yongsheng - */ -public class InstanceReferenceMetricEsPersistenceDAO extends EsDAO implements IInstanceReferenceMetricPersistenceDAO { - - private final Logger logger = LoggerFactory.getLogger(InstanceReferenceMetricEsPersistenceDAO.class); - - public InstanceReferenceMetricEsPersistenceDAO(ElasticSearchClient client) { - super(client); - } - - @Override public InstanceReferenceMetric get(String id) { - GetResponse getResponse = getClient().prepareGet(InstanceReferenceMetricTable.TABLE, id).get(); - if (getResponse.isExists()) { - logger.debug("getId: {} is exist", id); - InstanceReferenceMetric instanceReferenceMetric = new InstanceReferenceMetric(id); - Map source = getResponse.getSource(); - instanceReferenceMetric.setFrontApplicationId((Integer)source.get(InstanceReferenceMetricTable.COLUMN_FRONT_APPLICATION_ID)); - instanceReferenceMetric.setBehindApplicationId((Integer)source.get(InstanceReferenceMetricTable.COLUMN_BEHIND_APPLICATION_ID)); - instanceReferenceMetric.setFrontInstanceId((Integer)source.get(InstanceReferenceMetricTable.COLUMN_FRONT_INSTANCE_ID)); - instanceReferenceMetric.setBehindInstanceId((Integer)source.get(InstanceReferenceMetricTable.COLUMN_BEHIND_INSTANCE_ID)); - instanceReferenceMetric.setSourceValue((Integer)source.get(InstanceReferenceMetricTable.COLUMN_SOURCE_VALUE)); - - instanceReferenceMetric.setTransactionCalls(((Number)source.get(InstanceReferenceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue()); - instanceReferenceMetric.setTransactionErrorCalls(((Number)source.get(InstanceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue()); - instanceReferenceMetric.setTransactionDurationSum(((Number)source.get(InstanceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue()); - instanceReferenceMetric.setTransactionErrorDurationSum(((Number)source.get(InstanceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)).longValue()); - - instanceReferenceMetric.setBusinessTransactionCalls(((Number)source.get(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS)).longValue()); - instanceReferenceMetric.setBusinessTransactionErrorCalls(((Number)source.get(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS)).longValue()); - instanceReferenceMetric.setBusinessTransactionDurationSum(((Number)source.get(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM)).longValue()); - instanceReferenceMetric.setBusinessTransactionErrorDurationSum(((Number)source.get(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM)).longValue()); - - instanceReferenceMetric.setMqTransactionCalls(((Number)source.get(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_CALLS)).longValue()); - instanceReferenceMetric.setMqTransactionErrorCalls(((Number)source.get(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS)).longValue()); - instanceReferenceMetric.setMqTransactionDurationSum(((Number)source.get(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM)).longValue()); - instanceReferenceMetric.setMqTransactionErrorDurationSum(((Number)source.get(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM)).longValue()); - - instanceReferenceMetric.setTimeBucket(((Number)source.get(InstanceReferenceMetricTable.COLUMN_TIME_BUCKET)).longValue()); - - return instanceReferenceMetric; - } else { - return null; - } - } - - @Override public IndexRequestBuilder prepareBatchInsert(InstanceReferenceMetric data) { - Map source = new HashMap<>(); - source.put(InstanceReferenceMetricTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId()); - source.put(InstanceReferenceMetricTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId()); - source.put(InstanceReferenceMetricTable.COLUMN_FRONT_INSTANCE_ID, data.getFrontInstanceId()); - source.put(InstanceReferenceMetricTable.COLUMN_BEHIND_INSTANCE_ID, data.getBehindInstanceId()); - source.put(InstanceReferenceMetricTable.COLUMN_SOURCE_VALUE, data.getSourceValue()); - - source.put(InstanceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls()); - source.put(InstanceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls()); - source.put(InstanceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM, data.getTransactionDurationSum()); - source.put(InstanceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM, data.getTransactionErrorDurationSum()); - - source.put(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS, data.getBusinessTransactionCalls()); - source.put(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS, data.getBusinessTransactionErrorCalls()); - source.put(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM, data.getBusinessTransactionDurationSum()); - source.put(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM, data.getBusinessTransactionErrorDurationSum()); - - source.put(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_CALLS, data.getMqTransactionCalls()); - source.put(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS, data.getMqTransactionErrorCalls()); - source.put(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM, data.getMqTransactionDurationSum()); - source.put(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM, data.getMqTransactionErrorDurationSum()); - - source.put(InstanceReferenceMetricTable.COLUMN_TIME_BUCKET, data.getTimeBucket()); - - return getClient().prepareIndex(InstanceReferenceMetricTable.TABLE, data.getId()).setSource(source); - } - - @Override public UpdateRequestBuilder prepareBatchUpdate(InstanceReferenceMetric data) { - Map source = new HashMap<>(); - source.put(InstanceReferenceMetricTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId()); - source.put(InstanceReferenceMetricTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId()); - source.put(InstanceReferenceMetricTable.COLUMN_FRONT_INSTANCE_ID, data.getFrontInstanceId()); - source.put(InstanceReferenceMetricTable.COLUMN_BEHIND_INSTANCE_ID, data.getBehindInstanceId()); - source.put(InstanceReferenceMetricTable.COLUMN_SOURCE_VALUE, data.getSourceValue()); - - source.put(InstanceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls()); - source.put(InstanceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls()); - source.put(InstanceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM, data.getTransactionDurationSum()); - source.put(InstanceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM, data.getTransactionErrorDurationSum()); - - source.put(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS, data.getBusinessTransactionCalls()); - source.put(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS, data.getBusinessTransactionErrorCalls()); - source.put(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM, data.getBusinessTransactionDurationSum()); - source.put(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM, data.getBusinessTransactionErrorDurationSum()); - - source.put(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_CALLS, data.getMqTransactionCalls()); - source.put(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS, data.getMqTransactionErrorCalls()); - source.put(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM, data.getMqTransactionDurationSum()); - source.put(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM, data.getMqTransactionErrorDurationSum()); - - source.put(InstanceReferenceMetricTable.COLUMN_TIME_BUCKET, data.getTimeBucket()); - - return getClient().prepareUpdate(InstanceReferenceMetricTable.TABLE, data.getId()).setDoc(source); - } - - @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) { - long startTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(startTimestamp); - long endTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(endTimestamp); - BulkByScrollResponse response = getClient().prepareDelete() - .filter(QueryBuilders.rangeQuery(InstanceReferenceMetricTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket)) - .source(InstanceReferenceMetricTable.TABLE) - .get(); - - long deleted = response.getDeleted(); - logger.info("Delete {} rows history from {} index.", deleted, InstanceReferenceMetricTable.TABLE); - } -} diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/AbstractInstanceReferenceMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/AbstractInstanceReferenceMetricEsPersistenceDAO.java new file mode 100644 index 0000000000..fbabc2d42f --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/AbstractInstanceReferenceMetricEsPersistenceDAO.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.es.dao.irmp; + +import java.util.HashMap; +import java.util.Map; +import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; +import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetricTable; + +/** + * @author peng-yongsheng + */ +public abstract class AbstractInstanceReferenceMetricEsPersistenceDAO extends AbstractPersistenceEsDAO { + + AbstractInstanceReferenceMetricEsPersistenceDAO(ElasticSearchClient client) { + super(client); + } + + @Override protected final String timeBucketColumnNameForDelete() { + return InstanceReferenceMetricTable.COLUMN_TIME_BUCKET; + } + + @Override protected final InstanceReferenceMetric esDataToStreamData(Map source) { + InstanceReferenceMetric instanceReferenceMetric = new InstanceReferenceMetric(); + instanceReferenceMetric.setId((String)source.get(InstanceReferenceMetricTable.COLUMN_ID)); + instanceReferenceMetric.setMetricId((String)source.get(InstanceReferenceMetricTable.COLUMN_METRIC_ID)); + + instanceReferenceMetric.setFrontApplicationId((Integer)source.get(InstanceReferenceMetricTable.COLUMN_FRONT_APPLICATION_ID)); + instanceReferenceMetric.setBehindApplicationId((Integer)source.get(InstanceReferenceMetricTable.COLUMN_BEHIND_APPLICATION_ID)); + instanceReferenceMetric.setFrontInstanceId((Integer)source.get(InstanceReferenceMetricTable.COLUMN_FRONT_INSTANCE_ID)); + instanceReferenceMetric.setBehindInstanceId((Integer)source.get(InstanceReferenceMetricTable.COLUMN_BEHIND_INSTANCE_ID)); + instanceReferenceMetric.setSourceValue((Integer)source.get(InstanceReferenceMetricTable.COLUMN_SOURCE_VALUE)); + + instanceReferenceMetric.setTransactionCalls(((Number)source.get(InstanceReferenceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue()); + instanceReferenceMetric.setTransactionErrorCalls(((Number)source.get(InstanceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue()); + instanceReferenceMetric.setTransactionDurationSum(((Number)source.get(InstanceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue()); + instanceReferenceMetric.setTransactionErrorDurationSum(((Number)source.get(InstanceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)).longValue()); + + instanceReferenceMetric.setBusinessTransactionCalls(((Number)source.get(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS)).longValue()); + instanceReferenceMetric.setBusinessTransactionErrorCalls(((Number)source.get(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS)).longValue()); + instanceReferenceMetric.setBusinessTransactionDurationSum(((Number)source.get(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM)).longValue()); + instanceReferenceMetric.setBusinessTransactionErrorDurationSum(((Number)source.get(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM)).longValue()); + + instanceReferenceMetric.setMqTransactionCalls(((Number)source.get(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_CALLS)).longValue()); + instanceReferenceMetric.setMqTransactionErrorCalls(((Number)source.get(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS)).longValue()); + instanceReferenceMetric.setMqTransactionDurationSum(((Number)source.get(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM)).longValue()); + instanceReferenceMetric.setMqTransactionErrorDurationSum(((Number)source.get(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM)).longValue()); + + instanceReferenceMetric.setTimeBucket(((Number)source.get(InstanceReferenceMetricTable.COLUMN_TIME_BUCKET)).longValue()); + return instanceReferenceMetric; + } + + @Override protected final Map esStreamDataToEsData(InstanceReferenceMetric streamData) { + Map source = new HashMap<>(); + source.put(InstanceReferenceMetricTable.COLUMN_ID, streamData.getId()); + source.put(InstanceReferenceMetricTable.COLUMN_METRIC_ID, streamData.getMetricId()); + + source.put(InstanceReferenceMetricTable.COLUMN_FRONT_APPLICATION_ID, streamData.getFrontApplicationId()); + source.put(InstanceReferenceMetricTable.COLUMN_BEHIND_APPLICATION_ID, streamData.getBehindApplicationId()); + source.put(InstanceReferenceMetricTable.COLUMN_FRONT_INSTANCE_ID, streamData.getFrontInstanceId()); + source.put(InstanceReferenceMetricTable.COLUMN_BEHIND_INSTANCE_ID, streamData.getBehindInstanceId()); + source.put(InstanceReferenceMetricTable.COLUMN_SOURCE_VALUE, streamData.getSourceValue()); + + source.put(InstanceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, streamData.getTransactionCalls()); + source.put(InstanceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, streamData.getTransactionErrorCalls()); + source.put(InstanceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM, streamData.getTransactionDurationSum()); + source.put(InstanceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM, streamData.getTransactionErrorDurationSum()); + + source.put(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS, streamData.getBusinessTransactionCalls()); + source.put(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS, streamData.getBusinessTransactionErrorCalls()); + source.put(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM, streamData.getBusinessTransactionDurationSum()); + source.put(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM, streamData.getBusinessTransactionErrorDurationSum()); + + source.put(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_CALLS, streamData.getMqTransactionCalls()); + source.put(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS, streamData.getMqTransactionErrorCalls()); + source.put(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM, streamData.getMqTransactionDurationSum()); + source.put(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM, streamData.getMqTransactionErrorDurationSum()); + + source.put(InstanceReferenceMetricTable.COLUMN_TIME_BUCKET, streamData.getTimeBucket()); + + return source; + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceDayMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceDayMetricEsPersistenceDAO.java new file mode 100644 index 0000000000..c9eefd6e36 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceDayMetricEsPersistenceDAO.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.es.dao.irmp; + +import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; +import org.apache.skywalking.apm.collector.core.storage.TimePyramid; +import org.apache.skywalking.apm.collector.core.util.Const; +import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceDayMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetricTable; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.update.UpdateRequestBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class InstanceReferenceDayMetricEsPersistenceDAO extends AbstractInstanceReferenceMetricEsPersistenceDAO implements IInstanceReferenceDayMetricPersistenceDAO { + + private final Logger logger = LoggerFactory.getLogger(InstanceReferenceDayMetricEsPersistenceDAO.class); + + public InstanceReferenceDayMetricEsPersistenceDAO(ElasticSearchClient client) { + super(client); + } + + @Override protected String tableName() { + return InstanceReferenceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Day.getName(); + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceHourMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceHourMetricEsPersistenceDAO.java new file mode 100644 index 0000000000..62b6ce8e76 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceHourMetricEsPersistenceDAO.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.es.dao.irmp; + +import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; +import org.apache.skywalking.apm.collector.core.storage.TimePyramid; +import org.apache.skywalking.apm.collector.core.util.Const; +import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceMinuteMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetricTable; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.update.UpdateRequestBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class InstanceReferenceHourMetricEsPersistenceDAO extends AbstractInstanceReferenceMetricEsPersistenceDAO implements IInstanceReferenceMinuteMetricPersistenceDAO { + + private final Logger logger = LoggerFactory.getLogger(InstanceReferenceHourMetricEsPersistenceDAO.class); + + public InstanceReferenceHourMetricEsPersistenceDAO(ElasticSearchClient client) { + super(client); + } + + @Override protected String tableName() { + return InstanceReferenceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Minute.getName(); + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceMinuteMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceMinuteMetricEsPersistenceDAO.java new file mode 100644 index 0000000000..a9621d6753 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceMinuteMetricEsPersistenceDAO.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.es.dao.irmp; + +import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; +import org.apache.skywalking.apm.collector.core.storage.TimePyramid; +import org.apache.skywalking.apm.collector.core.util.Const; +import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceHourMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetricTable; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.update.UpdateRequestBuilder; + +/** + * @author peng-yongsheng + */ +public class InstanceReferenceMinuteMetricEsPersistenceDAO extends AbstractInstanceReferenceMetricEsPersistenceDAO implements IInstanceReferenceHourMetricPersistenceDAO { + + public InstanceReferenceMinuteMetricEsPersistenceDAO(ElasticSearchClient client) { + super(client); + } + + @Override protected String tableName() { + return InstanceReferenceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Hour.getName(); + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceMonthMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceMonthMetricEsPersistenceDAO.java new file mode 100644 index 0000000000..b9d7637705 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceMonthMetricEsPersistenceDAO.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.es.dao.irmp; + +import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; +import org.apache.skywalking.apm.collector.core.storage.TimePyramid; +import org.apache.skywalking.apm.collector.core.util.Const; +import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceMonthMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetricTable; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.update.UpdateRequestBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class InstanceReferenceMonthMetricEsPersistenceDAO extends AbstractInstanceReferenceMetricEsPersistenceDAO implements IInstanceReferenceMonthMetricPersistenceDAO { + + private final Logger logger = LoggerFactory.getLogger(InstanceReferenceMonthMetricEsPersistenceDAO.class); + + public InstanceReferenceMonthMetricEsPersistenceDAO(ElasticSearchClient client) { + super(client); + } + + @Override protected String tableName() { + return InstanceReferenceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Month.getName(); + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java index eb02cb19de..459944e7cd 100644 --- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java @@ -55,7 +55,7 @@ import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenc import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricUIDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceAlarmListPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceAlarmPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceMinuteMetricPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO; import org.apache.skywalking.apm.collector.storage.dao.IInstanceUIDAO; import org.apache.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO; @@ -222,7 +222,7 @@ public class StorageModuleH2Provider extends ModuleProvider { this.registerServiceImplementation(IServiceReferenceMinuteMetricPersistenceDAO.class, new ServiceReferenceMetricH2PersistenceDAO(h2Client)); this.registerServiceImplementation(IInstanceMetricPersistenceDAO.class, new InstanceMetricH2PersistenceDAO(h2Client)); - this.registerServiceImplementation(IInstanceReferenceMetricPersistenceDAO.class, new InstanceReferenceMetricH2PersistenceDAO(h2Client)); + this.registerServiceImplementation(IInstanceReferenceMinuteMetricPersistenceDAO.class, new InstanceReferenceMetricH2PersistenceDAO(h2Client)); this.registerServiceImplementation(IInstanceMappingPersistenceDAO.class, new InstanceMappingH2PersistenceDAO(h2Client)); this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatH2PersistenceDAO(h2Client)); } diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/InstanceReferenceMetricH2PersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/InstanceReferenceMetricH2PersistenceDAO.java index 39bc9616be..c1450aacbf 100644 --- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/InstanceReferenceMetricH2PersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/InstanceReferenceMetricH2PersistenceDAO.java @@ -27,7 +27,7 @@ import java.util.Map; import org.apache.skywalking.apm.collector.client.h2.H2Client; import org.apache.skywalking.apm.collector.client.h2.H2ClientException; import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder; -import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceMinuteMetricPersistenceDAO; import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO; import org.apache.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity; import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; @@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory; /** * @author peng-yongsheng, clevertension */ -public class InstanceReferenceMetricH2PersistenceDAO extends H2DAO implements IInstanceReferenceMetricPersistenceDAO { +public class InstanceReferenceMetricH2PersistenceDAO extends H2DAO implements IInstanceReferenceMinuteMetricPersistenceDAO { private final Logger logger = LoggerFactory.getLogger(InstanceReferenceMetricH2PersistenceDAO.class); private static final String GET_SQL = "select * from {0} where {1} = ?"; -- GitLab