提交 4071b27b 编写于 作者: P pengys5

collector system use zookeeper cluster module start successful

上级 640440d5
package org.skywalking.apm.collector.agentstream;
import org.skywalking.apm.collector.core.framework.Context;
/**
* @author pengys5
*/
public class AgentStreamModuleContext extends Context {
public AgentStreamModuleContext(String groupName) {
super(groupName);
}
}
......@@ -4,20 +4,24 @@ import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentStreamModuleDefine extends ModuleDefine {
public abstract class AgentStreamModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(AgentStreamModuleDefine.class);
@Override public final void initialize(Map config) throws DefineException, ClientException {
try {
......@@ -25,18 +29,17 @@ public abstract class AgentStreamModuleDefine extends ModuleDefine {
Server server = server();
server.initialize();
String key = ClusterDataInitializer.BASE_CATALOG + "." + name();
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getWriter().write(key, registration().buildValue());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentStreamModuleException(e.getMessage(), e);
}
}
@Override protected final DataInitializer dataInitializer() {
@Override protected Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
}
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
@Override public final boolean defaultModule() {
return true;
}
}
package org.skywalking.apm.collector.agentstream;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
......@@ -17,7 +16,7 @@ public class AgentStreamModuleGroupDefine implements ModuleGroupDefine {
}
@Override public Context groupContext() {
return new ClusterModuleContext(GROUP_NAME);
return new AgentStreamModuleContext(GROUP_NAME);
}
@Override public ModuleInstaller moduleInstaller() {
......
......@@ -2,15 +2,12 @@ package org.skywalking.apm.collector.agentstream;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -23,29 +20,17 @@ public class AgentStreamModuleInstaller implements ModuleInstaller {
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException {
logger.info("beginning cluster module install");
ModuleDefine moduleDefine = null;
if (CollectionUtils.isEmpty(moduleConfig)) {
logger.info("could not configure cluster module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
moduleDefine = moduleDefineEntry.next().getValue();
if (moduleDefine.defaultModule()) {
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize(null);
break;
}
}
} else {
Map.Entry<String, Map> clusterConfigEntry = moduleConfig.entrySet().iterator().next();
moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey());
moduleDefine.initialize(clusterConfigEntry.getValue());
}
ClusterModuleContext context = new ClusterModuleContext(ClusterModuleGroupDefine.GROUP_NAME);
context.setWriter(((ClusterModuleDefine)moduleDefine).registrationWriter());
logger.info("beginning agent stream module install");
AgentStreamModuleContext context = new AgentStreamModuleContext(AgentStreamModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
logger.info("could not configure cluster module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null);
}
}
}
......@@ -3,6 +3,7 @@ package org.skywalking.apm.collector.agentstream.grpc;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
......@@ -14,16 +15,13 @@ public class AgentStreamGRPCConfigParser implements ModuleConfigParser {
private static final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
AgentStreamGRPCConfig.HOST = (String)config.get(HOST);
if (StringUtils.isEmpty(AgentStreamGRPCConfig.HOST)) {
throw new ConfigParseException("");
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
AgentStreamGRPCConfig.HOST = "localhost";
}
if (StringUtils.isEmpty(config.get(PORT))) {
throw new ConfigParseException("");
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
AgentStreamGRPCConfig.PORT = 11800;
} else {
AgentStreamGRPCConfig.PORT = (Integer)config.get(PORT);
}
}
}
package org.skywalking.apm.collector.agentstream.grpc;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
/**
* @author pengys5
*/
public class AgentStreamGRPCDataListener extends ClusterDataListener {
public AgentStreamGRPCDataListener(String moduleName) {
super(moduleName);
}
@Override public String path() {
return ClusterModuleDefine.BASE_CATALOG + "." + AgentStreamModuleGroupDefine.GROUP_NAME + "." + moduleName();
}
}
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentstream.grpc;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
......@@ -20,10 +21,6 @@ public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine {
return "grpc";
}
@Override public boolean defaultModule() {
return true;
}
@Override protected ModuleConfigParser configParser() {
return new AgentStreamGRPCConfigParser();
}
......@@ -35,4 +32,8 @@ public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine {
@Override protected ModuleRegistration registration() {
return new AgentStreamGRPCModuleRegistration();
}
@Override public ClusterDataListener listener() {
return new AgentStreamGRPCDataListener(name());
}
}
......@@ -3,6 +3,7 @@ package org.skywalking.apm.collector.agentstream.jetty;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
......@@ -15,18 +16,17 @@ public class AgentStreamJettyConfigParser implements ModuleConfigParser {
public static final String CONTEXT_PATH = "contextPath";
@Override public void parse(Map config) throws ConfigParseException {
AgentStreamJettyConfig.HOST = (String)config.get(HOST);
AgentStreamJettyConfig.CONTEXT_PATH = "/";
if (StringUtils.isEmpty(AgentStreamJettyConfig.HOST)) {
throw new ConfigParseException("");
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
AgentStreamJettyConfig.HOST = "localhost";
}
if (StringUtils.isEmpty(config.get(PORT))) {
throw new ConfigParseException("");
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
AgentStreamJettyConfig.PORT = 12800;
} else {
AgentStreamJettyConfig.PORT = (Integer)config.get(PORT);
}
if (!StringUtils.isEmpty(config.get(CONTEXT_PATH))) {
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CONTEXT_PATH))) {
AgentStreamJettyConfig.CONTEXT_PATH = (String)config.get(CONTEXT_PATH);
}
}
......
package org.skywalking.apm.collector.agentstream.jetty;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
/**
* @author pengys5
*/
public class AgentStreamJettyDataListener extends ClusterDataListener {
public AgentStreamJettyDataListener(String moduleName) {
super(moduleName);
}
@Override public String path() {
return ClusterModuleDefine.BASE_CATALOG + "." + AgentStreamModuleGroupDefine.GROUP_NAME + "." + moduleName();
}
}
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentstream.jetty;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
......@@ -20,10 +21,6 @@ public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine {
return "jetty";
}
@Override public boolean defaultModule() {
return false;
}
@Override protected ModuleConfigParser configParser() {
return new AgentStreamJettyConfigParser();
}
......@@ -35,4 +32,8 @@ public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine {
@Override protected ModuleRegistration registration() {
return new AgentStreamJettyModuleRegistration();
}
@Override public ClusterDataListener listener() {
return new AgentStreamJettyDataListener(name());
}
}
......@@ -4,6 +4,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
......@@ -22,15 +23,17 @@ public class ZookeeperClient implements Client {
private final String hostPort;
private final int sessionTimeout;
private final Watcher watcher;
public ZookeeperClient(String hostPort, int sessionTimeout) {
public ZookeeperClient(String hostPort, int sessionTimeout, Watcher watcher) {
this.hostPort = hostPort;
this.sessionTimeout = sessionTimeout;
this.watcher = watcher;
}
@Override public void initialize() throws ZookeeperClientException {
try {
zk = new ZooKeeper(hostPort, sessionTimeout, new ZookeeperDataListener(this));
zk = new ZooKeeper(hostPort, sessionTimeout, watcher);
} catch (IOException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
......@@ -68,4 +71,12 @@ public class ZookeeperClient implements Client {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
public List<String> getChildren(final String path, boolean watch) throws ZookeeperClientException {
try {
return zk.getChildren(path, watch);
} catch (KeeperException | InterruptedException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
}
package org.skywalking.apm.collector.client.zookeeper;
import java.util.LinkedList;
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataListener;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ZookeeperDataListener implements DataListener, Watcher {
private final Logger logger = LoggerFactory.getLogger(ZookeeperDataListener.class);
private ZookeeperClient client;
public ZookeeperDataListener(Client client) {
this.client = (ZookeeperClient)client;
}
@Override public void process(WatchedEvent event) {
logger.debug("path {}", event.getPath());
if (StringUtils.isEmpty(event.getPath())) {
return;
}
try {
String data = String.valueOf(client.getData(event.getPath(), false, null));
logger.debug("data {}", data);
} catch (ClientException e) {
logger.error(e.getMessage(), e);
}
}
@Override public void listen() throws ClientException {
for (String itemKey : items()) {
String[] catalogs = itemKey.split("\\.");
StringBuilder pathBuilder = new StringBuilder();
for (String catalog : catalogs) {
pathBuilder.append("/").append(catalog);
}
client.exists(pathBuilder.toString(), true);
}
}
@Override public List<String> items() {
List<String> items = new LinkedList<>();
items.add(ClusterDataInitializer.FOR_AGENT_CATALOG);
items.add(ClusterDataInitializer.FOR_UI_CATALOG);
return items;
}
}
package org.skywalking.apm.collector.core.cluster;
package org.skywalking.apm.collector.cluster;
import java.util.Map;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleException;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
......@@ -13,14 +18,20 @@ import org.skywalking.apm.collector.core.server.Server;
*/
public abstract class ClusterModuleDefine extends ModuleDefine {
public static final String BASE_CATALOG = "skywalking";
private Client client;
@Override public final void initialize(Map config) throws ClusterModuleException {
try {
configParser().parse(config);
client = createClient();
DataMonitor dataMonitor = dataMonitor();
client = createClient(dataMonitor);
client.initialize();
dataInitializer().initialize(client);
dataMonitor.setClient(client);
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setDataMonitor(dataMonitor);
} catch (ConfigParseException | ClientException e) {
throw new ClusterModuleException(e.getMessage(), e);
}
......@@ -38,7 +49,7 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
throw new UnsupportedOperationException("Cluster module do not need module registration.");
}
public abstract ClusterModuleRegistrationWriter registrationWriter();
public abstract DataMonitor dataMonitor();
public abstract ClusterModuleRegistrationReader registrationReader();
}
......@@ -4,7 +4,6 @@ import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
......@@ -24,6 +23,9 @@ public class ClusterModuleInstaller implements ModuleInstaller {
Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException {
logger.info("beginning cluster module install");
ClusterModuleContext context = new ClusterModuleContext(ClusterModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
ModuleDefine moduleDefine = null;
if (CollectionUtils.isEmpty(moduleConfig)) {
logger.info("could not configure cluster module, use the default");
......@@ -41,10 +43,5 @@ public class ClusterModuleInstaller implements ModuleInstaller {
moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey());
moduleDefine.initialize(clusterConfigEntry.getValue());
}
ClusterModuleContext context = new ClusterModuleContext(ClusterModuleGroupDefine.GROUP_NAME);
context.setWriter(((ClusterModuleDefine)moduleDefine).registrationWriter());
CollectorContextHelper.INSTANCE.putContext(context);
}
}
package org.skywalking.apm.collector.cluster.redis;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterRedisDataInitializer extends ClusterDataInitializer {
private final Logger logger = LoggerFactory.getLogger(ClusterRedisDataInitializer.class);
@Override public void addItem(Client client, String itemKey) throws ClientException {
logger.info("add the redis item key \"{}\" exist", itemKey);
}
@Override public boolean existItem(Client client, String itemKey) throws ClientException {
logger.info("assess the redis item key \"{}\" exist", itemKey);
return false;
}
}
package org.skywalking.apm.collector.cluster.redis;
import org.skywalking.apm.collector.client.redis.RedisClient;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
/**
......@@ -32,19 +31,15 @@ public class ClusterRedisModuleDefine extends ClusterModuleDefine {
return new ClusterRedisConfigParser();
}
@Override protected Client createClient() {
return new RedisClient(ClusterRedisConfig.HOST, ClusterRedisConfig.PORT);
}
@Override protected DataInitializer dataInitializer() {
return new ClusterRedisDataInitializer();
@Override public DataMonitor dataMonitor() {
return null;
}
@Override public ClusterModuleRegistrationWriter registrationWriter() {
return new ClusterRedisModuleRegistrationWriter(getClient());
@Override protected Client createClient(DataMonitor dataMonitor) {
return new RedisClient(ClusterRedisConfig.HOST, ClusterRedisConfig.PORT);
}
@Override public ClusterModuleRegistrationReader registrationReader() {
return null;
return new ClusterRedisModuleRegistrationReader();
}
}
package org.skywalking.apm.collector.cluster.redis;
import java.util.List;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
/**
* @author pengys5
*/
public class ClusterRedisModuleRegistrationReader implements ClusterModuleRegistrationReader {
@Override public List<String> read(String key) {
return null;
}
}
package org.skywalking.apm.collector.cluster.redis;
import org.skywalking.apm.collector.client.redis.RedisClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterRedisModuleRegistrationWriter extends ClusterModuleRegistrationWriter {
private final Logger logger = LoggerFactory.getLogger(ClusterRedisModuleRegistrationWriter.class);
public ClusterRedisModuleRegistrationWriter(Client client) {
super(client);
}
@Override public void write(String key, ModuleRegistration.Value value) {
logger.debug("key {}, value {}", key, value.getHost());
key = key + "." + value.getHost() + ":" + value.getPort();
value.getData().addProperty("host", value.getHost());
value.getData().addProperty("port", value.getPort());
((RedisClient)client).setex(key, 120, value.getData().toString());
}
}
package org.skywalking.apm.collector.cluster.standalone;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterStandaloneDataInitializer extends ClusterDataInitializer {
private final Logger logger = LoggerFactory.getLogger(ClusterStandaloneDataInitializer.class);
@Override public void addItem(Client client, String itemKey) throws ClientException {
logger.info("add the h2 item key \"{}\" exist", itemKey);
itemKey = itemKey.replaceAll("\\.", "_");
String sql = "CREATE TABLE " + itemKey + "(ADDRESS VARCHAR(100) PRIMARY KEY,DATA VARCHAR(255));";
try {
((H2Client)client).execute(sql);
} catch (H2ClientException e) {
logger.error(e.getMessage(), e);
}
}
@Override public boolean existItem(Client client, String itemKey) throws ClientException {
return false;
}
}
package org.skywalking.apm.collector.cluster.standalone;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
/**
......@@ -32,19 +31,15 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine {
return new ClusterStandaloneConfigParser();
}
@Override public Client createClient() {
return new H2Client();
}
@Override protected DataInitializer dataInitializer() {
return new ClusterStandaloneDataInitializer();
@Override public DataMonitor dataMonitor() {
return null;
}
@Override public ClusterModuleRegistrationWriter registrationWriter() {
return new ClusterStandaloneModuleRegistrationWriter(getClient());
@Override protected Client createClient(DataMonitor dataMonitor) {
return new H2Client();
}
@Override public ClusterModuleRegistrationReader registrationReader() {
return null;
return new ClusterStandaloneModuleRegistrationReader();
}
}
package org.skywalking.apm.collector.cluster.standalone;
import java.util.List;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
/**
* @author pengys5
*/
public class ClusterStandaloneModuleRegistrationReader implements ClusterModuleRegistrationReader {
@Override public List<String> read(String key) {
return null;
}
}
package org.skywalking.apm.collector.cluster.standalone;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterStandaloneModuleRegistrationWriter extends ClusterModuleRegistrationWriter {
private final Logger logger = LoggerFactory.getLogger(ClusterStandaloneModuleRegistrationWriter.class);
public ClusterStandaloneModuleRegistrationWriter(Client client) {
super(client);
}
@Override public void write(String key, ModuleRegistration.Value value) {
key = key.replaceAll("\\.", "_");
String hostPort = value.getHost() + ":" + value.getPort();
String sql = "INSERT INTO " + key + " VALUES('" + hostPort + "', '" + value.getData().toString() + "');";
String sql2 = "SELECT * FROM " + key;
try {
((H2Client)client).execute(sql);
((H2Client)client).executeQuery(sql2);
} catch (H2ClientException e) {
logger.error(e.getMessage(), e);
}
}
}
......@@ -15,7 +15,7 @@ public class ClusterZKConfigParser implements ModuleConfigParser {
@Override public void parse(Map config) throws ConfigParseException {
ClusterZKConfig.HOST_PORT = (String)config.get(HOST_PORT);
ClusterZKConfig.SESSION_TIMEOUT = 1000;
ClusterZKConfig.SESSION_TIMEOUT = 3000;
if (StringUtils.isEmpty(ClusterZKConfig.HOST_PORT)) {
throw new ConfigParseException("");
......
package org.skywalking.apm.collector.cluster.zookeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterZKDataInitializer extends ClusterDataInitializer {
private final Logger logger = LoggerFactory.getLogger(ClusterZKDataInitializer.class);
@Override public void addItem(Client client, String itemKey) throws ClientException {
logger.info("add the zookeeper item key \"{}\" exist", itemKey);
ZookeeperClient zkClient = (ZookeeperClient)client;
String[] catalogs = itemKey.split("\\.");
StringBuilder pathBuilder = new StringBuilder();
for (String catalog : catalogs) {
pathBuilder.append("/").append(catalog);
if (zkClient.exists(pathBuilder.toString(), false) == null) {
zkClient.create(pathBuilder.toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
}
@Override public boolean existItem(Client client, String itemKey) throws ClientException {
logger.info("assess the zookeeper item key \"{}\" exist", itemKey);
ZookeeperClient zkClient = (ZookeeperClient)client;
String[] catalogs = itemKey.split("\\.");
StringBuilder pathBuilder = new StringBuilder();
for (String catalog : catalogs) {
pathBuilder.append("/").append(catalog);
}
// if (zkClient.exists(pathBuilder.toString(), false) == null) {
// return false;
// } else {
return true;
// }
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClientException;
import org.skywalking.apm.collector.client.zookeeper.util.PathUtils;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterZKDataMonitor implements DataMonitor, Watcher {
private final Logger logger = LoggerFactory.getLogger(ClusterZKDataMonitor.class);
private ZookeeperClient client;
private Map<String, ClusterDataListener> listeners;
public ClusterZKDataMonitor() {
listeners = new LinkedHashMap<>();
}
@Override public void process(WatchedEvent event) {
logger.debug("changed path {}", event.getPath());
if (listeners.containsKey(event.getPath())) {
putDataIntoListener(listeners.get(event.getPath()), event.getPath());
}
}
@Override public void setClient(Client client) {
this.client = (ZookeeperClient)client;
}
@Override
public void addListener(ClusterDataListener listener, ModuleRegistration registration) throws ClientException {
String path = PathUtils.convertKey2Path(listener.path());
logger.info("listener path: {}", path);
listeners.put(path, listener);
createPath(path);
List<String> paths = client.getChildren(path, true);
if (CollectionUtils.isNotEmpty(paths)) {
paths.forEach(subPath -> {
putDataIntoListener(listener, subPath);
});
}
ModuleRegistration.Value value = registration.buildValue();
setData(path + "/" + value.getHostPort(), value.getData() == null ? "" : value.getData().toString());
}
@Override public void createPath(String path) throws ClientException {
String[] paths = path.replaceFirst("/", "").split("/");
StringBuilder pathBuilder = new StringBuilder();
for (String subPath : paths) {
pathBuilder.append("/").append(subPath);
if (client.exists(pathBuilder.toString(), false) == null) {
client.create(pathBuilder.toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
}
@Override public void setData(String path, String value) throws ClientException {
if (client.exists(path, false) == null) {
client.create(path, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} else {
client.setData(path, value.getBytes(), -1);
}
}
private void putDataIntoListener(ClusterDataListener listener, String path) {
try {
byte[] data = client.getData(path, false, null);
String dataStr = String.valueOf(data);
listener.setData(new ClusterDataListener.Data(path, dataStr));
} catch (ZookeeperClientException e) {
logger.error(e.getMessage(), e);
}
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import org.apache.zookeeper.Watcher;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
/**
......@@ -32,19 +32,15 @@ public class ClusterZKModuleDefine extends ClusterModuleDefine {
return new ClusterZKConfigParser();
}
@Override protected Client createClient() {
return new ZookeeperClient(ClusterZKConfig.HOST_PORT, ClusterZKConfig.SESSION_TIMEOUT);
@Override public DataMonitor dataMonitor() {
return new ClusterZKDataMonitor();
}
@Override protected ClusterDataInitializer dataInitializer() {
return new ClusterZKDataInitializer();
}
@Override public ClusterModuleRegistrationWriter registrationWriter() {
return new ClusterZKModuleRegistrationWriter(getClient());
@Override protected Client createClient(DataMonitor dataMonitor) {
return new ZookeeperClient(ClusterZKConfig.HOST_PORT, ClusterZKConfig.SESSION_TIMEOUT, (Watcher)dataMonitor);
}
@Override public ClusterModuleRegistrationReader registrationReader() {
return null;
return new ClusterZKModuleRegistrationReader();
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.client.zookeeper.util.PathUtils;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterZKModuleRegistrationWriter extends ClusterModuleRegistrationWriter {
private final Logger logger = LoggerFactory.getLogger(ClusterZKModuleRegistrationWriter.class);
public ClusterZKModuleRegistrationWriter(Client client) {
super(client);
}
@Override public void write(String key, ModuleRegistration.Value value) throws ClientException {
logger.info("cluster zookeeper register key: {}, value: {}", key, value);
String workerUIPath = PathUtils.convertKey2Path(key) + "/" + value.getHost() + ":" + value.getPort();
Stat stat = ((ZookeeperClient)client).exists(workerUIPath, false);
if (stat == null) {
((ZookeeperClient)client).create(workerUIPath, value.getData() == null ? null : value.getData().toString().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} else {
((ZookeeperClient)client).setData(workerUIPath, value.getData() == null ? null : value.getData().toString().getBytes(), -1);
}
}
}
package org.skywalking.apm.collector.core.client;
import java.util.List;
/**
* @author pengys5
*/
public interface DataListener {
List<String> items();
void listen() throws ClientException;
}
package org.skywalking.apm.collector.core.client;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public interface DataMonitor {
void setClient(Client client);
void addListener(ClusterDataListener listener, ModuleRegistration registration) throws ClientException;
void createPath(String path) throws ClientException;
void setData(String path, String value) throws ClientException;
}
package org.skywalking.apm.collector.core.cluster;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.DataInitializer;
/**
* @author pengys5
*/
public abstract class ClusterDataInitializer implements DataInitializer {
public static final String BASE_CATALOG = "skywalking";
public static final String FOR_UI_CATALOG = BASE_CATALOG + ".ui";
public static final String FOR_AGENT_CATALOG = BASE_CATALOG + ".agent";
@Override public final void initialize(Client client) throws ClientException {
if (!existItem(client, FOR_UI_CATALOG)) {
addItem(client, FOR_UI_CATALOG);
}
if (!existItem(client, FOR_AGENT_CATALOG)) {
addItem(client, FOR_AGENT_CATALOG);
}
}
}
package org.skywalking.apm.collector.core.cluster;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Listener;
/**
* @author pengys5
*/
public abstract class ClusterDataListener implements Listener {
private final String moduleName;
private List<Data> datas;
public ClusterDataListener(String moduleName) {
this.moduleName = moduleName;
datas = new LinkedList<>();
}
public final String moduleName() {
return moduleName;
}
public abstract String path();
public final void setData(Data data) {
datas.add(data);
}
public static class Data {
private final String key;
private final String value;
public Data(String key, String value) {
this.key = key;
this.value = value;
}
}
}
package org.skywalking.apm.collector.core.cluster;
/**
* @author pengys5
*/
public interface ClusterDataListenerDefine {
ClusterDataListener listener();
}
package org.skywalking.apm.collector.core.cluster;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.framework.Context;
/**
......@@ -11,17 +12,9 @@ public class ClusterModuleContext extends Context {
super(groupName);
}
private ClusterModuleRegistrationWriter writer;
private ClusterModuleRegistrationReader reader;
public ClusterModuleRegistrationWriter getWriter() {
return writer;
}
public void setWriter(ClusterModuleRegistrationWriter writer) {
this.writer = writer;
}
private DataMonitor dataMonitor;
public ClusterModuleRegistrationReader getReader() {
return reader;
......@@ -30,4 +23,12 @@ public class ClusterModuleContext extends Context {
public void setReader(ClusterModuleRegistrationReader reader) {
this.reader = reader;
}
public DataMonitor getDataMonitor() {
return dataMonitor;
}
public void setDataMonitor(DataMonitor dataMonitor) {
this.dataMonitor = dataMonitor;
}
}
package org.skywalking.apm.collector.core.cluster;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public abstract class ClusterModuleRegistrationWriter {
protected final Client client;
public ClusterModuleRegistrationWriter(Client client) {
this.client = client;
}
public abstract void write(String key, ModuleRegistration.Value value) throws ClientException;
}
package org.skywalking.apm.collector.core.framework;
/**
* @author pengys5
*/
public interface Listener {
}
package org.skywalking.apm.collector.core.module;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.framework.Define;
import org.skywalking.apm.collector.core.server.Server;
......@@ -16,11 +16,9 @@ public abstract class ModuleDefine implements Define {
protected abstract ModuleConfigParser configParser();
protected abstract Client createClient();
protected abstract Client createClient(DataMonitor dataMonitor);
protected abstract Server server();
protected abstract DataInitializer dataInitializer();
protected abstract ModuleRegistration registration();
}
......@@ -28,6 +28,10 @@ public abstract class ModuleRegistration {
return port;
}
public String getHostPort() {
return host + ":" + port;
}
public JsonObject getData() {
return data;
}
......
package org.skywalking.apm.collector.core.util;
import java.util.List;
import java.util.Map;
/**
......@@ -10,4 +11,12 @@ public class CollectionUtils {
public static boolean isEmpty(Map map) {
return map == null || map.size() == 0;
}
public static boolean isEmpty(List list) {
return list == null || list.size() == 0;
}
public static boolean isNotEmpty(List list) {
return !isEmpty(list);
}
}
......@@ -7,4 +7,8 @@ public class ObjectUtils {
public static boolean isEmpty(Object obj) {
return obj == null;
}
public static boolean isNotEmpty(Object obj) {
return !isEmpty(obj);
}
}
......@@ -10,4 +10,8 @@ public class StringUtils {
public static boolean isEmpty(Object str) {
return str == null || EMPTY_STRING.equals(str);
}
public static boolean isNotEmpty(Object str) {
return !isEmpty(str);
}
}
cluster:
# zookeeper:
# hostPort: localhost:2181
# sessionTimeout: 1000
zookeeper:
hostPort: localhost:2181
sessionTimeout: 100000
# redis:
# host: localhost
# port: 6379
......
......@@ -6,7 +6,7 @@
</layout>
</appender>
<logger name="com.base22" level="TRACE"/>
<logger name="org.apache.zookeeper.ClientCnxn" level="INFO"/>
<root level="debug">
<appender-ref ref="STDOUT"/>
......
package org.skywalking.apm.collector.queue;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
......@@ -15,11 +15,7 @@ public abstract class QueueModuleDefine extends ModuleDefine {
throw new UnsupportedOperationException("");
}
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
}
@Override protected final DataInitializer dataInitializer() {
@Override protected Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
}
......
......@@ -17,7 +17,7 @@ public class QueueModuleInstaller implements ModuleInstaller {
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException {
logger.info("beginning cluster module install");
logger.info("beginning queue module install");
}
}
......@@ -3,7 +3,7 @@ package org.skywalking.apm.collector.remote.grpc;
import java.util.Map;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
......@@ -27,7 +27,7 @@ public class RemoteGRPCModuleDefine extends RemoteModuleDefine {
return null;
}
@Override protected Client createClient() {
@Override protected Client createClient(DataMonitor dataMonitor) {
return null;
}
......@@ -35,10 +35,6 @@ public class RemoteGRPCModuleDefine extends RemoteModuleDefine {
return null;
}
@Override protected DataInitializer dataInitializer() {
return null;
}
@Override protected ModuleRegistration registration() {
return null;
}
......
......@@ -28,16 +28,9 @@ public class GRPCServer implements Server {
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address);
try {
io.grpc.Server server = nettyServerBuilder.build().start();
blockUntilShutdown(server);
} catch (InterruptedException | IOException e) {
} catch (IOException e) {
throw new GRPCServerException(e.getMessage(), e);
}
logger.info("Server started, host {} listening on {}", host, port);
}
private void blockUntilShutdown(io.grpc.Server server) throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
}
......@@ -3,13 +3,14 @@ package org.skywalking.apm.collector.stream;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.queue.QueueCreator;
import org.skywalking.apm.collector.core.queue.QueueEventHandler;
import org.skywalking.apm.collector.core.queue.QueueExecutor;
import org.skywalking.apm.collector.queue.QueueModuleContext;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
/**
* @author pengys5
*/
public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAsyncWorker> extends AbstractLocalWorkerProvider<T> {
public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAsyncWorker & QueueExecutor> extends AbstractLocalWorkerProvider<T> {
public abstract int queueSize();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册