From 34aad0f00a57e50b11ffc85953f720313e2a54ec Mon Sep 17 00:00:00 2001 From: peng-yongsheng <8082209@qq.com> Date: Tue, 14 Nov 2017 23:40:39 +0800 Subject: [PATCH] jvm stream finish. --- .../agent/grpc/AgentModuleGRPCProvider.java | 6 +-- .../handler/JVMMetricsServiceHandler.java | 4 +- .../agent/jetty/AgentModuleJettyProvider.java | 10 ++--- .../agent/stream/AgentStreamSingleton.java | 31 +++++++++----- .../stream/graph/JvmMetricStreamGraph.java | 40 ++++++++++++++++++- .../stream/graph/RegisterStreamGraph.java | 5 --- .../worker/register/ApplicationIDService.java | 6 ++- .../worker/register/InstanceIDService.java | 6 ++- .../worker/register/ServiceNameService.java | 6 ++- .../src/main/resources/log4j2.xml | 8 ++-- .../main/resources/application-default.yml | 2 + .../apm/collector/storage/StorageModule.java | 3 +- .../apm/collector/storage/base/dao/DAO.java | 4 +- .../storage/es/StorageModuleEsProvider.java | 6 +++ .../storage/h2/StorageModuleH2Provider.java | 3 ++ .../stream/StreamModuleProvider.java | 15 +------ .../stream/timer/PersistenceTimer.java | 17 ++++---- .../worker/base/WorkerCreateListener.java | 16 ++++++++ .../impl/PersistenceWorkerContainer.java | 39 ------------------ 19 files changed, 126 insertions(+), 101 deletions(-) delete mode 100644 apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/impl/PersistenceWorkerContainer.java diff --git a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java index f13aea73c3..8b13cfd82d 100644 --- a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java +++ b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java @@ -42,7 +42,7 @@ import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService; import org.skywalking.apm.collector.remote.RemoteModule; import org.skywalking.apm.collector.server.Server; import org.skywalking.apm.collector.storage.StorageModule; -import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; +import org.skywalking.apm.collector.stream.StreamModule; /** * @author peng-yongsheng @@ -82,7 +82,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider { GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class); Server gRPCServer = managerService.createIfAbsent(host, port); - AgentStreamSingleton.getInstance(getManager(), new WorkerCreateListener()); + AgentStreamSingleton.createInstanceIfAbsent(getManager()); addHandlers(gRPCServer); } @@ -91,7 +91,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider { } @Override public String[] requiredModules() { - return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME}; + return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME, StreamModule.NAME}; } private void addHandlers(Server gRPCServer) { diff --git a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java index 6aff9a20c0..e1d86f431a 100644 --- a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java +++ b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java @@ -68,7 +68,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe request.getMetricsList().forEach(metric -> { long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime()); - senToInstanceHeartBeatPersistenceWorker(instanceId, metric.getTime()); + sendToInstanceHeartBeatPersistenceWorker(instanceId, metric.getTime()); sendToCpuMetricPersistenceWorker(instanceId, time, metric.getCpu()); sendToMemoryMetricPersistenceWorker(instanceId, time, metric.getMemoryList()); sendToMemoryPoolMetricPersistenceWorker(instanceId, time, metric.getMemoryPoolList()); @@ -79,7 +79,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe responseObserver.onCompleted(); } - private void senToInstanceHeartBeatPersistenceWorker(int instanceId, long heartBeatTime) { + private void sendToInstanceHeartBeatPersistenceWorker(int instanceId, long heartBeatTime) { Instance instance = new Instance(String.valueOf(instanceId)); instance.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime)); instance.setInstanceId(instanceId); diff --git a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java index 6a57ab3106..68ac588053 100644 --- a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java +++ b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java @@ -36,7 +36,7 @@ import org.skywalking.apm.collector.naming.NamingModule; import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService; import org.skywalking.apm.collector.server.Server; import org.skywalking.apm.collector.storage.StorageModule; -import org.skywalking.apm.collector.storage.service.DAOService; +import org.skywalking.apm.collector.stream.StreamModule; /** * @author peng-yongsheng @@ -75,11 +75,9 @@ public class AgentModuleJettyProvider extends ModuleProvider { NamingHandlerRegisterService namingHandlerRegisterService = getManager().find(NamingModule.NAME).getService(NamingHandlerRegisterService.class); namingHandlerRegisterService.register(new AgentJettyNamingHandler(namingListener)); - DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class); - JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class); Server jettyServer = managerService.createIfAbsent(host, port, contextPath); - addHandlers(daoService, jettyServer); + addHandlers(jettyServer); } @Override public void notifyAfterCompleted() throws ServiceNotProvidedException { @@ -87,10 +85,10 @@ public class AgentModuleJettyProvider extends ModuleProvider { } @Override public String[] requiredModules() { - return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME}; + return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME, StreamModule.NAME}; } - private void addHandlers(DAOService daoService, Server jettyServer) { + private void addHandlers(Server jettyServer) { jettyServer.addHandler(new TraceSegmentServletHandler()); } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java index 010a8653f1..45c1f13f94 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java @@ -18,9 +18,11 @@ package org.skywalking.apm.collector.agent.stream; +import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph; import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.ObjectUtils; +import org.skywalking.apm.collector.stream.timer.PersistenceTimer; import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; /** @@ -33,24 +35,35 @@ public class AgentStreamSingleton { private final ModuleManager moduleManager; private final WorkerCreateListener workerCreateListener; - public AgentStreamSingleton(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) { + private AgentStreamSingleton(ModuleManager moduleManager) { this.moduleManager = moduleManager; - this.workerCreateListener = workerCreateListener; - createJVMGraph(); - createRegisterGraph(); - createTraceGraph(); + this.workerCreateListener = new WorkerCreateListener(); + this.create(); } - public static synchronized AgentStreamSingleton getInstance(ModuleManager moduleManager, - WorkerCreateListener workerCreateListener) { + public static synchronized AgentStreamSingleton createInstanceIfAbsent(ModuleManager moduleManager) { if (ObjectUtils.isEmpty(INSTANCE)) { - INSTANCE = new AgentStreamSingleton(moduleManager, workerCreateListener); + INSTANCE = new AgentStreamSingleton(moduleManager); } return INSTANCE; } - private void createJVMGraph() { + private void create() { + createJVMGraph(); + createRegisterGraph(); + createTraceGraph(); + PersistenceTimer timer = new PersistenceTimer(); + timer.start(moduleManager, workerCreateListener.getPersistenceWorkers()); + } + + private void createJVMGraph() { + JvmMetricStreamGraph jvmMetricStreamGraph = new JvmMetricStreamGraph(moduleManager, workerCreateListener); + jvmMetricStreamGraph.createCpuMetricGraph(); + jvmMetricStreamGraph.createGcMetricGraph(); + jvmMetricStreamGraph.createMemoryMetricGraph(); + jvmMetricStreamGraph.createMemoryPoolMetricGraph(); + jvmMetricStreamGraph.createHeartBeatGraph(); } private void createRegisterGraph() { diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java index ff599f5355..defdcca6a1 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java @@ -18,14 +18,22 @@ package org.skywalking.apm.collector.agent.stream.graph; +import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.jvm.InstHeartBeatPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryPoolMetricPersistenceWorker; import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.GraphManager; +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.queue.QueueModule; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.storage.table.jvm.CpuMetric; import org.skywalking.apm.collector.storage.table.jvm.GCMetric; import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric; import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric; import org.skywalking.apm.collector.storage.table.register.Instance; -import org.skywalking.apm.collector.stream.worker.base.ProviderNotFoundException; +import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; /** * @author peng-yongsheng @@ -38,28 +46,56 @@ public class JvmMetricStreamGraph { public static final int CPU_METRIC_GRAPH_ID = 103; public static final int INST_HEART_BEAT_GRAPH_ID = 104; + private final ModuleManager moduleManager; + private final WorkerCreateListener workerCreateListener; + + public JvmMetricStreamGraph(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) { + this.moduleManager = moduleManager; + this.workerCreateListener = workerCreateListener; + } + + @SuppressWarnings("unchecked") public Graph createGcMetricGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(GC_METRIC_GRAPH_ID, GCMetric.class); + graph.addNode(new GCMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); return graph; } - public Graph createCpuMetricGraph() throws ProviderNotFoundException { + @SuppressWarnings("unchecked") + public Graph createCpuMetricGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(CPU_METRIC_GRAPH_ID, CpuMetric.class); + graph.addNode(new CpuMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); return graph; } + @SuppressWarnings("unchecked") public Graph createMemoryMetricGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_METRIC_GRAPH_ID, MemoryMetric.class); + graph.addNode(new MemoryMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); return graph; } + @SuppressWarnings("unchecked") public Graph createMemoryPoolMetricGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class); + graph.addNode(new MemoryPoolMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); return graph; } + @SuppressWarnings("unchecked") public Graph createHeartBeatGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(INST_HEART_BEAT_GRAPH_ID, Instance.class); + graph.addNode(new InstHeartBeatPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); return graph; } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java index 00f26ef7d4..915259f580 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java @@ -31,8 +31,6 @@ import org.skywalking.apm.collector.queue.QueueModule; import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.remote.RemoteModule; import org.skywalking.apm.collector.remote.service.RemoteSenderService; -import org.skywalking.apm.collector.storage.StorageModule; -import org.skywalking.apm.collector.storage.service.DAOService; import org.skywalking.apm.collector.storage.table.register.Application; import org.skywalking.apm.collector.storage.table.register.Instance; import org.skywalking.apm.collector.storage.table.register.ServiceName; @@ -57,7 +55,6 @@ public class RegisterStreamGraph { @SuppressWarnings("unchecked") public Graph createApplicationRegisterGraph() { - DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class); RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); @@ -70,7 +67,6 @@ public class RegisterStreamGraph { @SuppressWarnings("unchecked") public Graph createInstanceRegisterGraph() { - DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class); RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); @@ -83,7 +79,6 @@ public class RegisterStreamGraph { @SuppressWarnings("unchecked") public Graph createServiceNameRegisterGraph() { - DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class); RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationIDService.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationIDService.java index b51dac2264..bdd80b658b 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationIDService.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationIDService.java @@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register; import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.ApplicationCacheService; +import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleNotFoundException; @@ -37,12 +38,13 @@ public class ApplicationIDService { private final Logger logger = LoggerFactory.getLogger(ApplicationIDService.class); private final ModuleManager moduleManager; + private final Graph applicationRegisterGraph; public ApplicationIDService(ModuleManager moduleManager) { this.moduleManager = moduleManager; + this.applicationRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, Application.class); } - @SuppressWarnings("unchecked") public int getOrCreate(String applicationCode) throws ModuleNotFoundException, ServiceNotProvidedException { ApplicationCacheService service = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class); int applicationId = service.get(applicationCode); @@ -52,7 +54,7 @@ public class ApplicationIDService { application.setApplicationCode(applicationCode); application.setApplicationId(0); - GraphManager.INSTANCE.findGraph(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID).start(application); + applicationRegisterGraph.start(application); } return applicationId; } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/InstanceIDService.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/InstanceIDService.java index bc758dc4f8..82e01a87a5 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/InstanceIDService.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/InstanceIDService.java @@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register; import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.InstanceCacheService; +import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleNotFoundException; @@ -40,12 +41,13 @@ public class InstanceIDService { private final Logger logger = LoggerFactory.getLogger(InstanceIDService.class); private final ModuleManager moduleManager; + private final Graph instanceRegisterGraph; public InstanceIDService(ModuleManager moduleManager) { this.moduleManager = moduleManager; + this.instanceRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.INSTANCE_REGISTER_GRAPH_ID, Instance.class); } - @SuppressWarnings("unchecked") public int getOrCreate(int applicationId, String agentUUID, long registerTime, String osInfo) throws ModuleNotFoundException, ServiceNotProvidedException { logger.debug("get or create instance id, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo); @@ -61,7 +63,7 @@ public class InstanceIDService { instance.setInstanceId(0); instance.setOsInfo(osInfo); - GraphManager.INSTANCE.findGraph(RegisterStreamGraph.INSTANCE_REGISTER_GRAPH_ID).start(instance); + instanceRegisterGraph.start(instance); } return instanceId; } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java index ccecae1b5a..cbf98107aa 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java @@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register; import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.ServiceIdCacheService; +import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.storage.table.register.ServiceName; @@ -35,12 +36,13 @@ public class ServiceNameService { private final Logger logger = LoggerFactory.getLogger(ServiceNameService.class); private final ModuleManager moduleManager; + private final Graph serviceNameRegisterGraph; public ServiceNameService(ModuleManager moduleManager) { this.moduleManager = moduleManager; + this.serviceNameRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, ServiceName.class); } - @SuppressWarnings("unchecked") public int getOrCreate(int applicationId, String serviceName) { ServiceIdCacheService idCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceIdCacheService.class); int serviceId = idCacheService.get(applicationId, serviceName); @@ -51,7 +53,7 @@ public class ServiceNameService { service.setServiceName(serviceName); service.setServiceId(0); - GraphManager.INSTANCE.findGraph(RegisterStreamGraph.SERVICE_NAME_REGISTER_GRAPH_ID).start(service); + serviceNameRegisterGraph.start(service); } return serviceId; } diff --git a/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml b/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml index a76ab8c130..2665f80d2d 100644 --- a/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml +++ b/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml @@ -17,7 +17,7 @@ ~ Project repository: https://github.com/OpenSkywalking/skywalking --> - + @@ -26,9 +26,9 @@ - - - + + + diff --git a/apm-collector/apm-collector-core/src/main/resources/application-default.yml b/apm-collector/apm-collector-core/src/main/resources/application-default.yml index 6d3aefc733..a778381898 100644 --- a/apm-collector/apm-collector-core/src/main/resources/application-default.yml +++ b/apm-collector/apm-collector-core/src/main/resources/application-default.yml @@ -24,6 +24,8 @@ jetty_manager: jetty: gRPC_manager: gRPC: +stream: + worker: storage: h2: url: jdbc:h2:~/memorydb diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/StorageModule.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/StorageModule.java index 92a0d7fada..944e553444 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/StorageModule.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/StorageModule.java @@ -20,6 +20,7 @@ package org.skywalking.apm.collector.storage; import org.skywalking.apm.collector.core.module.Module; import org.skywalking.apm.collector.storage.service.DAOService; +import org.skywalking.apm.collector.storage.base.dao.IBatchDAO; /** * @author peng-yongsheng @@ -33,6 +34,6 @@ public class StorageModule extends Module { } @Override public Class[] services() { - return new Class[] {DAOService.class}; + return new Class[] {DAOService.class, IBatchDAO.class}; } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/DAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/DAO.java index e01e256c22..31ad2f6214 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/DAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/DAO.java @@ -18,8 +18,10 @@ package org.skywalking.apm.collector.storage.base.dao; +import org.skywalking.apm.collector.core.module.Service; + /** * @author peng-yongsheng */ -public interface DAO { +public interface DAO extends Service { } diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java index 2d3a810809..404b4187a4 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java @@ -29,6 +29,8 @@ import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.skywalking.apm.collector.storage.StorageException; import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.base.dao.DAOContainer; +import org.skywalking.apm.collector.storage.base.dao.IBatchDAO; +import org.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO; import org.skywalking.apm.collector.storage.es.base.dao.EsDAO; import org.skywalking.apm.collector.storage.es.base.dao.EsDAODefineLoader; import org.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller; @@ -72,6 +74,10 @@ public class StorageModuleEsProvider extends ModuleProvider { elasticSearchClient = new ElasticSearchClient(clusterName, clusterTransportSniffer, clusterNodes); this.registerServiceImplementation(DAOService.class, new ElasticSearchDAOService(daoContainer)); + + BatchEsDAO batchEsDAO = new BatchEsDAO(); + batchEsDAO.setClient(elasticSearchClient); + this.registerServiceImplementation(IBatchDAO.class, batchEsDAO); } @Override public void start(Properties config) throws ServiceNotProvidedException { diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java index ce7cb558f6..90835ef78b 100644 --- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java @@ -29,6 +29,8 @@ import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.skywalking.apm.collector.storage.StorageException; import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.base.dao.DAOContainer; +import org.skywalking.apm.collector.storage.base.dao.IBatchDAO; +import org.skywalking.apm.collector.storage.h2.base.dao.BatchH2DAO; import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO; import org.skywalking.apm.collector.storage.h2.base.dao.H2DAODefineLoader; import org.skywalking.apm.collector.storage.h2.base.define.H2StorageInstaller; @@ -70,6 +72,7 @@ public class StorageModuleH2Provider extends ModuleProvider { client = new H2Client(url, userName, password); this.registerServiceImplementation(DAOService.class, new H2DAOService(daoContainer)); + this.registerServiceImplementation(IBatchDAO.class, new BatchH2DAO()); } @Override public void start(Properties config) throws ServiceNotProvidedException { diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/StreamModuleProvider.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/StreamModuleProvider.java index 6516b6dd7f..0e73dc8d25 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/StreamModuleProvider.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/StreamModuleProvider.java @@ -19,17 +19,9 @@ package org.skywalking.apm.collector.stream; import java.util.Properties; -import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.core.module.Module; import org.skywalking.apm.collector.core.module.ModuleProvider; import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; -import org.skywalking.apm.collector.queue.QueueModule; -import org.skywalking.apm.collector.queue.service.QueueCreatorService; -import org.skywalking.apm.collector.remote.RemoteModule; -import org.skywalking.apm.collector.remote.service.RemoteSenderService; -import org.skywalking.apm.collector.storage.StorageModule; -import org.skywalking.apm.collector.storage.service.DAOService; -import org.skywalking.apm.collector.stream.timer.PersistenceTimer; /** * @author peng-yongsheng @@ -48,11 +40,6 @@ public class StreamModuleProvider extends ModuleProvider { } @Override public void start(Properties config) throws ServiceNotProvidedException { - PersistenceTimer persistenceTimer = new PersistenceTimer(); - QueueCreatorService queueCreatorService = getManager().find(QueueModule.NAME).getService(QueueCreatorService.class); - RemoteSenderService remoteSenderService = getManager().find(RemoteModule.NAME).getService(RemoteSenderService.class); - DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class); - persistenceTimer.start(daoService); } @Override public void notifyAfterCompleted() throws ServiceNotProvidedException { @@ -60,6 +47,6 @@ public class StreamModuleProvider extends ModuleProvider { } @Override public String[] requiredModules() { - return new String[] {RemoteModule.NAME, QueueModule.NAME, StorageModule.NAME, CacheModule.NAME}; + return new String[] {}; } } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/timer/PersistenceTimer.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/timer/PersistenceTimer.java index d69aa01493..770effb76c 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/timer/PersistenceTimer.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/timer/PersistenceTimer.java @@ -22,11 +22,11 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.base.dao.IBatchDAO; -import org.skywalking.apm.collector.storage.service.DAOService; import org.skywalking.apm.collector.stream.worker.base.WorkerException; import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker; -import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorkerContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,19 +37,19 @@ public class PersistenceTimer { private final Logger logger = LoggerFactory.getLogger(PersistenceTimer.class); - public void start(DAOService daoService) { + public void start(ModuleManager moduleManager, List persistenceWorkers) { logger.info("persistence timer start"); //TODO timer value config // final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000; final long timeInterval = 3; - Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> extractDataAndSave(daoService), 1, timeInterval, TimeUnit.SECONDS); + IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class); + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> extractDataAndSave(batchDAO, persistenceWorkers), 1, timeInterval, TimeUnit.SECONDS); } - private void extractDataAndSave(DAOService daoService) { + private void extractDataAndSave(IBatchDAO batchDAO, List persistenceWorkers) { try { - List workers = PersistenceWorkerContainer.INSTANCE.getPersistenceWorkers(); List batchAllCollection = new ArrayList<>(); - workers.forEach((PersistenceWorker worker) -> { + persistenceWorkers.forEach((PersistenceWorker worker) -> { logger.debug("extract {} worker data and save", worker.getClass().getName()); try { worker.flushAndSwitch(); @@ -61,8 +61,7 @@ public class PersistenceTimer { } }); - IBatchDAO dao = (IBatchDAO)daoService.get(IBatchDAO.class); - dao.batchPersistence(batchAllCollection); + batchDAO.batchPersistence(batchAllCollection); } catch (Throwable e) { logger.error(e.getMessage(), e); } finally { diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerCreateListener.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerCreateListener.java index bc8903065e..5107b10d0a 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerCreateListener.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerCreateListener.java @@ -18,12 +18,28 @@ package org.skywalking.apm.collector.stream.worker.base; +import java.util.ArrayList; +import java.util.List; +import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker; + /** * @author peng-yongsheng */ public class WorkerCreateListener { + private final List persistenceWorkers; + + public WorkerCreateListener() { + this.persistenceWorkers = new ArrayList<>(); + } + public void addWorker(AbstractWorker worker) { + if (worker instanceof PersistenceWorker) { + persistenceWorkers.add((PersistenceWorker)worker); + } + } + public List getPersistenceWorkers() { + return persistenceWorkers; } } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/impl/PersistenceWorkerContainer.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/impl/PersistenceWorkerContainer.java deleted file mode 100644 index 9dd5e43300..0000000000 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/impl/PersistenceWorkerContainer.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.stream.worker.impl; - -import java.util.ArrayList; -import java.util.List; - -/** - * @author peng-yongsheng - */ -public enum PersistenceWorkerContainer { - INSTANCE; - - private List persistenceWorkers = new ArrayList<>(); - - public void addWorker(PersistenceWorker worker) { - persistenceWorkers.add(worker); - } - - public List getPersistenceWorkers() { - return persistenceWorkers; - } -} -- GitLab