diff --git a/apm-collector/apm-collector-agent-jetty/collector-agent-jetty-provider/src/test/java/org/skywalking/apm/collector/agent/jetty/handler/SegmentPost.java b/apm-collector/apm-collector-agent-jetty/collector-agent-jetty-provider/src/test/java/org/skywalking/apm/collector/agent/jetty/handler/SegmentPost.java index 297eab2a264c44ffd82949f7708da942e63e2709..bf162629f4cff8e09c89fe5cf35ca8d00a9d7f90 100644 --- a/apm-collector/apm-collector-agent-jetty/collector-agent-jetty-provider/src/test/java/org/skywalking/apm/collector/agent/jetty/handler/SegmentPost.java +++ b/apm-collector/apm-collector-agent-jetty/collector-agent-jetty-provider/src/test/java/org/skywalking/apm/collector/agent/jetty/handler/SegmentPost.java @@ -42,7 +42,7 @@ public class SegmentPost { JsonElement provider = JsonFileReader.INSTANCE.read("json/dubbox-provider.json"); JsonElement consumer = JsonFileReader.INSTANCE.read("json/dubbox-consumer.json"); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < 1; i++) { HttpClientTools.INSTANCE.post("http://localhost:12800/segments", provider.toString()); HttpClientTools.INSTANCE.post("http://localhost:12800/segments", consumer.toString()); } diff --git a/apm-collector/apm-collector-agent-jetty/collector-agent-jetty-provider/src/test/resources/json/dubbox-provider.json b/apm-collector/apm-collector-agent-jetty/collector-agent-jetty-provider/src/test/resources/json/dubbox-provider.json index 709f57760d3f4f7d7b4903fb37a7d936ec5518cd..aba490792c5db55c72a81b3b41c11d422aca515e 100644 --- a/apm-collector/apm-collector-agent-jetty/collector-agent-jetty-provider/src/test/resources/json/dubbox-provider.json +++ b/apm-collector/apm-collector-agent-jetty/collector-agent-jetty-provider/src/test/resources/json/dubbox-provider.json @@ -15,25 +15,6 @@ ], "ai": 2, "ii": 2, - "rs": [ - { - "ts": [ - 230150, - 185809, - 24040000 - ], - "ai": -1, - "si": 1, - "vi": 0, - "vn": "/dubbox-case/case/dubbox-rest", - "ni": 0, - "nn": "172.25.0.4:20880", - "ea": 2, - "ei": 0, - "en": "/dubbox-case/case/dubbox-rest", - "rn": 0 - } - ], "ss": [ { "si": 0, @@ -49,6 +30,25 @@ "pi": 0, "pn": "", "ie": false, + "rs": [ + { + "ts": [ + 230150, + 185809, + 24040000 + ], + "ai": -1, + "si": 1, + "vi": 0, + "vn": "/dubbox-case/case/dubbox-rest", + "ni": 0, + "nn": "172.25.0.4:20880", + "ea": 2, + "ei": 0, + "en": "/dubbox-case/case/dubbox-rest", + "rn": 0 + } + ], "to": [ { "k": "url", diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/graph/ApplicationGraphNodeIdDefine.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/graph/ApplicationGraphNodeIdDefine.java new file mode 100644 index 0000000000000000000000000000000000000000..969b701d477590eaee410c70e8e7c05263b101d7 --- /dev/null +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/graph/ApplicationGraphNodeIdDefine.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.service.graph; + +/** + * @author peng-yongsheng + */ +public class ApplicationGraphNodeIdDefine { + public static final int APPLICATION_REFERENCE_METRIC_AGGREGATION_NODE_ID = 1001; + public static final int APPLICATION_REFERENCE_METRIC_REMOTE_NODE_ID = 1002; + public static final int APPLICATION_REFERENCE_METRIC_PERSISTENCE_NODE_ID = 1003; +} diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/graph/InstanceGraphNodeIdDefine.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/graph/InstanceGraphNodeIdDefine.java new file mode 100644 index 0000000000000000000000000000000000000000..dc702cdc68487418148172b1b9fde6439fdf33da --- /dev/null +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/graph/InstanceGraphNodeIdDefine.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.service.graph; + +/** + * @author peng-yongsheng + */ +public class InstanceGraphNodeIdDefine { + public static final int INSTANCE_REFERENCE_METRIC_AGGREGATION_NODE_ID = 2001; + public static final int INSTANCE_REFERENCE_METRIC_REMOTE_NODE_ID = 2002; + public static final int INSTANCE_REFERENCE_METRIC_PERSISTENCE_NODE_ID = 2003; +} diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/graph/JvmMetricStreamGraphDefine.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/graph/JvmMetricStreamGraphDefine.java new file mode 100644 index 0000000000000000000000000000000000000000..9a22d036f8c3c116b0350a286c46007f3f4feb9d --- /dev/null +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/graph/JvmMetricStreamGraphDefine.java @@ -0,0 +1,30 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.service.graph; + +/** + * @author peng-yongsheng + */ +public class JvmMetricStreamGraphDefine { + public static final int GC_METRIC_GRAPH_ID = 100; + public static final int MEMORY_METRIC_GRAPH_ID = 101; + public static final int MEMORY_POOL_METRIC_GRAPH_ID = 102; + public static final int CPU_METRIC_GRAPH_ID = 103; + public static final int INST_HEART_BEAT_GRAPH_ID = 104; +} diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/parser/RefsListener.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/graph/RegisterStreamGraphDefine.java similarity index 70% rename from apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/parser/RefsListener.java rename to apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/graph/RegisterStreamGraphDefine.java index d12db455f657d2c8be2b7554173cd49397b899ae..4c194be5794d43dd73f7123728d87ed21c1a3a9a 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/parser/RefsListener.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/graph/RegisterStreamGraphDefine.java @@ -16,13 +16,13 @@ * Project repository: https://github.com/OpenSkywalking/skywalking */ -package org.skywalking.apm.collector.agent.stream.parser; - -import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; +package org.skywalking.apm.collector.agent.stream.service.graph; /** * @author peng-yongsheng */ -public interface RefsListener extends SpanListener { - void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int instanceId, String segmentId); +public class RegisterStreamGraphDefine { + public static final int APPLICATION_REGISTER_GRAPH_ID = 200; + public static final int INSTANCE_REGISTER_GRAPH_ID = 201; + public static final int SERVICE_NAME_REGISTER_GRAPH_ID = 202; } diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/graph/ServiceGraphNodeIdDefine.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/graph/ServiceGraphNodeIdDefine.java new file mode 100644 index 0000000000000000000000000000000000000000..1394ca71de1fc00dd08b44aeb548405de9da8f75 --- /dev/null +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/graph/ServiceGraphNodeIdDefine.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.service.graph; + +/** + * @author peng-yongsheng + */ +public class ServiceGraphNodeIdDefine { + public static final int SERVICE_REFERENCE_METRIC_AGGREGATION_NODE_ID = 3001; + public static final int SERVICE_REFERENCE_METRIC_REMOTE_NODE_ID = 3002; + public static final int SERVICE_REFERENCE_METRIC_PERSISTENCE_NODE_ID = 3003; +} diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/trace/MetricSource.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/trace/MetricSource.java new file mode 100644 index 0000000000000000000000000000000000000000..54cfbc0401b136036e84ac920cc1b2b65995dfaa --- /dev/null +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-define/src/main/java/org/skywalking/apm/collector/agent/stream/service/trace/MetricSource.java @@ -0,0 +1,26 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.service.trace; + +/** + * @author peng-yongsheng + */ +public enum MetricSource { + Exit, Entry +} diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamBootStartup.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamBootStartup.java index a0f56dba84edbb502b720aa644ba796f7a5e5e0c..448470cda81d30cc12354bb559c238ed43665a6c 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamBootStartup.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamBootStartup.java @@ -45,7 +45,6 @@ public class AgentStreamBootStartup { PersistenceTimer timer = new PersistenceTimer(); timer.start(moduleManager, workerCreateListener.getPersistenceWorkers()); - } private void createJVMGraph() { @@ -71,10 +70,11 @@ public class AgentStreamBootStartup { traceStreamGraph.createInstanceMetricGraph(); traceStreamGraph.createApplicationComponentGraph(); traceStreamGraph.createApplicationMappingGraph(); - traceStreamGraph.createApplicationReferenceMetricGraph(); +// traceStreamGraph.createApplicationReferenceMetricGraph(); traceStreamGraph.createServiceEntryGraph(); - traceStreamGraph.createServiceReferenceGraph(); traceStreamGraph.createSegmentGraph(); traceStreamGraph.createSegmentCostGraph(); + + traceStreamGraph.createServiceReferenceGraph(); } } diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/GraphDefine.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/GraphDefine.java new file mode 100644 index 0000000000000000000000000000000000000000..97828d83efc7764091f82034eaade29f23078ce7 --- /dev/null +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/GraphDefine.java @@ -0,0 +1,27 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.graph; + +/** + * @author peng-yongsheng + */ +public interface GraphDefine { + + +} diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java index 355b17c8672f9cf3504f9aa3c08ce75c82860e99..87b145cbfaf38219e743dfe56699e977d5364629 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java @@ -18,6 +18,7 @@ package org.skywalking.apm.collector.agent.stream.graph; +import org.skywalking.apm.collector.agent.stream.service.graph.JvmMetricStreamGraphDefine; import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricPersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricPersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.jvm.InstHeartBeatPersistenceWorker; @@ -40,12 +41,6 @@ import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; */ public class JvmMetricStreamGraph { - public static final int GC_METRIC_GRAPH_ID = 100; - public static final int MEMORY_METRIC_GRAPH_ID = 101; - public static final int MEMORY_POOL_METRIC_GRAPH_ID = 102; - public static final int CPU_METRIC_GRAPH_ID = 103; - public static final int INST_HEART_BEAT_GRAPH_ID = 104; - private final ModuleManager moduleManager; private final WorkerCreateListener workerCreateListener; @@ -58,7 +53,7 @@ public class JvmMetricStreamGraph { public void createGcMetricGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); - Graph graph = GraphManager.INSTANCE.createIfAbsent(GC_METRIC_GRAPH_ID, GCMetric.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.GC_METRIC_GRAPH_ID, GCMetric.class); graph.addNode(new GCMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); } @@ -66,7 +61,7 @@ public class JvmMetricStreamGraph { public void createCpuMetricGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); - Graph graph = GraphManager.INSTANCE.createIfAbsent(CPU_METRIC_GRAPH_ID, CpuMetric.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.CPU_METRIC_GRAPH_ID, CpuMetric.class); graph.addNode(new CpuMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); } @@ -74,7 +69,7 @@ public class JvmMetricStreamGraph { public void createMemoryMetricGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); - Graph graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_METRIC_GRAPH_ID, MemoryMetric.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.MEMORY_METRIC_GRAPH_ID, MemoryMetric.class); graph.addNode(new MemoryMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); } @@ -82,7 +77,7 @@ public class JvmMetricStreamGraph { public void createMemoryPoolMetricGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); - Graph graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class); graph.addNode(new MemoryPoolMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); } @@ -90,7 +85,7 @@ public class JvmMetricStreamGraph { public void createHeartBeatGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); - Graph graph = GraphManager.INSTANCE.createIfAbsent(INST_HEART_BEAT_GRAPH_ID, Instance.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.INST_HEART_BEAT_GRAPH_ID, Instance.class); graph.addNode(new InstHeartBeatPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); } } diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java index 9e5d7af081038b7be37b61470794b2492a47789d..017a573ced7015aa10b693bbb96e71d20c42a1fe 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java @@ -18,6 +18,7 @@ package org.skywalking.apm.collector.agent.stream.graph; +import org.skywalking.apm.collector.agent.stream.service.graph.RegisterStreamGraphDefine; import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationRegisterRemoteWorker; import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationRegisterSerialWorker; import org.skywalking.apm.collector.agent.stream.worker.register.InstanceRegisterRemoteWorker; @@ -41,10 +42,6 @@ import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; */ public class RegisterStreamGraph { - public static final int APPLICATION_REGISTER_GRAPH_ID = 200; - public static final int INSTANCE_REGISTER_GRAPH_ID = 201; - public static final int SERVICE_NAME_REGISTER_GRAPH_ID = 202; - private final ModuleManager moduleManager; private final WorkerCreateListener workerCreateListener; @@ -59,8 +56,8 @@ public class RegisterStreamGraph { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); - Graph graph = GraphManager.INSTANCE.createIfAbsent(APPLICATION_REGISTER_GRAPH_ID, Application.class); - graph.addNode(new ApplicationRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, APPLICATION_REGISTER_GRAPH_ID).create(workerCreateListener)) + Graph graph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraphDefine.APPLICATION_REGISTER_GRAPH_ID, Application.class); + graph.addNode(new ApplicationRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, RegisterStreamGraphDefine.APPLICATION_REGISTER_GRAPH_ID).create(workerCreateListener)) .addNext(new ApplicationRegisterSerialWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); } @@ -70,8 +67,8 @@ public class RegisterStreamGraph { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); - Graph graph = GraphManager.INSTANCE.createIfAbsent(INSTANCE_REGISTER_GRAPH_ID, Instance.class); - graph.addNode(new InstanceRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, INSTANCE_REGISTER_GRAPH_ID).create(workerCreateListener)) + Graph graph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraphDefine.INSTANCE_REGISTER_GRAPH_ID, Instance.class); + graph.addNode(new InstanceRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, RegisterStreamGraphDefine.INSTANCE_REGISTER_GRAPH_ID).create(workerCreateListener)) .addNext(new InstanceRegisterSerialWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); } @@ -81,8 +78,8 @@ public class RegisterStreamGraph { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); - Graph graph = GraphManager.INSTANCE.createIfAbsent(SERVICE_NAME_REGISTER_GRAPH_ID, ServiceName.class); - graph.addNode(new ServiceNameRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_NAME_REGISTER_GRAPH_ID).create(workerCreateListener)) + Graph graph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraphDefine.SERVICE_NAME_REGISTER_GRAPH_ID, ServiceName.class); + graph.addNode(new ServiceNameRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, RegisterStreamGraphDefine.SERVICE_NAME_REGISTER_GRAPH_ID).create(workerCreateListener)) .addNext(new ServiceNameRegisterSerialWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); } } diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java index 446faed81871fc7d82e069b89ca36f769f5d9a0e..117acdea864ea37938bb7f544cb5f0f48d26cfd0 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java @@ -20,6 +20,7 @@ package org.skywalking.apm.collector.agent.stream.graph; import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardization; import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardizationWorker; +import org.skywalking.apm.collector.agent.stream.service.graph.ServiceGraphNodeIdDefine; import org.skywalking.apm.collector.agent.stream.worker.trace.application.ApplicationComponentAggregationWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.application.ApplicationComponentPersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.application.ApplicationComponentRemoteWorker; @@ -31,6 +32,9 @@ import org.skywalking.apm.collector.agent.stream.worker.trace.application.Applic import org.skywalking.apm.collector.agent.stream.worker.trace.application.ApplicationReferenceMetricRemoteWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTracePersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstanceMetricPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstanceReferenceMetricAggregationWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstanceReferenceMetricRemoteWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstanceReferencePersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostPersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentPersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryAggregationWorker; @@ -41,6 +45,7 @@ import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceRef import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceReferenceMetricRemoteWorker; import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.GraphManager; +import org.skywalking.apm.collector.core.graph.Node; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.queue.QueueModule; import org.skywalking.apm.collector.queue.service.QueueCreatorService; @@ -51,6 +56,7 @@ import org.skywalking.apm.collector.storage.table.application.ApplicationMapping import org.skywalking.apm.collector.storage.table.application.ApplicationReferenceMetric; import org.skywalking.apm.collector.storage.table.global.GlobalTrace; import org.skywalking.apm.collector.storage.table.instance.InstanceMetric; +import org.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; import org.skywalking.apm.collector.storage.table.segment.Segment; import org.skywalking.apm.collector.storage.table.segment.SegmentCost; import org.skywalking.apm.collector.storage.table.service.ServiceEntry; @@ -61,7 +67,6 @@ import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; * @author peng-yongsheng */ public class TraceStreamGraph { - public static final int GLOBAL_TRACE_GRAPH_ID = 300; public static final int INSTANCE_METRIC_GRAPH_ID = 301; public static final int APPLICATION_COMPONENT_GRAPH_ID = 302; @@ -127,17 +132,6 @@ public class TraceStreamGraph { .addNext(new ApplicationMappingPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); } - @SuppressWarnings("unchecked") - public void createApplicationReferenceMetricGraph() { - QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); - RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); - - Graph graph = GraphManager.INSTANCE.createIfAbsent(APPLICATION_REFERENCE_METRIC_GRAPH_ID, ApplicationReferenceMetric.class); - graph.addNode(new ApplicationReferenceMetricAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) - .addNext(new ApplicationReferenceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, APPLICATION_REFERENCE_METRIC_GRAPH_ID).create(workerCreateListener)) - .addNext(new ApplicationReferenceMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - } - @SuppressWarnings("unchecked") public void createServiceEntryGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); @@ -158,6 +152,33 @@ public class TraceStreamGraph { graph.addNode(new ServiceReferenceMetricAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) .addNext(new ServiceReferenceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_REFERENCE_GRAPH_ID).create(workerCreateListener)) .addNext(new ServiceReferenceMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + + createInstanceReferenceGraph(graph); + } + + @SuppressWarnings("unchecked") + private void createInstanceReferenceGraph(Graph graph) { + QueueCreatorService aggregationQueueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + QueueCreatorService persistenceQueueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); + + Node serviceReferenceMetricNode = graph.toFinder().findNode(ServiceGraphNodeIdDefine.SERVICE_REFERENCE_METRIC_AGGREGATION_NODE_ID, ServiceReferenceMetric.class); + serviceReferenceMetricNode.addNext(new InstanceReferenceMetricAggregationWorker.Factory(moduleManager, aggregationQueueCreatorService).create(workerCreateListener)) + .addNext(new InstanceReferenceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_REFERENCE_GRAPH_ID).create(workerCreateListener)) + .addNext(new InstanceReferencePersistenceWorker.Factory(moduleManager, persistenceQueueCreatorService).create(workerCreateListener)); + + createApplicationReferenceMetricGraph(graph); + } + + @SuppressWarnings("unchecked") + private void createApplicationReferenceMetricGraph(Graph graph) { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); + +// Node serviceReferenceMetricNode = graph.toFinder().findNode(ServiceGraphNodeIdDefine.SERVICE_REFERENCE_METRIC_AGGREGATION_NODE_ID, ServiceReferenceMetric.class); +// graph.addNode(new ApplicationReferenceMetricAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) +// .addNext(new ApplicationReferenceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, APPLICATION_REFERENCE_METRIC_GRAPH_ID).create(workerCreateListener)) +// .addNext(new ApplicationReferenceMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); } @SuppressWarnings("unchecked") diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java index 1460f15e71fbcbd4125f9ea39432ed3760b866a3..442dae4479a73550cd5d6f250ca667cddfe645f3 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java @@ -36,6 +36,7 @@ import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstanceM import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostSpanListener; import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntrySpanListener; import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceReferenceMetricSpanListener; +import org.skywalking.apm.collector.core.UnexpectedException; import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.module.ModuleManager; @@ -119,6 +120,7 @@ public class SegmentParse { int applicationId = segmentDecorator.getApplicationId(); int applicationInstanceId = segmentDecorator.getApplicationInstanceId(); + int entrySpanCount = 0; for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { SpanDecorator spanDecorator = segmentDecorator.getSpans(i); @@ -130,11 +132,21 @@ public class SegmentParse { if (!ReferenceIdExchanger.getInstance(moduleManager).exchange(referenceDecorator, applicationId)) { return false; } - - notifyRefsListener(referenceDecorator, applicationId, applicationInstanceId, segmentId); } } + if (SpanType.Entry.equals(spanDecorator.getSpanType())) { + entrySpanCount++; + } + + if (entrySpanCount > 1) { + throw new UnexpectedException("This segment contains multiple entry span."); + } + } + + for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { + SpanDecorator spanDecorator = segmentDecorator.getSpans(i); + if (spanDecorator.getSpanId() == 0) { notifyFirstListener(spanDecorator, applicationId, applicationInstanceId, segmentId); timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime()); @@ -210,15 +222,6 @@ public class SegmentParse { } } - private void notifyRefsListener(ReferenceDecorator reference, int applicationId, int applicationInstanceId, - String segmentId) { - for (SpanListener listener : spanListeners) { - if (listener instanceof RefsListener) { - ((RefsListener)listener).parseRef(reference, applicationId, applicationInstanceId, segmentId); - } - } - } - private void notifyGlobalsListener(UniqueId uniqueId) { for (SpanListener listener : spanListeners) { if (listener instanceof GlobalTraceIdsListener) { diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SpanDecorator.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SpanDecorator.java index f2b5f594a3be9d861f9aaed43c7827abc88994d0..41701319f21ade3c65444535a869fd96a67998bb 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SpanDecorator.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SpanDecorator.java @@ -191,7 +191,11 @@ public class SpanDecorator implements StandardBuilder { } public int getRefsCount() { - return spanObject.getRefsCount(); + if (isOrigin) { + return spanObject.getRefsCount(); + } else { + return spanBuilder.getRefsCount(); + } } public ReferenceDecorator getRefs(int index) { diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/CpuMetricService.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/CpuMetricService.java index 1bbb03bd041183ea97e0c0bda9b0d86512fa54b0..af6e63c4a1c47eeb468330698b0dd3739cdc2b3a 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/CpuMetricService.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/CpuMetricService.java @@ -18,7 +18,7 @@ package org.skywalking.apm.collector.agent.stream.worker.jvm; -import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph; +import org.skywalking.apm.collector.agent.stream.service.graph.JvmMetricStreamGraphDefine; import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService; import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.GraphManager; @@ -39,7 +39,7 @@ public class CpuMetricService implements ICpuMetricService { private Graph getCpuMetricGraph() { if (ObjectUtils.isEmpty(cpuMetricGraph)) { - cpuMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.CPU_METRIC_GRAPH_ID, CpuMetric.class); + cpuMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.CPU_METRIC_GRAPH_ID, CpuMetric.class); } return cpuMetricGraph; } diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/GCMetricService.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/GCMetricService.java index e2ae3a348c807d42f2ca5ffb0d2be62ee34a0ce0..1b55e16889f2fdc88ac7877fe6eaf73b69308274 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/GCMetricService.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/GCMetricService.java @@ -18,7 +18,7 @@ package org.skywalking.apm.collector.agent.stream.worker.jvm; -import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph; +import org.skywalking.apm.collector.agent.stream.service.graph.JvmMetricStreamGraphDefine; import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService; import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.GraphManager; @@ -39,7 +39,7 @@ public class GCMetricService implements IGCMetricService { private Graph getGcMetricGraph() { if (ObjectUtils.isEmpty(gcMetricGraph)) { - gcMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.GC_METRIC_GRAPH_ID, GCMetric.class); + gcMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.GC_METRIC_GRAPH_ID, GCMetric.class); } return gcMetricGraph; } diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/InstanceHeartBeatService.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/InstanceHeartBeatService.java index 8ef9c35061801e9a3ce361b5f4be3e7bab3c8746..9a38dc3d8c33a9e66a841f26f0e65e4eb81b64cd 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/InstanceHeartBeatService.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/InstanceHeartBeatService.java @@ -18,7 +18,7 @@ package org.skywalking.apm.collector.agent.stream.worker.jvm; -import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph; +import org.skywalking.apm.collector.agent.stream.service.graph.JvmMetricStreamGraphDefine; import org.skywalking.apm.collector.agent.stream.service.jvm.IInstanceHeartBeatService; import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.GraphManager; @@ -39,7 +39,7 @@ public class InstanceHeartBeatService implements IInstanceHeartBeatService { private Graph getHeartBeatGraph() { if (ObjectUtils.isEmpty(heartBeatGraph)) { - this.heartBeatGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.INST_HEART_BEAT_GRAPH_ID, Instance.class); + this.heartBeatGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.INST_HEART_BEAT_GRAPH_ID, Instance.class); } return heartBeatGraph; } diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryMetricService.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryMetricService.java index 32d5e8f2e95b52148388a6f8aadafe9152144c56..027171220b7c465e4a6171626d40dcd1f36c9a1f 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryMetricService.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryMetricService.java @@ -18,7 +18,7 @@ package org.skywalking.apm.collector.agent.stream.worker.jvm; -import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph; +import org.skywalking.apm.collector.agent.stream.service.graph.JvmMetricStreamGraphDefine; import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryMetricService; import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.GraphManager; @@ -39,7 +39,7 @@ public class MemoryMetricService implements IMemoryMetricService { private Graph getMemoryMetricGraph() { if (ObjectUtils.isEmpty(memoryMetricGraph)) { - this.memoryMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.MEMORY_METRIC_GRAPH_ID, MemoryMetric.class); + this.memoryMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.MEMORY_METRIC_GRAPH_ID, MemoryMetric.class); } return memoryMetricGraph; } diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryPoolMetricService.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryPoolMetricService.java index 8e4aed00a48b7e25ca165d98b45798f1f1527347..f9502c2a14f26b6cb0240530f866115792aa552e 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryPoolMetricService.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryPoolMetricService.java @@ -18,7 +18,7 @@ package org.skywalking.apm.collector.agent.stream.worker.jvm; -import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph; +import org.skywalking.apm.collector.agent.stream.service.graph.JvmMetricStreamGraphDefine; import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryPoolMetricService; import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.GraphManager; @@ -39,7 +39,7 @@ public class MemoryPoolMetricService implements IMemoryPoolMetricService { private Graph getMemoryPoolMetricGraph() { if (ObjectUtils.isEmpty(memoryPoolMetricGraph)) { - this.memoryPoolMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class); + this.memoryPoolMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class); } return memoryPoolMetricGraph; } diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationIDService.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationIDService.java index 42aa307ba432530fbc5d468689f2dce548a586dd..2bbb50235a26c142c790f00ae0f6e7e09216c051 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationIDService.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationIDService.java @@ -18,7 +18,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register; -import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; +import org.skywalking.apm.collector.agent.stream.service.graph.RegisterStreamGraphDefine; import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.ApplicationCacheService; @@ -47,7 +47,7 @@ public class ApplicationIDService implements IApplicationIDService { private Graph getApplicationRegisterGraph() { if (ObjectUtils.isEmpty(applicationRegisterGraph)) { - this.applicationRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, Application.class); + this.applicationRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraphDefine.APPLICATION_REGISTER_GRAPH_ID, Application.class); } return this.applicationRegisterGraph; } diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/InstanceIDService.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/InstanceIDService.java index 4f1c94231d98e730ffd519063d5f8adfa4f833cb..59b8edb90381bc610d77f0e12a5e8a290a4fec77 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/InstanceIDService.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/InstanceIDService.java @@ -18,7 +18,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register; -import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; +import org.skywalking.apm.collector.agent.stream.service.graph.RegisterStreamGraphDefine; import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.InstanceCacheService; @@ -57,7 +57,7 @@ public class InstanceIDService implements IInstanceIDService { private Graph getInstanceRegisterGraph() { if (ObjectUtils.isEmpty(instanceRegisterGraph)) { - this.instanceRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.INSTANCE_REGISTER_GRAPH_ID, Instance.class); + this.instanceRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraphDefine.INSTANCE_REGISTER_GRAPH_ID, Instance.class); } return instanceRegisterGraph; } diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java index 27a98f0739f1cad19262f7aec108a0c957eaf4d8..69288b09ed01ec4597b55581704d567129459e4d 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java @@ -18,7 +18,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register; -import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; +import org.skywalking.apm.collector.agent.stream.service.graph.RegisterStreamGraphDefine; import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.ServiceIdCacheService; @@ -54,7 +54,7 @@ public class ServiceNameService implements IServiceNameService { private Graph getServiceNameRegisterGraph() { if (ObjectUtils.isEmpty(serviceNameRegisterGraph)) { - this.serviceNameRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.SERVICE_NAME_REGISTER_GRAPH_ID, ServiceName.class); + this.serviceNameRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraphDefine.SERVICE_NAME_REGISTER_GRAPH_ID, ServiceName.class); } return serviceNameRegisterGraph; } diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationMappingSpanListener.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationMappingSpanListener.java index df8d8f30f5d9ed33f778272b57cca13b1229a7b8..3f93ab2537d2f717896acf7520cdfca99e3f382f 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationMappingSpanListener.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationMappingSpanListener.java @@ -18,12 +18,11 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.application; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; +import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; -import org.skywalking.apm.collector.agent.stream.parser.RefsListener; -import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.GraphManager; @@ -36,22 +35,23 @@ import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class ApplicationMappingSpanListener implements RefsListener, FirstSpanListener { +public class ApplicationMappingSpanListener implements FirstSpanListener, EntrySpanListener { private final Logger logger = LoggerFactory.getLogger(ApplicationMappingSpanListener.class); - private List applicationMappings = new ArrayList<>(); + private List applicationMappings = new LinkedList<>(); private long timeBucket; - @Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int instanceId, - String segmentId) { + @Override public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) { logger.debug("node mapping listener parse reference"); - ApplicationMapping applicationMapping = new ApplicationMapping(Const.EMPTY_STRING); - applicationMapping.setApplicationId(applicationId); - applicationMapping.setAddressId(referenceDecorator.getNetworkAddressId()); - String id = String.valueOf(applicationId) + Const.ID_SPLIT + String.valueOf(applicationMapping.getAddressId()); - applicationMapping.setId(id); - applicationMappings.add(applicationMapping); + if (spanDecorator.getRefsCount() > 0) { + ApplicationMapping applicationMapping = new ApplicationMapping(Const.EMPTY_STRING); + applicationMapping.setApplicationId(applicationId); + applicationMapping.setAddressId(spanDecorator.getRefs(0).getNetworkAddressId()); + String id = String.valueOf(applicationId) + Const.ID_SPLIT + String.valueOf(applicationMapping.getAddressId()); + applicationMapping.setId(id); + applicationMappings.add(applicationMapping); + } } @Override @@ -63,12 +63,11 @@ public class ApplicationMappingSpanListener implements RefsListener, FirstSpanLi @Override public void build() { logger.debug("node mapping listener build"); Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.APPLICATION_MAPPING_GRAPH_ID, ApplicationMapping.class); - - for (ApplicationMapping applicationMapping : applicationMappings) { + applicationMappings.forEach(applicationMapping -> { applicationMapping.setId(timeBucket + Const.ID_SPLIT + applicationMapping.getId()); applicationMapping.setTimeBucket(timeBucket); logger.debug("push to node mapping aggregation worker, id: {}", applicationMapping.getId()); graph.start(applicationMapping); - } + }); } } diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricAggregationWorker.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricAggregationWorker.java index c87c56116d43fabf3f2fb7854c2fdb3f1d8d6c41..f9855162eed68cd455c098b4de69eef18dbe6562 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricAggregationWorker.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricAggregationWorker.java @@ -18,28 +18,65 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.application; +import org.skywalking.apm.collector.agent.stream.service.graph.ApplicationGraphNodeIdDefine; +import org.skywalking.apm.collector.cache.CacheModule; +import org.skywalking.apm.collector.cache.service.InstanceCacheService; import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.storage.table.application.ApplicationReferenceMetric; +import org.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider; import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker; /** * @author peng-yongsheng */ -public class ApplicationReferenceMetricAggregationWorker extends AggregationWorker { +public class ApplicationReferenceMetricAggregationWorker extends AggregationWorker { + + private final InstanceCacheService instanceCacheService; public ApplicationReferenceMetricAggregationWorker(ModuleManager moduleManager) { super(moduleManager); + this.instanceCacheService = moduleManager.find(CacheModule.NAME).getService(InstanceCacheService.class); } @Override public int id() { - return ApplicationReferenceMetricAggregationWorker.class.hashCode(); + return ApplicationGraphNodeIdDefine.APPLICATION_REFERENCE_METRIC_AGGREGATION_NODE_ID; + } + + @Override protected ApplicationReferenceMetric transform(InstanceReferenceMetric instanceReferenceMetric) { + Integer frontApplicationId = instanceCacheService.get(instanceReferenceMetric.getFrontInstanceId()); + Integer behindApplicationId = instanceCacheService.get(instanceReferenceMetric.getBehindInstanceId()); + + String id = instanceReferenceMetric.getTimeBucket() + Const.ID_SPLIT + frontApplicationId + Const.ID_SPLIT + behindApplicationId; + + ApplicationReferenceMetric applicationReferenceMetric = new ApplicationReferenceMetric(id); + applicationReferenceMetric.setFrontApplicationId(frontApplicationId); + applicationReferenceMetric.setBehindApplicationId(behindApplicationId); + + applicationReferenceMetric.setTransactionCalls(instanceReferenceMetric.getTransactionCalls()); + applicationReferenceMetric.setTransactionErrorCalls(instanceReferenceMetric.getTransactionErrorCalls()); + applicationReferenceMetric.setTransactionDurationSum(instanceReferenceMetric.getTransactionDurationSum()); + applicationReferenceMetric.setTransactionErrorDurationSum(instanceReferenceMetric.getTransactionErrorDurationSum()); + + applicationReferenceMetric.setBusinessTransactionCalls(instanceReferenceMetric.getBusinessTransactionCalls()); + applicationReferenceMetric.setBusinessTransactionErrorCalls(instanceReferenceMetric.getBusinessTransactionErrorCalls()); + applicationReferenceMetric.setBusinessTransactionDurationSum(instanceReferenceMetric.getBusinessTransactionDurationSum()); + applicationReferenceMetric.setBusinessTransactionErrorDurationSum(instanceReferenceMetric.getBusinessTransactionErrorDurationSum()); + + applicationReferenceMetric.setMqTransactionCalls(instanceReferenceMetric.getMqTransactionCalls()); + applicationReferenceMetric.setMqTransactionErrorCalls(instanceReferenceMetric.getMqTransactionErrorCalls()); + applicationReferenceMetric.setMqTransactionDurationSum(instanceReferenceMetric.getMqTransactionDurationSum()); + applicationReferenceMetric.setMqTransactionErrorDurationSum(instanceReferenceMetric.getMqTransactionErrorDurationSum()); + + return applicationReferenceMetric; } - public static class Factory extends AbstractLocalAsyncWorkerProvider { + public static class Factory extends AbstractLocalAsyncWorkerProvider { - public Factory(ModuleManager moduleManager, QueueCreatorService queueCreatorService) { + public Factory(ModuleManager moduleManager, + QueueCreatorService queueCreatorService) { super(moduleManager, queueCreatorService); } diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricPersistenceWorker.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricPersistenceWorker.java index 6d8e1085efb98ab48776fa96060652bdedfa40f2..b38ef0d365c3a817e3aadded9c375873f43de726 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricPersistenceWorker.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricPersistenceWorker.java @@ -18,6 +18,7 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.application; +import org.skywalking.apm.collector.agent.stream.service.graph.ApplicationGraphNodeIdDefine; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.storage.StorageModule; @@ -37,7 +38,7 @@ public class ApplicationReferenceMetricPersistenceWorker extends PersistenceWork } @Override public int id() { - return 0; + return ApplicationGraphNodeIdDefine.APPLICATION_REFERENCE_METRIC_PERSISTENCE_NODE_ID; } @Override protected boolean needMergeDBData() { @@ -50,7 +51,8 @@ public class ApplicationReferenceMetricPersistenceWorker extends PersistenceWork public static class Factory extends AbstractLocalAsyncWorkerProvider { - public Factory(ModuleManager moduleManager, QueueCreatorService queueCreatorService) { + public Factory(ModuleManager moduleManager, + QueueCreatorService queueCreatorService) { super(moduleManager, queueCreatorService); } diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricRemoteWorker.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricRemoteWorker.java index ea32603d5a88543ec57d591b9cd19ab03f79568e..04d6488f423c9a67a1abbed0233c4789d4cd1f67 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricRemoteWorker.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricRemoteWorker.java @@ -18,6 +18,7 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.application; +import org.skywalking.apm.collector.agent.stream.service.graph.ApplicationGraphNodeIdDefine; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.remote.service.RemoteSenderService; import org.skywalking.apm.collector.remote.service.Selector; @@ -36,7 +37,7 @@ public class ApplicationReferenceMetricRemoteWorker extends AbstractRemoteWorker } @Override public int id() { - return ApplicationReferenceMetricRemoteWorker.class.hashCode(); + return ApplicationGraphNodeIdDefine.APPLICATION_REFERENCE_METRIC_REMOTE_NODE_ID; } @Override protected void onWork(ApplicationReferenceMetric applicationReferenceMetric) throws WorkerException { diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricSpanListener.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricSpanListener.java index 99a8d0957f1419b4ed795f74c4f64ab2214c92bb..10474522a343dca576e139a370aa4436fd5bf81e 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricSpanListener.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/application/ApplicationReferenceMetricSpanListener.java @@ -20,18 +20,13 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.application; import java.util.LinkedList; import java.util.List; -import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener; -import org.skywalking.apm.collector.agent.stream.parser.RefsListener; -import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.InstanceCacheService; import org.skywalking.apm.collector.configuration.ConfigurationModule; import org.skywalking.apm.collector.configuration.service.IApdexThresholdService; -import org.skywalking.apm.collector.core.graph.Graph; -import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.ApdexThresholdUtils; import org.skywalking.apm.collector.core.util.CollectionUtils; @@ -44,7 +39,7 @@ import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class ApplicationReferenceMetricSpanListener implements EntrySpanListener, ExitSpanListener, RefsListener { +public class ApplicationReferenceMetricSpanListener implements EntrySpanListener, ExitSpanListener { private final Logger logger = LoggerFactory.getLogger(ApplicationReferenceMetricSpanListener.class); @@ -100,22 +95,22 @@ public class ApplicationReferenceMetricSpanListener implements EntrySpanListener } } - @Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int instanceId, - String segmentId) { - int parentApplicationId = instanceCacheService.get(referenceDecorator.getParentApplicationInstanceId()); - - ApplicationReferenceMetric referenceSum = new ApplicationReferenceMetric(Const.EMPTY_STRING); - referenceSum.setFrontApplicationId(parentApplicationId); - referenceSum.setBehindApplicationId(applicationId); - references.add(referenceSum); - } +// @Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int instanceId, +// String segmentId) { +// int parentApplicationId = instanceCacheService.get(referenceDecorator.getParentApplicationInstanceId()); +// +// ApplicationReferenceMetric referenceSum = new ApplicationReferenceMetric(Const.EMPTY_STRING); +// referenceSum.setFrontApplicationId(parentApplicationId); +// referenceSum.setBehindApplicationId(applicationId); +// references.add(referenceSum); +// } @Override public void build() { logger.debug("node reference summary listener build"); - Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.APPLICATION_REFERENCE_METRIC_GRAPH_ID, ApplicationReferenceMetric.class); - for (ApplicationReferenceMetric applicationReferenceMetric : applicationReferenceMetrics) { - graph.start(applicationReferenceMetric); - } +// Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.APPLICATION_REFERENCE_METRIC_GRAPH_ID, ApplicationReferenceMetric.class); +// for (ApplicationReferenceMetric applicationReferenceMetric : applicationReferenceMetrics) { +// graph.start(applicationReferenceMetric); +// } } private ApplicationReferenceMetric buildApplicationRefSum(ApplicationReferenceMetric reference, diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstanceReferenceMetricAggregationWorker.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstanceReferenceMetricAggregationWorker.java new file mode 100644 index 0000000000000000000000000000000000000000..b0e4558d1ccfb63aa7ca66e344738d765a460e7e --- /dev/null +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstanceReferenceMetricAggregationWorker.java @@ -0,0 +1,84 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.worker.trace.instance; + +import org.skywalking.apm.collector.agent.stream.service.graph.InstanceGraphNodeIdDefine; +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; +import org.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; +import org.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric; +import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider; +import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker; + +/** + * @author peng-yongsheng + */ +public class InstanceReferenceMetricAggregationWorker extends AggregationWorker { + + public InstanceReferenceMetricAggregationWorker(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public int id() { + return InstanceGraphNodeIdDefine.INSTANCE_REFERENCE_METRIC_AGGREGATION_NODE_ID; + } + + @Override protected InstanceReferenceMetric transform(ServiceReferenceMetric serviceReferenceMetric) { + String id = serviceReferenceMetric.getTimeBucket() + Const.ID_SPLIT + serviceReferenceMetric.getFrontInstanceId() + Const.ID_SPLIT + serviceReferenceMetric.getBehindInstanceId(); + + InstanceReferenceMetric instanceReferenceMetric = new InstanceReferenceMetric(id); + instanceReferenceMetric.setFrontInstanceId(serviceReferenceMetric.getFrontInstanceId()); + instanceReferenceMetric.setBehindInstanceId(serviceReferenceMetric.getBehindInstanceId()); + + instanceReferenceMetric.setTransactionCalls(serviceReferenceMetric.getTransactionCalls()); + instanceReferenceMetric.setTransactionErrorCalls(serviceReferenceMetric.getTransactionErrorCalls()); + instanceReferenceMetric.setTransactionDurationSum(serviceReferenceMetric.getTransactionDurationSum()); + instanceReferenceMetric.setTransactionErrorDurationSum(serviceReferenceMetric.getTransactionErrorDurationSum()); + + instanceReferenceMetric.setBusinessTransactionCalls(serviceReferenceMetric.getBusinessTransactionCalls()); + instanceReferenceMetric.setBusinessTransactionErrorCalls(instanceReferenceMetric.getBusinessTransactionErrorCalls()); + instanceReferenceMetric.setBusinessTransactionDurationSum(instanceReferenceMetric.getBusinessTransactionDurationSum()); + instanceReferenceMetric.setBusinessTransactionErrorDurationSum(instanceReferenceMetric.getBusinessTransactionErrorDurationSum()); + + instanceReferenceMetric.setMqTransactionCalls(instanceReferenceMetric.getMqTransactionCalls()); + instanceReferenceMetric.setMqTransactionErrorCalls(instanceReferenceMetric.getMqTransactionErrorCalls()); + instanceReferenceMetric.setMqTransactionDurationSum(instanceReferenceMetric.getMqTransactionDurationSum()); + instanceReferenceMetric.setMqTransactionErrorDurationSum(instanceReferenceMetric.getMqTransactionErrorDurationSum()); + + instanceReferenceMetric.setSourceValue(serviceReferenceMetric.getSourceValue()); + return instanceReferenceMetric; + } + + public static class Factory extends AbstractLocalAsyncWorkerProvider { + + public Factory(ModuleManager moduleManager, QueueCreatorService queueCreatorService) { + super(moduleManager, queueCreatorService); + } + + @Override public InstanceReferenceMetricAggregationWorker workerInstance(ModuleManager moduleManager) { + return new InstanceReferenceMetricAggregationWorker(moduleManager); + } + + @Override + public int queueSize() { + return 1024; + } + } +} diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstanceReferenceMetricRemoteWorker.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstanceReferenceMetricRemoteWorker.java new file mode 100644 index 0000000000000000000000000000000000000000..5afe43b499e41c0c4050320f81a95d0dbf3ac28a --- /dev/null +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstanceReferenceMetricRemoteWorker.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.worker.trace.instance; + +import org.skywalking.apm.collector.agent.stream.service.graph.InstanceGraphNodeIdDefine; +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.remote.service.RemoteSenderService; +import org.skywalking.apm.collector.remote.service.Selector; +import org.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; +import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker; +import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorkerProvider; +import org.skywalking.apm.collector.stream.worker.base.WorkerException; + +/** + * @author peng-yongsheng + */ +public class InstanceReferenceMetricRemoteWorker extends AbstractRemoteWorker { + + public InstanceReferenceMetricRemoteWorker(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public int id() { + return InstanceGraphNodeIdDefine.INSTANCE_REFERENCE_METRIC_REMOTE_NODE_ID; + } + + @Override public Selector selector() { + return Selector.HashCode; + } + + @Override protected void onWork(InstanceReferenceMetric instanceReferenceMetric) throws WorkerException { + onNext(instanceReferenceMetric); + } + + public static class Factory extends AbstractRemoteWorkerProvider { + + public Factory(ModuleManager moduleManager, RemoteSenderService remoteSenderService, int graphId) { + super(moduleManager, remoteSenderService, graphId); + } + + @Override public InstanceReferenceMetricRemoteWorker workerInstance(ModuleManager moduleManager) { + return new InstanceReferenceMetricRemoteWorker(moduleManager); + } + } +} diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstanceReferencePersistenceWorker.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstanceReferencePersistenceWorker.java new file mode 100644 index 0000000000000000000000000000000000000000..b33c6a70ee8cc12b712b51c338c744bed841fb5b --- /dev/null +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstanceReferencePersistenceWorker.java @@ -0,0 +1,67 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.worker.trace.instance; + +import org.skywalking.apm.collector.agent.stream.service.graph.InstanceGraphNodeIdDefine; +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; +import org.skywalking.apm.collector.storage.StorageModule; +import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.skywalking.apm.collector.storage.dao.IInstanceReferenceMetricPersistenceDAO; +import org.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; +import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider; +import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker; + +/** + * @author peng-yongsheng + */ +public class InstanceReferencePersistenceWorker extends PersistenceWorker { + + public InstanceReferencePersistenceWorker(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public int id() { + return InstanceGraphNodeIdDefine.INSTANCE_REFERENCE_METRIC_PERSISTENCE_NODE_ID; + } + + @Override protected IPersistenceDAO persistenceDAO() { + return getModuleManager().find(StorageModule.NAME).getService(IInstanceReferenceMetricPersistenceDAO.class); + } + + @Override protected boolean needMergeDBData() { + return true; + } + + public static class Factory extends AbstractLocalAsyncWorkerProvider { + + public Factory(ModuleManager moduleManager, QueueCreatorService queueCreatorService) { + super(moduleManager, queueCreatorService); + } + + @Override public InstanceReferencePersistenceWorker workerInstance(ModuleManager moduleManager) { + return new InstanceReferencePersistenceWorker(moduleManager); + } + + @Override + public int queueSize() { + return 1024; + } + } +} diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java index 1540fcf89fdd2678eae9ef8158aa2ce502bd14b5..1682adf34b00f5ae79822088169dd3c3818d78e7 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java @@ -21,8 +21,6 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.service; import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; -import org.skywalking.apm.collector.agent.stream.parser.RefsListener; -import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.ServiceNameCacheService; @@ -38,7 +36,7 @@ import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener, EntrySpanListener { +public class ServiceEntrySpanListener implements FirstSpanListener, EntrySpanListener { private final Logger logger = LoggerFactory.getLogger(ServiceEntrySpanListener.class); @@ -61,11 +59,9 @@ public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener this.entryServiceId = spanDecorator.getOperationNameId(); this.entryServiceName = serviceNameCacheService.getSplitServiceName(serviceNameCacheService.get(entryServiceId)); this.hasEntry = true; - } - - @Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int instanceId, - String segmentId) { - hasReference = true; + if (spanDecorator.getRefsCount() > 0) { + this.hasReference = true; + } } @Override diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceReferenceMetricAggregationWorker.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceReferenceMetricAggregationWorker.java index 82f0d4728330cb186759f39a725cacbd09505d62..56dbf87fc932d86dbf76c85a7a917a7f1631481a 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceReferenceMetricAggregationWorker.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceReferenceMetricAggregationWorker.java @@ -18,6 +18,7 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.service; +import org.skywalking.apm.collector.agent.stream.service.graph.ServiceGraphNodeIdDefine; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric; @@ -34,7 +35,7 @@ public class ServiceReferenceMetricAggregationWorker extends AggregationWorker { diff --git a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceReferenceMetricPersistenceWorker.java b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceReferenceMetricPersistenceWorker.java index fca80b4fed408b08148dda7e67ff28a4fc565517..cd4c1066630f4e690abf9f6abd665a8b43c3ced9 100644 --- a/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceReferenceMetricPersistenceWorker.java +++ b/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceReferenceMetricPersistenceWorker.java @@ -18,6 +18,7 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.service; +import org.skywalking.apm.collector.agent.stream.service.graph.ServiceGraphNodeIdDefine; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.storage.StorageModule; @@ -37,7 +38,7 @@ public class ServiceReferenceMetricPersistenceWorker extends PersistenceWorker referenceServices = new LinkedList<>(); - private int serviceId = 0; - private long startTime = 0; - private long endTime = 0; - private boolean isError = false; + private List entryReferenceMetric = new LinkedList<>(); + private List exitReferenceMetric = new LinkedList<>(); + private SpanDecorator entrySpanDecorator; private long timeBucket; - private boolean hasEntry = false; @Override public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId, @@ -55,80 +55,110 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime()); } - @Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int applicationInstanceId, - String segmentId) { - referenceServices.add(referenceDecorator); - } - @Override public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) { - serviceId = spanDecorator.getOperationNameId(); - startTime = spanDecorator.getStartTime(); - endTime = spanDecorator.getEndTime(); - isError = spanDecorator.getIsError(); - this.hasEntry = true; + if (spanDecorator.getRefsCount() > 0) { + for (int i = 0; i < spanDecorator.getRefsCount(); i++) { + ReferenceDecorator reference = spanDecorator.getRefs(i); + ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric(Const.EMPTY_STRING); + serviceReferenceMetric.setEntryServiceId(reference.getEntryServiceId()); + serviceReferenceMetric.setEntryInstanceId(reference.getEntryApplicationInstanceId()); + serviceReferenceMetric.setFrontServiceId(reference.getParentServiceId()); + serviceReferenceMetric.setFrontInstanceId(reference.getParentApplicationInstanceId()); + serviceReferenceMetric.setBehindServiceId(spanDecorator.getOperationNameId()); + serviceReferenceMetric.setBehindInstanceId(instanceId); + serviceReferenceMetric.setSourceValue(MetricSource.Entry.ordinal()); + calculateCost(serviceReferenceMetric, spanDecorator, true); + entryReferenceMetric.add(serviceReferenceMetric); + } + } else { + ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric(Const.EMPTY_STRING); + serviceReferenceMetric.setEntryServiceId(spanDecorator.getOperationNameId()); + serviceReferenceMetric.setEntryInstanceId(instanceId); + serviceReferenceMetric.setFrontServiceId(Const.NONE_SERVICE_ID); + serviceReferenceMetric.setFrontInstanceId(instanceId); + serviceReferenceMetric.setBehindServiceId(spanDecorator.getOperationNameId()); + serviceReferenceMetric.setBehindServiceId(instanceId); + serviceReferenceMetric.setSourceValue(MetricSource.Entry.ordinal()); + + calculateCost(serviceReferenceMetric, spanDecorator, false); + entryReferenceMetric.add(serviceReferenceMetric); + } + this.entrySpanDecorator = spanDecorator; } - private void calculateCost(ServiceReferenceMetric serviceReferenceMetric, long startTime, - long endTime, boolean isError) { - long duration = endTime - startTime; + @Override public void parseExit(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) { + ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric(Const.EMPTY_STRING); - serviceReferenceMetric.setTransactionCalls(1L); - serviceReferenceMetric.setTransactionDurationSum(duration); + serviceReferenceMetric.setFrontInstanceId(instanceId); + serviceReferenceMetric.setBehindServiceId(spanDecorator.getOperationNameId()); + serviceReferenceMetric.setSourceValue(MetricSource.Exit.ordinal()); + calculateCost(serviceReferenceMetric, spanDecorator, true); + exitReferenceMetric.add(serviceReferenceMetric); + } + + private void calculateCost(ServiceReferenceMetric serviceReferenceMetric, SpanDecorator spanDecorator, + boolean hasReference) { + long duration = spanDecorator.getStartTime() - spanDecorator.getEndTime(); - if (isError) { + if (spanDecorator.getIsError()) { serviceReferenceMetric.setTransactionErrorCalls(1L); serviceReferenceMetric.setTransactionErrorDurationSum(duration); + } else { + serviceReferenceMetric.setTransactionCalls(1L); + serviceReferenceMetric.setTransactionDurationSum(duration); } - } - @Override public void build() { - logger.debug("service reference listener build"); - if (hasEntry) { - if (referenceServices.size() > 0) { - referenceServices.forEach(reference -> { - ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric(Const.EMPTY_STRING); - int entryServiceId = reference.getEntryServiceId(); - int frontServiceId = reference.getParentServiceId(); - int behindServiceId = serviceId; - calculateCost(serviceReferenceMetric, startTime, endTime, isError); - - logger.debug("has reference, entryServiceId: {}", entryServiceId); - sendToAggregationWorker(serviceReferenceMetric, entryServiceId, frontServiceId, behindServiceId); - }); + if (hasReference) { + if (spanDecorator.getIsError()) { + serviceReferenceMetric.setBusinessTransactionErrorCalls(1L); + serviceReferenceMetric.setBusinessTransactionErrorDurationSum(duration); } else { - ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric(Const.EMPTY_STRING); - int entryServiceId = serviceId; - int frontServiceId = Const.NONE_SERVICE_ID; - int behindServiceId = serviceId; + serviceReferenceMetric.setBusinessTransactionCalls(1L); + serviceReferenceMetric.setBusinessTransactionDurationSum(duration); + } + } - calculateCost(serviceReferenceMetric, startTime, endTime, isError); - sendToAggregationWorker(serviceReferenceMetric, entryServiceId, frontServiceId, behindServiceId); + if (SpanLayer.MQ.equals(spanDecorator.getSpanLayer())) { + if (spanDecorator.getIsError()) { + serviceReferenceMetric.setMqTransactionErrorCalls(1L); + serviceReferenceMetric.setMqTransactionErrorDurationSum(duration); + } else { + serviceReferenceMetric.setMqTransactionCalls(1L); + serviceReferenceMetric.setMqTransactionDurationSum(duration); } } } - private void sendToAggregationWorker(ServiceReferenceMetric serviceReferenceMetric, int entryServiceId, - int frontServiceId, - int behindServiceId) { - StringBuilder idBuilder = new StringBuilder(); - idBuilder.append(timeBucket).append(Const.ID_SPLIT); + @Override public void build() { + logger.debug("service reference listener build"); + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SERVICE_REFERENCE_GRAPH_ID, ServiceReferenceMetric.class); + entryReferenceMetric.forEach(serviceReferenceMetric -> { + String id = timeBucket + Const.ID_SPLIT + serviceReferenceMetric.getEntryServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getFrontServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getBehindServiceId(); - idBuilder.append(entryServiceId).append(Const.ID_SPLIT); - serviceReferenceMetric.setEntryServiceId(entryServiceId); + serviceReferenceMetric.setId(id); + serviceReferenceMetric.setTimeBucket(timeBucket); + logger.debug("push to service reference aggregation worker, id: {}", serviceReferenceMetric.getId()); - idBuilder.append(frontServiceId).append(Const.ID_SPLIT); - serviceReferenceMetric.setFrontServiceId(frontServiceId); + graph.start(serviceReferenceMetric); + }); - idBuilder.append(behindServiceId); - serviceReferenceMetric.setBehindServiceId(behindServiceId); + exitReferenceMetric.forEach(serviceReferenceMetric -> { + serviceReferenceMetric.setEntryInstanceId(Const.NONE_INSTANCE_ID); + if (ObjectUtils.isNotEmpty(entrySpanDecorator)) { + serviceReferenceMetric.setEntryServiceId(entrySpanDecorator.getOperationNameId()); + serviceReferenceMetric.setFrontServiceId(entrySpanDecorator.getOperationNameId()); + } else { + serviceReferenceMetric.setEntryServiceId(Const.NONE_SERVICE_ID); + serviceReferenceMetric.setFrontServiceId(Const.NONE_SERVICE_ID); + } - serviceReferenceMetric.setId(idBuilder.toString()); - serviceReferenceMetric.setTimeBucket(timeBucket); - logger.debug("push to service reference aggregation worker, id: {}", serviceReferenceMetric.getId()); + String id = timeBucket + Const.ID_SPLIT + serviceReferenceMetric.getEntryServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getFrontServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getBehindServiceId(); + serviceReferenceMetric.setId(id); + serviceReferenceMetric.setTimeBucket(timeBucket); - Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SERVICE_REFERENCE_GRAPH_ID, ServiceReferenceMetric.class); - graph.start(serviceReferenceMetric); + graph.start(serviceReferenceMetric); + }); } } diff --git a/apm-collector/apm-collector-alerting/collector-alerting-provider/pom.xml b/apm-collector/apm-collector-alerting/collector-alerting-provider/pom.xml index 91688271efe2cf3ae7ca621a02fdfe0ab97ca701..457f1c2c5139017c6ae1ebb697359bfccade2c68 100644 --- a/apm-collector/apm-collector-alerting/collector-alerting-provider/pom.xml +++ b/apm-collector/apm-collector-alerting/collector-alerting-provider/pom.xml @@ -41,5 +41,15 @@ collector-agent-stream-define ${project.version} + + org.skywalking + collector-storage-define + ${project.version} + + + org.skywalking + apm-collector-stream + ${project.version} + \ No newline at end of file diff --git a/apm-collector/apm-collector-alerting/collector-alerting-provider/src/main/java/org/skywalking/apm/collector/alerting/AlertingModuleProvider.java b/apm-collector/apm-collector-alerting/collector-alerting-provider/src/main/java/org/skywalking/apm/collector/alerting/AlertingModuleProvider.java index b5ab73e8709f7249430282db9aa8790049a5bdd4..4e7b459f1d88ae0f7c1d726f00f812dff00e8811 100644 --- a/apm-collector/apm-collector-alerting/collector-alerting-provider/src/main/java/org/skywalking/apm/collector/alerting/AlertingModuleProvider.java +++ b/apm-collector/apm-collector-alerting/collector-alerting-provider/src/main/java/org/skywalking/apm/collector/alerting/AlertingModuleProvider.java @@ -23,6 +23,7 @@ import org.skywalking.apm.collector.agent.stream.AgentStreamModule; import org.skywalking.apm.collector.core.module.Module; import org.skywalking.apm.collector.core.module.ModuleProvider; import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; +import org.skywalking.apm.collector.storage.StorageModule; /** * @author peng-yongsheng @@ -50,6 +51,6 @@ public class AlertingModuleProvider extends ModuleProvider { } @Override public String[] requiredModules() { - return new String[] {AgentStreamModule.NAME}; + return new String[] {AgentStreamModule.NAME, StorageModule.NAME}; } } diff --git a/apm-collector/apm-collector-alerting/collector-alerting-provider/src/main/java/org/skywalking/apm/collector/alerting/worker/AlertingListAggregationWorker.java b/apm-collector/apm-collector-alerting/collector-alerting-provider/src/main/java/org/skywalking/apm/collector/alerting/worker/AlertingListAggregationWorker.java new file mode 100644 index 0000000000000000000000000000000000000000..2aae06828d27b8a7e258870e83bc249eda6d96da --- /dev/null +++ b/apm-collector/apm-collector-alerting/collector-alerting-provider/src/main/java/org/skywalking/apm/collector/alerting/worker/AlertingListAggregationWorker.java @@ -0,0 +1,42 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.alerting.worker; + +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.storage.table.alerting.AlertingList; +import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker; +import org.skywalking.apm.collector.stream.worker.base.WorkerException; + +/** + * @author peng-yongsheng + */ +public class AlertingListAggregationWorker extends AbstractLocalAsyncWorker { + + public AlertingListAggregationWorker(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public int id() { + return 0; + } + + @Override protected void onWork(AlertingList message) throws WorkerException { + + } +} diff --git a/apm-collector/apm-collector-alerting/collector-alerting-provider/src/main/java/org/skywalking/apm/collector/alerting/worker/AlertingListPersistenceWorker.java b/apm-collector/apm-collector-alerting/collector-alerting-provider/src/main/java/org/skywalking/apm/collector/alerting/worker/AlertingListPersistenceWorker.java new file mode 100644 index 0000000000000000000000000000000000000000..a754f8e4fa5a44758e49fbb213583f8cb846b956 --- /dev/null +++ b/apm-collector/apm-collector-alerting/collector-alerting-provider/src/main/java/org/skywalking/apm/collector/alerting/worker/AlertingListPersistenceWorker.java @@ -0,0 +1,66 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.alerting.worker; + +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; +import org.skywalking.apm.collector.storage.StorageModule; +import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.skywalking.apm.collector.storage.dao.IAlertingListPersistenceDAO; +import org.skywalking.apm.collector.storage.table.alerting.AlertingList; +import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider; +import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker; + +/** + * @author peng-yongsheng + */ +public class AlertingListPersistenceWorker extends PersistenceWorker { + + public AlertingListPersistenceWorker(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public int id() { + return 0; + } + + @Override protected boolean needMergeDBData() { + return true; + } + + @Override protected IPersistenceDAO persistenceDAO() { + return getModuleManager().find(StorageModule.NAME).getService(IAlertingListPersistenceDAO.class); + } + + public static class Factory extends AbstractLocalAsyncWorkerProvider { + + public Factory(ModuleManager moduleManager, QueueCreatorService queueCreatorService) { + super(moduleManager, queueCreatorService); + } + + @Override public AlertingListPersistenceWorker workerInstance(ModuleManager moduleManager) { + return new AlertingListPersistenceWorker(moduleManager); + } + + @Override + public int queueSize() { + return 1024; + } + } +} diff --git a/apm-collector/apm-collector-boot/src/main/resources/application.yml b/apm-collector/apm-collector-boot/src/main/resources/application.yml index 2ba018472365e5cca351aad672515bdc34ac1d27..1954ebed0776c61a20cd65f54022a74f1ee35809 100644 --- a/apm-collector/apm-collector-boot/src/main/resources/application.yml +++ b/apm-collector/apm-collector-boot/src/main/resources/application.yml @@ -30,11 +30,11 @@ ui: host: localhost port: 12800 context_path: / -#storage: -# elasticsearch: -# cluster_name: CollectorDBCluster -# cluster_transport_sniffer: true -# cluster_nodes: localhost:9300 -# index_shards_number: 2 -# index_replicas_number: 0 -# ttl: 7 \ No newline at end of file +storage: + elasticsearch: + cluster_name: CollectorDBCluster + cluster_transport_sniffer: true + cluster_nodes: localhost:9300 + index_shards_number: 2 + index_replicas_number: 0 + ttl: 7 \ No newline at end of file diff --git a/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml b/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml index e6915ec9a54802bd25a28e337441817359cde80a..8e78714f0136d3f57b443020b21e03a8db3e826f 100644 --- a/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml +++ b/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml @@ -17,7 +17,7 @@ ~ Project repository: https://github.com/OpenSkywalking/skywalking --> - + @@ -26,10 +26,11 @@ + - + diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java b/apm-collector/apm-collector-component/client-component/src/main/java/org/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java index f4843c7cd0531c12adb002f97491ba513d9b5e21..a50bf740c779158a4892f1e60777301ab5bcceaf 100644 --- a/apm-collector/apm-collector-component/client-component/src/main/java/org/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java +++ b/apm-collector/apm-collector-component/client-component/src/main/java/org/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java @@ -92,8 +92,7 @@ public class ElasticSearchClient implements Client { List pairsList = new LinkedList<>(); logger.info("elasticsearch cluster nodes: {}", nodes); String[] nodesSplit = nodes.split(","); - for (int i = 0; i < nodesSplit.length; i++) { - String node = nodesSplit[i]; + for (String node : nodesSplit) { String host = node.split(":")[0]; String port = node.split(":")[1]; pairsList.add(new AddressPairs(host, Integer.valueOf(port))); @@ -106,7 +105,7 @@ public class ElasticSearchClient implements Client { private String host; private Integer port; - public AddressPairs(String host, Integer port) { + AddressPairs(String host, Integer port) { this.host = host; this.port = port; } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java index cb0eb02df0458de89b67e46e7082e882edaf15cf..94cd3f67af612810c68750314c91044b1ea0228e 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java @@ -106,15 +106,33 @@ public abstract class Data extends EndOfBatchQueueMessage { } public Long getDataLong(int position) { - return dataLongs[position]; + if (position + 1 > dataLongs.length) { + throw new IndexOutOfBoundsException(); + } else if (dataLongs[position] == null) { + return 0L; + } else { + return dataLongs[position]; + } } public Double getDataDouble(int position) { - return dataDoubles[position]; + if (position + 1 > dataDoubles.length) { + throw new IndexOutOfBoundsException(); + } else if (dataDoubles[position] == null) { + return 0D; + } else { + return dataDoubles[position]; + } } public Integer getDataInteger(int position) { - return dataIntegers[position]; + if (position + 1 > dataIntegers.length) { + throw new IndexOutOfBoundsException(); + } else if (dataIntegers[position] == null) { + return 0; + } else { + return dataIntegers[position]; + } } public Boolean getDataBoolean(int position) { @@ -136,27 +154,27 @@ public abstract class Data extends EndOfBatchQueueMessage { public void mergeData(Data newData) { for (int i = 0; i < stringColumns.length; i++) { - String stringData = stringColumns[i].getOperation().operate(newData.getDataString(i), this.dataStrings[i]); + String stringData = stringColumns[i].getOperation().operate(newData.getDataString(i), this.getDataString(i)); this.dataStrings[i] = stringData; } for (int i = 0; i < longColumns.length; i++) { - Long longData = longColumns[i].getOperation().operate(newData.getDataLong(i), this.dataLongs[i]); + Long longData = longColumns[i].getOperation().operate(newData.getDataLong(i), this.getDataLong(i)); this.dataLongs[i] = longData; } for (int i = 0; i < doubleColumns.length; i++) { - Double doubleData = doubleColumns[i].getOperation().operate(newData.getDataDouble(i), this.dataDoubles[i]); + Double doubleData = doubleColumns[i].getOperation().operate(newData.getDataDouble(i), this.getDataDouble(i)); this.dataDoubles[i] = doubleData; } for (int i = 0; i < integerColumns.length; i++) { - Integer integerData = integerColumns[i].getOperation().operate(newData.getDataInteger(i), this.dataIntegers[i]); + Integer integerData = integerColumns[i].getOperation().operate(newData.getDataInteger(i), this.getDataInteger(i)); this.dataIntegers[i] = integerData; } for (int i = 0; i < booleanColumns.length; i++) { - Boolean booleanData = booleanColumns[i].getOperation().operate(newData.getDataBoolean(i), this.dataBooleans[i]); + Boolean booleanData = booleanColumns[i].getOperation().operate(newData.getDataBoolean(i), this.getDataBoolean(i)); this.dataBooleans[i] = booleanData; } for (int i = 0; i < byteColumns.length; i++) { - byte[] byteData = byteColumns[i].getOperation().operate(newData.getDataBytes(i), this.dataBytes[i]); + byte[] byteData = byteColumns[i].getOperation().operate(newData.getDataBytes(i), this.getDataBytes(i)); this.dataBytes[i] = byteData; } } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/Const.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/Const.java index f6a4a902218234666082263dfa546a5560876a20..adf544d391aba37755516b80858f96faf8710079 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/Const.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/Const.java @@ -25,6 +25,7 @@ public class Const { public static final String ID_SPLIT = "_"; public static final int USER_ID = 1; public static final int NONE_SERVICE_ID = 1; + public static final int NONE_INSTANCE_ID = 1; public static final String NONE_SERVICE_NAME = "None"; public static final String USER_CODE = "User"; public static final String SEGMENT_SPAN_SPLIT = "S"; diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/dao/IInstanceReferenceMetricPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/dao/IInstanceReferenceMetricPersistenceDAO.java new file mode 100644 index 0000000000000000000000000000000000000000..44e4cccd8cfc6faa97f9b83512829845f1f4edc5 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/dao/IInstanceReferenceMetricPersistenceDAO.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.storage.dao; + +import org.skywalking.apm.collector.core.data.Data; +import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; + +/** + * @author peng-yongsheng + */ +public interface IInstanceReferenceMetricPersistenceDAO extends IPersistenceDAO { +} diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/CommonMetricTable.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/CommonMetricTable.java index ce03cb8da13a078bc9cd68e78edca5044d501109..bc2b4c33620d21a1edf2550cfb981b861ff93170 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/CommonMetricTable.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/CommonMetricTable.java @@ -36,4 +36,5 @@ public abstract class CommonMetricTable extends CommonTable { public static final String COLUMN_MQ_TRANSACTION_ERROR_CALLS = "mq_transaction_error_calls"; public static final String COLUMN_MQ_TRANSACTION_DURATION_SUM = "mq_transaction_duration_sum"; public static final String COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM = "mq_transaction_error_duration_sum"; + public static final String COLUMN_SOURCE_VALUE = "source_value"; } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java new file mode 100644 index 0000000000000000000000000000000000000000..322f16c944b1eeb7876f59f24cb1cbccf3e80d9f --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java @@ -0,0 +1,195 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.storage.table.instance; + +import org.skywalking.apm.collector.core.data.Column; +import org.skywalking.apm.collector.core.data.Data; +import org.skywalking.apm.collector.core.data.operator.AddOperation; +import org.skywalking.apm.collector.core.data.operator.NonOperation; + +/** + * @author peng-yongsheng + */ +public class InstanceReferenceMetric extends Data { + + private static final Column[] STRING_COLUMNS = { + new Column(InstanceReferenceMetricTable.COLUMN_ID, new NonOperation()), + }; + + private static final Column[] LONG_COLUMNS = { + new Column(InstanceReferenceMetricTable.COLUMN_TIME_BUCKET, new NonOperation()), + + new Column(InstanceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, new AddOperation()), + new Column(InstanceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, new AddOperation()), + new Column(InstanceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM, new AddOperation()), + new Column(InstanceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM, new AddOperation()), + new Column(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS, new AddOperation()), + new Column(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS, new AddOperation()), + new Column(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM, new AddOperation()), + new Column(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddOperation()), + new Column(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_CALLS, new AddOperation()), + new Column(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS, new AddOperation()), + new Column(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM, new AddOperation()), + new Column(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM, new AddOperation()), + }; + + private static final Column[] DOUBLE_COLUMNS = {}; + + private static final Column[] INTEGER_COLUMNS = { + new Column(InstanceReferenceMetricTable.COLUMN_FRONT_INSTANCE_ID, new NonOperation()), + new Column(InstanceReferenceMetricTable.COLUMN_BEHIND_INSTANCE_ID, new NonOperation()), + new Column(InstanceReferenceMetricTable.COLUMN_SOURCE_VALUE, new NonOperation()), + }; + + private static final Column[] BOOLEAN_COLUMNS = {}; + + private static final Column[] BYTE_COLUMNS = {}; + + public InstanceReferenceMetric(String id) { + super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS); + } + + public Integer getFrontInstanceId() { + return getDataInteger(0); + } + + public void setFrontInstanceId(Integer frontInstanceId) { + setDataInteger(0, frontInstanceId); + } + + public Integer getBehindInstanceId() { + return getDataInteger(1); + } + + public void setBehindInstanceId(Integer behindInstanceId) { + setDataInteger(1, behindInstanceId); + } + + public Integer getSourceValue() { + return getDataInteger(2); + } + + public void setSourceValue(Integer sourceValue) { + setDataInteger(2, sourceValue); + } + + public Long getTimeBucket() { + return getDataLong(0); + } + + public void setTimeBucket(Long timeBucket) { + setDataLong(0, timeBucket); + } + + public Long getTransactionCalls() { + return getDataLong(1); + } + + public void setTransactionCalls(Long transactionCalls) { + setDataLong(1, transactionCalls); + } + + public Long getTransactionErrorCalls() { + return getDataLong(2); + } + + public void setTransactionErrorCalls(Long transactionErrorCalls) { + setDataLong(2, transactionErrorCalls); + } + + public Long getTransactionDurationSum() { + return getDataLong(3); + } + + public void setTransactionDurationSum(Long transactionDurationSum) { + setDataLong(3, transactionDurationSum); + } + + public Long getTransactionErrorDurationSum() { + return getDataLong(4); + } + + public void setTransactionErrorDurationSum(Long transactionErrorDurationSum) { + setDataLong(4, transactionErrorDurationSum); + } + + public Long getBusinessTransactionCalls() { + return getDataLong(5); + } + + public void setBusinessTransactionCalls(Long businessTransactionCalls) { + setDataLong(5, businessTransactionCalls); + } + + public Long getBusinessTransactionErrorCalls() { + return getDataLong(6); + } + + public void setBusinessTransactionErrorCalls(Long businessTransactionErrorCalls) { + setDataLong(6, businessTransactionErrorCalls); + } + + public Long getBusinessTransactionDurationSum() { + return getDataLong(7); + } + + public void setBusinessTransactionDurationSum(Long businessTransactionDurationSum) { + setDataLong(7, businessTransactionDurationSum); + } + + public Long getBusinessTransactionErrorDurationSum() { + return getDataLong(8); + } + + public void setBusinessTransactionErrorDurationSum(Long businessTransactionErrorDurationSum) { + setDataLong(8, businessTransactionErrorDurationSum); + } + + public Long getMqTransactionCalls() { + return getDataLong(9); + } + + public void setMqTransactionCalls(Long mqTransactionCalls) { + setDataLong(9, mqTransactionCalls); + } + + public Long getMqTransactionErrorCalls() { + return getDataLong(10); + } + + public void setMqTransactionErrorCalls(Long mqTransactionErrorCalls) { + setDataLong(10, mqTransactionErrorCalls); + } + + public Long getMqTransactionDurationSum() { + return getDataLong(11); + } + + public void setMqTransactionDurationSum(Long mqTransactionDurationSum) { + setDataLong(11, mqTransactionDurationSum); + } + + public Long getMqTransactionErrorDurationSum() { + return getDataLong(12); + } + + public void setMqTransactionErrorDurationSum(Long mqTransactionErrorDurationSum) { + setDataLong(12, mqTransactionErrorDurationSum); + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetricTable.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetricTable.java new file mode 100644 index 0000000000000000000000000000000000000000..07a1a87d9e99e7ee741c55f549c9a17c548c2246 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetricTable.java @@ -0,0 +1,30 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.storage.table.instance; + +import org.skywalking.apm.collector.storage.table.CommonMetricTable; + +/** + * @author peng-yongsheng + */ +public class InstanceReferenceMetricTable extends CommonMetricTable { + public static final String TABLE = "instance_reference_metric"; + public static final String COLUMN_FRONT_INSTANCE_ID = "front_instance_id"; + public static final String COLUMN_BEHIND_INSTANCE_ID = "behind_instance_id"; +} diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java index 06fe95ba4f6072b5f3f2c63d559cd1d81840ee82..8b4fed2d18a05884696cbbf8d68526b10c58dfd7 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java @@ -55,6 +55,12 @@ public class ServiceReferenceMetric extends Data { new Column(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, new NonOperation()), new Column(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, new NonOperation()), new Column(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, new NonOperation()), + + new Column(ServiceReferenceMetricTable.COLUMN_ENTRY_INSTANCE_ID, new NonOperation()), + new Column(ServiceReferenceMetricTable.COLUMN_FRONT_INSTANCE_ID, new NonOperation()), + new Column(ServiceReferenceMetricTable.COLUMN_BEHIND_INSTANCE_ID, new NonOperation()), + + new Column(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, new NonOperation()), }; private static final Column[] BOOLEAN_COLUMNS = {}; @@ -89,6 +95,38 @@ public class ServiceReferenceMetric extends Data { setDataInteger(2, behindServiceId); } + public Integer getEntryInstanceId() { + return getDataInteger(3); + } + + public void setEntryInstanceId(Integer entryInstanceId) { + setDataInteger(3, entryInstanceId); + } + + public Integer getFrontInstanceId() { + return getDataInteger(4); + } + + public void setFrontInstanceId(Integer frontInstanceId) { + setDataInteger(4, frontInstanceId); + } + + public Integer getBehindInstanceId() { + return getDataInteger(5); + } + + public void setBehindInstanceId(Integer behindInstanceId) { + setDataInteger(5, behindInstanceId); + } + + public Integer getSourceValue() { + return getDataInteger(6); + } + + public void setSourceValue(Integer sourceValue) { + setDataInteger(6, sourceValue); + } + public Long getTimeBucket() { return getDataLong(0); } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/service/ServiceReferenceMetricTable.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/service/ServiceReferenceMetricTable.java index 7a9498763ca600046ac63d3dc85824812a359a7d..69942a0b30f3e719c16bdb811d60b875499452ca 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/service/ServiceReferenceMetricTable.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/service/ServiceReferenceMetricTable.java @@ -26,6 +26,9 @@ import org.skywalking.apm.collector.storage.table.CommonMetricTable; public class ServiceReferenceMetricTable extends CommonMetricTable { public static final String TABLE = "service_reference_metric"; public static final String COLUMN_ENTRY_SERVICE_ID = "entry_service_id"; + public static final String COLUMN_ENTRY_INSTANCE_ID = "entry_instance_id"; public static final String COLUMN_FRONT_SERVICE_ID = "front_service_id"; + public static final String COLUMN_FRONT_INSTANCE_ID = "front_instance_id"; public static final String COLUMN_BEHIND_SERVICE_ID = "behind_service_id"; + public static final String COLUMN_BEHIND_INSTANCE_ID = "behind_instance_id"; } diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/dao/ServiceReferenceMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/dao/ServiceReferenceMetricEsPersistenceDAO.java index 23da7f9ac066d98464b2bc4bcb5e5920bc10bea2..98ba5c5d641216118eb379dab019abcd4e0f2eed 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/dao/ServiceReferenceMetricEsPersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/dao/ServiceReferenceMetricEsPersistenceDAO.java @@ -53,6 +53,7 @@ public class ServiceReferenceMetricEsPersistenceDAO extends EsDAO implements ISe serviceReferenceMetric.setEntryServiceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID)).intValue()); serviceReferenceMetric.setFrontServiceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID)).intValue()); serviceReferenceMetric.setBehindServiceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID)).intValue()); + serviceReferenceMetric.setSourceValue(((Number)source.get(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE)).intValue()); serviceReferenceMetric.setTransactionCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue()); serviceReferenceMetric.setTransactionErrorCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue()); @@ -81,6 +82,7 @@ public class ServiceReferenceMetricEsPersistenceDAO extends EsDAO implements ISe source.put(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId()); + source.put(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, data.getSourceValue()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls()); @@ -107,6 +109,7 @@ public class ServiceReferenceMetricEsPersistenceDAO extends EsDAO implements ISe source.put(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId()); + source.put(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, data.getSourceValue()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls()); diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/define/ServiceReferenceMetricEsTableDefine.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/define/ServiceReferenceMetricEsTableDefine.java index 02a4c64c687905799eb8b39dcd66a63584ad8d1b..68992a9eb82c1f5d83fbe726646ce353079313e9 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/define/ServiceReferenceMetricEsTableDefine.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/define/ServiceReferenceMetricEsTableDefine.java @@ -39,6 +39,7 @@ public class ServiceReferenceMetricEsTableDefine extends ElasticSearchTableDefin addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name())); + addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, ElasticSearchColumnDefine.Type.Long.name())); addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, ElasticSearchColumnDefine.Type.Long.name())); diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/dao/ServiceReferenceMetricH2PersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/dao/ServiceReferenceMetricH2PersistenceDAO.java index de6aa4b91113395f8a9a6a4b94ae2b2770ac3439..cd3861f750fac7109219da9281cadee0211d9c46 100644 --- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/dao/ServiceReferenceMetricH2PersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/dao/ServiceReferenceMetricH2PersistenceDAO.java @@ -58,6 +58,7 @@ public class ServiceReferenceMetricH2PersistenceDAO extends H2DAO implements ISe serviceReferenceMetric.setEntryServiceId(rs.getInt(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID)); serviceReferenceMetric.setFrontServiceId(rs.getInt(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID)); serviceReferenceMetric.setBehindServiceId(rs.getInt(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID)); + serviceReferenceMetric.setSourceValue(rs.getInt(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE)); serviceReferenceMetric.setTransactionCalls(rs.getLong(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS)); serviceReferenceMetric.setTransactionErrorCalls(rs.getLong(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)); @@ -91,6 +92,7 @@ public class ServiceReferenceMetricH2PersistenceDAO extends H2DAO implements ISe source.put(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId()); + source.put(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, data.getSourceValue()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls()); @@ -122,6 +124,7 @@ public class ServiceReferenceMetricH2PersistenceDAO extends H2DAO implements ISe source.put(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId()); + source.put(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, data.getSourceValue()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls()); diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/define/ServiceReferenceMetricH2TableDefine.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/define/ServiceReferenceMetricH2TableDefine.java index c0574eec94cf235504ebda0e913a173141a901c8..06238bc489285a6a0600d516e685a74ecf372c0d 100644 --- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/define/ServiceReferenceMetricH2TableDefine.java +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/define/ServiceReferenceMetricH2TableDefine.java @@ -36,6 +36,7 @@ public class ServiceReferenceMetricH2TableDefine extends H2TableDefine { addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, H2ColumnDefine.Type.Int.name())); addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, H2ColumnDefine.Type.Int.name())); addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, H2ColumnDefine.Type.Int.name())); + addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, H2ColumnDefine.Type.Int.name())); addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, H2ColumnDefine.Type.Bigint.name())); addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, H2ColumnDefine.Type.Bigint.name()));