提交 730c6063 编写于 作者: P pengys5

add server holder to reuse server with same host, port, server classify

上级 c955f788
...@@ -9,6 +9,24 @@ ...@@ -9,6 +9,24 @@
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-discovery</artifactId> <artifactId>apm-collector-agentregister</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project> </project>
\ No newline at end of file
package org.skywalking.apm.collector.agentregister;
import org.skywalking.apm.collector.core.framework.Context;
/**
* @author pengys5
*/
public class AgentRegisterModuleContext extends Context {
public AgentRegisterModuleContext(String groupName) {
super(groupName);
}
}
package org.skywalking.apm.collector.agentregister;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentRegisterModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(AgentRegisterModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentRegisterModuleException(e.getMessage(), e);
}
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
}
@Override public final boolean defaultModule() {
return true;
}
public abstract List<Handler> handlerList();
}
package org.skywalking.apm.collector.agentregister;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class AgentRegisterModuleException extends ModuleException {
public AgentRegisterModuleException(String message) {
super(message);
}
public AgentRegisterModuleException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.agentregister;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
/**
* @author pengys5
*/
public class AgentRegisterModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_register";
@Override public String name() {
return GROUP_NAME;
}
@Override public Context groupContext() {
return new AgentRegisterModuleContext(GROUP_NAME);
}
@Override public ModuleInstaller moduleInstaller() {
return new AgentRegisterModuleInstaller();
}
}
package org.skywalking.apm.collector.agentregister;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class AgentRegisterModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentRegisterModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent register module install");
AgentRegisterModuleContext context = new AgentRegisterModuleContext(AgentRegisterModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
}
}
package org.skywalking.apm.collector.agentregister.grpc;
/**
* @author pengys5
*/
public class AgentRegisterGRPCConfig {
public static String HOST;
public static int PORT;
}
package org.skywalking.apm.collector.agentregister.grpc;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class AgentRegisterGRPCConfigParser implements ModuleConfigParser {
private static final String HOST = "host";
private static final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
AgentRegisterGRPCConfig.HOST = "localhost";
}
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
AgentRegisterGRPCConfig.PORT = 11800;
} else {
AgentRegisterGRPCConfig.PORT = (Integer)config.get(PORT);
}
}
}
package org.skywalking.apm.collector.agentregister.grpc;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
/**
* @author pengys5
*/
public class AgentRegisterGRPCDataListener extends ClusterDataListener {
public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + AgentRegisterModuleGroupDefine.GROUP_NAME + "." + AgentRegisterGRPCModuleDefine.MODULE_NAME;
@Override public String path() {
return PATH;
}
}
package org.skywalking.apm.collector.agentregister.grpc;
import java.util.List;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleDefine;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.server.grpc.GRPCServer;
/**
* @author pengys5
*/
public class AgentRegisterGRPCModuleDefine extends AgentRegisterModuleDefine {
public static final String MODULE_NAME = "grpc";
@Override protected String group() {
return AgentRegisterModuleGroupDefine.GROUP_NAME;
}
@Override public String name() {
return MODULE_NAME;
}
@Override protected ModuleConfigParser configParser() {
return new AgentRegisterGRPCConfigParser();
}
@Override protected Server server() {
return new GRPCServer(AgentRegisterGRPCConfig.HOST, AgentRegisterGRPCConfig.PORT);
}
@Override protected ModuleRegistration registration() {
return new AgentRegisterGRPCModuleRegistration();
}
@Override public ClusterDataListener listener() {
return new AgentRegisterGRPCDataListener();
}
@Override public List<Handler> handlerList() {
return null;
}
}
package org.skywalking.apm.collector.agentregister.grpc;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class AgentRegisterGRPCModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
return new Value(AgentRegisterGRPCConfig.HOST, AgentRegisterGRPCConfig.PORT, null);
}
}
org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine
\ No newline at end of file
org.skywalking.apm.collector.agentregister.grpc.AgentRegisterGRPCModuleDefine
\ No newline at end of file
...@@ -28,5 +28,10 @@ ...@@ -28,5 +28,10 @@
<artifactId>apm-collector-agentstream</artifactId> <artifactId>apm-collector-agentstream</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-agentregister</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -15,6 +15,7 @@ import org.skywalking.apm.collector.core.framework.Handler; ...@@ -15,6 +15,7 @@ import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server; import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException; import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -25,13 +26,12 @@ public abstract class AgentServerModuleDefine extends ModuleDefine implements Cl ...@@ -25,13 +26,12 @@ public abstract class AgentServerModuleDefine extends ModuleDefine implements Cl
private final Logger logger = LoggerFactory.getLogger(AgentServerModuleDefine.class); private final Logger logger = LoggerFactory.getLogger(AgentServerModuleDefine.class);
@Override public final void initialize(Map config) throws DefineException, ClientException { @Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try { try {
configParser().parse(config); configParser().parse(config);
Server server = server(); Server server = server();
server.initialize(); serverHolder.holdServer(server, handlerList());
handlerList().forEach(handler -> server.addHandler(handler));
server.start();
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration()); ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) { } catch (ConfigParseException | ServerException e) {
......
...@@ -7,6 +7,7 @@ import org.skywalking.apm.collector.core.framework.CollectorContextHelper; ...@@ -7,6 +7,7 @@ import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller; import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils; import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -19,7 +20,7 @@ public class AgentServerModuleInstaller implements ModuleInstaller { ...@@ -19,7 +20,7 @@ public class AgentServerModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentServerModuleInstaller.class); private final Logger logger = LoggerFactory.getLogger(AgentServerModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig, @Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException { Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent server module install"); logger.info("beginning agent server module install");
AgentServerModuleContext context = new AgentServerModuleContext(AgentServerModuleGroupDefine.GROUP_NAME); AgentServerModuleContext context = new AgentServerModuleContext(AgentServerModuleGroupDefine.GROUP_NAME);
...@@ -30,7 +31,7 @@ public class AgentServerModuleInstaller implements ModuleInstaller { ...@@ -30,7 +31,7 @@ public class AgentServerModuleInstaller implements ModuleInstaller {
while (moduleDefineEntry.hasNext()) { while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue(); ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName()); logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null); moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
} }
} }
} }
package org.skywalking.apm.collector.agentstream; package org.skywalking.apm.collector.agentstream;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.Client;
...@@ -10,9 +11,11 @@ import org.skywalking.apm.collector.core.cluster.ClusterModuleContext; ...@@ -10,9 +11,11 @@ import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException; import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server; import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException; import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -23,11 +26,12 @@ public abstract class AgentStreamModuleDefine extends ModuleDefine implements Cl ...@@ -23,11 +26,12 @@ public abstract class AgentStreamModuleDefine extends ModuleDefine implements Cl
private final Logger logger = LoggerFactory.getLogger(AgentStreamModuleDefine.class); private final Logger logger = LoggerFactory.getLogger(AgentStreamModuleDefine.class);
@Override public final void initialize(Map config) throws DefineException, ClientException { @Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try { try {
configParser().parse(config); configParser().parse(config);
Server server = server(); Server server = server();
server.initialize(); serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration()); ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) { } catch (ConfigParseException | ServerException e) {
...@@ -35,11 +39,13 @@ public abstract class AgentStreamModuleDefine extends ModuleDefine implements Cl ...@@ -35,11 +39,13 @@ public abstract class AgentStreamModuleDefine extends ModuleDefine implements Cl
} }
} }
@Override protected Client createClient(DataMonitor dataMonitor) { @Override protected final Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException(""); throw new UnsupportedOperationException("");
} }
@Override public final boolean defaultModule() { @Override public final boolean defaultModule() {
return true; return true;
} }
public abstract List<Handler> handlerList();
} }
...@@ -7,6 +7,7 @@ import org.skywalking.apm.collector.core.framework.CollectorContextHelper; ...@@ -7,6 +7,7 @@ import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller; import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils; import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -19,7 +20,7 @@ public class AgentStreamModuleInstaller implements ModuleInstaller { ...@@ -19,7 +20,7 @@ public class AgentStreamModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentStreamModuleInstaller.class); private final Logger logger = LoggerFactory.getLogger(AgentStreamModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig, @Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException { Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent stream module install"); logger.info("beginning agent stream module install");
AgentStreamModuleContext context = new AgentStreamModuleContext(AgentStreamModuleGroupDefine.GROUP_NAME); AgentStreamModuleContext context = new AgentStreamModuleContext(AgentStreamModuleGroupDefine.GROUP_NAME);
...@@ -30,7 +31,7 @@ public class AgentStreamModuleInstaller implements ModuleInstaller { ...@@ -30,7 +31,7 @@ public class AgentStreamModuleInstaller implements ModuleInstaller {
while (moduleDefineEntry.hasNext()) { while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue(); ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName()); logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null); moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
} }
} }
} }
package org.skywalking.apm.collector.agentstream.grpc; package org.skywalking.apm.collector.agentstream.grpc;
import java.util.List;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine; import org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine; import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener; import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser; import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server; import org.skywalking.apm.collector.core.server.Server;
...@@ -38,4 +40,8 @@ public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine { ...@@ -38,4 +40,8 @@ public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine {
@Override public ClusterDataListener listener() { @Override public ClusterDataListener listener() {
return new AgentStreamGRPCDataListener(); return new AgentStreamGRPCDataListener();
} }
@Override public List<Handler> handlerList() {
return null;
}
} }
package org.skywalking.apm.collector.agentstream.jetty; package org.skywalking.apm.collector.agentstream.jetty;
import java.util.List;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine; import org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine; import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener; import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser; import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server; import org.skywalking.apm.collector.core.server.Server;
...@@ -38,4 +40,8 @@ public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine { ...@@ -38,4 +40,8 @@ public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine {
@Override public ClusterDataListener listener() { @Override public ClusterDataListener listener() {
return new AgentStreamJettyDataListener(); return new AgentStreamJettyDataListener();
} }
@Override public List<Handler> handlerList() {
return null;
}
} }
...@@ -38,5 +38,10 @@ ...@@ -38,5 +38,10 @@
<artifactId>apm-collector-agentstream</artifactId> <artifactId>apm-collector-agentstream</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-agentregister</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -12,6 +12,8 @@ import org.skywalking.apm.collector.core.module.ModuleDefineLoader; ...@@ -12,6 +12,8 @@ import org.skywalking.apm.collector.core.module.ModuleDefineLoader;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine; import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleGroupDefineLoader; import org.skywalking.apm.collector.core.module.ModuleGroupDefineLoader;
import org.skywalking.apm.collector.core.remote.SerializedDefineLoader; import org.skywalking.apm.collector.core.remote.SerializedDefineLoader;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -35,11 +37,20 @@ public class CollectorStarter implements Starter { ...@@ -35,11 +37,20 @@ public class CollectorStarter implements Starter {
ModuleDefineLoader defineLoader = new ModuleDefineLoader(); ModuleDefineLoader defineLoader = new ModuleDefineLoader();
Map<String, Map<String, ModuleDefine>> moduleDefineMap = defineLoader.load(); Map<String, Map<String, ModuleDefine>> moduleDefineMap = defineLoader.load();
moduleGroupDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME).moduleInstaller().install(configuration.get(ClusterModuleGroupDefine.GROUP_NAME), moduleDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME)); ServerHolder serverHolder = new ServerHolder();
moduleGroupDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME).moduleInstaller().install(configuration.get(ClusterModuleGroupDefine.GROUP_NAME), moduleDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME), serverHolder);
moduleGroupDefineMap.remove(ClusterModuleGroupDefine.GROUP_NAME); moduleGroupDefineMap.remove(ClusterModuleGroupDefine.GROUP_NAME);
for (ModuleGroupDefine moduleGroupDefine : moduleGroupDefineMap.values()) { for (ModuleGroupDefine moduleGroupDefine : moduleGroupDefineMap.values()) {
moduleGroupDefine.moduleInstaller().install(configuration.get(moduleGroupDefine.name()), moduleDefineMap.get(moduleGroupDefine.name())); moduleGroupDefine.moduleInstaller().install(configuration.get(moduleGroupDefine.name()), moduleDefineMap.get(moduleGroupDefine.name()), serverHolder);
} }
serverHolder.getServers().forEach(server -> {
try {
server.start();
} catch (ServerException e) {
logger.error(e.getMessage(), e);
}
});
} }
} }
...@@ -12,6 +12,7 @@ import org.skywalking.apm.collector.core.framework.CollectorContextHelper; ...@@ -12,6 +12,7 @@ import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server; import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerHolder;
/** /**
* @author pengys5 * @author pengys5
...@@ -22,7 +23,7 @@ public abstract class ClusterModuleDefine extends ModuleDefine { ...@@ -22,7 +23,7 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
private Client client; private Client client;
@Override public final void initialize(Map config) throws ClusterModuleException { @Override public final void initialize(Map config, ServerHolder serverHolder) throws ClusterModuleException {
try { try {
configParser().parse(config); configParser().parse(config);
......
...@@ -8,6 +8,7 @@ import org.skywalking.apm.collector.core.framework.CollectorContextHelper; ...@@ -8,6 +8,7 @@ import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller; import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.CollectionUtils; import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -20,7 +21,7 @@ public class ClusterModuleInstaller implements ModuleInstaller { ...@@ -20,7 +21,7 @@ public class ClusterModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(ClusterModuleInstaller.class); private final Logger logger = LoggerFactory.getLogger(ClusterModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig, @Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException { Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning cluster module install"); logger.info("beginning cluster module install");
ClusterModuleContext context = new ClusterModuleContext(ClusterModuleGroupDefine.GROUP_NAME); ClusterModuleContext context = new ClusterModuleContext(ClusterModuleGroupDefine.GROUP_NAME);
...@@ -34,14 +35,14 @@ public class ClusterModuleInstaller implements ModuleInstaller { ...@@ -34,14 +35,14 @@ public class ClusterModuleInstaller implements ModuleInstaller {
moduleDefine = moduleDefineEntry.next().getValue(); moduleDefine = moduleDefineEntry.next().getValue();
if (moduleDefine.defaultModule()) { if (moduleDefine.defaultModule()) {
logger.info("module {} initialize", moduleDefine.getClass().getName()); logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize(null); moduleDefine.initialize(null, serverHolder);
break; break;
} }
} }
} else { } else {
Map.Entry<String, Map> clusterConfigEntry = moduleConfig.entrySet().iterator().next(); Map.Entry<String, Map> clusterConfigEntry = moduleConfig.entrySet().iterator().next();
moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey()); moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey());
moduleDefine.initialize(clusterConfigEntry.getValue()); moduleDefine.initialize(clusterConfigEntry.getValue(), serverHolder);
} }
} }
} }
...@@ -2,13 +2,14 @@ package org.skywalking.apm.collector.core.framework; ...@@ -2,13 +2,14 @@ package org.skywalking.apm.collector.core.framework;
import java.util.Map; import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.server.ServerHolder;
/** /**
* @author pengys5 * @author pengys5
*/ */
public interface Define { public interface Define {
void initialize(Map config) throws DefineException, ClientException; void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException;
String name(); String name();
} }
...@@ -3,11 +3,12 @@ package org.skywalking.apm.collector.core.module; ...@@ -3,11 +3,12 @@ package org.skywalking.apm.collector.core.module;
import java.util.Map; import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerHolder;
/** /**
* @author pengys5 * @author pengys5
*/ */
public interface ModuleInstaller { public interface ModuleInstaller {
void install(Map<String, Map> moduleConfig, void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException; Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException;
} }
...@@ -7,6 +7,10 @@ import org.skywalking.apm.collector.core.framework.Handler; ...@@ -7,6 +7,10 @@ import org.skywalking.apm.collector.core.framework.Handler;
*/ */
public interface Server { public interface Server {
String hostPort();
String serverClassify();
void initialize() throws ServerException; void initialize() throws ServerException;
void start() throws ServerException; void start() throws ServerException;
......
package org.skywalking.apm.collector.core.server;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.util.CollectionUtils;
/**
* @author pengys5
*/
public class ServerHolder {
private List<Server> servers;
public ServerHolder() {
servers = new LinkedList<>();
}
public void holdServer(Server newServer, List<Handler> handlers) throws ServerException {
boolean isNewServer = true;
for (Server server : servers) {
if (server.hostPort().equals(newServer.hostPort()) && server.serverClassify().equals(newServer.serverClassify())) {
isNewServer = false;
addHandler(handlers, server);
}
}
if (isNewServer) {
newServer.initialize();
servers.add(newServer);
addHandler(handlers, newServer);
}
}
private void addHandler(List<Handler> handlers, Server server) {
if (CollectionUtils.isNotEmpty(handlers)) {
handlers.forEach(handler -> server.addHandler(handler));
}
}
public List<Server> getServers() {
return servers;
}
}
package org.skywalking.apm.collector.discovery;
/**
* @author pengys5
*/
public class DiscoveryJettyModuleDefine {
}
org.skywalking.apm.collector.discovery.DiscoveryJettyModuleDefine
\ No newline at end of file
...@@ -5,6 +5,7 @@ import org.skywalking.apm.collector.core.client.ClientException; ...@@ -5,6 +5,7 @@ import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller; import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -16,7 +17,7 @@ public class QueueModuleInstaller implements ModuleInstaller { ...@@ -16,7 +17,7 @@ public class QueueModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(QueueModuleInstaller.class); private final Logger logger = LoggerFactory.getLogger(QueueModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig, @Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException { Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning queue module install"); logger.info("beginning queue module install");
} }
......
...@@ -4,6 +4,7 @@ import java.util.Map; ...@@ -4,6 +4,7 @@ import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.queue.QueueModuleContext; import org.skywalking.apm.collector.queue.QueueModuleContext;
import org.skywalking.apm.collector.queue.QueueModuleDefine; import org.skywalking.apm.collector.queue.QueueModuleDefine;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine; import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
...@@ -25,7 +26,8 @@ public class QueueDataCarrierModuleDefine extends QueueModuleDefine { ...@@ -25,7 +26,8 @@ public class QueueDataCarrierModuleDefine extends QueueModuleDefine {
return true; return true;
} }
@Override public final void initialize(Map config) throws DefineException, ClientException { @Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setQueueCreator(new DataCarrierQueueCreator()); ((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setQueueCreator(new DataCarrierQueueCreator());
} }
} }
...@@ -4,6 +4,7 @@ import java.util.Map; ...@@ -4,6 +4,7 @@ import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.queue.QueueModuleContext; import org.skywalking.apm.collector.queue.QueueModuleContext;
import org.skywalking.apm.collector.queue.QueueModuleDefine; import org.skywalking.apm.collector.queue.QueueModuleDefine;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine; import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
...@@ -25,7 +26,8 @@ public class QueueDisruptorModuleDefine extends QueueModuleDefine { ...@@ -25,7 +26,8 @@ public class QueueDisruptorModuleDefine extends QueueModuleDefine {
return true; return true;
} }
@Override public final void initialize(Map config) throws DefineException, ClientException { @Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setQueueCreator(new DisruptorQueueCreator()); ((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setQueueCreator(new DisruptorQueueCreator());
} }
} }
...@@ -5,13 +5,14 @@ import org.skywalking.apm.collector.core.client.ClientException; ...@@ -5,13 +5,14 @@ import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller; import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
/** /**
* @author pengys5 * @author pengys5
*/ */
public class RemoteModuleInstaller implements ModuleInstaller { public class RemoteModuleInstaller implements ModuleInstaller {
@Override public void install(Map<String, Map> moduleConfig, @Override public void install(Map<String, Map> moduleConfig, Map<String, ModuleDefine> moduleDefineMap,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException { ServerHolder serverHolder) throws DefineException, ClientException {
} }
} }
...@@ -8,6 +8,7 @@ import org.skywalking.apm.collector.core.framework.DefineException; ...@@ -8,6 +8,7 @@ import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser; import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server; import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.remote.RemoteModuleDefine; import org.skywalking.apm.collector.remote.RemoteModuleDefine;
import org.skywalking.apm.collector.remote.RemoteModuleGroupDefine; import org.skywalking.apm.collector.remote.RemoteModuleGroupDefine;
...@@ -39,7 +40,7 @@ public class RemoteGRPCModuleDefine extends RemoteModuleDefine { ...@@ -39,7 +40,7 @@ public class RemoteGRPCModuleDefine extends RemoteModuleDefine {
return null; return null;
} }
@Override public void initialize(Map config) throws DefineException, ClientException { @Override public void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
} }
......
...@@ -18,28 +18,37 @@ public class GRPCServer implements Server { ...@@ -18,28 +18,37 @@ public class GRPCServer implements Server {
private final String host; private final String host;
private final int port; private final int port;
private io.grpc.Server server;
private NettyServerBuilder nettyServerBuilder;
public GRPCServer(String host, int port) { public GRPCServer(String host, int port) {
this.host = host; this.host = host;
this.port = port; this.port = port;
} }
@Override public String hostPort() {
return host + ":" + port;
}
@Override public String serverClassify() {
return "Google-RPC";
}
@Override public void initialize() throws ServerException { @Override public void initialize() throws ServerException {
InetSocketAddress address = new InetSocketAddress(host, port); InetSocketAddress address = new InetSocketAddress(host, port);
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address); nettyServerBuilder = NettyServerBuilder.forAddress(address);
try { server = nettyServerBuilder.build();
io.grpc.Server server = nettyServerBuilder.build().start();
} catch (IOException e) {
throw new GRPCServerException(e.getMessage(), e);
}
logger.info("Server started, host {} listening on {}", host, port); logger.info("Server started, host {} listening on {}", host, port);
} }
@Override public void start() throws ServerException { @Override public void start() throws ServerException {
try {
server.start();
} catch (IOException e) {
throw new GRPCServerException(e.getMessage(), e);
}
} }
@Override public void addHandler(Handler handler) { @Override public void addHandler(Handler handler) {
} }
} }
...@@ -29,6 +29,14 @@ public class JettyServer implements Server { ...@@ -29,6 +29,14 @@ public class JettyServer implements Server {
this.contextPath = contextPath; this.contextPath = contextPath;
} }
@Override public String hostPort() {
return host + ":" + port;
}
@Override public String serverClassify() {
return "Jetty";
}
@Override public void initialize() throws ServerException { @Override public void initialize() throws ServerException {
server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port)); server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port));
......
...@@ -9,13 +9,13 @@ ...@@ -9,13 +9,13 @@
<module>apm-collector-storage</module> <module>apm-collector-storage</module>
<module>apm-collector-client</module> <module>apm-collector-client</module>
<module>apm-collector-server</module> <module>apm-collector-server</module>
<module>apm-collector-discovery</module>
<module>apm-collector-agentstream</module> <module>apm-collector-agentstream</module>
<module>apm-collector-ui</module> <module>apm-collector-ui</module>
<module>apm-collector-boot</module> <module>apm-collector-boot</module>
<module>apm-collector-remote</module> <module>apm-collector-remote</module>
<module>apm-collector-stream</module> <module>apm-collector-stream</module>
<module>apm-collector-agentserver</module> <module>apm-collector-agentserver</module>
<module>apm-collector-agentregister</module>
</modules> </modules>
<parent> <parent>
<artifactId>apm</artifactId> <artifactId>apm</artifactId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册