From 730c606389c9028eb0d9d9980724052238e3e276 Mon Sep 17 00:00:00 2001 From: pengys5 <8082209@qq.com> Date: Wed, 19 Jul 2017 23:21:24 +0800 Subject: [PATCH] add server holder to reuse server with same host, port, server classify --- .../apm-collector-agentregister/pom.xml | 32 ++++++++++++ .../AgentRegisterModuleContext.java | 12 +++++ .../AgentRegisterModuleDefine.java | 51 +++++++++++++++++++ .../AgentRegisterModuleException.java | 16 ++++++ .../AgentRegisterModuleGroupDefine.java | 25 +++++++++ .../AgentRegisterModuleInstaller.java | 36 +++++++++++++ .../grpc/AgentRegisterGRPCConfig.java | 9 ++++ .../grpc/AgentRegisterGRPCConfigParser.java | 27 ++++++++++ .../grpc/AgentRegisterGRPCDataListener.java | 17 +++++++ .../grpc/AgentRegisterGRPCModuleDefine.java | 47 +++++++++++++++++ .../AgentRegisterGRPCModuleRegistration.java | 13 +++++ .../resources/META-INF/defines/group.define | 1 + .../resources/META-INF/defines/module.define | 1 + .../apm-collector-agentserver/pom.xml | 5 ++ .../agentserver/AgentServerModuleDefine.java | 8 +-- .../AgentServerModuleInstaller.java | 5 +- .../agentstream/AgentStreamModuleDefine.java | 12 +++-- .../AgentStreamModuleInstaller.java | 5 +- .../grpc/AgentStreamGRPCModuleDefine.java | 6 +++ .../jetty/AgentStreamJettyModuleDefine.java | 6 +++ apm-collector/apm-collector-boot/pom.xml | 5 ++ .../apm/collector/boot/CollectorStarter.java | 15 +++++- .../cluster/ClusterModuleDefine.java | 3 +- .../cluster/ClusterModuleInstaller.java | 7 +-- .../apm/collector/core/framework/Define.java | 3 +- .../core/module/ModuleInstaller.java | 3 +- .../apm/collector/core/server/Server.java | 4 ++ .../collector/core/server/ServerHolder.java | 43 ++++++++++++++++ apm-collector/apm-collector-discovery/pom.xml | 14 ----- .../discovery/DiscoveryJettyModuleDefine.java | 7 --- .../resources/META-INF/defines/module.define | 1 - .../collector/queue/QueueModuleInstaller.java | 3 +- .../QueueDataCarrierModuleDefine.java | 4 +- .../disruptor/QueueDisruptorModuleDefine.java | 4 +- .../remote/RemoteModuleInstaller.java | 7 +-- .../remote/grpc/RemoteGRPCModuleDefine.java | 3 +- .../apm/collector/server/grpc/GRPCServer.java | 25 ++++++--- .../collector/server/jetty/JettyServer.java | 8 +++ apm-collector/pom.xml | 2 +- 39 files changed, 438 insertions(+), 57 deletions(-) create mode 100644 apm-collector/apm-collector-agentregister/pom.xml create mode 100644 apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleContext.java create mode 100644 apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleDefine.java create mode 100644 apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleException.java create mode 100644 apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleGroupDefine.java create mode 100644 apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleInstaller.java create mode 100644 apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCConfig.java create mode 100644 apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCConfigParser.java create mode 100644 apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCDataListener.java create mode 100644 apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCModuleDefine.java create mode 100644 apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCModuleRegistration.java create mode 100644 apm-collector/apm-collector-agentregister/src/main/resources/META-INF/defines/group.define create mode 100644 apm-collector/apm-collector-agentregister/src/main/resources/META-INF/defines/module.define create mode 100644 apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/server/ServerHolder.java delete mode 100644 apm-collector/apm-collector-discovery/pom.xml delete mode 100644 apm-collector/apm-collector-discovery/src/main/java/org/skywalking/apm/collector/discovery/DiscoveryJettyModuleDefine.java delete mode 100644 apm-collector/apm-collector-discovery/src/main/resources/META-INF/defines/module.define diff --git a/apm-collector/apm-collector-agentregister/pom.xml b/apm-collector/apm-collector-agentregister/pom.xml new file mode 100644 index 0000000000..e704f264e4 --- /dev/null +++ b/apm-collector/apm-collector-agentregister/pom.xml @@ -0,0 +1,32 @@ + + + + apm-collector + org.skywalking + 3.2-2017 + + 4.0.0 + + apm-collector-agentregister + jar + + + + org.skywalking + apm-collector-cluster + ${project.version} + + + org.skywalking + apm-collector-server + ${project.version} + + + org.skywalking + apm-network + ${project.version} + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleContext.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleContext.java new file mode 100644 index 0000000000..0bea80ed0d --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleContext.java @@ -0,0 +1,12 @@ +package org.skywalking.apm.collector.agentregister; + +import org.skywalking.apm.collector.core.framework.Context; + +/** + * @author pengys5 + */ +public class AgentRegisterModuleContext extends Context { + public AgentRegisterModuleContext(String groupName) { + super(groupName); + } +} diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleDefine.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleDefine.java new file mode 100644 index 0000000000..976e784721 --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleDefine.java @@ -0,0 +1,51 @@ +package org.skywalking.apm.collector.agentregister; + +import java.util.List; +import java.util.Map; +import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; +import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.client.ClientException; +import org.skywalking.apm.collector.core.client.DataMonitor; +import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine; +import org.skywalking.apm.collector.core.cluster.ClusterModuleContext; +import org.skywalking.apm.collector.core.config.ConfigParseException; +import org.skywalking.apm.collector.core.framework.CollectorContextHelper; +import org.skywalking.apm.collector.core.framework.DefineException; +import org.skywalking.apm.collector.core.framework.Handler; +import org.skywalking.apm.collector.core.module.ModuleDefine; +import org.skywalking.apm.collector.core.server.Server; +import org.skywalking.apm.collector.core.server.ServerException; +import org.skywalking.apm.collector.core.server.ServerHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public abstract class AgentRegisterModuleDefine extends ModuleDefine implements ClusterDataListenerDefine { + + private final Logger logger = LoggerFactory.getLogger(AgentRegisterModuleDefine.class); + + @Override + public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException { + try { + configParser().parse(config); + Server server = server(); + serverHolder.holdServer(server, handlerList()); + + ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration()); + } catch (ConfigParseException | ServerException e) { + throw new AgentRegisterModuleException(e.getMessage(), e); + } + } + + @Override protected final Client createClient(DataMonitor dataMonitor) { + throw new UnsupportedOperationException(""); + } + + @Override public final boolean defaultModule() { + return true; + } + + public abstract List handlerList(); +} diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleException.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleException.java new file mode 100644 index 0000000000..061937a7ee --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleException.java @@ -0,0 +1,16 @@ +package org.skywalking.apm.collector.agentregister; + +import org.skywalking.apm.collector.core.module.ModuleException; + +/** + * @author pengys5 + */ +public class AgentRegisterModuleException extends ModuleException { + public AgentRegisterModuleException(String message) { + super(message); + } + + public AgentRegisterModuleException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleGroupDefine.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleGroupDefine.java new file mode 100644 index 0000000000..d59aa5b858 --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleGroupDefine.java @@ -0,0 +1,25 @@ +package org.skywalking.apm.collector.agentregister; + +import org.skywalking.apm.collector.core.framework.Context; +import org.skywalking.apm.collector.core.module.ModuleGroupDefine; +import org.skywalking.apm.collector.core.module.ModuleInstaller; + +/** + * @author pengys5 + */ +public class AgentRegisterModuleGroupDefine implements ModuleGroupDefine { + + public static final String GROUP_NAME = "agent_register"; + + @Override public String name() { + return GROUP_NAME; + } + + @Override public Context groupContext() { + return new AgentRegisterModuleContext(GROUP_NAME); + } + + @Override public ModuleInstaller moduleInstaller() { + return new AgentRegisterModuleInstaller(); + } +} diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleInstaller.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleInstaller.java new file mode 100644 index 0000000000..0bf2218d1e --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/AgentRegisterModuleInstaller.java @@ -0,0 +1,36 @@ +package org.skywalking.apm.collector.agentregister; + +import java.util.Iterator; +import java.util.Map; +import org.skywalking.apm.collector.core.client.ClientException; +import org.skywalking.apm.collector.core.framework.CollectorContextHelper; +import org.skywalking.apm.collector.core.framework.DefineException; +import org.skywalking.apm.collector.core.module.ModuleDefine; +import org.skywalking.apm.collector.core.module.ModuleInstaller; +import org.skywalking.apm.collector.core.server.ServerHolder; +import org.skywalking.apm.collector.core.util.ObjectUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class AgentRegisterModuleInstaller implements ModuleInstaller { + + private final Logger logger = LoggerFactory.getLogger(AgentRegisterModuleInstaller.class); + + @Override public void install(Map moduleConfig, + Map moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException { + logger.info("beginning agent register module install"); + + AgentRegisterModuleContext context = new AgentRegisterModuleContext(AgentRegisterModuleGroupDefine.GROUP_NAME); + CollectorContextHelper.INSTANCE.putContext(context); + + Iterator> moduleDefineEntry = moduleDefineMap.entrySet().iterator(); + while (moduleDefineEntry.hasNext()) { + ModuleDefine moduleDefine = moduleDefineEntry.next().getValue(); + logger.info("module {} initialize", moduleDefine.getClass().getName()); + moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder); + } + } +} diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCConfig.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCConfig.java new file mode 100644 index 0000000000..2d2ad6bc7a --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCConfig.java @@ -0,0 +1,9 @@ +package org.skywalking.apm.collector.agentregister.grpc; + +/** + * @author pengys5 + */ +public class AgentRegisterGRPCConfig { + public static String HOST; + public static int PORT; +} diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCConfigParser.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCConfigParser.java new file mode 100644 index 0000000000..cf59cd8ef3 --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCConfigParser.java @@ -0,0 +1,27 @@ +package org.skywalking.apm.collector.agentregister.grpc; + +import java.util.Map; +import org.skywalking.apm.collector.core.config.ConfigParseException; +import org.skywalking.apm.collector.core.module.ModuleConfigParser; +import org.skywalking.apm.collector.core.util.ObjectUtils; +import org.skywalking.apm.collector.core.util.StringUtils; + +/** + * @author pengys5 + */ +public class AgentRegisterGRPCConfigParser implements ModuleConfigParser { + + private static final String HOST = "host"; + private static final String PORT = "port"; + + @Override public void parse(Map config) throws ConfigParseException { + if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) { + AgentRegisterGRPCConfig.HOST = "localhost"; + } + if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) { + AgentRegisterGRPCConfig.PORT = 11800; + } else { + AgentRegisterGRPCConfig.PORT = (Integer)config.get(PORT); + } + } +} diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCDataListener.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCDataListener.java new file mode 100644 index 0000000000..e54e324ae6 --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCDataListener.java @@ -0,0 +1,17 @@ +package org.skywalking.apm.collector.agentregister.grpc; + +import org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine; +import org.skywalking.apm.collector.cluster.ClusterModuleDefine; +import org.skywalking.apm.collector.core.cluster.ClusterDataListener; + +/** + * @author pengys5 + */ +public class AgentRegisterGRPCDataListener extends ClusterDataListener { + + public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + AgentRegisterModuleGroupDefine.GROUP_NAME + "." + AgentRegisterGRPCModuleDefine.MODULE_NAME; + + @Override public String path() { + return PATH; + } +} diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCModuleDefine.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCModuleDefine.java new file mode 100644 index 0000000000..e9fca8ad35 --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCModuleDefine.java @@ -0,0 +1,47 @@ +package org.skywalking.apm.collector.agentregister.grpc; + +import java.util.List; +import org.skywalking.apm.collector.agentregister.AgentRegisterModuleDefine; +import org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine; +import org.skywalking.apm.collector.core.cluster.ClusterDataListener; +import org.skywalking.apm.collector.core.framework.Handler; +import org.skywalking.apm.collector.core.module.ModuleConfigParser; +import org.skywalking.apm.collector.core.module.ModuleRegistration; +import org.skywalking.apm.collector.core.server.Server; +import org.skywalking.apm.collector.server.grpc.GRPCServer; + +/** + * @author pengys5 + */ +public class AgentRegisterGRPCModuleDefine extends AgentRegisterModuleDefine { + + public static final String MODULE_NAME = "grpc"; + + @Override protected String group() { + return AgentRegisterModuleGroupDefine.GROUP_NAME; + } + + @Override public String name() { + return MODULE_NAME; + } + + @Override protected ModuleConfigParser configParser() { + return new AgentRegisterGRPCConfigParser(); + } + + @Override protected Server server() { + return new GRPCServer(AgentRegisterGRPCConfig.HOST, AgentRegisterGRPCConfig.PORT); + } + + @Override protected ModuleRegistration registration() { + return new AgentRegisterGRPCModuleRegistration(); + } + + @Override public ClusterDataListener listener() { + return new AgentRegisterGRPCDataListener(); + } + + @Override public List handlerList() { + return null; + } +} diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCModuleRegistration.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCModuleRegistration.java new file mode 100644 index 0000000000..bc97dba60d --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCModuleRegistration.java @@ -0,0 +1,13 @@ +package org.skywalking.apm.collector.agentregister.grpc; + +import org.skywalking.apm.collector.core.module.ModuleRegistration; + +/** + * @author pengys5 + */ +public class AgentRegisterGRPCModuleRegistration extends ModuleRegistration { + + @Override public Value buildValue() { + return new Value(AgentRegisterGRPCConfig.HOST, AgentRegisterGRPCConfig.PORT, null); + } +} diff --git a/apm-collector/apm-collector-agentregister/src/main/resources/META-INF/defines/group.define b/apm-collector/apm-collector-agentregister/src/main/resources/META-INF/defines/group.define new file mode 100644 index 0000000000..0afe036bd7 --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/resources/META-INF/defines/group.define @@ -0,0 +1 @@ +org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-agentregister/src/main/resources/META-INF/defines/module.define b/apm-collector/apm-collector-agentregister/src/main/resources/META-INF/defines/module.define new file mode 100644 index 0000000000..70f99f0a2c --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/resources/META-INF/defines/module.define @@ -0,0 +1 @@ +org.skywalking.apm.collector.agentregister.grpc.AgentRegisterGRPCModuleDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-agentserver/pom.xml b/apm-collector/apm-collector-agentserver/pom.xml index b4f26c8bf0..1451d0a0db 100644 --- a/apm-collector/apm-collector-agentserver/pom.xml +++ b/apm-collector/apm-collector-agentserver/pom.xml @@ -28,5 +28,10 @@ apm-collector-agentstream ${project.version} + + org.skywalking + apm-collector-agentregister + ${project.version} + \ No newline at end of file diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleDefine.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleDefine.java index 7e1adb82a8..e7e7d829a1 100644 --- a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleDefine.java +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleDefine.java @@ -15,6 +15,7 @@ import org.skywalking.apm.collector.core.framework.Handler; import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.server.Server; import org.skywalking.apm.collector.core.server.ServerException; +import org.skywalking.apm.collector.core.server.ServerHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,13 +26,12 @@ public abstract class AgentServerModuleDefine extends ModuleDefine implements Cl private final Logger logger = LoggerFactory.getLogger(AgentServerModuleDefine.class); - @Override public final void initialize(Map config) throws DefineException, ClientException { + @Override + public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException { try { configParser().parse(config); Server server = server(); - server.initialize(); - handlerList().forEach(handler -> server.addHandler(handler)); - server.start(); + serverHolder.holdServer(server, handlerList()); ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration()); } catch (ConfigParseException | ServerException e) { diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleInstaller.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleInstaller.java index 10756f9508..c632161047 100644 --- a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleInstaller.java +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/AgentServerModuleInstaller.java @@ -7,6 +7,7 @@ import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleInstaller; +import org.skywalking.apm.collector.core.server.ServerHolder; import org.skywalking.apm.collector.core.util.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,7 +20,7 @@ public class AgentServerModuleInstaller implements ModuleInstaller { private final Logger logger = LoggerFactory.getLogger(AgentServerModuleInstaller.class); @Override public void install(Map moduleConfig, - Map moduleDefineMap) throws DefineException, ClientException { + Map moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException { logger.info("beginning agent server module install"); AgentServerModuleContext context = new AgentServerModuleContext(AgentServerModuleGroupDefine.GROUP_NAME); @@ -30,7 +31,7 @@ public class AgentServerModuleInstaller implements ModuleInstaller { while (moduleDefineEntry.hasNext()) { ModuleDefine moduleDefine = moduleDefineEntry.next().getValue(); logger.info("module {} initialize", moduleDefine.getClass().getName()); - moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null); + moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder); } } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleDefine.java index e69e7888d8..1535088279 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleDefine.java @@ -1,5 +1,6 @@ package org.skywalking.apm.collector.agentstream; +import java.util.List; import java.util.Map; import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; import org.skywalking.apm.collector.core.client.Client; @@ -10,9 +11,11 @@ import org.skywalking.apm.collector.core.cluster.ClusterModuleContext; import org.skywalking.apm.collector.core.config.ConfigParseException; import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.DefineException; +import org.skywalking.apm.collector.core.framework.Handler; import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.server.Server; import org.skywalking.apm.collector.core.server.ServerException; +import org.skywalking.apm.collector.core.server.ServerHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,11 +26,12 @@ public abstract class AgentStreamModuleDefine extends ModuleDefine implements Cl private final Logger logger = LoggerFactory.getLogger(AgentStreamModuleDefine.class); - @Override public final void initialize(Map config) throws DefineException, ClientException { + @Override + public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException { try { configParser().parse(config); Server server = server(); - server.initialize(); + serverHolder.holdServer(server, handlerList()); ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration()); } catch (ConfigParseException | ServerException e) { @@ -35,11 +39,13 @@ public abstract class AgentStreamModuleDefine extends ModuleDefine implements Cl } } - @Override protected Client createClient(DataMonitor dataMonitor) { + @Override protected final Client createClient(DataMonitor dataMonitor) { throw new UnsupportedOperationException(""); } @Override public final boolean defaultModule() { return true; } + + public abstract List handlerList(); } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleInstaller.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleInstaller.java index 8feda295df..db4b3dc043 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleInstaller.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleInstaller.java @@ -7,6 +7,7 @@ import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleInstaller; +import org.skywalking.apm.collector.core.server.ServerHolder; import org.skywalking.apm.collector.core.util.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,7 +20,7 @@ public class AgentStreamModuleInstaller implements ModuleInstaller { private final Logger logger = LoggerFactory.getLogger(AgentStreamModuleInstaller.class); @Override public void install(Map moduleConfig, - Map moduleDefineMap) throws DefineException, ClientException { + Map moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException { logger.info("beginning agent stream module install"); AgentStreamModuleContext context = new AgentStreamModuleContext(AgentStreamModuleGroupDefine.GROUP_NAME); @@ -30,7 +31,7 @@ public class AgentStreamModuleInstaller implements ModuleInstaller { while (moduleDefineEntry.hasNext()) { ModuleDefine moduleDefine = moduleDefineEntry.next().getValue(); logger.info("module {} initialize", moduleDefine.getClass().getName()); - moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null); + moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder); } } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java index f485682165..dd3b4c9891 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java @@ -1,8 +1,10 @@ package org.skywalking.apm.collector.agentstream.grpc; +import java.util.List; import org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine; import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine; import org.skywalking.apm.collector.core.cluster.ClusterDataListener; +import org.skywalking.apm.collector.core.framework.Handler; import org.skywalking.apm.collector.core.module.ModuleConfigParser; import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.skywalking.apm.collector.core.server.Server; @@ -38,4 +40,8 @@ public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine { @Override public ClusterDataListener listener() { return new AgentStreamGRPCDataListener(); } + + @Override public List handlerList() { + return null; + } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleDefine.java index 42a86e729b..d20996ebd4 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyModuleDefine.java @@ -1,8 +1,10 @@ package org.skywalking.apm.collector.agentstream.jetty; +import java.util.List; import org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine; import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine; import org.skywalking.apm.collector.core.cluster.ClusterDataListener; +import org.skywalking.apm.collector.core.framework.Handler; import org.skywalking.apm.collector.core.module.ModuleConfigParser; import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.skywalking.apm.collector.core.server.Server; @@ -38,4 +40,8 @@ public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine { @Override public ClusterDataListener listener() { return new AgentStreamJettyDataListener(); } + + @Override public List handlerList() { + return null; + } } diff --git a/apm-collector/apm-collector-boot/pom.xml b/apm-collector/apm-collector-boot/pom.xml index ae4b7501df..94efcc0fc8 100644 --- a/apm-collector/apm-collector-boot/pom.xml +++ b/apm-collector/apm-collector-boot/pom.xml @@ -38,5 +38,10 @@ apm-collector-agentstream ${project.version} + + org.skywalking + apm-collector-agentregister + ${project.version} + \ No newline at end of file diff --git a/apm-collector/apm-collector-boot/src/main/java/org/skywalking/apm/collector/boot/CollectorStarter.java b/apm-collector/apm-collector-boot/src/main/java/org/skywalking/apm/collector/boot/CollectorStarter.java index a5d55fd254..de22a4cf1f 100644 --- a/apm-collector/apm-collector-boot/src/main/java/org/skywalking/apm/collector/boot/CollectorStarter.java +++ b/apm-collector/apm-collector-boot/src/main/java/org/skywalking/apm/collector/boot/CollectorStarter.java @@ -12,6 +12,8 @@ import org.skywalking.apm.collector.core.module.ModuleDefineLoader; import org.skywalking.apm.collector.core.module.ModuleGroupDefine; import org.skywalking.apm.collector.core.module.ModuleGroupDefineLoader; import org.skywalking.apm.collector.core.remote.SerializedDefineLoader; +import org.skywalking.apm.collector.core.server.ServerException; +import org.skywalking.apm.collector.core.server.ServerHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,11 +37,20 @@ public class CollectorStarter implements Starter { ModuleDefineLoader defineLoader = new ModuleDefineLoader(); Map> moduleDefineMap = defineLoader.load(); - moduleGroupDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME).moduleInstaller().install(configuration.get(ClusterModuleGroupDefine.GROUP_NAME), moduleDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME)); + ServerHolder serverHolder = new ServerHolder(); + moduleGroupDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME).moduleInstaller().install(configuration.get(ClusterModuleGroupDefine.GROUP_NAME), moduleDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME), serverHolder); moduleGroupDefineMap.remove(ClusterModuleGroupDefine.GROUP_NAME); for (ModuleGroupDefine moduleGroupDefine : moduleGroupDefineMap.values()) { - moduleGroupDefine.moduleInstaller().install(configuration.get(moduleGroupDefine.name()), moduleDefineMap.get(moduleGroupDefine.name())); + moduleGroupDefine.moduleInstaller().install(configuration.get(moduleGroupDefine.name()), moduleDefineMap.get(moduleGroupDefine.name()), serverHolder); } + + serverHolder.getServers().forEach(server -> { + try { + server.start(); + } catch (ServerException e) { + logger.error(e.getMessage(), e); + } + }); } } diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleDefine.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleDefine.java index a1bd2df54e..900cf9b3df 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleDefine.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleDefine.java @@ -12,6 +12,7 @@ import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.skywalking.apm.collector.core.server.Server; +import org.skywalking.apm.collector.core.server.ServerHolder; /** * @author pengys5 @@ -22,7 +23,7 @@ public abstract class ClusterModuleDefine extends ModuleDefine { private Client client; - @Override public final void initialize(Map config) throws ClusterModuleException { + @Override public final void initialize(Map config, ServerHolder serverHolder) throws ClusterModuleException { try { configParser().parse(config); diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleInstaller.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleInstaller.java index 42a3c38a44..c9b56f811e 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleInstaller.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/ClusterModuleInstaller.java @@ -8,6 +8,7 @@ import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleInstaller; +import org.skywalking.apm.collector.core.server.ServerHolder; import org.skywalking.apm.collector.core.util.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,7 +21,7 @@ public class ClusterModuleInstaller implements ModuleInstaller { private final Logger logger = LoggerFactory.getLogger(ClusterModuleInstaller.class); @Override public void install(Map moduleConfig, - Map moduleDefineMap) throws DefineException, ClientException { + Map moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException { logger.info("beginning cluster module install"); ClusterModuleContext context = new ClusterModuleContext(ClusterModuleGroupDefine.GROUP_NAME); @@ -34,14 +35,14 @@ public class ClusterModuleInstaller implements ModuleInstaller { moduleDefine = moduleDefineEntry.next().getValue(); if (moduleDefine.defaultModule()) { logger.info("module {} initialize", moduleDefine.getClass().getName()); - moduleDefine.initialize(null); + moduleDefine.initialize(null, serverHolder); break; } } } else { Map.Entry clusterConfigEntry = moduleConfig.entrySet().iterator().next(); moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey()); - moduleDefine.initialize(clusterConfigEntry.getValue()); + moduleDefine.initialize(clusterConfigEntry.getValue(), serverHolder); } } } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Define.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Define.java index 542d72b7ec..69e97e2940 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Define.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Define.java @@ -2,13 +2,14 @@ package org.skywalking.apm.collector.core.framework; import java.util.Map; import org.skywalking.apm.collector.core.client.ClientException; +import org.skywalking.apm.collector.core.server.ServerHolder; /** * @author pengys5 */ public interface Define { - void initialize(Map config) throws DefineException, ClientException; + void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException; String name(); } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstaller.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstaller.java index 40aa905c8e..2ec52b90b3 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstaller.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstaller.java @@ -3,11 +3,12 @@ package org.skywalking.apm.collector.core.module; import java.util.Map; import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.framework.DefineException; +import org.skywalking.apm.collector.core.server.ServerHolder; /** * @author pengys5 */ public interface ModuleInstaller { void install(Map moduleConfig, - Map moduleDefineMap) throws DefineException, ClientException; + Map moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException; } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/server/Server.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/server/Server.java index 4b246095ff..1d12d31e80 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/server/Server.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/server/Server.java @@ -7,6 +7,10 @@ import org.skywalking.apm.collector.core.framework.Handler; */ public interface Server { + String hostPort(); + + String serverClassify(); + void initialize() throws ServerException; void start() throws ServerException; diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/server/ServerHolder.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/server/ServerHolder.java new file mode 100644 index 0000000000..96edf8f02c --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/server/ServerHolder.java @@ -0,0 +1,43 @@ +package org.skywalking.apm.collector.core.server; + +import java.util.LinkedList; +import java.util.List; +import org.skywalking.apm.collector.core.framework.Handler; +import org.skywalking.apm.collector.core.util.CollectionUtils; + +/** + * @author pengys5 + */ +public class ServerHolder { + + private List servers; + + public ServerHolder() { + servers = new LinkedList<>(); + } + + public void holdServer(Server newServer, List handlers) throws ServerException { + boolean isNewServer = true; + for (Server server : servers) { + if (server.hostPort().equals(newServer.hostPort()) && server.serverClassify().equals(newServer.serverClassify())) { + isNewServer = false; + addHandler(handlers, server); + } + } + if (isNewServer) { + newServer.initialize(); + servers.add(newServer); + addHandler(handlers, newServer); + } + } + + private void addHandler(List handlers, Server server) { + if (CollectionUtils.isNotEmpty(handlers)) { + handlers.forEach(handler -> server.addHandler(handler)); + } + } + + public List getServers() { + return servers; + } +} diff --git a/apm-collector/apm-collector-discovery/pom.xml b/apm-collector/apm-collector-discovery/pom.xml deleted file mode 100644 index 5aba21592c..0000000000 --- a/apm-collector/apm-collector-discovery/pom.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - - apm-collector - org.skywalking - 3.2-2017 - - 4.0.0 - - apm-collector-discovery - jar - \ No newline at end of file diff --git a/apm-collector/apm-collector-discovery/src/main/java/org/skywalking/apm/collector/discovery/DiscoveryJettyModuleDefine.java b/apm-collector/apm-collector-discovery/src/main/java/org/skywalking/apm/collector/discovery/DiscoveryJettyModuleDefine.java deleted file mode 100644 index 3d696fa9e7..0000000000 --- a/apm-collector/apm-collector-discovery/src/main/java/org/skywalking/apm/collector/discovery/DiscoveryJettyModuleDefine.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.skywalking.apm.collector.discovery; - -/** - * @author pengys5 - */ -public class DiscoveryJettyModuleDefine { -} diff --git a/apm-collector/apm-collector-discovery/src/main/resources/META-INF/defines/module.define b/apm-collector/apm-collector-discovery/src/main/resources/META-INF/defines/module.define deleted file mode 100644 index 427dbc6e49..0000000000 --- a/apm-collector/apm-collector-discovery/src/main/resources/META-INF/defines/module.define +++ /dev/null @@ -1 +0,0 @@ -org.skywalking.apm.collector.discovery.DiscoveryJettyModuleDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleInstaller.java b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleInstaller.java index 7403060ce7..590c32a660 100644 --- a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleInstaller.java +++ b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleInstaller.java @@ -5,6 +5,7 @@ import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleInstaller; +import org.skywalking.apm.collector.core.server.ServerHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,7 +17,7 @@ public class QueueModuleInstaller implements ModuleInstaller { private final Logger logger = LoggerFactory.getLogger(QueueModuleInstaller.class); @Override public void install(Map moduleConfig, - Map moduleDefineMap) throws DefineException, ClientException { + Map moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException { logger.info("beginning queue module install"); } diff --git a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/datacarrier/QueueDataCarrierModuleDefine.java b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/datacarrier/QueueDataCarrierModuleDefine.java index a5880c1b7a..2bef023b4b 100644 --- a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/datacarrier/QueueDataCarrierModuleDefine.java +++ b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/datacarrier/QueueDataCarrierModuleDefine.java @@ -4,6 +4,7 @@ import java.util.Map; import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.DefineException; +import org.skywalking.apm.collector.core.server.ServerHolder; import org.skywalking.apm.collector.queue.QueueModuleContext; import org.skywalking.apm.collector.queue.QueueModuleDefine; import org.skywalking.apm.collector.queue.QueueModuleGroupDefine; @@ -25,7 +26,8 @@ public class QueueDataCarrierModuleDefine extends QueueModuleDefine { return true; } - @Override public final void initialize(Map config) throws DefineException, ClientException { + @Override + public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException { ((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setQueueCreator(new DataCarrierQueueCreator()); } } diff --git a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/disruptor/QueueDisruptorModuleDefine.java b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/disruptor/QueueDisruptorModuleDefine.java index f550dd4e10..d74cb2deed 100644 --- a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/disruptor/QueueDisruptorModuleDefine.java +++ b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/disruptor/QueueDisruptorModuleDefine.java @@ -4,6 +4,7 @@ import java.util.Map; import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.DefineException; +import org.skywalking.apm.collector.core.server.ServerHolder; import org.skywalking.apm.collector.queue.QueueModuleContext; import org.skywalking.apm.collector.queue.QueueModuleDefine; import org.skywalking.apm.collector.queue.QueueModuleGroupDefine; @@ -25,7 +26,8 @@ public class QueueDisruptorModuleDefine extends QueueModuleDefine { return true; } - @Override public final void initialize(Map config) throws DefineException, ClientException { + @Override + public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException { ((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setQueueCreator(new DisruptorQueueCreator()); } } diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleInstaller.java b/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleInstaller.java index 470aa6392d..00a5bf070e 100644 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleInstaller.java +++ b/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleInstaller.java @@ -5,13 +5,14 @@ import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleInstaller; +import org.skywalking.apm.collector.core.server.ServerHolder; /** * @author pengys5 */ public class RemoteModuleInstaller implements ModuleInstaller { - @Override public void install(Map moduleConfig, - Map moduleDefineMap) throws DefineException, ClientException { - + @Override public void install(Map moduleConfig, Map moduleDefineMap, + ServerHolder serverHolder) throws DefineException, ClientException { + } } diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCModuleDefine.java b/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCModuleDefine.java index f00bbace63..877952cffc 100644 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCModuleDefine.java +++ b/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCModuleDefine.java @@ -8,6 +8,7 @@ import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleConfigParser; import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.skywalking.apm.collector.core.server.Server; +import org.skywalking.apm.collector.core.server.ServerHolder; import org.skywalking.apm.collector.remote.RemoteModuleDefine; import org.skywalking.apm.collector.remote.RemoteModuleGroupDefine; @@ -39,7 +40,7 @@ public class RemoteGRPCModuleDefine extends RemoteModuleDefine { return null; } - @Override public void initialize(Map config) throws DefineException, ClientException { + @Override public void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException { } diff --git a/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServer.java b/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServer.java index 505ce43c6a..98c2f12db6 100644 --- a/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServer.java +++ b/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServer.java @@ -18,28 +18,37 @@ public class GRPCServer implements Server { private final String host; private final int port; + private io.grpc.Server server; + private NettyServerBuilder nettyServerBuilder; public GRPCServer(String host, int port) { this.host = host; this.port = port; } + @Override public String hostPort() { + return host + ":" + port; + } + + @Override public String serverClassify() { + return "Google-RPC"; + } + @Override public void initialize() throws ServerException { InetSocketAddress address = new InetSocketAddress(host, port); - NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address); - try { - io.grpc.Server server = nettyServerBuilder.build().start(); - } catch (IOException e) { - throw new GRPCServerException(e.getMessage(), e); - } + nettyServerBuilder = NettyServerBuilder.forAddress(address); + server = nettyServerBuilder.build(); logger.info("Server started, host {} listening on {}", host, port); } @Override public void start() throws ServerException { - + try { + server.start(); + } catch (IOException e) { + throw new GRPCServerException(e.getMessage(), e); + } } @Override public void addHandler(Handler handler) { - } } diff --git a/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyServer.java b/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyServer.java index 3997a20b31..441a3b16a1 100644 --- a/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyServer.java +++ b/apm-collector/apm-collector-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyServer.java @@ -29,6 +29,14 @@ public class JettyServer implements Server { this.contextPath = contextPath; } + @Override public String hostPort() { + return host + ":" + port; + } + + @Override public String serverClassify() { + return "Jetty"; + } + @Override public void initialize() throws ServerException { server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port)); diff --git a/apm-collector/pom.xml b/apm-collector/pom.xml index d544f0a0b7..67c9b2384a 100644 --- a/apm-collector/pom.xml +++ b/apm-collector/pom.xml @@ -9,13 +9,13 @@ apm-collector-storage apm-collector-client apm-collector-server - apm-collector-discovery apm-collector-agentstream apm-collector-ui apm-collector-boot apm-collector-remote apm-collector-stream apm-collector-agentserver + apm-collector-agentregister apm -- GitLab