From c955f7889b22de4a4816d961ae423c425c72d355 Mon Sep 17 00:00:00 2001 From: pengys5 <8082209@qq.com> Date: Wed, 19 Jul 2017 20:50:27 +0800 Subject: [PATCH] Add agent server module to get the collector stream server address /agentstream/grpc /agentstream/jetty --- .../apm-collector-agentserver/pom.xml | 32 +++++++++++ .../agentserver/AgentServerModuleContext.java | 13 +++++ .../agentserver/AgentServerModuleDefine.java | 47 +++++++++++++++ .../AgentServerModuleException.java | 17 ++++++ .../AgentServerModuleGroupDefine.java | 25 ++++++++ .../AgentServerModuleInstaller.java | 36 ++++++++++++ .../jetty/AgentServerJettyConfig.java | 10 ++++ .../jetty/AgentServerJettyConfigParser.java | 33 +++++++++++ .../jetty/AgentServerJettyDataListener.java | 15 +++++ .../jetty/AgentServerJettyModuleDefine.java | 57 +++++++++++++++++++ .../AgentServerJettyModuleRegistration.java | 13 +++++ .../handler/AgentStreamGRPCServerHandler.java | 36 ++++++++++++ .../AgentStreamJettyServerHandler.java | 36 ++++++++++++ .../resources/META-INF/defines/group.define | 1 + .../resources/META-INF/defines/module.define | 1 + .../grpc/AgentStreamGRPCDataListener.java | 6 +- .../grpc/AgentStreamGRPCModuleDefine.java | 6 +- .../jetty/AgentStreamJettyDataListener.java | 6 +- .../jetty/AgentStreamJettyModuleDefine.java | 6 +- .../AgentStreamJettyModuleRegistration.java | 5 +- apm-collector/apm-collector-boot/pom.xml | 5 ++ .../cluster/ClusterModuleDefine.java | 5 +- .../redis/ClusterRedisModuleDefine.java | 4 +- .../ClusterRedisModuleRegistrationReader.java | 9 +-- .../ClusterStandaloneModuleDefine.java | 4 +- ...terStandaloneModuleRegistrationReader.java | 8 +-- .../zookeeper/ClusterZKDataMonitor.java | 46 ++++++++------- .../zookeeper/ClusterZKModuleDefine.java | 4 +- .../ClusterZKModuleRegistrationReader.java | 9 +-- .../collector/core/client/DataMonitor.java | 2 + .../core/cluster/ClusterDataListener.java | 28 ++++----- .../ClusterModuleRegistrationReader.java | 14 ++++- .../apm/collector/core/framework/Handler.java | 7 +++ .../core/module/ModuleRegistration.java | 12 ++-- .../apm/collector/core/server/Server.java | 6 ++ .../apm/collector/server/grpc/GRPCServer.java | 9 +++ .../collector/server/jetty/JettyHandler.java | 27 +++++++++ .../collector/server/jetty/JettyServer.java | 18 +++++- apm-collector/pom.xml | 1 + 39 files changed, 536 insertions(+), 83 deletions(-) create mode 100644 apm-collector/apm-collector-agentserver/pom.xml create mode 100644 apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleContext.java create mode 100644 apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleDefine.java create mode 100644 apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleException.java create mode 100644 apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleGroupDefine.java create mode 100644 apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleInstaller.java create mode 100644 apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyConfig.java create mode 100644 apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyConfigParser.java create mode 100644 apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyDataListener.java create mode 100644 apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyModuleDefine.java create mode 100644 apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyModuleRegistration.java create mode 100644 apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamGRPCServerHandler.java create mode 100644 apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamJettyServerHandler.java create mode 100644 apm-collector/apm-collector-agentserver/src/main/resources/META-INF/defines/group.define create mode 100644 apm-collector/apm-collector-agentserver/src/main/resources/META-INF/defines/module.define create mode 100644 apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Handler.java create mode 100644 apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyHandler.java diff --git a/apm-collector/apm-collector-agentserver/pom.xml b/apm-collector/apm-collector-agentserver/pom.xml new file mode 100644 index 0000000000..b4f26c8bf0 --- /dev/null +++ b/apm-collector/apm-collector-agentserver/pom.xml @@ -0,0 +1,32 @@ + + + + apm-collector + org.skywalking + 3.2-2017 + + 4.0.0 + + apm-collector-agentserver + jar + + + + org.skywalking + apm-collector-cluster + ${project.version} + + + org.skywalking + apm-collector-server + ${project.version} + + + org.skywalking + apm-collector-agentstream + ${project.version} + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleContext.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleContext.java new file mode 100644 index 0000000000..f6db9c4ab8 --- /dev/null +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleContext.java @@ -0,0 +1,13 @@ +package org.skywalking.apm.collector.agentserver; + +import org.skywalking.apm.collector.core.framework.Context; + +/** + * @author pengys5 + */ +public class AgentServerModuleContext extends Context { + + public AgentServerModuleContext(String groupName) { + super(groupName); + } +} diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleDefine.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleDefine.java new file mode 100644 index 0000000000..7e1adb82a8 --- /dev/null +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleDefine.java @@ -0,0 +1,47 @@ +package org.skywalking.apm.collector.agentserver; + +import java.util.List; +import java.util.Map; +import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; +import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.client.ClientException; +import org.skywalking.apm.collector.core.client.DataMonitor; +import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine; +import org.skywalking.apm.collector.core.cluster.ClusterModuleContext; +import org.skywalking.apm.collector.core.config.ConfigParseException; +import org.skywalking.apm.collector.core.framework.CollectorContextHelper; +import org.skywalking.apm.collector.core.framework.DefineException; +import org.skywalking.apm.collector.core.framework.Handler; +import org.skywalking.apm.collector.core.module.ModuleDefine; +import org.skywalking.apm.collector.core.server.Server; +import org.skywalking.apm.collector.core.server.ServerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public abstract class AgentServerModuleDefine extends ModuleDefine implements ClusterDataListenerDefine { + + private final Logger logger = LoggerFactory.getLogger(AgentServerModuleDefine.class); + + @Override public final void initialize(Map config) throws DefineException, ClientException { + try { + configParser().parse(config); + Server server = server(); + server.initialize(); + handlerList().forEach(handler -> server.addHandler(handler)); + server.start(); + + ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration()); + } catch (ConfigParseException | ServerException e) { + throw new AgentServerModuleException(e.getMessage(), e); + } + } + + @Override protected final Client createClient(DataMonitor dataMonitor) { + throw new UnsupportedOperationException(""); + } + + public abstract List handlerList(); +} diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleException.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleException.java new file mode 100644 index 0000000000..c9f7d2a975 --- /dev/null +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleException.java @@ -0,0 +1,17 @@ +package org.skywalking.apm.collector.agentserver; + +import org.skywalking.apm.collector.core.module.ModuleException; + +/** + * @author pengys5 + */ +public class AgentServerModuleException extends ModuleException { + + public AgentServerModuleException(String message) { + super(message); + } + + public AgentServerModuleException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleGroupDefine.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleGroupDefine.java new file mode 100644 index 0000000000..9539be419d --- /dev/null +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleGroupDefine.java @@ -0,0 +1,25 @@ +package org.skywalking.apm.collector.agentserver; + +import org.skywalking.apm.collector.core.framework.Context; +import org.skywalking.apm.collector.core.module.ModuleGroupDefine; +import org.skywalking.apm.collector.core.module.ModuleInstaller; + +/** + * @author pengys5 + */ +public class AgentServerModuleGroupDefine implements ModuleGroupDefine { + + public static final String GROUP_NAME = "agent_server"; + + @Override public String name() { + return GROUP_NAME; + } + + @Override public Context groupContext() { + return new AgentServerModuleContext(GROUP_NAME); + } + + @Override public ModuleInstaller moduleInstaller() { + return new AgentServerModuleInstaller(); + } +} diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleInstaller.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleInstaller.java new file mode 100644 index 0000000000..10756f9508 --- /dev/null +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleInstaller.java @@ -0,0 +1,36 @@ +package org.skywalking.apm.collector.agentserver; + +import java.util.Iterator; +import java.util.Map; +import org.skywalking.apm.collector.core.client.ClientException; +import org.skywalking.apm.collector.core.framework.CollectorContextHelper; +import org.skywalking.apm.collector.core.framework.DefineException; +import org.skywalking.apm.collector.core.module.ModuleDefine; +import org.skywalking.apm.collector.core.module.ModuleInstaller; +import org.skywalking.apm.collector.core.util.ObjectUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class AgentServerModuleInstaller implements ModuleInstaller { + + private final Logger logger = LoggerFactory.getLogger(AgentServerModuleInstaller.class); + + @Override public void install(Map moduleConfig, + Map moduleDefineMap) throws DefineException, ClientException { + logger.info("beginning agent server module install"); + + AgentServerModuleContext context = new AgentServerModuleContext(AgentServerModuleGroupDefine.GROUP_NAME); + CollectorContextHelper.INSTANCE.putContext(context); + + logger.info("could not configure agent server module, use the default"); + Iterator> moduleDefineEntry = moduleDefineMap.entrySet().iterator(); + while (moduleDefineEntry.hasNext()) { + ModuleDefine moduleDefine = moduleDefineEntry.next().getValue(); + logger.info("module {} initialize", moduleDefine.getClass().getName()); + moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null); + } + } +} diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyConfig.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyConfig.java new file mode 100644 index 0000000000..1bad21878d --- /dev/null +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyConfig.java @@ -0,0 +1,10 @@ +package org.skywalking.apm.collector.agentserver.jetty; + +/** + * @author pengys5 + */ +public class AgentServerJettyConfig { + public static String HOST; + public static int PORT; + public static String CONTEXT_PATH; +} diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyConfigParser.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyConfigParser.java new file mode 100644 index 0000000000..bac76bcfc2 --- /dev/null +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyConfigParser.java @@ -0,0 +1,33 @@ +package org.skywalking.apm.collector.agentserver.jetty; + +import java.util.Map; +import org.skywalking.apm.collector.core.config.ConfigParseException; +import org.skywalking.apm.collector.core.module.ModuleConfigParser; +import org.skywalking.apm.collector.core.util.ObjectUtils; +import org.skywalking.apm.collector.core.util.StringUtils; + +/** + * @author pengys5 + */ +public class AgentServerJettyConfigParser implements ModuleConfigParser { + + private static final String HOST = "host"; + private static final String PORT = "port"; + public static final String CONTEXT_PATH = "contextPath"; + + @Override public void parse(Map config) throws ConfigParseException { + AgentServerJettyConfig.CONTEXT_PATH = "/"; + + if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) { + AgentServerJettyConfig.HOST = "localhost"; + } + if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) { + AgentServerJettyConfig.PORT = 10800; + } else { + AgentServerJettyConfig.PORT = (Integer)config.get(PORT); + } + if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CONTEXT_PATH))) { + AgentServerJettyConfig.CONTEXT_PATH = (String)config.get(CONTEXT_PATH); + } + } +} diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyDataListener.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyDataListener.java new file mode 100644 index 0000000000..9035ca2976 --- /dev/null +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyDataListener.java @@ -0,0 +1,15 @@ +package org.skywalking.apm.collector.agentserver.jetty; + +import org.skywalking.apm.collector.agentserver.AgentServerModuleGroupDefine; +import org.skywalking.apm.collector.cluster.ClusterModuleDefine; +import org.skywalking.apm.collector.core.cluster.ClusterDataListener; + +/** + * @author pengys5 + */ +public class AgentServerJettyDataListener extends ClusterDataListener { + + @Override public String path() { + return ClusterModuleDefine.BASE_CATALOG + "." + AgentServerModuleGroupDefine.GROUP_NAME + "." + AgentServerJettyModuleDefine.MODULE_NAME; + } +} diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyModuleDefine.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyModuleDefine.java new file mode 100644 index 0000000000..43314323c3 --- /dev/null +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyModuleDefine.java @@ -0,0 +1,57 @@ +package org.skywalking.apm.collector.agentserver.jetty; + +import java.util.LinkedList; +import java.util.List; +import org.skywalking.apm.collector.agentserver.AgentServerModuleDefine; +import org.skywalking.apm.collector.agentserver.AgentServerModuleGroupDefine; +import org.skywalking.apm.collector.agentserver.jetty.handler.AgentStreamGRPCServerHandler; +import org.skywalking.apm.collector.agentserver.jetty.handler.AgentStreamJettyServerHandler; +import org.skywalking.apm.collector.core.cluster.ClusterDataListener; +import org.skywalking.apm.collector.core.framework.Handler; +import org.skywalking.apm.collector.core.module.ModuleConfigParser; +import org.skywalking.apm.collector.core.module.ModuleRegistration; +import org.skywalking.apm.collector.core.server.Server; +import org.skywalking.apm.collector.server.jetty.JettyServer; + +/** + * @author pengys5 + */ +public class AgentServerJettyModuleDefine extends AgentServerModuleDefine { + + public static final String MODULE_NAME = "jetty"; + + @Override protected String group() { + return AgentServerModuleGroupDefine.GROUP_NAME; + } + + @Override public String name() { + return MODULE_NAME; + } + + @Override public boolean defaultModule() { + return true; + } + + @Override protected ModuleConfigParser configParser() { + return new AgentServerJettyConfigParser(); + } + + @Override protected Server server() { + return new JettyServer(AgentServerJettyConfig.HOST, AgentServerJettyConfig.PORT, AgentServerJettyConfig.CONTEXT_PATH); + } + + @Override protected ModuleRegistration registration() { + return new AgentServerJettyModuleRegistration(); + } + + @Override public ClusterDataListener listener() { + return new AgentServerJettyDataListener(); + } + + @Override public List handlerList() { + List handlers = new LinkedList<>(); + handlers.add(new AgentStreamGRPCServerHandler()); + handlers.add(new AgentStreamJettyServerHandler()); + return handlers; + } +} diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyModuleRegistration.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyModuleRegistration.java new file mode 100644 index 0000000000..5f2f2e0bf0 --- /dev/null +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyModuleRegistration.java @@ -0,0 +1,13 @@ +package org.skywalking.apm.collector.agentserver.jetty; + +import org.skywalking.apm.collector.core.module.ModuleRegistration; + +/** + * @author pengys5 + */ +public class AgentServerJettyModuleRegistration extends ModuleRegistration { + + @Override public Value buildValue() { + return new Value(AgentServerJettyConfig.HOST, AgentServerJettyConfig.PORT, AgentServerJettyConfig.CONTEXT_PATH); + } +} diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamGRPCServerHandler.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamGRPCServerHandler.java new file mode 100644 index 0000000000..1a95037d71 --- /dev/null +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamGRPCServerHandler.java @@ -0,0 +1,36 @@ +package org.skywalking.apm.collector.agentserver.jetty.handler; + +import com.google.gson.JsonArray; +import java.io.IOException; +import java.util.List; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.skywalking.apm.collector.agentstream.grpc.AgentStreamGRPCDataListener; +import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; +import org.skywalking.apm.collector.core.cluster.ClusterModuleContext; +import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader; +import org.skywalking.apm.collector.core.framework.CollectorContextHelper; +import org.skywalking.apm.collector.server.jetty.JettyHandler; + +/** + * @author pengys5 + */ +public class AgentStreamGRPCServerHandler extends JettyHandler { + + @Override public String pathSpec() { + return "/agentstream/grpc"; + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader(); + List servers = reader.read(AgentStreamGRPCDataListener.PATH); + JsonArray serverArray = new JsonArray(); + servers.forEach(server -> { + serverArray.add(server); + }); + + reply(resp, serverArray, HttpServletResponse.SC_OK); + } +} diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamJettyServerHandler.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamJettyServerHandler.java new file mode 100644 index 0000000000..3eab72a327 --- /dev/null +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamJettyServerHandler.java @@ -0,0 +1,36 @@ +package org.skywalking.apm.collector.agentserver.jetty.handler; + +import com.google.gson.JsonArray; +import java.io.IOException; +import java.util.List; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.skywalking.apm.collector.agentstream.jetty.AgentStreamJettyDataListener; +import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; +import org.skywalking.apm.collector.core.cluster.ClusterModuleContext; +import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader; +import org.skywalking.apm.collector.core.framework.CollectorContextHelper; +import org.skywalking.apm.collector.server.jetty.JettyHandler; + +/** + * @author pengys5 + */ +public class AgentStreamJettyServerHandler extends JettyHandler { + + @Override public String pathSpec() { + return "/agentstream/jetty"; + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader(); + List servers = reader.read(AgentStreamJettyDataListener.PATH); + JsonArray serverArray = new JsonArray(); + servers.forEach(server -> { + serverArray.add(server); + }); + + reply(resp, serverArray, HttpServletResponse.SC_OK); + } +} diff --git a/apm-collector/apm-collector-agentserver/src/main/resources/META-INF/defines/group.define b/apm-collector/apm-collector-agentserver/src/main/resources/META-INF/defines/group.define new file mode 100644 index 0000000000..229ed8a563 --- /dev/null +++ b/apm-collector/apm-collector-agentserver/src/main/resources/META-INF/defines/group.define @@ -0,0 +1 @@ +org.skywalking.apm.collector.agentserver.AgentServerModuleGroupDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-agentserver/src/main/resources/META-INF/defines/module.define b/apm-collector/apm-collector-agentserver/src/main/resources/META-INF/defines/module.define new file mode 100644 index 0000000000..cdd20e3bfa --- /dev/null +++ b/apm-collector/apm-collector-agentserver/src/main/resources/META-INF/defines/module.define @@ -0,0 +1 @@ +org.skywalking.apm.collector.agentserver.jetty.AgentServerJettyModuleDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCDataListener.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCDataListener.java index 0b749e0037..2fc79a258b 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCDataListener.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCDataListener.java @@ -9,11 +9,9 @@ import org.skywalking.apm.collector.core.cluster.ClusterDataListener; */ public class AgentStreamGRPCDataListener extends ClusterDataListener { - public AgentStreamGRPCDataListener(String moduleName) { - super(moduleName); - } + public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + AgentStreamModuleGroupDefine.GROUP_NAME + "." + AgentStreamGRPCModuleDefine.MODULE_NAME; @Override public String path() { - return ClusterModuleDefine.BASE_CATALOG + "." + AgentStreamModuleGroupDefine.GROUP_NAME + "." + moduleName(); + return PATH; } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java index f9dcf28f4d..f485682165 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java @@ -13,12 +13,14 @@ import org.skywalking.apm.collector.server.grpc.GRPCServer; */ public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine { + public static final String MODULE_NAME = "grpc"; + @Override protected String group() { return AgentStreamModuleGroupDefine.GROUP_NAME; } @Override public String name() { - return "grpc"; + return MODULE_NAME; } @Override protected ModuleConfigParser configParser() { @@ -34,6 +36,6 @@ public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine { } @Override public ClusterDataListener listener() { - return new AgentStreamGRPCDataListener(name()); + return new AgentStreamGRPCDataListener(); } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyDataListener.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyDataListener.java index 8ee9e4a729..0ec18d14ab 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyDataListener.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyDataListener.java @@ -9,11 +9,9 @@ import org.skywalking.apm.collector.core.cluster.ClusterDataListener; */ public class AgentStreamJettyDataListener extends ClusterDataListener { - public AgentStreamJettyDataListener(String moduleName) { - super(moduleName); - } + public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + AgentStreamModuleGroupDefine.GROUP_NAME + "." + AgentStreamJettyModuleDefine.MODULE_NAME; @Override public String path() { - return ClusterModuleDefine.BASE_CATALOG + "." + AgentStreamModuleGroupDefine.GROUP_NAME + "." + moduleName(); + return PATH; } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleDefine.java index fff051e45f..42a86e729b 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleDefine.java @@ -13,12 +13,14 @@ import org.skywalking.apm.collector.server.jetty.JettyServer; */ public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine { + public static final String MODULE_NAME = "jetty"; + @Override protected String group() { return AgentStreamModuleGroupDefine.GROUP_NAME; } @Override public String name() { - return "jetty"; + return MODULE_NAME; } @Override protected ModuleConfigParser configParser() { @@ -34,6 +36,6 @@ public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine { } @Override public ClusterDataListener listener() { - return new AgentStreamJettyDataListener(name()); + return new AgentStreamJettyDataListener(); } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleRegistration.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleRegistration.java index 3968fb8f7e..0b6527306e 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleRegistration.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleRegistration.java @@ -1,6 +1,5 @@ package org.skywalking.apm.collector.agentstream.jetty; -import com.google.gson.JsonObject; import org.skywalking.apm.collector.core.module.ModuleRegistration; /** @@ -9,8 +8,6 @@ import org.skywalking.apm.collector.core.module.ModuleRegistration; public class AgentStreamJettyModuleRegistration extends ModuleRegistration { @Override public Value buildValue() { - JsonObject data = new JsonObject(); - data.addProperty(AgentStreamJettyConfigParser.CONTEXT_PATH, AgentStreamJettyConfig.CONTEXT_PATH); - return new Value(AgentStreamJettyConfig.HOST, AgentStreamJettyConfig.PORT, data); + return new Value(AgentStreamJettyConfig.HOST, AgentStreamJettyConfig.PORT, AgentStreamJettyConfig.CONTEXT_PATH); } } diff --git a/apm-collector/apm-collector-boot/pom.xml b/apm-collector/apm-collector-boot/pom.xml index 2c66d98742..ae4b7501df 100644 --- a/apm-collector/apm-collector-boot/pom.xml +++ b/apm-collector/apm-collector-boot/pom.xml @@ -28,6 +28,11 @@ apm-collector-ui ${project.version} + + org.skywalking + apm-collector-agentserver + ${project.version} + org.skywalking apm-collector-agentstream diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleDefine.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleDefine.java index 846c9664a2..a1bd2df54e 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleDefine.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleDefine.java @@ -31,7 +31,10 @@ public abstract class ClusterModuleDefine extends ModuleDefine { client.initialize(); dataMonitor.setClient(client); + ClusterModuleRegistrationReader reader = registrationReader(dataMonitor); + ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setDataMonitor(dataMonitor); + ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setReader(reader); } catch (ConfigParseException | ClientException e) { throw new ClusterModuleException(e.getMessage(), e); } @@ -51,5 +54,5 @@ public abstract class ClusterModuleDefine extends ModuleDefine { public abstract DataMonitor dataMonitor(); - public abstract ClusterModuleRegistrationReader registrationReader(); + public abstract ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor); } diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleDefine.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleDefine.java index 9f11286bd9..b946cd7093 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleDefine.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleDefine.java @@ -39,7 +39,7 @@ public class ClusterRedisModuleDefine extends ClusterModuleDefine { return new RedisClient(ClusterRedisConfig.HOST, ClusterRedisConfig.PORT); } - @Override public ClusterModuleRegistrationReader registrationReader() { - return new ClusterRedisModuleRegistrationReader(); + @Override public ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor) { + return new ClusterRedisModuleRegistrationReader(dataMonitor); } } diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationReader.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationReader.java index 9871151dd2..6f76723238 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationReader.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationReader.java @@ -1,13 +1,14 @@ package org.skywalking.apm.collector.cluster.redis; -import java.util.List; +import org.skywalking.apm.collector.core.client.DataMonitor; import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader; /** * @author pengys5 */ -public class ClusterRedisModuleRegistrationReader implements ClusterModuleRegistrationReader { - @Override public List read(String key) { - return null; +public class ClusterRedisModuleRegistrationReader extends ClusterModuleRegistrationReader { + + public ClusterRedisModuleRegistrationReader(DataMonitor dataMonitor) { + super(dataMonitor); } } diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java index 4375bec3a3..1c95e0642f 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java @@ -39,7 +39,7 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine { return new H2Client(); } - @Override public ClusterModuleRegistrationReader registrationReader() { - return new ClusterStandaloneModuleRegistrationReader(); + @Override public ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor) { + return new ClusterStandaloneModuleRegistrationReader(dataMonitor); } } diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationReader.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationReader.java index 2049060ba4..f08f821bec 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationReader.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationReader.java @@ -1,14 +1,14 @@ package org.skywalking.apm.collector.cluster.standalone; -import java.util.List; +import org.skywalking.apm.collector.core.client.DataMonitor; import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader; /** * @author pengys5 */ -public class ClusterStandaloneModuleRegistrationReader implements ClusterModuleRegistrationReader { +public class ClusterStandaloneModuleRegistrationReader extends ClusterModuleRegistrationReader { - @Override public List read(String key) { - return null; + public ClusterStandaloneModuleRegistrationReader(DataMonitor dataMonitor) { + super(dataMonitor); } } diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java index 3f3bb2362e..c6be3bbe85 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java @@ -37,7 +37,21 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher { @Override public void process(WatchedEvent event) { logger.debug("changed path {}", event.getPath()); if (listeners.containsKey(event.getPath())) { - putDataIntoListener(listeners.get(event.getPath()), event.getPath()); + List paths = null; + try { + paths = client.getChildren(event.getPath(), true); + listeners.get(event.getPath()).clearData(); + if (CollectionUtils.isNotEmpty(paths)) { + for (String serverPath : paths) { + byte[] data = client.getData(event.getPath() + "/" + serverPath, false, null); + String dataStr = new String(data); + logger.debug("path children has been changed, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr); + listeners.get(event.getPath()).addAddress(serverPath + dataStr); + } + } + } catch (ZookeeperClientException e) { + logger.error(e.getMessage(), e); + } } } @@ -51,16 +65,20 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher { logger.info("listener path: {}", path); listeners.put(path, listener); createPath(path); - List paths = client.getChildren(path, true); - - if (CollectionUtils.isNotEmpty(paths)) { - paths.forEach(subPath -> { - putDataIntoListener(listener, subPath); - }); - } ModuleRegistration.Value value = registration.buildValue(); - setData(path + "/" + value.getHostPort(), value.getData() == null ? "" : value.getData().toString()); + String contextPath = value.getContextPath() == null ? "" : value.getContextPath(); + + client.getChildren(path, true); + String serverPath = path + "/" + value.getHostPort(); + listener.addAddress(value.getHostPort() + contextPath); + + setData(serverPath, contextPath); + } + + @Override public ClusterDataListener getListener(String path) { + path = PathUtils.convertKey2Path(path); + return listeners.get(path); } @Override public void createPath(String path) throws ClientException { @@ -82,14 +100,4 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher { client.setData(path, value.getBytes(), -1); } } - - private void putDataIntoListener(ClusterDataListener listener, String path) { - try { - byte[] data = client.getData(path, false, null); - String dataStr = String.valueOf(data); - listener.setData(new ClusterDataListener.Data(path, dataStr)); - } catch (ZookeeperClientException e) { - logger.error(e.getMessage(), e); - } - } } diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefine.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefine.java index 6a51939e4c..c536764076 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefine.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefine.java @@ -40,7 +40,7 @@ public class ClusterZKModuleDefine extends ClusterModuleDefine { return new ZookeeperClient(ClusterZKConfig.HOST_PORT, ClusterZKConfig.SESSION_TIMEOUT, (Watcher)dataMonitor); } - @Override public ClusterModuleRegistrationReader registrationReader() { - return new ClusterZKModuleRegistrationReader(); + @Override public ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor) { + return new ClusterZKModuleRegistrationReader(dataMonitor); } } diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationReader.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationReader.java index 44dec48611..1236116872 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationReader.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationReader.java @@ -1,13 +1,14 @@ package org.skywalking.apm.collector.cluster.zookeeper; -import java.util.List; +import org.skywalking.apm.collector.core.client.DataMonitor; import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader; /** * @author pengys5 */ -public class ClusterZKModuleRegistrationReader implements ClusterModuleRegistrationReader { - @Override public List read(String key) { - return null; +public class ClusterZKModuleRegistrationReader extends ClusterModuleRegistrationReader { + + public ClusterZKModuleRegistrationReader(DataMonitor dataMonitor) { + super(dataMonitor); } } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/DataMonitor.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/DataMonitor.java index 883db37694..c5e8d537d9 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/DataMonitor.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/DataMonitor.java @@ -11,6 +11,8 @@ public interface DataMonitor { void addListener(ClusterDataListener listener, ModuleRegistration registration) throws ClientException; + ClusterDataListener getListener(String path); + void createPath(String path) throws ClientException; void setData(String path, String value) throws ClientException; diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListener.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListener.java index 03b2177b05..7b14e7a33e 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListener.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListener.java @@ -9,31 +9,23 @@ import org.skywalking.apm.collector.core.framework.Listener; */ public abstract class ClusterDataListener implements Listener { - private final String moduleName; - private List datas; + private List addresses; - public ClusterDataListener(String moduleName) { - this.moduleName = moduleName; - datas = new LinkedList<>(); - } - - public final String moduleName() { - return moduleName; + public ClusterDataListener() { + addresses = new LinkedList<>(); } public abstract String path(); - public final void setData(Data data) { - datas.add(data); + public final void addAddress(String address) { + addresses.add(address); } - public static class Data { - private final String key; - private final String value; + public final List getAddresses() { + return addresses; + } - public Data(String key, String value) { - this.key = key; - this.value = value; - } + public final void clearData() { + addresses.clear(); } } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationReader.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationReader.java index d9ad04e02a..ddfcdd56ad 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationReader.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationReader.java @@ -1,10 +1,20 @@ package org.skywalking.apm.collector.core.cluster; import java.util.List; +import org.skywalking.apm.collector.core.client.DataMonitor; /** * @author pengys5 */ -public interface ClusterModuleRegistrationReader { - List read(String key); +public abstract class ClusterModuleRegistrationReader { + + private final DataMonitor dataMonitor; + + public ClusterModuleRegistrationReader(DataMonitor dataMonitor) { + this.dataMonitor = dataMonitor; + } + + public final List read(String path) { + return dataMonitor.getListener(path).getAddresses(); + } } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Handler.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Handler.java new file mode 100644 index 0000000000..0ae17b083f --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Handler.java @@ -0,0 +1,7 @@ +package org.skywalking.apm.collector.core.framework; + +/** + * @author pengys5 + */ +public interface Handler { +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleRegistration.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleRegistration.java index 959fccddad..90dd9afe99 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleRegistration.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleRegistration.java @@ -1,7 +1,5 @@ package org.skywalking.apm.collector.core.module; -import com.google.gson.JsonObject; - /** * @author pengys5 */ @@ -12,12 +10,12 @@ public abstract class ModuleRegistration { public static class Value { private final String host; private final int port; - private final JsonObject data; + private final String contextPath; - public Value(String host, int port, JsonObject data) { + public Value(String host, int port, String contextPath) { this.host = host; this.port = port; - this.data = data; + this.contextPath = contextPath; } public String getHost() { @@ -32,8 +30,8 @@ public abstract class ModuleRegistration { return host + ":" + port; } - public JsonObject getData() { - return data; + public String getContextPath() { + return contextPath; } } } \ No newline at end of file diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/server/Server.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/server/Server.java index eede89b810..4b246095ff 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/server/Server.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/server/Server.java @@ -1,9 +1,15 @@ package org.skywalking.apm.collector.core.server; +import org.skywalking.apm.collector.core.framework.Handler; + /** * @author pengys5 */ public interface Server { void initialize() throws ServerException; + + void start() throws ServerException; + + void addHandler(Handler handler); } diff --git a/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServer.java b/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServer.java index 7a92a2eb98..505ce43c6a 100644 --- a/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServer.java +++ b/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServer.java @@ -3,6 +3,7 @@ package org.skywalking.apm.collector.server.grpc; import io.grpc.netty.NettyServerBuilder; import java.io.IOException; import java.net.InetSocketAddress; +import org.skywalking.apm.collector.core.framework.Handler; import org.skywalking.apm.collector.core.server.Server; import org.skywalking.apm.collector.core.server.ServerException; import org.slf4j.Logger; @@ -33,4 +34,12 @@ public class GRPCServer implements Server { } logger.info("Server started, host {} listening on {}", host, port); } + + @Override public void start() throws ServerException { + + } + + @Override public void addHandler(Handler handler) { + + } } diff --git a/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyHandler.java b/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyHandler.java new file mode 100644 index 0000000000..b80db810d8 --- /dev/null +++ b/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyHandler.java @@ -0,0 +1,27 @@ +package org.skywalking.apm.collector.server.jetty; + +import com.google.gson.JsonElement; +import java.io.IOException; +import java.io.PrintWriter; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletResponse; +import org.skywalking.apm.collector.core.framework.Handler; + +/** + * @author pengys5 + */ +public abstract class JettyHandler extends HttpServlet implements Handler { + + public abstract String pathSpec(); + + protected final void reply(HttpServletResponse response, JsonElement resJson, int status) throws IOException { + response.setContentType("text/json"); + response.setCharacterEncoding("utf-8"); + response.setStatus(status); + + PrintWriter out = response.getWriter(); + out.print(resJson); + out.flush(); + out.close(); + } +} diff --git a/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyServer.java b/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyServer.java index 984e76269d..3997a20b31 100644 --- a/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyServer.java +++ b/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyServer.java @@ -1,7 +1,10 @@ package org.skywalking.apm.collector.server.jetty; import java.net.InetSocketAddress; +import javax.servlet.http.HttpServlet; import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.skywalking.apm.collector.core.framework.Handler; import org.skywalking.apm.collector.core.server.Server; import org.skywalking.apm.collector.core.server.ServerException; import org.slf4j.Logger; @@ -17,6 +20,8 @@ public class JettyServer implements Server { private final String host; private final int port; private final String contextPath; + private org.eclipse.jetty.server.Server server; + private ServletContextHandler servletContextHandler; public JettyServer(String host, int port, String contextPath) { this.host = host; @@ -25,13 +30,22 @@ public class JettyServer implements Server { } @Override public void initialize() throws ServerException { - org.eclipse.jetty.server.Server server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port)); + server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port)); - ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); servletContextHandler.setContextPath(contextPath); logger.info("http server root context path: {}", contextPath); server.setHandler(servletContextHandler); + } + + @Override public void addHandler(Handler handler) { + ServletHolder servletHolder = new ServletHolder(); + servletHolder.setServlet((HttpServlet)handler); + servletContextHandler.addServlet(servletHolder, ((JettyHandler)handler).pathSpec()); + } + + @Override public void start() throws ServerException { try { server.start(); } catch (Exception e) { diff --git a/apm-collector/pom.xml b/apm-collector/pom.xml index 9fd1fbfb12..d544f0a0b7 100644 --- a/apm-collector/pom.xml +++ b/apm-collector/pom.xml @@ -15,6 +15,7 @@ apm-collector-boot apm-collector-remote apm-collector-stream + apm-collector-agentserver apm -- GitLab