diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/pom.xml b/apm-collector/apm-collector-agent/collector-agent-stream/pom.xml
index 71243c5f4b9b9a936917ba7236d07d59b4106710..08a3c488501949dab461f5317210fc25d0841a8f 100644
--- a/apm-collector/apm-collector-agent/collector-agent-stream/pom.xml
+++ b/apm-collector/apm-collector-agent/collector-agent-stream/pom.xml
@@ -36,5 +36,10 @@
collector-stream-provider
${project.version}
+
+ org.skywalking
+ collector-cache-define
+ ${project.version}
+
diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/IdAutoIncrement.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/IdAutoIncrement.java
new file mode 100644
index 0000000000000000000000000000000000000000..c2ce349ba410ed0c7380303777fe5f124986dca3
--- /dev/null
+++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/IdAutoIncrement.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.collector.agent.stream;
+
+/**
+ * @author peng-yongsheng
+ */
+public enum IdAutoIncrement {
+ INSTANCE;
+
+ public int increment(int min, int max) {
+ int instanceId;
+ if (min == max) {
+ instanceId = -1;
+ } else if (min + max == 0) {
+ instanceId = max + 1;
+ } else if (min + max > 0) {
+ instanceId = min - 1;
+ } else if (max < 0) {
+ instanceId = 1;
+ } else {
+ instanceId = max + 1;
+ }
+ return instanceId;
+ }
+}
diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/CpuMetricPersistenceWorker.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/CpuMetricPersistenceWorker.java
index c3adcd8e56284c8273a1d5f4e21f1e3523e4372a..4981ae588209777ed22588d1df802f67d9cc229e 100644
--- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/CpuMetricPersistenceWorker.java
+++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/CpuMetricPersistenceWorker.java
@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.jvm;
+import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ICpuMetricPersistenceDAO;
@@ -31,34 +32,33 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class CpuMetricPersistenceWorker extends PersistenceWorker {
- private final DAOService daoService;
+ public CpuMetricPersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
+ super(daoService, cacheServiceManager);
+ }
@Override public int id() {
return 0;
}
- public CpuMetricPersistenceWorker(DAOService daoService) {
- super(daoService);
- this.daoService = daoService;
- }
-
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
- return daoService.getPersistenceDAO(ICpuMetricPersistenceDAO.class);
+ return getDaoService().getPersistenceDAO(ICpuMetricPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider {
- public Factory(DAOService daoService, QueueCreatorService queueCreatorService) {
- super(daoService, queueCreatorService);
+ public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
+ QueueCreatorService queueCreatorService) {
+ super(daoService, cacheServiceManager, queueCreatorService);
}
@Override
- public CpuMetricPersistenceWorker workerInstance(DAOService daoService) {
- return new CpuMetricPersistenceWorker(daoService);
+ public CpuMetricPersistenceWorker workerInstance(DAOService daoService,
+ CacheServiceManager cacheServiceManager) {
+ return new CpuMetricPersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/GCMetricPersistenceWorker.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/GCMetricPersistenceWorker.java
index ad28c10c2a50401469eee40786150fc9b0cf8ae2..47cdc5b8bc59fda34b0ed3df3720e36153921ee5 100644
--- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/GCMetricPersistenceWorker.java
+++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/GCMetricPersistenceWorker.java
@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.jvm;
+import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
@@ -31,15 +32,12 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class GCMetricPersistenceWorker extends PersistenceWorker {
- private final DAOService daoService;
-
@Override public int id() {
return 0;
}
- public GCMetricPersistenceWorker(DAOService daoService) {
- super(daoService);
- this.daoService = daoService;
+ public GCMetricPersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
+ super(daoService, cacheServiceManager);
}
@Override protected boolean needMergeDBData() {
@@ -47,18 +45,20 @@ public class GCMetricPersistenceWorker extends PersistenceWorker {
- public Factory(DAOService daoService, QueueCreatorService queueCreatorService) {
- super(daoService, queueCreatorService);
+ public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
+ QueueCreatorService queueCreatorService) {
+ super(daoService, cacheServiceManager, queueCreatorService);
}
@Override
- public GCMetricPersistenceWorker workerInstance(DAOService daoService) {
- return new GCMetricPersistenceWorker(daoService);
+ public GCMetricPersistenceWorker workerInstance(DAOService daoService,
+ CacheServiceManager cacheServiceManager) {
+ return new GCMetricPersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/InstHeartBeatPersistenceWorker.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/InstHeartBeatPersistenceWorker.java
index bde7fb1c7565e027d288211d0a5848143d387c80..c43766fa791b971e39b9c3fefc3bba34b56e044f 100644
--- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/InstHeartBeatPersistenceWorker.java
+++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/InstHeartBeatPersistenceWorker.java
@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.jvm;
+import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
@@ -31,15 +32,12 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class InstHeartBeatPersistenceWorker extends PersistenceWorker {
- private final DAOService daoService;
-
@Override public int id() {
return 0;
}
- public InstHeartBeatPersistenceWorker(DAOService daoService) {
- super(daoService);
- this.daoService = daoService;
+ public InstHeartBeatPersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
+ super(daoService, cacheServiceManager);
}
@Override protected boolean needMergeDBData() {
@@ -47,18 +45,19 @@ public class InstHeartBeatPersistenceWorker extends PersistenceWorker {
- public Factory(DAOService daoService, QueueCreatorService queueCreatorService) {
- super(daoService, queueCreatorService);
+ public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
+ QueueCreatorService queueCreatorService) {
+ super(daoService, cacheServiceManager, queueCreatorService);
}
- @Override
- public InstHeartBeatPersistenceWorker workerInstance(DAOService daoService) {
- return new InstHeartBeatPersistenceWorker(daoService);
+ @Override public InstHeartBeatPersistenceWorker workerInstance(DAOService daoService,
+ CacheServiceManager cacheServiceManager) {
+ return new InstHeartBeatPersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryMetricPersistenceWorker.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryMetricPersistenceWorker.java
index b91966f4874019a6ff46118c7826de327c59f9e0..1621190d8ddf00acf4dd2efe03d22ce9d9e339b7 100644
--- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryMetricPersistenceWorker.java
+++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryMetricPersistenceWorker.java
@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.jvm;
+import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
@@ -31,34 +32,32 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class MemoryMetricPersistenceWorker extends PersistenceWorker {
- private final DAOService daoService;
+ public MemoryMetricPersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
+ super(daoService, cacheServiceManager);
+ }
@Override public int id() {
return 0;
}
- public MemoryMetricPersistenceWorker(DAOService daoService) {
- super(daoService);
- this.daoService = daoService;
- }
-
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
- return daoService.getPersistenceDAO(IMemoryMetricPersistenceDAO.class);
+ return getDaoService().getPersistenceDAO(IMemoryMetricPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider {
- public Factory(DAOService daoService, QueueCreatorService queueCreatorService) {
- super(daoService, queueCreatorService);
+ public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
+ QueueCreatorService queueCreatorService) {
+ super(daoService, cacheServiceManager, queueCreatorService);
}
- @Override
- public MemoryMetricPersistenceWorker workerInstance(DAOService daoService) {
- return new MemoryMetricPersistenceWorker(daoService);
+ @Override public MemoryMetricPersistenceWorker workerInstance(DAOService daoService,
+ CacheServiceManager cacheServiceManager) {
+ return new MemoryMetricPersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryPoolMetricPersistenceWorker.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryPoolMetricPersistenceWorker.java
index d54fee8b5c1ddd70a57808eb1d2a83df1a3f5478..fc22e5a4b5cc196d4ef4ef03c428806bd6c25a31 100644
--- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryPoolMetricPersistenceWorker.java
+++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryPoolMetricPersistenceWorker.java
@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.jvm;
+import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO;
@@ -31,15 +32,12 @@ import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
*/
public class MemoryPoolMetricPersistenceWorker extends PersistenceWorker {
- private final DAOService daoService;
-
@Override public int id() {
return 0;
}
- public MemoryPoolMetricPersistenceWorker(DAOService daoService) {
- super(daoService);
- this.daoService = daoService;
+ public MemoryPoolMetricPersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
+ super(daoService, cacheServiceManager);
}
@Override protected boolean needMergeDBData() {
@@ -47,18 +45,19 @@ public class MemoryPoolMetricPersistenceWorker extends PersistenceWorker {
- public Factory(DAOService daoService, QueueCreatorService queueCreatorService) {
- super(daoService, queueCreatorService);
+ public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
+ QueueCreatorService queueCreatorService) {
+ super(daoService, cacheServiceManager, queueCreatorService);
}
- @Override
- public MemoryPoolMetricPersistenceWorker workerInstance(DAOService daoService) {
- return new MemoryPoolMetricPersistenceWorker(daoService);
+ @Override public MemoryPoolMetricPersistenceWorker workerInstance(DAOService daoService,
+ CacheServiceManager cacheServiceManager) {
+ return new MemoryPoolMetricPersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterRemoteWorker.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterRemoteWorker.java
new file mode 100644
index 0000000000000000000000000000000000000000..b816cb4a7b2bfd49010690d1aa86131cf2b47bb8
--- /dev/null
+++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterRemoteWorker.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.collector.agent.stream.worker.register;
+
+import org.skywalking.apm.collector.cache.CacheServiceManager;
+import org.skywalking.apm.collector.remote.service.RemoteClientService;
+import org.skywalking.apm.collector.storage.service.DAOService;
+import org.skywalking.apm.collector.storage.table.register.Application;
+import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
+import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorkerProvider;
+import org.skywalking.apm.collector.stream.worker.base.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ApplicationRegisterRemoteWorker extends AbstractRemoteWorker {
+
+ private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterRemoteWorker.class);
+
+ public ApplicationRegisterRemoteWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
+ super(daoService, cacheServiceManager);
+ }
+
+ @Override public int id() {
+ return 0;
+ }
+
+ @Override protected void onWork(Application message) throws WorkerException {
+ logger.debug("application code: {}", message.getApplicationCode());
+ onNext(message);
+ }
+
+ public static class Factory extends AbstractRemoteWorkerProvider {
+
+ public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
+ RemoteClientService remoteClientService) {
+ super(daoService, cacheServiceManager, remoteClientService);
+ }
+
+ @Override public ApplicationRegisterRemoteWorker workerInstance(DAOService daoService,
+ CacheServiceManager cacheServiceManager) {
+ return new ApplicationRegisterRemoteWorker(getDaoService(), getCacheServiceManager());
+ }
+ }
+}
diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterSerialWorker.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterSerialWorker.java
new file mode 100644
index 0000000000000000000000000000000000000000..1d7523b1eecd90cdc0d1ac5b28a2a9607c279388
--- /dev/null
+++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterSerialWorker.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.collector.agent.stream.worker.register;
+
+import org.skywalking.apm.collector.agent.stream.IdAutoIncrement;
+import org.skywalking.apm.collector.cache.CacheServiceManager;
+import org.skywalking.apm.collector.core.util.Const;
+import org.skywalking.apm.collector.queue.service.QueueCreatorService;
+import org.skywalking.apm.collector.storage.dao.IApplicationStreamDAO;
+import org.skywalking.apm.collector.storage.service.DAOService;
+import org.skywalking.apm.collector.storage.table.register.Application;
+import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker;
+import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
+import org.skywalking.apm.collector.stream.worker.base.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker {
+
+ private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterSerialWorker.class);
+
+ public ApplicationRegisterSerialWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
+ super(daoService, cacheServiceManager);
+ }
+
+ @Override public int id() {
+ return 0;
+ }
+
+ @Override protected void onWork(Application application) throws WorkerException {
+ logger.debug("register application, application code: {}", application.getApplicationCode());
+ int applicationId = getCacheServiceManager().getApplicationCacheService().get(application.getApplicationCode());
+
+ if (applicationId == 0) {
+ IApplicationStreamDAO dao = (IApplicationStreamDAO)getDaoService().get(IApplicationStreamDAO.class);
+ int min = dao.getMinApplicationId();
+ if (min == 0) {
+ Application userApplication = new Application(String.valueOf(Const.USER_ID));
+ userApplication.setApplicationCode(Const.USER_CODE);
+ userApplication.setApplicationId(Const.USER_ID);
+ dao.save(userApplication);
+
+ application = new Application("-1");
+ application.setApplicationId(-1);
+ } else {
+ int max = dao.getMaxApplicationId();
+ applicationId = IdAutoIncrement.INSTANCE.increment(min, max);
+
+ application = new Application(String.valueOf(applicationId));
+ application.setApplicationId(applicationId);
+ }
+ dao.save(application);
+ }
+ }
+
+ public static class Factory extends AbstractLocalAsyncWorkerProvider {
+
+ public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
+ QueueCreatorService queueCreatorService) {
+ super(daoService, cacheServiceManager, queueCreatorService);
+ }
+
+ @Override public ApplicationRegisterSerialWorker workerInstance(DAOService daoService,
+ CacheServiceManager cacheServiceManager) {
+ return new ApplicationRegisterSerialWorker(daoService, cacheServiceManager);
+ }
+
+ @Override public int queueSize() {
+ return 256;
+ }
+ }
+}
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/dao/IApplicationStreamDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/dao/IApplicationStreamDAO.java
index de2818cd0d4f6dc3198fb9b9e519f8f8173f9a02..32e3ecff5bacd208712cd1767ddc3877390d9b60 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/dao/IApplicationStreamDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/dao/IApplicationStreamDAO.java
@@ -18,12 +18,13 @@
package org.skywalking.apm.collector.storage.dao;
+import org.skywalking.apm.collector.storage.base.dao.DAO;
import org.skywalking.apm.collector.storage.table.register.Application;
/**
* @author peng-yongsheng
*/
-public interface IApplicationStreamDAO {
+public interface IApplicationStreamDAO extends DAO {
int getMaxApplicationId();
int getMinApplicationId();
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/register/Application.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/register/Application.java
index bea8135cac7248fabe055e0e183de0f4b233515b..e27d75753f106c2333ee58a16ac725d7d93b114f 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/register/Application.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/register/Application.java
@@ -50,7 +50,15 @@ public class Application extends Data {
return getDataString(1);
}
+ public void setApplicationCode(String applicationCode) {
+ setDataString(1, applicationCode);
+ }
+
public int getApplicationId() {
return getDataInteger(0);
}
+
+ public void setApplicationId(int applicationId) {
+ setDataInteger(0, applicationId);
+ }
}
diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/StreamModuleProvider.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/StreamModuleProvider.java
index 78ae60ca6599ecee00276fc0ff55f701f7f6ee21..d74b0d75f6870af3a957d21b2c34f08841619b09 100644
--- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/StreamModuleProvider.java
+++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/StreamModuleProvider.java
@@ -19,6 +19,7 @@
package org.skywalking.apm.collector.stream;
import java.util.Properties;
+import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ModuleProvider;
@@ -64,6 +65,6 @@ public class StreamModuleProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
- return new String[] {RemoteModule.NAME, QueueModule.NAME, StorageModule.NAME};
+ return new String[] {RemoteModule.NAME, QueueModule.NAME, StorageModule.NAME, CacheModule.NAME};
}
}
diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java
index b3d3ee2282af2cfddf7651103ea1c9cad62942fe..dcafca158e137046c0f8a712aefa9f3c9b264abe 100644
--- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java
+++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java
@@ -18,8 +18,10 @@
package org.skywalking.apm.collector.stream.worker.base;
+import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
+import org.skywalking.apm.collector.storage.service.DAOService;
/**
* The AbstractLocalAsyncWorker
implementations represent workers,
@@ -30,6 +32,10 @@ import org.skywalking.apm.collector.queue.base.QueueExecutor;
*/
public abstract class AbstractLocalAsyncWorker extends AbstractWorker implements QueueExecutor {
+ public AbstractLocalAsyncWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
+ super(daoService, cacheServiceManager);
+ }
+
/**
* Receive message
*
diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java
index d87ffebf1229f713a1a059a89d412397de630522..e07545687e2d8bbc4fb78009f103954b09cfc6fc 100644
--- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java
+++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java
@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.stream.worker.base;
+import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
@@ -31,17 +32,17 @@ public abstract class AbstractLocalAsyncWorkerProvider queueCreatorService;
- public AbstractLocalAsyncWorkerProvider(DAOService daoService, QueueCreatorService queueCreatorService) {
- this.daoService = daoService;
+ public AbstractLocalAsyncWorkerProvider(DAOService daoService, CacheServiceManager cacheServiceManager,
+ QueueCreatorService queueCreatorService) {
+ super(daoService, cacheServiceManager);
this.queueCreatorService = queueCreatorService;
}
@Override
final public WorkerRef create(WorkerCreateListener workerCreateListener) throws ProviderNotFoundException {
- WorkerType localAsyncWorker = workerInstance(daoService);
+ WorkerType localAsyncWorker = workerInstance(getDaoService(), getCacheServiceManager());
workerCreateListener.addWorker(localAsyncWorker);
QueueEventHandler queueEventHandler = queueCreatorService.create(queueSize(), localAsyncWorker);
return new LocalAsyncWorkerRef<>(localAsyncWorker, queueEventHandler);
diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorker.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorker.java
index 52341beabb378d1bb21a0b680e63fde267638985..419e1c3cee0048e81df9b89eaaae26af81a50770 100644
--- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorker.java
+++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorker.java
@@ -18,7 +18,9 @@
package org.skywalking.apm.collector.stream.worker.base;
+import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
+import org.skywalking.apm.collector.storage.service.DAOService;
/**
* The AbstractRemoteWorker
implementations represent workers,
@@ -31,6 +33,10 @@ import org.skywalking.apm.collector.core.data.Data;
*/
public abstract class AbstractRemoteWorker extends AbstractWorker {
+ public AbstractRemoteWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
+ super(daoService, cacheServiceManager);
+ }
+
/**
* This method use for message producer to call for send message.
*
diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorkerProvider.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorkerProvider.java
index dce4fe55e448fdd12c3c5bbea1a202352312fe89..73d2da0140d01df18700670e8027e8f6c1eaada5 100644
--- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorkerProvider.java
+++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorkerProvider.java
@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.stream.worker.base;
+import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.storage.service.DAOService;
@@ -32,11 +33,11 @@ import org.skywalking.apm.collector.storage.service.DAOService;
*/
public abstract class AbstractRemoteWorkerProvider> extends AbstractWorkerProvider {
- private final DAOService daoService;
private final RemoteClientService remoteClientService;
- public AbstractRemoteWorkerProvider(DAOService daoService, RemoteClientService remoteClientService) {
- this.daoService = daoService;
+ public AbstractRemoteWorkerProvider(DAOService daoService, CacheServiceManager cacheServiceManager,
+ RemoteClientService remoteClientService) {
+ super(daoService, cacheServiceManager);
this.remoteClientService = remoteClientService;
}
@@ -48,7 +49,7 @@ public abstract class AbstractRemoteWorkerProvider workerRef = new RemoteWorkerRef<>(remoteWorker);
return workerRef;
diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorker.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorker.java
index 65bfb418522557ebd1842f641d61de31a1ff8427..8a04a093bc50cde89e361546f8b1ba29c5945059 100644
--- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorker.java
+++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorker.java
@@ -18,9 +18,11 @@
package org.skywalking.apm.collector.stream.worker.base;
+import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.graph.Next;
import org.skywalking.apm.collector.core.graph.NodeProcessor;
+import org.skywalking.apm.collector.storage.service.DAOService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +33,22 @@ public abstract class AbstractWorker im
private final Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
+ private final DAOService daoService;
+ private final CacheServiceManager cacheServiceManager;
+
+ public AbstractWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
+ this.daoService = daoService;
+ this.cacheServiceManager = cacheServiceManager;
+ }
+
+ public DAOService getDaoService() {
+ return daoService;
+ }
+
+ public CacheServiceManager getCacheServiceManager() {
+ return cacheServiceManager;
+ }
+
private Next