From c2a425f55ab86367aeac4721d22810000f1720c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com> Date: Sat, 9 Jun 2018 16:33:05 +0800 Subject: [PATCH] Filter non-active service name when UI query. (#1262) * #1124 Add heart beat time into service name index to filter non-active service name. * Filter service name by heart beat time when query service name count. * #1124 1. Add application id to be a query condition of searchService method. 2. Filter service name by given time which calculated by the minute metric TTL setting. ( CurrentTimeStamp - minuteMetricTTL ) * Fixed the auto test failure. #1124 --- .../provider/AnalysisAlarmModuleProvider.java | 44 ++---- .../provider/AnalysisJVMModuleProvider.java | 26 +--- .../define/graph/MetricGraphIdDefine.java | 1 + .../define/graph/MetricWorkerIdDefine.java | 146 +++++++++--------- .../AnalysisMetricModuleProvider.java | 95 ++++-------- .../ServiceNameAggregationWorker.java | 60 +++++++ .../heartbeat/ServiceNameHeartBeatGraph.java | 51 ++++++ ...ServiceNameHeartBeatPersistenceWorker.java | 72 +++++++++ .../ServiceNameHeartBeatRemoteWorker.java | 58 +++++++ .../heartbeat/ServiceNameSpanListener.java | 92 +++++++++++ .../AnalysisRegisterModuleProvider.java | 36 +---- .../ServiceNameRegisterRemoteWorker.java | 13 +- .../ServiceNameRegisterSerialWorker.java | 16 +- .../AnalysisSegmentParserModuleProvider.java | 20 +-- .../AbstractLocalAsyncWorkerProvider.java | 2 +- .../apm/collector/storage/StorageModule.java | 130 +++------------- .../IServiceNameHeartBeatPersistenceDAO.java | 28 ++++ .../dao/ui/IServiceNameServiceUIDAO.java | 10 +- .../storage/table/register/Instance.java | 9 +- .../storage/table/register/InstanceTable.java | 4 - .../table/register/RegisterColumns.java | 4 + .../storage/table/register/ServiceName.java | 28 +++- .../storage/ttl/ITTLConfigService.java | 36 +++++ .../storage/es/DataTTLKeeperTimer.java | 39 +---- .../storage/es/StorageModuleEsConfig.java | 32 ++-- .../storage/es/StorageModuleEsProvider.java | 12 +- .../ServiceNameHeartBeatEsPersistenceDAO.java | 76 +++++++++ .../register/ServiceNameRegisterEsDAO.java | 11 +- .../es/dao/ui/ServiceNameServiceEsUIDAO.java | 38 +++-- .../register/ServiceNameEsTableDefine.java | 5 +- .../storage/es/ttl/TTLConfigService.java | 54 +++++++ .../es/DataTTLKeeperTimerTestCase.java | 2 +- .../register/ServiceNameRegisterH2DAO.java | 14 +- .../h2/dao/ui/ServiceNameServiceH2UIDAO.java | 29 ++-- .../register/ServiceNameH2TableDefine.java | 5 +- .../apm/collector/ui/query/ServiceQuery.java | 14 +- .../ui/service/ServiceNameService.java | 14 +- .../collector/ui/query/ServiceQueryTest.java | 33 ++-- .../ui/service/ServiceNameServiceTest.java | 2 +- .../ui-graphql/service-layer.graphqls | 2 +- 40 files changed, 855 insertions(+), 508 deletions(-) create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameAggregationWorker.java create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatGraph.java create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatPersistenceWorker.java create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatRemoteWorker.java create mode 100644 apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameSpanListener.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/IServiceNameHeartBeatPersistenceDAO.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/ttl/ITTLConfigService.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ServiceNameHeartBeatEsPersistenceDAO.java create mode 100644 apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/ttl/TTLConfigService.java diff --git a/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/AnalysisAlarmModuleProvider.java b/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/AnalysisAlarmModuleProvider.java index eef4b2c679..89df172ca5 100644 --- a/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/AnalysisAlarmModuleProvider.java +++ b/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/AnalysisAlarmModuleProvider.java @@ -19,28 +19,18 @@ package org.apache.skywalking.apm.collector.analysis.alarm.provider; import org.apache.skywalking.apm.collector.analysis.alarm.define.AnalysisAlarmModule; -import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.application.ApplicationMetricAlarmGraph; -import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.application.ApplicationReferenceMetricAlarmGraph; -import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.instance.InstanceMetricAlarmGraph; -import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.instance.InstanceReferenceMetricAlarmGraph; -import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.service.ServiceMetricAlarmGraph; -import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.service.ServiceReferenceMetricAlarmGraph; +import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.application.*; +import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.instance.*; +import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.service.*; import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule; import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener; import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTimer; import org.apache.skywalking.apm.collector.configuration.ConfigurationModule; -import org.apache.skywalking.apm.collector.core.module.ModuleDefine; -import org.apache.skywalking.apm.collector.core.module.ModuleConfig; -import org.apache.skywalking.apm.collector.core.module.ModuleProvider; +import org.apache.skywalking.apm.collector.core.module.*; import org.apache.skywalking.apm.collector.remote.RemoteModule; import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; import org.apache.skywalking.apm.collector.storage.StorageModule; -import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarm; -import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarmList; -import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarm; -import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarmList; -import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarm; -import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarmList; +import org.apache.skywalking.apm.collector.storage.table.alarm.*; /** * @author peng-yongsheng @@ -72,23 +62,12 @@ public class AnalysisAlarmModuleProvider extends ModuleProvider { @Override public void start() { WorkerCreateListener workerCreateListener = new WorkerCreateListener(); - ServiceMetricAlarmGraph serviceMetricAlarmGraph = new ServiceMetricAlarmGraph(getManager(), workerCreateListener); - serviceMetricAlarmGraph.create(); - - InstanceMetricAlarmGraph instanceMetricAlarmGraph = new InstanceMetricAlarmGraph(getManager(), workerCreateListener); - instanceMetricAlarmGraph.create(); - - ApplicationMetricAlarmGraph applicationMetricAlarmGraph = new ApplicationMetricAlarmGraph(getManager(), workerCreateListener); - applicationMetricAlarmGraph.create(); - - ServiceReferenceMetricAlarmGraph serviceReferenceMetricAlarmGraph = new ServiceReferenceMetricAlarmGraph(getManager(), workerCreateListener); - serviceReferenceMetricAlarmGraph.create(); - - InstanceReferenceMetricAlarmGraph instanceReferenceMetricAlarmGraph = new InstanceReferenceMetricAlarmGraph(getManager(), workerCreateListener); - instanceReferenceMetricAlarmGraph.create(); - - ApplicationReferenceMetricAlarmGraph applicationReferenceMetricAlarmGraph = new ApplicationReferenceMetricAlarmGraph(getManager(), workerCreateListener); - applicationReferenceMetricAlarmGraph.create(); + new ServiceMetricAlarmGraph(getManager(), workerCreateListener).create(); + new InstanceMetricAlarmGraph(getManager(), workerCreateListener).create(); + new ApplicationMetricAlarmGraph(getManager(), workerCreateListener).create(); + new ServiceReferenceMetricAlarmGraph(getManager(), workerCreateListener).create(); + new InstanceReferenceMetricAlarmGraph(getManager(), workerCreateListener).create(); + new ApplicationReferenceMetricAlarmGraph(getManager(), workerCreateListener).create(); registerRemoteData(); @@ -110,6 +89,5 @@ public class AnalysisAlarmModuleProvider extends ModuleProvider { remoteDataRegisterService.register(InstanceAlarmList.class, new InstanceAlarmList.InstanceCreator()); remoteDataRegisterService.register(ServiceAlarm.class, new ServiceAlarm.InstanceCreator()); remoteDataRegisterService.register(ServiceAlarmList.class, new ServiceAlarmList.InstanceCreator()); - } } diff --git a/apm-collector/apm-collector-analysis/analysis-jvm/jvm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/jvm/provider/AnalysisJVMModuleProvider.java b/apm-collector/apm-collector-analysis/analysis-jvm/jvm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/jvm/provider/AnalysisJVMModuleProvider.java index c71f2be6ae..57c5b611d7 100644 --- a/apm-collector/apm-collector-analysis/analysis-jvm/jvm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/jvm/provider/AnalysisJVMModuleProvider.java +++ b/apm-collector/apm-collector-analysis/analysis-jvm/jvm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/jvm/provider/AnalysisJVMModuleProvider.java @@ -19,14 +19,8 @@ package org.apache.skywalking.apm.collector.analysis.jvm.provider; import org.apache.skywalking.apm.collector.analysis.jvm.define.AnalysisJVMModule; -import org.apache.skywalking.apm.collector.analysis.jvm.define.service.ICpuMetricService; -import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IGCMetricService; -import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IMemoryMetricService; -import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IMemoryPoolMetricService; -import org.apache.skywalking.apm.collector.analysis.jvm.provider.service.CpuMetricService; -import org.apache.skywalking.apm.collector.analysis.jvm.provider.service.GCMetricService; -import org.apache.skywalking.apm.collector.analysis.jvm.provider.service.MemoryMetricService; -import org.apache.skywalking.apm.collector.analysis.jvm.provider.service.MemoryPoolMetricService; +import org.apache.skywalking.apm.collector.analysis.jvm.define.service.*; +import org.apache.skywalking.apm.collector.analysis.jvm.provider.service.*; import org.apache.skywalking.apm.collector.analysis.jvm.provider.worker.cpu.CpuMetricPersistenceGraph; import org.apache.skywalking.apm.collector.analysis.jvm.provider.worker.gc.GCMetricPersistenceGraph; import org.apache.skywalking.apm.collector.analysis.jvm.provider.worker.memory.MemoryMetricPersistenceGraph; @@ -34,7 +28,6 @@ import org.apache.skywalking.apm.collector.analysis.jvm.provider.worker.memorypo import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener; import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTimer; import org.apache.skywalking.apm.collector.core.module.*; -import org.apache.skywalking.apm.collector.core.module.ModuleDefine; import org.apache.skywalking.apm.collector.remote.RemoteModule; import org.apache.skywalking.apm.collector.storage.StorageModule; @@ -86,16 +79,9 @@ public class AnalysisJVMModuleProvider extends ModuleProvider { } private void graphCreate(WorkerCreateListener workerCreateListener) { - CpuMetricPersistenceGraph cpuMetricPersistenceGraph = new CpuMetricPersistenceGraph(getManager(), workerCreateListener); - cpuMetricPersistenceGraph.create(); - - GCMetricPersistenceGraph gcMetricPersistenceGraph = new GCMetricPersistenceGraph(getManager(), workerCreateListener); - gcMetricPersistenceGraph.create(); - - MemoryMetricPersistenceGraph memoryMetricPersistenceGraph = new MemoryMetricPersistenceGraph(getManager(), workerCreateListener); - memoryMetricPersistenceGraph.create(); - - MemoryPoolMetricPersistenceGraph memoryPoolMetricPersistenceGraph = new MemoryPoolMetricPersistenceGraph(getManager(), workerCreateListener); - memoryPoolMetricPersistenceGraph.create(); + new CpuMetricPersistenceGraph(getManager(), workerCreateListener).create(); + new GCMetricPersistenceGraph(getManager(), workerCreateListener).create(); + new MemoryMetricPersistenceGraph(getManager(), workerCreateListener).create(); + new MemoryPoolMetricPersistenceGraph(getManager(), workerCreateListener).create(); } } diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricGraphIdDefine.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricGraphIdDefine.java index b1896f6f5d..4f97d316a7 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricGraphIdDefine.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricGraphIdDefine.java @@ -38,4 +38,5 @@ public class MetricGraphIdDefine { public static final int INSTANCE_MAPPING_GRAPH_ID = 411; public static final int INSTANCE_HEART_BEAT_PERSISTENCE_GRAPH_ID = 412; + public static final int SERVICE_HEART_BEAT_PERSISTENCE_GRAPH_ID = 413; } diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricWorkerIdDefine.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricWorkerIdDefine.java index f1b4cf73b6..581c9a9479 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricWorkerIdDefine.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-define/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/define/graph/MetricWorkerIdDefine.java @@ -52,84 +52,88 @@ public class MetricWorkerIdDefine { public static final int APPLICATION_REFERENCE_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4037; public static final int APPLICATION_REFERENCE_MONTH_METRIC_TRANSFORM_NODE_ID = 4038; - public static final int SERVICE_MINUTE_METRIC_AGGREGATION_WORKER_ID = 4400; - public static final int SERVICE_MINUTE_METRIC_REMOTE_WORKER_ID = 4401; - public static final int SERVICE_MINUTE_METRIC_PERSISTENCE_WORKER_ID = 4402; - public static final int SERVICE_HOUR_METRIC_PERSISTENCE_WORKER_ID = 4403; - public static final int SERVICE_HOUR_METRIC_TRANSFORM_NODE_ID = 4404; - public static final int SERVICE_DAY_METRIC_PERSISTENCE_WORKER_ID = 4405; - public static final int SERVICE_DAY_METRIC_TRANSFORM_NODE_ID = 4406; - public static final int SERVICE_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4407; - public static final int SERVICE_MONTH_METRIC_TRANSFORM_NODE_ID = 4408; + public static final int SERVICE_MINUTE_METRIC_AGGREGATION_WORKER_ID = 4040; + public static final int SERVICE_MINUTE_METRIC_REMOTE_WORKER_ID = 4041; + public static final int SERVICE_MINUTE_METRIC_PERSISTENCE_WORKER_ID = 4042; + public static final int SERVICE_HOUR_METRIC_PERSISTENCE_WORKER_ID = 4043; + public static final int SERVICE_HOUR_METRIC_TRANSFORM_NODE_ID = 4044; + public static final int SERVICE_DAY_METRIC_PERSISTENCE_WORKER_ID = 4045; + public static final int SERVICE_DAY_METRIC_TRANSFORM_NODE_ID = 4046; + public static final int SERVICE_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4047; + public static final int SERVICE_MONTH_METRIC_TRANSFORM_NODE_ID = 4048; - public static final int INSTANCE_MINUTE_METRIC_AGGREGATION_WORKER_ID = 4500; - public static final int INSTANCE_MINUTE_METRIC_REMOTE_WORKER_ID = 4501; - public static final int INSTANCE_MINUTE_METRIC_PERSISTENCE_WORKER_ID = 4502; - public static final int INSTANCE_HOUR_METRIC_PERSISTENCE_WORKER_ID = 4503; - public static final int INSTANCE_HOUR_METRIC_TRANSFORM_NODE_ID = 4504; - public static final int INSTANCE_DAY_METRIC_PERSISTENCE_WORKER_ID = 4505; - public static final int INSTANCE_DAY_METRIC_TRANSFORM_NODE_ID = 4506; - public static final int INSTANCE_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4507; - public static final int INSTANCE_MONTH_METRIC_TRANSFORM_NODE_ID = 4508; + public static final int INSTANCE_MINUTE_METRIC_AGGREGATION_WORKER_ID = 4050; + public static final int INSTANCE_MINUTE_METRIC_REMOTE_WORKER_ID = 4051; + public static final int INSTANCE_MINUTE_METRIC_PERSISTENCE_WORKER_ID = 4052; + public static final int INSTANCE_HOUR_METRIC_PERSISTENCE_WORKER_ID = 4053; + public static final int INSTANCE_HOUR_METRIC_TRANSFORM_NODE_ID = 4054; + public static final int INSTANCE_DAY_METRIC_PERSISTENCE_WORKER_ID = 4055; + public static final int INSTANCE_DAY_METRIC_TRANSFORM_NODE_ID = 4056; + public static final int INSTANCE_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4057; + public static final int INSTANCE_MONTH_METRIC_TRANSFORM_NODE_ID = 4058; - public static final int APPLICATION_MINUTE_METRIC_AGGREGATION_WORKER_ID = 4600; - public static final int APPLICATION_MINUTE_METRIC_REMOTE_WORKER_ID = 4601; - public static final int APPLICATION_MINUTE_METRIC_PERSISTENCE_WORKER_ID = 4602; - public static final int APPLICATION_HOUR_METRIC_PERSISTENCE_WORKER_ID = 4603; - public static final int APPLICATION_HOUR_METRIC_TRANSFORM_NODE_ID = 4604; - public static final int APPLICATION_DAY_METRIC_PERSISTENCE_WORKER_ID = 4605; - public static final int APPLICATION_DAY_METRIC_TRANSFORM_NODE_ID = 4606; - public static final int APPLICATION_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4607; - public static final int APPLICATION_MONTH_METRIC_TRANSFORM_NODE_ID = 4608; + public static final int APPLICATION_MINUTE_METRIC_AGGREGATION_WORKER_ID = 4060; + public static final int APPLICATION_MINUTE_METRIC_REMOTE_WORKER_ID = 4061; + public static final int APPLICATION_MINUTE_METRIC_PERSISTENCE_WORKER_ID = 4062; + public static final int APPLICATION_HOUR_METRIC_PERSISTENCE_WORKER_ID = 4063; + public static final int APPLICATION_HOUR_METRIC_TRANSFORM_NODE_ID = 4064; + public static final int APPLICATION_DAY_METRIC_PERSISTENCE_WORKER_ID = 4065; + public static final int APPLICATION_DAY_METRIC_TRANSFORM_NODE_ID = 4066; + public static final int APPLICATION_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4067; + public static final int APPLICATION_MONTH_METRIC_TRANSFORM_NODE_ID = 4068; - public static final int INSTANCE_MAPPING_MINUTE_AGGREGATION_WORKER_ID = 4700; - public static final int INSTANCE_MAPPING_MINUTE_REMOTE_WORKER_ID = 4701; - public static final int INSTANCE_MAPPING_MINUTE_PERSISTENCE_WORKER_ID = 4702; - public static final int INSTANCE_MAPPING_HOUR_PERSISTENCE_WORKER_ID = 4703; - public static final int INSTANCE_MAPPING_HOUR_TRANSFORM_NODE_ID = 4704; - public static final int INSTANCE_MAPPING_DAY_PERSISTENCE_WORKER_ID = 4705; - public static final int INSTANCE_MAPPING_DAY_TRANSFORM_NODE_ID = 4706; - public static final int INSTANCE_MAPPING_MONTH_PERSISTENCE_WORKER_ID = 4707; - public static final int INSTANCE_MAPPING_MONTH_TRANSFORM_NODE_ID = 4708; + public static final int INSTANCE_MAPPING_MINUTE_AGGREGATION_WORKER_ID = 4070; + public static final int INSTANCE_MAPPING_MINUTE_REMOTE_WORKER_ID = 4071; + public static final int INSTANCE_MAPPING_MINUTE_PERSISTENCE_WORKER_ID = 4072; + public static final int INSTANCE_MAPPING_HOUR_PERSISTENCE_WORKER_ID = 4073; + public static final int INSTANCE_MAPPING_HOUR_TRANSFORM_NODE_ID = 4074; + public static final int INSTANCE_MAPPING_DAY_PERSISTENCE_WORKER_ID = 4075; + public static final int INSTANCE_MAPPING_DAY_TRANSFORM_NODE_ID = 4076; + public static final int INSTANCE_MAPPING_MONTH_PERSISTENCE_WORKER_ID = 4077; + public static final int INSTANCE_MAPPING_MONTH_TRANSFORM_NODE_ID = 4078; - public static final int APPLICATION_MAPPING_MINUTE_AGGREGATION_WORKER_ID = 4800; - public static final int APPLICATION_MAPPING_MINUTE_REMOTE_WORKER_ID = 4801; - public static final int APPLICATION_MAPPING_MINUTE_PERSISTENCE_WORKER_ID = 4802; - public static final int APPLICATION_MAPPING_HOUR_PERSISTENCE_WORKER_ID = 4803; - public static final int APPLICATION_MAPPING_HOUR_TRANSFORM_NODE_ID = 4804; - public static final int APPLICATION_MAPPING_DAY_PERSISTENCE_WORKER_ID = 4805; - public static final int APPLICATION_MAPPING_DAY_TRANSFORM_NODE_ID = 4806; - public static final int APPLICATION_MAPPING_MONTH_PERSISTENCE_WORKER_ID = 4807; - public static final int APPLICATION_MAPPING_MONTH_TRANSFORM_NODE_ID = 4808; + public static final int APPLICATION_MAPPING_MINUTE_AGGREGATION_WORKER_ID = 4080; + public static final int APPLICATION_MAPPING_MINUTE_REMOTE_WORKER_ID = 4081; + public static final int APPLICATION_MAPPING_MINUTE_PERSISTENCE_WORKER_ID = 4082; + public static final int APPLICATION_MAPPING_HOUR_PERSISTENCE_WORKER_ID = 4083; + public static final int APPLICATION_MAPPING_HOUR_TRANSFORM_NODE_ID = 4084; + public static final int APPLICATION_MAPPING_DAY_PERSISTENCE_WORKER_ID = 4085; + public static final int APPLICATION_MAPPING_DAY_TRANSFORM_NODE_ID = 4086; + public static final int APPLICATION_MAPPING_MONTH_PERSISTENCE_WORKER_ID = 4087; + public static final int APPLICATION_MAPPING_MONTH_TRANSFORM_NODE_ID = 4088; - public static final int APPLICATION_COMPONENT_MINUTE_AGGREGATION_WORKER_ID = 4900; - public static final int APPLICATION_COMPONENT_MINUTE_REMOTE_WORKER_ID = 4901; - public static final int APPLICATION_COMPONENT_MINUTE_PERSISTENCE_WORKER_ID = 4902; - public static final int APPLICATION_COMPONENT_HOUR_PERSISTENCE_WORKER_ID = 4903; - public static final int APPLICATION_COMPONENT_HOUR_TRANSFORM_NODE_ID = 4904; - public static final int APPLICATION_COMPONENT_DAY_PERSISTENCE_WORKER_ID = 4905; - public static final int APPLICATION_COMPONENT_DAY_TRANSFORM_NODE_ID = 4906; - public static final int APPLICATION_COMPONENT_MONTH_PERSISTENCE_WORKER_ID = 4907; - public static final int APPLICATION_COMPONENT_MONTH_TRANSFORM_NODE_ID = 4908; + public static final int APPLICATION_COMPONENT_MINUTE_AGGREGATION_WORKER_ID = 4090; + public static final int APPLICATION_COMPONENT_MINUTE_REMOTE_WORKER_ID = 4091; + public static final int APPLICATION_COMPONENT_MINUTE_PERSISTENCE_WORKER_ID = 4092; + public static final int APPLICATION_COMPONENT_HOUR_PERSISTENCE_WORKER_ID = 4093; + public static final int APPLICATION_COMPONENT_HOUR_TRANSFORM_NODE_ID = 4094; + public static final int APPLICATION_COMPONENT_DAY_PERSISTENCE_WORKER_ID = 4095; + public static final int APPLICATION_COMPONENT_DAY_TRANSFORM_NODE_ID = 4096; + public static final int APPLICATION_COMPONENT_MONTH_PERSISTENCE_WORKER_ID = 4097; + public static final int APPLICATION_COMPONENT_MONTH_TRANSFORM_NODE_ID = 4098; - public static final int RESPONSE_TIME_DISTRIBUTION_MINUTE_AGGREGATION_WORKER_ID = 4040; - public static final int RESPONSE_TIME_DISTRIBUTION_MINUTE_REMOTE_WORKER_ID = 4041; - public static final int RESPONSE_TIME_DISTRIBUTION_MINUTE_PERSISTENCE_WORKER_ID = 4042; - public static final int RESPONSE_TIME_DISTRIBUTION_HOUR_PERSISTENCE_WORKER_ID = 4043; - public static final int RESPONSE_TIME_DISTRIBUTION_HOUR_TRANSFORM_NODE_ID = 4044; - public static final int RESPONSE_TIME_DISTRIBUTION_DAY_PERSISTENCE_WORKER_ID = 4045; - public static final int RESPONSE_TIME_DISTRIBUTION_DAY_TRANSFORM_NODE_ID = 4046; - public static final int RESPONSE_TIME_DISTRIBUTION_MONTH_PERSISTENCE_WORKER_ID = 4047; - public static final int RESPONSE_TIME_DISTRIBUTION_MONTH_TRANSFORM_NODE_ID = 4048; + public static final int RESPONSE_TIME_DISTRIBUTION_MINUTE_AGGREGATION_WORKER_ID = 4100; + public static final int RESPONSE_TIME_DISTRIBUTION_MINUTE_REMOTE_WORKER_ID = 4101; + public static final int RESPONSE_TIME_DISTRIBUTION_MINUTE_PERSISTENCE_WORKER_ID = 4102; + public static final int RESPONSE_TIME_DISTRIBUTION_HOUR_PERSISTENCE_WORKER_ID = 4103; + public static final int RESPONSE_TIME_DISTRIBUTION_HOUR_TRANSFORM_NODE_ID = 4104; + public static final int RESPONSE_TIME_DISTRIBUTION_DAY_PERSISTENCE_WORKER_ID = 4105; + public static final int RESPONSE_TIME_DISTRIBUTION_DAY_TRANSFORM_NODE_ID = 4106; + public static final int RESPONSE_TIME_DISTRIBUTION_MONTH_PERSISTENCE_WORKER_ID = 4107; + public static final int RESPONSE_TIME_DISTRIBUTION_MONTH_TRANSFORM_NODE_ID = 4108; - public static final int GLOBAL_TRACE_PERSISTENCE_WORKER_ID = 427; - public static final int SEGMENT_DURATION_PERSISTENCE_WORKER_ID = 428; + public static final int GLOBAL_TRACE_PERSISTENCE_WORKER_ID = 4110; + public static final int SEGMENT_DURATION_PERSISTENCE_WORKER_ID = 4120; - public static final int INSTANCE_REFERENCE_GRAPH_BRIDGE_WORKER_ID = 429; - public static final int APPLICATION_REFERENCE_GRAPH_BRIDGE_WORKER_ID = 430; - public static final int SERVICE_METRIC_GRAPH_BRIDGE_WORKER_ID = 431; - public static final int INSTANCE_METRIC_GRAPH_BRIDGE_WORKER_ID = 432; - public static final int APPLICATION_METRIC_GRAPH_BRIDGE_WORKER_ID = 433; + public static final int INSTANCE_REFERENCE_GRAPH_BRIDGE_WORKER_ID = 4130; + public static final int APPLICATION_REFERENCE_GRAPH_BRIDGE_WORKER_ID = 4140; + public static final int SERVICE_METRIC_GRAPH_BRIDGE_WORKER_ID = 4150; + public static final int INSTANCE_METRIC_GRAPH_BRIDGE_WORKER_ID = 4160; + public static final int APPLICATION_METRIC_GRAPH_BRIDGE_WORKER_ID = 4170; - public static final int INST_HEART_BEAT_PERSISTENCE_WORKER_ID = 400; + public static final int INST_HEART_BEAT_PERSISTENCE_WORKER_ID = 4180; + + public static final int SERVICE_NAME_HEART_BEAT_AGGREGATION_WORKER_ID = 4190; + public static final int SERVICE_NAME_HEART_BEAT_REMOTE_WORKER_ID = 4191; + public static final int SERVICE_NAME_HEART_BEAT_PERSISTENCE_WORKER_ID = 4192; } diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java index 4667c887bb..35db2d9e36 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java @@ -21,49 +21,34 @@ package org.apache.skywalking.apm.collector.analysis.metric.provider; import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule; import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService; import org.apache.skywalking.apm.collector.analysis.metric.provider.service.InstanceHeartBeatService; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.component.ApplicationComponentGraph; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.component.ApplicationComponentSpanListener; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.mapping.ApplicationMappingGraph; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.mapping.ApplicationMappingSpanListener; +import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.component.*; +import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.mapping.*; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.metric.ApplicationMetricGraph; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.refmetric.ApplicationReferenceMetricGraph; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.GlobalTraceGraph; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.GlobalTraceSpanListener; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.std.ResponseTimeDistributionGraph; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.std.ResponseTimeDistributionSpanListener; +import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.*; +import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.std.*; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.heartbeat.InstanceHeartBeatPersistenceGraph; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.mapping.InstanceMappingGraph; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.mapping.InstanceMappingSpanListener; +import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.mapping.*; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.metric.InstanceMetricGraph; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.refmetric.InstanceReferenceMetricGraph; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segment.SegmentDurationGraph; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segment.SegmentDurationSpanListener; +import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segment.*; +import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.heartbeat.*; import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.metric.ServiceMetricGraph; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric.ServiceReferenceMetricGraph; -import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric.ServiceReferenceMetricSpanListener; +import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric.*; import org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule; import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParserListenerRegister; import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener; import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTimer; import org.apache.skywalking.apm.collector.cache.CacheModule; import org.apache.skywalking.apm.collector.configuration.ConfigurationModule; -import org.apache.skywalking.apm.collector.core.module.ModuleDefine; -import org.apache.skywalking.apm.collector.core.module.ModuleConfig; -import org.apache.skywalking.apm.collector.core.module.ModuleProvider; -import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException; +import org.apache.skywalking.apm.collector.core.module.*; import org.apache.skywalking.apm.collector.remote.RemoteModule; import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; import org.apache.skywalking.apm.collector.storage.StorageModule; -import org.apache.skywalking.apm.collector.storage.table.application.ApplicationComponent; -import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMapping; -import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMetric; -import org.apache.skywalking.apm.collector.storage.table.application.ApplicationReferenceMetric; +import org.apache.skywalking.apm.collector.storage.table.application.*; import org.apache.skywalking.apm.collector.storage.table.global.ResponseTimeDistribution; -import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping; -import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; -import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; -import org.apache.skywalking.apm.collector.storage.table.service.ServiceMetric; -import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric; +import org.apache.skywalking.apm.collector.storage.table.instance.*; +import org.apache.skywalking.apm.collector.storage.table.service.*; /** * @author peng-yongsheng @@ -122,47 +107,27 @@ public class AnalysisMetricModuleProvider extends ModuleProvider { segmentParserListenerRegister.register(new GlobalTraceSpanListener.Factory()); segmentParserListenerRegister.register(new SegmentDurationSpanListener.Factory()); segmentParserListenerRegister.register(new ResponseTimeDistributionSpanListener.Factory()); + segmentParserListenerRegister.register(new ServiceNameSpanListener.Factory()); } private void graphCreate(WorkerCreateListener workerCreateListener) { - ServiceReferenceMetricGraph serviceReferenceMetricGraph = new ServiceReferenceMetricGraph(getManager(), workerCreateListener); - serviceReferenceMetricGraph.create(); - - InstanceReferenceMetricGraph instanceReferenceMetricGraph = new InstanceReferenceMetricGraph(getManager(), workerCreateListener); - instanceReferenceMetricGraph.create(); - - ApplicationReferenceMetricGraph applicationReferenceMetricGraph = new ApplicationReferenceMetricGraph(getManager(), workerCreateListener); - applicationReferenceMetricGraph.create(); - - ServiceMetricGraph serviceMetricGraph = new ServiceMetricGraph(getManager(), workerCreateListener); - serviceMetricGraph.create(); - - InstanceMetricGraph instanceMetricGraph = new InstanceMetricGraph(getManager(), workerCreateListener); - instanceMetricGraph.create(); - - ApplicationMetricGraph applicationMetricGraph = new ApplicationMetricGraph(getManager(), workerCreateListener); - applicationMetricGraph.create(); - - ApplicationComponentGraph applicationComponentGraph = new ApplicationComponentGraph(getManager(), workerCreateListener); - applicationComponentGraph.create(); - - ApplicationMappingGraph applicationMappingGraph = new ApplicationMappingGraph(getManager(), workerCreateListener); - applicationMappingGraph.create(); - - InstanceMappingGraph instanceMappingGraph = new InstanceMappingGraph(getManager(), workerCreateListener); - instanceMappingGraph.create(); - - GlobalTraceGraph globalTraceGraph = new GlobalTraceGraph(getManager(), workerCreateListener); - globalTraceGraph.create(); - - ResponseTimeDistributionGraph responseTimeDistributionGraph = new ResponseTimeDistributionGraph(getManager(), workerCreateListener); - responseTimeDistributionGraph.create(); - - SegmentDurationGraph segmentDurationGraph = new SegmentDurationGraph(getManager(), workerCreateListener); - segmentDurationGraph.create(); - - InstanceHeartBeatPersistenceGraph instanceHeartBeatPersistenceGraph = new InstanceHeartBeatPersistenceGraph(getManager(), workerCreateListener); - instanceHeartBeatPersistenceGraph.create(); + new ServiceReferenceMetricGraph(getManager(), workerCreateListener).create(); + new ServiceMetricGraph(getManager(), workerCreateListener).create(); + new ServiceNameHeartBeatGraph(getManager(), workerCreateListener).create(); + + new InstanceHeartBeatPersistenceGraph(getManager(), workerCreateListener).create(); + new InstanceMappingGraph(getManager(), workerCreateListener).create(); + new InstanceReferenceMetricGraph(getManager(), workerCreateListener).create(); + new InstanceMetricGraph(getManager(), workerCreateListener).create(); + + new ApplicationComponentGraph(getManager(), workerCreateListener).create(); + new ApplicationMappingGraph(getManager(), workerCreateListener).create(); + new ApplicationReferenceMetricGraph(getManager(), workerCreateListener).create(); + new ApplicationMetricGraph(getManager(), workerCreateListener).create(); + + new GlobalTraceGraph(getManager(), workerCreateListener).create(); + new ResponseTimeDistributionGraph(getManager(), workerCreateListener).create(); + new SegmentDurationGraph(getManager(), workerCreateListener).create(); } private void registerRemoteData() { diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameAggregationWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameAggregationWorker.java new file mode 100644 index 0000000000..1079bbc28d --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameAggregationWorker.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.heartbeat; + +import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; +import org.apache.skywalking.apm.collector.analysis.worker.model.base.*; +import org.apache.skywalking.apm.collector.analysis.worker.model.impl.AggregationWorker; +import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric; +import org.apache.skywalking.apm.collector.core.module.ModuleManager; +import org.apache.skywalking.apm.collector.storage.table.register.*; + +/** + * @author peng-yongsheng + */ +public class ServiceNameAggregationWorker extends AggregationWorker { + + private ServiceNameAggregationWorker(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public int id() { + return MetricWorkerIdDefine.SERVICE_NAME_HEART_BEAT_AGGREGATION_WORKER_ID; + } + + public static class Factory extends AbstractLocalAsyncWorkerProvider { + + public Factory(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public ServiceNameAggregationWorker workerInstance(ModuleManager moduleManager) { + return new ServiceNameAggregationWorker(moduleManager); + } + + @Override public int queueSize() { + return 256; + } + } + + @GraphComputingMetric(name = "/aggregate/onWork/" + ServiceNameTable.TABLE) + @Override protected void onWork(ServiceName message) throws WorkerException { + super.onWork(message); + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatGraph.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatGraph.java new file mode 100644 index 0000000000..68939f7fde --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatGraph.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.heartbeat; + +import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine; +import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener; +import org.apache.skywalking.apm.collector.core.graph.*; +import org.apache.skywalking.apm.collector.core.module.ModuleManager; +import org.apache.skywalking.apm.collector.remote.RemoteModule; +import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService; +import org.apache.skywalking.apm.collector.storage.table.register.ServiceName; + +/** + * @author peng-yongsheng + */ +public class ServiceNameHeartBeatGraph { + + private final ModuleManager moduleManager; + private final WorkerCreateListener workerCreateListener; + + public ServiceNameHeartBeatGraph(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) { + this.moduleManager = moduleManager; + this.workerCreateListener = workerCreateListener; + } + + public void create() { + RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(MetricGraphIdDefine.SERVICE_HEART_BEAT_PERSISTENCE_GRAPH_ID, ServiceName.class); + + graph.addNode(new ServiceNameAggregationWorker.Factory(moduleManager).create(workerCreateListener)) + .addNext(new ServiceNameHeartBeatRemoteWorker.Factory(moduleManager, remoteSenderService, MetricGraphIdDefine.SERVICE_HEART_BEAT_PERSISTENCE_GRAPH_ID).create(workerCreateListener)) + .addNext(new ServiceNameHeartBeatPersistenceWorker.Factory(moduleManager).create(workerCreateListener)); + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatPersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatPersistenceWorker.java new file mode 100644 index 0000000000..1491c950e1 --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatPersistenceWorker.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.heartbeat; + +import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; +import org.apache.skywalking.apm.collector.analysis.worker.model.impl.*; +import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric; +import org.apache.skywalking.apm.collector.core.module.ModuleManager; +import org.apache.skywalking.apm.collector.storage.StorageModule; +import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.IServiceNameHeartBeatPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.register.ServiceName; + +/** + * @author peng-yongsheng + */ +public class ServiceNameHeartBeatPersistenceWorker extends MergePersistenceWorker { + + private ServiceNameHeartBeatPersistenceWorker(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public int id() { + return MetricWorkerIdDefine.SERVICE_NAME_HEART_BEAT_PERSISTENCE_WORKER_ID; + } + + @Override protected boolean needMergeDBData() { + return true; + } + + @SuppressWarnings("unchecked") + @Override protected IPersistenceDAO persistenceDAO() { + return getModuleManager().find(StorageModule.NAME).getService(IServiceNameHeartBeatPersistenceDAO.class); + } + + public static class Factory extends MergePersistenceWorkerProvider { + + public Factory(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public ServiceNameHeartBeatPersistenceWorker workerInstance(ModuleManager moduleManager) { + return new ServiceNameHeartBeatPersistenceWorker(moduleManager); + } + + @Override + public int queueSize() { + return 1024; + } + } + + @GraphComputingMetric(name = "/persistence/onWork/serviceName/heartbeat") + @Override protected void onWork(ServiceName input) { + super.onWork(input); + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatRemoteWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatRemoteWorker.java new file mode 100644 index 0000000000..968cf17e32 --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatRemoteWorker.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.heartbeat; + +import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine; +import org.apache.skywalking.apm.collector.analysis.worker.model.base.*; +import org.apache.skywalking.apm.collector.core.module.ModuleManager; +import org.apache.skywalking.apm.collector.remote.service.*; +import org.apache.skywalking.apm.collector.storage.table.register.ServiceName; + +/** + * @author peng-yongsheng + */ +public class ServiceNameHeartBeatRemoteWorker extends AbstractRemoteWorker { + + private ServiceNameHeartBeatRemoteWorker(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public int id() { + return MetricWorkerIdDefine.SERVICE_NAME_HEART_BEAT_REMOTE_WORKER_ID; + } + + @Override protected void onWork(ServiceName serviceName) { + onNext(serviceName); + } + + @Override public Selector selector() { + return Selector.HashCode; + } + + public static class Factory extends AbstractRemoteWorkerProvider { + + public Factory(ModuleManager moduleManager, RemoteSenderService remoteSenderService, int graphId) { + super(moduleManager, remoteSenderService, graphId); + } + + @Override public ServiceNameHeartBeatRemoteWorker workerInstance(ModuleManager moduleManager) { + return new ServiceNameHeartBeatRemoteWorker(moduleManager); + } + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameSpanListener.java new file mode 100644 index 0000000000..44a8ca60ee --- /dev/null +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameSpanListener.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.heartbeat; + +import java.util.*; +import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine; +import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*; +import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.*; +import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric; +import org.apache.skywalking.apm.collector.core.graph.*; +import org.apache.skywalking.apm.collector.core.module.ModuleManager; +import org.apache.skywalking.apm.collector.storage.table.register.ServiceName; + +/** + * @author peng-yongsheng + */ +public class ServiceNameSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener { + + private List serviceNames; + + private ServiceNameSpanListener() { + this.serviceNames = new LinkedList<>(); + } + + @Override public boolean containsPoint(Point point) { + return Point.Entry.equals(point) || Point.Exit.equals(point) || Point.Local.equals(point); + } + + @Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) { + ServiceName serviceName = new ServiceName(); + serviceName.setId(String.valueOf(spanDecorator.getOperationNameId())); + serviceName.setHeartBeatTime(segmentCoreInfo.getStartTime()); + serviceNames.add(serviceName); + + for (int i = 0; i < spanDecorator.getRefsCount(); i++) { + ReferenceDecorator referenceDecorator = spanDecorator.getRefs(i); + + ServiceName entryServiceName = new ServiceName(); + entryServiceName.setId(String.valueOf(referenceDecorator.getEntryServiceId())); + entryServiceName.setHeartBeatTime(segmentCoreInfo.getStartTime()); + serviceNames.add(entryServiceName); + + ServiceName parentServiceName = new ServiceName(); + parentServiceName.setId(String.valueOf(referenceDecorator.getParentServiceId())); + parentServiceName.setHeartBeatTime(segmentCoreInfo.getStartTime()); + serviceNames.add(parentServiceName); + } + } + + @Override public void parseExit(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) { + ServiceName serviceName = new ServiceName(); + serviceName.setId(String.valueOf(spanDecorator.getOperationNameId())); + serviceName.setHeartBeatTime(segmentCoreInfo.getStartTime()); + serviceNames.add(serviceName); + } + + @Override public void parseLocal(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) { + ServiceName serviceName = new ServiceName(); + serviceName.setId(String.valueOf(spanDecorator.getOperationNameId())); + serviceName.setHeartBeatTime(segmentCoreInfo.getStartTime()); + serviceNames.add(serviceName); + } + + @Override public void build() { + Graph graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.SERVICE_HEART_BEAT_PERSISTENCE_GRAPH_ID, ServiceName.class); + serviceNames.forEach(graph::start); + } + + public static class Factory implements SpanListenerFactory { + + @GraphComputingMetric(name = "/segment/parse/createSpanListeners/serviceNameSpanListener") + @Override public SpanListener create(ModuleManager moduleManager) { + return new ServiceNameSpanListener(); + } + } +} diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/AnalysisRegisterModuleProvider.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/AnalysisRegisterModuleProvider.java index 97572dfb3f..e5d6a1b8ef 100644 --- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/AnalysisRegisterModuleProvider.java +++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/AnalysisRegisterModuleProvider.java @@ -19,30 +19,17 @@ package org.apache.skywalking.apm.collector.analysis.register.provider; import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule; -import org.apache.skywalking.apm.collector.analysis.register.define.service.IApplicationIDService; -import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService; -import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService; -import org.apache.skywalking.apm.collector.analysis.register.define.service.IServiceNameService; -import org.apache.skywalking.apm.collector.analysis.register.provider.register.ApplicationRegisterGraph; -import org.apache.skywalking.apm.collector.analysis.register.provider.register.InstanceRegisterGraph; -import org.apache.skywalking.apm.collector.analysis.register.provider.register.NetworkAddressRegisterGraph; -import org.apache.skywalking.apm.collector.analysis.register.provider.register.ServiceNameRegisterGraph; -import org.apache.skywalking.apm.collector.analysis.register.provider.service.ApplicationIDService; -import org.apache.skywalking.apm.collector.analysis.register.provider.service.InstanceIDService; -import org.apache.skywalking.apm.collector.analysis.register.provider.service.NetworkAddressIDService; -import org.apache.skywalking.apm.collector.analysis.register.provider.service.ServiceNameService; +import org.apache.skywalking.apm.collector.analysis.register.define.service.*; +import org.apache.skywalking.apm.collector.analysis.register.provider.register.*; +import org.apache.skywalking.apm.collector.analysis.register.provider.service.*; import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener; import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTimer; import org.apache.skywalking.apm.collector.cache.CacheModule; import org.apache.skywalking.apm.collector.core.module.*; -import org.apache.skywalking.apm.collector.core.module.ModuleDefine; import org.apache.skywalking.apm.collector.remote.RemoteModule; import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; import org.apache.skywalking.apm.collector.storage.StorageModule; -import org.apache.skywalking.apm.collector.storage.table.register.Application; -import org.apache.skywalking.apm.collector.storage.table.register.Instance; -import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress; -import org.apache.skywalking.apm.collector.storage.table.register.ServiceName; +import org.apache.skywalking.apm.collector.storage.table.register.*; /** * @author peng-yongsheng @@ -94,17 +81,10 @@ public class AnalysisRegisterModuleProvider extends ModuleProvider { } private void graphCreate(WorkerCreateListener workerCreateListener) { - ApplicationRegisterGraph applicationRegisterGraph = new ApplicationRegisterGraph(getManager(), workerCreateListener); - applicationRegisterGraph.create(); - - InstanceRegisterGraph instanceRegisterGraph = new InstanceRegisterGraph(getManager(), workerCreateListener); - instanceRegisterGraph.create(); - - ServiceNameRegisterGraph serviceNameRegisterGraph = new ServiceNameRegisterGraph(getManager(), workerCreateListener); - serviceNameRegisterGraph.create(); - - NetworkAddressRegisterGraph networkAddressRegisterGraph = new NetworkAddressRegisterGraph(getManager(), workerCreateListener); - networkAddressRegisterGraph.create(); + new ApplicationRegisterGraph(getManager(), workerCreateListener).create(); + new InstanceRegisterGraph(getManager(), workerCreateListener).create(); + new ServiceNameRegisterGraph(getManager(), workerCreateListener).create(); + new NetworkAddressRegisterGraph(getManager(), workerCreateListener).create(); } private void registerRemoteData() { diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ServiceNameRegisterRemoteWorker.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ServiceNameRegisterRemoteWorker.java index 1e155e6ed6..9d1d48123b 100644 --- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ServiceNameRegisterRemoteWorker.java +++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ServiceNameRegisterRemoteWorker.java @@ -19,23 +19,16 @@ package org.apache.skywalking.apm.collector.analysis.register.provider.register; import org.apache.skywalking.apm.collector.analysis.register.define.graph.WorkerIdDefine; -import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorker; -import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorkerProvider; -import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException; +import org.apache.skywalking.apm.collector.analysis.worker.model.base.*; import org.apache.skywalking.apm.collector.core.module.ModuleManager; -import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService; -import org.apache.skywalking.apm.collector.remote.service.Selector; +import org.apache.skywalking.apm.collector.remote.service.*; import org.apache.skywalking.apm.collector.storage.table.register.ServiceName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ public class ServiceNameRegisterRemoteWorker extends AbstractRemoteWorker { - private static final Logger logger = LoggerFactory.getLogger(ServiceNameRegisterRemoteWorker.class); - private ServiceNameRegisterRemoteWorker(ModuleManager moduleManager) { super(moduleManager); } @@ -44,7 +37,7 @@ public class ServiceNameRegisterRemoteWorker extends AbstractRemoteWorker localAsyncWorkerRef = new LocalAsyncWorkerRef<>(localAsyncWorker); - DataCarrier dataCarrier = new DataCarrier<>(1, 8192 * 2); + DataCarrier dataCarrier = new DataCarrier<>(1, queueSize()); localAsyncWorkerRef.setQueueEventHandler(dataCarrier); dataCarrier.consume(localAsyncWorkerRef, 1); return localAsyncWorkerRef; diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageModule.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageModule.java index 32c240ee1c..f78216ecfc 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageModule.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageModule.java @@ -18,114 +18,29 @@ package org.apache.skywalking.apm.collector.storage; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import org.apache.skywalking.apm.collector.core.module.ModuleDefine; import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO; -import org.apache.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.ISegmentDurationPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponentDayPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponentHourPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponentMinutePersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponentMonthPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListDayPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListHourPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListMinutePersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListMonthPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmListPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmListPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmListPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmListPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmListPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.amp.IApplicationDayMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.amp.IApplicationHourMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.amp.IApplicationMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.amp.IApplicationMonthMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.ampp.IApplicationMappingDayPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.ampp.IApplicationMappingHourPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.ampp.IApplicationMappingMinutePersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.ampp.IApplicationMappingMonthPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.armp.IApplicationReferenceDayMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.armp.IApplicationReferenceHourMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.armp.IApplicationReferenceMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.armp.IApplicationReferenceMonthMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.cache.IApplicationCacheDAO; -import org.apache.skywalking.apm.collector.storage.dao.cache.IInstanceCacheDAO; -import org.apache.skywalking.apm.collector.storage.dao.cache.INetworkAddressCacheDAO; -import org.apache.skywalking.apm.collector.storage.dao.cache.IServiceNameCacheDAO; -import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuDayMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuHourMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuMonthMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.gc.IGCDayMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.gc.IGCHourMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.gc.IGCMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.gc.IGCMonthMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceDayMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceHourMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMonthMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.impp.IInstanceMappingDayPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.impp.IInstanceMappingHourPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.impp.IInstanceMappingMinutePersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.impp.IInstanceMappingMonthPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceDayMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceHourMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceMonthMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryDayMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryHourMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryMonthMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolDayMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolHourMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolMonthMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.register.IApplicationRegisterDAO; -import org.apache.skywalking.apm.collector.storage.dao.register.IInstanceRegisterDAO; -import org.apache.skywalking.apm.collector.storage.dao.register.INetworkAddressRegisterDAO; -import org.apache.skywalking.apm.collector.storage.dao.register.IServiceNameRegisterDAO; -import org.apache.skywalking.apm.collector.storage.dao.rtd.IResponseTimeDistributionDayPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.rtd.IResponseTimeDistributionHourPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.rtd.IResponseTimeDistributionMinutePersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.rtd.IResponseTimeDistributionMonthPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.smp.IServiceDayMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.smp.IServiceHourMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.smp.IServiceMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.smp.IServiceMonthMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceDayMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceHourMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMonthMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationAlarmListUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationAlarmUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationComponentUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationMappingUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationMetricUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationReferenceMetricUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.ICpuMetricUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IGCMetricUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IGlobalTraceUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceAlarmUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceMetricUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IMemoryMetricUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.INetworkAddressUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IResponseTimeDistributionUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.ISegmentDurationUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.ISegmentUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceAlarmUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceMetricUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceNameServiceUIDAO; -import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceReferenceMetricUIDAO; +import org.apache.skywalking.apm.collector.storage.dao.*; +import org.apache.skywalking.apm.collector.storage.dao.acp.*; +import org.apache.skywalking.apm.collector.storage.dao.alarm.*; +import org.apache.skywalking.apm.collector.storage.dao.amp.*; +import org.apache.skywalking.apm.collector.storage.dao.ampp.*; +import org.apache.skywalking.apm.collector.storage.dao.armp.*; +import org.apache.skywalking.apm.collector.storage.dao.cache.*; +import org.apache.skywalking.apm.collector.storage.dao.cpu.*; +import org.apache.skywalking.apm.collector.storage.dao.gc.*; +import org.apache.skywalking.apm.collector.storage.dao.imp.*; +import org.apache.skywalking.apm.collector.storage.dao.impp.*; +import org.apache.skywalking.apm.collector.storage.dao.irmp.*; +import org.apache.skywalking.apm.collector.storage.dao.memory.*; +import org.apache.skywalking.apm.collector.storage.dao.mpool.*; +import org.apache.skywalking.apm.collector.storage.dao.register.*; +import org.apache.skywalking.apm.collector.storage.dao.rtd.*; +import org.apache.skywalking.apm.collector.storage.dao.smp.*; +import org.apache.skywalking.apm.collector.storage.dao.srmp.*; +import org.apache.skywalking.apm.collector.storage.dao.ui.*; +import org.apache.skywalking.apm.collector.storage.ttl.ITTLConfigService; /** * @author peng-yongsheng @@ -142,6 +57,8 @@ public class StorageModule extends ModuleDefine { List classes = new ArrayList<>(); classes.add(IBatchDAO.class); + classes.add(ITTLConfigService.class); + addCacheDAO(classes); addRegisterDAO(classes); addPersistenceDAO(classes); @@ -205,6 +122,7 @@ public class StorageModule extends ModuleDefine { classes.add(ISegmentDurationPersistenceDAO.class); classes.add(ISegmentPersistenceDAO.class); classes.add(IInstanceHeartBeatPersistenceDAO.class); + classes.add(IServiceNameHeartBeatPersistenceDAO.class); classes.add(IResponseTimeDistributionMinutePersistenceDAO.class); classes.add(IResponseTimeDistributionHourPersistenceDAO.class); diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/IServiceNameHeartBeatPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/IServiceNameHeartBeatPersistenceDAO.java new file mode 100644 index 0000000000..52d14517f7 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/IServiceNameHeartBeatPersistenceDAO.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.dao; + +import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.table.register.ServiceName; + +/** + * @author peng-yongsheng + */ +public interface IServiceNameHeartBeatPersistenceDAO extends IPersistenceDAO { +} diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/ui/IServiceNameServiceUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/ui/IServiceNameServiceUIDAO.java index 62d27d865a..04f4e88a88 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/ui/IServiceNameServiceUIDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/ui/IServiceNameServiceUIDAO.java @@ -37,21 +37,27 @@ public interface IServiceNameServiceUIDAO extends DAO { * *

SQL as: select count(SERVICE_NAME) from SERVICE_NAME * where SRC_SPAN_TYPE = SpanType.Entry_VALUE + * and HEARTBEAT_TIME ge ${startTimeMillis} * + * @param startTimeMillis service heart beat time after given data * @return count of service names */ - int getCount(); + int getCount(long startTimeMillis); /** *

SQL as: select SERVICE_ID, SERVICE_NAME from SERVICE_NAME * where SRC_SPAN_TYPE = SpanType.Entry_VALUE * and SERVICE_NAME like '%{keyword}%' + * and APPLICATION_ID = ${applicationId} + * and HEARTBEAT_TIME ge ${startTimeMillis} * *

Note: keyword might not given * * @param keyword fuzzy query condition + * @param applicationId the owner id of this service + * @param startTimeMillis service heart beat time after given data * @param topN how many rows should return * @return not nullable result list */ - List searchService(String keyword, int topN); + List searchService(String keyword, int applicationId, long startTimeMillis, int topN); } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java index d25a48d3b7..6e3d0b936a 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java @@ -18,11 +18,8 @@ package org.apache.skywalking.apm.collector.storage.table.register; -import org.apache.skywalking.apm.collector.core.data.Column; -import org.apache.skywalking.apm.collector.core.data.RemoteData; -import org.apache.skywalking.apm.collector.core.data.StreamData; -import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation; -import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation; +import org.apache.skywalking.apm.collector.core.data.*; +import org.apache.skywalking.apm.collector.core.data.operator.*; import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; /** @@ -39,7 +36,7 @@ public class Instance extends StreamData { private static final Column[] LONG_COLUMNS = { new Column(InstanceTable.REGISTER_TIME, new CoverMergeOperation()), - new Column(InstanceTable.HEARTBEAT_TIME, new CoverMergeOperation()), + new Column(InstanceTable.HEARTBEAT_TIME, new MaxMergeOperation()), }; private static final Column[] DOUBLE_COLUMNS = {}; diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/InstanceTable.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/InstanceTable.java index 5f688945fe..059c4a6ccf 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/InstanceTable.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/InstanceTable.java @@ -31,10 +31,6 @@ public interface InstanceTable extends CommonTable, RegisterColumns { ColumnName AGENT_UUID = new ColumnName("agent_uuid", "iau"); - ColumnName REGISTER_TIME = new ColumnName("register_time", "irt"); - - ColumnName HEARTBEAT_TIME = new ColumnName("heartbeat_time", "iht"); - ColumnName OS_INFO = new ColumnName("os_info", "ioi"); ColumnName IS_ADDRESS = new ColumnName("is_address", "iia"); diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/RegisterColumns.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/RegisterColumns.java index 403fb5ee7e..be022d92fc 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/RegisterColumns.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/RegisterColumns.java @@ -31,4 +31,8 @@ public interface RegisterColumns { ColumnName SERVICE_ID = new ColumnName("service_id", "si"); ColumnName ADDRESS_ID = new ColumnName("address_id", "ni"); + + ColumnName REGISTER_TIME = new ColumnName("register_time", "rt"); + + ColumnName HEARTBEAT_TIME = new ColumnName("heartbeat_time", "ht"); } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java index 5022b82d0d..6f860b0230 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java @@ -18,11 +18,8 @@ package org.apache.skywalking.apm.collector.storage.table.register; -import org.apache.skywalking.apm.collector.core.data.Column; -import org.apache.skywalking.apm.collector.core.data.RemoteData; -import org.apache.skywalking.apm.collector.core.data.StreamData; -import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation; -import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation; +import org.apache.skywalking.apm.collector.core.data.*; +import org.apache.skywalking.apm.collector.core.data.operator.*; import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; /** @@ -35,7 +32,10 @@ public class ServiceName extends StreamData { new Column(ServiceNameTable.SERVICE_NAME, new CoverMergeOperation()), }; - private static final Column[] LONG_COLUMNS = {}; + private static final Column[] LONG_COLUMNS = { + new Column(ServiceNameTable.REGISTER_TIME, new NonMergeOperation()), + new Column(ServiceNameTable.HEARTBEAT_TIME, new MaxMergeOperation()), + }; private static final Column[] DOUBLE_COLUMNS = {}; @@ -99,6 +99,22 @@ public class ServiceName extends StreamData { setDataInteger(2, srcSpanType); } + public long getRegisterTime() { + return getDataLong(0); + } + + public void setRegisterTime(long registerTime) { + setDataLong(0, registerTime); + } + + public long getHeartBeatTime() { + return getDataLong(1); + } + + public void setHeartBeatTime(long heartBeatTime) { + setDataLong(1, heartBeatTime); + } + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { @Override public RemoteData createInstance() { return new ServiceName(); diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/ttl/ITTLConfigService.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/ttl/ITTLConfigService.java new file mode 100644 index 0000000000..98592cc937 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/ttl/ITTLConfigService.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.ttl; + +import org.apache.skywalking.apm.collector.core.module.Service; + +/** + * @author peng-yongsheng + */ +public interface ITTLConfigService extends Service { + int traceDataTTL(); + + int minuteMetricDataTTL(); + + int hourMetricDataTTL(); + + int dayMetricDataTTL(); + + int monthMetricDataTTL(); +} diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimer.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimer.java index 44969f71f8..cb6f00bdf7 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimer.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimer.java @@ -50,17 +50,14 @@ class DataTTLKeeperTimer { private final ModuleManager moduleManager; private final StorageModuleEsNamingListener namingListener; private final String selfAddress; - private int traceDataTTL = 90; - private int minuteMetricDataTTL = 90; - private int hourMetricDataTTL = 36; - private int dayMetricDataTTL = 45; - private int monthMetricDataTTL = 18; + private final StorageModuleEsConfig config; DataTTLKeeperTimer(ModuleManager moduleManager, - StorageModuleEsNamingListener namingListener, String selfAddress) { + StorageModuleEsNamingListener namingListener, String selfAddress, StorageModuleEsConfig config) { this.moduleManager = moduleManager; this.namingListener = namingListener; this.selfAddress = selfAddress; + this.config = config; } void start() { @@ -91,11 +88,11 @@ class DataTTLKeeperTimer { TimeBuckets convertTimeBucket(DateTime currentTime) { TimeBuckets timeBuckets = new TimeBuckets(); - timeBuckets.traceDataBefore = Long.valueOf(currentTime.plusMinutes(0 - traceDataTTL).toString("yyyyMMddHHmm")); - timeBuckets.minuteTimeBucketBefore = Long.valueOf(currentTime.plusMinutes(0 - minuteMetricDataTTL).toString("yyyyMMddHHmm")); - timeBuckets.hourTimeBucketBefore = Long.valueOf(currentTime.plusHours(0 - hourMetricDataTTL).toString("yyyyMMddHH")); - timeBuckets.dayTimeBucketBefore = Long.valueOf(currentTime.plusDays(0 - dayMetricDataTTL).toString("yyyyMMdd")); - timeBuckets.monthTimeBucketBefore = Long.valueOf(currentTime.plusMonths(0 - monthMetricDataTTL).toString("yyyyMM")); + timeBuckets.traceDataBefore = Long.valueOf(currentTime.plusMinutes(0 - config.getTraceDataTTL()).toString("yyyyMMddHHmm")); + timeBuckets.minuteTimeBucketBefore = Long.valueOf(currentTime.plusMinutes(0 - config.getMinuteMetricDataTTL()).toString("yyyyMMddHHmm")); + timeBuckets.hourTimeBucketBefore = Long.valueOf(currentTime.plusHours(0 - config.getHourMetricDataTTL()).toString("yyyyMMddHH")); + timeBuckets.dayTimeBucketBefore = Long.valueOf(currentTime.plusDays(0 - config.getDayMetricDataTTL()).toString("yyyyMMdd")); + timeBuckets.monthTimeBucketBefore = Long.valueOf(currentTime.plusMonths(0 - config.getMonthMetricDataTTL()).toString("yyyyMM")); return timeBuckets; } @@ -207,26 +204,6 @@ class DataTTLKeeperTimer { moduleManager.find(StorageModule.NAME).getService(IServiceReferenceMonthMetricPersistenceDAO.class).deleteHistory(timeBuckets.monthTimeBucketBefore); } - void setTraceDataTTL(int traceDataTTL) { - this.traceDataTTL = traceDataTTL == 0 ? 90 : traceDataTTL; - } - - void setMinuteMetricDataTTL(int minuteMetricDataTTL) { - this.minuteMetricDataTTL = minuteMetricDataTTL == 0 ? 90 : minuteMetricDataTTL; - } - - void setHourMetricDataTTL(int hourMetricDataTTL) { - this.hourMetricDataTTL = hourMetricDataTTL == 0 ? 36 : hourMetricDataTTL; - } - - void setDayMetricDataTTL(int dayMetricDataTTL) { - this.dayMetricDataTTL = dayMetricDataTTL == 0 ? 45 : dayMetricDataTTL; - } - - void setMonthMetricDataTTL(int monthMetricDataTTL) { - this.monthMetricDataTTL = monthMetricDataTTL == 0 ? 18 : monthMetricDataTTL; - } - class TimeBuckets { private long traceDataBefore; private long minuteTimeBucketBefore; diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsConfig.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsConfig.java index b769d60786..83b28eba52 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsConfig.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsConfig.java @@ -23,16 +23,16 @@ import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchCli /** * @author peng-yongsheng */ -class StorageModuleEsConfig extends ElasticSearchClientConfig { +public class StorageModuleEsConfig extends ElasticSearchClientConfig { private int indexShardsNumber; private int indexReplicasNumber; private boolean highPerformanceMode; - private int traceDataTTL; - private int minuteMetricDataTTL; - private int hourMetricDataTTL; - private int dayMetricDataTTL; - private int monthMetricDataTTL; + private int traceDataTTL = 90; + private int minuteMetricDataTTL = 90; + private int hourMetricDataTTL = 36; + private int dayMetricDataTTL = 45; + private int monthMetricDataTTL = 18; int getIndexShardsNumber() { return indexShardsNumber; @@ -58,43 +58,43 @@ class StorageModuleEsConfig extends ElasticSearchClientConfig { this.highPerformanceMode = highPerformanceMode; } - int getTraceDataTTL() { + public int getTraceDataTTL() { return traceDataTTL; } void setTraceDataTTL(int traceDataTTL) { - this.traceDataTTL = traceDataTTL; + this.traceDataTTL = traceDataTTL == 0 ? 90 : traceDataTTL; } - int getMinuteMetricDataTTL() { + public int getMinuteMetricDataTTL() { return minuteMetricDataTTL; } void setMinuteMetricDataTTL(int minuteMetricDataTTL) { - this.minuteMetricDataTTL = minuteMetricDataTTL; + this.minuteMetricDataTTL = minuteMetricDataTTL == 0 ? 90 : minuteMetricDataTTL; } - int getHourMetricDataTTL() { + public int getHourMetricDataTTL() { return hourMetricDataTTL; } void setHourMetricDataTTL(int hourMetricDataTTL) { - this.hourMetricDataTTL = hourMetricDataTTL; + this.hourMetricDataTTL = hourMetricDataTTL == 0 ? 36 : hourMetricDataTTL; } - int getDayMetricDataTTL() { + public int getDayMetricDataTTL() { return dayMetricDataTTL; } void setDayMetricDataTTL(int dayMetricDataTTL) { - this.dayMetricDataTTL = dayMetricDataTTL; + this.dayMetricDataTTL = dayMetricDataTTL == 0 ? 45 : dayMetricDataTTL; } - int getMonthMetricDataTTL() { + public int getMonthMetricDataTTL() { return monthMetricDataTTL; } void setMonthMetricDataTTL(int monthMetricDataTTL) { - this.monthMetricDataTTL = monthMetricDataTTL; + this.monthMetricDataTTL = monthMetricDataTTL == 0 ? 18 : monthMetricDataTTL; } } diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java index 14f8c6ed09..a71226a0fe 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java @@ -69,6 +69,8 @@ import org.apache.skywalking.apm.collector.storage.es.dao.rtd.*; import org.apache.skywalking.apm.collector.storage.es.dao.smp.*; import org.apache.skywalking.apm.collector.storage.es.dao.srmp.*; import org.apache.skywalking.apm.collector.storage.es.dao.ui.*; +import org.apache.skywalking.apm.collector.storage.es.ttl.TTLConfigService; +import org.apache.skywalking.apm.collector.storage.ttl.ITTLConfigService; /** * @author peng-yongsheng @@ -102,7 +104,9 @@ public class StorageModuleEsProvider extends ModuleProvider { @Override public void prepare() throws ServiceNotProvidedException { elasticSearchClient = new ElasticSearchClient(config.getClusterName(), config.getClusterTransportSniffer(), config.getClusterNodes(), nameSpace); + this.registerServiceImplementation(ITTLConfigService.class, new TTLConfigService(config)); this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient)); + registerCacheDAO(); registerRegisterDAO(); registerPersistenceDAO(); @@ -132,12 +136,7 @@ public class StorageModuleEsProvider extends ModuleProvider { ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class); moduleListenerService.addListener(namingListener); - deleteTimer = new DataTTLKeeperTimer(getManager(), namingListener, esRegistration.buildValue().getHostPort()); - deleteTimer.setTraceDataTTL(config.getTraceDataTTL()); - deleteTimer.setMinuteMetricDataTTL(config.getMinuteMetricDataTTL()); - deleteTimer.setHourMetricDataTTL(config.getHourMetricDataTTL()); - deleteTimer.setDayMetricDataTTL(config.getDayMetricDataTTL()); - deleteTimer.setMonthMetricDataTTL(config.getMonthMetricDataTTL()); + deleteTimer = new DataTTLKeeperTimer(getManager(), namingListener, esRegistration.buildValue().getHostPort(), config); } @Override @@ -241,6 +240,7 @@ public class StorageModuleEsProvider extends ModuleProvider { this.registerServiceImplementation(IInstanceReferenceMonthMetricPersistenceDAO.class, new InstanceReferenceMonthMetricEsPersistenceDAO(elasticSearchClient)); this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatEsPersistenceDAO(elasticSearchClient)); + this.registerServiceImplementation(IServiceNameHeartBeatPersistenceDAO.class, new ServiceNameHeartBeatEsPersistenceDAO(elasticSearchClient)); } private void registerUiDAO() throws ServiceNotProvidedException { diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ServiceNameHeartBeatEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ServiceNameHeartBeatEsPersistenceDAO.java new file mode 100644 index 0000000000..1c9b826f1d --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ServiceNameHeartBeatEsPersistenceDAO.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.es.dao; + +import java.util.*; +import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; +import org.apache.skywalking.apm.collector.core.UnexpectedException; +import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric; +import org.apache.skywalking.apm.collector.storage.dao.IServiceNameHeartBeatPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO; +import org.apache.skywalking.apm.collector.storage.table.register.*; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.update.UpdateRequestBuilder; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public class ServiceNameHeartBeatEsPersistenceDAO extends EsDAO implements IServiceNameHeartBeatPersistenceDAO { + + private static final Logger logger = LoggerFactory.getLogger(ServiceNameHeartBeatEsPersistenceDAO.class); + + public ServiceNameHeartBeatEsPersistenceDAO(ElasticSearchClient client) { + super(client); + } + + @GraphComputingMetric(name = "/persistence/get/" + ServiceNameTable.TABLE + "/heartbeat") + @Override public ServiceName get(String id) { + GetResponse getResponse = getClient().prepareGet(ServiceNameTable.TABLE, id).get(); + if (getResponse.isExists()) { + Map source = getResponse.getSource(); + + ServiceName serviceName = new ServiceName(); + serviceName.setId(id); + serviceName.setServiceId(((Number)source.get(ServiceNameTable.SERVICE_ID.getName())).intValue()); + serviceName.setHeartBeatTime(((Number)source.get(ServiceNameTable.HEARTBEAT_TIME.getName())).longValue()); + logger.debug("service id: {} is exists", id); + return serviceName; + } else { + logger.debug("service id: {} is not exists", id); + return null; + } + } + + @Override public IndexRequestBuilder prepareBatchInsert(ServiceName data) { + throw new UnexpectedException("Received an service name heart beat message under service id= " + data.getId() + " , which doesn't exist."); + } + + @Override public UpdateRequestBuilder prepareBatchUpdate(ServiceName data) { + logger.info("service name heart beat, service id: {}, heart beat time: {}", data.getId(), data.getHeartBeatTime()); + + Map source = new HashMap<>(); + source.put(ServiceNameTable.HEARTBEAT_TIME.getName(), data.getHeartBeatTime()); + return getClient().prepareUpdate(ServiceNameTable.TABLE, data.getId()).setDoc(source); + } + + @Override public void deleteHistory(Long timeBucketBefore) { + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/ServiceNameRegisterEsDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/ServiceNameRegisterEsDAO.java index de74d26e58..4dcde30337 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/ServiceNameRegisterEsDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/ServiceNameRegisterEsDAO.java @@ -18,17 +18,14 @@ package org.apache.skywalking.apm.collector.storage.es.dao.register; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.apm.collector.storage.dao.register.IServiceNameRegisterDAO; import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO; -import org.apache.skywalking.apm.collector.storage.table.register.ServiceName; -import org.apache.skywalking.apm.collector.storage.table.register.ServiceNameTable; +import org.apache.skywalking.apm.collector.storage.table.register.*; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.slf4j.*; /** * @author peng-yongsheng @@ -58,6 +55,8 @@ public class ServiceNameRegisterEsDAO extends EsDAO implements IServiceNameRegis target.put(ServiceNameTable.SERVICE_NAME.getName(), serviceName.getServiceName()); target.put(ServiceNameTable.SERVICE_NAME_KEYWORD.getName(), serviceName.getServiceName()); target.put(ServiceNameTable.SRC_SPAN_TYPE.getName(), serviceName.getSrcSpanType()); + target.put(ServiceNameTable.REGISTER_TIME.getName(), serviceName.getRegisterTime()); + target.put(ServiceNameTable.HEARTBEAT_TIME.getName(), serviceName.getHeartBeatTime()); IndexResponse response = client.prepareIndex(ServiceNameTable.TABLE, serviceName.getId()).setSource(target).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); logger.debug("save service name register info, application getApplicationId: {}, service name: {}, status: {}", serviceName.getId(), serviceName.getServiceName(), response.status().name()); diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceNameServiceEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceNameServiceEsUIDAO.java index d937dfb00a..5f7cde54dc 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceNameServiceEsUIDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceNameServiceEsUIDAO.java @@ -18,20 +18,16 @@ package org.apache.skywalking.apm.collector.storage.es.dao.ui; -import java.util.LinkedList; -import java.util.List; +import java.util.*; import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; -import org.apache.skywalking.apm.collector.core.util.StringUtils; +import org.apache.skywalking.apm.collector.core.util.*; import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceNameServiceUIDAO; import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO; import org.apache.skywalking.apm.collector.storage.table.register.ServiceNameTable; import org.apache.skywalking.apm.collector.storage.ui.service.ServiceInfo; import org.apache.skywalking.apm.network.proto.SpanType; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.action.search.*; +import org.elasticsearch.index.query.*; import org.elasticsearch.search.SearchHit; /** @@ -43,31 +39,41 @@ public class ServiceNameServiceEsUIDAO extends EsDAO implements IServiceNameServ super(client); } - @Override public int getCount() { + @Override public int getCount(long startTimeMillis) { SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(ServiceNameTable.TABLE); searchRequestBuilder.setTypes(ServiceNameTable.TABLE_TYPE); searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); - searchRequestBuilder.setQuery(QueryBuilders.termQuery(ServiceNameTable.SRC_SPAN_TYPE.getName(), SpanType.Entry_VALUE)); + + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); + boolQuery.must().add(QueryBuilders.termQuery(ServiceNameTable.SRC_SPAN_TYPE.getName(), SpanType.Entry_VALUE)); + boolQuery.must().add(QueryBuilders.rangeQuery(ServiceNameTable.HEARTBEAT_TIME.getName()).gte(startTimeMillis)); + searchRequestBuilder.setQuery(boolQuery); + searchRequestBuilder.setSize(0); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); return (int)searchResponse.getHits().getTotalHits(); } - @Override public List searchService(String keyword, int topN) { + @Override + public List searchService(String keyword, int applicationId, long startTimeMillis, int topN) { SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(ServiceNameTable.TABLE); searchRequestBuilder.setTypes(ServiceNameTable.TABLE_TYPE); searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); searchRequestBuilder.setSize(topN); + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); + boolQuery.must().add(QueryBuilders.termQuery(ServiceNameTable.SRC_SPAN_TYPE.getName(), SpanType.Entry_VALUE)); + boolQuery.must().add(QueryBuilders.rangeQuery(ServiceNameTable.HEARTBEAT_TIME.getName()).gte(startTimeMillis)); + + if (applicationId != Const.NONE) { + boolQuery.must().add(QueryBuilders.termQuery(ServiceNameTable.APPLICATION_ID.getName(), applicationId)); + } + if (StringUtils.isNotEmpty(keyword)) { - BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); boolQuery.must().add(QueryBuilders.matchQuery(ServiceNameTable.SERVICE_NAME.getName(), keyword)); - boolQuery.must().add(QueryBuilders.termQuery(ServiceNameTable.SRC_SPAN_TYPE.getName(), SpanType.Entry_VALUE)); - searchRequestBuilder.setQuery(boolQuery); - } else { - searchRequestBuilder.setQuery(QueryBuilders.termQuery(ServiceNameTable.SRC_SPAN_TYPE.getName(), SpanType.Entry_VALUE)); } + searchRequestBuilder.setQuery(boolQuery); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); SearchHit[] searchHits = searchResponse.getHits().getHits(); diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/define/register/ServiceNameEsTableDefine.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/define/register/ServiceNameEsTableDefine.java index 0a8b3e0d48..fb37727d84 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/define/register/ServiceNameEsTableDefine.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/define/register/ServiceNameEsTableDefine.java @@ -18,8 +18,7 @@ package org.apache.skywalking.apm.collector.storage.es.define.register; -import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchColumnDefine; -import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchTableDefine; +import org.apache.skywalking.apm.collector.storage.es.base.define.*; import org.apache.skywalking.apm.collector.storage.table.register.ServiceNameTable; /** @@ -41,5 +40,7 @@ public class ServiceNameEsTableDefine extends ElasticSearchTableDefine { addColumn(new ElasticSearchColumnDefine(ServiceNameTable.SERVICE_NAME_KEYWORD, ElasticSearchColumnDefine.Type.Keyword.name())); addColumn(new ElasticSearchColumnDefine(ServiceNameTable.SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(ServiceNameTable.SRC_SPAN_TYPE, ElasticSearchColumnDefine.Type.Integer.name())); + addColumn(new ElasticSearchColumnDefine(ServiceNameTable.REGISTER_TIME, ElasticSearchColumnDefine.Type.Long.name())); + addColumn(new ElasticSearchColumnDefine(ServiceNameTable.HEARTBEAT_TIME, ElasticSearchColumnDefine.Type.Long.name())); } } diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/ttl/TTLConfigService.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/ttl/TTLConfigService.java new file mode 100644 index 0000000000..bb700683ae --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/ttl/TTLConfigService.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.es.ttl; + +import org.apache.skywalking.apm.collector.storage.es.StorageModuleEsConfig; +import org.apache.skywalking.apm.collector.storage.ttl.ITTLConfigService; + +/** + * @author peng-yongsheng + */ +public class TTLConfigService implements ITTLConfigService { + + private StorageModuleEsConfig config; + + public TTLConfigService(StorageModuleEsConfig config) { + this.config = config; + } + + @Override public int traceDataTTL() { + return config.getTraceDataTTL(); + } + + @Override public int minuteMetricDataTTL() { + return config.getMinuteMetricDataTTL(); + } + + @Override public int hourMetricDataTTL() { + return config.getHourMetricDataTTL(); + } + + @Override public int dayMetricDataTTL() { + return config.getDayMetricDataTTL(); + } + + @Override public int monthMetricDataTTL() { + return config.getMonthMetricDataTTL(); + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/test/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimerTestCase.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/test/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimerTestCase.java index c619455c2c..47787f1854 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/test/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimerTestCase.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/test/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimerTestCase.java @@ -29,7 +29,7 @@ public class DataTTLKeeperTimerTestCase { @Test public void testConvertTimeBucket() { - DataTTLKeeperTimer timer = new DataTTLKeeperTimer(null, null, null); + DataTTLKeeperTimer timer = new DataTTLKeeperTimer(null, null, null, new StorageModuleEsConfig()); DateTime currentTime = new DateTime(2018, 5, 26, 15, 5); DataTTLKeeperTimer.TimeBuckets timeBuckets = timer.convertTimeBucket(currentTime); diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/register/ServiceNameRegisterH2DAO.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/register/ServiceNameRegisterH2DAO.java index 8b047d0c0c..baf8b8043a 100644 --- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/register/ServiceNameRegisterH2DAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/register/ServiceNameRegisterH2DAO.java @@ -18,17 +18,13 @@ package org.apache.skywalking.apm.collector.storage.h2.dao.register; -import java.util.HashMap; -import java.util.Map; -import org.apache.skywalking.apm.collector.client.h2.H2Client; -import org.apache.skywalking.apm.collector.client.h2.H2ClientException; +import java.util.*; +import org.apache.skywalking.apm.collector.client.h2.*; import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder; import org.apache.skywalking.apm.collector.storage.dao.register.IServiceNameRegisterDAO; import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO; -import org.apache.skywalking.apm.collector.storage.table.register.ServiceName; -import org.apache.skywalking.apm.collector.storage.table.register.ServiceNameTable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.skywalking.apm.collector.storage.table.register.*; +import org.slf4j.*; /** * @author peng-yongsheng, clevertension @@ -61,6 +57,8 @@ public class ServiceNameRegisterH2DAO extends H2DAO implements IServiceNameRegis target.put(ServiceNameTable.APPLICATION_ID.getName(), serviceName.getApplicationId()); target.put(ServiceNameTable.SERVICE_NAME.getName(), serviceName.getServiceName()); target.put(ServiceNameTable.SRC_SPAN_TYPE.getName(), serviceName.getSrcSpanType()); + target.put(ServiceNameTable.REGISTER_TIME.getName(), serviceName.getRegisterTime()); + target.put(ServiceNameTable.HEARTBEAT_TIME.getName(), serviceName.getHeartBeatTime()); String sql = SqlBuilder.buildBatchInsertSql(ServiceNameTable.TABLE, target.keySet()); Object[] params = target.values().toArray(new Object[0]); diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/ui/ServiceNameServiceH2UIDAO.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/ui/ServiceNameServiceH2UIDAO.java index b41d4cb683..167b8f6a89 100644 --- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/ui/ServiceNameServiceH2UIDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/ui/ServiceNameServiceH2UIDAO.java @@ -18,20 +18,16 @@ package org.apache.skywalking.apm.collector.storage.h2.dao.ui; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.LinkedList; -import java.util.List; -import org.apache.skywalking.apm.collector.client.h2.H2Client; -import org.apache.skywalking.apm.collector.client.h2.H2ClientException; +import java.sql.*; +import java.util.*; +import org.apache.skywalking.apm.collector.client.h2.*; import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder; import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceNameServiceUIDAO; import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO; import org.apache.skywalking.apm.collector.storage.table.register.ServiceNameTable; import org.apache.skywalking.apm.collector.storage.ui.service.ServiceInfo; import org.apache.skywalking.apm.network.proto.SpanType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.slf4j.*; /** * @author peng-yongsheng @@ -44,10 +40,10 @@ public class ServiceNameServiceH2UIDAO extends H2DAO implements IServiceNameServ super(client); } - @Override public int getCount() { - String dynamicSql = "select count({0}) as cnt from {1} where {2} = ?"; - String sql = SqlBuilder.buildSql(dynamicSql, ServiceNameTable.SERVICE_ID.getName(), ServiceNameTable.TABLE, ServiceNameTable.SRC_SPAN_TYPE.getName()); - Object[] params = new Object[] {SpanType.Entry_VALUE}; + @Override public int getCount(long startTimeMillis) { + String dynamicSql = "select count({0}) as cnt from {1} where {2} = ? and {3} >= ?"; + String sql = SqlBuilder.buildSql(dynamicSql, ServiceNameTable.SERVICE_ID.getName(), ServiceNameTable.TABLE, ServiceNameTable.SRC_SPAN_TYPE.getName(), ServiceNameTable.HEARTBEAT_TIME.getName()); + Object[] params = new Object[] {SpanType.Entry_VALUE, startTimeMillis}; try (ResultSet rs = getClient().executeQuery(sql, params)) { if (rs.next()) { @@ -59,10 +55,11 @@ public class ServiceNameServiceH2UIDAO extends H2DAO implements IServiceNameServ return 0; } - @Override public List searchService(String keyword, int topN) { - String dynamicSql = "select {0},{1} from {2} where {3} like ? and {4} = ? limit ?"; - String sql = SqlBuilder.buildSql(dynamicSql, ServiceNameTable.SERVICE_ID.getName(), ServiceNameTable.SERVICE_NAME.getName(), ServiceNameTable.TABLE, ServiceNameTable.SERVICE_NAME.getName(), ServiceNameTable.SRC_SPAN_TYPE.getName()); - Object[] params = new Object[] {keyword, SpanType.Entry_VALUE, topN}; + @Override + public List searchService(String keyword, int applicationId, long startTimeMillis, int topN) { + String dynamicSql = "select {0},{1} from {2} where {3} like ? and {4} = ? and {5} = ? and {6} >= ? limit ?"; + String sql = SqlBuilder.buildSql(dynamicSql, ServiceNameTable.SERVICE_ID.getName(), ServiceNameTable.SERVICE_NAME.getName(), ServiceNameTable.TABLE, ServiceNameTable.SERVICE_NAME.getName(), ServiceNameTable.SRC_SPAN_TYPE.getName(), ServiceNameTable.APPLICATION_ID.getName(), ServiceNameTable.HEARTBEAT_TIME.getName()); + Object[] params = new Object[] {keyword, SpanType.Entry_VALUE, applicationId, startTimeMillis, topN}; List serviceInfos = new LinkedList<>(); try (ResultSet rs = getClient().executeQuery(sql, params)) { diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/define/register/ServiceNameH2TableDefine.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/define/register/ServiceNameH2TableDefine.java index eaae57d3b0..d81410bc1e 100644 --- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/define/register/ServiceNameH2TableDefine.java +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/define/register/ServiceNameH2TableDefine.java @@ -18,8 +18,7 @@ package org.apache.skywalking.apm.collector.storage.h2.define.register; -import org.apache.skywalking.apm.collector.storage.h2.base.define.H2ColumnDefine; -import org.apache.skywalking.apm.collector.storage.h2.base.define.H2TableDefine; +import org.apache.skywalking.apm.collector.storage.h2.base.define.*; import org.apache.skywalking.apm.collector.storage.table.register.ServiceNameTable; /** @@ -37,5 +36,7 @@ public class ServiceNameH2TableDefine extends H2TableDefine { addColumn(new H2ColumnDefine(ServiceNameTable.SERVICE_NAME, H2ColumnDefine.Type.Varchar.name())); addColumn(new H2ColumnDefine(ServiceNameTable.SERVICE_ID, H2ColumnDefine.Type.Int.name())); addColumn(new H2ColumnDefine(ServiceNameTable.SRC_SPAN_TYPE, H2ColumnDefine.Type.Int.name())); + addColumn(new H2ColumnDefine(ServiceNameTable.REGISTER_TIME, H2ColumnDefine.Type.Bigint.name())); + addColumn(new H2ColumnDefine(ServiceNameTable.HEARTBEAT_TIME, H2ColumnDefine.Type.Bigint.name())); } } diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/query/ServiceQuery.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/query/ServiceQuery.java index 05d9d96060..c4f848fd2e 100644 --- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/query/ServiceQuery.java +++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/query/ServiceQuery.java @@ -18,17 +18,15 @@ package org.apache.skywalking.apm.collector.ui.query; +import java.text.ParseException; +import java.util.List; import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.storage.ui.common.*; import org.apache.skywalking.apm.collector.storage.ui.service.ServiceInfo; import org.apache.skywalking.apm.collector.ui.graphql.Query; -import org.apache.skywalking.apm.collector.ui.service.ServiceNameService; -import org.apache.skywalking.apm.collector.ui.service.ServiceTopologyService; +import org.apache.skywalking.apm.collector.ui.service.*; import org.apache.skywalking.apm.collector.ui.utils.DurationUtils; -import java.text.ParseException; -import java.util.List; - import static java.util.Objects.isNull; /** @@ -58,8 +56,8 @@ public class ServiceQuery implements Query { return serviceTopologyService; } - public List searchService(String keyword, int topN) throws ParseException { - return getServiceNameService().searchService(keyword, topN); + public List searchService(String keyword, int applicationId, int topN) { + return getServiceNameService().searchService(keyword, applicationId, topN); } public ResponseTimeTrend getServiceResponseTimeTrend(int serviceId, Duration duration) throws ParseException { @@ -81,7 +79,7 @@ public class ServiceQuery implements Query { return getServiceNameService().getServiceSLATrend(serviceId, duration.getStep(), startTimeBucket, endTimeBucket); } - public Topology getServiceTopology(int serviceId, Duration duration) throws ParseException { + public Topology getServiceTopology(int serviceId, Duration duration) { long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart()); long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd()); diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ServiceNameService.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ServiceNameService.java index 73cd0ad1bd..964317f22a 100644 --- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ServiceNameService.java +++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ServiceNameService.java @@ -27,6 +27,7 @@ import org.apache.skywalking.apm.collector.storage.StorageModule; import org.apache.skywalking.apm.collector.storage.dao.ui.*; import org.apache.skywalking.apm.collector.storage.table.MetricSource; import org.apache.skywalking.apm.collector.storage.table.register.ServiceName; +import org.apache.skywalking.apm.collector.storage.ttl.ITTLConfigService; import org.apache.skywalking.apm.collector.storage.ui.common.*; import org.apache.skywalking.apm.collector.storage.ui.service.*; import org.apache.skywalking.apm.collector.storage.utils.DurationPoint; @@ -44,20 +45,27 @@ public class ServiceNameService { private final IServiceMetricUIDAO serviceMetricUIDAO; private final ServiceNameCacheService serviceNameCacheService; private final DateBetweenService dateBetweenService; + private final ITTLConfigService configService; public ServiceNameService(ModuleManager moduleManager) { this.serviceNameServiceUIDAO = moduleManager.find(StorageModule.NAME).getService(IServiceNameServiceUIDAO.class); this.serviceMetricUIDAO = moduleManager.find(StorageModule.NAME).getService(IServiceMetricUIDAO.class); this.serviceNameCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceNameCacheService.class); + this.configService = moduleManager.find(StorageModule.NAME).getService(ITTLConfigService.class); this.dateBetweenService = new DateBetweenService(moduleManager); } public int getCount() { - return serviceNameServiceUIDAO.getCount(); + return serviceNameServiceUIDAO.getCount(startTimeMillis()); } - public List searchService(String keyword, int topN) { - return serviceNameServiceUIDAO.searchService(keyword, topN); + public List searchService(String keyword, int applicationId, int topN) { + return serviceNameServiceUIDAO.searchService(keyword, applicationId, startTimeMillis(), topN); + } + + private long startTimeMillis() { + int minuteMetricDataTTL = configService.minuteMetricDataTTL(); + return System.currentTimeMillis() - minuteMetricDataTTL * 60 * 60 * 100; } public ThroughputTrend getServiceThroughputTrend(int serviceId, Step step, long startTimeBucket, diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/query/ServiceQueryTest.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/query/ServiceQueryTest.java index fd0b99c81b..b615842418 100644 --- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/query/ServiceQueryTest.java +++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/query/ServiceQueryTest.java @@ -18,18 +18,13 @@ package org.apache.skywalking.apm.collector.ui.query; -import org.apache.skywalking.apm.collector.storage.ui.common.Duration; -import org.apache.skywalking.apm.collector.storage.ui.common.Step; -import org.apache.skywalking.apm.collector.ui.service.ServiceNameService; -import org.apache.skywalking.apm.collector.ui.service.ServiceTopologyService; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import java.text.ParseException; +import org.apache.skywalking.apm.collector.storage.ui.common.*; +import org.apache.skywalking.apm.collector.ui.service.*; +import org.junit.*; import org.mockito.Mockito; import org.mockito.internal.util.reflection.Whitebox; -import java.text.ParseException; - /** * @author lican */ @@ -50,14 +45,14 @@ public class ServiceQueryTest { @Test public void searchService() throws ParseException { - serviceQuery.searchService("keyword", -1); + serviceQuery.searchService("keyword", 0, -1); } @Test public void getServiceResponseTimeTrend() throws ParseException { Mockito.when(serviceNameService.getServiceResponseTimeTrend( - Mockito.anyInt(), Mockito.anyObject(), - Mockito.anyLong(), Mockito.anyLong()) + Mockito.anyInt(), Mockito.anyObject(), + Mockito.anyLong(), Mockito.anyLong()) ).then(invocation -> { Object[] arguments = invocation.getArguments(); Assert.assertEquals(201701L, arguments[2]); @@ -74,8 +69,8 @@ public class ServiceQueryTest { @Test public void getServiceThroughputTrend() throws ParseException { Mockito.when(serviceNameService.getServiceThroughputTrend( - Mockito.anyInt(), Mockito.anyObject(), - Mockito.anyLong(), Mockito.anyLong()) + Mockito.anyInt(), Mockito.anyObject(), + Mockito.anyLong(), Mockito.anyLong()) ).then(invocation -> { Object[] arguments = invocation.getArguments(); Assert.assertEquals(201701L, arguments[2]); @@ -92,8 +87,8 @@ public class ServiceQueryTest { @Test public void getServiceSLATrend() throws ParseException { Mockito.when(serviceNameService.getServiceSLATrend( - Mockito.anyInt(), Mockito.anyObject(), - Mockito.anyLong(), Mockito.anyLong()) + Mockito.anyInt(), Mockito.anyObject(), + Mockito.anyLong(), Mockito.anyLong()) ).then(invocation -> { Object[] arguments = invocation.getArguments(); Assert.assertEquals(201701L, arguments[2]); @@ -110,9 +105,9 @@ public class ServiceQueryTest { @Test public void getServiceTopology() throws ParseException { Mockito.when(serviceTopologyService.getServiceTopology( - Mockito.anyObject(), Mockito.anyInt(), - Mockito.anyLong(), Mockito.anyLong(), - Mockito.anyLong(), Mockito.anyLong()) + Mockito.anyObject(), Mockito.anyInt(), + Mockito.anyLong(), Mockito.anyLong(), + Mockito.anyLong(), Mockito.anyLong()) ).then(invocation -> { Object[] arguments = invocation.getArguments(); Assert.assertEquals(201701L, arguments[2]); diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/ServiceNameServiceTest.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/ServiceNameServiceTest.java index dfb0651f40..3a01925e92 100644 --- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/ServiceNameServiceTest.java +++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/ServiceNameServiceTest.java @@ -79,7 +79,7 @@ public class ServiceNameServiceTest { @Test public void searchService() { - List serviceInfos = serverNameService.searchService("keyword", 10); + List serviceInfos = serverNameService.searchService("keyword", 0, 10); Assert.assertTrue(serviceInfos.size() == 0); } diff --git a/apm-protocol/apm-ui-protocol/src/main/resources/ui-graphql/service-layer.graphqls b/apm-protocol/apm-ui-protocol/src/main/resources/ui-graphql/service-layer.graphqls index 8faa7e9a7c..a33ad3d321 100644 --- a/apm-protocol/apm-ui-protocol/src/main/resources/ui-graphql/service-layer.graphqls +++ b/apm-protocol/apm-ui-protocol/src/main/resources/ui-graphql/service-layer.graphqls @@ -47,7 +47,7 @@ type TraceItem { } extend type Query { - searchService(keyword: String!, topN: Int!): [ServiceInfo!]! + searchService(keyword: String!, applicationId: ID!, topN: Int!): [ServiceInfo!]! getServiceResponseTimeTrend(serviceId: ID!, duration: Duration!): ResponseTimeTrend getServiceThroughputTrend(serviceId: ID!, duration: Duration!): ThroughputTrend getServiceSLATrend(serviceId: ID!, duration: Duration!): SLATrend -- GitLab