提交 34aad0f0 编写于 作者: P peng-yongsheng

jvm stream finish.

上级 714b4ccb
......@@ -42,7 +42,7 @@ import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
import org.skywalking.apm.collector.stream.StreamModule;
/**
* @author peng-yongsheng
......@@ -82,7 +82,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer = managerService.createIfAbsent(host, port);
AgentStreamSingleton.getInstance(getManager(), new WorkerCreateListener());
AgentStreamSingleton.createInstanceIfAbsent(getManager());
addHandlers(gRPCServer);
}
......@@ -91,7 +91,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME, StreamModule.NAME};
}
private void addHandlers(Server gRPCServer) {
......
......@@ -68,7 +68,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
senToInstanceHeartBeatPersistenceWorker(instanceId, metric.getTime());
sendToInstanceHeartBeatPersistenceWorker(instanceId, metric.getTime());
sendToCpuMetricPersistenceWorker(instanceId, time, metric.getCpu());
sendToMemoryMetricPersistenceWorker(instanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricPersistenceWorker(instanceId, time, metric.getMemoryPoolList());
......@@ -79,7 +79,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
responseObserver.onCompleted();
}
private void senToInstanceHeartBeatPersistenceWorker(int instanceId, long heartBeatTime) {
private void sendToInstanceHeartBeatPersistenceWorker(int instanceId, long heartBeatTime) {
Instance instance = new Instance(String.valueOf(instanceId));
instance.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime));
instance.setInstanceId(instanceId);
......
......@@ -36,7 +36,7 @@ import org.skywalking.apm.collector.naming.NamingModule;
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.stream.StreamModule;
/**
* @author peng-yongsheng
......@@ -75,11 +75,9 @@ public class AgentModuleJettyProvider extends ModuleProvider {
NamingHandlerRegisterService namingHandlerRegisterService = getManager().find(NamingModule.NAME).getService(NamingHandlerRegisterService.class);
namingHandlerRegisterService.register(new AgentJettyNamingHandler(namingListener));
DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class);
JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
Server jettyServer = managerService.createIfAbsent(host, port, contextPath);
addHandlers(daoService, jettyServer);
addHandlers(jettyServer);
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
......@@ -87,10 +85,10 @@ public class AgentModuleJettyProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME, StreamModule.NAME};
}
private void addHandlers(DAOService daoService, Server jettyServer) {
private void addHandlers(Server jettyServer) {
jettyServer.addHandler(new TraceSegmentServletHandler());
}
}
......@@ -18,9 +18,11 @@
package org.skywalking.apm.collector.agent.stream;
import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.stream.timer.PersistenceTimer;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
/**
......@@ -33,24 +35,35 @@ public class AgentStreamSingleton {
private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener;
public AgentStreamSingleton(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) {
private AgentStreamSingleton(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.workerCreateListener = workerCreateListener;
createJVMGraph();
createRegisterGraph();
createTraceGraph();
this.workerCreateListener = new WorkerCreateListener();
this.create();
}
public static synchronized AgentStreamSingleton getInstance(ModuleManager moduleManager,
WorkerCreateListener workerCreateListener) {
public static synchronized AgentStreamSingleton createInstanceIfAbsent(ModuleManager moduleManager) {
if (ObjectUtils.isEmpty(INSTANCE)) {
INSTANCE = new AgentStreamSingleton(moduleManager, workerCreateListener);
INSTANCE = new AgentStreamSingleton(moduleManager);
}
return INSTANCE;
}
private void createJVMGraph() {
private void create() {
createJVMGraph();
createRegisterGraph();
createTraceGraph();
PersistenceTimer timer = new PersistenceTimer();
timer.start(moduleManager, workerCreateListener.getPersistenceWorkers());
}
private void createJVMGraph() {
JvmMetricStreamGraph jvmMetricStreamGraph = new JvmMetricStreamGraph(moduleManager, workerCreateListener);
jvmMetricStreamGraph.createCpuMetricGraph();
jvmMetricStreamGraph.createGcMetricGraph();
jvmMetricStreamGraph.createMemoryMetricGraph();
jvmMetricStreamGraph.createMemoryPoolMetricGraph();
jvmMetricStreamGraph.createHeartBeatGraph();
}
private void createRegisterGraph() {
......
......@@ -18,14 +18,22 @@
package org.skywalking.apm.collector.agent.stream.graph;
import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.InstHeartBeatPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryPoolMetricPersistenceWorker;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.queue.QueueModule;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.stream.worker.base.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
/**
* @author peng-yongsheng
......@@ -38,28 +46,56 @@ public class JvmMetricStreamGraph {
public static final int CPU_METRIC_GRAPH_ID = 103;
public static final int INST_HEART_BEAT_GRAPH_ID = 104;
private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener;
public JvmMetricStreamGraph(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) {
this.moduleManager = moduleManager;
this.workerCreateListener = workerCreateListener;
}
@SuppressWarnings("unchecked")
public Graph<GCMetric> createGcMetricGraph() {
QueueCreatorService<GCMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<GCMetric> graph = GraphManager.INSTANCE.createIfAbsent(GC_METRIC_GRAPH_ID, GCMetric.class);
graph.addNode(new GCMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
public Graph<CpuMetric> createCpuMetricGraph() throws ProviderNotFoundException {
@SuppressWarnings("unchecked")
public Graph<CpuMetric> createCpuMetricGraph() {
QueueCreatorService<CpuMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<CpuMetric> graph = GraphManager.INSTANCE.createIfAbsent(CPU_METRIC_GRAPH_ID, CpuMetric.class);
graph.addNode(new CpuMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
@SuppressWarnings("unchecked")
public Graph<MemoryMetric> createMemoryMetricGraph() {
QueueCreatorService<MemoryMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<MemoryMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_METRIC_GRAPH_ID, MemoryMetric.class);
graph.addNode(new MemoryMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
@SuppressWarnings("unchecked")
public Graph<MemoryPoolMetric> createMemoryPoolMetricGraph() {
QueueCreatorService<MemoryPoolMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<MemoryPoolMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class);
graph.addNode(new MemoryPoolMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
@SuppressWarnings("unchecked")
public Graph<Instance> createHeartBeatGraph() {
QueueCreatorService<Instance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<Instance> graph = GraphManager.INSTANCE.createIfAbsent(INST_HEART_BEAT_GRAPH_ID, Instance.class);
graph.addNode(new InstHeartBeatPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
}
......@@ -31,8 +31,6 @@ import org.skywalking.apm.collector.queue.QueueModule;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
......@@ -57,7 +55,6 @@ public class RegisterStreamGraph {
@SuppressWarnings("unchecked")
public Graph<Application> createApplicationRegisterGraph() {
DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
QueueCreatorService<Application> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
......@@ -70,7 +67,6 @@ public class RegisterStreamGraph {
@SuppressWarnings("unchecked")
public Graph<Instance> createInstanceRegisterGraph() {
DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
QueueCreatorService<Instance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
......@@ -83,7 +79,6 @@ public class RegisterStreamGraph {
@SuppressWarnings("unchecked")
public Graph<ServiceName> createServiceNameRegisterGraph() {
DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
QueueCreatorService<ServiceName> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
......
......@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
......@@ -37,12 +38,13 @@ public class ApplicationIDService {
private final Logger logger = LoggerFactory.getLogger(ApplicationIDService.class);
private final ModuleManager moduleManager;
private final Graph<Application> applicationRegisterGraph;
public ApplicationIDService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.applicationRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, Application.class);
}
@SuppressWarnings("unchecked")
public int getOrCreate(String applicationCode) throws ModuleNotFoundException, ServiceNotProvidedException {
ApplicationCacheService service = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
int applicationId = service.get(applicationCode);
......@@ -52,7 +54,7 @@ public class ApplicationIDService {
application.setApplicationCode(applicationCode);
application.setApplicationId(0);
GraphManager.INSTANCE.findGraph(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID).start(application);
applicationRegisterGraph.start(application);
}
return applicationId;
}
......
......@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
......@@ -40,12 +41,13 @@ public class InstanceIDService {
private final Logger logger = LoggerFactory.getLogger(InstanceIDService.class);
private final ModuleManager moduleManager;
private final Graph<Instance> instanceRegisterGraph;
public InstanceIDService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.instanceRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.INSTANCE_REGISTER_GRAPH_ID, Instance.class);
}
@SuppressWarnings("unchecked")
public int getOrCreate(int applicationId, String agentUUID, long registerTime,
String osInfo) throws ModuleNotFoundException, ServiceNotProvidedException {
logger.debug("get or create instance id, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo);
......@@ -61,7 +63,7 @@ public class InstanceIDService {
instance.setInstanceId(0);
instance.setOsInfo(osInfo);
GraphManager.INSTANCE.findGraph(RegisterStreamGraph.INSTANCE_REGISTER_GRAPH_ID).start(instance);
instanceRegisterGraph.start(instance);
}
return instanceId;
}
......
......@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ServiceIdCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
......@@ -35,12 +36,13 @@ public class ServiceNameService {
private final Logger logger = LoggerFactory.getLogger(ServiceNameService.class);
private final ModuleManager moduleManager;
private final Graph<ServiceName> serviceNameRegisterGraph;
public ServiceNameService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.serviceNameRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, ServiceName.class);
}
@SuppressWarnings("unchecked")
public int getOrCreate(int applicationId, String serviceName) {
ServiceIdCacheService idCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceIdCacheService.class);
int serviceId = idCacheService.get(applicationId, serviceName);
......@@ -51,7 +53,7 @@ public class ServiceNameService {
service.setServiceName(serviceName);
service.setServiceId(0);
GraphManager.INSTANCE.findGraph(RegisterStreamGraph.SERVICE_NAME_REGISTER_GRAPH_ID).start(service);
serviceNameRegisterGraph.start(service);
}
return serviceId;
}
......
......@@ -17,7 +17,7 @@
~ Project repository: https://github.com/OpenSkywalking/skywalking
-->
<Configuration status="info">
<Configuration status="debug">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
......@@ -26,9 +26,9 @@
<Loggers>
<logger name="org.eclipse.jetty" level="INFO"/>
<logger name="org.apache.zookeeper" level="INFO"/>
<logger name="org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer" level="INFO"/>
<logger name="io.grpc.netty.NettyServerHandler" level="INFO"/>
<Root level="info">
<logger name="org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer" level="debug"/>
<logger name="io.grpc.netty" level="INFO"/>
<Root level="debug">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
......
......@@ -24,6 +24,8 @@ jetty_manager:
jetty:
gRPC_manager:
gRPC:
stream:
worker:
storage:
h2:
url: jdbc:h2:~/memorydb
......
......@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.storage;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
/**
* @author peng-yongsheng
......@@ -33,6 +34,6 @@ public class StorageModule extends Module {
}
@Override public Class[] services() {
return new Class[] {DAOService.class};
return new Class[] {DAOService.class, IBatchDAO.class};
}
}
......@@ -18,8 +18,10 @@
package org.skywalking.apm.collector.storage.base.dao;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface DAO {
public interface DAO extends Service {
}
......@@ -29,6 +29,8 @@ import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.storage.StorageException;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.DAOContainer;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAODefineLoader;
import org.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller;
......@@ -72,6 +74,10 @@ public class StorageModuleEsProvider extends ModuleProvider {
elasticSearchClient = new ElasticSearchClient(clusterName, clusterTransportSniffer, clusterNodes);
this.registerServiceImplementation(DAOService.class, new ElasticSearchDAOService(daoContainer));
BatchEsDAO batchEsDAO = new BatchEsDAO();
batchEsDAO.setClient(elasticSearchClient);
this.registerServiceImplementation(IBatchDAO.class, batchEsDAO);
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
......
......@@ -29,6 +29,8 @@ import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.storage.StorageException;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.DAOContainer;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.BatchH2DAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAODefineLoader;
import org.skywalking.apm.collector.storage.h2.base.define.H2StorageInstaller;
......@@ -70,6 +72,7 @@ public class StorageModuleH2Provider extends ModuleProvider {
client = new H2Client(url, userName, password);
this.registerServiceImplementation(DAOService.class, new H2DAOService(daoContainer));
this.registerServiceImplementation(IBatchDAO.class, new BatchH2DAO());
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
......
......@@ -19,17 +19,9 @@
package org.skywalking.apm.collector.stream;
import java.util.Properties;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.queue.QueueModule;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.stream.timer.PersistenceTimer;
/**
* @author peng-yongsheng
......@@ -48,11 +40,6 @@ public class StreamModuleProvider extends ModuleProvider {
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
PersistenceTimer persistenceTimer = new PersistenceTimer();
QueueCreatorService queueCreatorService = getManager().find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = getManager().find(RemoteModule.NAME).getService(RemoteSenderService.class);
DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class);
persistenceTimer.start(daoService);
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
......@@ -60,6 +47,6 @@ public class StreamModuleProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {RemoteModule.NAME, QueueModule.NAME, StorageModule.NAME, CacheModule.NAME};
return new String[] {};
}
}
......@@ -22,11 +22,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorkerContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -37,19 +37,19 @@ public class PersistenceTimer {
private final Logger logger = LoggerFactory.getLogger(PersistenceTimer.class);
public void start(DAOService daoService) {
public void start(ModuleManager moduleManager, List<PersistenceWorker> persistenceWorkers) {
logger.info("persistence timer start");
//TODO timer value config
// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
final long timeInterval = 3;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> extractDataAndSave(daoService), 1, timeInterval, TimeUnit.SECONDS);
IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> extractDataAndSave(batchDAO, persistenceWorkers), 1, timeInterval, TimeUnit.SECONDS);
}
private void extractDataAndSave(DAOService daoService) {
private void extractDataAndSave(IBatchDAO batchDAO, List<PersistenceWorker> persistenceWorkers) {
try {
List<PersistenceWorker> workers = PersistenceWorkerContainer.INSTANCE.getPersistenceWorkers();
List batchAllCollection = new ArrayList<>();
workers.forEach((PersistenceWorker worker) -> {
persistenceWorkers.forEach((PersistenceWorker worker) -> {
logger.debug("extract {} worker data and save", worker.getClass().getName());
try {
worker.flushAndSwitch();
......@@ -61,8 +61,7 @@ public class PersistenceTimer {
}
});
IBatchDAO dao = (IBatchDAO)daoService.get(IBatchDAO.class);
dao.batchPersistence(batchAllCollection);
batchDAO.batchPersistence(batchAllCollection);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
} finally {
......
......@@ -18,12 +18,28 @@
package org.skywalking.apm.collector.stream.worker.base;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/**
* @author peng-yongsheng
*/
public class WorkerCreateListener {
private final List<PersistenceWorker> persistenceWorkers;
public WorkerCreateListener() {
this.persistenceWorkers = new ArrayList<>();
}
public void addWorker(AbstractWorker worker) {
if (worker instanceof PersistenceWorker) {
persistenceWorkers.add((PersistenceWorker)worker);
}
}
public List<PersistenceWorker> getPersistenceWorkers() {
return persistenceWorkers;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.impl;
import java.util.ArrayList;
import java.util.List;
/**
* @author peng-yongsheng
*/
public enum PersistenceWorkerContainer {
INSTANCE;
private List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
public void addWorker(PersistenceWorker worker) {
persistenceWorkers.add(worker);
}
public List<PersistenceWorker> getPersistenceWorkers() {
return persistenceWorkers;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册