From b744d4028c35ec15bc271e1f5b463d3e9c0f65de Mon Sep 17 00:00:00 2001 From: peng-yongsheng <8082209@qq.com> Date: Wed, 15 Nov 2017 00:12:24 +0800 Subject: [PATCH] Trace segment stream finish. --- .../agent/stream/AgentStreamSingleton.java | 13 +- .../agent/stream/graph/TraceStreamGraph.java | 188 ++++++++++++++++++ .../agent/stream/parser/SegmentParse.java | 8 +- .../SegmentStandardizationWorker.java | 2 +- .../trace/global/GlobalTraceSpanListener.java | 5 + .../instance/InstPerformanceSpanListener.java | 6 + .../trace/node/NodeComponentSpanListener.java | 6 + .../trace/node/NodeMappingSpanListener.java | 6 + .../noderef/NodeReferenceSpanListener.java | 5 + .../segment/SegmentCostSpanListener.java | 5 + .../service/ServiceEntrySpanListener.java | 5 + .../ServiceReferenceSpanListener.java | 6 + 12 files changed, 252 insertions(+), 3 deletions(-) create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java index 45c1f13f94..06a845e9e4 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java @@ -20,6 +20,7 @@ package org.skywalking.apm.collector.agent.stream; import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph; import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.ObjectUtils; import org.skywalking.apm.collector.stream.timer.PersistenceTimer; @@ -74,6 +75,16 @@ public class AgentStreamSingleton { } private void createTraceGraph() { - + TraceStreamGraph traceStreamGraph = new TraceStreamGraph(moduleManager, workerCreateListener); + traceStreamGraph.createSegmentStandardizationGraph(); + traceStreamGraph.createGlobalTraceGraph(); + traceStreamGraph.createInstPerformanceGraph(); + traceStreamGraph.createNodeComponentGraph(); + traceStreamGraph.createNodeMappingGraph(); + traceStreamGraph.createNodeReferenceGraph(); + traceStreamGraph.createServiceEntryGraph(); + traceStreamGraph.createServiceReferenceGraph(); + traceStreamGraph.createSegmentGraph(); + traceStreamGraph.createSegmentCostGraph(); } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java new file mode 100644 index 0000000000..06d18f8531 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java @@ -0,0 +1,188 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.graph; + +import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardizationWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTracePersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstPerformancePersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentAggregationWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentRemoteWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingAggregationWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingRemoteWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceAggregationWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferencePersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceRemoteWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryAggregationWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryRemoteWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferenceAggregationWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferencePersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferenceRemoteWorker; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.queue.QueueModule; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; +import org.skywalking.apm.collector.remote.RemoteModule; +import org.skywalking.apm.collector.remote.service.RemoteSenderService; +import org.skywalking.apm.collector.storage.table.global.GlobalTrace; +import org.skywalking.apm.collector.storage.table.instance.InstPerformance; +import org.skywalking.apm.collector.storage.table.node.NodeComponent; +import org.skywalking.apm.collector.storage.table.node.NodeMapping; +import org.skywalking.apm.collector.storage.table.noderef.NodeReference; +import org.skywalking.apm.collector.storage.table.segment.Segment; +import org.skywalking.apm.collector.storage.table.segment.SegmentCost; +import org.skywalking.apm.collector.storage.table.service.ServiceEntry; +import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference; +import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; +import org.skywalking.apm.network.proto.UpstreamSegment; + +/** + * @author peng-yongsheng + */ +public class TraceStreamGraph { + + public static final int GLOBAL_TRACE_GRAPH_ID = 300; + public static final int INST_PERFORMANCE_GRAPH_ID = 301; + public static final int NODE_COMPONENT_GRAPH_ID = 302; + public static final int NODE_MAPPING_GRAPH_ID = 303; + public static final int NODE_REFERENCE_GRAPH_ID = 304; + public static final int SERVICE_ENTRY_GRAPH_ID = 305; + public static final int SERVICE_REFERENCE_GRAPH_ID = 306; + public static final int SEGMENT_GRAPH_ID = 307; + public static final int SEGMENT_COST_GRAPH_ID = 308; + public static final int SEGMENT_STANDARDIZATION_GRAPH_ID = 309; + + private final ModuleManager moduleManager; + private final WorkerCreateListener workerCreateListener; + + public TraceStreamGraph(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) { + this.moduleManager = moduleManager; + this.workerCreateListener = workerCreateListener; + } + + @SuppressWarnings("unchecked") + public Graph createSegmentStandardizationGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_STANDARDIZATION_GRAPH_ID, UpstreamSegment.class); + graph.addNode(new SegmentStandardizationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createGlobalTraceGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(GLOBAL_TRACE_GRAPH_ID, GlobalTrace.class); + graph.addNode(new GlobalTracePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createInstPerformanceGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(INST_PERFORMANCE_GRAPH_ID, InstPerformance.class); + graph.addNode(new InstPerformancePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createNodeComponentGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(NODE_COMPONENT_GRAPH_ID, NodeComponent.class); + graph.addNode(new NodeComponentAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) + .addNext(new NodeComponentRemoteWorker.Factory(moduleManager, remoteSenderService, NODE_COMPONENT_GRAPH_ID).create(workerCreateListener)) + .addNext(new NodeComponentPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createNodeMappingGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(NODE_MAPPING_GRAPH_ID, NodeMapping.class); + graph.addNode(new NodeMappingAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) + .addNext(new NodeMappingRemoteWorker.Factory(moduleManager, remoteSenderService, NODE_MAPPING_GRAPH_ID).create(workerCreateListener)) + .addNext(new NodeMappingPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createNodeReferenceGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(NODE_REFERENCE_GRAPH_ID, NodeReference.class); + graph.addNode(new NodeReferenceAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) + .addNext(new NodeReferenceRemoteWorker.Factory(moduleManager, remoteSenderService, NODE_REFERENCE_GRAPH_ID).create(workerCreateListener)) + .addNext(new NodeReferencePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createServiceEntryGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(SERVICE_ENTRY_GRAPH_ID, ServiceEntry.class); + graph.addNode(new ServiceEntryAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) + .addNext(new ServiceEntryRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_ENTRY_GRAPH_ID).create(workerCreateListener)) + .addNext(new ServiceEntryPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createServiceReferenceGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(SERVICE_REFERENCE_GRAPH_ID, ServiceReference.class); + graph.addNode(new ServiceReferenceAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) + .addNext(new ServiceReferenceRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_REFERENCE_GRAPH_ID).create(workerCreateListener)) + .addNext(new ServiceReferencePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createSegmentGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_GRAPH_ID, Segment.class); + graph.addNode(new SegmentPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createSegmentCostGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_COST_GRAPH_ID, SegmentCost.class); + graph.addNode(new SegmentCostPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java index baca08a09f..982850f388 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java @@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.parser; import com.google.protobuf.InvalidProtocolBufferException; import java.util.ArrayList; import java.util.List; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceIdExchanger; import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentDecorator; @@ -34,6 +35,8 @@ import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeRefere import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostSpanListener; import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntrySpanListener; import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferenceSpanListener; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.storage.table.segment.Segment; import org.skywalking.apm.network.proto.SpanType; @@ -150,11 +153,14 @@ public class SegmentParse { private void buildSegment(String id, byte[] dataBinary) { Segment segment = new Segment(id); segment.setDataBinary(dataBinary); + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_GRAPH_ID, Segment.class); + graph.start(segment); } private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) { logger.debug("send to segment buffer write worker, id: {}", id); -// context.getClusterWorkerContext().lookup(SegmentStandardizationWorker.WorkerRole.INSTANCE).tell(upstreamSegment); + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_STANDARDIZATION_GRAPH_ID, UpstreamSegment.class); + graph.start(upstreamSegment); } private void notifyListenerToBuild() { diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java index ea32a2c44d..441b368053 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java @@ -41,7 +41,7 @@ public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.GLOBAL_TRACE_GRAPH_ID, GlobalTrace.class); for (String globalTraceId : globalTraceIds) { GlobalTrace globalTrace = new GlobalTrace(segmentId + Const.ID_SPLIT + globalTraceId); globalTrace.setGlobalTraceId(globalTraceId); globalTrace.setSegmentId(segmentId); globalTrace.setTimeBucket(timeBucket); + graph.start(globalTrace); } } } \ No newline at end of file diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstPerformanceSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstPerformanceSpanListener.java index 3ed7d16627..653b829f4d 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstPerformanceSpanListener.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstPerformanceSpanListener.java @@ -18,9 +18,12 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.instance; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.storage.table.instance.InstPerformance; @@ -60,5 +63,8 @@ public class InstPerformanceSpanListener implements EntrySpanListener, FirstSpan instPerformance.setCalls(1); instPerformance.setCostTotal(cost); instPerformance.setTimeBucket(timeBucket); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.INST_PERFORMANCE_GRAPH_ID, InstPerformance.class); + graph.start(instPerformance); } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeComponentSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeComponentSpanListener.java index 6dbdd373e4..a8a97c40ff 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeComponentSpanListener.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeComponentSpanListener.java @@ -20,10 +20,13 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.node; import java.util.ArrayList; import java.util.List; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.storage.table.node.NodeComponent; @@ -89,9 +92,12 @@ public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanLis } @Override public void build() { + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.NODE_COMPONENT_GRAPH_ID, NodeComponent.class); + nodeComponents.forEach(nodeComponent -> { nodeComponent.setId(timeBucket + Const.ID_SPLIT + nodeComponent.getId()); nodeComponent.setTimeBucket(timeBucket); + graph.start(nodeComponent); }); } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeMappingSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeMappingSpanListener.java index 0bf66c26c4..d1ac5bfaf1 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeMappingSpanListener.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeMappingSpanListener.java @@ -20,10 +20,13 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.node; import java.util.ArrayList; import java.util.List; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.RefsListener; import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.storage.table.node.NodeMapping; @@ -59,10 +62,13 @@ public class NodeMappingSpanListener implements RefsListener, FirstSpanListener @Override public void build() { logger.debug("node mapping listener build"); + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.NODE_MAPPING_GRAPH_ID, NodeMapping.class); + for (NodeMapping nodeMapping : nodeMappings) { nodeMapping.setId(timeBucket + Const.ID_SPLIT + nodeMapping.getId()); nodeMapping.setTimeBucket(timeBucket); logger.debug("send to node mapping aggregation worker, id: {}", nodeMapping.getId()); + graph.start(nodeMapping); } } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/noderef/NodeReferenceSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/noderef/NodeReferenceSpanListener.java index 88a8d1395c..4ea030d438 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/noderef/NodeReferenceSpanListener.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/noderef/NodeReferenceSpanListener.java @@ -20,6 +20,7 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.noderef; import java.util.LinkedList; import java.util.List; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener; import org.skywalking.apm.collector.agent.stream.parser.RefsListener; @@ -27,6 +28,8 @@ import org.skywalking.apm.collector.agent.stream.parser.standardization.Referenc import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.InstanceCacheService; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.CollectionUtils; import org.skywalking.apm.collector.core.util.Const; @@ -105,7 +108,9 @@ public class NodeReferenceSpanListener implements EntrySpanListener, ExitSpanLis @Override public void build() { logger.debug("node reference summary listener build"); + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.NODE_REFERENCE_GRAPH_ID, NodeReference.class); for (NodeReference nodeReference : nodeReferences) { + graph.start(nodeReference); } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/segment/SegmentCostSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/segment/SegmentCostSpanListener.java index bbd59775c6..edd68f6522 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/segment/SegmentCostSpanListener.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/segment/SegmentCostSpanListener.java @@ -20,6 +20,7 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.segment; import java.util.ArrayList; import java.util.List; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; @@ -27,6 +28,8 @@ import org.skywalking.apm.collector.agent.stream.parser.LocalSpanListener; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.ServiceNameCacheService; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.TimeBucketUtils; @@ -91,10 +94,12 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe } @Override public void build() { + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_COST_GRAPH_ID, SegmentCost.class); logger.debug("segment cost listener build"); for (SegmentCost segmentCost : segmentCosts) { segmentCost.setIsError(isError); segmentCost.setTimeBucket(timeBucket); + graph.start(segmentCost); } } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java index 3075cb2cc4..0312bbf6de 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java @@ -18,6 +18,7 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.service; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.RefsListener; @@ -25,6 +26,8 @@ import org.skywalking.apm.collector.agent.stream.parser.standardization.Referenc import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.ServiceNameCacheService; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.TimeBucketUtils; @@ -82,6 +85,8 @@ public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener serviceEntry.setNewestTime(timeBucket); logger.debug("send to service entry aggregation worker, id: {}", serviceEntry.getId()); + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SERVICE_ENTRY_GRAPH_ID, ServiceEntry.class); + graph.start(serviceEntry); } } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/serviceref/ServiceReferenceSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/serviceref/ServiceReferenceSpanListener.java index a8178014a8..45aa2a1e38 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/serviceref/ServiceReferenceSpanListener.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/serviceref/ServiceReferenceSpanListener.java @@ -20,11 +20,14 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.serviceref; import java.util.LinkedList; import java.util.List; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.RefsListener; import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference; @@ -128,5 +131,8 @@ public class ServiceReferenceSpanListener implements FirstSpanListener, EntrySpa serviceReference.setId(idBuilder.toString()); serviceReference.setTimeBucket(timeBucket); logger.debug("send to service reference aggregation worker, id: {}", serviceReference.getId()); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SERVICE_REFERENCE_GRAPH_ID, ServiceReference.class); + graph.start(serviceReference); } } -- GitLab