diff --git a/apm-collector/apm-collector-client/client-h2/src/main/java/org/skywalking/apm/collector/client/h2/H2Client.java b/apm-collector/apm-collector-client/client-h2/src/main/java/org/skywalking/apm/collector/client/h2/H2Client.java index c4855190ac155bb5edbe8473fb2aac74f573f2f5..a1b5acca99f24475036ed8b609b1b5cac1669df5 100644 --- a/apm-collector/apm-collector-client/client-h2/src/main/java/org/skywalking/apm/collector/client/h2/H2Client.java +++ b/apm-collector/apm-collector-client/client-h2/src/main/java/org/skywalking/apm/collector/client/h2/H2Client.java @@ -1,38 +1,50 @@ package org.skywalking.apm.collector.client.h2; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.client.ClientException; /** * @author pengys5 */ public class H2Client implements Client { - @Override public void initialize() throws ClientException { - - } - - @Override public void insert(String path) throws ClientException { - - } - - @Override public void update() { + private Connection conn; + @Override public void initialize() throws H2ClientException { + try { + Class.forName("org.h2.Driver"); + conn = DriverManager.getConnection("jdbc:h2:mem:collector"); + } catch (ClassNotFoundException | SQLException e) { + throw new H2ClientException(e.getMessage(), e); + } } - @Override public String select(String path) throws ClientException { - return null; + public void execute(String sql) throws H2ClientException { + Statement statement = null; + try { + statement = conn.createStatement(); + statement.execute(sql); + statement.closeOnCompletion(); + } catch (SQLException e) { + throw new H2ClientException(e.getMessage(), e); + } } - @Override public void delete() { - - } - - @Override public boolean exist(String path) throws ClientException { - return false; - } - - @Override public void listen(String path) throws ClientException { - + public void executeQuery(String sql) throws H2ClientException { + Statement statement = null; + try { + statement = conn.createStatement(); + ResultSet rs = statement.executeQuery(sql); + while (rs.next()) { + System.out.println(rs.getString("ADDRESS") + "," + rs.getString("DATA")); + } + statement.closeOnCompletion(); + } catch (SQLException e) { + throw new H2ClientException(e.getMessage(), e); + } } } diff --git a/apm-collector/apm-collector-client/client-h2/src/main/java/org/skywalking/apm/collector/client/h2/H2ClientException.java b/apm-collector/apm-collector-client/client-h2/src/main/java/org/skywalking/apm/collector/client/h2/H2ClientException.java new file mode 100644 index 0000000000000000000000000000000000000000..c746c48053d0bc516cb7d9df72cb2a0bb291cafa --- /dev/null +++ b/apm-collector/apm-collector-client/client-h2/src/main/java/org/skywalking/apm/collector/client/h2/H2ClientException.java @@ -0,0 +1,17 @@ +package org.skywalking.apm.collector.client.h2; + +import org.skywalking.apm.collector.core.client.ClientException; + +/** + * @author pengys5 + */ +public class H2ClientException extends ClientException { + + public H2ClientException(String message) { + super(message); + } + + public H2ClientException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/apm-collector/apm-collector-client/client-redis/pom.xml b/apm-collector/apm-collector-client/client-redis/pom.xml index d16292efea70fae18faa4439814493b975010982..09107f8d83a40cc17bb51d1150c4496eae4f4757 100644 --- a/apm-collector/apm-collector-client/client-redis/pom.xml +++ b/apm-collector/apm-collector-client/client-redis/pom.xml @@ -11,4 +11,12 @@ client-redis jar + + + + redis.clients + jedis + 2.9.0 + + \ No newline at end of file diff --git a/apm-collector/apm-collector-client/client-redis/src/main/java/org/skywalking/apm/collector/client/redis/RedisClient.java b/apm-collector/apm-collector-client/client-redis/src/main/java/org/skywalking/apm/collector/client/redis/RedisClient.java index 539d10fcacaabd13ef5c05c6db3a30d0de04ac49..fffc7f6dd13fb250451d8ccd3aad69c1a3fb3f29 100644 --- a/apm-collector/apm-collector-client/client-redis/src/main/java/org/skywalking/apm/collector/client/redis/RedisClient.java +++ b/apm-collector/apm-collector-client/client-redis/src/main/java/org/skywalking/apm/collector/client/redis/RedisClient.java @@ -2,37 +2,28 @@ package org.skywalking.apm.collector.client.redis; import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.ClientException; +import redis.clients.jedis.Jedis; /** * @author pengys5 */ public class RedisClient implements Client { - @Override public void initialize() throws ClientException { - - } - - @Override public void insert(String path) throws ClientException { - - } - - @Override public void update() { + private Jedis jedis; - } - - @Override public String select(String path) throws ClientException { - return null; - } - - @Override public void delete() { + private final String host; + private final int port; + public RedisClient(String host, int port) { + this.host = host; + this.port = port; } - @Override public boolean exist(String path) throws ClientException { - return false; + @Override public void initialize() throws ClientException { + jedis = new Jedis(host, port); } - @Override public void listen(String path) throws ClientException { - + public void setex(String key, int seconds, String value) { + jedis.setex(key, seconds, value); } } diff --git a/apm-collector/apm-collector-client/client-redis/src/main/java/org/skywalking/apm/collector/client/redis/RedisClientException.java b/apm-collector/apm-collector-client/client-redis/src/main/java/org/skywalking/apm/collector/client/redis/RedisClientException.java new file mode 100644 index 0000000000000000000000000000000000000000..047f726a52eb4fb0f3097fe3aac22dc492090e8a --- /dev/null +++ b/apm-collector/apm-collector-client/client-redis/src/main/java/org/skywalking/apm/collector/client/redis/RedisClientException.java @@ -0,0 +1,17 @@ +package org.skywalking.apm.collector.client.redis; + +import org.skywalking.apm.collector.core.client.ClientException; + +/** + * @author pengys5 + */ +public class RedisClientException extends ClientException { + + public RedisClientException(String message) { + super(message); + } + + public RedisClientException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClient.java b/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClient.java index 52798b26710f7332a6ff354b3d05111ab45deb2a..8191e1da7f5e11722a9b67b8b11c2878140e556e 100644 --- a/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClient.java +++ b/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClient.java @@ -1,13 +1,13 @@ package org.skywalking.apm.collector.client.zookeeper; import java.io.IOException; +import java.util.List; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.util.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,57 +20,50 @@ public class ZookeeperClient implements Client { private ZooKeeper zk; + private final String hostPort; + private final int sessionTimeout; + + public ZookeeperClient(String hostPort, int sessionTimeout) { + this.hostPort = hostPort; + this.sessionTimeout = sessionTimeout; + } + @Override public void initialize() throws ZookeeperClientException { try { - zk = new ZooKeeper(ZookeeperConfig.hostPort, ZookeeperConfig.sessionTimeout, new ZookeeperDataListener(this)); + zk = new ZooKeeper(hostPort, sessionTimeout, new ZookeeperDataListener(this)); } catch (IOException e) { throw new ZookeeperClientException(e.getMessage(), e); } } - @Override public void insert(String path) throws ZookeeperClientException { - logger.info("add the zookeeper path \"{}\"", path); + public void create(final String path, byte data[], List acl, + CreateMode createMode) throws ZookeeperClientException { try { - zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create(path, data, acl, createMode); } catch (KeeperException | InterruptedException e) { throw new ZookeeperClientException(e.getMessage(), e); } } - @Override public void update() { - - } - - @Override public String select(String path) throws ZookeeperClientException { - logger.info("get the zookeeper data from path \"{}\"", path); + public Stat exists(final String path, boolean watch) throws ZookeeperClientException { try { - return zk.getData(path, false, null).toString(); + return zk.exists(path, watch); } catch (KeeperException | InterruptedException e) { throw new ZookeeperClientException(e.getMessage(), e); } } - @Override public void delete() { - - } - - @Override public boolean exist(String path) throws ZookeeperClientException { - logger.info("assess the zookeeper path \"{}\" exist", path); + public byte[] getData(String path, boolean watch, Stat stat) throws ZookeeperClientException { try { - Stat stat = zk.exists(path, false); - if (ObjectUtils.isEmpty(stat)) { - return false; - } else { - return true; - } + return zk.getData(path, watch, stat); } catch (KeeperException | InterruptedException e) { throw new ZookeeperClientException(e.getMessage(), e); } } - @Override public void listen(String path) throws ZookeeperClientException { + public Stat setData(final String path, byte data[], int version) throws ZookeeperClientException { try { - zk.exists(path, true); + return zk.setData(path, data, version); } catch (KeeperException | InterruptedException e) { throw new ZookeeperClientException(e.getMessage(), e); } diff --git a/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperConfig.java b/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperConfig.java deleted file mode 100644 index 11b27d0325b00dcf128ae7094ab5751fa11a2514..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperConfig.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.skywalking.apm.collector.client.zookeeper; - -/** - * @author pengys5 - */ -public class ZookeeperConfig { - public static String hostPort; - public static int sessionTimeout; -} diff --git a/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperDataListener.java b/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperDataListener.java index 4b5e8f5dca1e11f8df2a4ba0a8de3d968517fca7..cd5ff1e194303f3cf3fe47acd13e73af41ddc54a 100644 --- a/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperDataListener.java +++ b/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperDataListener.java @@ -19,10 +19,10 @@ public class ZookeeperDataListener implements DataListener, Watcher { private final Logger logger = LoggerFactory.getLogger(ZookeeperDataListener.class); - private Client client; + private ZookeeperClient client; public ZookeeperDataListener(Client client) { - this.client = client; + this.client = (ZookeeperClient)client; } @Override public void process(WatchedEvent event) { @@ -32,7 +32,7 @@ public class ZookeeperDataListener implements DataListener, Watcher { } try { - String data = client.select(event.getPath()); + String data = String.valueOf(client.getData(event.getPath(), false, null)); logger.debug("data {}", data); } catch (ClientException e) { logger.error(e.getMessage(), e); @@ -46,7 +46,7 @@ public class ZookeeperDataListener implements DataListener, Watcher { for (String catalog : catalogs) { pathBuilder.append("/").append(catalog); } - client.listen(pathBuilder.toString()); + client.exists(pathBuilder.toString(), true); } } diff --git a/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/util/PathUtils.java b/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/util/PathUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..0b38b7406fa05d8225f0cb7ae34c85b1e6a09d8e --- /dev/null +++ b/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/util/PathUtils.java @@ -0,0 +1,16 @@ +package org.skywalking.apm.collector.client.zookeeper.util; + +/** + * @author pengys5 + */ +public class PathUtils { + + public static String convertKey2Path(String key) { + String[] keys = key.split("\\."); + StringBuilder pathBuilder = new StringBuilder(); + for (String subPath : keys) { + pathBuilder.append("/").append(subPath); + } + return pathBuilder.toString(); + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisConfig.java b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..6eda33a6069994f75f5b396dd3f6e4bbd29d3c8e --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisConfig.java @@ -0,0 +1,9 @@ +package org.skywalking.apm.collector.cluster.redis; + +/** + * @author pengys5 + */ +public class ClusterRedisConfig { + public static String HOST; + public static int PORT; +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisConfigParser.java b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisConfigParser.java index 4009ed918a3733a9398795d2d62030b6891f2cc9..63efb768091b0b7d117b7f5b885c967591629d36 100644 --- a/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisConfigParser.java +++ b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisConfigParser.java @@ -3,13 +3,21 @@ package org.skywalking.apm.collector.cluster.redis; import java.util.Map; import org.skywalking.apm.collector.core.config.ConfigParseException; import org.skywalking.apm.collector.core.module.ModuleConfigParser; +import org.skywalking.apm.collector.core.util.StringUtils; /** * @author pengys5 */ public class ClusterRedisConfigParser implements ModuleConfigParser { + private final String HOST = "host"; + private final String PORT = "port"; + @Override public void parse(Map config) throws ConfigParseException { - + ClusterRedisConfig.HOST = (String)config.get(HOST); + ClusterRedisConfig.PORT = ((Integer)config.get(PORT)); + if (StringUtils.isEmpty(ClusterRedisConfig.HOST) || ClusterRedisConfig.PORT == 0) { + throw new ConfigParseException(""); + } } } diff --git a/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisDataInitializer.java b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisDataInitializer.java index 55e1280d323504b50ad2120842f5d4ef48a803c3..9ea52886778f7befcc3dc4b707a9e5bf8db433f7 100644 --- a/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisDataInitializer.java +++ b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisDataInitializer.java @@ -3,16 +3,22 @@ package org.skywalking.apm.collector.cluster.redis; import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author pengys5 */ public class ClusterRedisDataInitializer extends ClusterDataInitializer { + + private final Logger logger = LoggerFactory.getLogger(ClusterRedisDataInitializer.class); + @Override public void addItem(Client client, String itemKey) throws ClientException { - + logger.info("add the redis item key \"{}\" exist", itemKey); } @Override public boolean existItem(Client client, String itemKey) throws ClientException { + logger.info("assess the redis item key \"{}\" exist", itemKey); return false; } } diff --git a/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleDefine.java b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleDefine.java index 4947649a1057e058dea29936bfb27e1a107aa18f..69afcc1a268effbd3cafff8cd60a49be8cb8e0aa 100644 --- a/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleDefine.java +++ b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleDefine.java @@ -29,8 +29,8 @@ public class ClusterRedisModuleDefine extends ClusterModuleDefine { return new ClusterRedisConfigParser(); } - @Override protected Client client() { - return new RedisClient(); + @Override protected Client createClient() { + return new RedisClient(ClusterRedisConfig.HOST, ClusterRedisConfig.PORT); } @Override protected DataInitializer dataInitializer() { @@ -38,6 +38,6 @@ public class ClusterRedisModuleDefine extends ClusterModuleDefine { } @Override protected ClusterModuleRegistrationWriter registrationWriter() { - return new ClusterRedisModuleRegistrationWriter(); + return new ClusterRedisModuleRegistrationWriter(getClient()); } } diff --git a/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationWriter.java b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationWriter.java index bae3b505b0b9b9427db20f39129721da1c452145..f94a614fa7bc74972ce9a74dc1484c7afb3ca84d 100644 --- a/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationWriter.java +++ b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationWriter.java @@ -1,13 +1,28 @@ package org.skywalking.apm.collector.cluster.redis; +import org.skywalking.apm.collector.client.redis.RedisClient; +import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter; +import org.skywalking.apm.collector.core.module.ModuleRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author pengys5 */ -public class ClusterRedisModuleRegistrationWriter implements ClusterModuleRegistrationWriter { +public class ClusterRedisModuleRegistrationWriter extends ClusterModuleRegistrationWriter { - @Override public void write(String key, String value) { + private final Logger logger = LoggerFactory.getLogger(ClusterRedisModuleRegistrationWriter.class); + public ClusterRedisModuleRegistrationWriter(Client client) { + super(client); + } + + @Override public void write(String key, ModuleRegistration.Value value) { + logger.debug("key {}, value {}", key, value.getHost()); + key = key + "." + value.getHost() + ":" + value.getPort(); + value.getData().addProperty("host", value.getHost()); + value.getData().addProperty("port", value.getPort()); + ((RedisClient)client).setex(key, 120, value.getData().toString()); } } diff --git a/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneConfigParser.java b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneConfigParser.java index d12844f29f61f85eae773787fbed1a77676c9537..5a331c5b91669f1c51727838240c2747edd99cbe 100644 --- a/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneConfigParser.java +++ b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneConfigParser.java @@ -9,6 +9,5 @@ import org.skywalking.apm.collector.core.module.ModuleConfigParser; */ public class ClusterStandaloneConfigParser implements ModuleConfigParser { @Override public void parse(Map config) throws ConfigParseException { - } } diff --git a/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneDataInitializer.java b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneDataInitializer.java index 85f8bb21d787efc3287fd63a5ba68749ce52a179..6221ec2bbff38f2c04eeaca55af2a1b8104fb4bc 100644 --- a/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneDataInitializer.java +++ b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneDataInitializer.java @@ -1,16 +1,29 @@ package org.skywalking.apm.collector.cluster.standalone; +import org.skywalking.apm.collector.client.h2.H2Client; +import org.skywalking.apm.collector.client.h2.H2ClientException; import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author pengys5 */ public class ClusterStandaloneDataInitializer extends ClusterDataInitializer { - @Override public void addItem(Client client, String itemKey) throws ClientException { + private final Logger logger = LoggerFactory.getLogger(ClusterStandaloneDataInitializer.class); + @Override public void addItem(Client client, String itemKey) throws ClientException { + logger.info("add the h2 item key \"{}\" exist", itemKey); + itemKey = itemKey.replaceAll("\\.", "_"); + String sql = "CREATE TABLE " + itemKey + "(ADDRESS VARCHAR(100) PRIMARY KEY,DATA VARCHAR(255));"; + try { + ((H2Client)client).execute(sql); + } catch (H2ClientException e) { + logger.error(e.getMessage(), e); + } } @Override public boolean existItem(Client client, String itemKey) throws ClientException { diff --git a/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java index cfcb129f4a9101aea52c8d9d7c5add9c1dcf0871..baa27c40cb444e48afea36449533530f7be4c96b 100644 --- a/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java +++ b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java @@ -29,7 +29,7 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine { return new ClusterStandaloneConfigParser(); } - @Override protected Client client() { + @Override public Client createClient() { return new H2Client(); } @@ -38,6 +38,6 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine { } @Override protected ClusterModuleRegistrationWriter registrationWriter() { - return new ClusterStandaloneModuleRegistrationWriter(); + return new ClusterStandaloneModuleRegistrationWriter(getClient()); } } diff --git a/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationWriter.java b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationWriter.java index 4e535ddb5314dc769084528f37981d6758970dca..de811f165f558d319a61a7b2a547be14fd74de63 100644 --- a/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationWriter.java +++ b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationWriter.java @@ -1,13 +1,34 @@ package org.skywalking.apm.collector.cluster.standalone; +import org.skywalking.apm.collector.client.h2.H2Client; +import org.skywalking.apm.collector.client.h2.H2ClientException; +import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter; +import org.skywalking.apm.collector.core.module.ModuleRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author pengys5 */ -public class ClusterStandaloneModuleRegistrationWriter implements ClusterModuleRegistrationWriter { +public class ClusterStandaloneModuleRegistrationWriter extends ClusterModuleRegistrationWriter { - @Override public void write(String key, String value) { + private final Logger logger = LoggerFactory.getLogger(ClusterStandaloneModuleRegistrationWriter.class); + public ClusterStandaloneModuleRegistrationWriter(Client client) { + super(client); + } + + @Override public void write(String key, ModuleRegistration.Value value) { + key = key.replaceAll("\\.", "_"); + String hostPort = value.getHost() + ":" + value.getPort(); + String sql = "INSERT INTO " + key + " VALUES('" + hostPort + "', '" + value.getData().toString() + "');"; + String sql2 = "SELECT * FROM " + key; + try { + ((H2Client)client).execute(sql); + ((H2Client)client).executeQuery(sql2); + } catch (H2ClientException e) { + logger.error(e.getMessage(), e); + } } } diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKConfig.java b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..64264661ae0620b2c3ebb458d5b57d79ebe6fd16 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKConfig.java @@ -0,0 +1,9 @@ +package org.skywalking.apm.collector.cluster.zookeeper; + +/** + * @author pengys5 + */ +public class ClusterZKConfig { + public static String HOST_PORT; + public static int SESSION_TIMEOUT; +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKConfigParser.java b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKConfigParser.java index dfbf1c63f19b8bc6938107d20c7974d35023dcba..26409915461f2eb79e7c184f756f20a0120a2fd9 100644 --- a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKConfigParser.java +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKConfigParser.java @@ -1,7 +1,6 @@ package org.skywalking.apm.collector.cluster.zookeeper; import java.util.Map; -import org.skywalking.apm.collector.client.zookeeper.ZookeeperConfig; import org.skywalking.apm.collector.core.config.ConfigParseException; import org.skywalking.apm.collector.core.module.ModuleConfigParser; import org.skywalking.apm.collector.core.util.StringUtils; @@ -18,12 +17,11 @@ public class ClusterZKConfigParser implements ModuleConfigParser { if (StringUtils.isEmpty(config.get(HOST_PORT))) { throw new ConfigParseException(""); } - ZookeeperConfig.hostPort = (String)config.get(HOST_PORT); + ClusterZKConfig.HOST_PORT = (String)config.get(HOST_PORT); + ClusterZKConfig.SESSION_TIMEOUT = 1000; - if (StringUtils.isEmpty(config.get(SESSION_TIMEOUT))) { - ZookeeperConfig.sessionTimeout = 1000; - } else { - ZookeeperConfig.sessionTimeout = (Integer)config.get(SESSION_TIMEOUT); + if (!StringUtils.isEmpty(config.get(SESSION_TIMEOUT))) { + ClusterZKConfig.SESSION_TIMEOUT = (Integer)config.get(SESSION_TIMEOUT); } } } diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataInitializer.java b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataInitializer.java index 4860c5c0325538e03dd93874a3462cc02b368dd6..9ccb13c870e0d7465c026a63d04af4cf70542514 100644 --- a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataInitializer.java +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataInitializer.java @@ -1,5 +1,8 @@ package org.skywalking.apm.collector.cluster.zookeeper; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; +import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient; import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer; @@ -15,23 +18,32 @@ public class ClusterZKDataInitializer extends ClusterDataInitializer { @Override public void addItem(Client client, String itemKey) throws ClientException { logger.info("add the zookeeper item key \"{}\" exist", itemKey); + ZookeeperClient zkClient = (ZookeeperClient)client; + String[] catalogs = itemKey.split("\\."); StringBuilder pathBuilder = new StringBuilder(); for (String catalog : catalogs) { pathBuilder.append("/").append(catalog); - if (!client.exist(pathBuilder.toString())) { - client.insert(pathBuilder.toString()); + if (zkClient.exists(pathBuilder.toString(), false) == null) { + zkClient.create(pathBuilder.toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } } @Override public boolean existItem(Client client, String itemKey) throws ClientException { logger.info("assess the zookeeper item key \"{}\" exist", itemKey); + ZookeeperClient zkClient = (ZookeeperClient)client; + String[] catalogs = itemKey.split("\\."); StringBuilder pathBuilder = new StringBuilder(); for (String catalog : catalogs) { pathBuilder.append("/").append(catalog); } - return client.exist(pathBuilder.toString()); + + if (zkClient.exists(pathBuilder.toString(), false) == null) { + return false; + } else { + return true; + } } } diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefine.java b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefine.java index 0485bb10f0fa09a6709cf688cbcdbf056c455bd6..078a1a1b4661ceee52fae770668678bd6eefc6e9 100644 --- a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefine.java +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefine.java @@ -13,7 +13,7 @@ import org.skywalking.apm.collector.core.module.ModuleGroup; */ public class ClusterZKModuleDefine extends ClusterModuleDefine { - @Override public ModuleGroup group() { + @Override protected ModuleGroup group() { return ModuleGroup.Cluster; } @@ -25,19 +25,19 @@ public class ClusterZKModuleDefine extends ClusterModuleDefine { return false; } - @Override public ModuleConfigParser configParser() { + @Override protected ModuleConfigParser configParser() { return new ClusterZKConfigParser(); } - @Override public Client client() { - return new ZookeeperClient(); + @Override protected Client createClient() { + return new ZookeeperClient(ClusterZKConfig.HOST_PORT, ClusterZKConfig.SESSION_TIMEOUT); } - @Override public ClusterDataInitializer dataInitializer() { + @Override protected ClusterDataInitializer dataInitializer() { return new ClusterZKDataInitializer(); } @Override protected ClusterModuleRegistrationWriter registrationWriter() { - return new ClusterZKModuleRegistrationWriter(); + return new ClusterZKModuleRegistrationWriter(getClient()); } } diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationWriter.java b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationWriter.java index 8ca0b0cb857729abc02824db0452bf2d71f1fbd3..622379560100378cca29c3dbe387cb2a3c661180 100644 --- a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationWriter.java +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationWriter.java @@ -1,13 +1,37 @@ package org.skywalking.apm.collector.cluster.zookeeper; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.Stat; +import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient; +import org.skywalking.apm.collector.client.zookeeper.util.PathUtils; +import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter; +import org.skywalking.apm.collector.core.module.ModuleRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author pengys5 */ -public class ClusterZKModuleRegistrationWriter implements ClusterModuleRegistrationWriter { +public class ClusterZKModuleRegistrationWriter extends ClusterModuleRegistrationWriter { - @Override public void write(String key, String value) { + private final Logger logger = LoggerFactory.getLogger(ClusterZKModuleRegistrationWriter.class); + public ClusterZKModuleRegistrationWriter(Client client) { + super(client); + } + + @Override public void write(String key, ModuleRegistration.Value value) throws ClientException { + logger.info("cluster zookeeper register key: {}, value: {}", key, value); + String workerUIPath = PathUtils.convertKey2Path(key) + "/" + value.getHost() + ":" + value.getPort(); + + Stat stat = ((ZookeeperClient)client).exists(workerUIPath, false); + if (stat == null) { + ((ZookeeperClient)client).create(workerUIPath, value.getData() == null ? null : value.getData().toString().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } else { + ((ZookeeperClient)client).setData(workerUIPath, value.getData() == null ? null : value.getData().toString().getBytes(), -1); + } } } diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/test/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefineTestCase.java b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/test/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefineTestCase.java index 4e7d92da116759391b08315fba6aae4456c48a96..07171b6736992832fcb2c2a5422419026bca5c0c 100644 --- a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/test/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefineTestCase.java +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/test/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefineTestCase.java @@ -4,7 +4,6 @@ import java.io.FileNotFoundException; import java.util.Map; import org.junit.Before; import org.junit.Test; -import org.skywalking.apm.collector.client.zookeeper.ZookeeperConfig; import org.skywalking.apm.collector.core.cluster.ClusterModuleException; import org.yaml.snakeyaml.Yaml; @@ -25,8 +24,5 @@ public class ClusterZKModuleDefineTestCase { public void testInitialize() throws ClusterModuleException { ClusterZKModuleDefine define = new ClusterZKModuleDefine(); define.initialize(config); - - System.out.println(ZookeeperConfig.hostPort); - System.out.println(ZookeeperConfig.sessionTimeout); } } diff --git a/apm-collector/apm-collector-core/pom.xml b/apm-collector/apm-collector-core/pom.xml index c68a10e864bddf716d2d3fd9a43bcb41d818ac04..5c21a41b73ae256ce40fbd3214b6ec5a5c497a2d 100644 --- a/apm-collector/apm-collector-core/pom.xml +++ b/apm-collector/apm-collector-core/pom.xml @@ -23,5 +23,10 @@ logback-classic 1.2.3 + + com.google.code.gson + gson + 2.8.1 + \ No newline at end of file diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/Client.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/Client.java index 661792e7135bde9692718a47c24025ba2c62aa14..24b7f5357a23a3436227bf5abccbfff31addc633 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/Client.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/client/Client.java @@ -4,18 +4,5 @@ package org.skywalking.apm.collector.core.client; * @author pengys5 */ public interface Client { - void initialize() throws ClientException; - - void insert(String path) throws ClientException; - - void update(); - - String select(String path) throws ClientException; - - void delete(); - - boolean exist(String path) throws ClientException; - - void listen(String path) throws ClientException; } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataInitializer.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataInitializer.java index 3be6aeaee1e8508fecf3dda3ce84b2300602b848..62ff30d7ea69b3b9fa3481013625b59c49f6f8fd 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataInitializer.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataInitializer.java @@ -9,7 +9,7 @@ import org.skywalking.apm.collector.core.framework.DataInitializer; */ public abstract class ClusterDataInitializer implements DataInitializer { - public static final String BASE_CATALOG = "collector.cluster"; + public static final String BASE_CATALOG = "skywalking"; public static final String FOR_UI_CATALOG = BASE_CATALOG + ".ui"; public static final String FOR_AGENT_CATALOG = BASE_CATALOG + ".agent"; diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleContext.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleContext.java index 0aa78b4f389e0774d830cee174edbdff5e78a564..6525dbdd0c3da7c8d6a1772376dea50ac80fe54d 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleContext.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleContext.java @@ -4,13 +4,5 @@ package org.skywalking.apm.collector.core.cluster; * @author pengys5 */ public class ClusterModuleContext { - private ClusterModuleRegistrationWriter writer; - - public ClusterModuleRegistrationWriter getWriter() { - return writer; - } - - public void setWriter(ClusterModuleRegistrationWriter writer) { - this.writer = writer; - } + public static ClusterModuleRegistrationWriter writer; } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleDefine.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleDefine.java index f09dff349d136d213c585786ea6530e4e0bf4f84..cd232a76cf810bcbdf62cc1c54cfe3e89cf15aa3 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleDefine.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleDefine.java @@ -13,10 +13,12 @@ import org.skywalking.apm.collector.core.server.Server; */ public abstract class ClusterModuleDefine extends ModuleDefine { + private Client client; + @Override public final void initialize(Map config) throws ClusterModuleException { try { configParser().parse(config); - Client client = client(); + client = createClient(); client.initialize(); dataInitializer().initialize(client); } catch (ConfigParseException | ClientException e) { @@ -24,6 +26,10 @@ public abstract class ClusterModuleDefine extends ModuleDefine { } } + public final Client getClient() { + return this.client; + } + @Override public final Server server() { throw new UnsupportedOperationException(""); } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleInstaller.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleInstaller.java index fd56eed9eac4c72cdd2b272404f6bc7e739b10da..68470a624a64ce7738f5d1c5718b243aebb5a259 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleInstaller.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleInstaller.java @@ -2,6 +2,7 @@ package org.skywalking.apm.collector.core.cluster; import java.util.Iterator; import java.util.Map; +import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleInstaller; @@ -17,22 +18,26 @@ public class ClusterModuleInstaller implements ModuleInstaller { private final Logger logger = LoggerFactory.getLogger(ClusterModuleInstaller.class); @Override public void install(Map moduleConfig, - Map moduleDefineMap) throws DefineException { + Map moduleDefineMap) throws DefineException, ClientException { logger.info("beginning cluster module install"); + + ModuleDefine moduleDefine = null; if (CollectionUtils.isEmpty(moduleConfig)) { logger.info("could not configure cluster module, use the default"); Iterator> moduleDefineEntry = moduleDefineMap.entrySet().iterator(); while (moduleDefineEntry.hasNext()) { - ModuleDefine moduleDefine = moduleDefineEntry.next().getValue(); + moduleDefine = moduleDefineEntry.next().getValue(); if (moduleDefine.defaultModule()) { logger.info("module {} initialize", moduleDefine.getClass().getName()); moduleDefine.initialize(null); + break; } } } else { Map.Entry clusterConfigEntry = moduleConfig.entrySet().iterator().next(); - ModuleDefine moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey()); + moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey()); moduleDefine.initialize(clusterConfigEntry.getValue()); } + ClusterModuleContext.writer = ((ClusterModuleDefine)moduleDefine).registrationWriter(); } } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationWriter.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationWriter.java index 449a904d54a62beb6f85fcc346c6fdb9a54ae1f4..f60eca9df5771125b21d178cddea13f0b766f18f 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationWriter.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationWriter.java @@ -1,8 +1,19 @@ package org.skywalking.apm.collector.core.cluster; +import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.client.ClientException; +import org.skywalking.apm.collector.core.module.ModuleRegistration; + /** * @author pengys5 */ -public interface ClusterModuleRegistrationWriter { - void write(String key, String value); +public abstract class ClusterModuleRegistrationWriter { + + protected final Client client; + + public ClusterModuleRegistrationWriter(Client client) { + this.client = client; + } + + public abstract void write(String key, ModuleRegistration.Value value) throws ClientException; } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/CollectorStarter.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/CollectorStarter.java index 054e812ee48e80fea7008f1aa3fbdb24f1012628..9de01cf38ca0a99003172ff725fd1f1a2e6cbff6 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/CollectorStarter.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/CollectorStarter.java @@ -1,6 +1,7 @@ package org.skywalking.apm.collector.core.framework; import java.util.Map; +import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.config.ConfigException; import org.skywalking.apm.collector.core.module.ModuleConfigLoader; import org.skywalking.apm.collector.core.module.ModuleDefine; @@ -17,7 +18,7 @@ public class CollectorStarter implements Starter { private final Logger logger = LoggerFactory.getLogger(CollectorStarter.class); - @Override public void start() throws ConfigException, DefineException { + @Override public void start() throws ConfigException, DefineException, ClientException { ModuleConfigLoader configLoader = new ModuleConfigLoader(); Map configuration = configLoader.load(); diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Context.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Context.java deleted file mode 100644 index 9f9b7dffc3a6074700c1301753b29b6f86c5df85..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Context.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.skywalking.apm.collector.core.framework; - -/** - * @author pengys5 - */ -public class Context { - -} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Define.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Define.java index 34ec6598d3e6e9acd8e590881749deb55d582922..542d72b7ec4bc1a360d099d5bd8da1096b3de0cc 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Define.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Define.java @@ -1,13 +1,14 @@ package org.skywalking.apm.collector.core.framework; import java.util.Map; +import org.skywalking.apm.collector.core.client.ClientException; /** * @author pengys5 */ public interface Define { - void initialize(Map config) throws DefineException; + void initialize(Map config) throws DefineException, ClientException; String name(); } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigParser.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigParser.java index 1f2fbd94180c1e923abd98bedb90a0a2347f1d5c..69330a7dd7e760fde5f11f2992b4841bb19914b9 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigParser.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigParser.java @@ -1,6 +1,7 @@ package org.skywalking.apm.collector.core.module; import java.util.Map; +import org.skywalking.apm.collector.core.config.Config; import org.skywalking.apm.collector.core.config.ConfigParseException; /** diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefine.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefine.java index f881e866ea160b05a76226dd0f097404e49fc437..63ef4d81992620bdcd1663da271ab6a4e13183b6 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefine.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefine.java @@ -1,6 +1,7 @@ package org.skywalking.apm.collector.core.module; import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.client.ClientConfig; import org.skywalking.apm.collector.core.framework.DataInitializer; import org.skywalking.apm.collector.core.framework.Define; import org.skywalking.apm.collector.core.server.Server; @@ -16,7 +17,7 @@ public abstract class ModuleDefine implements Define { protected abstract ModuleConfigParser configParser(); - protected abstract Client client(); + protected abstract Client createClient(); protected abstract Server server(); diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstaller.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstaller.java index 3a33fb12b738e4621ba0356741367bfa59214da6..40aa905c8e7ef9ae3d00b6ebf175da12fe5b1560 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstaller.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstaller.java @@ -1,11 +1,13 @@ package org.skywalking.apm.collector.core.module; import java.util.Map; +import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.framework.DefineException; /** * @author pengys5 */ public interface ModuleInstaller { - void install(Map moduleConfig, Map moduleDefineMap) throws DefineException; + void install(Map moduleConfig, + Map moduleDefineMap) throws DefineException, ClientException; } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstallerAdapter.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstallerAdapter.java index ec9389dfc882e4955eac1bb529ab367bc826d0df..5e9d1c27bde1d3716cde724adf6e5365629256ec 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstallerAdapter.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstallerAdapter.java @@ -1,6 +1,7 @@ package org.skywalking.apm.collector.core.module; import java.util.Map; +import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.cluster.ClusterModuleInstaller; import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.worker.WorkerModuleInstaller; @@ -21,7 +22,7 @@ public class ModuleInstallerAdapter implements ModuleInstaller { } @Override public void install(Map moduleConfig, - Map moduleDefineMap) throws DefineException { + Map moduleDefineMap) throws DefineException, ClientException { moduleInstaller.install(moduleConfig, moduleDefineMap); } } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleRegistration.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleRegistration.java index 2b52b414b6b2fe05e07e273394832e887402b6fa..7242cbf2dd0eda814efccc1fbe7197b7c69a4992 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleRegistration.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleRegistration.java @@ -1,11 +1,35 @@ package org.skywalking.apm.collector.core.module; +import com.google.gson.JsonObject; + /** * @author pengys5 */ public abstract class ModuleRegistration { - protected static final String SEPARATOR = "|"; + public abstract Value buildValue(); + + public static class Value { + private final String host; + private final int port; + private final JsonObject data; + + public Value(String host, int port, JsonObject data) { + this.host = host; + this.port = port; + this.data = data; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } - protected abstract String buildValue(); -} + public JsonObject getData() { + return data; + } + } +} \ No newline at end of file diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/Column.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/Column.java new file mode 100644 index 0000000000000000000000000000000000000000..b8eac24290a2d9090738e01b9d7f039d1057e050 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/Column.java @@ -0,0 +1,27 @@ +package org.skywalking.apm.collector.core.storage; + +/** + * @author pengys5 + */ +public abstract class Column { + + private String name; + + private T value; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public T getValue() { + return value; + } + + public void setValue(T value) { + this.value = value; + } +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/Create.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/Create.java new file mode 100644 index 0000000000000000000000000000000000000000..a690635d2684e4d291f0169f9c032edcb5d7bc52 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/Create.java @@ -0,0 +1,7 @@ +package org.skywalking.apm.collector.core.storage; + +/** + * @author pengys5 + */ +public abstract class Create { +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/Insert.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/Insert.java new file mode 100644 index 0000000000000000000000000000000000000000..993f51157c79b214ca473cec671aec12153ce543 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/Insert.java @@ -0,0 +1,8 @@ +package org.skywalking.apm.collector.core.storage; + +/** + * @author pengys5 + */ +public abstract class Insert { + +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/IntegerColumn.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/IntegerColumn.java new file mode 100644 index 0000000000000000000000000000000000000000..1d1bff45d3239a6a83f6079989107b649afc19d3 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/IntegerColumn.java @@ -0,0 +1,7 @@ +package org.skywalking.apm.collector.core.storage; + +/** + * @author pengys5 + */ +public class IntegerColumn extends Column { +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/LongColumn.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/LongColumn.java new file mode 100644 index 0000000000000000000000000000000000000000..a104e73364c297959770acae932e27df84e83396 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/LongColumn.java @@ -0,0 +1,7 @@ +package org.skywalking.apm.collector.core.storage; + +/** + * @author pengys5 + */ +public class LongColumn extends Column { +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StringColumn.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StringColumn.java new file mode 100644 index 0000000000000000000000000000000000000000..ac9facf11e75fa85d90bd3c3b42d1c9193c2c4ad --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StringColumn.java @@ -0,0 +1,7 @@ +package org.skywalking.apm.collector.core.storage; + +/** + * @author pengys5 + */ +public class StringColumn extends Column { +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/BytesUtils.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/BytesUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..9e58f5946d972945b169ab27ffec6a7a179cbcd4 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/BytesUtils.java @@ -0,0 +1,25 @@ +package org.skywalking.apm.collector.core.util; + +/** + * @author pengys5 + */ +public class BytesUtils { + + public static byte[] long2Bytes(long num) { + byte[] byteNum = new byte[8]; + for (int ix = 0; ix < 8; ++ix) { + int offset = 64 - (ix + 1) * 8; + byteNum[ix] = (byte)((num >> offset) & 0xff); + } + return byteNum; + } + + public static long bytes2Long(byte[] byteNum) { + long num = 0; + for (int ix = 0; ix < 8; ++ix) { + num <<= 8; + num |= (byteNum[ix] & 0xff); + } + return num; + } +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/worker/WorkerModuleDefine.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/worker/WorkerModuleDefine.java index f337a1de5f112f97e97eb9461e5831a09d4d4e94..2e67e51be3ddad7fcf4554630a58a9e5f48819de 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/worker/WorkerModuleDefine.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/worker/WorkerModuleDefine.java @@ -2,6 +2,9 @@ package org.skywalking.apm.collector.core.worker; import java.util.Map; import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.client.ClientException; +import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer; +import org.skywalking.apm.collector.core.cluster.ClusterModuleContext; import org.skywalking.apm.collector.core.config.ConfigParseException; import org.skywalking.apm.collector.core.framework.DataInitializer; import org.skywalking.apm.collector.core.module.ModuleDefine; @@ -14,17 +17,20 @@ import org.skywalking.apm.collector.core.server.ServerException; */ public abstract class WorkerModuleDefine extends ModuleDefine { - @Override public final void initialize(Map config) throws ModuleException { + @Override public final void initialize(Map config) throws ModuleException, ClientException { try { configParser().parse(config); Server server = server(); server.initialize(); + + String key = ClusterDataInitializer.BASE_CATALOG + "." + name(); + ClusterModuleContext.writer.write(key, registration().buildValue()); } catch (ConfigParseException | ServerException e) { throw new WorkerModuleException(e.getMessage(), e); } } - @Override public final Client client() { + @Override public final Client createClient() { throw new UnsupportedOperationException(); } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/worker/WorkerModuleInstaller.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/worker/WorkerModuleInstaller.java index 2978ac85e2a7852fa5acdcf1f2f40b6e2950447a..0b86aa90c20a31ed29eaadf2fd3e7b68bd4bb6a0 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/worker/WorkerModuleInstaller.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/worker/WorkerModuleInstaller.java @@ -1,6 +1,7 @@ package org.skywalking.apm.collector.core.worker; import java.util.Map; +import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleInstaller; @@ -15,7 +16,7 @@ public class WorkerModuleInstaller implements ModuleInstaller { private final Logger logger = LoggerFactory.getLogger(WorkerModuleInstaller.class); @Override public void install(Map moduleConfig, - Map moduleDefineMap) throws DefineException { + Map moduleDefineMap) throws DefineException, ClientException { logger.info("beginning worker module install"); Map.Entry workerConfigEntry = moduleConfig.entrySet().iterator().next(); ModuleDefine moduleDefine = moduleDefineMap.get(workerConfigEntry.getKey()); diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentModuleRegistration.java b/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentModuleRegistration.java index 81945f0fde26b68261f9084a52d5c31aa3259fa8..1c4b6bf1410b80284fbf4fc6a058e234f81e47cf 100644 --- a/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentModuleRegistration.java +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentModuleRegistration.java @@ -7,7 +7,7 @@ import org.skywalking.apm.collector.core.module.ModuleRegistration; */ public class WorkerAgentModuleRegistration extends ModuleRegistration { - @Override protected String buildValue() { - return WorkerAgentConfig.HOST + ModuleRegistration.SEPARATOR + WorkerAgentConfig.PORT; + @Override public Value buildValue() { + return new Value(WorkerAgentConfig.HOST, WorkerAgentConfig.PORT, null); } } diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/src/main/java/org/skywalking/apm/collector/worker/impl/CollectorBootStartUp.java b/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/src/main/java/org/skywalking/apm/collector/worker/impl/CollectorBootStartUp.java index 8ec1270fbada5d0761975c53d03b41eb611abfd9..ab6bdf30c7eeeebec04ea641a6350e60b790a549 100644 --- a/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/src/main/java/org/skywalking/apm/collector/worker/impl/CollectorBootStartUp.java +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/src/main/java/org/skywalking/apm/collector/worker/impl/CollectorBootStartUp.java @@ -1,5 +1,6 @@ package org.skywalking.apm.collector.worker.impl; +import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.config.ConfigException; import org.skywalking.apm.collector.core.framework.CollectorStarter; import org.skywalking.apm.collector.core.framework.DefineException; @@ -13,7 +14,7 @@ public class CollectorBootStartUp { private static final Logger logger = LoggerFactory.getLogger(CollectorBootStartUp.class); - public static void main(String[] args) throws ConfigException, DefineException { + public static void main(String[] args) throws ConfigException, DefineException, ClientException { logger.info("collector starting..."); CollectorStarter starter = new CollectorStarter(); starter.start(); diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/src/main/resources/application.yml b/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/src/main/resources/application.yml index d8bfb7a21e15c6f92d824c4b33280eac093e6980..35cb4b0009ece8e1d07727ca4448436a7b519ce1 100644 --- a/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/src/main/resources/application.yml +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/src/main/resources/application.yml @@ -1,10 +1,10 @@ cluster: - zookeeper: - hostPort: localhost:2181 - sessionTimeout: 1000 - redis: - host: localhost-rd - port: 2000 +# zookeeper: +# hostPort: localhost:2181 +# sessionTimeout: 1000 +# redis: +# host: localhost +# port: 6379 worker: ui: host: localhost diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIConfigParser.java b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIConfigParser.java index fce4ae0f7b3dd4ffd8f2aedf0bef5249c367e815..38344125f2eaaf4266e83bf16eb812f9107c9a20 100644 --- a/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIConfigParser.java +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIConfigParser.java @@ -24,10 +24,9 @@ public class WorkerUIConfigParser implements ModuleConfigParser { throw new ConfigParseException(""); } WorkerUIConfig.PORT = (Integer)config.get(PORT); + WorkerUIConfig.CONTEXT_PATH = "/"; - if (StringUtils.isEmpty(config.get(CONTEXT_PATH))) { - WorkerUIConfig.CONTEXT_PATH = "/"; - } else { + if (!StringUtils.isEmpty(config.get(CONTEXT_PATH))) { WorkerUIConfig.CONTEXT_PATH = (String)config.get(CONTEXT_PATH); } } diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIModuleRegistration.java b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIModuleRegistration.java index 5ae6cc6d05fcb213ca58d7efd23a379c7670a1a0..b9b40adaa5c35ebee673e310155263cecdaa8641 100644 --- a/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIModuleRegistration.java +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIModuleRegistration.java @@ -1,5 +1,6 @@ package org.skywalking.apm.collector.worker.ui; +import com.google.gson.JsonObject; import org.skywalking.apm.collector.core.module.ModuleRegistration; /** @@ -7,7 +8,9 @@ import org.skywalking.apm.collector.core.module.ModuleRegistration; */ public class WorkerUIModuleRegistration extends ModuleRegistration { - @Override protected String buildValue() { - return WorkerUIConfig.HOST + ModuleRegistration.SEPARATOR + WorkerUIConfig.PORT + ModuleRegistration.SEPARATOR + WorkerUIConfig.CONTEXT_PATH; + @Override public Value buildValue() { + JsonObject data = new JsonObject(); + data.addProperty("context_path", WorkerUIConfig.CONTEXT_PATH); + return new Value(WorkerUIConfig.HOST, WorkerUIConfig.PORT, data); } }