提交 c10b7b18 编写于 作者: P peng-yongsheng

Change all daos to be services.

上级 b744d402
......@@ -23,7 +23,6 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ICpuMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
......@@ -46,7 +45,7 @@ public class CpuMetricPersistenceWorker extends PersistenceWorker<CpuMetric, Cpu
}
@Override protected IPersistenceDAO persistenceDAO() {
return getModuleManager().find(StorageModule.NAME).getService(DAOService.class).getPersistenceDAO(ICpuMetricPersistenceDAO.class);
return getModuleManager().find(StorageModule.NAME).getService(ICpuMetricPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<CpuMetric, CpuMetric, CpuMetricPersistenceWorker> {
......
......@@ -23,7 +23,6 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
......@@ -33,11 +32,8 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class GCMetricPersistenceWorker extends PersistenceWorker<GCMetric, GCMetric> {
private final DAOService daoService;
public GCMetricPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
this.daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
}
@Override public int id() {
......@@ -49,7 +45,7 @@ public class GCMetricPersistenceWorker extends PersistenceWorker<GCMetric, GCMet
}
@Override protected IPersistenceDAO persistenceDAO() {
return daoService.getPersistenceDAO(IGCMetricPersistenceDAO.class);
return getModuleManager().find(StorageModule.NAME).getService(IGCMetricPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<GCMetric, GCMetric, GCMetricPersistenceWorker> {
......
......@@ -23,7 +23,6 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
......@@ -33,11 +32,8 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class InstHeartBeatPersistenceWorker extends PersistenceWorker<Instance, Instance> {
private final DAOService daoService;
public InstHeartBeatPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
this.daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
}
@Override public int id() {
......@@ -49,7 +45,7 @@ public class InstHeartBeatPersistenceWorker extends PersistenceWorker<Instance,
}
@Override protected IPersistenceDAO persistenceDAO() {
return daoService.getPersistenceDAO(IInstanceHeartBeatPersistenceDAO.class);
return getModuleManager().find(StorageModule.NAME).getService(IInstanceHeartBeatPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<Instance, Instance, InstHeartBeatPersistenceWorker> {
......
......@@ -23,7 +23,6 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
......@@ -33,11 +32,8 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class MemoryMetricPersistenceWorker extends PersistenceWorker<MemoryMetric, MemoryMetric> {
private final DAOService daoService;
public MemoryMetricPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
this.daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
}
@Override public int id() {
......@@ -49,7 +45,7 @@ public class MemoryMetricPersistenceWorker extends PersistenceWorker<MemoryMetri
}
@Override protected IPersistenceDAO persistenceDAO() {
return daoService.getPersistenceDAO(IMemoryMetricPersistenceDAO.class);
return getModuleManager().find(StorageModule.NAME).getService(IMemoryMetricPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<MemoryMetric, MemoryMetric, MemoryMetricPersistenceWorker> {
......
......@@ -23,7 +23,6 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
......@@ -37,11 +36,8 @@ public class MemoryPoolMetricPersistenceWorker extends PersistenceWorker<MemoryP
return 0;
}
private final DAOService daoService;
public MemoryPoolMetricPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
this.daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
}
@Override protected boolean needMergeDBData() {
......@@ -49,7 +45,7 @@ public class MemoryPoolMetricPersistenceWorker extends PersistenceWorker<MemoryP
}
@Override protected IPersistenceDAO persistenceDAO() {
return daoService.getPersistenceDAO(IMemoryPoolMetricPersistenceDAO.class);
return getModuleManager().find(StorageModule.NAME).getService(IMemoryPoolMetricPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<MemoryPoolMetric, MemoryPoolMetric, MemoryPoolMetricPersistenceWorker> {
......
......@@ -25,8 +25,7 @@ import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IApplicationStreamDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.dao.IApplicationRegisterDAO;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
......@@ -41,12 +40,12 @@ public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker<Ap
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterSerialWorker.class);
private final DAOService daoService;
private final IApplicationRegisterDAO applicationRegisterDAO;
private final ApplicationCacheService applicationCacheService;
public ApplicationRegisterSerialWorker(ModuleManager moduleManager) {
super(moduleManager);
this.daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
this.applicationRegisterDAO = getModuleManager().find(StorageModule.NAME).getService(IApplicationRegisterDAO.class);
this.applicationCacheService = getModuleManager().find(CacheModule.NAME).getService(ApplicationCacheService.class);
}
......@@ -59,27 +58,26 @@ public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker<Ap
int applicationId = applicationCacheService.get(application.getApplicationCode());
if (applicationId == 0) {
IApplicationStreamDAO dao = (IApplicationStreamDAO)daoService.get(IApplicationStreamDAO.class);
Application newApplication;
int min = dao.getMinApplicationId();
int min = applicationRegisterDAO.getMinApplicationId();
if (min == 0) {
Application userApplication = new Application(String.valueOf(Const.USER_ID));
userApplication.setApplicationCode(Const.USER_CODE);
userApplication.setApplicationId(Const.USER_ID);
dao.save(userApplication);
applicationRegisterDAO.save(userApplication);
newApplication = new Application("-1");
newApplication.setApplicationId(-1);
newApplication.setApplicationCode(application.getApplicationCode());
} else {
int max = dao.getMaxApplicationId();
int max = applicationRegisterDAO.getMaxApplicationId();
applicationId = IdAutoIncrement.INSTANCE.increment(min, max);
newApplication = new Application(String.valueOf(applicationId));
newApplication.setApplicationId(applicationId);
newApplication.setApplicationCode(application.getApplicationCode());
}
dao.save(newApplication);
applicationRegisterDAO.save(newApplication);
}
}
......
......@@ -27,8 +27,7 @@ import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IInstanceStreamDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -42,10 +41,12 @@ public class InstanceIDService {
private final ModuleManager moduleManager;
private final Graph<Instance> instanceRegisterGraph;
private final IInstanceRegisterDAO instanceRegisterDAO;
public InstanceIDService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.instanceRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.INSTANCE_REGISTER_GRAPH_ID, Instance.class);
this.instanceRegisterDAO = moduleManager.find(StorageModule.NAME).getService(IInstanceRegisterDAO.class);
}
public int getOrCreate(int applicationId, String agentUUID, long registerTime,
......@@ -71,8 +72,6 @@ public class InstanceIDService {
public void recover(int instanceId, int applicationId, long registerTime,
String osInfo) throws ModuleNotFoundException, ServiceNotProvidedException {
logger.debug("instance recover, instance id: {}, application id: {}, register time: {}", instanceId, applicationId, registerTime);
DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
IInstanceStreamDAO dao = (IInstanceStreamDAO)daoService.get(IInstanceStreamDAO.class);
Instance instance = new Instance(String.valueOf(instanceId));
instance.setApplicationId(applicationId);
......@@ -81,6 +80,6 @@ public class InstanceIDService {
instance.setHeartBeatTime(registerTime);
instance.setInstanceId(instanceId);
instance.setOsInfo(osInfo);
dao.save(instance);
instanceRegisterDAO.save(instance);
}
}
......@@ -23,8 +23,7 @@ import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IInstanceStreamDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
......@@ -40,12 +39,12 @@ public class InstanceRegisterSerialWorker extends AbstractLocalAsyncWorker<Insta
private final Logger logger = LoggerFactory.getLogger(InstanceRegisterSerialWorker.class);
private final InstanceCacheService instanceCacheService;
private final DAOService daoService;
private final IInstanceRegisterDAO instanceRegisterDAO;
public InstanceRegisterSerialWorker(ModuleManager moduleManager) {
super(moduleManager);
this.instanceCacheService = getModuleManager().find(CacheModule.NAME).getService(InstanceCacheService.class);
this.daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
this.instanceRegisterDAO = getModuleManager().find(StorageModule.NAME).getService(IInstanceRegisterDAO.class);
}
@Override public int id() {
......@@ -56,11 +55,10 @@ public class InstanceRegisterSerialWorker extends AbstractLocalAsyncWorker<Insta
logger.debug("register instance, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
int instanceId = instanceCacheService.getInstanceId(instance.getApplicationId(), instance.getAgentUUID());
if (instanceId == 0) {
IInstanceStreamDAO dao = (IInstanceStreamDAO)daoService.get(IInstanceStreamDAO.class);
Instance newInstance;
int min = dao.getMinInstanceId();
int max = dao.getMaxInstanceId();
int min = instanceRegisterDAO.getMinInstanceId();
int max = instanceRegisterDAO.getMaxInstanceId();
if (min == 0 && max == 0) {
newInstance = new Instance("1");
newInstance.setInstanceId(1);
......@@ -78,7 +76,7 @@ public class InstanceRegisterSerialWorker extends AbstractLocalAsyncWorker<Insta
newInstance.setOsInfo(instance.getOsInfo());
newInstance.setRegisterTime(instance.getRegisterTime());
}
dao.save(newInstance);
instanceRegisterDAO.save(newInstance);
}
}
......
......@@ -25,8 +25,7 @@ import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IServiceNameStreamDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
......@@ -41,12 +40,12 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker<Se
private final Logger logger = LoggerFactory.getLogger(ServiceNameRegisterSerialWorker.class);
private final DAOService daoService;
private final IServiceNameRegisterDAO serviceNameRegisterDAO;
private final ServiceIdCacheService serviceIdCacheService;
public ServiceNameRegisterSerialWorker(ModuleManager moduleManager) {
super(moduleManager);
this.daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
this.serviceNameRegisterDAO = getModuleManager().find(StorageModule.NAME).getService(IServiceNameRegisterDAO.class);
this.serviceIdCacheService = getModuleManager().find(CacheModule.NAME).getService(ServiceIdCacheService.class);
}
......@@ -58,23 +57,22 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker<Se
logger.debug("register service name: {}, application id: {}", serviceName.getServiceName(), serviceName.getApplicationId());
int serviceId = serviceIdCacheService.get(serviceName.getApplicationId(), serviceName.getServiceName());
if (serviceId == 0) {
IServiceNameStreamDAO dao = (IServiceNameStreamDAO)daoService.get(IServiceNameStreamDAO.class);
ServiceName newServiceName;
int min = dao.getMinServiceId();
int min = serviceNameRegisterDAO.getMinServiceId();
if (min == 0) {
ServiceName noneServiceName = new ServiceName("1");
noneServiceName.setApplicationId(0);
noneServiceName.setServiceId(Const.NONE_SERVICE_ID);
noneServiceName.setServiceName(Const.NONE_SERVICE_NAME);
dao.save(noneServiceName);
serviceNameRegisterDAO.save(noneServiceName);
newServiceName = new ServiceName("-1");
newServiceName.setApplicationId(serviceName.getApplicationId());
newServiceName.setServiceId(-1);
newServiceName.setServiceName(serviceName.getServiceName());
} else {
int max = dao.getMaxServiceId();
int max = serviceNameRegisterDAO.getMaxServiceId();
serviceId = IdAutoIncrement.INSTANCE.increment(min, max);
newServiceName = new ServiceName(String.valueOf(serviceId));
......@@ -82,7 +80,7 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker<Se
newServiceName.setServiceId(serviceId);
newServiceName.setServiceName(serviceName.getServiceName());
}
dao.save(newServiceName);
serviceNameRegisterDAO.save(newServiceName);
}
}
......
......@@ -23,7 +23,6 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.global.GlobalTrace;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
......@@ -33,11 +32,8 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class GlobalTracePersistenceWorker extends PersistenceWorker<GlobalTrace, GlobalTrace> {
private final DAOService daoService;
public GlobalTracePersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
this.daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
}
@Override public int id() {
......@@ -49,7 +45,7 @@ public class GlobalTracePersistenceWorker extends PersistenceWorker<GlobalTrace,
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IGlobalTracePersistenceDAO)daoService.get(IGlobalTracePersistenceDAO.class);
return getModuleManager().find(StorageModule.NAME).getService(IGlobalTracePersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<GlobalTrace, GlobalTrace, GlobalTracePersistenceWorker> {
......
......@@ -23,7 +23,6 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstPerformancePersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.instance.InstPerformance;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
......@@ -33,11 +32,8 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class InstPerformancePersistenceWorker extends PersistenceWorker<InstPerformance, InstPerformance> {
private final DAOService daoService;
public InstPerformancePersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
this.daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
}
@Override public int id() {
......@@ -49,7 +45,7 @@ public class InstPerformancePersistenceWorker extends PersistenceWorker<InstPerf
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)daoService.get(IInstPerformancePersistenceDAO.class);
return getModuleManager().find(StorageModule.NAME).getService(IInstPerformancePersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<InstPerformance, InstPerformance, InstPerformancePersistenceWorker> {
......
......@@ -23,7 +23,6 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeComponentPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
......@@ -33,11 +32,8 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class NodeComponentPersistenceWorker extends PersistenceWorker<NodeComponent, NodeComponent> {
private final DAOService daoService;
public NodeComponentPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
this.daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
}
@Override public int id() {
......@@ -49,7 +45,7 @@ public class NodeComponentPersistenceWorker extends PersistenceWorker<NodeCompon
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)daoService.get(INodeComponentPersistenceDAO.class);
return getModuleManager().find(StorageModule.NAME).getService(INodeComponentPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeComponent, NodeComponent, NodeComponentPersistenceWorker> {
......
......@@ -23,7 +23,6 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeMappingPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.node.NodeMapping;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
......@@ -33,11 +32,8 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class NodeMappingPersistenceWorker extends PersistenceWorker<NodeMapping, NodeMapping> {
private final DAOService daoService;
public NodeMappingPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
this.daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
}
@Override public int id() {
......@@ -49,7 +45,7 @@ public class NodeMappingPersistenceWorker extends PersistenceWorker<NodeMapping,
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)daoService.get(INodeMappingPersistenceDAO.class);
return getModuleManager().find(StorageModule.NAME).getService(INodeMappingPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMapping, NodeMapping, NodeMappingPersistenceWorker> {
......
......@@ -23,7 +23,6 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
......@@ -33,11 +32,8 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class NodeReferencePersistenceWorker extends PersistenceWorker<NodeReference, NodeReference> {
private final DAOService daoService;
public NodeReferencePersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
this.daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
}
@Override public int id() {
......@@ -49,7 +45,7 @@ public class NodeReferencePersistenceWorker extends PersistenceWorker<NodeRefere
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)daoService.get(INodeReferencePersistenceDAO.class);
return getModuleManager().find(StorageModule.NAME).getService(INodeReferencePersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeReference, NodeReference, NodeReferencePersistenceWorker> {
......
......@@ -23,7 +23,6 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentCostPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.segment.SegmentCost;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
......@@ -33,11 +32,8 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class SegmentCostPersistenceWorker extends PersistenceWorker<SegmentCost, SegmentCost> {
private final DAOService daoService;
public SegmentCostPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
this.daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
}
@Override public int id() {
......@@ -49,7 +45,7 @@ public class SegmentCostPersistenceWorker extends PersistenceWorker<SegmentCost,
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)daoService.get(ISegmentCostPersistenceDAO.class);
return getModuleManager().find(StorageModule.NAME).getService(ISegmentCostPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<SegmentCost, SegmentCost, SegmentCostPersistenceWorker> {
......
......@@ -23,7 +23,6 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.segment.Segment;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
......@@ -46,8 +45,7 @@ public class SegmentPersistenceWorker extends PersistenceWorker<Segment, Segment
}
@Override protected IPersistenceDAO persistenceDAO() {
DAOService daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
return (IPersistenceDAO)daoService.get(ISegmentPersistenceDAO.class);
return getModuleManager().find(StorageModule.NAME).getService(ISegmentPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<Segment, Segment, SegmentPersistenceWorker> {
......
......@@ -23,7 +23,6 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.service.ServiceEntry;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
......@@ -33,11 +32,8 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class ServiceEntryPersistenceWorker extends PersistenceWorker<ServiceEntry, ServiceEntry> {
private final DAOService daoService;
public ServiceEntryPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
this.daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
}
@Override public int id() {
......@@ -49,7 +45,7 @@ public class ServiceEntryPersistenceWorker extends PersistenceWorker<ServiceEntr
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)daoService.get(IServiceEntryPersistenceDAO.class);
return getModuleManager().find(StorageModule.NAME).getService(IServiceEntryPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceEntry, ServiceEntry, ServiceEntryPersistenceWorker> {
......
......@@ -23,7 +23,6 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
......@@ -33,11 +32,8 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class ServiceReferencePersistenceWorker extends PersistenceWorker<ServiceReference, ServiceReference> {
private final DAOService daoService;
public ServiceReferencePersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
this.daoService = getModuleManager().find(StorageModule.NAME).getService(DAOService.class);
}
@Override public int id() {
......@@ -49,7 +45,7 @@ public class ServiceReferencePersistenceWorker extends PersistenceWorker<Service
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)daoService.get(IServiceReferencePersistenceDAO.class);
return getModuleManager().find(StorageModule.NAME).getService(IServiceReferencePersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceReference, ServiceReference, ServiceReferencePersistenceWorker> {
......
......@@ -26,7 +26,6 @@ import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IApplicationCacheDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -39,24 +38,22 @@ public class ApplicationCacheGuavaService implements ApplicationCacheService {
private final Cache<String, Integer> codeCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
private final DAOService daoService;
private final IApplicationCacheDAO applicationCacheDAO;
public ApplicationCacheGuavaService(ModuleManager moduleManager) {
this.daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
this.applicationCacheDAO = moduleManager.find(StorageModule.NAME).getService(IApplicationCacheDAO.class);
}
public int get(String applicationCode) {
IApplicationCacheDAO dao = (IApplicationCacheDAO)daoService.get(IApplicationCacheDAO.class);
int applicationId = 0;
try {
applicationId = codeCache.get(applicationCode, () -> dao.getApplicationId(applicationCode));
applicationId = codeCache.get(applicationCode, () -> applicationCacheDAO.getApplicationId(applicationCode));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (applicationId == 0) {
applicationId = dao.getApplicationId(applicationCode);
applicationId = applicationCacheDAO.getApplicationId(applicationCode);
if (applicationId != 0) {
codeCache.put(applicationCode, applicationId);
}
......@@ -67,17 +64,15 @@ public class ApplicationCacheGuavaService implements ApplicationCacheService {
private final Cache<Integer, String> idCache = CacheBuilder.newBuilder().maximumSize(1000).build();
public String get(int applicationId) {
IApplicationCacheDAO dao = (IApplicationCacheDAO)daoService.get(IApplicationCacheDAO.class);
String applicationCode = Const.EMPTY_STRING;
try {
applicationCode = idCache.get(applicationId, () -> dao.getApplicationCode(applicationId));
applicationCode = idCache.get(applicationId, () -> applicationCacheDAO.getApplicationCode(applicationId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (StringUtils.isEmpty(applicationCode)) {
applicationCode = dao.getApplicationCode(applicationId);
applicationCode = applicationCacheDAO.getApplicationCode(applicationId);
if (StringUtils.isNotEmpty(applicationCode)) {
codeCache.put(applicationCode, applicationId);
}
......
......@@ -25,7 +25,6 @@ import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -40,24 +39,23 @@ public class InstanceCacheGuavaService implements InstanceCacheService {
private final Cache<String, Integer> stringCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(5000).build();
private final DAOService daoService;
private final IInstanceCacheDAO instanceCacheDAO;
public InstanceCacheGuavaService(ModuleManager moduleManager) {
this.daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
this.instanceCacheDAO = moduleManager.find(StorageModule.NAME).getService(IInstanceCacheDAO.class);
}
public int get(int applicationInstanceId) {
IInstanceCacheDAO dao = (IInstanceCacheDAO)daoService.get(IInstanceCacheDAO.class);
int applicationId = 0;
try {
applicationId = integerCache.get(applicationInstanceId, () -> dao.getApplicationId(applicationInstanceId));
applicationId = integerCache.get(applicationInstanceId, () -> instanceCacheDAO.getApplicationId(applicationInstanceId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (applicationId == 0) {
applicationId = dao.getApplicationId(applicationInstanceId);
applicationId = instanceCacheDAO.getApplicationId(applicationInstanceId);
if (applicationId != 0) {
integerCache.put(applicationInstanceId, applicationId);
}
......@@ -66,18 +64,17 @@ public class InstanceCacheGuavaService implements InstanceCacheService {
}
@Override public int getInstanceId(int applicationId, String agentUUID) {
IInstanceCacheDAO dao = (IInstanceCacheDAO)daoService.get(IInstanceCacheDAO.class);
String key = applicationId + Const.ID_SPLIT + agentUUID;
int instanceId = 0;
try {
instanceId = stringCache.get(key, () -> dao.getInstanceId(applicationId, agentUUID));
instanceId = stringCache.get(key, () -> instanceCacheDAO.getInstanceId(applicationId, agentUUID));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (instanceId == 0) {
instanceId = dao.getInstanceId(applicationId, agentUUID);
instanceId = instanceCacheDAO.getInstanceId(applicationId, agentUUID);
if (applicationId != 0) {
stringCache.put(key, instanceId);
}
......
......@@ -25,7 +25,6 @@ import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -38,24 +37,22 @@ public class ServiceIdCacheGuavaService implements ServiceIdCacheService {
private final Cache<String, Integer> serviceIdCache = CacheBuilder.newBuilder().maximumSize(1000).build();
private final DAOService daoService;
private final IServiceNameCacheDAO serviceNameCacheDAO;
public ServiceIdCacheGuavaService(ModuleManager moduleManager) {
this.daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
this.serviceNameCacheDAO = moduleManager.find(StorageModule.NAME).getService(IServiceNameCacheDAO.class);
}
public int get(int applicationId, String serviceName) {
IServiceNameCacheDAO dao = (IServiceNameCacheDAO)daoService.get(IServiceNameCacheDAO.class);
int serviceId = 0;
try {
serviceId = serviceIdCache.get(applicationId + Const.ID_SPLIT + serviceName, () -> dao.getServiceId(applicationId, serviceName));
serviceId = serviceIdCache.get(applicationId + Const.ID_SPLIT + serviceName, () -> serviceNameCacheDAO.getServiceId(applicationId, serviceName));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (serviceId == 0) {
serviceId = dao.getServiceId(applicationId, serviceName);
serviceId = serviceNameCacheDAO.getServiceId(applicationId, serviceName);
if (serviceId != 0) {
serviceIdCache.put(applicationId + Const.ID_SPLIT + serviceName, serviceId);
}
......
......@@ -26,7 +26,6 @@ import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -39,24 +38,22 @@ public class ServiceNameCacheGuavaService implements ServiceNameCacheService {
private final Cache<Integer, String> serviceNameCache = CacheBuilder.newBuilder().maximumSize(10000).build();
private final DAOService daoService;
private final IServiceNameCacheDAO serviceNameCacheDAO;
public ServiceNameCacheGuavaService(ModuleManager moduleManager) {
this.daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
this.serviceNameCacheDAO = moduleManager.find(StorageModule.NAME).getService(IServiceNameCacheDAO.class);
}
public String get(int serviceId) {
IServiceNameCacheDAO dao = (IServiceNameCacheDAO)daoService.get(IServiceNameCacheDAO.class);
String serviceName = Const.EMPTY_STRING;
try {
serviceName = serviceNameCache.get(serviceId, () -> dao.getServiceName(serviceId));
serviceName = serviceNameCache.get(serviceId, () -> serviceNameCacheDAO.getServiceName(serviceId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (StringUtils.isEmpty(serviceName)) {
serviceName = dao.getServiceName(serviceId);
serviceName = serviceNameCacheDAO.getServiceName(serviceId);
if (StringUtils.isNotEmpty(serviceName)) {
serviceNameCache.put(serviceId, serviceName);
}
......
......@@ -18,9 +18,44 @@
package org.skywalking.apm.collector.storage;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.dao.IApplicationCacheDAO;
import org.skywalking.apm.collector.storage.dao.IApplicationRegisterDAO;
import org.skywalking.apm.collector.storage.dao.ICpuMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ICpuMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGCMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTraceUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstPerformancePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstPerformanceUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceUIDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.INodeComponentPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeComponentUIDAO;
import org.skywalking.apm.collector.storage.dao.INodeMappingPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeMappingUIDAO;
import org.skywalking.apm.collector.storage.dao.INodeReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeReferenceUIDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentCostPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentCostUIDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentUIDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryUIDAO;
import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO;
import org.skywalking.apm.collector.storage.dao.IServiceReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceReferenceUIDAO;
/**
* @author peng-yongsheng
......@@ -34,6 +69,64 @@ public class StorageModule extends Module {
}
@Override public Class[] services() {
return new Class[] {DAOService.class, IBatchDAO.class};
List<Class> classes = new ArrayList<>();
classes.add(IBatchDAO.class);
addCacheDAO(classes);
addRegisterDAO(classes);
addPersistenceDAO(classes);
addUiDAO(classes);
return classes.toArray(new Class[] {});
}
private void addCacheDAO(List<Class> classes) {
classes.add(IApplicationCacheDAO.class);
classes.add(IInstanceCacheDAO.class);
classes.add(IServiceNameCacheDAO.class);
}
private void addRegisterDAO(List<Class> classes) {
classes.add(IApplicationRegisterDAO.class);
classes.add(IInstanceRegisterDAO.class);
classes.add(IServiceNameRegisterDAO.class);
}
private void addPersistenceDAO(List<Class> classes) {
classes.add(ICpuMetricPersistenceDAO.class);
classes.add(IGCMetricPersistenceDAO.class);
classes.add(IMemoryMetricPersistenceDAO.class);
classes.add(IMemoryPoolMetricPersistenceDAO.class);
classes.add(IGlobalTracePersistenceDAO.class);
classes.add(IInstPerformancePersistenceDAO.class);
classes.add(INodeComponentPersistenceDAO.class);
classes.add(INodeMappingPersistenceDAO.class);
classes.add(INodeReferencePersistenceDAO.class);
classes.add(ISegmentCostPersistenceDAO.class);
classes.add(ISegmentPersistenceDAO.class);
classes.add(IServiceEntryPersistenceDAO.class);
classes.add(IServiceReferencePersistenceDAO.class);
classes.add(IInstanceHeartBeatPersistenceDAO.class);
}
private void addUiDAO(List<Class> classes) {
classes.add(IInstanceUIDAO.class);
classes.add(ICpuMetricUIDAO.class);
classes.add(IGCMetricUIDAO.class);
classes.add(IMemoryMetricUIDAO.class);
classes.add(IMemoryPoolMetricUIDAO.class);
classes.add(IGlobalTraceUIDAO.class);
classes.add(IInstPerformanceUIDAO.class);
classes.add(INodeComponentUIDAO.class);
classes.add(INodeMappingUIDAO.class);
classes.add(INodeReferenceUIDAO.class);
classes.add(ISegmentCostUIDAO.class);
classes.add(ISegmentUIDAO.class);
classes.add(IServiceEntryUIDAO.class);
classes.add(IServiceReferenceUIDAO.class);
}
}
......@@ -24,13 +24,13 @@ import org.skywalking.apm.collector.client.Client;
* @author peng-yongsheng
*/
public abstract class AbstractDAO<C extends Client> implements DAO {
private C client;
private final C client;
public final C getClient() {
return client;
public AbstractDAO(C client) {
this.client = client;
}
public final void setClient(C client) {
this.client = client;
public final C getClient() {
return client;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.base.dao;
import java.util.HashMap;
import java.util.Map;
/**
* @author peng-yongsheng
*/
public class DAOContainer {
private Map<String, DAO> daoImplMap;
private Map<String, IPersistenceDAO> persistenceDaoImpl;
public DAOContainer() {
daoImplMap = new HashMap<>();
persistenceDaoImpl = new HashMap<>();
}
public void put(String interfaceName, DAO daoImpl) {
if (daoImpl instanceof IPersistenceDAO) {
persistenceDaoImpl.put(interfaceName, (IPersistenceDAO)daoImpl);
} else {
daoImplMap.put(interfaceName, daoImpl);
}
}
public DAO get(String interfaceName) {
return daoImplMap.get(interfaceName);
}
public IPersistenceDAO getPersistenceDAO(String interfaceName) {
return persistenceDaoImpl.get(interfaceName);
}
}
......@@ -24,7 +24,7 @@ import org.skywalking.apm.collector.storage.table.register.Application;
/**
* @author peng-yongsheng
*/
public interface IApplicationStreamDAO extends DAO {
public interface IApplicationRegisterDAO extends DAO {
int getMaxApplicationId();
int getMinApplicationId();
......
......@@ -24,7 +24,7 @@ import org.skywalking.apm.collector.storage.table.register.Instance;
/**
* @author peng-yongsheng
*/
public interface IInstanceStreamDAO extends DAO {
public interface IInstanceRegisterDAO extends DAO {
int getMaxInstanceId();
int getMinInstanceId();
......
......@@ -24,7 +24,7 @@ import org.skywalking.apm.collector.storage.table.register.ServiceName;
/**
* @author peng-yongsheng
*/
public interface IServiceNameStreamDAO extends DAO {
public interface IServiceNameRegisterDAO extends DAO {
int getMaxServiceId();
int getMinServiceId();
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.service;
import org.skywalking.apm.collector.core.module.Service;
import org.skywalking.apm.collector.storage.base.dao.DAO;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
/**
* @author peng-yongsheng
*/
public interface DAOService extends Service {
DAO get(Class<? extends DAO> daoInterfaceClass);
IPersistenceDAO getPersistenceDAO(Class<? extends IPersistenceDAO> daoInterfaceClass);
}
......@@ -18,24 +18,81 @@
package org.skywalking.apm.collector.storage.es;
import java.util.List;
import java.util.Properties;
import org.skywalking.apm.collector.client.ClientException;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.define.DefineException;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.storage.StorageException;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.DAOContainer;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.dao.IApplicationCacheDAO;
import org.skywalking.apm.collector.storage.dao.IApplicationRegisterDAO;
import org.skywalking.apm.collector.storage.dao.ICpuMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ICpuMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGCMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTraceUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstPerformancePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstPerformanceUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceUIDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeComponentPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeComponentUIDAO;
import org.skywalking.apm.collector.storage.dao.INodeMappingPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeMappingUIDAO;
import org.skywalking.apm.collector.storage.dao.INodeReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeReferenceUIDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentCostPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentCostUIDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentUIDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryUIDAO;
import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO;
import org.skywalking.apm.collector.storage.dao.IServiceReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAODefineLoader;
import org.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller;
import org.skywalking.apm.collector.storage.es.service.ElasticSearchDAOService;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.es.dao.ApplicationEsCacheDAO;
import org.skywalking.apm.collector.storage.es.dao.ApplicationEsRegisterDAO;
import org.skywalking.apm.collector.storage.es.dao.CpuMetricEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.CpuMetricEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.GCMetricEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.GCMetricEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.GlobalTraceEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.GlobalTraceEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.InstPerformanceEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.InstPerformanceEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.InstanceEsCacheDAO;
import org.skywalking.apm.collector.storage.es.dao.InstanceEsRegisterDAO;
import org.skywalking.apm.collector.storage.es.dao.InstanceEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.InstanceHeartBeatEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.MemoryMetricEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.MemoryMetricEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.MemoryPoolMetricEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.NodeComponentEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.NodeComponentEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.NodeMappingEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.NodeMappingEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.NodeReferenceEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.NodeReferenceEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.SegmentCostEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.SegmentCostEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.SegmentEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.SegmentEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.ServiceEntryEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.ServiceEntryEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.ServiceNameEsCacheDAO;
import org.skywalking.apm.collector.storage.es.dao.ServiceNameEsRegisterDAO;
import org.skywalking.apm.collector.storage.es.dao.ServiceReferenceEsPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -52,13 +109,8 @@ public class StorageModuleEsProvider extends ModuleProvider {
private static final String INDEX_SHARDS_NUMBER = "index_shards_number";
private static final String INDEX_REPLICAS_NUMBER = "index_replicas_number";
private final DAOContainer daoContainer;
private ElasticSearchClient elasticSearchClient;
public StorageModuleEsProvider() {
this.daoContainer = new DAOContainer();
}
@Override public String name() {
return "elasticsearch";
}
......@@ -73,11 +125,11 @@ public class StorageModuleEsProvider extends ModuleProvider {
String clusterNodes = config.getProperty(CLUSTER_NODES);
elasticSearchClient = new ElasticSearchClient(clusterName, clusterTransportSniffer, clusterNodes);
this.registerServiceImplementation(DAOService.class, new ElasticSearchDAOService(daoContainer));
BatchEsDAO batchEsDAO = new BatchEsDAO();
batchEsDAO.setClient(elasticSearchClient);
this.registerServiceImplementation(IBatchDAO.class, batchEsDAO);
this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient));
registerCacheDAO();
registerRegisterDAO();
registerPersistenceDAO();
registerUiDAO();
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
......@@ -86,17 +138,9 @@ public class StorageModuleEsProvider extends ModuleProvider {
try {
elasticSearchClient.initialize();
EsDAODefineLoader loader = new EsDAODefineLoader();
List<EsDAO> esDAOs = loader.load();
esDAOs.forEach(esDAO -> {
esDAO.setClient(elasticSearchClient);
String interFaceName = esDAO.getClass().getInterfaces()[0].getName();
daoContainer.put(interFaceName, esDAO);
});
ElasticSearchStorageInstaller installer = new ElasticSearchStorageInstaller(indexShardsNumber, indexReplicasNumber);
installer.install(elasticSearchClient);
} catch (ClientException | DefineException | StorageException e) {
} catch (ClientException | StorageException e) {
logger.error(e.getMessage(), e);
}
}
......@@ -108,4 +152,54 @@ public class StorageModuleEsProvider extends ModuleProvider {
@Override public String[] requiredModules() {
return new String[0];
}
private void registerCacheDAO() throws ServiceNotProvidedException {
this.registerServiceImplementation(IApplicationCacheDAO.class, new ApplicationEsCacheDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceCacheDAO.class, new InstanceEsCacheDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceNameCacheDAO.class, new ServiceNameEsCacheDAO(elasticSearchClient));
}
private void registerRegisterDAO() throws ServiceNotProvidedException {
this.registerServiceImplementation(IApplicationRegisterDAO.class, new ApplicationEsRegisterDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceRegisterDAO.class, new InstanceEsRegisterDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceNameRegisterDAO.class, new ServiceNameEsRegisterDAO(elasticSearchClient));
}
private void registerPersistenceDAO() throws ServiceNotProvidedException {
this.registerServiceImplementation(ICpuMetricPersistenceDAO.class, new CpuMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IGCMetricPersistenceDAO.class, new GCMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IMemoryMetricPersistenceDAO.class, new MemoryMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IMemoryPoolMetricPersistenceDAO.class, new MemoryPoolMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IGlobalTracePersistenceDAO.class, new GlobalTraceEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstPerformancePersistenceDAO.class, new InstPerformanceEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(INodeComponentPersistenceDAO.class, new NodeComponentEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(INodeMappingPersistenceDAO.class, new NodeMappingEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(INodeReferencePersistenceDAO.class, new NodeReferenceEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(ISegmentCostPersistenceDAO.class, new SegmentCostEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(ISegmentPersistenceDAO.class, new SegmentEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceEntryPersistenceDAO.class, new ServiceEntryEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceReferencePersistenceDAO.class, new ServiceReferenceEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatEsPersistenceDAO(elasticSearchClient));
}
private void registerUiDAO() throws ServiceNotProvidedException {
this.registerServiceImplementation(IInstanceUIDAO.class, new InstanceEsUIDAO(elasticSearchClient));
this.registerServiceImplementation(ICpuMetricUIDAO.class, new CpuMetricEsUIDAO(elasticSearchClient));
this.registerServiceImplementation(IGCMetricUIDAO.class, new GCMetricEsUIDAO(elasticSearchClient));
this.registerServiceImplementation(IMemoryMetricUIDAO.class, new MemoryMetricEsUIDAO(elasticSearchClient));
// this.registerServiceImplementation(IMemoryPoolMetricUIDAO.class, new MemoryPoolMetricEsUIDAO(elasticSearchClient));
this.registerServiceImplementation(IGlobalTraceUIDAO.class, new GlobalTraceEsUIDAO(elasticSearchClient));
this.registerServiceImplementation(IInstPerformanceUIDAO.class, new InstPerformanceEsUIDAO(elasticSearchClient));
this.registerServiceImplementation(INodeComponentUIDAO.class, new NodeComponentEsUIDAO(elasticSearchClient));
this.registerServiceImplementation(INodeMappingUIDAO.class, new NodeMappingEsUIDAO(elasticSearchClient));
this.registerServiceImplementation(INodeReferenceUIDAO.class, new NodeReferenceEsUIDAO(elasticSearchClient));
this.registerServiceImplementation(ISegmentCostUIDAO.class, new SegmentCostEsUIDAO(elasticSearchClient));
this.registerServiceImplementation(ISegmentUIDAO.class, new SegmentEsUIDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceEntryUIDAO.class, new ServiceEntryEsUIDAO(elasticSearchClient));
// this.registerServiceImplementation(IServiceReferenceUIDAO.class, new ServiceReferenceEsUIDAO(elasticSearchClient));
}
}
......@@ -23,6 +23,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.slf4j.Logger;
......@@ -35,6 +36,10 @@ public class BatchEsDAO extends EsDAO implements IBatchDAO {
private final Logger logger = LoggerFactory.getLogger(BatchEsDAO.class);
public BatchEsDAO(ElasticSearchClient client) {
super(client);
}
@Override public void batchPersistence(List<?> batchCollection) {
BulkRequestBuilder bulkRequest = getClient().prepareBulk();
......
......@@ -33,6 +33,10 @@ import org.skywalking.apm.collector.storage.base.dao.AbstractDAO;
*/
public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
public EsDAO(ElasticSearchClient client) {
super(client);
}
public final int getMaxId(String indexName, String columnName) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName);
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es.base.dao;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.define.DefineException;
import org.skywalking.apm.collector.core.define.DefinitionLoader;
import org.skywalking.apm.collector.core.define.Loader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class EsDAODefineLoader implements Loader<List<EsDAO>> {
private final Logger logger = LoggerFactory.getLogger(EsDAODefineLoader.class);
@Override public List<EsDAO> load() throws DefineException {
List<EsDAO> esDAOs = new ArrayList<>();
EsDAODefinitionFile definitionFile = new EsDAODefinitionFile();
logger.info("elasticsearch dao definition file name: {}", definitionFile.fileName());
DefinitionLoader<EsDAO> definitionLoader = DefinitionLoader.load(EsDAO.class, definitionFile);
for (EsDAO dao : definitionLoader) {
logger.info("loaded elasticsearch dao definition class: {}", dao.getClass().getName());
esDAOs.add(dao);
}
return esDAOs;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es.base.dao;
import org.skywalking.apm.collector.core.define.DefinitionFile;
/**
* @author peng-yongsheng
*/
public class EsDAODefinitionFile extends DefinitionFile {
@Override protected String fileName() {
return "es_dao.define";
}
}
......@@ -40,6 +40,10 @@ public class ApplicationEsCacheDAO extends EsDAO implements IApplicationCacheDAO
private final Logger logger = LoggerFactory.getLogger(ApplicationEsCacheDAO.class);
public ApplicationEsCacheDAO(ElasticSearchClient client) {
super(client);
}
@Override public int getApplicationId(String applicationCode) {
ElasticSearchClient client = getClient();
......
......@@ -23,7 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.IApplicationStreamDAO;
import org.skywalking.apm.collector.storage.dao.IApplicationRegisterDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.storage.table.register.ApplicationTable;
......@@ -33,9 +33,13 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ApplicationEsStreamDAO extends EsDAO implements IApplicationStreamDAO {
public class ApplicationEsRegisterDAO extends EsDAO implements IApplicationRegisterDAO {
private final Logger logger = LoggerFactory.getLogger(ApplicationEsStreamDAO.class);
private final Logger logger = LoggerFactory.getLogger(ApplicationEsRegisterDAO.class);
public ApplicationEsRegisterDAO(ElasticSearchClient client) {
super(client);
}
@Override public int getMaxApplicationId() {
return getMaxId(ApplicationTable.TABLE, ApplicationTable.COLUMN_APPLICATION_ID);
......
......@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.ICpuMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
......@@ -36,6 +37,10 @@ public class CpuMetricEsPersistenceDAO extends EsDAO implements ICpuMetricPersis
private final Logger logger = LoggerFactory.getLogger(CpuMetricEsPersistenceDAO.class);
public CpuMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public CpuMetric get(String id) {
return null;
}
......
......@@ -23,6 +23,7 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.ICpuMetricUIDAO;
......@@ -34,6 +35,10 @@ import org.skywalking.apm.collector.storage.table.jvm.CpuMetricTable;
*/
public class CpuMetricEsUIDAO extends EsDAO implements ICpuMetricUIDAO {
public CpuMetricEsUIDAO(ElasticSearchClient client) {
super(client);
}
@Override public int getMetric(int instanceId, long timeBucket) {
String id = timeBucket + Const.ID_SPLIT + instanceId;
GetResponse getResponse = getClient().prepareGet(CpuMetricTable.TABLE, id).get();
......
......@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
......@@ -32,6 +33,10 @@ import org.skywalking.apm.collector.storage.table.jvm.GCMetricTable;
*/
public class GCMetricEsPersistenceDAO extends EsDAO implements IGCMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, GCMetric> {
public GCMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public GCMetric get(String id) {
return null;
}
......
......@@ -32,6 +32,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IGCMetricUIDAO;
......@@ -48,6 +49,10 @@ public class GCMetricEsUIDAO extends EsDAO implements IGCMetricUIDAO {
private final Logger logger = LoggerFactory.getLogger(GCMetricEsUIDAO.class);
public GCMetricEsUIDAO(ElasticSearchClient client) {
super(client);
}
@Override public GCCount getGCCount(long[] timeBuckets, int instanceId) {
logger.debug("get gc count, timeBuckets: {}, instanceId: {}", timeBuckets, instanceId);
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(GCMetricTable.TABLE);
......
......@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
......@@ -37,6 +38,10 @@ public class GlobalTraceEsPersistenceDAO extends EsDAO implements IGlobalTracePe
private final Logger logger = LoggerFactory.getLogger(GlobalTraceEsPersistenceDAO.class);
public GlobalTraceEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public GlobalTrace get(String id) {
throw new UnexpectedException("There is no need to merge stream data with database data.");
}
......
......@@ -25,6 +25,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.IGlobalTraceUIDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.global.GlobalTraceTable;
......@@ -38,6 +39,10 @@ public class GlobalTraceEsUIDAO extends EsDAO implements IGlobalTraceUIDAO {
private final Logger logger = LoggerFactory.getLogger(GlobalTraceEsUIDAO.class);
public GlobalTraceEsUIDAO(ElasticSearchClient client) {
super(client);
}
@Override public List<String> getGlobalTraceId(String segmentId) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(GlobalTraceTable.TABLE);
searchRequestBuilder.setTypes(GlobalTraceTable.TABLE_TYPE);
......
......@@ -23,6 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.IInstPerformancePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.instance.InstPerformance;
......@@ -37,6 +38,10 @@ public class InstPerformanceEsPersistenceDAO extends EsDAO implements IInstPerfo
private final Logger logger = LoggerFactory.getLogger(InstPerformanceEsPersistenceDAO.class);
public InstPerformanceEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public InstPerformance get(String id) {
GetResponse getResponse = getClient().prepareGet(InstPerformanceTable.TABLE, id).get();
if (getResponse.isExists()) {
......
......@@ -31,6 +31,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.sort.SortOrder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IInstPerformanceUIDAO;
......@@ -42,6 +43,10 @@ import org.skywalking.apm.collector.storage.table.instance.InstPerformanceTable;
*/
public class InstPerformanceEsUIDAO extends EsDAO implements IInstPerformanceUIDAO {
public InstPerformanceEsUIDAO(ElasticSearchClient client) {
super(client);
}
@Override public InstPerformance get(long[] timeBuckets, int instanceId) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(InstPerformanceTable.TABLE);
searchRequestBuilder.setTypes(InstPerformanceTable.TABLE_TYPE);
......
......@@ -39,6 +39,10 @@ public class InstanceEsCacheDAO extends EsDAO implements IInstanceCacheDAO {
private final Logger logger = LoggerFactory.getLogger(InstanceEsCacheDAO.class);
public InstanceEsCacheDAO(ElasticSearchClient client) {
super(client);
}
@Override public int getApplicationId(int instanceId) {
GetResponse response = getClient().prepareGet(InstanceTable.TABLE, String.valueOf(instanceId)).get();
if (response.isExists()) {
......
......@@ -24,7 +24,7 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.IInstanceStreamDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.storage.table.register.InstanceTable;
......@@ -34,9 +34,13 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceEsStreamDAO extends EsDAO implements IInstanceStreamDAO {
public class InstanceEsRegisterDAO extends EsDAO implements IInstanceRegisterDAO {
private final Logger logger = LoggerFactory.getLogger(InstanceEsStreamDAO.class);
private final Logger logger = LoggerFactory.getLogger(InstanceEsRegisterDAO.class);
public InstanceEsRegisterDAO(ElasticSearchClient client) {
super(client);
}
@Override public int getMaxInstanceId() {
return getMaxId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
......
......@@ -37,6 +37,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCount;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortMode;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IInstanceUIDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
......@@ -52,6 +53,10 @@ public class InstanceEsUIDAO extends EsDAO implements IInstanceUIDAO {
private final Logger logger = LoggerFactory.getLogger(InstanceEsUIDAO.class);
public InstanceEsUIDAO(ElasticSearchClient client) {
super(client);
}
@Override public Long lastHeartBeatTime() {
long fiveMinuteBefore = System.currentTimeMillis() - 5 * 60 * 1000;
fiveMinuteBefore = TimeBucketUtils.INSTANCE.getSecondTimeBucket(fiveMinuteBefore);
......
......@@ -23,6 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
......@@ -38,6 +39,10 @@ public class InstanceHeartBeatEsPersistenceDAO extends EsDAO implements IInstanc
private final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatEsPersistenceDAO.class);
public InstanceHeartBeatEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public Instance get(String id) {
GetResponse getResponse = getClient().prepareGet(InstanceTable.TABLE, id).get();
if (getResponse.isExists()) {
......
......@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
......@@ -32,6 +33,10 @@ import org.skywalking.apm.collector.storage.table.jvm.MemoryMetricTable;
*/
public class MemoryMetricEsPersistenceDAO extends EsDAO implements IMemoryMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, MemoryMetric> {
public MemoryMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public MemoryMetric get(String id) {
return null;
}
......
......@@ -24,6 +24,7 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricUIDAO;
......@@ -35,6 +36,10 @@ import org.skywalking.apm.collector.storage.table.jvm.MemoryMetricTable;
*/
public class MemoryMetricEsUIDAO extends EsDAO implements IMemoryMetricUIDAO {
public MemoryMetricEsUIDAO(ElasticSearchClient client) {
super(client);
}
@Override public JsonObject getMetric(int instanceId, long timeBucket, boolean isHeap) {
String id = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + isHeap;
GetResponse getResponse = getClient().prepareGet(MemoryMetricTable.TABLE, id).get();
......
......@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
......@@ -32,6 +33,10 @@ import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
*/
public class MemoryPoolMetricEsPersistenceDAO extends EsDAO implements IMemoryPoolMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, MemoryPoolMetric> {
public MemoryPoolMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public MemoryPoolMetric get(String id) {
return null;
}
......
......@@ -23,6 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.INodeComponentPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
......@@ -33,6 +34,10 @@ import org.skywalking.apm.collector.storage.table.node.NodeComponentTable;
*/
public class NodeComponentEsPersistenceDAO extends EsDAO implements INodeComponentPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeComponent> {
public NodeComponentEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public NodeComponent get(String id) {
GetResponse getResponse = getClient().prepareGet(NodeComponentTable.TABLE, id).get();
if (getResponse.isExists()) {
......
......@@ -26,6 +26,7 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.dao.INodeComponentUIDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
......@@ -41,6 +42,10 @@ public class NodeComponentEsUIDAO extends EsDAO implements INodeComponentUIDAO {
private final Logger logger = LoggerFactory.getLogger(NodeComponentEsPersistenceDAO.class);
public NodeComponentEsUIDAO(ElasticSearchClient client) {
super(client);
}
@Override public JsonArray load(long startTime, long endTime) {
logger.debug("node component load, start time: {}, end time: {}", startTime, endTime);
JsonArray nodeComponentArray = new JsonArray();
......
......@@ -23,6 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.INodeMappingPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.node.NodeMapping;
......@@ -33,6 +34,10 @@ import org.skywalking.apm.collector.storage.table.node.NodeMappingTable;
*/
public class NodeMappingEsPersistenceDAO extends EsDAO implements INodeMappingPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeMapping> {
public NodeMappingEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public NodeMapping get(String id) {
GetResponse getResponse = getClient().prepareGet(NodeMappingTable.TABLE, id).get();
if (getResponse.isExists()) {
......
......@@ -26,6 +26,7 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.dao.INodeMappingUIDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
......@@ -40,6 +41,10 @@ public class NodeMappingEsUIDAO extends EsDAO implements INodeMappingUIDAO {
private final Logger logger = LoggerFactory.getLogger(NodeMappingEsUIDAO.class);
public NodeMappingEsUIDAO(ElasticSearchClient client) {
super(client);
}
@Override public JsonArray load(long startTime, long endTime) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(NodeMappingTable.TABLE);
searchRequestBuilder.setTypes(NodeMappingTable.TABLE_TYPE);
......
......@@ -23,6 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.INodeReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
......@@ -33,6 +34,10 @@ import org.skywalking.apm.collector.storage.table.noderef.NodeReferenceTable;
*/
public class NodeReferenceEsPersistenceDAO extends EsDAO implements INodeReferencePersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeReference> {
public NodeReferenceEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public NodeReference get(String id) {
GetResponse getResponse = getClient().prepareGet(NodeReferenceTable.TABLE, id).get();
if (getResponse.isExists()) {
......
......@@ -28,6 +28,7 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.dao.INodeReferenceUIDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
......@@ -42,6 +43,10 @@ public class NodeReferenceEsUIDAO extends EsDAO implements INodeReferenceUIDAO {
private final Logger logger = LoggerFactory.getLogger(NodeReferenceEsUIDAO.class);
public NodeReferenceEsUIDAO(ElasticSearchClient client) {
super(client);
}
@Override public JsonArray load(long startTime, long endTime) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(NodeReferenceTable.TABLE);
searchRequestBuilder.setTypes(NodeReferenceTable.TABLE_TYPE);
......
......@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.ISegmentCostPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.segment.SegmentCost;
......@@ -36,6 +37,10 @@ public class SegmentCostEsPersistenceDAO extends EsDAO implements ISegmentCostPe
private final Logger logger = LoggerFactory.getLogger(SegmentCostEsPersistenceDAO.class);
public SegmentCostEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public SegmentCost get(String id) {
return null;
}
......
......@@ -30,6 +30,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.dao.ISegmentCostUIDAO;
......@@ -41,6 +42,10 @@ import org.skywalking.apm.collector.storage.table.segment.SegmentCostTable;
*/
public class SegmentCostEsUIDAO extends EsDAO implements ISegmentCostUIDAO {
public SegmentCostEsUIDAO(ElasticSearchClient client) {
super(client);
}
@Override public JsonObject loadTop(long startTime, long endTime, long minCost, long maxCost, String operationName,
Error error, int applicationId, List<String> segmentIds, int limit, int from, Sort sort) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(SegmentCostTable.TABLE);
......
......@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.segment.Segment;
......@@ -37,6 +38,10 @@ public class SegmentEsPersistenceDAO extends EsDAO implements ISegmentPersistenc
private final Logger logger = LoggerFactory.getLogger(SegmentEsPersistenceDAO.class);
public SegmentEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public Segment get(String id) {
return null;
}
......
......@@ -22,6 +22,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Base64;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.dao.ISegmentUIDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
......@@ -37,6 +38,10 @@ public class SegmentEsUIDAO extends EsDAO implements ISegmentUIDAO {
private final Logger logger = LoggerFactory.getLogger(SegmentEsUIDAO.class);
public SegmentEsUIDAO(ElasticSearchClient client) {
super(client);
}
@Override public TraceSegmentObject load(String segmentId) {
GetResponse response = getClient().prepareGet(SegmentTable.TABLE, segmentId).get();
Map<String, Object> source = response.getSource();
......
......@@ -23,6 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.IServiceEntryPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.service.ServiceEntry;
......@@ -33,6 +34,10 @@ import org.skywalking.apm.collector.storage.table.service.ServiceEntryTable;
*/
public class ServiceEntryEsPersistenceDAO extends EsDAO implements IServiceEntryPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceEntry> {
public ServiceEntryEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public ServiceEntry get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceEntryTable.TABLE, id).get();
if (getResponse.isExists()) {
......
......@@ -29,6 +29,7 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.ColumnNameUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.dao.IServiceEntryUIDAO;
......@@ -40,6 +41,10 @@ import org.skywalking.apm.collector.storage.table.service.ServiceEntryTable;
*/
public class ServiceEntryEsUIDAO extends EsDAO implements IServiceEntryUIDAO {
public ServiceEntryEsUIDAO(ElasticSearchClient client) {
super(client);
}
@Override
public JsonObject load(int applicationId, String entryServiceName, long startTime, long endTime, int from,
int size) {
......
......@@ -26,6 +26,7 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
......@@ -36,6 +37,10 @@ import org.skywalking.apm.collector.storage.table.register.ServiceNameTable;
*/
public class ServiceNameEsCacheDAO extends EsDAO implements IServiceNameCacheDAO {
public ServiceNameEsCacheDAO(ElasticSearchClient client) {
super(client);
}
@Override public String getServiceName(int serviceId) {
GetRequestBuilder getRequestBuilder = getClient().prepareGet(ServiceNameTable.TABLE, String.valueOf(serviceId));
......
......@@ -23,7 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.IServiceNameStreamDAO;
import org.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
import org.skywalking.apm.collector.storage.table.register.ServiceNameTable;
......@@ -33,9 +33,13 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ServiceNameEsStreamDAO extends EsDAO implements IServiceNameStreamDAO {
public class ServiceNameEsRegisterDAO extends EsDAO implements IServiceNameRegisterDAO {
private final Logger logger = LoggerFactory.getLogger(ServiceNameEsStreamDAO.class);
private final Logger logger = LoggerFactory.getLogger(ServiceNameEsRegisterDAO.class);
public ServiceNameEsRegisterDAO(ElasticSearchClient client) {
super(client);
}
@Override public int getMaxServiceId() {
return getMaxId(ServiceNameTable.TABLE, ServiceNameTable.COLUMN_SERVICE_ID);
......
......@@ -23,6 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.IServiceReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
......@@ -37,6 +38,10 @@ public class ServiceReferenceEsPersistenceDAO extends EsDAO implements IServiceR
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceEsPersistenceDAO.class);
public ServiceReferenceEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public ServiceReference get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceReferenceTable.TABLE, id).get();
if (getResponse.isExists()) {
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es.service;
import org.skywalking.apm.collector.storage.base.dao.DAO;
import org.skywalking.apm.collector.storage.base.dao.DAOContainer;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
/**
* @author peng-yongsheng
*/
public class ElasticSearchDAOService implements DAOService {
private final DAOContainer daoContainer;
public ElasticSearchDAOService(DAOContainer daoContainer) {
this.daoContainer = daoContainer;
}
@Override public DAO get(Class<? extends DAO> daoInterfaceClass) {
return daoContainer.get(daoInterfaceClass.getName());
}
@Override public IPersistenceDAO getPersistenceDAO(Class<? extends IPersistenceDAO> daoInterfaceClass) {
return daoContainer.getPersistenceDAO(daoInterfaceClass.getName());
}
}
org.skywalking.apm.collector.storage.es.dao.ApplicationEsCacheDAO
org.skywalking.apm.collector.storage.es.dao.InstanceEsCacheDAO
org.skywalking.apm.collector.storage.es.dao.ServiceNameEsCacheDAO
org.skywalking.apm.collector.storage.es.dao.ApplicationEsStreamDAO
org.skywalking.apm.collector.storage.es.dao.InstanceEsStreamDAO
org.skywalking.apm.collector.storage.es.dao.ServiceNameEsStreamDAO
org.skywalking.apm.collector.storage.es.dao.CpuMetricEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.GCMetricEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.MemoryMetricEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.MemoryPoolMetricEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.GlobalTraceEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.InstanceHeartBeatEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.InstPerformanceEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.NodeComponentEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.NodeMappingEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.NodeReferenceEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.SegmentCostEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.SegmentEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.ServiceEntryEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.ServiceReferenceEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.InstanceEsUIDAO
org.skywalking.apm.collector.storage.es.dao.InstPerformanceEsUIDAO
org.skywalking.apm.collector.storage.es.dao.CpuMetricEsUIDAO
org.skywalking.apm.collector.storage.es.dao.GCMetricEsUIDAO
org.skywalking.apm.collector.storage.es.dao.MemoryMetricEsUIDAO
org.skywalking.apm.collector.storage.es.dao.GlobalTraceEsUIDAO
org.skywalking.apm.collector.storage.es.dao.NodeComponentEsUIDAO
org.skywalking.apm.collector.storage.es.dao.NodeMappingEsUIDAO
org.skywalking.apm.collector.storage.es.dao.NodeReferenceEsUIDAO
org.skywalking.apm.collector.storage.es.dao.SegmentCostEsUIDAO
org.skywalking.apm.collector.storage.es.dao.SegmentEsUIDAO
org.skywalking.apm.collector.storage.es.dao.ServiceEntryEsUIDAO
\ No newline at end of file
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
* Copyright 2017, OpenSkywalking Organization All rights rH2erved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licensH2/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* UnlH2s required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* WITHOUT WARRANTIH2 OR CONDITIONS OF ANY KIND, either exprH2s or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
......@@ -18,24 +18,81 @@
package org.skywalking.apm.collector.storage.h2;
import java.util.List;
import java.util.Properties;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.define.DefineException;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.storage.StorageException;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.DAOContainer;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.dao.IApplicationCacheDAO;
import org.skywalking.apm.collector.storage.dao.IApplicationRegisterDAO;
import org.skywalking.apm.collector.storage.dao.ICpuMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ICpuMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGCMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTraceUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstPerformancePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstPerformanceUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceUIDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeComponentPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeComponentUIDAO;
import org.skywalking.apm.collector.storage.dao.INodeMappingPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeMappingUIDAO;
import org.skywalking.apm.collector.storage.dao.INodeReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeReferenceUIDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentCostPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentCostUIDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentUIDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryUIDAO;
import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO;
import org.skywalking.apm.collector.storage.dao.IServiceReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.BatchH2DAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAODefineLoader;
import org.skywalking.apm.collector.storage.h2.base.define.H2StorageInstaller;
import org.skywalking.apm.collector.storage.h2.service.H2DAOService;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.h2.dao.ApplicationH2CacheDAO;
import org.skywalking.apm.collector.storage.h2.dao.ApplicationH2RegisterDAO;
import org.skywalking.apm.collector.storage.h2.dao.CpuMetricH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.CpuMetricH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.GCMetricH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.GCMetricH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.GlobalTraceH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.GlobalTraceH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.InstPerformanceH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.InstPerformanceH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.InstanceH2CacheDAO;
import org.skywalking.apm.collector.storage.h2.dao.InstanceH2RegisterDAO;
import org.skywalking.apm.collector.storage.h2.dao.InstanceH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.InstanceHeartBeatH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.MemoryMetricH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.MemoryMetricH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.MemoryPoolMetricH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.NodeComponentH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.NodeComponentH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.NodeMappingH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.NodeMappingH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.NodeReferenceH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.NodeReferenceH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.SegmentCostH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.SegmentCostH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.SegmentH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.SegmentH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.ServiceEntryH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.ServiceEntryH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.ServiceNameH2CacheDAO;
import org.skywalking.apm.collector.storage.h2.dao.ServiceNameH2RegisterDAO;
import org.skywalking.apm.collector.storage.h2.dao.ServiceReferenceH2PersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -50,12 +107,7 @@ public class StorageModuleH2Provider extends ModuleProvider {
private static final String USER_NAME = "user_name";
private static final String PASSWORD = "password";
private H2Client client;
private final DAOContainer daoContainer;
public StorageModuleH2Provider() {
this.daoContainer = new DAOContainer();
}
private H2Client h2Client;
@Override public String name() {
return "h2";
......@@ -69,27 +121,22 @@ public class StorageModuleH2Provider extends ModuleProvider {
String url = config.getProperty(URL);
String userName = config.getProperty(USER_NAME);
String password = config.getProperty(PASSWORD);
client = new H2Client(url, userName, password);
h2Client = new H2Client(url, userName, password);
this.registerServiceImplementation(DAOService.class, new H2DAOService(daoContainer));
this.registerServiceImplementation(IBatchDAO.class, new BatchH2DAO());
this.registerServiceImplementation(IBatchDAO.class, new BatchH2DAO(h2Client));
registerCacheDAO();
registerRegisterDAO();
registerPersistenceDAO();
registerUiDAO();
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
try {
client.initialize();
H2DAODefineLoader loader = new H2DAODefineLoader();
List<H2DAO> h2DAOs = loader.load();
h2DAOs.forEach(h2DAO -> {
h2DAO.setClient(client);
String interFaceName = h2DAO.getClass().getInterfaces()[0].getName();
daoContainer.put(interFaceName, h2DAO);
});
h2Client.initialize();
H2StorageInstaller installer = new H2StorageInstaller();
installer.install(client);
} catch (H2ClientException | DefineException | StorageException e) {
installer.install(h2Client);
} catch (H2ClientException | StorageException e) {
logger.error(e.getMessage(), e);
}
}
......@@ -101,4 +148,54 @@ public class StorageModuleH2Provider extends ModuleProvider {
@Override public String[] requiredModules() {
return new String[0];
}
private void registerCacheDAO() throws ServiceNotProvidedException {
this.registerServiceImplementation(IApplicationCacheDAO.class, new ApplicationH2CacheDAO(h2Client));
this.registerServiceImplementation(IInstanceCacheDAO.class, new InstanceH2CacheDAO(h2Client));
this.registerServiceImplementation(IServiceNameCacheDAO.class, new ServiceNameH2CacheDAO(h2Client));
}
private void registerRegisterDAO() throws ServiceNotProvidedException {
this.registerServiceImplementation(IApplicationRegisterDAO.class, new ApplicationH2RegisterDAO(h2Client));
this.registerServiceImplementation(IInstanceRegisterDAO.class, new InstanceH2RegisterDAO(h2Client));
this.registerServiceImplementation(IServiceNameRegisterDAO.class, new ServiceNameH2RegisterDAO(h2Client));
}
private void registerPersistenceDAO() throws ServiceNotProvidedException {
this.registerServiceImplementation(ICpuMetricPersistenceDAO.class, new CpuMetricH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IGCMetricPersistenceDAO.class, new GCMetricH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IMemoryMetricPersistenceDAO.class, new MemoryMetricH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IMemoryPoolMetricPersistenceDAO.class, new MemoryPoolMetricH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IGlobalTracePersistenceDAO.class, new GlobalTraceH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IInstPerformancePersistenceDAO.class, new InstPerformanceH2PersistenceDAO(h2Client));
this.registerServiceImplementation(INodeComponentPersistenceDAO.class, new NodeComponentH2PersistenceDAO(h2Client));
this.registerServiceImplementation(INodeMappingPersistenceDAO.class, new NodeMappingH2PersistenceDAO(h2Client));
this.registerServiceImplementation(INodeReferencePersistenceDAO.class, new NodeReferenceH2PersistenceDAO(h2Client));
this.registerServiceImplementation(ISegmentCostPersistenceDAO.class, new SegmentCostH2PersistenceDAO(h2Client));
this.registerServiceImplementation(ISegmentPersistenceDAO.class, new SegmentH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IServiceEntryPersistenceDAO.class, new ServiceEntryH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IServiceReferencePersistenceDAO.class, new ServiceReferenceH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatH2PersistenceDAO(h2Client));
}
private void registerUiDAO() throws ServiceNotProvidedException {
this.registerServiceImplementation(IInstanceUIDAO.class, new InstanceH2UIDAO(h2Client));
this.registerServiceImplementation(ICpuMetricUIDAO.class, new CpuMetricH2UIDAO(h2Client));
this.registerServiceImplementation(IGCMetricUIDAO.class, new GCMetricH2UIDAO(h2Client));
this.registerServiceImplementation(IMemoryMetricUIDAO.class, new MemoryMetricH2UIDAO(h2Client));
// this.registerServiceImplementation(IMemoryPoolMetricUIDAO.class, new MemoryPoolMetricH2UIDAO(h2Client));
this.registerServiceImplementation(IGlobalTraceUIDAO.class, new GlobalTraceH2UIDAO(h2Client));
this.registerServiceImplementation(IInstPerformanceUIDAO.class, new InstPerformanceH2UIDAO(h2Client));
this.registerServiceImplementation(INodeComponentUIDAO.class, new NodeComponentH2UIDAO(h2Client));
this.registerServiceImplementation(INodeMappingUIDAO.class, new NodeMappingH2UIDAO(h2Client));
this.registerServiceImplementation(INodeReferenceUIDAO.class, new NodeReferenceH2UIDAO(h2Client));
this.registerServiceImplementation(ISegmentCostUIDAO.class, new SegmentCostH2UIDAO(h2Client));
this.registerServiceImplementation(ISegmentUIDAO.class, new SegmentH2UIDAO(h2Client));
this.registerServiceImplementation(IServiceEntryUIDAO.class, new ServiceEntryH2UIDAO(h2Client));
// this.registerServiceImplementation(IServiceReferenceUIDAO.class, new ServiceReferenceH2UIDAO(elasticSearchClient));
}
}
......@@ -24,6 +24,7 @@ import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
......@@ -34,8 +35,13 @@ import org.slf4j.LoggerFactory;
* @author peng-yongsheng
*/
public class BatchH2DAO extends H2DAO implements IBatchDAO {
private final Logger logger = LoggerFactory.getLogger(BatchH2DAO.class);
public BatchH2DAO(H2Client client) {
super(client);
}
@Override
public void batchPersistence(List<?> batchCollection) {
if (batchCollection != null && batchCollection.size() > 0) {
......
......@@ -30,8 +30,13 @@ import org.slf4j.LoggerFactory;
* @author peng-yongsheng
*/
public abstract class H2DAO extends AbstractDAO<H2Client> {
private final Logger logger = LoggerFactory.getLogger(H2DAO.class);
public H2DAO(H2Client client) {
super(client);
}
protected final int getMaxId(String tableName, String columnName) {
String sql = "select max(" + columnName + ") from " + tableName;
return getIntValueBySQL(sql);
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.h2.base.dao;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.define.DefineException;
import org.skywalking.apm.collector.core.define.DefinitionLoader;
import org.skywalking.apm.collector.core.define.Loader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class H2DAODefineLoader implements Loader<List<H2DAO>> {
private final Logger logger = LoggerFactory.getLogger(H2DAODefineLoader.class);
@Override public List<H2DAO> load() throws DefineException {
List<H2DAO> h2DAOs = new ArrayList<>();
H2DAODefinitionFile definitionFile = new H2DAODefinitionFile();
logger.info("h2 dao definition file name: {}", definitionFile.fileName());
DefinitionLoader<H2DAO> definitionLoader = DefinitionLoader.load(H2DAO.class, definitionFile);
for (H2DAO dao : definitionLoader) {
logger.info("loaded h2 dao definition class: {}", dao.getClass().getName());
h2DAOs.add(dao);
}
return h2DAOs;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.h2.base.dao;
import org.skywalking.apm.collector.core.define.DefinitionFile;
/**
* @author peng-yongsheng
*/
public class H2DAODefinitionFile extends DefinitionFile {
@Override protected String fileName() {
return "h2_dao.define";
}
}
......@@ -23,9 +23,9 @@ import java.sql.SQLException;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.IApplicationCacheDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.table.register.ApplicationTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -38,6 +38,10 @@ public class ApplicationH2CacheDAO extends H2DAO implements IApplicationCacheDAO
private final Logger logger = LoggerFactory.getLogger(ApplicationH2CacheDAO.class);
private static final String GET_APPLICATION_ID_OR_CODE_SQL = "select {0} from {1} where {2} = ?";
public ApplicationH2CacheDAO(H2Client client) {
super(client);
}
@Override
public int getApplicationId(String applicationCode) {
logger.info("get the application getId with application code = {}", applicationCode);
......
......@@ -23,7 +23,7 @@ import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.IApplicationStreamDAO;
import org.skywalking.apm.collector.storage.dao.IApplicationRegisterDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.storage.table.register.ApplicationTable;
......@@ -33,8 +33,12 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class ApplicationH2StreamDAO extends H2DAO implements IApplicationStreamDAO {
private final Logger logger = LoggerFactory.getLogger(ApplicationH2StreamDAO.class);
public class ApplicationH2RegisterDAO extends H2DAO implements IApplicationRegisterDAO {
private final Logger logger = LoggerFactory.getLogger(ApplicationH2RegisterDAO.class);
public ApplicationH2RegisterDAO(H2Client client) {
super(client);
}
@Override
public int getMaxApplicationId() {
......
......@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.storage.h2.dao;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.ICpuMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
......@@ -36,6 +37,10 @@ public class CpuMetricH2PersistenceDAO extends H2DAO implements ICpuMetricPersis
private final Logger logger = LoggerFactory.getLogger(CpuMetricH2PersistenceDAO.class);
public CpuMetricH2PersistenceDAO(H2Client client) {
super(client);
}
@Override public CpuMetric get(String id) {
return null;
}
......
......@@ -41,6 +41,10 @@ public class CpuMetricH2UIDAO extends H2DAO implements ICpuMetricUIDAO {
private final Logger logger = LoggerFactory.getLogger(CpuMetricH2UIDAO.class);
private static final String GET_CPU_METRIC_SQL = "select * from {0} where {1} = ?";
public CpuMetricH2UIDAO(H2Client client) {
super(client);
}
@Override public int getMetric(int instanceId, long timeBucket) {
String id = timeBucket + Const.ID_SPLIT + instanceId;
H2Client client = getClient();
......
......@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.storage.h2.dao;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
......@@ -32,6 +33,10 @@ import org.skywalking.apm.collector.storage.table.jvm.GCMetricTable;
*/
public class GCMetricH2PersistenceDAO extends H2DAO implements IGCMetricPersistenceDAO<H2SqlEntity, H2SqlEntity, GCMetric> {
public GCMetricH2PersistenceDAO(H2Client client) {
super(client);
}
@Override public GCMetric get(String id) {
return null;
}
......
......@@ -45,6 +45,10 @@ public class GCMetricH2UIDAO extends H2DAO implements IGCMetricUIDAO {
private static final String GET_GC_COUNT_SQL = "select {1}, sum({0}) as cnt, {1} from {2} where {3} = ? and {4} in (";
private static final String GET_GC_METRIC_SQL = "select * from {0} where {1} = ?";
public GCMetricH2UIDAO(H2Client client) {
super(client);
}
@Override public GCCount getGCCount(long[] timeBuckets, int instanceId) {
GCCount gcCount = new GCCount();
H2Client client = getClient();
......
......@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.storage.h2.dao;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
......@@ -34,8 +35,13 @@ import org.slf4j.LoggerFactory;
* @author peng-yongsheng, clevertension
*/
public class GlobalTraceH2PersistenceDAO extends H2DAO implements IGlobalTracePersistenceDAO<H2SqlEntity, H2SqlEntity, GlobalTrace> {
private final Logger logger = LoggerFactory.getLogger(GlobalTraceH2PersistenceDAO.class);
public GlobalTraceH2PersistenceDAO(H2Client client) {
super(client);
}
@Override public GlobalTrace get(String id) {
throw new UnexpectedException("There is no need to merge stream data with database data.");
}
......
......@@ -41,6 +41,10 @@ public class GlobalTraceH2UIDAO extends H2DAO implements IGlobalTraceUIDAO {
private static final String GET_GLOBAL_TRACE_ID_SQL = "select {0} from {1} where {2} = ? limit 10";
private static final String GET_SEGMENT_IDS_SQL = "select {0} from {1} where {2} = ? limit 10";
public GlobalTraceH2UIDAO(H2Client client) {
super(client);
}
@Override public List<String> getGlobalTraceId(String segmentId) {
List<String> globalTraceIds = new ArrayList<>();
H2Client client = getClient();
......
......@@ -43,6 +43,10 @@ public class InstPerformanceH2PersistenceDAO extends H2DAO implements IInstPerfo
private final Logger logger = LoggerFactory.getLogger(InstPerformanceH2PersistenceDAO.class);
private static final String GET_SQL = "select * from {0} where {1} = ?";
public InstPerformanceH2PersistenceDAO(H2Client client) {
super(client);
}
@Override public InstPerformance get(String id) {
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_SQL, InstPerformanceTable.TABLE, InstPerformanceTable.COLUMN_ID);
......
......@@ -43,6 +43,10 @@ public class InstPerformanceH2UIDAO extends H2DAO implements IInstPerformanceUID
private static final String GET_INST_PERF_SQL = "select * from {0} where {1} = ? and {2} in (";
private static final String GET_TPS_METRIC_SQL = "select * from {0} where {1} = ?";
public InstPerformanceH2UIDAO(H2Client client) {
super(client);
}
@Override public InstPerformance get(long[] timeBuckets, int instanceId) {
H2Client client = getClient();
logger.info("the inst performance inst id = {}", instanceId);
......
......@@ -39,6 +39,10 @@ public class InstanceH2CacheDAO extends H2DAO implements IInstanceCacheDAO {
private static final String GET_APPLICATION_ID_SQL = "select {0} from {1} where {2} = ?";
private static final String GET_INSTANCE_ID_SQL = "select {0} from {1} where {2} = ? and {3} = ?";
public InstanceH2CacheDAO(H2Client client) {
super(client);
}
@Override public int getApplicationId(int instanceId) {
logger.info("get the application getId with application getId = {}", instanceId);
H2Client client = getClient();
......
......@@ -23,7 +23,7 @@ import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.IInstanceStreamDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.storage.table.register.InstanceTable;
......@@ -33,8 +33,13 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class InstanceH2StreamDAO extends H2DAO implements IInstanceStreamDAO {
private final Logger logger = LoggerFactory.getLogger(InstanceH2StreamDAO.class);
public class InstanceH2RegisterDAO extends H2DAO implements IInstanceRegisterDAO {
private final Logger logger = LoggerFactory.getLogger(InstanceH2RegisterDAO.class);
public InstanceH2RegisterDAO(H2Client client) {
super(client);
}
private static final String UPDATE_HEARTBEAT_TIME_SQL = "update {0} set {1} = ? where {2} = ?";
......
......@@ -42,6 +42,10 @@ public class InstanceH2UIDAO extends H2DAO implements IInstanceUIDAO {
private final Logger logger = LoggerFactory.getLogger(InstanceH2UIDAO.class);
public InstanceH2UIDAO(H2Client client) {
super(client);
}
private static final String GET_LAST_HEARTBEAT_TIME_SQL = "select {0} from {1} where {2} > ? limit 1";
private static final String GET_INST_LAST_HEARTBEAT_TIME_SQL = "select {0} from {1} where {2} > ? and {3} = ? limit 1";
private static final String GET_INSTANCE_SQL = "select * from {0} where {1} = ?";
......
......@@ -43,6 +43,10 @@ public class InstanceHeartBeatH2PersistenceDAO extends H2DAO implements IInstanc
private final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatH2PersistenceDAO.class);
public InstanceHeartBeatH2PersistenceDAO(H2Client client) {
super(client);
}
private static final String GET_INSTANCE_HEARTBEAT_SQL = "select * from {0} where {1} = ?";
@Override public Instance get(String id) {
......
......@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.storage.h2.dao;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
......@@ -32,6 +33,10 @@ import org.skywalking.apm.collector.storage.table.jvm.MemoryMetricTable;
*/
public class MemoryMetricH2PersistenceDAO extends H2DAO implements IMemoryMetricPersistenceDAO<H2SqlEntity, H2SqlEntity, MemoryMetric> {
public MemoryMetricH2PersistenceDAO(H2Client client) {
super(client);
}
@Override public MemoryMetric get(String id) {
return null;
}
......
......@@ -43,6 +43,10 @@ public class MemoryMetricH2UIDAO extends H2DAO implements IMemoryMetricUIDAO {
private final Logger logger = LoggerFactory.getLogger(MemoryMetricH2UIDAO.class);
private static final String GET_MEMORY_METRIC_SQL = "select * from {0} where {1} =?";
public MemoryMetricH2UIDAO(H2Client client) {
super(client);
}
@Override public JsonObject getMetric(int instanceId, long timeBucket, boolean isHeap) {
H2Client client = getClient();
String id = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + isHeap;
......
......@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.storage.h2.dao;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
......@@ -32,6 +33,10 @@ import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
*/
public class MemoryPoolMetricH2PersistenceDAO extends H2DAO implements IMemoryPoolMetricPersistenceDAO<H2SqlEntity, H2SqlEntity, MemoryPoolMetric> {
public MemoryPoolMetricH2PersistenceDAO(H2Client client) {
super(client);
}
@Override public MemoryPoolMetric get(String id) {
return null;
}
......
......@@ -42,6 +42,10 @@ public class NodeComponentH2PersistenceDAO extends H2DAO implements INodeCompone
private final Logger logger = LoggerFactory.getLogger(NodeComponentH2PersistenceDAO.class);
private static final String GET_SQL = "select * from {0} where {1} = ?";
public NodeComponentH2PersistenceDAO(H2Client client) {
super(client);
}
@Override
public NodeComponent get(String id) {
H2Client client = getClient();
......
......@@ -41,6 +41,10 @@ public class NodeComponentH2UIDAO extends H2DAO implements INodeComponentUIDAO {
private final Logger logger = LoggerFactory.getLogger(NodeComponentH2UIDAO.class);
private static final String AGGREGATE_COMPONENT_SQL = "select {0}, {1}, {2} from {3} where {4} >= ? and {4} <= ? group by {0}, {1}, {2} limit 100";
public NodeComponentH2UIDAO(H2Client client) {
super(client);
}
@Override public JsonArray load(long startTime, long endTime) {
JsonArray nodeComponentArray = new JsonArray();
nodeComponentArray.addAll(aggregationComponent(startTime, endTime));
......
......@@ -43,6 +43,10 @@ public class NodeMappingH2PersistenceDAO extends H2DAO implements INodeMappingPe
private final Logger logger = LoggerFactory.getLogger(NodeMappingH2PersistenceDAO.class);
private static final String GET_SQL = "select * from {0} where {1} = ?";
public NodeMappingH2PersistenceDAO(H2Client client) {
super(client);
}
@Override public NodeMapping get(String id) {
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_SQL, NodeMappingTable.TABLE, NodeMappingTable.COLUMN_ID);
......
......@@ -40,6 +40,10 @@ public class NodeMappingH2UIDAO extends H2DAO implements INodeMappingUIDAO {
private final Logger logger = LoggerFactory.getLogger(NodeMappingH2UIDAO.class);
private static final String NODE_MAPPING_SQL = "select {0}, {1}, {2} from {3} where {4} >= ? and {4} <= ? group by {0}, {1}, {2} limit 100";
public NodeMappingH2UIDAO(H2Client client) {
super(client);
}
@Override public JsonArray load(long startTime, long endTime) {
H2Client client = getClient();
JsonArray nodeMappingArray = new JsonArray();
......
......@@ -43,6 +43,10 @@ public class NodeReferenceH2PersistenceDAO extends H2DAO implements INodeReferen
private final Logger logger = LoggerFactory.getLogger(NodeReferenceH2PersistenceDAO.class);
private static final String GET_SQL = "select * from {0} where {1} = ?";
public NodeReferenceH2PersistenceDAO(H2Client client) {
super(client);
}
@Override public NodeReference get(String id) {
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_SQL, NodeReferenceTable.TABLE, NodeReferenceTable.COLUMN_ID);
......
......@@ -41,6 +41,10 @@ public class NodeReferenceH2UIDAO extends H2DAO implements INodeReferenceUIDAO {
private static final String NODE_REFERENCE_SQL = "select {8}, {9}, {10}, sum({0}) as {0}, sum({1}) as {1}, sum({2}) as {2}, " +
"sum({3}) as {3}, sum({4}) as {4}, sum({5}) as {5} from {6} where {7} >= ? and {7} <= ? group by {8}, {9}, {10} limit 100";
public NodeReferenceH2UIDAO(H2Client client) {
super(client);
}
@Override public JsonArray load(long startTime, long endTime) {
H2Client client = getClient();
JsonArray nodeRefResSumArray = new JsonArray();
......
......@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.storage.h2.dao;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.ISegmentCostPersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
......@@ -36,6 +37,10 @@ public class SegmentCostH2PersistenceDAO extends H2DAO implements ISegmentCostPe
private final Logger logger = LoggerFactory.getLogger(SegmentCostH2PersistenceDAO.class);
public SegmentCostH2PersistenceDAO(H2Client client) {
super(client);
}
@Override public SegmentCost get(String id) {
return null;
}
......
......@@ -44,6 +44,10 @@ public class SegmentCostH2UIDAO extends H2DAO implements ISegmentCostUIDAO {
private final Logger logger = LoggerFactory.getLogger(SegmentCostH2UIDAO.class);
private static final String GET_SEGMENT_COST_SQL = "select * from {0} where {1} >= ? and {1} <= ?";
public SegmentCostH2UIDAO(H2Client client) {
super(client);
}
@Override public JsonObject loadTop(long startTime, long endTime, long minCost, long maxCost, String operationName,
Error error, int applicationId, List<String> segmentIds, int limit, int from, Sort sort) {
H2Client client = getClient();
......
......@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.storage.h2.dao;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
......@@ -36,6 +37,10 @@ public class SegmentH2PersistenceDAO extends H2DAO implements ISegmentPersistenc
private final Logger logger = LoggerFactory.getLogger(SegmentH2PersistenceDAO.class);
public SegmentH2PersistenceDAO(H2Client client) {
super(client);
}
@Override public Segment get(String id) {
return null;
}
......
......@@ -38,6 +38,10 @@ public class SegmentH2UIDAO extends H2DAO implements ISegmentUIDAO {
private final Logger logger = LoggerFactory.getLogger(SegmentH2UIDAO.class);
private static final String GET_SEGMENT_SQL = "select {0} from {1} where {2} = ?";
public SegmentH2UIDAO(H2Client client) {
super(client);
}
@Override public TraceSegmentObject load(String segmentId) {
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_SEGMENT_SQL, SegmentTable.COLUMN_DATA_BINARY,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册