提交 972b6665 编写于 作者: P pengys5

Cluster module with zookeeper, redis, h2.

#266
上级 116fca8a
package org.skywalking.apm.collector.client.h2;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
/**
* @author pengys5
*/
public class H2Client implements Client {
@Override public void initialize() throws ClientException {
private Connection conn;
@Override public void initialize() throws H2ClientException {
try {
Class.forName("org.h2.Driver");
conn = DriverManager.getConnection("jdbc:h2:mem:collector");
} catch (ClassNotFoundException | SQLException e) {
throw new H2ClientException(e.getMessage(), e);
}
@Override public void insert(String path) throws ClientException {
}
@Override public void update() {
public void execute(String sql) throws H2ClientException {
Statement statement = null;
try {
statement = conn.createStatement();
statement.execute(sql);
statement.closeOnCompletion();
} catch (SQLException e) {
throw new H2ClientException(e.getMessage(), e);
}
@Override public String select(String path) throws ClientException {
return null;
}
@Override public void delete() {
public void executeQuery(String sql) throws H2ClientException {
Statement statement = null;
try {
statement = conn.createStatement();
ResultSet rs = statement.executeQuery(sql);
while (rs.next()) {
System.out.println(rs.getString("ADDRESS") + "," + rs.getString("DATA"));
}
@Override public boolean exist(String path) throws ClientException {
return false;
statement.closeOnCompletion();
} catch (SQLException e) {
throw new H2ClientException(e.getMessage(), e);
}
@Override public void listen(String path) throws ClientException {
}
}
package org.skywalking.apm.collector.client.h2;
import org.skywalking.apm.collector.core.client.ClientException;
/**
* @author pengys5
*/
public class H2ClientException extends ClientException {
public H2ClientException(String message) {
super(message);
}
public H2ClientException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -11,4 +11,12 @@
<artifactId>client-redis</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -2,37 +2,28 @@ package org.skywalking.apm.collector.client.redis;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import redis.clients.jedis.Jedis;
/**
* @author pengys5
*/
public class RedisClient implements Client {
@Override public void initialize() throws ClientException {
}
private Jedis jedis;
@Override public void insert(String path) throws ClientException {
private final String host;
private final int port;
public RedisClient(String host, int port) {
this.host = host;
this.port = port;
}
@Override public void update() {
}
@Override public String select(String path) throws ClientException {
return null;
}
@Override public void delete() {
}
@Override public boolean exist(String path) throws ClientException {
return false;
@Override public void initialize() throws ClientException {
jedis = new Jedis(host, port);
}
@Override public void listen(String path) throws ClientException {
public void setex(String key, int seconds, String value) {
jedis.setex(key, seconds, value);
}
}
package org.skywalking.apm.collector.client.redis;
import org.skywalking.apm.collector.core.client.ClientException;
/**
* @author pengys5
*/
public class RedisClientException extends ClientException {
public RedisClientException(String message) {
super(message);
}
public RedisClientException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.client.zookeeper;
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -20,57 +20,50 @@ public class ZookeeperClient implements Client {
private ZooKeeper zk;
private final String hostPort;
private final int sessionTimeout;
public ZookeeperClient(String hostPort, int sessionTimeout) {
this.hostPort = hostPort;
this.sessionTimeout = sessionTimeout;
}
@Override public void initialize() throws ZookeeperClientException {
try {
zk = new ZooKeeper(ZookeeperConfig.hostPort, ZookeeperConfig.sessionTimeout, new ZookeeperDataListener(this));
zk = new ZooKeeper(hostPort, sessionTimeout, new ZookeeperDataListener(this));
} catch (IOException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
@Override public void insert(String path) throws ZookeeperClientException {
logger.info("add the zookeeper path \"{}\"", path);
public void create(final String path, byte data[], List<ACL> acl,
CreateMode createMode) throws ZookeeperClientException {
try {
zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path, data, acl, createMode);
} catch (KeeperException | InterruptedException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
@Override public void update() {
}
@Override public String select(String path) throws ZookeeperClientException {
logger.info("get the zookeeper data from path \"{}\"", path);
public Stat exists(final String path, boolean watch) throws ZookeeperClientException {
try {
return zk.getData(path, false, null).toString();
return zk.exists(path, watch);
} catch (KeeperException | InterruptedException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
@Override public void delete() {
}
@Override public boolean exist(String path) throws ZookeeperClientException {
logger.info("assess the zookeeper path \"{}\" exist", path);
public byte[] getData(String path, boolean watch, Stat stat) throws ZookeeperClientException {
try {
Stat stat = zk.exists(path, false);
if (ObjectUtils.isEmpty(stat)) {
return false;
} else {
return true;
}
return zk.getData(path, watch, stat);
} catch (KeeperException | InterruptedException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
@Override public void listen(String path) throws ZookeeperClientException {
public Stat setData(final String path, byte data[], int version) throws ZookeeperClientException {
try {
zk.exists(path, true);
return zk.setData(path, data, version);
} catch (KeeperException | InterruptedException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
......
package org.skywalking.apm.collector.client.zookeeper;
/**
* @author pengys5
*/
public class ZookeeperConfig {
public static String hostPort;
public static int sessionTimeout;
}
......@@ -19,10 +19,10 @@ public class ZookeeperDataListener implements DataListener, Watcher {
private final Logger logger = LoggerFactory.getLogger(ZookeeperDataListener.class);
private Client client;
private ZookeeperClient client;
public ZookeeperDataListener(Client client) {
this.client = client;
this.client = (ZookeeperClient)client;
}
@Override public void process(WatchedEvent event) {
......@@ -32,7 +32,7 @@ public class ZookeeperDataListener implements DataListener, Watcher {
}
try {
String data = client.select(event.getPath());
String data = String.valueOf(client.getData(event.getPath(), false, null));
logger.debug("data {}", data);
} catch (ClientException e) {
logger.error(e.getMessage(), e);
......@@ -46,7 +46,7 @@ public class ZookeeperDataListener implements DataListener, Watcher {
for (String catalog : catalogs) {
pathBuilder.append("/").append(catalog);
}
client.listen(pathBuilder.toString());
client.exists(pathBuilder.toString(), true);
}
}
......
package org.skywalking.apm.collector.client.zookeeper.util;
/**
* @author pengys5
*/
public class PathUtils {
public static String convertKey2Path(String key) {
String[] keys = key.split("\\.");
StringBuilder pathBuilder = new StringBuilder();
for (String subPath : keys) {
pathBuilder.append("/").append(subPath);
}
return pathBuilder.toString();
}
}
package org.skywalking.apm.collector.cluster.redis;
/**
* @author pengys5
*/
public class ClusterRedisConfig {
public static String HOST;
public static int PORT;
}
......@@ -3,13 +3,21 @@ package org.skywalking.apm.collector.cluster.redis;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class ClusterRedisConfigParser implements ModuleConfigParser {
@Override public void parse(Map config) throws ConfigParseException {
private final String HOST = "host";
private final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
ClusterRedisConfig.HOST = (String)config.get(HOST);
ClusterRedisConfig.PORT = ((Integer)config.get(PORT));
if (StringUtils.isEmpty(ClusterRedisConfig.HOST) || ClusterRedisConfig.PORT == 0) {
throw new ConfigParseException("");
}
}
}
......@@ -3,16 +3,22 @@ package org.skywalking.apm.collector.cluster.redis;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterRedisDataInitializer extends ClusterDataInitializer {
@Override public void addItem(Client client, String itemKey) throws ClientException {
private final Logger logger = LoggerFactory.getLogger(ClusterRedisDataInitializer.class);
@Override public void addItem(Client client, String itemKey) throws ClientException {
logger.info("add the redis item key \"{}\" exist", itemKey);
}
@Override public boolean existItem(Client client, String itemKey) throws ClientException {
logger.info("assess the redis item key \"{}\" exist", itemKey);
return false;
}
}
......@@ -29,8 +29,8 @@ public class ClusterRedisModuleDefine extends ClusterModuleDefine {
return new ClusterRedisConfigParser();
}
@Override protected Client client() {
return new RedisClient();
@Override protected Client createClient() {
return new RedisClient(ClusterRedisConfig.HOST, ClusterRedisConfig.PORT);
}
@Override protected DataInitializer dataInitializer() {
......@@ -38,6 +38,6 @@ public class ClusterRedisModuleDefine extends ClusterModuleDefine {
}
@Override protected ClusterModuleRegistrationWriter registrationWriter() {
return new ClusterRedisModuleRegistrationWriter();
return new ClusterRedisModuleRegistrationWriter(getClient());
}
}
package org.skywalking.apm.collector.cluster.redis;
import org.skywalking.apm.collector.client.redis.RedisClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterRedisModuleRegistrationWriter implements ClusterModuleRegistrationWriter {
public class ClusterRedisModuleRegistrationWriter extends ClusterModuleRegistrationWriter {
@Override public void write(String key, String value) {
private final Logger logger = LoggerFactory.getLogger(ClusterRedisModuleRegistrationWriter.class);
public ClusterRedisModuleRegistrationWriter(Client client) {
super(client);
}
@Override public void write(String key, ModuleRegistration.Value value) {
logger.debug("key {}, value {}", key, value.getHost());
key = key + "." + value.getHost() + ":" + value.getPort();
value.getData().addProperty("host", value.getHost());
value.getData().addProperty("port", value.getPort());
((RedisClient)client).setex(key, 120, value.getData().toString());
}
}
......@@ -9,6 +9,5 @@ import org.skywalking.apm.collector.core.module.ModuleConfigParser;
*/
public class ClusterStandaloneConfigParser implements ModuleConfigParser {
@Override public void parse(Map config) throws ConfigParseException {
}
}
package org.skywalking.apm.collector.cluster.standalone;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterStandaloneDataInitializer extends ClusterDataInitializer {
@Override public void addItem(Client client, String itemKey) throws ClientException {
private final Logger logger = LoggerFactory.getLogger(ClusterStandaloneDataInitializer.class);
@Override public void addItem(Client client, String itemKey) throws ClientException {
logger.info("add the h2 item key \"{}\" exist", itemKey);
itemKey = itemKey.replaceAll("\\.", "_");
String sql = "CREATE TABLE " + itemKey + "(ADDRESS VARCHAR(100) PRIMARY KEY,DATA VARCHAR(255));";
try {
((H2Client)client).execute(sql);
} catch (H2ClientException e) {
logger.error(e.getMessage(), e);
}
}
@Override public boolean existItem(Client client, String itemKey) throws ClientException {
......
......@@ -29,7 +29,7 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine {
return new ClusterStandaloneConfigParser();
}
@Override protected Client client() {
@Override public Client createClient() {
return new H2Client();
}
......@@ -38,6 +38,6 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine {
}
@Override protected ClusterModuleRegistrationWriter registrationWriter() {
return new ClusterStandaloneModuleRegistrationWriter();
return new ClusterStandaloneModuleRegistrationWriter(getClient());
}
}
package org.skywalking.apm.collector.cluster.standalone;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterStandaloneModuleRegistrationWriter implements ClusterModuleRegistrationWriter {
public class ClusterStandaloneModuleRegistrationWriter extends ClusterModuleRegistrationWriter {
@Override public void write(String key, String value) {
private final Logger logger = LoggerFactory.getLogger(ClusterStandaloneModuleRegistrationWriter.class);
public ClusterStandaloneModuleRegistrationWriter(Client client) {
super(client);
}
@Override public void write(String key, ModuleRegistration.Value value) {
key = key.replaceAll("\\.", "_");
String hostPort = value.getHost() + ":" + value.getPort();
String sql = "INSERT INTO " + key + " VALUES('" + hostPort + "', '" + value.getData().toString() + "');";
String sql2 = "SELECT * FROM " + key;
try {
((H2Client)client).execute(sql);
((H2Client)client).executeQuery(sql2);
} catch (H2ClientException e) {
logger.error(e.getMessage(), e);
}
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
/**
* @author pengys5
*/
public class ClusterZKConfig {
public static String HOST_PORT;
public static int SESSION_TIMEOUT;
}
package org.skywalking.apm.collector.cluster.zookeeper;
import java.util.Map;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperConfig;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.StringUtils;
......@@ -18,12 +17,11 @@ public class ClusterZKConfigParser implements ModuleConfigParser {
if (StringUtils.isEmpty(config.get(HOST_PORT))) {
throw new ConfigParseException("");
}
ZookeeperConfig.hostPort = (String)config.get(HOST_PORT);
ClusterZKConfig.HOST_PORT = (String)config.get(HOST_PORT);
ClusterZKConfig.SESSION_TIMEOUT = 1000;
if (StringUtils.isEmpty(config.get(SESSION_TIMEOUT))) {
ZookeeperConfig.sessionTimeout = 1000;
} else {
ZookeeperConfig.sessionTimeout = (Integer)config.get(SESSION_TIMEOUT);
if (!StringUtils.isEmpty(config.get(SESSION_TIMEOUT))) {
ClusterZKConfig.SESSION_TIMEOUT = (Integer)config.get(SESSION_TIMEOUT);
}
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
......@@ -15,23 +18,32 @@ public class ClusterZKDataInitializer extends ClusterDataInitializer {
@Override public void addItem(Client client, String itemKey) throws ClientException {
logger.info("add the zookeeper item key \"{}\" exist", itemKey);
ZookeeperClient zkClient = (ZookeeperClient)client;
String[] catalogs = itemKey.split("\\.");
StringBuilder pathBuilder = new StringBuilder();
for (String catalog : catalogs) {
pathBuilder.append("/").append(catalog);
if (!client.exist(pathBuilder.toString())) {
client.insert(pathBuilder.toString());
if (zkClient.exists(pathBuilder.toString(), false) == null) {
zkClient.create(pathBuilder.toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
}
@Override public boolean existItem(Client client, String itemKey) throws ClientException {
logger.info("assess the zookeeper item key \"{}\" exist", itemKey);
ZookeeperClient zkClient = (ZookeeperClient)client;
String[] catalogs = itemKey.split("\\.");
StringBuilder pathBuilder = new StringBuilder();
for (String catalog : catalogs) {
pathBuilder.append("/").append(catalog);
}
return client.exist(pathBuilder.toString());
if (zkClient.exists(pathBuilder.toString(), false) == null) {
return false;
} else {
return true;
}
}
}
......@@ -13,7 +13,7 @@ import org.skywalking.apm.collector.core.module.ModuleGroup;
*/
public class ClusterZKModuleDefine extends ClusterModuleDefine {
@Override public ModuleGroup group() {
@Override protected ModuleGroup group() {
return ModuleGroup.Cluster;
}
......@@ -25,19 +25,19 @@ public class ClusterZKModuleDefine extends ClusterModuleDefine {
return false;
}
@Override public ModuleConfigParser configParser() {
@Override protected ModuleConfigParser configParser() {
return new ClusterZKConfigParser();
}
@Override public Client client() {
return new ZookeeperClient();
@Override protected Client createClient() {
return new ZookeeperClient(ClusterZKConfig.HOST_PORT, ClusterZKConfig.SESSION_TIMEOUT);
}
@Override public ClusterDataInitializer dataInitializer() {
@Override protected ClusterDataInitializer dataInitializer() {
return new ClusterZKDataInitializer();
}
@Override protected ClusterModuleRegistrationWriter registrationWriter() {
return new ClusterZKModuleRegistrationWriter();
return new ClusterZKModuleRegistrationWriter(getClient());
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.client.zookeeper.util.PathUtils;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterZKModuleRegistrationWriter implements ClusterModuleRegistrationWriter {
public class ClusterZKModuleRegistrationWriter extends ClusterModuleRegistrationWriter {
@Override public void write(String key, String value) {
private final Logger logger = LoggerFactory.getLogger(ClusterZKModuleRegistrationWriter.class);
public ClusterZKModuleRegistrationWriter(Client client) {
super(client);
}
@Override public void write(String key, ModuleRegistration.Value value) throws ClientException {
logger.info("cluster zookeeper register key: {}, value: {}", key, value);
String workerUIPath = PathUtils.convertKey2Path(key) + "/" + value.getHost() + ":" + value.getPort();
Stat stat = ((ZookeeperClient)client).exists(workerUIPath, false);
if (stat == null) {
((ZookeeperClient)client).create(workerUIPath, value.getData() == null ? null : value.getData().toString().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} else {
((ZookeeperClient)client).setData(workerUIPath, value.getData() == null ? null : value.getData().toString().getBytes(), -1);
}
}
}
......@@ -4,7 +4,6 @@ import java.io.FileNotFoundException;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperConfig;
import org.skywalking.apm.collector.core.cluster.ClusterModuleException;
import org.yaml.snakeyaml.Yaml;
......@@ -25,8 +24,5 @@ public class ClusterZKModuleDefineTestCase {
public void testInitialize() throws ClusterModuleException {
ClusterZKModuleDefine define = new ClusterZKModuleDefine();
define.initialize(config);
System.out.println(ZookeeperConfig.hostPort);
System.out.println(ZookeeperConfig.sessionTimeout);
}
}
......@@ -23,5 +23,10 @@
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -4,18 +4,5 @@ package org.skywalking.apm.collector.core.client;
* @author pengys5
*/
public interface Client {
void initialize() throws ClientException;
void insert(String path) throws ClientException;
void update();
String select(String path) throws ClientException;
void delete();
boolean exist(String path) throws ClientException;
void listen(String path) throws ClientException;
}
......@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.core.framework.DataInitializer;
*/
public abstract class ClusterDataInitializer implements DataInitializer {
public static final String BASE_CATALOG = "collector.cluster";
public static final String BASE_CATALOG = "skywalking";
public static final String FOR_UI_CATALOG = BASE_CATALOG + ".ui";
public static final String FOR_AGENT_CATALOG = BASE_CATALOG + ".agent";
......
......@@ -4,13 +4,5 @@ package org.skywalking.apm.collector.core.cluster;
* @author pengys5
*/
public class ClusterModuleContext {
private ClusterModuleRegistrationWriter writer;
public ClusterModuleRegistrationWriter getWriter() {
return writer;
}
public void setWriter(ClusterModuleRegistrationWriter writer) {
this.writer = writer;
}
public static ClusterModuleRegistrationWriter writer;
}
......@@ -13,10 +13,12 @@ import org.skywalking.apm.collector.core.server.Server;
*/
public abstract class ClusterModuleDefine extends ModuleDefine {
private Client client;
@Override public final void initialize(Map config) throws ClusterModuleException {
try {
configParser().parse(config);
Client client = client();
client = createClient();
client.initialize();
dataInitializer().initialize(client);
} catch (ConfigParseException | ClientException e) {
......@@ -24,6 +26,10 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
}
}
public final Client getClient() {
return this.client;
}
@Override public final Server server() {
throw new UnsupportedOperationException("");
}
......
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.core.cluster;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
......@@ -17,22 +18,26 @@ public class ClusterModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(ClusterModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException {
Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException {
logger.info("beginning cluster module install");
ModuleDefine moduleDefine = null;
if (CollectionUtils.isEmpty(moduleConfig)) {
logger.info("could not configure cluster module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
moduleDefine = moduleDefineEntry.next().getValue();
if (moduleDefine.defaultModule()) {
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize(null);
break;
}
}
} else {
Map.Entry<String, Map> clusterConfigEntry = moduleConfig.entrySet().iterator().next();
ModuleDefine moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey());
moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey());
moduleDefine.initialize(clusterConfigEntry.getValue());
}
ClusterModuleContext.writer = ((ClusterModuleDefine)moduleDefine).registrationWriter();
}
}
package org.skywalking.apm.collector.core.cluster;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public interface ClusterModuleRegistrationWriter {
void write(String key, String value);
public abstract class ClusterModuleRegistrationWriter {
protected final Client client;
public ClusterModuleRegistrationWriter(Client client) {
this.client = client;
}
public abstract void write(String key, ModuleRegistration.Value value) throws ClientException;
}
package org.skywalking.apm.collector.core.framework;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.module.ModuleConfigLoader;
import org.skywalking.apm.collector.core.module.ModuleDefine;
......@@ -17,7 +18,7 @@ public class CollectorStarter implements Starter {
private final Logger logger = LoggerFactory.getLogger(CollectorStarter.class);
@Override public void start() throws ConfigException, DefineException {
@Override public void start() throws ConfigException, DefineException, ClientException {
ModuleConfigLoader configLoader = new ModuleConfigLoader();
Map<String, Map> configuration = configLoader.load();
......
package org.skywalking.apm.collector.core.framework;
/**
* @author pengys5
*/
public class Context {
}
package org.skywalking.apm.collector.core.framework;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
/**
* @author pengys5
*/
public interface Define {
void initialize(Map config) throws DefineException;
void initialize(Map config) throws DefineException, ClientException;
String name();
}
package org.skywalking.apm.collector.core.module;
import java.util.Map;
import org.skywalking.apm.collector.core.config.Config;
import org.skywalking.apm.collector.core.config.ConfigParseException;
/**
......
package org.skywalking.apm.collector.core.module;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientConfig;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.framework.Define;
import org.skywalking.apm.collector.core.server.Server;
......@@ -16,7 +17,7 @@ public abstract class ModuleDefine implements Define {
protected abstract ModuleConfigParser configParser();
protected abstract Client client();
protected abstract Client createClient();
protected abstract Server server();
......
package org.skywalking.apm.collector.core.module;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.DefineException;
/**
* @author pengys5
*/
public interface ModuleInstaller {
void install(Map<String, Map> moduleConfig, Map<String, ModuleDefine> moduleDefineMap) throws DefineException;
void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException;
}
package org.skywalking.apm.collector.core.module;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterModuleInstaller;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.worker.WorkerModuleInstaller;
......@@ -21,7 +22,7 @@ public class ModuleInstallerAdapter implements ModuleInstaller {
}
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException {
Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException {
moduleInstaller.install(moduleConfig, moduleDefineMap);
}
}
package org.skywalking.apm.collector.core.module;
import com.google.gson.JsonObject;
/**
* @author pengys5
*/
public abstract class ModuleRegistration {
protected static final String SEPARATOR = "|";
public abstract Value buildValue();
public static class Value {
private final String host;
private final int port;
private final JsonObject data;
public Value(String host, int port, JsonObject data) {
this.host = host;
this.port = port;
this.data = data;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
protected abstract String buildValue();
public JsonObject getData() {
return data;
}
}
}
\ No newline at end of file
package org.skywalking.apm.collector.core.storage;
/**
* @author pengys5
*/
public abstract class Column<T> {
private String name;
private T value;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public T getValue() {
return value;
}
public void setValue(T value) {
this.value = value;
}
}
package org.skywalking.apm.collector.core.storage;
/**
* @author pengys5
*/
public abstract class Create {
}
package org.skywalking.apm.collector.core.storage;
/**
* @author pengys5
*/
public abstract class Insert {
}
package org.skywalking.apm.collector.core.storage;
/**
* @author pengys5
*/
public class IntegerColumn extends Column<Integer> {
}
package org.skywalking.apm.collector.core.storage;
/**
* @author pengys5
*/
public class LongColumn extends Column<Long> {
}
package org.skywalking.apm.collector.core.storage;
/**
* @author pengys5
*/
public class StringColumn extends Column<String> {
}
package org.skywalking.apm.collector.core.util;
/**
* @author pengys5
*/
public class BytesUtils {
public static byte[] long2Bytes(long num) {
byte[] byteNum = new byte[8];
for (int ix = 0; ix < 8; ++ix) {
int offset = 64 - (ix + 1) * 8;
byteNum[ix] = (byte)((num >> offset) & 0xff);
}
return byteNum;
}
public static long bytes2Long(byte[] byteNum) {
long num = 0;
for (int ix = 0; ix < 8; ++ix) {
num <<= 8;
num |= (byteNum[ix] & 0xff);
}
return num;
}
}
......@@ -2,6 +2,9 @@ package org.skywalking.apm.collector.core.worker;
import java.util.Map;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.module.ModuleDefine;
......@@ -14,17 +17,20 @@ import org.skywalking.apm.collector.core.server.ServerException;
*/
public abstract class WorkerModuleDefine extends ModuleDefine {
@Override public final void initialize(Map config) throws ModuleException {
@Override public final void initialize(Map config) throws ModuleException, ClientException {
try {
configParser().parse(config);
Server server = server();
server.initialize();
String key = ClusterDataInitializer.BASE_CATALOG + "." + name();
ClusterModuleContext.writer.write(key, registration().buildValue());
} catch (ConfigParseException | ServerException e) {
throw new WorkerModuleException(e.getMessage(), e);
}
}
@Override public final Client client() {
@Override public final Client createClient() {
throw new UnsupportedOperationException();
}
......
package org.skywalking.apm.collector.core.worker;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
......@@ -15,7 +16,7 @@ public class WorkerModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(WorkerModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException {
Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException {
logger.info("beginning worker module install");
Map.Entry<String, Map> workerConfigEntry = moduleConfig.entrySet().iterator().next();
ModuleDefine moduleDefine = moduleDefineMap.get(workerConfigEntry.getKey());
......
......@@ -7,7 +7,7 @@ import org.skywalking.apm.collector.core.module.ModuleRegistration;
*/
public class WorkerAgentModuleRegistration extends ModuleRegistration {
@Override protected String buildValue() {
return WorkerAgentConfig.HOST + ModuleRegistration.SEPARATOR + WorkerAgentConfig.PORT;
@Override public Value buildValue() {
return new Value(WorkerAgentConfig.HOST, WorkerAgentConfig.PORT, null);
}
}
package org.skywalking.apm.collector.worker.impl;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorStarter;
import org.skywalking.apm.collector.core.framework.DefineException;
......@@ -13,7 +14,7 @@ public class CollectorBootStartUp {
private static final Logger logger = LoggerFactory.getLogger(CollectorBootStartUp.class);
public static void main(String[] args) throws ConfigException, DefineException {
public static void main(String[] args) throws ConfigException, DefineException, ClientException {
logger.info("collector starting...");
CollectorStarter starter = new CollectorStarter();
starter.start();
......
cluster:
zookeeper:
hostPort: localhost:2181
sessionTimeout: 1000
redis:
host: localhost-rd
port: 2000
# zookeeper:
# hostPort: localhost:2181
# sessionTimeout: 1000
# redis:
# host: localhost
# port: 6379
worker:
ui:
host: localhost
......
......@@ -24,10 +24,9 @@ public class WorkerUIConfigParser implements ModuleConfigParser {
throw new ConfigParseException("");
}
WorkerUIConfig.PORT = (Integer)config.get(PORT);
if (StringUtils.isEmpty(config.get(CONTEXT_PATH))) {
WorkerUIConfig.CONTEXT_PATH = "/";
} else {
if (!StringUtils.isEmpty(config.get(CONTEXT_PATH))) {
WorkerUIConfig.CONTEXT_PATH = (String)config.get(CONTEXT_PATH);
}
}
......
package org.skywalking.apm.collector.worker.ui;
import com.google.gson.JsonObject;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
......@@ -7,7 +8,9 @@ import org.skywalking.apm.collector.core.module.ModuleRegistration;
*/
public class WorkerUIModuleRegistration extends ModuleRegistration {
@Override protected String buildValue() {
return WorkerUIConfig.HOST + ModuleRegistration.SEPARATOR + WorkerUIConfig.PORT + ModuleRegistration.SEPARATOR + WorkerUIConfig.CONTEXT_PATH;
@Override public Value buildValue() {
JsonObject data = new JsonObject();
data.addProperty("context_path", WorkerUIConfig.CONTEXT_PATH);
return new Value(WorkerUIConfig.HOST, WorkerUIConfig.PORT, data);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册