提交 c955f788 编写于 作者: P pengys5

Add agent server module to get the collector stream server address

/agentstream/grpc
/agentstream/jetty
上级 4071b27b
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-agentserver</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-agentstream</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.agentserver;
import org.skywalking.apm.collector.core.framework.Context;
/**
* @author pengys5
*/
public class AgentServerModuleContext extends Context {
public AgentServerModuleContext(String groupName) {
super(groupName);
}
}
package org.skywalking.apm.collector.agentserver;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentServerModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(AgentServerModuleDefine.class);
@Override public final void initialize(Map config) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
server.initialize();
handlerList().forEach(handler -> server.addHandler(handler));
server.start();
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentServerModuleException(e.getMessage(), e);
}
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
}
public abstract List<Handler> handlerList();
}
package org.skywalking.apm.collector.agentserver;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class AgentServerModuleException extends ModuleException {
public AgentServerModuleException(String message) {
super(message);
}
public AgentServerModuleException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.agentserver;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
/**
* @author pengys5
*/
public class AgentServerModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_server";
@Override public String name() {
return GROUP_NAME;
}
@Override public Context groupContext() {
return new AgentServerModuleContext(GROUP_NAME);
}
@Override public ModuleInstaller moduleInstaller() {
return new AgentServerModuleInstaller();
}
}
package org.skywalking.apm.collector.agentserver;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class AgentServerModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentServerModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException {
logger.info("beginning agent server module install");
AgentServerModuleContext context = new AgentServerModuleContext(AgentServerModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
logger.info("could not configure agent server module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null);
}
}
}
package org.skywalking.apm.collector.agentserver.jetty;
/**
* @author pengys5
*/
public class AgentServerJettyConfig {
public static String HOST;
public static int PORT;
public static String CONTEXT_PATH;
}
package org.skywalking.apm.collector.agentserver.jetty;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class AgentServerJettyConfigParser implements ModuleConfigParser {
private static final String HOST = "host";
private static final String PORT = "port";
public static final String CONTEXT_PATH = "contextPath";
@Override public void parse(Map config) throws ConfigParseException {
AgentServerJettyConfig.CONTEXT_PATH = "/";
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
AgentServerJettyConfig.HOST = "localhost";
}
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
AgentServerJettyConfig.PORT = 10800;
} else {
AgentServerJettyConfig.PORT = (Integer)config.get(PORT);
}
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CONTEXT_PATH))) {
AgentServerJettyConfig.CONTEXT_PATH = (String)config.get(CONTEXT_PATH);
}
}
}
package org.skywalking.apm.collector.agentserver.jetty;
import org.skywalking.apm.collector.agentserver.AgentServerModuleGroupDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
/**
* @author pengys5
*/
public class AgentServerJettyDataListener extends ClusterDataListener {
@Override public String path() {
return ClusterModuleDefine.BASE_CATALOG + "." + AgentServerModuleGroupDefine.GROUP_NAME + "." + AgentServerJettyModuleDefine.MODULE_NAME;
}
}
package org.skywalking.apm.collector.agentserver.jetty;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.agentserver.AgentServerModuleDefine;
import org.skywalking.apm.collector.agentserver.AgentServerModuleGroupDefine;
import org.skywalking.apm.collector.agentserver.jetty.handler.AgentStreamGRPCServerHandler;
import org.skywalking.apm.collector.agentserver.jetty.handler.AgentStreamJettyServerHandler;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.server.jetty.JettyServer;
/**
* @author pengys5
*/
public class AgentServerJettyModuleDefine extends AgentServerModuleDefine {
public static final String MODULE_NAME = "jetty";
@Override protected String group() {
return AgentServerModuleGroupDefine.GROUP_NAME;
}
@Override public String name() {
return MODULE_NAME;
}
@Override public boolean defaultModule() {
return true;
}
@Override protected ModuleConfigParser configParser() {
return new AgentServerJettyConfigParser();
}
@Override protected Server server() {
return new JettyServer(AgentServerJettyConfig.HOST, AgentServerJettyConfig.PORT, AgentServerJettyConfig.CONTEXT_PATH);
}
@Override protected ModuleRegistration registration() {
return new AgentServerJettyModuleRegistration();
}
@Override public ClusterDataListener listener() {
return new AgentServerJettyDataListener();
}
@Override public List<Handler> handlerList() {
List<Handler> handlers = new LinkedList<>();
handlers.add(new AgentStreamGRPCServerHandler());
handlers.add(new AgentStreamJettyServerHandler());
return handlers;
}
}
package org.skywalking.apm.collector.agentserver.jetty;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class AgentServerJettyModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
return new Value(AgentServerJettyConfig.HOST, AgentServerJettyConfig.PORT, AgentServerJettyConfig.CONTEXT_PATH);
}
}
package org.skywalking.apm.collector.agentserver.jetty.handler;
import com.google.gson.JsonArray;
import java.io.IOException;
import java.util.List;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.skywalking.apm.collector.agentstream.grpc.AgentStreamGRPCDataListener;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
/**
* @author pengys5
*/
public class AgentStreamGRPCServerHandler extends JettyHandler {
@Override public String pathSpec() {
return "/agentstream/grpc";
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
List<String> servers = reader.read(AgentStreamGRPCDataListener.PATH);
JsonArray serverArray = new JsonArray();
servers.forEach(server -> {
serverArray.add(server);
});
reply(resp, serverArray, HttpServletResponse.SC_OK);
}
}
package org.skywalking.apm.collector.agentserver.jetty.handler;
import com.google.gson.JsonArray;
import java.io.IOException;
import java.util.List;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.skywalking.apm.collector.agentstream.jetty.AgentStreamJettyDataListener;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
/**
* @author pengys5
*/
public class AgentStreamJettyServerHandler extends JettyHandler {
@Override public String pathSpec() {
return "/agentstream/jetty";
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
List<String> servers = reader.read(AgentStreamJettyDataListener.PATH);
JsonArray serverArray = new JsonArray();
servers.forEach(server -> {
serverArray.add(server);
});
reply(resp, serverArray, HttpServletResponse.SC_OK);
}
}
org.skywalking.apm.collector.agentserver.AgentServerModuleGroupDefine
\ No newline at end of file
org.skywalking.apm.collector.agentserver.jetty.AgentServerJettyModuleDefine
\ No newline at end of file
...@@ -9,11 +9,9 @@ import org.skywalking.apm.collector.core.cluster.ClusterDataListener; ...@@ -9,11 +9,9 @@ import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
*/ */
public class AgentStreamGRPCDataListener extends ClusterDataListener { public class AgentStreamGRPCDataListener extends ClusterDataListener {
public AgentStreamGRPCDataListener(String moduleName) { public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + AgentStreamModuleGroupDefine.GROUP_NAME + "." + AgentStreamGRPCModuleDefine.MODULE_NAME;
super(moduleName);
}
@Override public String path() { @Override public String path() {
return ClusterModuleDefine.BASE_CATALOG + "." + AgentStreamModuleGroupDefine.GROUP_NAME + "." + moduleName(); return PATH;
} }
} }
...@@ -13,12 +13,14 @@ import org.skywalking.apm.collector.server.grpc.GRPCServer; ...@@ -13,12 +13,14 @@ import org.skywalking.apm.collector.server.grpc.GRPCServer;
*/ */
public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine { public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine {
public static final String MODULE_NAME = "grpc";
@Override protected String group() { @Override protected String group() {
return AgentStreamModuleGroupDefine.GROUP_NAME; return AgentStreamModuleGroupDefine.GROUP_NAME;
} }
@Override public String name() { @Override public String name() {
return "grpc"; return MODULE_NAME;
} }
@Override protected ModuleConfigParser configParser() { @Override protected ModuleConfigParser configParser() {
...@@ -34,6 +36,6 @@ public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine { ...@@ -34,6 +36,6 @@ public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine {
} }
@Override public ClusterDataListener listener() { @Override public ClusterDataListener listener() {
return new AgentStreamGRPCDataListener(name()); return new AgentStreamGRPCDataListener();
} }
} }
...@@ -9,11 +9,9 @@ import org.skywalking.apm.collector.core.cluster.ClusterDataListener; ...@@ -9,11 +9,9 @@ import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
*/ */
public class AgentStreamJettyDataListener extends ClusterDataListener { public class AgentStreamJettyDataListener extends ClusterDataListener {
public AgentStreamJettyDataListener(String moduleName) { public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + AgentStreamModuleGroupDefine.GROUP_NAME + "." + AgentStreamJettyModuleDefine.MODULE_NAME;
super(moduleName);
}
@Override public String path() { @Override public String path() {
return ClusterModuleDefine.BASE_CATALOG + "." + AgentStreamModuleGroupDefine.GROUP_NAME + "." + moduleName(); return PATH;
} }
} }
...@@ -13,12 +13,14 @@ import org.skywalking.apm.collector.server.jetty.JettyServer; ...@@ -13,12 +13,14 @@ import org.skywalking.apm.collector.server.jetty.JettyServer;
*/ */
public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine { public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine {
public static final String MODULE_NAME = "jetty";
@Override protected String group() { @Override protected String group() {
return AgentStreamModuleGroupDefine.GROUP_NAME; return AgentStreamModuleGroupDefine.GROUP_NAME;
} }
@Override public String name() { @Override public String name() {
return "jetty"; return MODULE_NAME;
} }
@Override protected ModuleConfigParser configParser() { @Override protected ModuleConfigParser configParser() {
...@@ -34,6 +36,6 @@ public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine { ...@@ -34,6 +36,6 @@ public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine {
} }
@Override public ClusterDataListener listener() { @Override public ClusterDataListener listener() {
return new AgentStreamJettyDataListener(name()); return new AgentStreamJettyDataListener();
} }
} }
package org.skywalking.apm.collector.agentstream.jetty; package org.skywalking.apm.collector.agentstream.jetty;
import com.google.gson.JsonObject;
import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.skywalking.apm.collector.core.module.ModuleRegistration;
/** /**
...@@ -9,8 +8,6 @@ import org.skywalking.apm.collector.core.module.ModuleRegistration; ...@@ -9,8 +8,6 @@ import org.skywalking.apm.collector.core.module.ModuleRegistration;
public class AgentStreamJettyModuleRegistration extends ModuleRegistration { public class AgentStreamJettyModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() { @Override public Value buildValue() {
JsonObject data = new JsonObject(); return new Value(AgentStreamJettyConfig.HOST, AgentStreamJettyConfig.PORT, AgentStreamJettyConfig.CONTEXT_PATH);
data.addProperty(AgentStreamJettyConfigParser.CONTEXT_PATH, AgentStreamJettyConfig.CONTEXT_PATH);
return new Value(AgentStreamJettyConfig.HOST, AgentStreamJettyConfig.PORT, data);
} }
} }
...@@ -28,6 +28,11 @@ ...@@ -28,6 +28,11 @@
<artifactId>apm-collector-ui</artifactId> <artifactId>apm-collector-ui</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-agentserver</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.skywalking</groupId> <groupId>org.skywalking</groupId>
<artifactId>apm-collector-agentstream</artifactId> <artifactId>apm-collector-agentstream</artifactId>
......
...@@ -31,7 +31,10 @@ public abstract class ClusterModuleDefine extends ModuleDefine { ...@@ -31,7 +31,10 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
client.initialize(); client.initialize();
dataMonitor.setClient(client); dataMonitor.setClient(client);
ClusterModuleRegistrationReader reader = registrationReader(dataMonitor);
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setDataMonitor(dataMonitor); ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setDataMonitor(dataMonitor);
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setReader(reader);
} catch (ConfigParseException | ClientException e) { } catch (ConfigParseException | ClientException e) {
throw new ClusterModuleException(e.getMessage(), e); throw new ClusterModuleException(e.getMessage(), e);
} }
...@@ -51,5 +54,5 @@ public abstract class ClusterModuleDefine extends ModuleDefine { ...@@ -51,5 +54,5 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
public abstract DataMonitor dataMonitor(); public abstract DataMonitor dataMonitor();
public abstract ClusterModuleRegistrationReader registrationReader(); public abstract ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor);
} }
...@@ -39,7 +39,7 @@ public class ClusterRedisModuleDefine extends ClusterModuleDefine { ...@@ -39,7 +39,7 @@ public class ClusterRedisModuleDefine extends ClusterModuleDefine {
return new RedisClient(ClusterRedisConfig.HOST, ClusterRedisConfig.PORT); return new RedisClient(ClusterRedisConfig.HOST, ClusterRedisConfig.PORT);
} }
@Override public ClusterModuleRegistrationReader registrationReader() { @Override public ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor) {
return new ClusterRedisModuleRegistrationReader(); return new ClusterRedisModuleRegistrationReader(dataMonitor);
} }
} }
package org.skywalking.apm.collector.cluster.redis; package org.skywalking.apm.collector.cluster.redis;
import java.util.List; import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader; import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
/** /**
* @author pengys5 * @author pengys5
*/ */
public class ClusterRedisModuleRegistrationReader implements ClusterModuleRegistrationReader { public class ClusterRedisModuleRegistrationReader extends ClusterModuleRegistrationReader {
@Override public List<String> read(String key) {
return null; public ClusterRedisModuleRegistrationReader(DataMonitor dataMonitor) {
super(dataMonitor);
} }
} }
...@@ -39,7 +39,7 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine { ...@@ -39,7 +39,7 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine {
return new H2Client(); return new H2Client();
} }
@Override public ClusterModuleRegistrationReader registrationReader() { @Override public ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor) {
return new ClusterStandaloneModuleRegistrationReader(); return new ClusterStandaloneModuleRegistrationReader(dataMonitor);
} }
} }
package org.skywalking.apm.collector.cluster.standalone; package org.skywalking.apm.collector.cluster.standalone;
import java.util.List; import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader; import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
/** /**
* @author pengys5 * @author pengys5
*/ */
public class ClusterStandaloneModuleRegistrationReader implements ClusterModuleRegistrationReader { public class ClusterStandaloneModuleRegistrationReader extends ClusterModuleRegistrationReader {
@Override public List<String> read(String key) { public ClusterStandaloneModuleRegistrationReader(DataMonitor dataMonitor) {
return null; super(dataMonitor);
} }
} }
...@@ -37,7 +37,21 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher { ...@@ -37,7 +37,21 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
@Override public void process(WatchedEvent event) { @Override public void process(WatchedEvent event) {
logger.debug("changed path {}", event.getPath()); logger.debug("changed path {}", event.getPath());
if (listeners.containsKey(event.getPath())) { if (listeners.containsKey(event.getPath())) {
putDataIntoListener(listeners.get(event.getPath()), event.getPath()); List<String> paths = null;
try {
paths = client.getChildren(event.getPath(), true);
listeners.get(event.getPath()).clearData();
if (CollectionUtils.isNotEmpty(paths)) {
for (String serverPath : paths) {
byte[] data = client.getData(event.getPath() + "/" + serverPath, false, null);
String dataStr = new String(data);
logger.debug("path children has been changed, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr);
listeners.get(event.getPath()).addAddress(serverPath + dataStr);
}
}
} catch (ZookeeperClientException e) {
logger.error(e.getMessage(), e);
}
} }
} }
...@@ -51,16 +65,20 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher { ...@@ -51,16 +65,20 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
logger.info("listener path: {}", path); logger.info("listener path: {}", path);
listeners.put(path, listener); listeners.put(path, listener);
createPath(path); createPath(path);
List<String> paths = client.getChildren(path, true);
if (CollectionUtils.isNotEmpty(paths)) {
paths.forEach(subPath -> {
putDataIntoListener(listener, subPath);
});
}
ModuleRegistration.Value value = registration.buildValue(); ModuleRegistration.Value value = registration.buildValue();
setData(path + "/" + value.getHostPort(), value.getData() == null ? "" : value.getData().toString()); String contextPath = value.getContextPath() == null ? "" : value.getContextPath();
client.getChildren(path, true);
String serverPath = path + "/" + value.getHostPort();
listener.addAddress(value.getHostPort() + contextPath);
setData(serverPath, contextPath);
}
@Override public ClusterDataListener getListener(String path) {
path = PathUtils.convertKey2Path(path);
return listeners.get(path);
} }
@Override public void createPath(String path) throws ClientException { @Override public void createPath(String path) throws ClientException {
...@@ -82,14 +100,4 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher { ...@@ -82,14 +100,4 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
client.setData(path, value.getBytes(), -1); client.setData(path, value.getBytes(), -1);
} }
} }
private void putDataIntoListener(ClusterDataListener listener, String path) {
try {
byte[] data = client.getData(path, false, null);
String dataStr = String.valueOf(data);
listener.setData(new ClusterDataListener.Data(path, dataStr));
} catch (ZookeeperClientException e) {
logger.error(e.getMessage(), e);
}
}
} }
...@@ -40,7 +40,7 @@ public class ClusterZKModuleDefine extends ClusterModuleDefine { ...@@ -40,7 +40,7 @@ public class ClusterZKModuleDefine extends ClusterModuleDefine {
return new ZookeeperClient(ClusterZKConfig.HOST_PORT, ClusterZKConfig.SESSION_TIMEOUT, (Watcher)dataMonitor); return new ZookeeperClient(ClusterZKConfig.HOST_PORT, ClusterZKConfig.SESSION_TIMEOUT, (Watcher)dataMonitor);
} }
@Override public ClusterModuleRegistrationReader registrationReader() { @Override public ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor) {
return new ClusterZKModuleRegistrationReader(); return new ClusterZKModuleRegistrationReader(dataMonitor);
} }
} }
package org.skywalking.apm.collector.cluster.zookeeper; package org.skywalking.apm.collector.cluster.zookeeper;
import java.util.List; import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader; import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
/** /**
* @author pengys5 * @author pengys5
*/ */
public class ClusterZKModuleRegistrationReader implements ClusterModuleRegistrationReader { public class ClusterZKModuleRegistrationReader extends ClusterModuleRegistrationReader {
@Override public List<String> read(String key) {
return null; public ClusterZKModuleRegistrationReader(DataMonitor dataMonitor) {
super(dataMonitor);
} }
} }
...@@ -11,6 +11,8 @@ public interface DataMonitor { ...@@ -11,6 +11,8 @@ public interface DataMonitor {
void addListener(ClusterDataListener listener, ModuleRegistration registration) throws ClientException; void addListener(ClusterDataListener listener, ModuleRegistration registration) throws ClientException;
ClusterDataListener getListener(String path);
void createPath(String path) throws ClientException; void createPath(String path) throws ClientException;
void setData(String path, String value) throws ClientException; void setData(String path, String value) throws ClientException;
......
...@@ -9,31 +9,23 @@ import org.skywalking.apm.collector.core.framework.Listener; ...@@ -9,31 +9,23 @@ import org.skywalking.apm.collector.core.framework.Listener;
*/ */
public abstract class ClusterDataListener implements Listener { public abstract class ClusterDataListener implements Listener {
private final String moduleName; private List<String> addresses;
private List<Data> datas;
public ClusterDataListener(String moduleName) { public ClusterDataListener() {
this.moduleName = moduleName; addresses = new LinkedList<>();
datas = new LinkedList<>();
}
public final String moduleName() {
return moduleName;
} }
public abstract String path(); public abstract String path();
public final void setData(Data data) { public final void addAddress(String address) {
datas.add(data); addresses.add(address);
} }
public static class Data { public final List<String> getAddresses() {
private final String key; return addresses;
private final String value; }
public Data(String key, String value) { public final void clearData() {
this.key = key; addresses.clear();
this.value = value;
}
} }
} }
package org.skywalking.apm.collector.core.cluster; package org.skywalking.apm.collector.core.cluster;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.core.client.DataMonitor;
/** /**
* @author pengys5 * @author pengys5
*/ */
public interface ClusterModuleRegistrationReader { public abstract class ClusterModuleRegistrationReader {
List<String> read(String key);
private final DataMonitor dataMonitor;
public ClusterModuleRegistrationReader(DataMonitor dataMonitor) {
this.dataMonitor = dataMonitor;
}
public final List<String> read(String path) {
return dataMonitor.getListener(path).getAddresses();
}
} }
package org.skywalking.apm.collector.core.framework;
/**
* @author pengys5
*/
public interface Handler {
}
package org.skywalking.apm.collector.core.module; package org.skywalking.apm.collector.core.module;
import com.google.gson.JsonObject;
/** /**
* @author pengys5 * @author pengys5
*/ */
...@@ -12,12 +10,12 @@ public abstract class ModuleRegistration { ...@@ -12,12 +10,12 @@ public abstract class ModuleRegistration {
public static class Value { public static class Value {
private final String host; private final String host;
private final int port; private final int port;
private final JsonObject data; private final String contextPath;
public Value(String host, int port, JsonObject data) { public Value(String host, int port, String contextPath) {
this.host = host; this.host = host;
this.port = port; this.port = port;
this.data = data; this.contextPath = contextPath;
} }
public String getHost() { public String getHost() {
...@@ -32,8 +30,8 @@ public abstract class ModuleRegistration { ...@@ -32,8 +30,8 @@ public abstract class ModuleRegistration {
return host + ":" + port; return host + ":" + port;
} }
public JsonObject getData() { public String getContextPath() {
return data; return contextPath;
} }
} }
} }
\ No newline at end of file
package org.skywalking.apm.collector.core.server; package org.skywalking.apm.collector.core.server;
import org.skywalking.apm.collector.core.framework.Handler;
/** /**
* @author pengys5 * @author pengys5
*/ */
public interface Server { public interface Server {
void initialize() throws ServerException; void initialize() throws ServerException;
void start() throws ServerException;
void addHandler(Handler handler);
} }
...@@ -3,6 +3,7 @@ package org.skywalking.apm.collector.server.grpc; ...@@ -3,6 +3,7 @@ package org.skywalking.apm.collector.server.grpc;
import io.grpc.netty.NettyServerBuilder; import io.grpc.netty.NettyServerBuilder;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.server.Server; import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException; import org.skywalking.apm.collector.core.server.ServerException;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -33,4 +34,12 @@ public class GRPCServer implements Server { ...@@ -33,4 +34,12 @@ public class GRPCServer implements Server {
} }
logger.info("Server started, host {} listening on {}", host, port); logger.info("Server started, host {} listening on {}", host, port);
} }
@Override public void start() throws ServerException {
}
@Override public void addHandler(Handler handler) {
}
} }
package org.skywalking.apm.collector.server.jetty;
import com.google.gson.JsonElement;
import java.io.IOException;
import java.io.PrintWriter;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletResponse;
import org.skywalking.apm.collector.core.framework.Handler;
/**
* @author pengys5
*/
public abstract class JettyHandler extends HttpServlet implements Handler {
public abstract String pathSpec();
protected final void reply(HttpServletResponse response, JsonElement resJson, int status) throws IOException {
response.setContentType("text/json");
response.setCharacterEncoding("utf-8");
response.setStatus(status);
PrintWriter out = response.getWriter();
out.print(resJson);
out.flush();
out.close();
}
}
package org.skywalking.apm.collector.server.jetty; package org.skywalking.apm.collector.server.jetty;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import javax.servlet.http.HttpServlet;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.server.Server; import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException; import org.skywalking.apm.collector.core.server.ServerException;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -17,6 +20,8 @@ public class JettyServer implements Server { ...@@ -17,6 +20,8 @@ public class JettyServer implements Server {
private final String host; private final String host;
private final int port; private final int port;
private final String contextPath; private final String contextPath;
private org.eclipse.jetty.server.Server server;
private ServletContextHandler servletContextHandler;
public JettyServer(String host, int port, String contextPath) { public JettyServer(String host, int port, String contextPath) {
this.host = host; this.host = host;
...@@ -25,13 +30,22 @@ public class JettyServer implements Server { ...@@ -25,13 +30,22 @@ public class JettyServer implements Server {
} }
@Override public void initialize() throws ServerException { @Override public void initialize() throws ServerException {
org.eclipse.jetty.server.Server server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port)); server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port));
ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
servletContextHandler.setContextPath(contextPath); servletContextHandler.setContextPath(contextPath);
logger.info("http server root context path: {}", contextPath); logger.info("http server root context path: {}", contextPath);
server.setHandler(servletContextHandler); server.setHandler(servletContextHandler);
}
@Override public void addHandler(Handler handler) {
ServletHolder servletHolder = new ServletHolder();
servletHolder.setServlet((HttpServlet)handler);
servletContextHandler.addServlet(servletHolder, ((JettyHandler)handler).pathSpec());
}
@Override public void start() throws ServerException {
try { try {
server.start(); server.start();
} catch (Exception e) { } catch (Exception e) {
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
<module>apm-collector-boot</module> <module>apm-collector-boot</module>
<module>apm-collector-remote</module> <module>apm-collector-remote</module>
<module>apm-collector-stream</module> <module>apm-collector-stream</module>
<module>apm-collector-agentserver</module>
</modules> </modules>
<parent> <parent>
<artifactId>apm</artifactId> <artifactId>apm</artifactId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册