diff --git a/README.md b/README.md index b2ef5ad24071618ea535212a0ba115dc4a638615..848e5028adc2d0c85c5b1f94fc072e4491e197f3 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,8 @@ Sky Walking | [中文](README_ZH.md) * The UI released on [skywalking-ui](https://github.com/OpenSkywalking/sky-walking-ui) # Architecture -* Architecture graph for 3.2+ - +* Architecture graph for 3.2.5+ + # Document [![EN doc](https://img.shields.io/badge/document-English-blue.svg)](docs/README.md) [![cn doc](https://img.shields.io/badge/document-中文-blue.svg)](docs/README_ZH.md) diff --git a/README_ZH.md b/README_ZH.md index 8f486309be57df2bc4fe8ad53c1747993730ee5c..fcb7187b287eb5f09c37aeddf95b8cacbb949d1a 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -25,7 +25,7 @@ Sky Walking | [English](README.md) # Architecture * 3.2+版本架构图 - + # Document [![EN doc](https://img.shields.io/badge/document-English-blue.svg)](docs/README.md) [![cn doc](https://img.shields.io/badge/document-中文-blue.svg)](docs/README_ZH.md) diff --git a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java index f13aea73c3dbad2ed577638f4a7282e1f22837cf..8b13cfd82dc0d13671d98a9e3d49d8b2ba6a9c8d 100644 --- a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java +++ b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java @@ -42,7 +42,7 @@ import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService; import org.skywalking.apm.collector.remote.RemoteModule; import org.skywalking.apm.collector.server.Server; import org.skywalking.apm.collector.storage.StorageModule; -import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; +import org.skywalking.apm.collector.stream.StreamModule; /** * @author peng-yongsheng @@ -82,7 +82,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider { GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class); Server gRPCServer = managerService.createIfAbsent(host, port); - AgentStreamSingleton.getInstance(getManager(), new WorkerCreateListener()); + AgentStreamSingleton.createInstanceIfAbsent(getManager()); addHandlers(gRPCServer); } @@ -91,7 +91,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider { } @Override public String[] requiredModules() { - return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME}; + return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME, StreamModule.NAME}; } private void addHandlers(Server gRPCServer) { diff --git a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java index 6aff9a20c02fc8ca0bef334757e2af8de49ebd10..e1d86f431a97e28a6e3e88be4af246692e360685 100644 --- a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java +++ b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java @@ -68,7 +68,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe request.getMetricsList().forEach(metric -> { long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime()); - senToInstanceHeartBeatPersistenceWorker(instanceId, metric.getTime()); + sendToInstanceHeartBeatPersistenceWorker(instanceId, metric.getTime()); sendToCpuMetricPersistenceWorker(instanceId, time, metric.getCpu()); sendToMemoryMetricPersistenceWorker(instanceId, time, metric.getMemoryList()); sendToMemoryPoolMetricPersistenceWorker(instanceId, time, metric.getMemoryPoolList()); @@ -79,7 +79,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe responseObserver.onCompleted(); } - private void senToInstanceHeartBeatPersistenceWorker(int instanceId, long heartBeatTime) { + private void sendToInstanceHeartBeatPersistenceWorker(int instanceId, long heartBeatTime) { Instance instance = new Instance(String.valueOf(instanceId)); instance.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime)); instance.setInstanceId(instanceId); diff --git a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/test/java/org/skywalking/apm/collector/agent/grpc/handler/ApplicationRegisterServiceHandlerTestCase.java b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/test/java/org/skywalking/apm/collector/agent/grpc/handler/ApplicationRegisterServiceHandlerTestCase.java index c599f01a665955044d62b6fd7ac1d4a2e970738c..1693fa0c54bad2f3540adff0a43d72e793f7ba63 100644 --- a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/test/java/org/skywalking/apm/collector/agent/grpc/handler/ApplicationRegisterServiceHandlerTestCase.java +++ b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/test/java/org/skywalking/apm/collector/agent/grpc/handler/ApplicationRegisterServiceHandlerTestCase.java @@ -35,7 +35,6 @@ public class ApplicationRegisterServiceHandlerTestCase { private ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub stub; - //@Test public void testRegister() { ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build(); stub = ApplicationRegisterServiceGrpc.newBlockingStub(channel); diff --git a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java index 6a57ab310647def6761b8b65c0c40415f31f1f9c..68ac58805317220015f8dd829e932df09e664bb1 100644 --- a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java +++ b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java @@ -36,7 +36,7 @@ import org.skywalking.apm.collector.naming.NamingModule; import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService; import org.skywalking.apm.collector.server.Server; import org.skywalking.apm.collector.storage.StorageModule; -import org.skywalking.apm.collector.storage.service.DAOService; +import org.skywalking.apm.collector.stream.StreamModule; /** * @author peng-yongsheng @@ -75,11 +75,9 @@ public class AgentModuleJettyProvider extends ModuleProvider { NamingHandlerRegisterService namingHandlerRegisterService = getManager().find(NamingModule.NAME).getService(NamingHandlerRegisterService.class); namingHandlerRegisterService.register(new AgentJettyNamingHandler(namingListener)); - DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class); - JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class); Server jettyServer = managerService.createIfAbsent(host, port, contextPath); - addHandlers(daoService, jettyServer); + addHandlers(jettyServer); } @Override public void notifyAfterCompleted() throws ServiceNotProvidedException { @@ -87,10 +85,10 @@ public class AgentModuleJettyProvider extends ModuleProvider { } @Override public String[] requiredModules() { - return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME}; + return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME, StreamModule.NAME}; } - private void addHandlers(DAOService daoService, Server jettyServer) { + private void addHandlers(Server jettyServer) { jettyServer.addHandler(new TraceSegmentServletHandler()); } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java index 010a8653f1ff1e807cae0508cb8adba689ede93f..06a845e9e4a98fdb63ef3de28f260db09e9bbf1c 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java @@ -18,9 +18,12 @@ package org.skywalking.apm.collector.agent.stream; +import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph; import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.ObjectUtils; +import org.skywalking.apm.collector.stream.timer.PersistenceTimer; import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; /** @@ -33,24 +36,35 @@ public class AgentStreamSingleton { private final ModuleManager moduleManager; private final WorkerCreateListener workerCreateListener; - public AgentStreamSingleton(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) { + private AgentStreamSingleton(ModuleManager moduleManager) { this.moduleManager = moduleManager; - this.workerCreateListener = workerCreateListener; - createJVMGraph(); - createRegisterGraph(); - createTraceGraph(); + this.workerCreateListener = new WorkerCreateListener(); + this.create(); } - public static synchronized AgentStreamSingleton getInstance(ModuleManager moduleManager, - WorkerCreateListener workerCreateListener) { + public static synchronized AgentStreamSingleton createInstanceIfAbsent(ModuleManager moduleManager) { if (ObjectUtils.isEmpty(INSTANCE)) { - INSTANCE = new AgentStreamSingleton(moduleManager, workerCreateListener); + INSTANCE = new AgentStreamSingleton(moduleManager); } return INSTANCE; } - private void createJVMGraph() { + private void create() { + createJVMGraph(); + createRegisterGraph(); + createTraceGraph(); + PersistenceTimer timer = new PersistenceTimer(); + timer.start(moduleManager, workerCreateListener.getPersistenceWorkers()); + } + + private void createJVMGraph() { + JvmMetricStreamGraph jvmMetricStreamGraph = new JvmMetricStreamGraph(moduleManager, workerCreateListener); + jvmMetricStreamGraph.createCpuMetricGraph(); + jvmMetricStreamGraph.createGcMetricGraph(); + jvmMetricStreamGraph.createMemoryMetricGraph(); + jvmMetricStreamGraph.createMemoryPoolMetricGraph(); + jvmMetricStreamGraph.createHeartBeatGraph(); } private void createRegisterGraph() { @@ -61,6 +75,16 @@ public class AgentStreamSingleton { } private void createTraceGraph() { - + TraceStreamGraph traceStreamGraph = new TraceStreamGraph(moduleManager, workerCreateListener); + traceStreamGraph.createSegmentStandardizationGraph(); + traceStreamGraph.createGlobalTraceGraph(); + traceStreamGraph.createInstPerformanceGraph(); + traceStreamGraph.createNodeComponentGraph(); + traceStreamGraph.createNodeMappingGraph(); + traceStreamGraph.createNodeReferenceGraph(); + traceStreamGraph.createServiceEntryGraph(); + traceStreamGraph.createServiceReferenceGraph(); + traceStreamGraph.createSegmentGraph(); + traceStreamGraph.createSegmentCostGraph(); } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java index ff599f5355d9161ff212f62588d7b7c363b521c6..defdcca6a11edc740c75a18135b4341ebe0f40c4 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java @@ -18,14 +18,22 @@ package org.skywalking.apm.collector.agent.stream.graph; +import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.jvm.InstHeartBeatPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryPoolMetricPersistenceWorker; import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.GraphManager; +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.queue.QueueModule; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.storage.table.jvm.CpuMetric; import org.skywalking.apm.collector.storage.table.jvm.GCMetric; import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric; import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric; import org.skywalking.apm.collector.storage.table.register.Instance; -import org.skywalking.apm.collector.stream.worker.base.ProviderNotFoundException; +import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; /** * @author peng-yongsheng @@ -38,28 +46,56 @@ public class JvmMetricStreamGraph { public static final int CPU_METRIC_GRAPH_ID = 103; public static final int INST_HEART_BEAT_GRAPH_ID = 104; + private final ModuleManager moduleManager; + private final WorkerCreateListener workerCreateListener; + + public JvmMetricStreamGraph(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) { + this.moduleManager = moduleManager; + this.workerCreateListener = workerCreateListener; + } + + @SuppressWarnings("unchecked") public Graph createGcMetricGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(GC_METRIC_GRAPH_ID, GCMetric.class); + graph.addNode(new GCMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); return graph; } - public Graph createCpuMetricGraph() throws ProviderNotFoundException { + @SuppressWarnings("unchecked") + public Graph createCpuMetricGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(CPU_METRIC_GRAPH_ID, CpuMetric.class); + graph.addNode(new CpuMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); return graph; } + @SuppressWarnings("unchecked") public Graph createMemoryMetricGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_METRIC_GRAPH_ID, MemoryMetric.class); + graph.addNode(new MemoryMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); return graph; } + @SuppressWarnings("unchecked") public Graph createMemoryPoolMetricGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class); + graph.addNode(new MemoryPoolMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); return graph; } + @SuppressWarnings("unchecked") public Graph createHeartBeatGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(INST_HEART_BEAT_GRAPH_ID, Instance.class); + graph.addNode(new InstHeartBeatPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); return graph; } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java index 00f26ef7d4e670feaca405f4585d0b2ff2b11890..915259f580e7a2d82905bf5ebfdb7c1c3dd19c4b 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java @@ -31,8 +31,6 @@ import org.skywalking.apm.collector.queue.QueueModule; import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.remote.RemoteModule; import org.skywalking.apm.collector.remote.service.RemoteSenderService; -import org.skywalking.apm.collector.storage.StorageModule; -import org.skywalking.apm.collector.storage.service.DAOService; import org.skywalking.apm.collector.storage.table.register.Application; import org.skywalking.apm.collector.storage.table.register.Instance; import org.skywalking.apm.collector.storage.table.register.ServiceName; @@ -57,7 +55,6 @@ public class RegisterStreamGraph { @SuppressWarnings("unchecked") public Graph createApplicationRegisterGraph() { - DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class); RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); @@ -70,7 +67,6 @@ public class RegisterStreamGraph { @SuppressWarnings("unchecked") public Graph createInstanceRegisterGraph() { - DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class); RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); @@ -83,7 +79,6 @@ public class RegisterStreamGraph { @SuppressWarnings("unchecked") public Graph createServiceNameRegisterGraph() { - DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class); RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java new file mode 100644 index 0000000000000000000000000000000000000000..06d18f85319c15cfb0475e61c5bce4b27f0e40e5 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java @@ -0,0 +1,188 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.graph; + +import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardizationWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTracePersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstPerformancePersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentAggregationWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentRemoteWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingAggregationWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingRemoteWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceAggregationWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferencePersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceRemoteWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryAggregationWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryRemoteWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferenceAggregationWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferencePersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferenceRemoteWorker; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.queue.QueueModule; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; +import org.skywalking.apm.collector.remote.RemoteModule; +import org.skywalking.apm.collector.remote.service.RemoteSenderService; +import org.skywalking.apm.collector.storage.table.global.GlobalTrace; +import org.skywalking.apm.collector.storage.table.instance.InstPerformance; +import org.skywalking.apm.collector.storage.table.node.NodeComponent; +import org.skywalking.apm.collector.storage.table.node.NodeMapping; +import org.skywalking.apm.collector.storage.table.noderef.NodeReference; +import org.skywalking.apm.collector.storage.table.segment.Segment; +import org.skywalking.apm.collector.storage.table.segment.SegmentCost; +import org.skywalking.apm.collector.storage.table.service.ServiceEntry; +import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference; +import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; +import org.skywalking.apm.network.proto.UpstreamSegment; + +/** + * @author peng-yongsheng + */ +public class TraceStreamGraph { + + public static final int GLOBAL_TRACE_GRAPH_ID = 300; + public static final int INST_PERFORMANCE_GRAPH_ID = 301; + public static final int NODE_COMPONENT_GRAPH_ID = 302; + public static final int NODE_MAPPING_GRAPH_ID = 303; + public static final int NODE_REFERENCE_GRAPH_ID = 304; + public static final int SERVICE_ENTRY_GRAPH_ID = 305; + public static final int SERVICE_REFERENCE_GRAPH_ID = 306; + public static final int SEGMENT_GRAPH_ID = 307; + public static final int SEGMENT_COST_GRAPH_ID = 308; + public static final int SEGMENT_STANDARDIZATION_GRAPH_ID = 309; + + private final ModuleManager moduleManager; + private final WorkerCreateListener workerCreateListener; + + public TraceStreamGraph(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) { + this.moduleManager = moduleManager; + this.workerCreateListener = workerCreateListener; + } + + @SuppressWarnings("unchecked") + public Graph createSegmentStandardizationGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_STANDARDIZATION_GRAPH_ID, UpstreamSegment.class); + graph.addNode(new SegmentStandardizationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createGlobalTraceGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(GLOBAL_TRACE_GRAPH_ID, GlobalTrace.class); + graph.addNode(new GlobalTracePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createInstPerformanceGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(INST_PERFORMANCE_GRAPH_ID, InstPerformance.class); + graph.addNode(new InstPerformancePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createNodeComponentGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(NODE_COMPONENT_GRAPH_ID, NodeComponent.class); + graph.addNode(new NodeComponentAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) + .addNext(new NodeComponentRemoteWorker.Factory(moduleManager, remoteSenderService, NODE_COMPONENT_GRAPH_ID).create(workerCreateListener)) + .addNext(new NodeComponentPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createNodeMappingGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(NODE_MAPPING_GRAPH_ID, NodeMapping.class); + graph.addNode(new NodeMappingAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) + .addNext(new NodeMappingRemoteWorker.Factory(moduleManager, remoteSenderService, NODE_MAPPING_GRAPH_ID).create(workerCreateListener)) + .addNext(new NodeMappingPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createNodeReferenceGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(NODE_REFERENCE_GRAPH_ID, NodeReference.class); + graph.addNode(new NodeReferenceAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) + .addNext(new NodeReferenceRemoteWorker.Factory(moduleManager, remoteSenderService, NODE_REFERENCE_GRAPH_ID).create(workerCreateListener)) + .addNext(new NodeReferencePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createServiceEntryGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(SERVICE_ENTRY_GRAPH_ID, ServiceEntry.class); + graph.addNode(new ServiceEntryAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) + .addNext(new ServiceEntryRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_ENTRY_GRAPH_ID).create(workerCreateListener)) + .addNext(new ServiceEntryPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createServiceReferenceGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(SERVICE_REFERENCE_GRAPH_ID, ServiceReference.class); + graph.addNode(new ServiceReferenceAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) + .addNext(new ServiceReferenceRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_REFERENCE_GRAPH_ID).create(workerCreateListener)) + .addNext(new ServiceReferencePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createSegmentGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_GRAPH_ID, Segment.class); + graph.addNode(new SegmentPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } + + @SuppressWarnings("unchecked") + public Graph createSegmentCostGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_COST_GRAPH_ID, SegmentCost.class); + graph.addNode(new SegmentCostPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); + return graph; + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java index baca08a09feffcefbf3da3958b12bc58792589ab..982850f38879b45751c89b6fc04ce44058566561 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java @@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.parser; import com.google.protobuf.InvalidProtocolBufferException; import java.util.ArrayList; import java.util.List; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceIdExchanger; import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentDecorator; @@ -34,6 +35,8 @@ import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeRefere import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostSpanListener; import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntrySpanListener; import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferenceSpanListener; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.storage.table.segment.Segment; import org.skywalking.apm.network.proto.SpanType; @@ -150,11 +153,14 @@ public class SegmentParse { private void buildSegment(String id, byte[] dataBinary) { Segment segment = new Segment(id); segment.setDataBinary(dataBinary); + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_GRAPH_ID, Segment.class); + graph.start(segment); } private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) { logger.debug("send to segment buffer write worker, id: {}", id); -// context.getClusterWorkerContext().lookup(SegmentStandardizationWorker.WorkerRole.INSTANCE).tell(upstreamSegment); + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_STANDARDIZATION_GRAPH_ID, UpstreamSegment.class); + graph.start(upstreamSegment); } private void notifyListenerToBuild() { diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java index ea32a2c44d276ea2475bc77c24279b6d878463ad..441b3680533c89cef4a7f55b53c9df29dd1f1b4b 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java @@ -41,7 +41,7 @@ public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker applicationRegisterGraph; public ApplicationIDService(ModuleManager moduleManager) { this.moduleManager = moduleManager; + this.applicationRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, Application.class); } - @SuppressWarnings("unchecked") public int getOrCreate(String applicationCode) throws ModuleNotFoundException, ServiceNotProvidedException { ApplicationCacheService service = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class); int applicationId = service.get(applicationCode); @@ -52,7 +54,7 @@ public class ApplicationIDService { application.setApplicationCode(applicationCode); application.setApplicationId(0); - GraphManager.INSTANCE.findGraph(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID).start(application); + applicationRegisterGraph.start(application); } return applicationId; } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/InstanceIDService.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/InstanceIDService.java index bc758dc4f8ddc99987286c3ef6ed0868afe7d179..82e01a87a5d9b7a59be17930016fbee06c97e500 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/InstanceIDService.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/InstanceIDService.java @@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register; import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.InstanceCacheService; +import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleNotFoundException; @@ -40,12 +41,13 @@ public class InstanceIDService { private final Logger logger = LoggerFactory.getLogger(InstanceIDService.class); private final ModuleManager moduleManager; + private final Graph instanceRegisterGraph; public InstanceIDService(ModuleManager moduleManager) { this.moduleManager = moduleManager; + this.instanceRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.INSTANCE_REGISTER_GRAPH_ID, Instance.class); } - @SuppressWarnings("unchecked") public int getOrCreate(int applicationId, String agentUUID, long registerTime, String osInfo) throws ModuleNotFoundException, ServiceNotProvidedException { logger.debug("get or create instance id, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo); @@ -61,7 +63,7 @@ public class InstanceIDService { instance.setInstanceId(0); instance.setOsInfo(osInfo); - GraphManager.INSTANCE.findGraph(RegisterStreamGraph.INSTANCE_REGISTER_GRAPH_ID).start(instance); + instanceRegisterGraph.start(instance); } return instanceId; } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java index ccecae1b5af5d31f157b596319d030d78f780ef3..cbf98107aa89f2dd47349290a00f5baf64b5bd71 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java @@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register; import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.ServiceIdCacheService; +import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.storage.table.register.ServiceName; @@ -35,12 +36,13 @@ public class ServiceNameService { private final Logger logger = LoggerFactory.getLogger(ServiceNameService.class); private final ModuleManager moduleManager; + private final Graph serviceNameRegisterGraph; public ServiceNameService(ModuleManager moduleManager) { this.moduleManager = moduleManager; + this.serviceNameRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, ServiceName.class); } - @SuppressWarnings("unchecked") public int getOrCreate(int applicationId, String serviceName) { ServiceIdCacheService idCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceIdCacheService.class); int serviceId = idCacheService.get(applicationId, serviceName); @@ -51,7 +53,7 @@ public class ServiceNameService { service.setServiceName(serviceName); service.setServiceId(0); - GraphManager.INSTANCE.findGraph(RegisterStreamGraph.SERVICE_NAME_REGISTER_GRAPH_ID).start(service); + serviceNameRegisterGraph.start(service); } return serviceId; } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/global/GlobalTraceSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/global/GlobalTraceSpanListener.java index c501e4b4242aa37c5d4a1a0e61312174ace42129..21436f3aa7d3835fbb7963692d4d0a403907d045 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/global/GlobalTraceSpanListener.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/global/GlobalTraceSpanListener.java @@ -20,9 +20,12 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.global; import java.util.ArrayList; import java.util.List; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.GlobalTraceIdsListener; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.storage.table.global.GlobalTrace; @@ -63,11 +66,13 @@ public class GlobalTraceSpanListener implements FirstSpanListener, GlobalTraceId @Override public void build() { logger.debug("global trace listener build"); + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.GLOBAL_TRACE_GRAPH_ID, GlobalTrace.class); for (String globalTraceId : globalTraceIds) { GlobalTrace globalTrace = new GlobalTrace(segmentId + Const.ID_SPLIT + globalTraceId); globalTrace.setGlobalTraceId(globalTraceId); globalTrace.setSegmentId(segmentId); globalTrace.setTimeBucket(timeBucket); + graph.start(globalTrace); } } } \ No newline at end of file diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstPerformanceSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstPerformanceSpanListener.java index 3ed7d16627dbb0926e90c5987fa28d647491d692..653b829f4d381cb8542985c1989ea13f92e46cb1 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstPerformanceSpanListener.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstPerformanceSpanListener.java @@ -18,9 +18,12 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.instance; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.storage.table.instance.InstPerformance; @@ -60,5 +63,8 @@ public class InstPerformanceSpanListener implements EntrySpanListener, FirstSpan instPerformance.setCalls(1); instPerformance.setCostTotal(cost); instPerformance.setTimeBucket(timeBucket); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.INST_PERFORMANCE_GRAPH_ID, InstPerformance.class); + graph.start(instPerformance); } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeComponentSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeComponentSpanListener.java index 6dbdd373e4a5ca76c24c7e86f3b401e7486c7c3a..a8a97c40ffe163b3b1ebf74d68cc699823021800 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeComponentSpanListener.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeComponentSpanListener.java @@ -20,10 +20,13 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.node; import java.util.ArrayList; import java.util.List; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.storage.table.node.NodeComponent; @@ -89,9 +92,12 @@ public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanLis } @Override public void build() { + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.NODE_COMPONENT_GRAPH_ID, NodeComponent.class); + nodeComponents.forEach(nodeComponent -> { nodeComponent.setId(timeBucket + Const.ID_SPLIT + nodeComponent.getId()); nodeComponent.setTimeBucket(timeBucket); + graph.start(nodeComponent); }); } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeMappingSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeMappingSpanListener.java index 0bf66c26c43e903cc4daba65aecee649375d537d..d1ac5bfaf1efa36dbdd489f1b1232e6be6c183fc 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeMappingSpanListener.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeMappingSpanListener.java @@ -20,10 +20,13 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.node; import java.util.ArrayList; import java.util.List; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.RefsListener; import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.storage.table.node.NodeMapping; @@ -59,10 +62,13 @@ public class NodeMappingSpanListener implements RefsListener, FirstSpanListener @Override public void build() { logger.debug("node mapping listener build"); + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.NODE_MAPPING_GRAPH_ID, NodeMapping.class); + for (NodeMapping nodeMapping : nodeMappings) { nodeMapping.setId(timeBucket + Const.ID_SPLIT + nodeMapping.getId()); nodeMapping.setTimeBucket(timeBucket); logger.debug("send to node mapping aggregation worker, id: {}", nodeMapping.getId()); + graph.start(nodeMapping); } } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/noderef/NodeReferenceSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/noderef/NodeReferenceSpanListener.java index 88a8d1395cd51da11e16f869942c476cbf3f066d..4ea030d438b408b2f647dfafb49d33a671fcfce6 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/noderef/NodeReferenceSpanListener.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/noderef/NodeReferenceSpanListener.java @@ -20,6 +20,7 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.noderef; import java.util.LinkedList; import java.util.List; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener; import org.skywalking.apm.collector.agent.stream.parser.RefsListener; @@ -27,6 +28,8 @@ import org.skywalking.apm.collector.agent.stream.parser.standardization.Referenc import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.InstanceCacheService; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.CollectionUtils; import org.skywalking.apm.collector.core.util.Const; @@ -105,7 +108,9 @@ public class NodeReferenceSpanListener implements EntrySpanListener, ExitSpanLis @Override public void build() { logger.debug("node reference summary listener build"); + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.NODE_REFERENCE_GRAPH_ID, NodeReference.class); for (NodeReference nodeReference : nodeReferences) { + graph.start(nodeReference); } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/segment/SegmentCostSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/segment/SegmentCostSpanListener.java index bbd59775c6cb132d33b4760c781c2dc33f35719b..edd68f6522a25ed0c18a74a39a4a56017bda14d9 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/segment/SegmentCostSpanListener.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/segment/SegmentCostSpanListener.java @@ -20,6 +20,7 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.segment; import java.util.ArrayList; import java.util.List; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; @@ -27,6 +28,8 @@ import org.skywalking.apm.collector.agent.stream.parser.LocalSpanListener; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.ServiceNameCacheService; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.TimeBucketUtils; @@ -91,10 +94,12 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe } @Override public void build() { + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_COST_GRAPH_ID, SegmentCost.class); logger.debug("segment cost listener build"); for (SegmentCost segmentCost : segmentCosts) { segmentCost.setIsError(isError); segmentCost.setTimeBucket(timeBucket); + graph.start(segmentCost); } } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java index 3075cb2cc4af03c916a9cd96fd3f8fc6fd6988c2..0312bbf6de1e82e116997c1bc1195b4f79d51ce0 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java @@ -18,6 +18,7 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.service; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.RefsListener; @@ -25,6 +26,8 @@ import org.skywalking.apm.collector.agent.stream.parser.standardization.Referenc import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.ServiceNameCacheService; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.TimeBucketUtils; @@ -82,6 +85,8 @@ public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener serviceEntry.setNewestTime(timeBucket); logger.debug("send to service entry aggregation worker, id: {}", serviceEntry.getId()); + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SERVICE_ENTRY_GRAPH_ID, ServiceEntry.class); + graph.start(serviceEntry); } } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/serviceref/ServiceReferenceSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/serviceref/ServiceReferenceSpanListener.java index a8178014a87ed7d5b4059215e0b44f47201b96f9..45aa2a1e382174274dcb85ee76a02c6052712cb6 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/serviceref/ServiceReferenceSpanListener.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/serviceref/ServiceReferenceSpanListener.java @@ -20,11 +20,14 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.serviceref; import java.util.LinkedList; import java.util.List; +import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.RefsListener; import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference; @@ -128,5 +131,8 @@ public class ServiceReferenceSpanListener implements FirstSpanListener, EntrySpa serviceReference.setId(idBuilder.toString()); serviceReference.setTimeBucket(timeBucket); logger.debug("send to service reference aggregation worker, id: {}", serviceReference.getId()); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SERVICE_REFERENCE_GRAPH_ID, ServiceReference.class); + graph.start(serviceReference); } } diff --git a/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml b/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml index a76ab8c13034643898799c80431670a2550fcb20..2665f80d2d298a3579bce617bac25b70e4d55482 100644 --- a/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml +++ b/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml @@ -17,7 +17,7 @@ ~ Project repository: https://github.com/OpenSkywalking/skywalking --> - + @@ -26,9 +26,9 @@ - - - + + + diff --git a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/CacheModuleGuavaProvider.java b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/CacheModuleGuavaProvider.java index 58893f14a0b7726a9fd9660b2951770b4ca87bf3..fbfbc7cea111b8f3ddcafdc365fc3600abb59058 100644 --- a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/CacheModuleGuavaProvider.java +++ b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/CacheModuleGuavaProvider.java @@ -32,7 +32,6 @@ import org.skywalking.apm.collector.core.module.Module; import org.skywalking.apm.collector.core.module.ModuleProvider; import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.skywalking.apm.collector.storage.StorageModule; -import org.skywalking.apm.collector.storage.service.DAOService; /** * @author peng-yongsheng @@ -48,15 +47,13 @@ public class CacheModuleGuavaProvider extends ModuleProvider { } @Override public void prepare(Properties config) throws ServiceNotProvidedException { + this.registerServiceImplementation(ApplicationCacheService.class, new ApplicationCacheGuavaService(getManager())); + this.registerServiceImplementation(InstanceCacheService.class, new InstanceCacheGuavaService(getManager())); + this.registerServiceImplementation(ServiceIdCacheService.class, new ServiceIdCacheGuavaService(getManager())); + this.registerServiceImplementation(ServiceNameCacheService.class, new ServiceNameCacheGuavaService(getManager())); } @Override public void start(Properties config) throws ServiceNotProvidedException { - DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class); - - this.registerServiceImplementation(ApplicationCacheService.class, new ApplicationCacheGuavaService(daoService)); - this.registerServiceImplementation(InstanceCacheService.class, new InstanceCacheGuavaService(daoService)); - this.registerServiceImplementation(ServiceIdCacheService.class, new ServiceIdCacheGuavaService(daoService)); - this.registerServiceImplementation(ServiceNameCacheService.class, new ServiceNameCacheGuavaService(daoService)); } @Override public void notifyAfterCompleted() throws ServiceNotProvidedException { diff --git a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ApplicationCacheGuavaService.java b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ApplicationCacheGuavaService.java index ce734634274270a5b0c11f2fb858bab8c79691c8..b980e5f4fa75bef43ad086b2349193a0c8565524 100644 --- a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ApplicationCacheGuavaService.java +++ b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ApplicationCacheGuavaService.java @@ -21,8 +21,10 @@ package org.skywalking.apm.collector.cache.guava.service; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.skywalking.apm.collector.cache.service.ApplicationCacheService; +import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.StringUtils; +import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.dao.IApplicationCacheDAO; import org.skywalking.apm.collector.storage.service.DAOService; import org.slf4j.Logger; @@ -39,8 +41,8 @@ public class ApplicationCacheGuavaService implements ApplicationCacheService { private final DAOService daoService; - public ApplicationCacheGuavaService(DAOService daoService) { - this.daoService = daoService; + public ApplicationCacheGuavaService(ModuleManager moduleManager) { + this.daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class); } public int get(String applicationCode) { diff --git a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/InstanceCacheGuavaService.java b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/InstanceCacheGuavaService.java index 2e94d8d2aedde62683a44b1c4ecd8a34df79d9d0..a57e9cf1a0e1bc0a1843930b3820b4438a5181cd 100644 --- a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/InstanceCacheGuavaService.java +++ b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/InstanceCacheGuavaService.java @@ -21,7 +21,9 @@ package org.skywalking.apm.collector.cache.guava.service; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.skywalking.apm.collector.cache.service.InstanceCacheService; +import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO; import org.skywalking.apm.collector.storage.service.DAOService; import org.slf4j.Logger; @@ -40,8 +42,8 @@ public class InstanceCacheGuavaService implements InstanceCacheService { private final DAOService daoService; - public InstanceCacheGuavaService(DAOService daoService) { - this.daoService = daoService; + public InstanceCacheGuavaService(ModuleManager moduleManager) { + this.daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class); } public int get(int applicationInstanceId) { diff --git a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ServiceIdCacheGuavaService.java b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ServiceIdCacheGuavaService.java index 3d6105d3eeb597956bc4f8f8578a70a93eb06c2c..91815c423152f4521a64580ed65755ae36cab797 100644 --- a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ServiceIdCacheGuavaService.java +++ b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ServiceIdCacheGuavaService.java @@ -21,7 +21,9 @@ package org.skywalking.apm.collector.cache.guava.service; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.skywalking.apm.collector.cache.service.ServiceIdCacheService; +import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO; import org.skywalking.apm.collector.storage.service.DAOService; import org.slf4j.Logger; @@ -38,8 +40,8 @@ public class ServiceIdCacheGuavaService implements ServiceIdCacheService { private final DAOService daoService; - public ServiceIdCacheGuavaService(DAOService daoService) { - this.daoService = daoService; + public ServiceIdCacheGuavaService(ModuleManager moduleManager) { + this.daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class); } public int get(int applicationId, String serviceName) { diff --git a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ServiceNameCacheGuavaService.java b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ServiceNameCacheGuavaService.java index 9a895eb1f109964e5f345b34783665625d9c46de..de0596b0245d1d4cf807c25b90278e46a5bfa5ae 100644 --- a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ServiceNameCacheGuavaService.java +++ b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/skywalking/apm/collector/cache/guava/service/ServiceNameCacheGuavaService.java @@ -21,8 +21,10 @@ package org.skywalking.apm.collector.cache.guava.service; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.skywalking.apm.collector.cache.service.ServiceNameCacheService; +import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.StringUtils; +import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO; import org.skywalking.apm.collector.storage.service.DAOService; import org.slf4j.Logger; @@ -39,8 +41,8 @@ public class ServiceNameCacheGuavaService implements ServiceNameCacheService { private final DAOService daoService; - public ServiceNameCacheGuavaService(DAOService daoService) { - this.daoService = daoService; + public ServiceNameCacheGuavaService(ModuleManager moduleManager) { + this.daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class); } public String get(int serviceId) { diff --git a/apm-collector/apm-collector-core/src/main/resources/application-default.yml b/apm-collector/apm-collector-core/src/main/resources/application-default.yml index 6d3aefc7332d74a752eadd329dc81aeeb7db0b92..a77838189813107c713bedf8e6572a1df947530a 100644 --- a/apm-collector/apm-collector-core/src/main/resources/application-default.yml +++ b/apm-collector/apm-collector-core/src/main/resources/application-default.yml @@ -24,6 +24,8 @@ jetty_manager: jetty: gRPC_manager: gRPC: +stream: + worker: storage: h2: url: jdbc:h2:~/memorydb diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/StorageModule.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/StorageModule.java index 92a0d7fada92a929d39bd67cd3dc4e1c642f926d..944e553444657e8573b1b642dcbdea2eb1c8d341 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/StorageModule.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/StorageModule.java @@ -20,6 +20,7 @@ package org.skywalking.apm.collector.storage; import org.skywalking.apm.collector.core.module.Module; import org.skywalking.apm.collector.storage.service.DAOService; +import org.skywalking.apm.collector.storage.base.dao.IBatchDAO; /** * @author peng-yongsheng @@ -33,6 +34,6 @@ public class StorageModule extends Module { } @Override public Class[] services() { - return new Class[] {DAOService.class}; + return new Class[] {DAOService.class, IBatchDAO.class}; } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/DAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/DAO.java index e01e256c2212fbf6242b088d1ea9dc673e5f8a51..31ad2f62143985ae5ed6571556c801097ad02195 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/DAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/DAO.java @@ -18,8 +18,10 @@ package org.skywalking.apm.collector.storage.base.dao; +import org.skywalking.apm.collector.core.module.Service; + /** * @author peng-yongsheng */ -public interface DAO { +public interface DAO extends Service { } diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java index 2d3a810809d590d743f73647767f14630e02a640..404b4187a4417b09ede83757fcea952c78963ef3 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java @@ -29,6 +29,8 @@ import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.skywalking.apm.collector.storage.StorageException; import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.base.dao.DAOContainer; +import org.skywalking.apm.collector.storage.base.dao.IBatchDAO; +import org.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO; import org.skywalking.apm.collector.storage.es.base.dao.EsDAO; import org.skywalking.apm.collector.storage.es.base.dao.EsDAODefineLoader; import org.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller; @@ -72,6 +74,10 @@ public class StorageModuleEsProvider extends ModuleProvider { elasticSearchClient = new ElasticSearchClient(clusterName, clusterTransportSniffer, clusterNodes); this.registerServiceImplementation(DAOService.class, new ElasticSearchDAOService(daoContainer)); + + BatchEsDAO batchEsDAO = new BatchEsDAO(); + batchEsDAO.setClient(elasticSearchClient); + this.registerServiceImplementation(IBatchDAO.class, batchEsDAO); } @Override public void start(Properties config) throws ServiceNotProvidedException { diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java index ce7cb558f6129ffd39057513c8ad68f441ecb79b..90835ef78bc1665c62f58d47f46eae9725852fa1 100644 --- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java @@ -29,6 +29,8 @@ import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.skywalking.apm.collector.storage.StorageException; import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.base.dao.DAOContainer; +import org.skywalking.apm.collector.storage.base.dao.IBatchDAO; +import org.skywalking.apm.collector.storage.h2.base.dao.BatchH2DAO; import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO; import org.skywalking.apm.collector.storage.h2.base.dao.H2DAODefineLoader; import org.skywalking.apm.collector.storage.h2.base.define.H2StorageInstaller; @@ -70,6 +72,7 @@ public class StorageModuleH2Provider extends ModuleProvider { client = new H2Client(url, userName, password); this.registerServiceImplementation(DAOService.class, new H2DAOService(daoContainer)); + this.registerServiceImplementation(IBatchDAO.class, new BatchH2DAO()); } @Override public void start(Properties config) throws ServiceNotProvidedException { diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/StreamModuleProvider.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/StreamModuleProvider.java index 6516b6dd7f2f9122ac194cb628d570a7c1d67249..0e73dc8d25378f3ed1e825a24996f6d8a8647afb 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/StreamModuleProvider.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/StreamModuleProvider.java @@ -19,17 +19,9 @@ package org.skywalking.apm.collector.stream; import java.util.Properties; -import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.core.module.Module; import org.skywalking.apm.collector.core.module.ModuleProvider; import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; -import org.skywalking.apm.collector.queue.QueueModule; -import org.skywalking.apm.collector.queue.service.QueueCreatorService; -import org.skywalking.apm.collector.remote.RemoteModule; -import org.skywalking.apm.collector.remote.service.RemoteSenderService; -import org.skywalking.apm.collector.storage.StorageModule; -import org.skywalking.apm.collector.storage.service.DAOService; -import org.skywalking.apm.collector.stream.timer.PersistenceTimer; /** * @author peng-yongsheng @@ -48,11 +40,6 @@ public class StreamModuleProvider extends ModuleProvider { } @Override public void start(Properties config) throws ServiceNotProvidedException { - PersistenceTimer persistenceTimer = new PersistenceTimer(); - QueueCreatorService queueCreatorService = getManager().find(QueueModule.NAME).getService(QueueCreatorService.class); - RemoteSenderService remoteSenderService = getManager().find(RemoteModule.NAME).getService(RemoteSenderService.class); - DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class); - persistenceTimer.start(daoService); } @Override public void notifyAfterCompleted() throws ServiceNotProvidedException { @@ -60,6 +47,6 @@ public class StreamModuleProvider extends ModuleProvider { } @Override public String[] requiredModules() { - return new String[] {RemoteModule.NAME, QueueModule.NAME, StorageModule.NAME, CacheModule.NAME}; + return new String[] {}; } } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/timer/PersistenceTimer.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/timer/PersistenceTimer.java index d69aa014933b6dd6582b3e150c14b30fed8d5bde..770effb76c34a58a6f17d5386b6a7511e835e1ac 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/timer/PersistenceTimer.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/timer/PersistenceTimer.java @@ -22,11 +22,11 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.base.dao.IBatchDAO; -import org.skywalking.apm.collector.storage.service.DAOService; import org.skywalking.apm.collector.stream.worker.base.WorkerException; import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker; -import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorkerContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,19 +37,19 @@ public class PersistenceTimer { private final Logger logger = LoggerFactory.getLogger(PersistenceTimer.class); - public void start(DAOService daoService) { + public void start(ModuleManager moduleManager, List persistenceWorkers) { logger.info("persistence timer start"); //TODO timer value config // final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000; final long timeInterval = 3; - Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> extractDataAndSave(daoService), 1, timeInterval, TimeUnit.SECONDS); + IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class); + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> extractDataAndSave(batchDAO, persistenceWorkers), 1, timeInterval, TimeUnit.SECONDS); } - private void extractDataAndSave(DAOService daoService) { + private void extractDataAndSave(IBatchDAO batchDAO, List persistenceWorkers) { try { - List workers = PersistenceWorkerContainer.INSTANCE.getPersistenceWorkers(); List batchAllCollection = new ArrayList<>(); - workers.forEach((PersistenceWorker worker) -> { + persistenceWorkers.forEach((PersistenceWorker worker) -> { logger.debug("extract {} worker data and save", worker.getClass().getName()); try { worker.flushAndSwitch(); @@ -61,8 +61,7 @@ public class PersistenceTimer { } }); - IBatchDAO dao = (IBatchDAO)daoService.get(IBatchDAO.class); - dao.batchPersistence(batchAllCollection); + batchDAO.batchPersistence(batchAllCollection); } catch (Throwable e) { logger.error(e.getMessage(), e); } finally { diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerCreateListener.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerCreateListener.java index bc8903065eb94abe3e40ccb792c5877bf3da23f0..5107b10d0ae6de20a0320e63c17e4fd60fce18c5 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerCreateListener.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerCreateListener.java @@ -18,12 +18,28 @@ package org.skywalking.apm.collector.stream.worker.base; +import java.util.ArrayList; +import java.util.List; +import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker; + /** * @author peng-yongsheng */ public class WorkerCreateListener { + private final List persistenceWorkers; + + public WorkerCreateListener() { + this.persistenceWorkers = new ArrayList<>(); + } + public void addWorker(AbstractWorker worker) { + if (worker instanceof PersistenceWorker) { + persistenceWorkers.add((PersistenceWorker)worker); + } + } + public List getPersistenceWorkers() { + return persistenceWorkers; } } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/impl/PersistenceWorkerContainer.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/impl/PersistenceWorkerContainer.java deleted file mode 100644 index 9dd5e4330006790939f8f9504c961be0484772d6..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/impl/PersistenceWorkerContainer.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.stream.worker.impl; - -import java.util.ArrayList; -import java.util.List; - -/** - * @author peng-yongsheng - */ -public enum PersistenceWorkerContainer { - INSTANCE; - - private List persistenceWorkers = new ArrayList<>(); - - public void addWorker(PersistenceWorker worker) { - persistenceWorkers.add(worker); - } - - public List getPersistenceWorkers() { - return persistenceWorkers; - } -} diff --git a/pom.xml b/pom.xml index bf3ea48ad994bf111a8122aac9aa07493a6c2b85..65e9f0a57163663a392c68682a0a8169aaabfd36 100644 --- a/pom.xml +++ b/pom.xml @@ -25,38 +25,29 @@ apm 3.2.4-2017 - - - GNU GENERAL PUBLIC LICENSE V3 - https://github.com/wu-sheng/sky-walking/blob/master/LICENSE - - - Wu Sheng wu.sheng@foxmail.com - https://wu-sheng.github.io/me/ - - - Zhang Xin - https://github.com/ascrutae - - - Tan Zhen - https://github.com/mircoteam - - - Xu Yan - https://github.com/TastySummer + https://github.com/wu-sheng + + Founder + PMC member + Peng Yongsheng - 8082209@qq.com https://github.com/peng-yongsheng + + PMC member + - Dai Wen + Zhang Xin + https://github.com/ascrutae + + PMC member +