From 4071b27b21a2d7e535d753a87cfd65c4352805cf Mon Sep 17 00:00:00 2001 From: pengys5 <8082209@qq.com> Date: Tue, 18 Jul 2017 23:00:24 +0800 Subject: [PATCH] collector system use zookeeper cluster module start successful --- .../agentstream/AgentStreamModuleContext.java | 12 +++ .../agentstream/AgentStreamModuleDefine.java | 19 ++-- .../AgentStreamModuleGroupDefine.java | 3 +- .../AgentStreamModuleInstaller.java | 37 +++----- .../grpc/AgentStreamGRPCConfigParser.java | 12 +-- .../grpc/AgentStreamGRPCDataListener.java | 19 ++++ .../grpc/AgentStreamGRPCModuleDefine.java | 9 +- .../jetty/AgentStreamJettyConfigParser.java | 12 +-- .../jetty/AgentStreamJettyDataListener.java | 19 ++++ .../jetty/AgentStreamJettyModuleDefine.java | 9 +- .../client/zookeeper/ZookeeperClient.java | 15 ++- .../zookeeper/ZookeeperDataListener.java | 59 ------------ .../cluster/ClusterModuleDefine.java | 19 +++- .../cluster/ClusterModuleInstaller.java | 9 +- .../redis/ClusterRedisDataInitializer.java | 24 ----- .../redis/ClusterRedisModuleDefine.java | 19 ++-- .../ClusterRedisModuleRegistrationReader.java | 13 +++ .../ClusterRedisModuleRegistrationWriter.java | 28 ------ .../ClusterStandaloneDataInitializer.java | 32 ------- .../ClusterStandaloneModuleDefine.java | 19 ++-- ...terStandaloneModuleRegistrationReader.java | 14 +++ ...terStandaloneModuleRegistrationWriter.java | 34 ------- .../zookeeper/ClusterZKConfigParser.java | 2 +- .../zookeeper/ClusterZKDataInitializer.java | 49 ---------- .../zookeeper/ClusterZKDataMonitor.java | 95 +++++++++++++++++++ .../zookeeper/ClusterZKModuleDefine.java | 20 ++-- .../ClusterZKModuleRegistrationWriter.java | 37 -------- .../collector/core/client/DataListener.java | 12 --- .../collector/core/client/DataMonitor.java | 17 ++++ .../core/cluster/ClusterDataInitializer.java | 24 ----- .../core/cluster/ClusterDataListener.java | 39 ++++++++ .../cluster/ClusterDataListenerDefine.java | 8 ++ .../core/cluster/ClusterModuleContext.java | 19 ++-- .../ClusterModuleRegistrationWriter.java | 19 ---- .../collector/core/framework/Listener.java | 7 ++ .../collector/core/module/ModuleDefine.java | 6 +- .../core/module/ModuleRegistration.java | 4 + .../collector/core/util/CollectionUtils.java | 9 ++ .../apm/collector/core/util/ObjectUtils.java | 4 + .../apm/collector/core/util/StringUtils.java | 4 + .../main/resources/application-default.yml | 6 +- .../src/main/resources/logback.xml | 2 +- .../collector/queue/QueueModuleDefine.java | 8 +- .../collector/queue/QueueModuleInstaller.java | 2 +- .../remote/grpc/RemoteGRPCModuleDefine.java | 8 +- .../apm/collector/server/grpc/GRPCServer.java | 9 +- .../AbstractLocalAsyncWorkerProvider.java | 3 +- 47 files changed, 386 insertions(+), 463 deletions(-) create mode 100644 apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleContext.java create mode 100644 apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCDataListener.java create mode 100644 apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyDataListener.java delete mode 100644 apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperDataListener.java rename apm-collector/{apm-collector-core/src/main/java/org/skywalking/apm/collector/core => apm-collector-cluster/src/main/java/org/skywalking/apm/collector}/cluster/ClusterModuleDefine.java (61%) delete mode 100644 apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisDataInitializer.java create mode 100644 apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationReader.java delete mode 100644 apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationWriter.java delete mode 100644 apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneDataInitializer.java create mode 100644 apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationReader.java delete mode 100644 apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationWriter.java delete mode 100644 apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataInitializer.java create mode 100644 apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java delete mode 100644 apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationWriter.java delete mode 100644 apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/DataListener.java create mode 100644 apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/DataMonitor.java delete mode 100644 apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataInitializer.java create mode 100644 apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListener.java create mode 100644 apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListenerDefine.java delete mode 100644 apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationWriter.java create mode 100644 apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Listener.java diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleContext.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleContext.java new file mode 100644 index 0000000000..8d6219c04c --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleContext.java @@ -0,0 +1,12 @@ +package org.skywalking.apm.collector.agentstream; + +import org.skywalking.apm.collector.core.framework.Context; + +/** + * @author pengys5 + */ +public class AgentStreamModuleContext extends Context { + public AgentStreamModuleContext(String groupName) { + super(groupName); + } +} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleDefine.java index 239e574249..e69e7888d8 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleDefine.java @@ -4,20 +4,24 @@ import java.util.Map; import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.ClientException; -import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer; +import org.skywalking.apm.collector.core.client.DataMonitor; +import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine; import org.skywalking.apm.collector.core.cluster.ClusterModuleContext; import org.skywalking.apm.collector.core.config.ConfigParseException; import org.skywalking.apm.collector.core.framework.CollectorContextHelper; -import org.skywalking.apm.collector.core.framework.DataInitializer; import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.server.Server; import org.skywalking.apm.collector.core.server.ServerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author pengys5 */ -public abstract class AgentStreamModuleDefine extends ModuleDefine { +public abstract class AgentStreamModuleDefine extends ModuleDefine implements ClusterDataListenerDefine { + + private final Logger logger = LoggerFactory.getLogger(AgentStreamModuleDefine.class); @Override public final void initialize(Map config) throws DefineException, ClientException { try { @@ -25,18 +29,17 @@ public abstract class AgentStreamModuleDefine extends ModuleDefine { Server server = server(); server.initialize(); - String key = ClusterDataInitializer.BASE_CATALOG + "." + name(); - ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getWriter().write(key, registration().buildValue()); + ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration()); } catch (ConfigParseException | ServerException e) { throw new AgentStreamModuleException(e.getMessage(), e); } } - @Override protected final DataInitializer dataInitializer() { + @Override protected Client createClient(DataMonitor dataMonitor) { throw new UnsupportedOperationException(""); } - @Override protected final Client createClient() { - throw new UnsupportedOperationException(""); + @Override public final boolean defaultModule() { + return true; } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleGroupDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleGroupDefine.java index ea730c5d66..216e2ef2ec 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleGroupDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleGroupDefine.java @@ -1,6 +1,5 @@ package org.skywalking.apm.collector.agentstream; -import org.skywalking.apm.collector.core.cluster.ClusterModuleContext; import org.skywalking.apm.collector.core.framework.Context; import org.skywalking.apm.collector.core.module.ModuleGroupDefine; import org.skywalking.apm.collector.core.module.ModuleInstaller; @@ -17,7 +16,7 @@ public class AgentStreamModuleGroupDefine implements ModuleGroupDefine { } @Override public Context groupContext() { - return new ClusterModuleContext(GROUP_NAME); + return new AgentStreamModuleContext(GROUP_NAME); } @Override public ModuleInstaller moduleInstaller() { diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleInstaller.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleInstaller.java index df6a5f168f..8feda295df 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleInstaller.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleInstaller.java @@ -2,15 +2,12 @@ package org.skywalking.apm.collector.agentstream; import java.util.Iterator; import java.util.Map; -import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; import org.skywalking.apm.collector.core.client.ClientException; -import org.skywalking.apm.collector.core.cluster.ClusterModuleContext; -import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine; import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleInstaller; -import org.skywalking.apm.collector.core.util.CollectionUtils; +import org.skywalking.apm.collector.core.util.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,29 +20,17 @@ public class AgentStreamModuleInstaller implements ModuleInstaller { @Override public void install(Map moduleConfig, Map moduleDefineMap) throws DefineException, ClientException { - logger.info("beginning cluster module install"); - - ModuleDefine moduleDefine = null; - if (CollectionUtils.isEmpty(moduleConfig)) { - logger.info("could not configure cluster module, use the default"); - Iterator> moduleDefineEntry = moduleDefineMap.entrySet().iterator(); - while (moduleDefineEntry.hasNext()) { - moduleDefine = moduleDefineEntry.next().getValue(); - if (moduleDefine.defaultModule()) { - logger.info("module {} initialize", moduleDefine.getClass().getName()); - moduleDefine.initialize(null); - break; - } - } - } else { - Map.Entry clusterConfigEntry = moduleConfig.entrySet().iterator().next(); - moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey()); - moduleDefine.initialize(clusterConfigEntry.getValue()); - } - - ClusterModuleContext context = new ClusterModuleContext(ClusterModuleGroupDefine.GROUP_NAME); - context.setWriter(((ClusterModuleDefine)moduleDefine).registrationWriter()); + logger.info("beginning agent stream module install"); + AgentStreamModuleContext context = new AgentStreamModuleContext(AgentStreamModuleGroupDefine.GROUP_NAME); CollectorContextHelper.INSTANCE.putContext(context); + + logger.info("could not configure cluster module, use the default"); + Iterator> moduleDefineEntry = moduleDefineMap.entrySet().iterator(); + while (moduleDefineEntry.hasNext()) { + ModuleDefine moduleDefine = moduleDefineEntry.next().getValue(); + logger.info("module {} initialize", moduleDefine.getClass().getName()); + moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null); + } } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCConfigParser.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCConfigParser.java index 35b9e288f0..43a7ac543a 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCConfigParser.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCConfigParser.java @@ -3,6 +3,7 @@ package org.skywalking.apm.collector.agentstream.grpc; import java.util.Map; import org.skywalking.apm.collector.core.config.ConfigParseException; import org.skywalking.apm.collector.core.module.ModuleConfigParser; +import org.skywalking.apm.collector.core.util.ObjectUtils; import org.skywalking.apm.collector.core.util.StringUtils; /** @@ -14,16 +15,13 @@ public class AgentStreamGRPCConfigParser implements ModuleConfigParser { private static final String PORT = "port"; @Override public void parse(Map config) throws ConfigParseException { - AgentStreamGRPCConfig.HOST = (String)config.get(HOST); - - if (StringUtils.isEmpty(AgentStreamGRPCConfig.HOST)) { - throw new ConfigParseException(""); + if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) { + AgentStreamGRPCConfig.HOST = "localhost"; } - if (StringUtils.isEmpty(config.get(PORT))) { - throw new ConfigParseException(""); + if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) { + AgentStreamGRPCConfig.PORT = 11800; } else { AgentStreamGRPCConfig.PORT = (Integer)config.get(PORT); - } } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCDataListener.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCDataListener.java new file mode 100644 index 0000000000..0b749e0037 --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCDataListener.java @@ -0,0 +1,19 @@ +package org.skywalking.apm.collector.agentstream.grpc; + +import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine; +import org.skywalking.apm.collector.cluster.ClusterModuleDefine; +import org.skywalking.apm.collector.core.cluster.ClusterDataListener; + +/** + * @author pengys5 + */ +public class AgentStreamGRPCDataListener extends ClusterDataListener { + + public AgentStreamGRPCDataListener(String moduleName) { + super(moduleName); + } + + @Override public String path() { + return ClusterModuleDefine.BASE_CATALOG + "." + AgentStreamModuleGroupDefine.GROUP_NAME + "." + moduleName(); + } +} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java index b41b062d52..f9dcf28f4d 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java @@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentstream.grpc; import org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine; import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine; +import org.skywalking.apm.collector.core.cluster.ClusterDataListener; import org.skywalking.apm.collector.core.module.ModuleConfigParser; import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.skywalking.apm.collector.core.server.Server; @@ -20,10 +21,6 @@ public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine { return "grpc"; } - @Override public boolean defaultModule() { - return true; - } - @Override protected ModuleConfigParser configParser() { return new AgentStreamGRPCConfigParser(); } @@ -35,4 +32,8 @@ public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine { @Override protected ModuleRegistration registration() { return new AgentStreamGRPCModuleRegistration(); } + + @Override public ClusterDataListener listener() { + return new AgentStreamGRPCDataListener(name()); + } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyConfigParser.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyConfigParser.java index 11afc270b4..86eb65a2bb 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyConfigParser.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyConfigParser.java @@ -3,6 +3,7 @@ package org.skywalking.apm.collector.agentstream.jetty; import java.util.Map; import org.skywalking.apm.collector.core.config.ConfigParseException; import org.skywalking.apm.collector.core.module.ModuleConfigParser; +import org.skywalking.apm.collector.core.util.ObjectUtils; import org.skywalking.apm.collector.core.util.StringUtils; /** @@ -15,18 +16,17 @@ public class AgentStreamJettyConfigParser implements ModuleConfigParser { public static final String CONTEXT_PATH = "contextPath"; @Override public void parse(Map config) throws ConfigParseException { - AgentStreamJettyConfig.HOST = (String)config.get(HOST); AgentStreamJettyConfig.CONTEXT_PATH = "/"; - if (StringUtils.isEmpty(AgentStreamJettyConfig.HOST)) { - throw new ConfigParseException(""); + if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) { + AgentStreamJettyConfig.HOST = "localhost"; } - if (StringUtils.isEmpty(config.get(PORT))) { - throw new ConfigParseException(""); + if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) { + AgentStreamJettyConfig.PORT = 12800; } else { AgentStreamJettyConfig.PORT = (Integer)config.get(PORT); } - if (!StringUtils.isEmpty(config.get(CONTEXT_PATH))) { + if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CONTEXT_PATH))) { AgentStreamJettyConfig.CONTEXT_PATH = (String)config.get(CONTEXT_PATH); } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyDataListener.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyDataListener.java new file mode 100644 index 0000000000..8ee9e4a729 --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyDataListener.java @@ -0,0 +1,19 @@ +package org.skywalking.apm.collector.agentstream.jetty; + +import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine; +import org.skywalking.apm.collector.cluster.ClusterModuleDefine; +import org.skywalking.apm.collector.core.cluster.ClusterDataListener; + +/** + * @author pengys5 + */ +public class AgentStreamJettyDataListener extends ClusterDataListener { + + public AgentStreamJettyDataListener(String moduleName) { + super(moduleName); + } + + @Override public String path() { + return ClusterModuleDefine.BASE_CATALOG + "." + AgentStreamModuleGroupDefine.GROUP_NAME + "." + moduleName(); + } +} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleDefine.java index ac14352cbf..fff051e45f 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleDefine.java @@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentstream.jetty; import org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine; import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine; +import org.skywalking.apm.collector.core.cluster.ClusterDataListener; import org.skywalking.apm.collector.core.module.ModuleConfigParser; import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.skywalking.apm.collector.core.server.Server; @@ -20,10 +21,6 @@ public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine { return "jetty"; } - @Override public boolean defaultModule() { - return false; - } - @Override protected ModuleConfigParser configParser() { return new AgentStreamJettyConfigParser(); } @@ -35,4 +32,8 @@ public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine { @Override protected ModuleRegistration registration() { return new AgentStreamJettyModuleRegistration(); } + + @Override public ClusterDataListener listener() { + return new AgentStreamJettyDataListener(name()); + } } diff --git a/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClient.java b/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClient.java index 8191e1da7f..89dc9174ae 100644 --- a/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClient.java +++ b/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClient.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.util.List; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; @@ -22,15 +23,17 @@ public class ZookeeperClient implements Client { private final String hostPort; private final int sessionTimeout; + private final Watcher watcher; - public ZookeeperClient(String hostPort, int sessionTimeout) { + public ZookeeperClient(String hostPort, int sessionTimeout, Watcher watcher) { this.hostPort = hostPort; this.sessionTimeout = sessionTimeout; + this.watcher = watcher; } @Override public void initialize() throws ZookeeperClientException { try { - zk = new ZooKeeper(hostPort, sessionTimeout, new ZookeeperDataListener(this)); + zk = new ZooKeeper(hostPort, sessionTimeout, watcher); } catch (IOException e) { throw new ZookeeperClientException(e.getMessage(), e); } @@ -68,4 +71,12 @@ public class ZookeeperClient implements Client { throw new ZookeeperClientException(e.getMessage(), e); } } + + public List getChildren(final String path, boolean watch) throws ZookeeperClientException { + try { + return zk.getChildren(path, watch); + } catch (KeeperException | InterruptedException e) { + throw new ZookeeperClientException(e.getMessage(), e); + } + } } diff --git a/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperDataListener.java b/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperDataListener.java deleted file mode 100644 index cd5ff1e194..0000000000 --- a/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperDataListener.java +++ /dev/null @@ -1,59 +0,0 @@ -package org.skywalking.apm.collector.client.zookeeper; - -import java.util.LinkedList; -import java.util.List; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.client.ClientException; -import org.skywalking.apm.collector.core.client.DataListener; -import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer; -import org.skywalking.apm.collector.core.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class ZookeeperDataListener implements DataListener, Watcher { - - private final Logger logger = LoggerFactory.getLogger(ZookeeperDataListener.class); - - private ZookeeperClient client; - - public ZookeeperDataListener(Client client) { - this.client = (ZookeeperClient)client; - } - - @Override public void process(WatchedEvent event) { - logger.debug("path {}", event.getPath()); - if (StringUtils.isEmpty(event.getPath())) { - return; - } - - try { - String data = String.valueOf(client.getData(event.getPath(), false, null)); - logger.debug("data {}", data); - } catch (ClientException e) { - logger.error(e.getMessage(), e); - } - } - - @Override public void listen() throws ClientException { - for (String itemKey : items()) { - String[] catalogs = itemKey.split("\\."); - StringBuilder pathBuilder = new StringBuilder(); - for (String catalog : catalogs) { - pathBuilder.append("/").append(catalog); - } - client.exists(pathBuilder.toString(), true); - } - } - - @Override public List items() { - List items = new LinkedList<>(); - items.add(ClusterDataInitializer.FOR_AGENT_CATALOG); - items.add(ClusterDataInitializer.FOR_UI_CATALOG); - return items; - } -} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleDefine.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleDefine.java similarity index 61% rename from apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleDefine.java rename to apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleDefine.java index 1d788d8ec5..846c9664a2 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleDefine.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleDefine.java @@ -1,9 +1,14 @@ -package org.skywalking.apm.collector.core.cluster; +package org.skywalking.apm.collector.cluster; import java.util.Map; import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.ClientException; +import org.skywalking.apm.collector.core.client.DataMonitor; +import org.skywalking.apm.collector.core.cluster.ClusterModuleContext; +import org.skywalking.apm.collector.core.cluster.ClusterModuleException; +import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader; import org.skywalking.apm.collector.core.config.ConfigParseException; +import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.skywalking.apm.collector.core.server.Server; @@ -13,14 +18,20 @@ import org.skywalking.apm.collector.core.server.Server; */ public abstract class ClusterModuleDefine extends ModuleDefine { + public static final String BASE_CATALOG = "skywalking"; + private Client client; @Override public final void initialize(Map config) throws ClusterModuleException { try { configParser().parse(config); - client = createClient(); + + DataMonitor dataMonitor = dataMonitor(); + client = createClient(dataMonitor); client.initialize(); - dataInitializer().initialize(client); + dataMonitor.setClient(client); + + ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setDataMonitor(dataMonitor); } catch (ConfigParseException | ClientException e) { throw new ClusterModuleException(e.getMessage(), e); } @@ -38,7 +49,7 @@ public abstract class ClusterModuleDefine extends ModuleDefine { throw new UnsupportedOperationException("Cluster module do not need module registration."); } - public abstract ClusterModuleRegistrationWriter registrationWriter(); + public abstract DataMonitor dataMonitor(); public abstract ClusterModuleRegistrationReader registrationReader(); } diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleInstaller.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleInstaller.java index 666366244e..42a3c38a44 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleInstaller.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleInstaller.java @@ -4,7 +4,6 @@ import java.util.Iterator; import java.util.Map; import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.cluster.ClusterModuleContext; -import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine; import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleDefine; @@ -24,6 +23,9 @@ public class ClusterModuleInstaller implements ModuleInstaller { Map moduleDefineMap) throws DefineException, ClientException { logger.info("beginning cluster module install"); + ClusterModuleContext context = new ClusterModuleContext(ClusterModuleGroupDefine.GROUP_NAME); + CollectorContextHelper.INSTANCE.putContext(context); + ModuleDefine moduleDefine = null; if (CollectionUtils.isEmpty(moduleConfig)) { logger.info("could not configure cluster module, use the default"); @@ -41,10 +43,5 @@ public class ClusterModuleInstaller implements ModuleInstaller { moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey()); moduleDefine.initialize(clusterConfigEntry.getValue()); } - - ClusterModuleContext context = new ClusterModuleContext(ClusterModuleGroupDefine.GROUP_NAME); - context.setWriter(((ClusterModuleDefine)moduleDefine).registrationWriter()); - - CollectorContextHelper.INSTANCE.putContext(context); } } diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisDataInitializer.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisDataInitializer.java deleted file mode 100644 index 9ea5288677..0000000000 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisDataInitializer.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.skywalking.apm.collector.cluster.redis; - -import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.client.ClientException; -import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class ClusterRedisDataInitializer extends ClusterDataInitializer { - - private final Logger logger = LoggerFactory.getLogger(ClusterRedisDataInitializer.class); - - @Override public void addItem(Client client, String itemKey) throws ClientException { - logger.info("add the redis item key \"{}\" exist", itemKey); - } - - @Override public boolean existItem(Client client, String itemKey) throws ClientException { - logger.info("assess the redis item key \"{}\" exist", itemKey); - return false; - } -} diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleDefine.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleDefine.java index 625d894fa1..9f11286bd9 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleDefine.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleDefine.java @@ -1,12 +1,11 @@ package org.skywalking.apm.collector.cluster.redis; import org.skywalking.apm.collector.client.redis.RedisClient; +import org.skywalking.apm.collector.cluster.ClusterModuleDefine; import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine; +import org.skywalking.apm.collector.core.client.DataMonitor; import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader; -import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter; -import org.skywalking.apm.collector.core.framework.DataInitializer; import org.skywalking.apm.collector.core.module.ModuleConfigParser; /** @@ -32,19 +31,15 @@ public class ClusterRedisModuleDefine extends ClusterModuleDefine { return new ClusterRedisConfigParser(); } - @Override protected Client createClient() { - return new RedisClient(ClusterRedisConfig.HOST, ClusterRedisConfig.PORT); - } - - @Override protected DataInitializer dataInitializer() { - return new ClusterRedisDataInitializer(); + @Override public DataMonitor dataMonitor() { + return null; } - @Override public ClusterModuleRegistrationWriter registrationWriter() { - return new ClusterRedisModuleRegistrationWriter(getClient()); + @Override protected Client createClient(DataMonitor dataMonitor) { + return new RedisClient(ClusterRedisConfig.HOST, ClusterRedisConfig.PORT); } @Override public ClusterModuleRegistrationReader registrationReader() { - return null; + return new ClusterRedisModuleRegistrationReader(); } } diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationReader.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationReader.java new file mode 100644 index 0000000000..9871151dd2 --- /dev/null +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationReader.java @@ -0,0 +1,13 @@ +package org.skywalking.apm.collector.cluster.redis; + +import java.util.List; +import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader; + +/** + * @author pengys5 + */ +public class ClusterRedisModuleRegistrationReader implements ClusterModuleRegistrationReader { + @Override public List read(String key) { + return null; + } +} diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationWriter.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationWriter.java deleted file mode 100644 index f94a614fa7..0000000000 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationWriter.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.skywalking.apm.collector.cluster.redis; - -import org.skywalking.apm.collector.client.redis.RedisClient; -import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter; -import org.skywalking.apm.collector.core.module.ModuleRegistration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class ClusterRedisModuleRegistrationWriter extends ClusterModuleRegistrationWriter { - - private final Logger logger = LoggerFactory.getLogger(ClusterRedisModuleRegistrationWriter.class); - - public ClusterRedisModuleRegistrationWriter(Client client) { - super(client); - } - - @Override public void write(String key, ModuleRegistration.Value value) { - logger.debug("key {}, value {}", key, value.getHost()); - key = key + "." + value.getHost() + ":" + value.getPort(); - value.getData().addProperty("host", value.getHost()); - value.getData().addProperty("port", value.getPort()); - ((RedisClient)client).setex(key, 120, value.getData().toString()); - } -} diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneDataInitializer.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneDataInitializer.java deleted file mode 100644 index 6221ec2bbf..0000000000 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneDataInitializer.java +++ /dev/null @@ -1,32 +0,0 @@ -package org.skywalking.apm.collector.cluster.standalone; - -import org.skywalking.apm.collector.client.h2.H2Client; -import org.skywalking.apm.collector.client.h2.H2ClientException; -import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.client.ClientException; -import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class ClusterStandaloneDataInitializer extends ClusterDataInitializer { - - private final Logger logger = LoggerFactory.getLogger(ClusterStandaloneDataInitializer.class); - - @Override public void addItem(Client client, String itemKey) throws ClientException { - logger.info("add the h2 item key \"{}\" exist", itemKey); - itemKey = itemKey.replaceAll("\\.", "_"); - String sql = "CREATE TABLE " + itemKey + "(ADDRESS VARCHAR(100) PRIMARY KEY,DATA VARCHAR(255));"; - try { - ((H2Client)client).execute(sql); - } catch (H2ClientException e) { - logger.error(e.getMessage(), e); - } - } - - @Override public boolean existItem(Client client, String itemKey) throws ClientException { - return false; - } -} diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java index 583439d7c5..4375bec3a3 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java @@ -1,12 +1,11 @@ package org.skywalking.apm.collector.cluster.standalone; import org.skywalking.apm.collector.client.h2.H2Client; +import org.skywalking.apm.collector.cluster.ClusterModuleDefine; import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine; +import org.skywalking.apm.collector.core.client.DataMonitor; import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader; -import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter; -import org.skywalking.apm.collector.core.framework.DataInitializer; import org.skywalking.apm.collector.core.module.ModuleConfigParser; /** @@ -32,19 +31,15 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine { return new ClusterStandaloneConfigParser(); } - @Override public Client createClient() { - return new H2Client(); - } - - @Override protected DataInitializer dataInitializer() { - return new ClusterStandaloneDataInitializer(); + @Override public DataMonitor dataMonitor() { + return null; } - @Override public ClusterModuleRegistrationWriter registrationWriter() { - return new ClusterStandaloneModuleRegistrationWriter(getClient()); + @Override protected Client createClient(DataMonitor dataMonitor) { + return new H2Client(); } @Override public ClusterModuleRegistrationReader registrationReader() { - return null; + return new ClusterStandaloneModuleRegistrationReader(); } } diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationReader.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationReader.java new file mode 100644 index 0000000000..2049060ba4 --- /dev/null +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationReader.java @@ -0,0 +1,14 @@ +package org.skywalking.apm.collector.cluster.standalone; + +import java.util.List; +import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader; + +/** + * @author pengys5 + */ +public class ClusterStandaloneModuleRegistrationReader implements ClusterModuleRegistrationReader { + + @Override public List read(String key) { + return null; + } +} diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationWriter.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationWriter.java deleted file mode 100644 index de811f165f..0000000000 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationWriter.java +++ /dev/null @@ -1,34 +0,0 @@ -package org.skywalking.apm.collector.cluster.standalone; - -import org.skywalking.apm.collector.client.h2.H2Client; -import org.skywalking.apm.collector.client.h2.H2ClientException; -import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter; -import org.skywalking.apm.collector.core.module.ModuleRegistration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class ClusterStandaloneModuleRegistrationWriter extends ClusterModuleRegistrationWriter { - - private final Logger logger = LoggerFactory.getLogger(ClusterStandaloneModuleRegistrationWriter.class); - - public ClusterStandaloneModuleRegistrationWriter(Client client) { - super(client); - } - - @Override public void write(String key, ModuleRegistration.Value value) { - key = key.replaceAll("\\.", "_"); - String hostPort = value.getHost() + ":" + value.getPort(); - String sql = "INSERT INTO " + key + " VALUES('" + hostPort + "', '" + value.getData().toString() + "');"; - String sql2 = "SELECT * FROM " + key; - try { - ((H2Client)client).execute(sql); - ((H2Client)client).executeQuery(sql2); - } catch (H2ClientException e) { - logger.error(e.getMessage(), e); - } - } -} diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKConfigParser.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKConfigParser.java index 9d0d62871a..2555b61cb1 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKConfigParser.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKConfigParser.java @@ -15,7 +15,7 @@ public class ClusterZKConfigParser implements ModuleConfigParser { @Override public void parse(Map config) throws ConfigParseException { ClusterZKConfig.HOST_PORT = (String)config.get(HOST_PORT); - ClusterZKConfig.SESSION_TIMEOUT = 1000; + ClusterZKConfig.SESSION_TIMEOUT = 3000; if (StringUtils.isEmpty(ClusterZKConfig.HOST_PORT)) { throw new ConfigParseException(""); diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataInitializer.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataInitializer.java deleted file mode 100644 index 4f741245fa..0000000000 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataInitializer.java +++ /dev/null @@ -1,49 +0,0 @@ -package org.skywalking.apm.collector.cluster.zookeeper; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.ZooDefs; -import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient; -import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.client.ClientException; -import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class ClusterZKDataInitializer extends ClusterDataInitializer { - - private final Logger logger = LoggerFactory.getLogger(ClusterZKDataInitializer.class); - - @Override public void addItem(Client client, String itemKey) throws ClientException { - logger.info("add the zookeeper item key \"{}\" exist", itemKey); - ZookeeperClient zkClient = (ZookeeperClient)client; - - String[] catalogs = itemKey.split("\\."); - StringBuilder pathBuilder = new StringBuilder(); - for (String catalog : catalogs) { - pathBuilder.append("/").append(catalog); - if (zkClient.exists(pathBuilder.toString(), false) == null) { - zkClient.create(pathBuilder.toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - } - } - - @Override public boolean existItem(Client client, String itemKey) throws ClientException { - logger.info("assess the zookeeper item key \"{}\" exist", itemKey); - ZookeeperClient zkClient = (ZookeeperClient)client; - - String[] catalogs = itemKey.split("\\."); - StringBuilder pathBuilder = new StringBuilder(); - for (String catalog : catalogs) { - pathBuilder.append("/").append(catalog); - } - -// if (zkClient.exists(pathBuilder.toString(), false) == null) { -// return false; -// } else { - return true; -// } - } -} diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java new file mode 100644 index 0000000000..3f3bb2362e --- /dev/null +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java @@ -0,0 +1,95 @@ +package org.skywalking.apm.collector.cluster.zookeeper; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient; +import org.skywalking.apm.collector.client.zookeeper.ZookeeperClientException; +import org.skywalking.apm.collector.client.zookeeper.util.PathUtils; +import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.client.ClientException; +import org.skywalking.apm.collector.core.client.DataMonitor; +import org.skywalking.apm.collector.core.cluster.ClusterDataListener; +import org.skywalking.apm.collector.core.module.ModuleRegistration; +import org.skywalking.apm.collector.core.util.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class ClusterZKDataMonitor implements DataMonitor, Watcher { + + private final Logger logger = LoggerFactory.getLogger(ClusterZKDataMonitor.class); + + private ZookeeperClient client; + + private Map listeners; + + public ClusterZKDataMonitor() { + listeners = new LinkedHashMap<>(); + } + + @Override public void process(WatchedEvent event) { + logger.debug("changed path {}", event.getPath()); + if (listeners.containsKey(event.getPath())) { + putDataIntoListener(listeners.get(event.getPath()), event.getPath()); + } + } + + @Override public void setClient(Client client) { + this.client = (ZookeeperClient)client; + } + + @Override + public void addListener(ClusterDataListener listener, ModuleRegistration registration) throws ClientException { + String path = PathUtils.convertKey2Path(listener.path()); + logger.info("listener path: {}", path); + listeners.put(path, listener); + createPath(path); + List paths = client.getChildren(path, true); + + if (CollectionUtils.isNotEmpty(paths)) { + paths.forEach(subPath -> { + putDataIntoListener(listener, subPath); + }); + } + + ModuleRegistration.Value value = registration.buildValue(); + setData(path + "/" + value.getHostPort(), value.getData() == null ? "" : value.getData().toString()); + } + + @Override public void createPath(String path) throws ClientException { + String[] paths = path.replaceFirst("/", "").split("/"); + + StringBuilder pathBuilder = new StringBuilder(); + for (String subPath : paths) { + pathBuilder.append("/").append(subPath); + if (client.exists(pathBuilder.toString(), false) == null) { + client.create(pathBuilder.toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } + } + + @Override public void setData(String path, String value) throws ClientException { + if (client.exists(path, false) == null) { + client.create(path, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } else { + client.setData(path, value.getBytes(), -1); + } + } + + private void putDataIntoListener(ClusterDataListener listener, String path) { + try { + byte[] data = client.getData(path, false, null); + String dataStr = String.valueOf(data); + listener.setData(new ClusterDataListener.Data(path, dataStr)); + } catch (ZookeeperClientException e) { + logger.error(e.getMessage(), e); + } + } +} diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefine.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefine.java index f9d570601a..6a51939e4c 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefine.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefine.java @@ -1,12 +1,12 @@ package org.skywalking.apm.collector.cluster.zookeeper; +import org.apache.zookeeper.Watcher; import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient; +import org.skywalking.apm.collector.cluster.ClusterModuleDefine; import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer; -import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine; +import org.skywalking.apm.collector.core.client.DataMonitor; import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader; -import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter; import org.skywalking.apm.collector.core.module.ModuleConfigParser; /** @@ -32,19 +32,15 @@ public class ClusterZKModuleDefine extends ClusterModuleDefine { return new ClusterZKConfigParser(); } - @Override protected Client createClient() { - return new ZookeeperClient(ClusterZKConfig.HOST_PORT, ClusterZKConfig.SESSION_TIMEOUT); + @Override public DataMonitor dataMonitor() { + return new ClusterZKDataMonitor(); } - @Override protected ClusterDataInitializer dataInitializer() { - return new ClusterZKDataInitializer(); - } - - @Override public ClusterModuleRegistrationWriter registrationWriter() { - return new ClusterZKModuleRegistrationWriter(getClient()); + @Override protected Client createClient(DataMonitor dataMonitor) { + return new ZookeeperClient(ClusterZKConfig.HOST_PORT, ClusterZKConfig.SESSION_TIMEOUT, (Watcher)dataMonitor); } @Override public ClusterModuleRegistrationReader registrationReader() { - return null; + return new ClusterZKModuleRegistrationReader(); } } diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationWriter.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationWriter.java deleted file mode 100644 index 6223795601..0000000000 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationWriter.java +++ /dev/null @@ -1,37 +0,0 @@ -package org.skywalking.apm.collector.cluster.zookeeper; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.Stat; -import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient; -import org.skywalking.apm.collector.client.zookeeper.util.PathUtils; -import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.client.ClientException; -import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter; -import org.skywalking.apm.collector.core.module.ModuleRegistration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class ClusterZKModuleRegistrationWriter extends ClusterModuleRegistrationWriter { - - private final Logger logger = LoggerFactory.getLogger(ClusterZKModuleRegistrationWriter.class); - - public ClusterZKModuleRegistrationWriter(Client client) { - super(client); - } - - @Override public void write(String key, ModuleRegistration.Value value) throws ClientException { - logger.info("cluster zookeeper register key: {}, value: {}", key, value); - String workerUIPath = PathUtils.convertKey2Path(key) + "/" + value.getHost() + ":" + value.getPort(); - - Stat stat = ((ZookeeperClient)client).exists(workerUIPath, false); - if (stat == null) { - ((ZookeeperClient)client).create(workerUIPath, value.getData() == null ? null : value.getData().toString().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - } else { - ((ZookeeperClient)client).setData(workerUIPath, value.getData() == null ? null : value.getData().toString().getBytes(), -1); - } - } -} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/DataListener.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/DataListener.java deleted file mode 100644 index 1022d39aa8..0000000000 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/DataListener.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.skywalking.apm.collector.core.client; - -import java.util.List; - -/** - * @author pengys5 - */ -public interface DataListener { - List items(); - - void listen() throws ClientException; -} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/DataMonitor.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/DataMonitor.java new file mode 100644 index 0000000000..883db37694 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/DataMonitor.java @@ -0,0 +1,17 @@ +package org.skywalking.apm.collector.core.client; + +import org.skywalking.apm.collector.core.cluster.ClusterDataListener; +import org.skywalking.apm.collector.core.module.ModuleRegistration; + +/** + * @author pengys5 + */ +public interface DataMonitor { + void setClient(Client client); + + void addListener(ClusterDataListener listener, ModuleRegistration registration) throws ClientException; + + void createPath(String path) throws ClientException; + + void setData(String path, String value) throws ClientException; +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataInitializer.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataInitializer.java deleted file mode 100644 index 62ff30d7ea..0000000000 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataInitializer.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.skywalking.apm.collector.core.cluster; - -import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.client.ClientException; -import org.skywalking.apm.collector.core.framework.DataInitializer; - -/** - * @author pengys5 - */ -public abstract class ClusterDataInitializer implements DataInitializer { - - public static final String BASE_CATALOG = "skywalking"; - public static final String FOR_UI_CATALOG = BASE_CATALOG + ".ui"; - public static final String FOR_AGENT_CATALOG = BASE_CATALOG + ".agent"; - - @Override public final void initialize(Client client) throws ClientException { - if (!existItem(client, FOR_UI_CATALOG)) { - addItem(client, FOR_UI_CATALOG); - } - if (!existItem(client, FOR_AGENT_CATALOG)) { - addItem(client, FOR_AGENT_CATALOG); - } - } -} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListener.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListener.java new file mode 100644 index 0000000000..03b2177b05 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListener.java @@ -0,0 +1,39 @@ +package org.skywalking.apm.collector.core.cluster; + +import java.util.LinkedList; +import java.util.List; +import org.skywalking.apm.collector.core.framework.Listener; + +/** + * @author pengys5 + */ +public abstract class ClusterDataListener implements Listener { + + private final String moduleName; + private List datas; + + public ClusterDataListener(String moduleName) { + this.moduleName = moduleName; + datas = new LinkedList<>(); + } + + public final String moduleName() { + return moduleName; + } + + public abstract String path(); + + public final void setData(Data data) { + datas.add(data); + } + + public static class Data { + private final String key; + private final String value; + + public Data(String key, String value) { + this.key = key; + this.value = value; + } + } +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListenerDefine.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListenerDefine.java new file mode 100644 index 0000000000..9f9600579e --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListenerDefine.java @@ -0,0 +1,8 @@ +package org.skywalking.apm.collector.core.cluster; + +/** + * @author pengys5 + */ +public interface ClusterDataListenerDefine { + ClusterDataListener listener(); +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleContext.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleContext.java index a104e24ea4..17ab2971ed 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleContext.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleContext.java @@ -1,5 +1,6 @@ package org.skywalking.apm.collector.core.cluster; +import org.skywalking.apm.collector.core.client.DataMonitor; import org.skywalking.apm.collector.core.framework.Context; /** @@ -11,17 +12,9 @@ public class ClusterModuleContext extends Context { super(groupName); } - private ClusterModuleRegistrationWriter writer; - private ClusterModuleRegistrationReader reader; - public ClusterModuleRegistrationWriter getWriter() { - return writer; - } - - public void setWriter(ClusterModuleRegistrationWriter writer) { - this.writer = writer; - } + private DataMonitor dataMonitor; public ClusterModuleRegistrationReader getReader() { return reader; @@ -30,4 +23,12 @@ public class ClusterModuleContext extends Context { public void setReader(ClusterModuleRegistrationReader reader) { this.reader = reader; } + + public DataMonitor getDataMonitor() { + return dataMonitor; + } + + public void setDataMonitor(DataMonitor dataMonitor) { + this.dataMonitor = dataMonitor; + } } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationWriter.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationWriter.java deleted file mode 100644 index f60eca9df5..0000000000 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationWriter.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.skywalking.apm.collector.core.cluster; - -import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.client.ClientException; -import org.skywalking.apm.collector.core.module.ModuleRegistration; - -/** - * @author pengys5 - */ -public abstract class ClusterModuleRegistrationWriter { - - protected final Client client; - - public ClusterModuleRegistrationWriter(Client client) { - this.client = client; - } - - public abstract void write(String key, ModuleRegistration.Value value) throws ClientException; -} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Listener.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Listener.java new file mode 100644 index 0000000000..d486408eb8 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Listener.java @@ -0,0 +1,7 @@ +package org.skywalking.apm.collector.core.framework; + +/** + * @author pengys5 + */ +public interface Listener { +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefine.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefine.java index eeb3f56e38..49cc7704a1 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefine.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefine.java @@ -1,7 +1,7 @@ package org.skywalking.apm.collector.core.module; import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.framework.DataInitializer; +import org.skywalking.apm.collector.core.client.DataMonitor; import org.skywalking.apm.collector.core.framework.Define; import org.skywalking.apm.collector.core.server.Server; @@ -16,11 +16,9 @@ public abstract class ModuleDefine implements Define { protected abstract ModuleConfigParser configParser(); - protected abstract Client createClient(); + protected abstract Client createClient(DataMonitor dataMonitor); protected abstract Server server(); - protected abstract DataInitializer dataInitializer(); - protected abstract ModuleRegistration registration(); } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleRegistration.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleRegistration.java index 7242cbf2dd..959fccddad 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleRegistration.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleRegistration.java @@ -28,6 +28,10 @@ public abstract class ModuleRegistration { return port; } + public String getHostPort() { + return host + ":" + port; + } + public JsonObject getData() { return data; } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/CollectionUtils.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/CollectionUtils.java index dd66a5ecb9..22141d6a2b 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/CollectionUtils.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/CollectionUtils.java @@ -1,5 +1,6 @@ package org.skywalking.apm.collector.core.util; +import java.util.List; import java.util.Map; /** @@ -10,4 +11,12 @@ public class CollectionUtils { public static boolean isEmpty(Map map) { return map == null || map.size() == 0; } + + public static boolean isEmpty(List list) { + return list == null || list.size() == 0; + } + + public static boolean isNotEmpty(List list) { + return !isEmpty(list); + } } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/ObjectUtils.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/ObjectUtils.java index df264ab32c..9300a103e3 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/ObjectUtils.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/ObjectUtils.java @@ -7,4 +7,8 @@ public class ObjectUtils { public static boolean isEmpty(Object obj) { return obj == null; } + + public static boolean isNotEmpty(Object obj) { + return !isEmpty(obj); + } } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/StringUtils.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/StringUtils.java index 83413c1780..58d260d7e3 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/StringUtils.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/StringUtils.java @@ -10,4 +10,8 @@ public class StringUtils { public static boolean isEmpty(Object str) { return str == null || EMPTY_STRING.equals(str); } + + public static boolean isNotEmpty(Object str) { + return !isEmpty(str); + } } diff --git a/apm-collector/apm-collector-core/src/main/resources/application-default.yml b/apm-collector/apm-collector-core/src/main/resources/application-default.yml index 7406f217ba..1e4e62547d 100644 --- a/apm-collector/apm-collector-core/src/main/resources/application-default.yml +++ b/apm-collector/apm-collector-core/src/main/resources/application-default.yml @@ -1,7 +1,7 @@ cluster: -# zookeeper: -# hostPort: localhost:2181 -# sessionTimeout: 1000 + zookeeper: + hostPort: localhost:2181 + sessionTimeout: 100000 # redis: # host: localhost # port: 6379 diff --git a/apm-collector/apm-collector-core/src/main/resources/logback.xml b/apm-collector/apm-collector-core/src/main/resources/logback.xml index b0df07af44..6871826203 100644 --- a/apm-collector/apm-collector-core/src/main/resources/logback.xml +++ b/apm-collector/apm-collector-core/src/main/resources/logback.xml @@ -6,7 +6,7 @@ - + diff --git a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleDefine.java b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleDefine.java index 087792f22e..88bbae0c7a 100644 --- a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleDefine.java +++ b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleDefine.java @@ -1,7 +1,7 @@ package org.skywalking.apm.collector.queue; import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.framework.DataInitializer; +import org.skywalking.apm.collector.core.client.DataMonitor; import org.skywalking.apm.collector.core.module.ModuleConfigParser; import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleRegistration; @@ -15,11 +15,7 @@ public abstract class QueueModuleDefine extends ModuleDefine { throw new UnsupportedOperationException(""); } - @Override protected final Client createClient() { - throw new UnsupportedOperationException(""); - } - - @Override protected final DataInitializer dataInitializer() { + @Override protected Client createClient(DataMonitor dataMonitor) { throw new UnsupportedOperationException(""); } diff --git a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleInstaller.java b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleInstaller.java index 6b4353efd4..7403060ce7 100644 --- a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleInstaller.java +++ b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleInstaller.java @@ -17,7 +17,7 @@ public class QueueModuleInstaller implements ModuleInstaller { @Override public void install(Map moduleConfig, Map moduleDefineMap) throws DefineException, ClientException { - logger.info("beginning cluster module install"); + logger.info("beginning queue module install"); } } diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCModuleDefine.java b/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCModuleDefine.java index 8711e6e070..f00bbace63 100644 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCModuleDefine.java +++ b/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCModuleDefine.java @@ -3,7 +3,7 @@ package org.skywalking.apm.collector.remote.grpc; import java.util.Map; import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.ClientException; -import org.skywalking.apm.collector.core.framework.DataInitializer; +import org.skywalking.apm.collector.core.client.DataMonitor; import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleConfigParser; import org.skywalking.apm.collector.core.module.ModuleRegistration; @@ -27,7 +27,7 @@ public class RemoteGRPCModuleDefine extends RemoteModuleDefine { return null; } - @Override protected Client createClient() { + @Override protected Client createClient(DataMonitor dataMonitor) { return null; } @@ -35,10 +35,6 @@ public class RemoteGRPCModuleDefine extends RemoteModuleDefine { return null; } - @Override protected DataInitializer dataInitializer() { - return null; - } - @Override protected ModuleRegistration registration() { return null; } diff --git a/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServer.java b/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServer.java index 309901adee..7a92a2eb98 100644 --- a/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServer.java +++ b/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServer.java @@ -28,16 +28,9 @@ public class GRPCServer implements Server { NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address); try { io.grpc.Server server = nettyServerBuilder.build().start(); - blockUntilShutdown(server); - } catch (InterruptedException | IOException e) { + } catch (IOException e) { throw new GRPCServerException(e.getMessage(), e); } logger.info("Server started, host {} listening on {}", host, port); } - - private void blockUntilShutdown(io.grpc.Server server) throws InterruptedException { - if (server != null) { - server.awaitTermination(); - } - } } diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalAsyncWorkerProvider.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalAsyncWorkerProvider.java index adb9e6aa83..21fe8b1001 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalAsyncWorkerProvider.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalAsyncWorkerProvider.java @@ -3,13 +3,14 @@ package org.skywalking.apm.collector.stream; import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.queue.QueueCreator; import org.skywalking.apm.collector.core.queue.QueueEventHandler; +import org.skywalking.apm.collector.core.queue.QueueExecutor; import org.skywalking.apm.collector.queue.QueueModuleContext; import org.skywalking.apm.collector.queue.QueueModuleGroupDefine; /** * @author pengys5 */ -public abstract class AbstractLocalAsyncWorkerProvider extends AbstractLocalWorkerProvider { +public abstract class AbstractLocalAsyncWorkerProvider extends AbstractLocalWorkerProvider { public abstract int queueSize(); -- GitLab