提交 6435882e 编写于 作者: P peng-yongsheng

Add cache service manager in worker construction.

上级 932dbbc7
......@@ -36,5 +36,10 @@
<artifactId>collector-stream-provider</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-cache-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream;
/**
* @author peng-yongsheng
*/
public enum IdAutoIncrement {
INSTANCE;
public int increment(int min, int max) {
int instanceId;
if (min == max) {
instanceId = -1;
} else if (min + max == 0) {
instanceId = max + 1;
} else if (min + max > 0) {
instanceId = min - 1;
} else if (max < 0) {
instanceId = 1;
} else {
instanceId = max + 1;
}
return instanceId;
}
}
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ICpuMetricPersistenceDAO;
......@@ -31,34 +32,33 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class CpuMetricPersistenceWorker extends PersistenceWorker<CpuMetric, CpuMetric> {
private final DAOService daoService;
public CpuMetricPersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
public CpuMetricPersistenceWorker(DAOService daoService) {
super(daoService);
this.daoService = daoService;
}
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return daoService.getPersistenceDAO(ICpuMetricPersistenceDAO.class);
return getDaoService().getPersistenceDAO(ICpuMetricPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<CpuMetric, CpuMetric, CpuMetricPersistenceWorker> {
public Factory(DAOService daoService, QueueCreatorService<CpuMetric> queueCreatorService) {
super(daoService, queueCreatorService);
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<CpuMetric> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override
public CpuMetricPersistenceWorker workerInstance(DAOService daoService) {
return new CpuMetricPersistenceWorker(daoService);
public CpuMetricPersistenceWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new CpuMetricPersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
......
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
......@@ -31,15 +32,12 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class GCMetricPersistenceWorker extends PersistenceWorker<GCMetric, GCMetric> {
private final DAOService daoService;
@Override public int id() {
return 0;
}
public GCMetricPersistenceWorker(DAOService daoService) {
super(daoService);
this.daoService = daoService;
public GCMetricPersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override protected boolean needMergeDBData() {
......@@ -47,18 +45,20 @@ public class GCMetricPersistenceWorker extends PersistenceWorker<GCMetric, GCMet
}
@Override protected IPersistenceDAO persistenceDAO() {
return daoService.getPersistenceDAO(IGCMetricPersistenceDAO.class);
return getDaoService().getPersistenceDAO(IGCMetricPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<GCMetric, GCMetric, GCMetricPersistenceWorker> {
public Factory(DAOService daoService, QueueCreatorService<GCMetric> queueCreatorService) {
super(daoService, queueCreatorService);
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<GCMetric> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override
public GCMetricPersistenceWorker workerInstance(DAOService daoService) {
return new GCMetricPersistenceWorker(daoService);
public GCMetricPersistenceWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new GCMetricPersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
......
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
......@@ -31,15 +32,12 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class InstHeartBeatPersistenceWorker extends PersistenceWorker<Instance, Instance> {
private final DAOService daoService;
@Override public int id() {
return 0;
}
public InstHeartBeatPersistenceWorker(DAOService daoService) {
super(daoService);
this.daoService = daoService;
public InstHeartBeatPersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override protected boolean needMergeDBData() {
......@@ -47,18 +45,19 @@ public class InstHeartBeatPersistenceWorker extends PersistenceWorker<Instance,
}
@Override protected IPersistenceDAO persistenceDAO() {
return daoService.getPersistenceDAO(IInstanceHeartBeatPersistenceDAO.class);
return getDaoService().getPersistenceDAO(IInstanceHeartBeatPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<Instance, Instance, InstHeartBeatPersistenceWorker> {
public Factory(DAOService daoService, QueueCreatorService<Instance> queueCreatorService) {
super(daoService, queueCreatorService);
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<Instance> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override
public InstHeartBeatPersistenceWorker workerInstance(DAOService daoService) {
return new InstHeartBeatPersistenceWorker(daoService);
@Override public InstHeartBeatPersistenceWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new InstHeartBeatPersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
......
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
......@@ -31,34 +32,32 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class MemoryMetricPersistenceWorker extends PersistenceWorker<MemoryMetric, MemoryMetric> {
private final DAOService daoService;
public MemoryMetricPersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
public MemoryMetricPersistenceWorker(DAOService daoService) {
super(daoService);
this.daoService = daoService;
}
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return daoService.getPersistenceDAO(IMemoryMetricPersistenceDAO.class);
return getDaoService().getPersistenceDAO(IMemoryMetricPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<MemoryMetric, MemoryMetric, MemoryMetricPersistenceWorker> {
public Factory(DAOService daoService, QueueCreatorService<MemoryMetric> queueCreatorService) {
super(daoService, queueCreatorService);
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<MemoryMetric> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override
public MemoryMetricPersistenceWorker workerInstance(DAOService daoService) {
return new MemoryMetricPersistenceWorker(daoService);
@Override public MemoryMetricPersistenceWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new MemoryMetricPersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
......
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO;
......@@ -31,15 +32,12 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class MemoryPoolMetricPersistenceWorker extends PersistenceWorker<MemoryPoolMetric, MemoryPoolMetric> {
private final DAOService daoService;
@Override public int id() {
return 0;
}
public MemoryPoolMetricPersistenceWorker(DAOService daoService) {
super(daoService);
this.daoService = daoService;
public MemoryPoolMetricPersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override protected boolean needMergeDBData() {
......@@ -47,18 +45,19 @@ public class MemoryPoolMetricPersistenceWorker extends PersistenceWorker<MemoryP
}
@Override protected IPersistenceDAO persistenceDAO() {
return daoService.getPersistenceDAO(IMemoryPoolMetricPersistenceDAO.class);
return getDaoService().getPersistenceDAO(IMemoryPoolMetricPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<MemoryPoolMetric, MemoryPoolMetric, MemoryPoolMetricPersistenceWorker> {
public Factory(DAOService daoService, QueueCreatorService<MemoryPoolMetric> queueCreatorService) {
super(daoService, queueCreatorService);
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<MemoryPoolMetric> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override
public MemoryPoolMetricPersistenceWorker workerInstance(DAOService daoService) {
return new MemoryPoolMetricPersistenceWorker(daoService);
@Override public MemoryPoolMetricPersistenceWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new MemoryPoolMetricPersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ApplicationRegisterRemoteWorker extends AbstractRemoteWorker<Application, Application> {
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterRemoteWorker.class);
public ApplicationRegisterRemoteWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected void onWork(Application message) throws WorkerException {
logger.debug("application code: {}", message.getApplicationCode());
onNext(message);
}
public static class Factory extends AbstractRemoteWorkerProvider<Application, Application, ApplicationRegisterRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
}
@Override public ApplicationRegisterRemoteWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new ApplicationRegisterRemoteWorker(getDaoService(), getCacheServiceManager());
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.agent.stream.IdAutoIncrement;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.dao.IApplicationStreamDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker<Application, Application> {
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterSerialWorker.class);
public ApplicationRegisterSerialWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected void onWork(Application application) throws WorkerException {
logger.debug("register application, application code: {}", application.getApplicationCode());
int applicationId = getCacheServiceManager().getApplicationCacheService().get(application.getApplicationCode());
if (applicationId == 0) {
IApplicationStreamDAO dao = (IApplicationStreamDAO)getDaoService().get(IApplicationStreamDAO.class);
int min = dao.getMinApplicationId();
if (min == 0) {
Application userApplication = new Application(String.valueOf(Const.USER_ID));
userApplication.setApplicationCode(Const.USER_CODE);
userApplication.setApplicationId(Const.USER_ID);
dao.save(userApplication);
application = new Application("-1");
application.setApplicationId(-1);
} else {
int max = dao.getMaxApplicationId();
applicationId = IdAutoIncrement.INSTANCE.increment(min, max);
application = new Application(String.valueOf(applicationId));
application.setApplicationId(applicationId);
}
dao.save(application);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<Application, Application, ApplicationRegisterSerialWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<Application> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override public ApplicationRegisterSerialWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new ApplicationRegisterSerialWorker(daoService, cacheServiceManager);
}
@Override public int queueSize() {
return 256;
}
}
}
......@@ -18,12 +18,13 @@
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.storage.base.dao.DAO;
import org.skywalking.apm.collector.storage.table.register.Application;
/**
* @author peng-yongsheng
*/
public interface IApplicationStreamDAO {
public interface IApplicationStreamDAO extends DAO {
int getMaxApplicationId();
int getMinApplicationId();
......
......@@ -50,7 +50,15 @@ public class Application extends Data {
return getDataString(1);
}
public void setApplicationCode(String applicationCode) {
setDataString(1, applicationCode);
}
public int getApplicationId() {
return getDataInteger(0);
}
public void setApplicationId(int applicationId) {
setDataInteger(0, applicationId);
}
}
......@@ -19,6 +19,7 @@
package org.skywalking.apm.collector.stream;
import java.util.Properties;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ModuleProvider;
......@@ -64,6 +65,6 @@ public class StreamModuleProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {RemoteModule.NAME, QueueModule.NAME, StorageModule.NAME};
return new String[] {RemoteModule.NAME, QueueModule.NAME, StorageModule.NAME, CacheModule.NAME};
}
}
......@@ -18,8 +18,10 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
import org.skywalking.apm.collector.storage.service.DAOService;
/**
* The <code>AbstractLocalAsyncWorker</code> implementations represent workers,
......@@ -30,6 +32,10 @@ import org.skywalking.apm.collector.queue.base.QueueExecutor;
*/
public abstract class AbstractLocalAsyncWorker<INPUT extends Data, OUTPUT extends Data> extends AbstractWorker<INPUT, OUTPUT> implements QueueExecutor<INPUT> {
public AbstractLocalAsyncWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
/**
* Receive message
*
......
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
......@@ -31,17 +32,17 @@ public abstract class AbstractLocalAsyncWorkerProvider<INPUT extends Data, OUTPU
public abstract int queueSize();
private final DAOService daoService;
private final QueueCreatorService<INPUT> queueCreatorService;
public AbstractLocalAsyncWorkerProvider(DAOService daoService, QueueCreatorService<INPUT> queueCreatorService) {
this.daoService = daoService;
public AbstractLocalAsyncWorkerProvider(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<INPUT> queueCreatorService) {
super(daoService, cacheServiceManager);
this.queueCreatorService = queueCreatorService;
}
@Override
final public WorkerRef create(WorkerCreateListener workerCreateListener) throws ProviderNotFoundException {
WorkerType localAsyncWorker = workerInstance(daoService);
WorkerType localAsyncWorker = workerInstance(getDaoService(), getCacheServiceManager());
workerCreateListener.addWorker(localAsyncWorker);
QueueEventHandler<INPUT> queueEventHandler = queueCreatorService.create(queueSize(), localAsyncWorker);
return new LocalAsyncWorkerRef<>(localAsyncWorker, queueEventHandler);
......
......@@ -18,7 +18,9 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.service.DAOService;
/**
* The <code>AbstractRemoteWorker</code> implementations represent workers,
......@@ -31,6 +33,10 @@ import org.skywalking.apm.collector.core.data.Data;
*/
public abstract class AbstractRemoteWorker<INPUT extends Data, OUTPUT extends Data> extends AbstractWorker<INPUT, OUTPUT> {
public AbstractRemoteWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
/**
* This method use for message producer to call for send message.
*
......
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.storage.service.DAOService;
......@@ -32,11 +33,11 @@ import org.skywalking.apm.collector.storage.service.DAOService;
*/
public abstract class AbstractRemoteWorkerProvider<INPUT extends Data, OUTPUT extends Data, WorkerType extends AbstractRemoteWorker<INPUT, OUTPUT>> extends AbstractWorkerProvider<INPUT, OUTPUT, WorkerType> {
private final DAOService daoService;
private final RemoteClientService remoteClientService;
public AbstractRemoteWorkerProvider(DAOService daoService, RemoteClientService remoteClientService) {
this.daoService = daoService;
public AbstractRemoteWorkerProvider(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager);
this.remoteClientService = remoteClientService;
}
......@@ -48,7 +49,7 @@ public abstract class AbstractRemoteWorkerProvider<INPUT extends Data, OUTPUT ex
* worker instance, when the worker provider not find then Throw this Exception.
*/
@Override final public WorkerRef create(WorkerCreateListener workerCreateListener) {
WorkerType remoteWorker = workerInstance(daoService);
WorkerType remoteWorker = workerInstance(getDaoService(), getCacheServiceManager());
workerCreateListener.addWorker(remoteWorker);
RemoteWorkerRef<INPUT, OUTPUT> workerRef = new RemoteWorkerRef<>(remoteWorker);
return workerRef;
......
......@@ -18,9 +18,11 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.graph.Next;
import org.skywalking.apm.collector.core.graph.NodeProcessor;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -31,6 +33,22 @@ public abstract class AbstractWorker<INPUT extends Data, OUTPUT extends Data> im
private final Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
private final DAOService daoService;
private final CacheServiceManager cacheServiceManager;
public AbstractWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
this.daoService = daoService;
this.cacheServiceManager = cacheServiceManager;
}
public DAOService getDaoService() {
return daoService;
}
public CacheServiceManager getCacheServiceManager() {
return cacheServiceManager;
}
private Next<OUTPUT> next;
/**
......
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.service.DAOService;
......@@ -25,5 +26,22 @@ import org.skywalking.apm.collector.storage.service.DAOService;
* @author peng-yongsheng
*/
public abstract class AbstractWorkerProvider<INPUT extends Data, OUTPUT extends Data, WorkerType extends AbstractWorker<INPUT, OUTPUT>> implements Provider {
public abstract WorkerType workerInstance(DAOService daoService);
private final DAOService daoService;
private final CacheServiceManager cacheServiceManager;
public AbstractWorkerProvider(DAOService daoService, CacheServiceManager cacheServiceManager) {
this.daoService = daoService;
this.cacheServiceManager = cacheServiceManager;
}
public final DAOService getDaoService() {
return daoService;
}
public final CacheServiceManager getCacheServiceManager() {
return cacheServiceManager;
}
public abstract WorkerType workerInstance(DAOService daoService, CacheServiceManager cacheServiceManager);
}
......@@ -18,7 +18,9 @@
package org.skywalking.apm.collector.stream.worker.impl;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataCache;
......@@ -35,8 +37,9 @@ public abstract class AggregationWorker<INPUT extends Data, OUTPUT extends Data>
private DataCache dataCache;
private int messageNum;
public AggregationWorker() {
dataCache = new DataCache();
public AggregationWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
this.dataCache = new DataCache();
}
@Override protected final void onWork(INPUT message) throws WorkerException {
......
......@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.stream.worker.impl;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
......@@ -39,12 +40,11 @@ public abstract class PersistenceWorker<INPUT extends Data, OUTPUT extends Data>
private final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
private final DAOService daoService;
private final DataCache dataCache;
public PersistenceWorker(DAOService daoService) {
public PersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
this.dataCache = new DataCache();
this.daoService = daoService;
}
public final void flushAndSwitch() {
......@@ -64,7 +64,7 @@ public abstract class PersistenceWorker<INPUT extends Data, OUTPUT extends Data>
dataCache.switchPointer();
List<?> collection = buildBatchCollection();
IBatchDAO dao = (IBatchDAO)daoService.get(IBatchDAO.class);
IBatchDAO dao = (IBatchDAO)getDaoService().get(IBatchDAO.class);
dao.batchPersistence(collection);
}
} finally {
......
......@@ -55,5 +55,10 @@
<artifactId>collector-remote-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-cache-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -54,7 +54,7 @@ import org.skywalking.apm.collector.ui.jetty.handler.servicetree.EntryServiceGet
import org.skywalking.apm.collector.ui.jetty.handler.servicetree.ServiceTreeGetByIdHandler;
import org.skywalking.apm.collector.ui.jetty.handler.time.AllInstanceLastTimeGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.time.OneInstanceLastTimeGetHandler;
import org.skywalking.apm.collector.ui.service.CacheServiceManager;
import org.skywalking.apm.collector.cache.CacheServiceManager;
/**
* @author peng-yongsheng
......
......@@ -25,7 +25,7 @@ import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.storage.dao.ISegmentCostUIDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.ui.service.CacheServiceManager;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.ui.service.SegmentTopService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -23,7 +23,7 @@ import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.ui.service.CacheServiceManager;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.ui.service.SpanService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -23,7 +23,7 @@ import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.ui.service.CacheServiceManager;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.ui.service.TraceDagService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -23,7 +23,7 @@ import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.ui.service.CacheServiceManager;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.ui.service.TraceStackService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -24,7 +24,7 @@ import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.ui.service.ApplicationService;
import org.skywalking.apm.collector.ui.service.CacheServiceManager;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -25,7 +25,7 @@ import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.ui.service.CacheServiceManager;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.ui.service.InstanceHealthService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -25,7 +25,7 @@ import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.ui.service.CacheServiceManager;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.ui.service.InstanceJVMService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -25,7 +25,7 @@ import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.ui.service.CacheServiceManager;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.ui.service.InstanceJVMService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -23,7 +23,7 @@ import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.ui.service.CacheServiceManager;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.ui.service.InstanceJVMService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -23,7 +23,7 @@ import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.ui.service.CacheServiceManager;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.ui.service.ServiceTreeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -23,7 +23,7 @@ import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.ui.service.CacheServiceManager;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.ui.service.ServiceTreeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -26,7 +26,7 @@ import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.ui.service.CacheServiceManager;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.ui.service.TimeSynchronousService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -24,7 +24,7 @@ import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.ui.service.CacheServiceManager;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.ui.service.TimeSynchronousService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.ui.service;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.storage.dao.IInstanceUIDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
......
......@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.ui.service;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.List;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IGCMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstPerformanceUIDAO;
......
......@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.ui.service;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.util.Set;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.dao.ICpuMetricUIDAO;
......
......@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.ui.service;
import com.google.gson.JsonObject;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.dao.IGlobalTraceUIDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentCostUIDAO;
......
......@@ -23,6 +23,7 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.util.ColumnNameUtils;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
......
......@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.ui.service;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.List;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.dao.ISegmentUIDAO;
......
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.ui.service;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.storage.dao.IInstanceUIDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.slf4j.Logger;
......
......@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.ui.service;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.storage.dao.INodeComponentUIDAO;
import org.skywalking.apm.collector.storage.dao.INodeMappingUIDAO;
import org.skywalking.apm.collector.storage.dao.INodeReferenceUIDAO;
......
......@@ -22,6 +22,7 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册