未验证 提交 2d80d8dc 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge branch 'feature/collector-modelization' into feature/agent-memory-control

...@@ -21,8 +21,8 @@ Sky Walking | [中文](README_ZH.md) ...@@ -21,8 +21,8 @@ Sky Walking | [中文](README_ZH.md)
* The UI released on [skywalking-ui](https://github.com/OpenSkywalking/sky-walking-ui) * The UI released on [skywalking-ui](https://github.com/OpenSkywalking/sky-walking-ui)
# Architecture # Architecture
* Architecture graph for 3.2+ * Architecture graph for 3.2.5+
<img src="https://skywalkingtest.github.io/page-resources/3.x-architecture.jpg"/> <img src="https://skywalkingtest.github.io/page-resources/3.2.5%2b_architecture.jpg"/>
# Document # Document
[![EN doc](https://img.shields.io/badge/document-English-blue.svg)](docs/README.md) [![cn doc](https://img.shields.io/badge/document-中文-blue.svg)](docs/README_ZH.md) [![EN doc](https://img.shields.io/badge/document-English-blue.svg)](docs/README.md) [![cn doc](https://img.shields.io/badge/document-中文-blue.svg)](docs/README_ZH.md)
......
...@@ -25,7 +25,7 @@ Sky Walking | [English](README.md) ...@@ -25,7 +25,7 @@ Sky Walking | [English](README.md)
# Architecture # Architecture
* 3.2+版本架构图 * 3.2+版本架构图
<img src="https://skywalkingtest.github.io/page-resources/3.x-architecture.jpg"/> <img src="https://skywalkingtest.github.io/page-resources/3.2.5%2b_architecture.jpg"/>
# Document # Document
[![EN doc](https://img.shields.io/badge/document-English-blue.svg)](docs/README.md) [![cn doc](https://img.shields.io/badge/document-中文-blue.svg)](docs/README_ZH.md) [![EN doc](https://img.shields.io/badge/document-English-blue.svg)](docs/README.md) [![cn doc](https://img.shields.io/badge/document-中文-blue.svg)](docs/README_ZH.md)
......
...@@ -42,7 +42,7 @@ import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService; ...@@ -42,7 +42,7 @@ import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.remote.RemoteModule; import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.server.Server; import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; import org.skywalking.apm.collector.stream.StreamModule;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -82,7 +82,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider { ...@@ -82,7 +82,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class); GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer = managerService.createIfAbsent(host, port); Server gRPCServer = managerService.createIfAbsent(host, port);
AgentStreamSingleton.getInstance(getManager(), new WorkerCreateListener()); AgentStreamSingleton.createInstanceIfAbsent(getManager());
addHandlers(gRPCServer); addHandlers(gRPCServer);
} }
...@@ -91,7 +91,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider { ...@@ -91,7 +91,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
} }
@Override public String[] requiredModules() { @Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME}; return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME, StreamModule.NAME};
} }
private void addHandlers(Server gRPCServer) { private void addHandlers(Server gRPCServer) {
......
...@@ -68,7 +68,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe ...@@ -68,7 +68,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
request.getMetricsList().forEach(metric -> { request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime()); long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
senToInstanceHeartBeatPersistenceWorker(instanceId, metric.getTime()); sendToInstanceHeartBeatPersistenceWorker(instanceId, metric.getTime());
sendToCpuMetricPersistenceWorker(instanceId, time, metric.getCpu()); sendToCpuMetricPersistenceWorker(instanceId, time, metric.getCpu());
sendToMemoryMetricPersistenceWorker(instanceId, time, metric.getMemoryList()); sendToMemoryMetricPersistenceWorker(instanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricPersistenceWorker(instanceId, time, metric.getMemoryPoolList()); sendToMemoryPoolMetricPersistenceWorker(instanceId, time, metric.getMemoryPoolList());
...@@ -79,7 +79,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe ...@@ -79,7 +79,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
responseObserver.onCompleted(); responseObserver.onCompleted();
} }
private void senToInstanceHeartBeatPersistenceWorker(int instanceId, long heartBeatTime) { private void sendToInstanceHeartBeatPersistenceWorker(int instanceId, long heartBeatTime) {
Instance instance = new Instance(String.valueOf(instanceId)); Instance instance = new Instance(String.valueOf(instanceId));
instance.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime)); instance.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime));
instance.setInstanceId(instanceId); instance.setInstanceId(instanceId);
......
...@@ -35,7 +35,6 @@ public class ApplicationRegisterServiceHandlerTestCase { ...@@ -35,7 +35,6 @@ public class ApplicationRegisterServiceHandlerTestCase {
private ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub stub; private ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub stub;
//@Test
public void testRegister() { public void testRegister() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build(); ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
stub = ApplicationRegisterServiceGrpc.newBlockingStub(channel); stub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
......
...@@ -36,7 +36,7 @@ import org.skywalking.apm.collector.naming.NamingModule; ...@@ -36,7 +36,7 @@ import org.skywalking.apm.collector.naming.NamingModule;
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService; import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.server.Server; import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService; import org.skywalking.apm.collector.stream.StreamModule;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -75,11 +75,9 @@ public class AgentModuleJettyProvider extends ModuleProvider { ...@@ -75,11 +75,9 @@ public class AgentModuleJettyProvider extends ModuleProvider {
NamingHandlerRegisterService namingHandlerRegisterService = getManager().find(NamingModule.NAME).getService(NamingHandlerRegisterService.class); NamingHandlerRegisterService namingHandlerRegisterService = getManager().find(NamingModule.NAME).getService(NamingHandlerRegisterService.class);
namingHandlerRegisterService.register(new AgentJettyNamingHandler(namingListener)); namingHandlerRegisterService.register(new AgentJettyNamingHandler(namingListener));
DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class);
JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class); JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
Server jettyServer = managerService.createIfAbsent(host, port, contextPath); Server jettyServer = managerService.createIfAbsent(host, port, contextPath);
addHandlers(daoService, jettyServer); addHandlers(jettyServer);
} }
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException { @Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
...@@ -87,10 +85,10 @@ public class AgentModuleJettyProvider extends ModuleProvider { ...@@ -87,10 +85,10 @@ public class AgentModuleJettyProvider extends ModuleProvider {
} }
@Override public String[] requiredModules() { @Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME}; return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME, StreamModule.NAME};
} }
private void addHandlers(DAOService daoService, Server jettyServer) { private void addHandlers(Server jettyServer) {
jettyServer.addHandler(new TraceSegmentServletHandler()); jettyServer.addHandler(new TraceSegmentServletHandler());
} }
} }
...@@ -18,9 +18,12 @@ ...@@ -18,9 +18,12 @@
package org.skywalking.apm.collector.agent.stream; package org.skywalking.apm.collector.agent.stream;
import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.ObjectUtils; import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.stream.timer.PersistenceTimer;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
/** /**
...@@ -33,24 +36,35 @@ public class AgentStreamSingleton { ...@@ -33,24 +36,35 @@ public class AgentStreamSingleton {
private final ModuleManager moduleManager; private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener; private final WorkerCreateListener workerCreateListener;
public AgentStreamSingleton(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) { private AgentStreamSingleton(ModuleManager moduleManager) {
this.moduleManager = moduleManager; this.moduleManager = moduleManager;
this.workerCreateListener = workerCreateListener; this.workerCreateListener = new WorkerCreateListener();
createJVMGraph(); this.create();
createRegisterGraph();
createTraceGraph();
} }
public static synchronized AgentStreamSingleton getInstance(ModuleManager moduleManager, public static synchronized AgentStreamSingleton createInstanceIfAbsent(ModuleManager moduleManager) {
WorkerCreateListener workerCreateListener) {
if (ObjectUtils.isEmpty(INSTANCE)) { if (ObjectUtils.isEmpty(INSTANCE)) {
INSTANCE = new AgentStreamSingleton(moduleManager, workerCreateListener); INSTANCE = new AgentStreamSingleton(moduleManager);
} }
return INSTANCE; return INSTANCE;
} }
private void createJVMGraph() { private void create() {
createJVMGraph();
createRegisterGraph();
createTraceGraph();
PersistenceTimer timer = new PersistenceTimer();
timer.start(moduleManager, workerCreateListener.getPersistenceWorkers());
}
private void createJVMGraph() {
JvmMetricStreamGraph jvmMetricStreamGraph = new JvmMetricStreamGraph(moduleManager, workerCreateListener);
jvmMetricStreamGraph.createCpuMetricGraph();
jvmMetricStreamGraph.createGcMetricGraph();
jvmMetricStreamGraph.createMemoryMetricGraph();
jvmMetricStreamGraph.createMemoryPoolMetricGraph();
jvmMetricStreamGraph.createHeartBeatGraph();
} }
private void createRegisterGraph() { private void createRegisterGraph() {
...@@ -61,6 +75,16 @@ public class AgentStreamSingleton { ...@@ -61,6 +75,16 @@ public class AgentStreamSingleton {
} }
private void createTraceGraph() { private void createTraceGraph() {
TraceStreamGraph traceStreamGraph = new TraceStreamGraph(moduleManager, workerCreateListener);
traceStreamGraph.createSegmentStandardizationGraph();
traceStreamGraph.createGlobalTraceGraph();
traceStreamGraph.createInstPerformanceGraph();
traceStreamGraph.createNodeComponentGraph();
traceStreamGraph.createNodeMappingGraph();
traceStreamGraph.createNodeReferenceGraph();
traceStreamGraph.createServiceEntryGraph();
traceStreamGraph.createServiceReferenceGraph();
traceStreamGraph.createSegmentGraph();
traceStreamGraph.createSegmentCostGraph();
} }
} }
...@@ -18,14 +18,22 @@ ...@@ -18,14 +18,22 @@
package org.skywalking.apm.collector.agent.stream.graph; package org.skywalking.apm.collector.agent.stream.graph;
import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.InstHeartBeatPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryPoolMetricPersistenceWorker;
import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.queue.QueueModule;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric; import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric; import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric; import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric; import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
import org.skywalking.apm.collector.storage.table.register.Instance; import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.stream.worker.base.ProviderNotFoundException; import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -38,28 +46,56 @@ public class JvmMetricStreamGraph { ...@@ -38,28 +46,56 @@ public class JvmMetricStreamGraph {
public static final int CPU_METRIC_GRAPH_ID = 103; public static final int CPU_METRIC_GRAPH_ID = 103;
public static final int INST_HEART_BEAT_GRAPH_ID = 104; public static final int INST_HEART_BEAT_GRAPH_ID = 104;
private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener;
public JvmMetricStreamGraph(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) {
this.moduleManager = moduleManager;
this.workerCreateListener = workerCreateListener;
}
@SuppressWarnings("unchecked")
public Graph<GCMetric> createGcMetricGraph() { public Graph<GCMetric> createGcMetricGraph() {
QueueCreatorService<GCMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<GCMetric> graph = GraphManager.INSTANCE.createIfAbsent(GC_METRIC_GRAPH_ID, GCMetric.class); Graph<GCMetric> graph = GraphManager.INSTANCE.createIfAbsent(GC_METRIC_GRAPH_ID, GCMetric.class);
graph.addNode(new GCMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph; return graph;
} }
public Graph<CpuMetric> createCpuMetricGraph() throws ProviderNotFoundException { @SuppressWarnings("unchecked")
public Graph<CpuMetric> createCpuMetricGraph() {
QueueCreatorService<CpuMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<CpuMetric> graph = GraphManager.INSTANCE.createIfAbsent(CPU_METRIC_GRAPH_ID, CpuMetric.class); Graph<CpuMetric> graph = GraphManager.INSTANCE.createIfAbsent(CPU_METRIC_GRAPH_ID, CpuMetric.class);
graph.addNode(new CpuMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph; return graph;
} }
@SuppressWarnings("unchecked")
public Graph<MemoryMetric> createMemoryMetricGraph() { public Graph<MemoryMetric> createMemoryMetricGraph() {
QueueCreatorService<MemoryMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<MemoryMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_METRIC_GRAPH_ID, MemoryMetric.class); Graph<MemoryMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_METRIC_GRAPH_ID, MemoryMetric.class);
graph.addNode(new MemoryMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph; return graph;
} }
@SuppressWarnings("unchecked")
public Graph<MemoryPoolMetric> createMemoryPoolMetricGraph() { public Graph<MemoryPoolMetric> createMemoryPoolMetricGraph() {
QueueCreatorService<MemoryPoolMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<MemoryPoolMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class); Graph<MemoryPoolMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class);
graph.addNode(new MemoryPoolMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph; return graph;
} }
@SuppressWarnings("unchecked")
public Graph<Instance> createHeartBeatGraph() { public Graph<Instance> createHeartBeatGraph() {
QueueCreatorService<Instance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<Instance> graph = GraphManager.INSTANCE.createIfAbsent(INST_HEART_BEAT_GRAPH_ID, Instance.class); Graph<Instance> graph = GraphManager.INSTANCE.createIfAbsent(INST_HEART_BEAT_GRAPH_ID, Instance.class);
graph.addNode(new InstHeartBeatPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph; return graph;
} }
} }
...@@ -31,8 +31,6 @@ import org.skywalking.apm.collector.queue.QueueModule; ...@@ -31,8 +31,6 @@ import org.skywalking.apm.collector.queue.QueueModule;
import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.remote.RemoteModule; import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.service.RemoteSenderService; import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.Application; import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.storage.table.register.Instance; import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.storage.table.register.ServiceName; import org.skywalking.apm.collector.storage.table.register.ServiceName;
...@@ -57,7 +55,6 @@ public class RegisterStreamGraph { ...@@ -57,7 +55,6 @@ public class RegisterStreamGraph {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Graph<Application> createApplicationRegisterGraph() { public Graph<Application> createApplicationRegisterGraph() {
DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
QueueCreatorService<Application> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); QueueCreatorService<Application> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
...@@ -70,7 +67,6 @@ public class RegisterStreamGraph { ...@@ -70,7 +67,6 @@ public class RegisterStreamGraph {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Graph<Instance> createInstanceRegisterGraph() { public Graph<Instance> createInstanceRegisterGraph() {
DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
QueueCreatorService<Instance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); QueueCreatorService<Instance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
...@@ -83,7 +79,6 @@ public class RegisterStreamGraph { ...@@ -83,7 +79,6 @@ public class RegisterStreamGraph {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Graph<ServiceName> createServiceNameRegisterGraph() { public Graph<ServiceName> createServiceNameRegisterGraph() {
DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
QueueCreatorService<ServiceName> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); QueueCreatorService<ServiceName> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.graph;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardizationWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTracePersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstPerformancePersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentAggregationWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentRemoteWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingAggregationWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingRemoteWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceAggregationWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferencePersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceRemoteWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryAggregationWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryRemoteWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferenceAggregationWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferencePersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferenceRemoteWorker;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.queue.QueueModule;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.storage.table.global.GlobalTrace;
import org.skywalking.apm.collector.storage.table.instance.InstPerformance;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
import org.skywalking.apm.collector.storage.table.node.NodeMapping;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
import org.skywalking.apm.collector.storage.table.segment.Segment;
import org.skywalking.apm.collector.storage.table.segment.SegmentCost;
import org.skywalking.apm.collector.storage.table.service.ServiceEntry;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
import org.skywalking.apm.network.proto.UpstreamSegment;
/**
* @author peng-yongsheng
*/
public class TraceStreamGraph {
public static final int GLOBAL_TRACE_GRAPH_ID = 300;
public static final int INST_PERFORMANCE_GRAPH_ID = 301;
public static final int NODE_COMPONENT_GRAPH_ID = 302;
public static final int NODE_MAPPING_GRAPH_ID = 303;
public static final int NODE_REFERENCE_GRAPH_ID = 304;
public static final int SERVICE_ENTRY_GRAPH_ID = 305;
public static final int SERVICE_REFERENCE_GRAPH_ID = 306;
public static final int SEGMENT_GRAPH_ID = 307;
public static final int SEGMENT_COST_GRAPH_ID = 308;
public static final int SEGMENT_STANDARDIZATION_GRAPH_ID = 309;
private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener;
public TraceStreamGraph(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) {
this.moduleManager = moduleManager;
this.workerCreateListener = workerCreateListener;
}
@SuppressWarnings("unchecked")
public Graph<UpstreamSegment> createSegmentStandardizationGraph() {
QueueCreatorService<UpstreamSegment> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<UpstreamSegment> graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_STANDARDIZATION_GRAPH_ID, UpstreamSegment.class);
graph.addNode(new SegmentStandardizationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
@SuppressWarnings("unchecked")
public Graph<GlobalTrace> createGlobalTraceGraph() {
QueueCreatorService<GlobalTrace> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<GlobalTrace> graph = GraphManager.INSTANCE.createIfAbsent(GLOBAL_TRACE_GRAPH_ID, GlobalTrace.class);
graph.addNode(new GlobalTracePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
@SuppressWarnings("unchecked")
public Graph<InstPerformance> createInstPerformanceGraph() {
QueueCreatorService<InstPerformance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<InstPerformance> graph = GraphManager.INSTANCE.createIfAbsent(INST_PERFORMANCE_GRAPH_ID, InstPerformance.class);
graph.addNode(new InstPerformancePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
@SuppressWarnings("unchecked")
public Graph<NodeComponent> createNodeComponentGraph() {
QueueCreatorService<NodeComponent> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
Graph<NodeComponent> graph = GraphManager.INSTANCE.createIfAbsent(NODE_COMPONENT_GRAPH_ID, NodeComponent.class);
graph.addNode(new NodeComponentAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
.addNext(new NodeComponentRemoteWorker.Factory(moduleManager, remoteSenderService, NODE_COMPONENT_GRAPH_ID).create(workerCreateListener))
.addNext(new NodeComponentPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
@SuppressWarnings("unchecked")
public Graph<NodeMapping> createNodeMappingGraph() {
QueueCreatorService<NodeMapping> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
Graph<NodeMapping> graph = GraphManager.INSTANCE.createIfAbsent(NODE_MAPPING_GRAPH_ID, NodeMapping.class);
graph.addNode(new NodeMappingAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
.addNext(new NodeMappingRemoteWorker.Factory(moduleManager, remoteSenderService, NODE_MAPPING_GRAPH_ID).create(workerCreateListener))
.addNext(new NodeMappingPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
@SuppressWarnings("unchecked")
public Graph<NodeReference> createNodeReferenceGraph() {
QueueCreatorService<NodeReference> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
Graph<NodeReference> graph = GraphManager.INSTANCE.createIfAbsent(NODE_REFERENCE_GRAPH_ID, NodeReference.class);
graph.addNode(new NodeReferenceAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
.addNext(new NodeReferenceRemoteWorker.Factory(moduleManager, remoteSenderService, NODE_REFERENCE_GRAPH_ID).create(workerCreateListener))
.addNext(new NodeReferencePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
@SuppressWarnings("unchecked")
public Graph<ServiceEntry> createServiceEntryGraph() {
QueueCreatorService<ServiceEntry> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
Graph<ServiceEntry> graph = GraphManager.INSTANCE.createIfAbsent(SERVICE_ENTRY_GRAPH_ID, ServiceEntry.class);
graph.addNode(new ServiceEntryAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
.addNext(new ServiceEntryRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_ENTRY_GRAPH_ID).create(workerCreateListener))
.addNext(new ServiceEntryPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
@SuppressWarnings("unchecked")
public Graph<ServiceReference> createServiceReferenceGraph() {
QueueCreatorService<ServiceReference> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
Graph<ServiceReference> graph = GraphManager.INSTANCE.createIfAbsent(SERVICE_REFERENCE_GRAPH_ID, ServiceReference.class);
graph.addNode(new ServiceReferenceAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
.addNext(new ServiceReferenceRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_REFERENCE_GRAPH_ID).create(workerCreateListener))
.addNext(new ServiceReferencePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
@SuppressWarnings("unchecked")
public Graph<Segment> createSegmentGraph() {
QueueCreatorService<Segment> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<Segment> graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_GRAPH_ID, Segment.class);
graph.addNode(new SegmentPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
@SuppressWarnings("unchecked")
public Graph<SegmentCost> createSegmentCostGraph() {
QueueCreatorService<SegmentCost> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<SegmentCost> graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_COST_GRAPH_ID, SegmentCost.class);
graph.addNode(new SegmentCostPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
}
...@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.parser; ...@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.parser;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator;
import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceIdExchanger; import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceIdExchanger;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentDecorator;
...@@ -34,6 +35,8 @@ import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeRefere ...@@ -34,6 +35,8 @@ import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeRefere
import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostSpanListener; import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostSpanListener;
import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntrySpanListener; import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntrySpanListener;
import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferenceSpanListener; import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferenceSpanListener;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.storage.table.segment.Segment; import org.skywalking.apm.collector.storage.table.segment.Segment;
import org.skywalking.apm.network.proto.SpanType; import org.skywalking.apm.network.proto.SpanType;
...@@ -150,11 +153,14 @@ public class SegmentParse { ...@@ -150,11 +153,14 @@ public class SegmentParse {
private void buildSegment(String id, byte[] dataBinary) { private void buildSegment(String id, byte[] dataBinary) {
Segment segment = new Segment(id); Segment segment = new Segment(id);
segment.setDataBinary(dataBinary); segment.setDataBinary(dataBinary);
Graph<Segment> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_GRAPH_ID, Segment.class);
graph.start(segment);
} }
private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) { private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) {
logger.debug("send to segment buffer write worker, id: {}", id); logger.debug("send to segment buffer write worker, id: {}", id);
// context.getClusterWorkerContext().lookup(SegmentStandardizationWorker.WorkerRole.INSTANCE).tell(upstreamSegment); Graph<UpstreamSegment> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_STANDARDIZATION_GRAPH_ID, UpstreamSegment.class);
graph.start(upstreamSegment);
} }
private void notifyListenerToBuild() { private void notifyListenerToBuild() {
......
...@@ -41,7 +41,7 @@ public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker<Upstr ...@@ -41,7 +41,7 @@ public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker<Upstr
} }
@Override public int id() { @Override public int id() {
return 0; return SegmentStandardizationWorker.class.hashCode();
} }
@Override protected void onWork(UpstreamSegment upstreamSegment) throws WorkerException { @Override protected void onWork(UpstreamSegment upstreamSegment) throws WorkerException {
......
...@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register; ...@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ApplicationCacheService; import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException; import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
...@@ -37,12 +38,13 @@ public class ApplicationIDService { ...@@ -37,12 +38,13 @@ public class ApplicationIDService {
private final Logger logger = LoggerFactory.getLogger(ApplicationIDService.class); private final Logger logger = LoggerFactory.getLogger(ApplicationIDService.class);
private final ModuleManager moduleManager; private final ModuleManager moduleManager;
private final Graph<Application> applicationRegisterGraph;
public ApplicationIDService(ModuleManager moduleManager) { public ApplicationIDService(ModuleManager moduleManager) {
this.moduleManager = moduleManager; this.moduleManager = moduleManager;
this.applicationRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, Application.class);
} }
@SuppressWarnings("unchecked")
public int getOrCreate(String applicationCode) throws ModuleNotFoundException, ServiceNotProvidedException { public int getOrCreate(String applicationCode) throws ModuleNotFoundException, ServiceNotProvidedException {
ApplicationCacheService service = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class); ApplicationCacheService service = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
int applicationId = service.get(applicationCode); int applicationId = service.get(applicationCode);
...@@ -52,7 +54,7 @@ public class ApplicationIDService { ...@@ -52,7 +54,7 @@ public class ApplicationIDService {
application.setApplicationCode(applicationCode); application.setApplicationCode(applicationCode);
application.setApplicationId(0); application.setApplicationId(0);
GraphManager.INSTANCE.findGraph(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID).start(application); applicationRegisterGraph.start(application);
} }
return applicationId; return applicationId;
} }
......
...@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register; ...@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.InstanceCacheService; import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException; import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
...@@ -40,12 +41,13 @@ public class InstanceIDService { ...@@ -40,12 +41,13 @@ public class InstanceIDService {
private final Logger logger = LoggerFactory.getLogger(InstanceIDService.class); private final Logger logger = LoggerFactory.getLogger(InstanceIDService.class);
private final ModuleManager moduleManager; private final ModuleManager moduleManager;
private final Graph<Instance> instanceRegisterGraph;
public InstanceIDService(ModuleManager moduleManager) { public InstanceIDService(ModuleManager moduleManager) {
this.moduleManager = moduleManager; this.moduleManager = moduleManager;
this.instanceRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.INSTANCE_REGISTER_GRAPH_ID, Instance.class);
} }
@SuppressWarnings("unchecked")
public int getOrCreate(int applicationId, String agentUUID, long registerTime, public int getOrCreate(int applicationId, String agentUUID, long registerTime,
String osInfo) throws ModuleNotFoundException, ServiceNotProvidedException { String osInfo) throws ModuleNotFoundException, ServiceNotProvidedException {
logger.debug("get or create instance id, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo); logger.debug("get or create instance id, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo);
...@@ -61,7 +63,7 @@ public class InstanceIDService { ...@@ -61,7 +63,7 @@ public class InstanceIDService {
instance.setInstanceId(0); instance.setInstanceId(0);
instance.setOsInfo(osInfo); instance.setOsInfo(osInfo);
GraphManager.INSTANCE.findGraph(RegisterStreamGraph.INSTANCE_REGISTER_GRAPH_ID).start(instance); instanceRegisterGraph.start(instance);
} }
return instanceId; return instanceId;
} }
......
...@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register; ...@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ServiceIdCacheService; import org.skywalking.apm.collector.cache.service.ServiceIdCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.storage.table.register.ServiceName; import org.skywalking.apm.collector.storage.table.register.ServiceName;
...@@ -35,12 +36,13 @@ public class ServiceNameService { ...@@ -35,12 +36,13 @@ public class ServiceNameService {
private final Logger logger = LoggerFactory.getLogger(ServiceNameService.class); private final Logger logger = LoggerFactory.getLogger(ServiceNameService.class);
private final ModuleManager moduleManager; private final ModuleManager moduleManager;
private final Graph<ServiceName> serviceNameRegisterGraph;
public ServiceNameService(ModuleManager moduleManager) { public ServiceNameService(ModuleManager moduleManager) {
this.moduleManager = moduleManager; this.moduleManager = moduleManager;
this.serviceNameRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, ServiceName.class);
} }
@SuppressWarnings("unchecked")
public int getOrCreate(int applicationId, String serviceName) { public int getOrCreate(int applicationId, String serviceName) {
ServiceIdCacheService idCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceIdCacheService.class); ServiceIdCacheService idCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceIdCacheService.class);
int serviceId = idCacheService.get(applicationId, serviceName); int serviceId = idCacheService.get(applicationId, serviceName);
...@@ -51,7 +53,7 @@ public class ServiceNameService { ...@@ -51,7 +53,7 @@ public class ServiceNameService {
service.setServiceName(serviceName); service.setServiceName(serviceName);
service.setServiceId(0); service.setServiceId(0);
GraphManager.INSTANCE.findGraph(RegisterStreamGraph.SERVICE_NAME_REGISTER_GRAPH_ID).start(service); serviceNameRegisterGraph.start(service);
} }
return serviceId; return serviceId;
} }
......
...@@ -20,9 +20,12 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.global; ...@@ -20,9 +20,12 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.global;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.GlobalTraceIdsListener; import org.skywalking.apm.collector.agent.stream.parser.GlobalTraceIdsListener;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.table.global.GlobalTrace; import org.skywalking.apm.collector.storage.table.global.GlobalTrace;
...@@ -63,11 +66,13 @@ public class GlobalTraceSpanListener implements FirstSpanListener, GlobalTraceId ...@@ -63,11 +66,13 @@ public class GlobalTraceSpanListener implements FirstSpanListener, GlobalTraceId
@Override public void build() { @Override public void build() {
logger.debug("global trace listener build"); logger.debug("global trace listener build");
Graph<GlobalTrace> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.GLOBAL_TRACE_GRAPH_ID, GlobalTrace.class);
for (String globalTraceId : globalTraceIds) { for (String globalTraceId : globalTraceIds) {
GlobalTrace globalTrace = new GlobalTrace(segmentId + Const.ID_SPLIT + globalTraceId); GlobalTrace globalTrace = new GlobalTrace(segmentId + Const.ID_SPLIT + globalTraceId);
globalTrace.setGlobalTraceId(globalTraceId); globalTrace.setGlobalTraceId(globalTraceId);
globalTrace.setSegmentId(segmentId); globalTrace.setSegmentId(segmentId);
globalTrace.setTimeBucket(timeBucket); globalTrace.setTimeBucket(timeBucket);
graph.start(globalTrace);
} }
} }
} }
\ No newline at end of file
...@@ -18,9 +18,12 @@ ...@@ -18,9 +18,12 @@
package org.skywalking.apm.collector.agent.stream.worker.trace.instance; package org.skywalking.apm.collector.agent.stream.worker.trace.instance;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener;
import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.table.instance.InstPerformance; import org.skywalking.apm.collector.storage.table.instance.InstPerformance;
...@@ -60,5 +63,8 @@ public class InstPerformanceSpanListener implements EntrySpanListener, FirstSpan ...@@ -60,5 +63,8 @@ public class InstPerformanceSpanListener implements EntrySpanListener, FirstSpan
instPerformance.setCalls(1); instPerformance.setCalls(1);
instPerformance.setCostTotal(cost); instPerformance.setCostTotal(cost);
instPerformance.setTimeBucket(timeBucket); instPerformance.setTimeBucket(timeBucket);
Graph<InstPerformance> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.INST_PERFORMANCE_GRAPH_ID, InstPerformance.class);
graph.start(instPerformance);
} }
} }
...@@ -20,10 +20,13 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.node; ...@@ -20,10 +20,13 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.node;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener;
import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener; import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.table.node.NodeComponent; import org.skywalking.apm.collector.storage.table.node.NodeComponent;
...@@ -89,9 +92,12 @@ public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanLis ...@@ -89,9 +92,12 @@ public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanLis
} }
@Override public void build() { @Override public void build() {
Graph<NodeComponent> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.NODE_COMPONENT_GRAPH_ID, NodeComponent.class);
nodeComponents.forEach(nodeComponent -> { nodeComponents.forEach(nodeComponent -> {
nodeComponent.setId(timeBucket + Const.ID_SPLIT + nodeComponent.getId()); nodeComponent.setId(timeBucket + Const.ID_SPLIT + nodeComponent.getId());
nodeComponent.setTimeBucket(timeBucket); nodeComponent.setTimeBucket(timeBucket);
graph.start(nodeComponent);
}); });
} }
} }
...@@ -20,10 +20,13 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.node; ...@@ -20,10 +20,13 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.node;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.RefsListener; import org.skywalking.apm.collector.agent.stream.parser.RefsListener;
import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.table.node.NodeMapping; import org.skywalking.apm.collector.storage.table.node.NodeMapping;
...@@ -59,10 +62,13 @@ public class NodeMappingSpanListener implements RefsListener, FirstSpanListener ...@@ -59,10 +62,13 @@ public class NodeMappingSpanListener implements RefsListener, FirstSpanListener
@Override public void build() { @Override public void build() {
logger.debug("node mapping listener build"); logger.debug("node mapping listener build");
Graph<NodeMapping> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.NODE_MAPPING_GRAPH_ID, NodeMapping.class);
for (NodeMapping nodeMapping : nodeMappings) { for (NodeMapping nodeMapping : nodeMappings) {
nodeMapping.setId(timeBucket + Const.ID_SPLIT + nodeMapping.getId()); nodeMapping.setId(timeBucket + Const.ID_SPLIT + nodeMapping.getId());
nodeMapping.setTimeBucket(timeBucket); nodeMapping.setTimeBucket(timeBucket);
logger.debug("send to node mapping aggregation worker, id: {}", nodeMapping.getId()); logger.debug("send to node mapping aggregation worker, id: {}", nodeMapping.getId());
graph.start(nodeMapping);
} }
} }
} }
...@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.noderef; ...@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.noderef;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener;
import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener; import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.RefsListener; import org.skywalking.apm.collector.agent.stream.parser.RefsListener;
...@@ -27,6 +28,8 @@ import org.skywalking.apm.collector.agent.stream.parser.standardization.Referenc ...@@ -27,6 +28,8 @@ import org.skywalking.apm.collector.agent.stream.parser.standardization.Referenc
import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.InstanceCacheService; import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.CollectionUtils; import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
...@@ -105,7 +108,9 @@ public class NodeReferenceSpanListener implements EntrySpanListener, ExitSpanLis ...@@ -105,7 +108,9 @@ public class NodeReferenceSpanListener implements EntrySpanListener, ExitSpanLis
@Override public void build() { @Override public void build() {
logger.debug("node reference summary listener build"); logger.debug("node reference summary listener build");
Graph<NodeReference> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.NODE_REFERENCE_GRAPH_ID, NodeReference.class);
for (NodeReference nodeReference : nodeReferences) { for (NodeReference nodeReference : nodeReferences) {
graph.start(nodeReference);
} }
} }
......
...@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.segment; ...@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.segment;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener;
import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener; import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener;
...@@ -27,6 +28,8 @@ import org.skywalking.apm.collector.agent.stream.parser.LocalSpanListener; ...@@ -27,6 +28,8 @@ import org.skywalking.apm.collector.agent.stream.parser.LocalSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ServiceNameCacheService; import org.skywalking.apm.collector.cache.service.ServiceNameCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
...@@ -91,10 +94,12 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe ...@@ -91,10 +94,12 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe
} }
@Override public void build() { @Override public void build() {
Graph<SegmentCost> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_COST_GRAPH_ID, SegmentCost.class);
logger.debug("segment cost listener build"); logger.debug("segment cost listener build");
for (SegmentCost segmentCost : segmentCosts) { for (SegmentCost segmentCost : segmentCosts) {
segmentCost.setIsError(isError); segmentCost.setIsError(isError);
segmentCost.setTimeBucket(timeBucket); segmentCost.setTimeBucket(timeBucket);
graph.start(segmentCost);
} }
} }
} }
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.trace.service; package org.skywalking.apm.collector.agent.stream.worker.trace.service;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener;
import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.RefsListener; import org.skywalking.apm.collector.agent.stream.parser.RefsListener;
...@@ -25,6 +26,8 @@ import org.skywalking.apm.collector.agent.stream.parser.standardization.Referenc ...@@ -25,6 +26,8 @@ import org.skywalking.apm.collector.agent.stream.parser.standardization.Referenc
import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ServiceNameCacheService; import org.skywalking.apm.collector.cache.service.ServiceNameCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
...@@ -82,6 +85,8 @@ public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener ...@@ -82,6 +85,8 @@ public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener
serviceEntry.setNewestTime(timeBucket); serviceEntry.setNewestTime(timeBucket);
logger.debug("send to service entry aggregation worker, id: {}", serviceEntry.getId()); logger.debug("send to service entry aggregation worker, id: {}", serviceEntry.getId());
Graph<ServiceEntry> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SERVICE_ENTRY_GRAPH_ID, ServiceEntry.class);
graph.start(serviceEntry);
} }
} }
} }
...@@ -20,11 +20,14 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.serviceref; ...@@ -20,11 +20,14 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.serviceref;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener;
import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.RefsListener; import org.skywalking.apm.collector.agent.stream.parser.RefsListener;
import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference; import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
...@@ -128,5 +131,8 @@ public class ServiceReferenceSpanListener implements FirstSpanListener, EntrySpa ...@@ -128,5 +131,8 @@ public class ServiceReferenceSpanListener implements FirstSpanListener, EntrySpa
serviceReference.setId(idBuilder.toString()); serviceReference.setId(idBuilder.toString());
serviceReference.setTimeBucket(timeBucket); serviceReference.setTimeBucket(timeBucket);
logger.debug("send to service reference aggregation worker, id: {}", serviceReference.getId()); logger.debug("send to service reference aggregation worker, id: {}", serviceReference.getId());
Graph<ServiceReference> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SERVICE_REFERENCE_GRAPH_ID, ServiceReference.class);
graph.start(serviceReference);
} }
} }
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
~ Project repository: https://github.com/OpenSkywalking/skywalking ~ Project repository: https://github.com/OpenSkywalking/skywalking
--> -->
<Configuration status="info"> <Configuration status="debug">
<Appenders> <Appenders>
<Console name="Console" target="SYSTEM_OUT"> <Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/> <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
...@@ -26,9 +26,9 @@ ...@@ -26,9 +26,9 @@
<Loggers> <Loggers>
<logger name="org.eclipse.jetty" level="INFO"/> <logger name="org.eclipse.jetty" level="INFO"/>
<logger name="org.apache.zookeeper" level="INFO"/> <logger name="org.apache.zookeeper" level="INFO"/>
<logger name="org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer" level="INFO"/> <logger name="org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer" level="debug"/>
<logger name="io.grpc.netty.NettyServerHandler" level="INFO"/> <logger name="io.grpc.netty" level="INFO"/>
<Root level="info"> <Root level="debug">
<AppenderRef ref="Console"/> <AppenderRef ref="Console"/>
</Root> </Root>
</Loggers> </Loggers>
......
...@@ -32,7 +32,6 @@ import org.skywalking.apm.collector.core.module.Module; ...@@ -32,7 +32,6 @@ import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider; import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -48,15 +47,13 @@ public class CacheModuleGuavaProvider extends ModuleProvider { ...@@ -48,15 +47,13 @@ public class CacheModuleGuavaProvider extends ModuleProvider {
} }
@Override public void prepare(Properties config) throws ServiceNotProvidedException { @Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(ApplicationCacheService.class, new ApplicationCacheGuavaService(getManager()));
this.registerServiceImplementation(InstanceCacheService.class, new InstanceCacheGuavaService(getManager()));
this.registerServiceImplementation(ServiceIdCacheService.class, new ServiceIdCacheGuavaService(getManager()));
this.registerServiceImplementation(ServiceNameCacheService.class, new ServiceNameCacheGuavaService(getManager()));
} }
@Override public void start(Properties config) throws ServiceNotProvidedException { @Override public void start(Properties config) throws ServiceNotProvidedException {
DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class);
this.registerServiceImplementation(ApplicationCacheService.class, new ApplicationCacheGuavaService(daoService));
this.registerServiceImplementation(InstanceCacheService.class, new InstanceCacheGuavaService(daoService));
this.registerServiceImplementation(ServiceIdCacheService.class, new ServiceIdCacheGuavaService(daoService));
this.registerServiceImplementation(ServiceNameCacheService.class, new ServiceNameCacheGuavaService(daoService));
} }
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException { @Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
......
...@@ -21,8 +21,10 @@ package org.skywalking.apm.collector.cache.guava.service; ...@@ -21,8 +21,10 @@ package org.skywalking.apm.collector.cache.guava.service;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.cache.service.ApplicationCacheService; import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils; import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IApplicationCacheDAO; import org.skywalking.apm.collector.storage.dao.IApplicationCacheDAO;
import org.skywalking.apm.collector.storage.service.DAOService; import org.skywalking.apm.collector.storage.service.DAOService;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -39,8 +41,8 @@ public class ApplicationCacheGuavaService implements ApplicationCacheService { ...@@ -39,8 +41,8 @@ public class ApplicationCacheGuavaService implements ApplicationCacheService {
private final DAOService daoService; private final DAOService daoService;
public ApplicationCacheGuavaService(DAOService daoService) { public ApplicationCacheGuavaService(ModuleManager moduleManager) {
this.daoService = daoService; this.daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
} }
public int get(String applicationCode) { public int get(String applicationCode) {
......
...@@ -21,7 +21,9 @@ package org.skywalking.apm.collector.cache.guava.service; ...@@ -21,7 +21,9 @@ package org.skywalking.apm.collector.cache.guava.service;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.cache.service.InstanceCacheService; import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO; import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.skywalking.apm.collector.storage.service.DAOService; import org.skywalking.apm.collector.storage.service.DAOService;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -40,8 +42,8 @@ public class InstanceCacheGuavaService implements InstanceCacheService { ...@@ -40,8 +42,8 @@ public class InstanceCacheGuavaService implements InstanceCacheService {
private final DAOService daoService; private final DAOService daoService;
public InstanceCacheGuavaService(DAOService daoService) { public InstanceCacheGuavaService(ModuleManager moduleManager) {
this.daoService = daoService; this.daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
} }
public int get(int applicationInstanceId) { public int get(int applicationInstanceId) {
......
...@@ -21,7 +21,9 @@ package org.skywalking.apm.collector.cache.guava.service; ...@@ -21,7 +21,9 @@ package org.skywalking.apm.collector.cache.guava.service;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.cache.service.ServiceIdCacheService; import org.skywalking.apm.collector.cache.service.ServiceIdCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO; import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.skywalking.apm.collector.storage.service.DAOService; import org.skywalking.apm.collector.storage.service.DAOService;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -38,8 +40,8 @@ public class ServiceIdCacheGuavaService implements ServiceIdCacheService { ...@@ -38,8 +40,8 @@ public class ServiceIdCacheGuavaService implements ServiceIdCacheService {
private final DAOService daoService; private final DAOService daoService;
public ServiceIdCacheGuavaService(DAOService daoService) { public ServiceIdCacheGuavaService(ModuleManager moduleManager) {
this.daoService = daoService; this.daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
} }
public int get(int applicationId, String serviceName) { public int get(int applicationId, String serviceName) {
......
...@@ -21,8 +21,10 @@ package org.skywalking.apm.collector.cache.guava.service; ...@@ -21,8 +21,10 @@ package org.skywalking.apm.collector.cache.guava.service;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.cache.service.ServiceNameCacheService; import org.skywalking.apm.collector.cache.service.ServiceNameCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils; import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO; import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.skywalking.apm.collector.storage.service.DAOService; import org.skywalking.apm.collector.storage.service.DAOService;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -39,8 +41,8 @@ public class ServiceNameCacheGuavaService implements ServiceNameCacheService { ...@@ -39,8 +41,8 @@ public class ServiceNameCacheGuavaService implements ServiceNameCacheService {
private final DAOService daoService; private final DAOService daoService;
public ServiceNameCacheGuavaService(DAOService daoService) { public ServiceNameCacheGuavaService(ModuleManager moduleManager) {
this.daoService = daoService; this.daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
} }
public String get(int serviceId) { public String get(int serviceId) {
......
...@@ -24,6 +24,8 @@ jetty_manager: ...@@ -24,6 +24,8 @@ jetty_manager:
jetty: jetty:
gRPC_manager: gRPC_manager:
gRPC: gRPC:
stream:
worker:
storage: storage:
h2: h2:
url: jdbc:h2:~/memorydb url: jdbc:h2:~/memorydb
......
...@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.storage; ...@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.storage;
import org.skywalking.apm.collector.core.module.Module; import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.storage.service.DAOService; import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -33,6 +34,6 @@ public class StorageModule extends Module { ...@@ -33,6 +34,6 @@ public class StorageModule extends Module {
} }
@Override public Class[] services() { @Override public Class[] services() {
return new Class[] {DAOService.class}; return new Class[] {DAOService.class, IBatchDAO.class};
} }
} }
...@@ -18,8 +18,10 @@ ...@@ -18,8 +18,10 @@
package org.skywalking.apm.collector.storage.base.dao; package org.skywalking.apm.collector.storage.base.dao;
import org.skywalking.apm.collector.core.module.Service;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public interface DAO { public interface DAO extends Service {
} }
...@@ -29,6 +29,8 @@ import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; ...@@ -29,6 +29,8 @@ import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.storage.StorageException; import org.skywalking.apm.collector.storage.StorageException;
import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.DAOContainer; import org.skywalking.apm.collector.storage.base.dao.DAOContainer;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO; import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAODefineLoader; import org.skywalking.apm.collector.storage.es.base.dao.EsDAODefineLoader;
import org.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller; import org.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller;
...@@ -72,6 +74,10 @@ public class StorageModuleEsProvider extends ModuleProvider { ...@@ -72,6 +74,10 @@ public class StorageModuleEsProvider extends ModuleProvider {
elasticSearchClient = new ElasticSearchClient(clusterName, clusterTransportSniffer, clusterNodes); elasticSearchClient = new ElasticSearchClient(clusterName, clusterTransportSniffer, clusterNodes);
this.registerServiceImplementation(DAOService.class, new ElasticSearchDAOService(daoContainer)); this.registerServiceImplementation(DAOService.class, new ElasticSearchDAOService(daoContainer));
BatchEsDAO batchEsDAO = new BatchEsDAO();
batchEsDAO.setClient(elasticSearchClient);
this.registerServiceImplementation(IBatchDAO.class, batchEsDAO);
} }
@Override public void start(Properties config) throws ServiceNotProvidedException { @Override public void start(Properties config) throws ServiceNotProvidedException {
......
...@@ -29,6 +29,8 @@ import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; ...@@ -29,6 +29,8 @@ import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.storage.StorageException; import org.skywalking.apm.collector.storage.StorageException;
import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.DAOContainer; import org.skywalking.apm.collector.storage.base.dao.DAOContainer;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.BatchH2DAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO; import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAODefineLoader; import org.skywalking.apm.collector.storage.h2.base.dao.H2DAODefineLoader;
import org.skywalking.apm.collector.storage.h2.base.define.H2StorageInstaller; import org.skywalking.apm.collector.storage.h2.base.define.H2StorageInstaller;
...@@ -70,6 +72,7 @@ public class StorageModuleH2Provider extends ModuleProvider { ...@@ -70,6 +72,7 @@ public class StorageModuleH2Provider extends ModuleProvider {
client = new H2Client(url, userName, password); client = new H2Client(url, userName, password);
this.registerServiceImplementation(DAOService.class, new H2DAOService(daoContainer)); this.registerServiceImplementation(DAOService.class, new H2DAOService(daoContainer));
this.registerServiceImplementation(IBatchDAO.class, new BatchH2DAO());
} }
@Override public void start(Properties config) throws ServiceNotProvidedException { @Override public void start(Properties config) throws ServiceNotProvidedException {
......
...@@ -19,17 +19,9 @@ ...@@ -19,17 +19,9 @@
package org.skywalking.apm.collector.stream; package org.skywalking.apm.collector.stream;
import java.util.Properties; import java.util.Properties;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.core.module.Module; import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider; import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.queue.QueueModule;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.stream.timer.PersistenceTimer;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -48,11 +40,6 @@ public class StreamModuleProvider extends ModuleProvider { ...@@ -48,11 +40,6 @@ public class StreamModuleProvider extends ModuleProvider {
} }
@Override public void start(Properties config) throws ServiceNotProvidedException { @Override public void start(Properties config) throws ServiceNotProvidedException {
PersistenceTimer persistenceTimer = new PersistenceTimer();
QueueCreatorService queueCreatorService = getManager().find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = getManager().find(RemoteModule.NAME).getService(RemoteSenderService.class);
DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class);
persistenceTimer.start(daoService);
} }
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException { @Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
...@@ -60,6 +47,6 @@ public class StreamModuleProvider extends ModuleProvider { ...@@ -60,6 +47,6 @@ public class StreamModuleProvider extends ModuleProvider {
} }
@Override public String[] requiredModules() { @Override public String[] requiredModules() {
return new String[] {RemoteModule.NAME, QueueModule.NAME, StorageModule.NAME, CacheModule.NAME}; return new String[] {};
} }
} }
...@@ -22,11 +22,11 @@ import java.util.ArrayList; ...@@ -22,11 +22,11 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO; import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.stream.worker.base.WorkerException; import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker; import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorkerContainer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -37,19 +37,19 @@ public class PersistenceTimer { ...@@ -37,19 +37,19 @@ public class PersistenceTimer {
private final Logger logger = LoggerFactory.getLogger(PersistenceTimer.class); private final Logger logger = LoggerFactory.getLogger(PersistenceTimer.class);
public void start(DAOService daoService) { public void start(ModuleManager moduleManager, List<PersistenceWorker> persistenceWorkers) {
logger.info("persistence timer start"); logger.info("persistence timer start");
//TODO timer value config //TODO timer value config
// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000; // final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
final long timeInterval = 3; final long timeInterval = 3;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> extractDataAndSave(daoService), 1, timeInterval, TimeUnit.SECONDS); IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> extractDataAndSave(batchDAO, persistenceWorkers), 1, timeInterval, TimeUnit.SECONDS);
} }
private void extractDataAndSave(DAOService daoService) { private void extractDataAndSave(IBatchDAO batchDAO, List<PersistenceWorker> persistenceWorkers) {
try { try {
List<PersistenceWorker> workers = PersistenceWorkerContainer.INSTANCE.getPersistenceWorkers();
List batchAllCollection = new ArrayList<>(); List batchAllCollection = new ArrayList<>();
workers.forEach((PersistenceWorker worker) -> { persistenceWorkers.forEach((PersistenceWorker worker) -> {
logger.debug("extract {} worker data and save", worker.getClass().getName()); logger.debug("extract {} worker data and save", worker.getClass().getName());
try { try {
worker.flushAndSwitch(); worker.flushAndSwitch();
...@@ -61,8 +61,7 @@ public class PersistenceTimer { ...@@ -61,8 +61,7 @@ public class PersistenceTimer {
} }
}); });
IBatchDAO dao = (IBatchDAO)daoService.get(IBatchDAO.class); batchDAO.batchPersistence(batchAllCollection);
dao.batchPersistence(batchAllCollection);
} catch (Throwable e) { } catch (Throwable e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} finally { } finally {
......
...@@ -18,12 +18,28 @@ ...@@ -18,12 +18,28 @@
package org.skywalking.apm.collector.stream.worker.base; package org.skywalking.apm.collector.stream.worker.base;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class WorkerCreateListener { public class WorkerCreateListener {
private final List<PersistenceWorker> persistenceWorkers;
public WorkerCreateListener() {
this.persistenceWorkers = new ArrayList<>();
}
public void addWorker(AbstractWorker worker) { public void addWorker(AbstractWorker worker) {
if (worker instanceof PersistenceWorker) {
persistenceWorkers.add((PersistenceWorker)worker);
}
}
public List<PersistenceWorker> getPersistenceWorkers() {
return persistenceWorkers;
} }
} }
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.impl;
import java.util.ArrayList;
import java.util.List;
/**
* @author peng-yongsheng
*/
public enum PersistenceWorkerContainer {
INSTANCE;
private List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
public void addWorker(PersistenceWorker worker) {
persistenceWorkers.add(worker);
}
public List<PersistenceWorker> getPersistenceWorkers() {
return persistenceWorkers;
}
}
...@@ -25,38 +25,29 @@ ...@@ -25,38 +25,29 @@
<artifactId>apm</artifactId> <artifactId>apm</artifactId>
<version>3.2.4-2017</version> <version>3.2.4-2017</version>
<licenses>
<license>
<name>GNU GENERAL PUBLIC LICENSE V3</name>
<url>https://github.com/wu-sheng/sky-walking/blob/master/LICENSE</url>
</license>
</licenses>
<developers> <developers>
<developer> <developer>
<name>Wu Sheng</name> <name>Wu Sheng</name>
<email>wu.sheng@foxmail.com</email> <email>wu.sheng@foxmail.com</email>
<url>https://wu-sheng.github.io/me/</url> <url>https://github.com/wu-sheng</url>
</developer> <roles>
<developer> <role>Founder</role>
<name>Zhang Xin</name> <role>PMC member</role>
<url>https://github.com/ascrutae</url> </roles>
</developer>
<developer>
<name>Tan Zhen</name>
<url>https://github.com/mircoteam</url>
</developer>
<developer>
<name>Xu Yan</name>
<url>https://github.com/TastySummer</url>
</developer> </developer>
<developer> <developer>
<name>Peng Yongsheng</name> <name>Peng Yongsheng</name>
<email>8082209@qq.com</email>
<url>https://github.com/peng-yongsheng</url> <url>https://github.com/peng-yongsheng</url>
<roles>
<role>PMC member</role>
</roles>
</developer> </developer>
<developer> <developer>
<name>Dai Wen</name> <name>Zhang Xin</name>
<url>https://github.com/ascrutae</url>
<roles>
<role>PMC member</role>
</roles>
</developer> </developer>
</developers> </developers>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册