diff --git a/apm-collector/apm-collector-client/client-h2/pom.xml b/apm-collector/apm-collector-client/client-h2/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..bf8b6d0cc31c309801d91647747748154323bbab --- /dev/null +++ b/apm-collector/apm-collector-client/client-h2/pom.xml @@ -0,0 +1,22 @@ + + + + apm-collector-client + org.skywalking + 3.2-2017 + + 4.0.0 + + client-h2 + jar + + + + com.h2database + h2 + 1.4.196 + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-client/client-h2/src/main/java/org/skywalking/apm/collector/client/h2/H2Client.java b/apm-collector/apm-collector-client/client-h2/src/main/java/org/skywalking/apm/collector/client/h2/H2Client.java new file mode 100644 index 0000000000000000000000000000000000000000..c4855190ac155bb5edbe8473fb2aac74f573f2f5 --- /dev/null +++ b/apm-collector/apm-collector-client/client-h2/src/main/java/org/skywalking/apm/collector/client/h2/H2Client.java @@ -0,0 +1,38 @@ +package org.skywalking.apm.collector.client.h2; + +import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.client.ClientException; + +/** + * @author pengys5 + */ +public class H2Client implements Client { + + @Override public void initialize() throws ClientException { + + } + + @Override public void insert(String path) throws ClientException { + + } + + @Override public void update() { + + } + + @Override public String select(String path) throws ClientException { + return null; + } + + @Override public void delete() { + + } + + @Override public boolean exist(String path) throws ClientException { + return false; + } + + @Override public void listen(String path) throws ClientException { + + } +} diff --git a/apm-collector/apm-collector-client/client-redis/pom.xml b/apm-collector/apm-collector-client/client-redis/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..d16292efea70fae18faa4439814493b975010982 --- /dev/null +++ b/apm-collector/apm-collector-client/client-redis/pom.xml @@ -0,0 +1,14 @@ + + + + apm-collector-client + org.skywalking + 3.2-2017 + + 4.0.0 + + client-redis + jar + \ No newline at end of file diff --git a/apm-collector/apm-collector-client/client-redis/src/main/java/org/skywalking/apm/collector/client/redis/RedisClient.java b/apm-collector/apm-collector-client/client-redis/src/main/java/org/skywalking/apm/collector/client/redis/RedisClient.java new file mode 100644 index 0000000000000000000000000000000000000000..539d10fcacaabd13ef5c05c6db3a30d0de04ac49 --- /dev/null +++ b/apm-collector/apm-collector-client/client-redis/src/main/java/org/skywalking/apm/collector/client/redis/RedisClient.java @@ -0,0 +1,38 @@ +package org.skywalking.apm.collector.client.redis; + +import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.client.ClientException; + +/** + * @author pengys5 + */ +public class RedisClient implements Client { + + @Override public void initialize() throws ClientException { + + } + + @Override public void insert(String path) throws ClientException { + + } + + @Override public void update() { + + } + + @Override public String select(String path) throws ClientException { + return null; + } + + @Override public void delete() { + + } + + @Override public boolean exist(String path) throws ClientException { + return false; + } + + @Override public void listen(String path) throws ClientException { + + } +} diff --git a/apm-collector/apm-collector-client/client-zookeeper/pom.xml b/apm-collector/apm-collector-client/client-zookeeper/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..50a4f61f1d0a52067e8feb8782be642bf58822a1 --- /dev/null +++ b/apm-collector/apm-collector-client/client-zookeeper/pom.xml @@ -0,0 +1,36 @@ + + + + apm-collector-client + org.skywalking + 3.2-2017 + + 4.0.0 + + client-zookeeper + jar + + + + org.apache.zookeeper + zookeeper + 3.4.10 + + + slf4j-api + org.slf4j + + + log4j + log4j + + + slf4j-log4j12 + org.slf4j + + + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClient.java b/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClient.java new file mode 100644 index 0000000000000000000000000000000000000000..52798b26710f7332a6ff354b3d05111ab45deb2a --- /dev/null +++ b/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClient.java @@ -0,0 +1,78 @@ +package org.skywalking.apm.collector.client.zookeeper; + +import java.io.IOException; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.util.ObjectUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class ZookeeperClient implements Client { + + private final Logger logger = LoggerFactory.getLogger(ZookeeperClient.class); + + private ZooKeeper zk; + + @Override public void initialize() throws ZookeeperClientException { + try { + zk = new ZooKeeper(ZookeeperConfig.hostPort, ZookeeperConfig.sessionTimeout, new ZookeeperDataListener(this)); + } catch (IOException e) { + throw new ZookeeperClientException(e.getMessage(), e); + } + } + + @Override public void insert(String path) throws ZookeeperClientException { + logger.info("add the zookeeper path \"{}\"", path); + try { + zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } catch (KeeperException | InterruptedException e) { + throw new ZookeeperClientException(e.getMessage(), e); + } + } + + @Override public void update() { + + } + + @Override public String select(String path) throws ZookeeperClientException { + logger.info("get the zookeeper data from path \"{}\"", path); + try { + return zk.getData(path, false, null).toString(); + } catch (KeeperException | InterruptedException e) { + throw new ZookeeperClientException(e.getMessage(), e); + } + } + + @Override public void delete() { + + } + + @Override public boolean exist(String path) throws ZookeeperClientException { + logger.info("assess the zookeeper path \"{}\" exist", path); + try { + Stat stat = zk.exists(path, false); + if (ObjectUtils.isEmpty(stat)) { + return false; + } else { + return true; + } + } catch (KeeperException | InterruptedException e) { + throw new ZookeeperClientException(e.getMessage(), e); + } + } + + @Override public void listen(String path) throws ZookeeperClientException { + try { + zk.exists(path, true); + } catch (KeeperException | InterruptedException e) { + throw new ZookeeperClientException(e.getMessage(), e); + } + } +} diff --git a/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClientException.java b/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClientException.java new file mode 100644 index 0000000000000000000000000000000000000000..d6772e66a9534f572a9a7357c7c5f33ad9da905c --- /dev/null +++ b/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClientException.java @@ -0,0 +1,16 @@ +package org.skywalking.apm.collector.client.zookeeper; + +import org.skywalking.apm.collector.core.client.ClientException; + +/** + * @author pengys5 + */ +public class ZookeeperClientException extends ClientException { + public ZookeeperClientException(String message) { + super(message); + } + + public ZookeeperClientException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperConfig.java b/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..11b27d0325b00dcf128ae7094ab5751fa11a2514 --- /dev/null +++ b/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperConfig.java @@ -0,0 +1,9 @@ +package org.skywalking.apm.collector.client.zookeeper; + +/** + * @author pengys5 + */ +public class ZookeeperConfig { + public static String hostPort; + public static int sessionTimeout; +} diff --git a/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperDataListener.java b/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperDataListener.java new file mode 100644 index 0000000000000000000000000000000000000000..4b5e8f5dca1e11f8df2a4ba0a8de3d968517fca7 --- /dev/null +++ b/apm-collector/apm-collector-client/client-zookeeper/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperDataListener.java @@ -0,0 +1,59 @@ +package org.skywalking.apm.collector.client.zookeeper; + +import java.util.LinkedList; +import java.util.List; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.client.ClientException; +import org.skywalking.apm.collector.core.client.DataListener; +import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer; +import org.skywalking.apm.collector.core.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class ZookeeperDataListener implements DataListener, Watcher { + + private final Logger logger = LoggerFactory.getLogger(ZookeeperDataListener.class); + + private Client client; + + public ZookeeperDataListener(Client client) { + this.client = client; + } + + @Override public void process(WatchedEvent event) { + logger.debug("path {}", event.getPath()); + if (StringUtils.isEmpty(event.getPath())) { + return; + } + + try { + String data = client.select(event.getPath()); + logger.debug("data {}", data); + } catch (ClientException e) { + logger.error(e.getMessage(), e); + } + } + + @Override public void listen() throws ClientException { + for (String itemKey : items()) { + String[] catalogs = itemKey.split("\\."); + StringBuilder pathBuilder = new StringBuilder(); + for (String catalog : catalogs) { + pathBuilder.append("/").append(catalog); + } + client.listen(pathBuilder.toString()); + } + } + + @Override public List items() { + List items = new LinkedList<>(); + items.add(ClusterDataInitializer.FOR_AGENT_CATALOG); + items.add(ClusterDataInitializer.FOR_UI_CATALOG); + return items; + } +} diff --git a/apm-collector/apm-collector-client/pom.xml b/apm-collector/apm-collector-client/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..7942957cfc8a12c25309ebc33f4f71136268832e --- /dev/null +++ b/apm-collector/apm-collector-client/pom.xml @@ -0,0 +1,27 @@ + + + + apm-collector + org.skywalking + 3.2-2017 + + 4.0.0 + + apm-collector-client + pom + + client-zookeeper + client-redis + client-h2 + + + + + org.skywalking + apm-collector-core + ${project.version} + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-commons/pom.xml b/apm-collector/apm-collector-cluster-new/cluster-redis/pom.xml similarity index 78% rename from apm-collector/apm-collector-commons/pom.xml rename to apm-collector/apm-collector-cluster-new/cluster-redis/pom.xml index a6cc374cc386b48841c4aad3f725ea9da2d0d2ca..0759fcf058a5d846899b12f3a9b7aa98311d4989 100644 --- a/apm-collector/apm-collector-commons/pom.xml +++ b/apm-collector/apm-collector-cluster-new/cluster-redis/pom.xml @@ -3,20 +3,20 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - apm-collector + apm-collector-cluster-new org.skywalking 3.2-2017 4.0.0 - apm-collector-commons + cluster-redis jar org.skywalking - apm-collector-cluster + client-redis ${project.version} - + \ No newline at end of file diff --git a/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisConfigParser.java b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisConfigParser.java new file mode 100644 index 0000000000000000000000000000000000000000..4009ed918a3733a9398795d2d62030b6891f2cc9 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisConfigParser.java @@ -0,0 +1,15 @@ +package org.skywalking.apm.collector.cluster.redis; + +import java.util.Map; +import org.skywalking.apm.collector.core.config.ConfigParseException; +import org.skywalking.apm.collector.core.module.ModuleConfigParser; + +/** + * @author pengys5 + */ +public class ClusterRedisConfigParser implements ModuleConfigParser { + + @Override public void parse(Map config) throws ConfigParseException { + + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisDataInitializer.java b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisDataInitializer.java new file mode 100644 index 0000000000000000000000000000000000000000..55e1280d323504b50ad2120842f5d4ef48a803c3 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisDataInitializer.java @@ -0,0 +1,18 @@ +package org.skywalking.apm.collector.cluster.redis; + +import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.client.ClientException; +import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer; + +/** + * @author pengys5 + */ +public class ClusterRedisDataInitializer extends ClusterDataInitializer { + @Override public void addItem(Client client, String itemKey) throws ClientException { + + } + + @Override public boolean existItem(Client client, String itemKey) throws ClientException { + return false; + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleDefine.java b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleDefine.java new file mode 100644 index 0000000000000000000000000000000000000000..4947649a1057e058dea29936bfb27e1a107aa18f --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleDefine.java @@ -0,0 +1,43 @@ +package org.skywalking.apm.collector.cluster.redis; + +import org.skywalking.apm.collector.client.redis.RedisClient; +import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine; +import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter; +import org.skywalking.apm.collector.core.framework.DataInitializer; +import org.skywalking.apm.collector.core.module.ModuleConfigParser; +import org.skywalking.apm.collector.core.module.ModuleGroup; + +/** + * @author pengys5 + */ +public class ClusterRedisModuleDefine extends ClusterModuleDefine { + + @Override public ModuleGroup group() { + return ModuleGroup.Cluster; + } + + @Override public String name() { + return "redis"; + } + + @Override public boolean defaultModule() { + return false; + } + + @Override protected ModuleConfigParser configParser() { + return new ClusterRedisConfigParser(); + } + + @Override protected Client client() { + return new RedisClient(); + } + + @Override protected DataInitializer dataInitializer() { + return new ClusterRedisDataInitializer(); + } + + @Override protected ClusterModuleRegistrationWriter registrationWriter() { + return new ClusterRedisModuleRegistrationWriter(); + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationWriter.java b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..bae3b505b0b9b9427db20f39129721da1c452145 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/java/org/skywalking/apm/collector/cluster/redis/ClusterRedisModuleRegistrationWriter.java @@ -0,0 +1,13 @@ +package org.skywalking.apm.collector.cluster.redis; + +import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter; + +/** + * @author pengys5 + */ +public class ClusterRedisModuleRegistrationWriter implements ClusterModuleRegistrationWriter { + + @Override public void write(String key, String value) { + + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/resources/META-INF/defines/module.define b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/resources/META-INF/defines/module.define new file mode 100644 index 0000000000000000000000000000000000000000..108425a44deaaad0cf61ee7ca2e821e34795d29e --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-redis/src/main/resources/META-INF/defines/module.define @@ -0,0 +1 @@ +org.skywalking.apm.collector.cluster.redis.ClusterRedisModuleDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-cluster-new/cluster-standalone/pom.xml b/apm-collector/apm-collector-cluster-new/cluster-standalone/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..f7e08fdee94bb0fd148b659e2929cdc0fecc32bf --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-standalone/pom.xml @@ -0,0 +1,22 @@ + + + + apm-collector-cluster-new + org.skywalking + 3.2-2017 + + 4.0.0 + + cluster-standalone + jar + + + + org.skywalking + client-h2 + ${project.version} + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneConfigParser.java b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneConfigParser.java new file mode 100644 index 0000000000000000000000000000000000000000..d12844f29f61f85eae773787fbed1a77676c9537 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneConfigParser.java @@ -0,0 +1,14 @@ +package org.skywalking.apm.collector.cluster.standalone; + +import java.util.Map; +import org.skywalking.apm.collector.core.config.ConfigParseException; +import org.skywalking.apm.collector.core.module.ModuleConfigParser; + +/** + * @author pengys5 + */ +public class ClusterStandaloneConfigParser implements ModuleConfigParser { + @Override public void parse(Map config) throws ConfigParseException { + + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneDataInitializer.java b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneDataInitializer.java new file mode 100644 index 0000000000000000000000000000000000000000..85f8bb21d787efc3287fd63a5ba68749ce52a179 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneDataInitializer.java @@ -0,0 +1,19 @@ +package org.skywalking.apm.collector.cluster.standalone; + +import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.client.ClientException; +import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer; + +/** + * @author pengys5 + */ +public class ClusterStandaloneDataInitializer extends ClusterDataInitializer { + + @Override public void addItem(Client client, String itemKey) throws ClientException { + + } + + @Override public boolean existItem(Client client, String itemKey) throws ClientException { + return false; + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java new file mode 100644 index 0000000000000000000000000000000000000000..cfcb129f4a9101aea52c8d9d7c5add9c1dcf0871 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java @@ -0,0 +1,43 @@ +package org.skywalking.apm.collector.cluster.standalone; + +import org.skywalking.apm.collector.client.h2.H2Client; +import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine; +import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter; +import org.skywalking.apm.collector.core.framework.DataInitializer; +import org.skywalking.apm.collector.core.module.ModuleConfigParser; +import org.skywalking.apm.collector.core.module.ModuleGroup; + +/** + * @author pengys5 + */ +public class ClusterStandaloneModuleDefine extends ClusterModuleDefine { + + @Override public ModuleGroup group() { + return ModuleGroup.Cluster; + } + + @Override public String name() { + return "standalone"; + } + + @Override public boolean defaultModule() { + return true; + } + + @Override protected ModuleConfigParser configParser() { + return new ClusterStandaloneConfigParser(); + } + + @Override protected Client client() { + return new H2Client(); + } + + @Override protected DataInitializer dataInitializer() { + return new ClusterStandaloneDataInitializer(); + } + + @Override protected ClusterModuleRegistrationWriter registrationWriter() { + return new ClusterStandaloneModuleRegistrationWriter(); + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationWriter.java b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..4e535ddb5314dc769084528f37981d6758970dca --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleRegistrationWriter.java @@ -0,0 +1,13 @@ +package org.skywalking.apm.collector.cluster.standalone; + +import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter; + +/** + * @author pengys5 + */ +public class ClusterStandaloneModuleRegistrationWriter implements ClusterModuleRegistrationWriter { + + @Override public void write(String key, String value) { + + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/resources/META-INF/defines/module.define b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/resources/META-INF/defines/module.define new file mode 100644 index 0000000000000000000000000000000000000000..6cf83ed9b55edeb1169331cc66873d6e0a5b36dd --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-standalone/src/main/resources/META-INF/defines/module.define @@ -0,0 +1 @@ +org.skywalking.apm.collector.cluster.standalone.ClusterStandaloneModuleDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/pom.xml b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..342d5ccf30ec0e40b43936b09a1945dc88884931 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/pom.xml @@ -0,0 +1,22 @@ + + + + apm-collector-cluster-new + org.skywalking + 3.2-2017 + + 4.0.0 + + cluster-zookeeper + jar + + + + org.skywalking + client-zookeeper + ${project.version} + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKConfigParser.java b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKConfigParser.java new file mode 100644 index 0000000000000000000000000000000000000000..dfbf1c63f19b8bc6938107d20c7974d35023dcba --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKConfigParser.java @@ -0,0 +1,29 @@ +package org.skywalking.apm.collector.cluster.zookeeper; + +import java.util.Map; +import org.skywalking.apm.collector.client.zookeeper.ZookeeperConfig; +import org.skywalking.apm.collector.core.config.ConfigParseException; +import org.skywalking.apm.collector.core.module.ModuleConfigParser; +import org.skywalking.apm.collector.core.util.StringUtils; + +/** + * @author pengys5 + */ +public class ClusterZKConfigParser implements ModuleConfigParser { + + private final String HOST_PORT = "hostPort"; + private final String SESSION_TIMEOUT = "sessionTimeout"; + + @Override public void parse(Map config) throws ConfigParseException { + if (StringUtils.isEmpty(config.get(HOST_PORT))) { + throw new ConfigParseException(""); + } + ZookeeperConfig.hostPort = (String)config.get(HOST_PORT); + + if (StringUtils.isEmpty(config.get(SESSION_TIMEOUT))) { + ZookeeperConfig.sessionTimeout = 1000; + } else { + ZookeeperConfig.sessionTimeout = (Integer)config.get(SESSION_TIMEOUT); + } + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataInitializer.java b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataInitializer.java new file mode 100644 index 0000000000000000000000000000000000000000..4860c5c0325538e03dd93874a3462cc02b368dd6 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataInitializer.java @@ -0,0 +1,37 @@ +package org.skywalking.apm.collector.cluster.zookeeper; + +import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.client.ClientException; +import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class ClusterZKDataInitializer extends ClusterDataInitializer { + + private final Logger logger = LoggerFactory.getLogger(ClusterZKDataInitializer.class); + + @Override public void addItem(Client client, String itemKey) throws ClientException { + logger.info("add the zookeeper item key \"{}\" exist", itemKey); + String[] catalogs = itemKey.split("\\."); + StringBuilder pathBuilder = new StringBuilder(); + for (String catalog : catalogs) { + pathBuilder.append("/").append(catalog); + if (!client.exist(pathBuilder.toString())) { + client.insert(pathBuilder.toString()); + } + } + } + + @Override public boolean existItem(Client client, String itemKey) throws ClientException { + logger.info("assess the zookeeper item key \"{}\" exist", itemKey); + String[] catalogs = itemKey.split("\\."); + StringBuilder pathBuilder = new StringBuilder(); + for (String catalog : catalogs) { + pathBuilder.append("/").append(catalog); + } + return client.exist(pathBuilder.toString()); + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefine.java b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefine.java new file mode 100644 index 0000000000000000000000000000000000000000..0485bb10f0fa09a6709cf688cbcdbf056c455bd6 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefine.java @@ -0,0 +1,43 @@ +package org.skywalking.apm.collector.cluster.zookeeper; + +import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient; +import org.skywalking.apm.collector.core.client.Client; +import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer; +import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine; +import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter; +import org.skywalking.apm.collector.core.module.ModuleConfigParser; +import org.skywalking.apm.collector.core.module.ModuleGroup; + +/** + * @author pengys5 + */ +public class ClusterZKModuleDefine extends ClusterModuleDefine { + + @Override public ModuleGroup group() { + return ModuleGroup.Cluster; + } + + @Override public String name() { + return "zookeeper"; + } + + @Override public boolean defaultModule() { + return false; + } + + @Override public ModuleConfigParser configParser() { + return new ClusterZKConfigParser(); + } + + @Override public Client client() { + return new ZookeeperClient(); + } + + @Override public ClusterDataInitializer dataInitializer() { + return new ClusterZKDataInitializer(); + } + + @Override protected ClusterModuleRegistrationWriter registrationWriter() { + return new ClusterZKModuleRegistrationWriter(); + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationReader.java b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationReader.java new file mode 100644 index 0000000000000000000000000000000000000000..44dec48611a14b62f2fd703855776adc1c482248 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationReader.java @@ -0,0 +1,13 @@ +package org.skywalking.apm.collector.cluster.zookeeper; + +import java.util.List; +import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader; + +/** + * @author pengys5 + */ +public class ClusterZKModuleRegistrationReader implements ClusterModuleRegistrationReader { + @Override public List read(String key) { + return null; + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationWriter.java b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..8ca0b0cb857729abc02824db0452bf2d71f1fbd3 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleRegistrationWriter.java @@ -0,0 +1,13 @@ +package org.skywalking.apm.collector.cluster.zookeeper; + +import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter; + +/** + * @author pengys5 + */ +public class ClusterZKModuleRegistrationWriter implements ClusterModuleRegistrationWriter { + + @Override public void write(String key, String value) { + + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/resources/META-INF/defines/module.define b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/resources/META-INF/defines/module.define new file mode 100644 index 0000000000000000000000000000000000000000..54bd972ef54e57a4d86666998b1672041255a372 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/main/resources/META-INF/defines/module.define @@ -0,0 +1 @@ +org.skywalking.apm.collector.cluster.zookeeper.ClusterZKModuleDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/test/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefineTestCase.java b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/test/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefineTestCase.java new file mode 100644 index 0000000000000000000000000000000000000000..4e7d92da116759391b08315fba6aae4456c48a96 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/test/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKModuleDefineTestCase.java @@ -0,0 +1,32 @@ +package org.skywalking.apm.collector.cluster.zookeeper; + +import java.io.FileNotFoundException; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.skywalking.apm.collector.client.zookeeper.ZookeeperConfig; +import org.skywalking.apm.collector.core.cluster.ClusterModuleException; +import org.yaml.snakeyaml.Yaml; + +/** + * @author pengys5 + */ +public class ClusterZKModuleDefineTestCase { + + private Map config; + + @Before + public void before() throws FileNotFoundException { + Yaml yaml = new Yaml(); + config = (Map)yaml.load("hostPort: localhost:2181" + System.lineSeparator() + "sessionTimeout: 2000"); + } + + @Test + public void testInitialize() throws ClusterModuleException { + ClusterZKModuleDefine define = new ClusterZKModuleDefine(); + define.initialize(config); + + System.out.println(ZookeeperConfig.hostPort); + System.out.println(ZookeeperConfig.sessionTimeout); + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/test/java/org/skywalking/apm/collector/cluster/zookeeper/ZookeeperTestCase.java b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/test/java/org/skywalking/apm/collector/cluster/zookeeper/ZookeeperTestCase.java new file mode 100644 index 0000000000000000000000000000000000000000..dc5fb00e5bc15781377006fc0dc34898844c5f75 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/test/java/org/skywalking/apm/collector/cluster/zookeeper/ZookeeperTestCase.java @@ -0,0 +1,55 @@ +package org.skywalking.apm.collector.cluster.zookeeper; + +import java.io.IOException; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.junit.Test; + +/** + * @author pengys5 + */ +public class ZookeeperTestCase { + + @Test + public void test() throws IOException, KeeperException, InterruptedException { + String hostPort = "localhost:2181"; + String znode = "/collector/module"; + String filename = ""; + String exec[] = new String[5 - 3]; +// new ZookeeperExecutor(hostPort, znode, filename, exec).run(); + + ZooKeeper zk = new ZooKeeper(hostPort, 1000, new Watcher() { + @Override public void process(WatchedEvent event) { + String path = event.getPath(); + System.out.println("已经触发了" + event.getType() + "事件!"); + System.out.println("path: " + path); + } + }); + + zk.create("/testRootPath", "testRootData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); +// 创建一个子目录节点 + zk.create("/testRootPath/testChildPathOne", "testChildDataOne".getBytes(), + ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); + System.out.println(new String(zk.getData("/testRootPath",false,null))); + // 取出子目录节点列表 + System.out.println(zk.getChildren("/testRootPath",true)); + // 修改子目录节点数据 + zk.setData("/testRootPath/testChildPathOne","modifyChildDataOne".getBytes(),-1); + System.out.println("目录节点状态:["+zk.exists("/testRootPath",true)+"]"); + // 创建另外一个子目录节点 + zk.create("/testRootPath/testChildPathTwo", "testChildDataTwo".getBytes(), + ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); +// System.out.println(new String(zk.getData("/testRootPath/testChildPathTwo",true,null))); + // 删除子目录节点 + zk.delete("/testRootPath/testChildPathTwo",-1); + zk.delete("/testRootPath/testChildPathOne",-1); + // 删除父目录节点 + zk.delete("/testRootPath",-1); + zk.close(); + } +} diff --git a/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/test/resources/application.yml b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/test/resources/application.yml new file mode 100644 index 0000000000000000000000000000000000000000..5608ba0c6d0b1e3d130687c8ef72c91ccc89e3f9 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/cluster-zookeeper/src/test/resources/application.yml @@ -0,0 +1,3 @@ + hostPort: localhost-zk + sessionTimeout: 2000 + diff --git a/apm-collector/apm-collector-cluster-new/pom.xml b/apm-collector/apm-collector-cluster-new/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..e08b3e70c539d5d533327b7cf252aaa7df23dfe9 --- /dev/null +++ b/apm-collector/apm-collector-cluster-new/pom.xml @@ -0,0 +1,27 @@ + + + + apm-collector + org.skywalking + 3.2-2017 + + 4.0.0 + + apm-collector-cluster-new + pom + + cluster-zookeeper + cluster-redis + cluster-standalone + + + + + org.skywalking + apm-collector-core + ${project.version} + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-commons/src/main/java/org/skywalking/apm/collector/commons/config/SeedNodesFormatter.java b/apm-collector/apm-collector-commons/src/main/java/org/skywalking/apm/collector/commons/config/SeedNodesFormatter.java deleted file mode 100644 index 93b27edeb9f9cbd25ffcb9906f47102cef6e9847..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-commons/src/main/java/org/skywalking/apm/collector/commons/config/SeedNodesFormatter.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.skywalking.apm.collector.commons.config; - -/** - * @author pengys5 - */ -public enum SeedNodesFormatter { - INSTANCE; - - public String formatter(String seedNodes) { - return null; - } -} diff --git a/apm-collector/apm-collector-commons/src/main/java/org/skywalking/apm/collector/commons/role/TraceSegmentReceiverRole.java b/apm-collector/apm-collector-commons/src/main/java/org/skywalking/apm/collector/commons/role/TraceSegmentReceiverRole.java deleted file mode 100644 index a21b71cde5adcbfc8bc957749fb89c5e73657c1a..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-commons/src/main/java/org/skywalking/apm/collector/commons/role/TraceSegmentReceiverRole.java +++ /dev/null @@ -1,22 +0,0 @@ -package org.skywalking.apm.collector.commons.role; - -import org.skywalking.apm.collector.actor.Role; -import org.skywalking.apm.collector.actor.selector.RollingSelector; -import org.skywalking.apm.collector.actor.selector.WorkerSelector; - -/** - * @author pengys5 - */ -public enum TraceSegmentReceiverRole implements Role { - INSTANCE; - - @Override - public String roleName() { - return "TraceSegmentReceiver"; - } - - @Override - public WorkerSelector workerSelector() { - return new RollingSelector(); - } -} diff --git a/apm-collector/apm-collector-commons/src/main/resources/application.conf b/apm-collector/apm-collector-commons/src/main/resources/application.conf deleted file mode 100644 index c37784cd82b0efca07f527422bc08ba145ff87c3..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-commons/src/main/resources/application.conf +++ /dev/null @@ -1,34 +0,0 @@ -akka { - actor { - provider = "akka.cluster.ClusterActorRefProvider" - - serializers { - java = "akka.serialization.JavaSerializer" - proto = "akka.remote.serialization.ProtobufSerializer" -// TraceSegment = "org.skywalking.apm.collector.worker.TraceSegmentSerializer" -// json = "org.skywalking.apm.collector.commons.serializer.JsonSerializer" - } - - serialization-bindings { - "java.lang.String" = java - "com.google.protobuf.Message" = proto -// "TraceSegment" = TraceSegment -// "com.google.gson.JsonObject" = json - } - - warn-about-java-serializer-usage = on - } - - remote { - log-remote-lifecycle-events = off - - netty.tcp { - } - } - - cluster { - auto-down-unreachable-after = off - metrics.enabled = off - roles = ["WorkersListener"] - } -} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleContext.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleContext.java new file mode 100644 index 0000000000000000000000000000000000000000..0aa78b4f389e0774d830cee174edbdff5e78a564 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleContext.java @@ -0,0 +1,16 @@ +package org.skywalking.apm.collector.core.cluster; + +/** + * @author pengys5 + */ +public class ClusterModuleContext { + private ClusterModuleRegistrationWriter writer; + + public ClusterModuleRegistrationWriter getWriter() { + return writer; + } + + public void setWriter(ClusterModuleRegistrationWriter writer) { + this.writer = writer; + } +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleDefine.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleDefine.java index 2ac18f2f2923c010255f1c7968ab6370aea32577..f09dff349d136d213c585786ea6530e4e0bf4f84 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleDefine.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleDefine.java @@ -5,6 +5,7 @@ import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.config.ConfigParseException; import org.skywalking.apm.collector.core.module.ModuleDefine; +import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.skywalking.apm.collector.core.server.Server; /** @@ -26,4 +27,10 @@ public abstract class ClusterModuleDefine extends ModuleDefine { @Override public final Server server() { throw new UnsupportedOperationException(""); } + + @Override protected final ModuleRegistration registration() { + throw new UnsupportedOperationException("Cluster module do not need module registration."); + } + + protected abstract ClusterModuleRegistrationWriter registrationWriter(); } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/Discovery.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleDiscovery.java similarity index 71% rename from apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/Discovery.java rename to apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleDiscovery.java index 0957d512545a813fadb76a7162484f4a730d5d38..bddfaf2216713f7a9c45be18b2a87e349444ba57 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/Discovery.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleDiscovery.java @@ -3,7 +3,7 @@ package org.skywalking.apm.collector.core.cluster; /** * @author pengys5 */ -public interface Discovery { +public interface ClusterModuleDiscovery { void discover(); } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleInstaller.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleInstaller.java new file mode 100644 index 0000000000000000000000000000000000000000..fd56eed9eac4c72cdd2b272404f6bc7e739b10da --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleInstaller.java @@ -0,0 +1,38 @@ +package org.skywalking.apm.collector.core.cluster; + +import java.util.Iterator; +import java.util.Map; +import org.skywalking.apm.collector.core.framework.DefineException; +import org.skywalking.apm.collector.core.module.ModuleDefine; +import org.skywalking.apm.collector.core.module.ModuleInstaller; +import org.skywalking.apm.collector.core.util.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class ClusterModuleInstaller implements ModuleInstaller { + + private final Logger logger = LoggerFactory.getLogger(ClusterModuleInstaller.class); + + @Override public void install(Map moduleConfig, + Map moduleDefineMap) throws DefineException { + logger.info("beginning cluster module install"); + if (CollectionUtils.isEmpty(moduleConfig)) { + logger.info("could not configure cluster module, use the default"); + Iterator> moduleDefineEntry = moduleDefineMap.entrySet().iterator(); + while (moduleDefineEntry.hasNext()) { + ModuleDefine moduleDefine = moduleDefineEntry.next().getValue(); + if (moduleDefine.defaultModule()) { + logger.info("module {} initialize", moduleDefine.getClass().getName()); + moduleDefine.initialize(null); + } + } + } else { + Map.Entry clusterConfigEntry = moduleConfig.entrySet().iterator().next(); + ModuleDefine moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey()); + moduleDefine.initialize(clusterConfigEntry.getValue()); + } + } +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationReader.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationReader.java new file mode 100644 index 0000000000000000000000000000000000000000..d9ad04e02ab72def6bc073479a2e599ce4104d8c --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationReader.java @@ -0,0 +1,10 @@ +package org.skywalking.apm.collector.core.cluster; + +import java.util.List; + +/** + * @author pengys5 + */ +public interface ClusterModuleRegistrationReader { + List read(String key); +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationWriter.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..449a904d54a62beb6f85fcc346c6fdb9a54ae1f4 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationWriter.java @@ -0,0 +1,8 @@ +package org.skywalking.apm.collector.core.cluster; + +/** + * @author pengys5 + */ +public interface ClusterModuleRegistrationWriter { + void write(String key, String value); +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/Registration.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/Registration.java deleted file mode 100644 index c37bb83bb20639a85905f969f36b137cda06af53..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/Registration.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.skywalking.apm.collector.core.cluster; - -/** - * @author pengys5 - */ -public interface Registration { - - void register(); -} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/config/ConfigLoader.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/config/ConfigLoader.java index 06e1cff84b3678d8b70132921c4431107e93a65e..b4fd431cfada267e8473b445e4b1d107b5f4e45a 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/config/ConfigLoader.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/config/ConfigLoader.java @@ -5,5 +5,5 @@ import org.skywalking.apm.collector.core.framework.Loader; /** * @author pengys5 */ -public interface ConfigLoader extends Loader { +public interface ConfigLoader extends Loader { } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/CollectorStarter.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/CollectorStarter.java new file mode 100644 index 0000000000000000000000000000000000000000..054e812ee48e80fea7008f1aa3fbdb24f1012628 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/CollectorStarter.java @@ -0,0 +1,39 @@ +package org.skywalking.apm.collector.core.framework; + +import java.util.Map; +import org.skywalking.apm.collector.core.config.ConfigException; +import org.skywalking.apm.collector.core.module.ModuleConfigLoader; +import org.skywalking.apm.collector.core.module.ModuleDefine; +import org.skywalking.apm.collector.core.module.ModuleDefineLoader; +import org.skywalking.apm.collector.core.module.ModuleGroup; +import org.skywalking.apm.collector.core.module.ModuleInstallerAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class CollectorStarter implements Starter { + + private final Logger logger = LoggerFactory.getLogger(CollectorStarter.class); + + @Override public void start() throws ConfigException, DefineException { + ModuleConfigLoader configLoader = new ModuleConfigLoader(); + Map configuration = configLoader.load(); + + ModuleDefineLoader defineLoader = new ModuleDefineLoader(); + Map> moduleDefineMap = defineLoader.load(); + + ModuleInstallerAdapter moduleInstallerAdapter = new ModuleInstallerAdapter(ModuleGroup.Cluster); + moduleInstallerAdapter.install(configuration.get(ModuleGroup.Cluster.name().toLowerCase()), moduleDefineMap.get(ModuleGroup.Cluster.name().toLowerCase())); + + ModuleGroup[] moduleGroups = ModuleGroup.values(); + for (ModuleGroup moduleGroup : moduleGroups) { + if (!ModuleGroup.Cluster.equals(moduleGroup)) { + moduleInstallerAdapter = new ModuleInstallerAdapter(moduleGroup); + logger.info("module group {}, configuration {}", moduleGroup.name().toLowerCase(), configuration.get(moduleGroup.name().toLowerCase())); + moduleInstallerAdapter.install(configuration.get(moduleGroup.name().toLowerCase()), moduleDefineMap.get(moduleGroup.name().toLowerCase())); + } + } + } +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Context.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Context.java new file mode 100644 index 0000000000000000000000000000000000000000..9f9b7dffc3a6074700c1301753b29b6f86c5df85 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Context.java @@ -0,0 +1,8 @@ +package org.skywalking.apm.collector.core.framework; + +/** + * @author pengys5 + */ +public class Context { + +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Define.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Define.java index 591a383435a699b60051c7923fa90783505b6dd8..34ec6598d3e6e9acd8e590881749deb55d582922 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Define.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Define.java @@ -9,7 +9,5 @@ public interface Define { void initialize(Map config) throws DefineException; - String getName(); - - void setName(String name); + String name(); } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Loader.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Loader.java index 70957c8a415bc11a985e14b7f7167b332ff7cf6a..10c4be9343e92d5b625d3b1f8d979cbf47d788b9 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Loader.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Loader.java @@ -5,7 +5,6 @@ import org.skywalking.apm.collector.core.config.ConfigException; /** * @author pengys5 */ -public interface Loader { - - void load() throws ConfigException; +public interface Loader { + T load() throws ConfigException; } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Starter.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Starter.java index 8d7a4f8febefe55ceb92e56dec22732426ef2f52..dbf0a53ed985cd50506198f537d7218b6c077bb4 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Starter.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Starter.java @@ -1,8 +1,10 @@ package org.skywalking.apm.collector.core.framework; +import org.skywalking.apm.collector.core.CollectorException; + /** * @author pengys5 */ public interface Starter { - void start(); + void start() throws CollectorException; } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigLoader.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigLoader.java index f3d882e53b3c8da799f202e41bb5241be0d11bbf..55c8a0e78f7734d9737eea8f18e8f553073d4441 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigLoader.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigLoader.java @@ -11,28 +11,16 @@ import org.yaml.snakeyaml.Yaml; /** * @author pengys5 */ -public class ModuleConfigLoader implements ConfigLoader { +public class ModuleConfigLoader implements ConfigLoader> { private final Logger logger = LoggerFactory.getLogger(ModuleConfigLoader.class); - @Override public void load() throws ModuleConfigLoaderException { + @Override public Map load() throws ModuleConfigLoaderException { Yaml yaml = new Yaml(); - ModuleInstaller installer = new ModuleInstaller(); - - Map configurations = null; try { - configurations = (Map)yaml.load(ResourceUtils.read("application.yml")); + return (Map)yaml.load(ResourceUtils.read("application.yml")); } catch (FileNotFoundException e) { throw new ModuleConfigLoaderException(e.getMessage(), e); } - configurations.forEach((moduleName, moduleConfig) -> { - logger.info("module name \"{}\" from application.yml", moduleName); - try { - installer.install(moduleName, moduleConfig); - } catch (ModuleException e) { - logger.error("module \"{}\" install failure", moduleName); - logger.error(e.getMessage(), e); - } - }); } } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleContext.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleContext.java new file mode 100644 index 0000000000000000000000000000000000000000..00019e610e686000231d775137131f53502f2ef6 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleContext.java @@ -0,0 +1,18 @@ +package org.skywalking.apm.collector.core.module; + +import org.skywalking.apm.collector.core.cluster.ClusterModuleContext; + +/** + * @author pengys5 + */ +public class ModuleContext { + private ClusterModuleContext clusterContext; + + public ClusterModuleContext getClusterContext() { + return clusterContext; + } + + public void setClusterContext(ClusterModuleContext clusterContext) { + this.clusterContext = clusterContext; + } +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefine.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefine.java index e6a661350839b6a45ce4ce92b0b6a5518b009a95..f881e866ea160b05a76226dd0f097404e49fc437 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefine.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefine.java @@ -10,19 +10,9 @@ import org.skywalking.apm.collector.core.server.Server; */ public abstract class ModuleDefine implements Define { - private String moduleName; - - @Override public final String getName() { - return moduleName; - } - - @Override public final void setName(String name) { - this.moduleName = name; - } - protected abstract ModuleGroup group(); - protected abstract boolean defaultModule(); + public abstract boolean defaultModule(); protected abstract ModuleConfigParser configParser(); @@ -31,4 +21,6 @@ public abstract class ModuleDefine implements Define { protected abstract Server server(); protected abstract DataInitializer dataInitializer(); + + protected abstract ModuleRegistration registration(); } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefineLoader.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefineLoader.java new file mode 100644 index 0000000000000000000000000000000000000000..031ab022d1929d568353694f21f4b88409869a16 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefineLoader.java @@ -0,0 +1,35 @@ +package org.skywalking.apm.collector.core.module; + +import java.util.LinkedHashMap; +import java.util.Map; +import org.skywalking.apm.collector.core.config.ConfigException; +import org.skywalking.apm.collector.core.framework.Loader; +import org.skywalking.apm.collector.core.util.DefinitionLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class ModuleDefineLoader implements Loader>> { + + private final Logger logger = LoggerFactory.getLogger(ModuleDefineLoader.class); + + @Override public Map> load() throws ConfigException { + Map> moduleDefineMap = new LinkedHashMap<>(); + + ModuleDefinitionFile definitionFile = new ModuleDefinitionFile(); + logger.info("definition file name: {}", definitionFile.fileName()); + DefinitionLoader definitionLoader = DefinitionLoader.load(ModuleDefine.class, definitionFile); + for (ModuleDefine moduleDefine : definitionLoader) { + logger.info("loaded module class: {}", moduleDefine.getClass().getName()); + + String groupName = moduleDefine.group().name().toLowerCase(); + if (!moduleDefineMap.containsKey(groupName)) { + moduleDefineMap.put(groupName, new LinkedHashMap<>()); + } + moduleDefineMap.get(groupName).put(moduleDefine.name().toLowerCase(), moduleDefine); + } + return moduleDefineMap; + } +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleGroup.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleGroup.java index 3732640eab58317a6fca98fa1787008f680a60b7..bcf3ab2505c56addfb4d037f90a753d438b1cff8 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleGroup.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleGroup.java @@ -4,5 +4,5 @@ package org.skywalking.apm.collector.core.module; * @author pengys5 */ public enum ModuleGroup { - Cluster, Worker, Queue + Cluster, Worker } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstaller.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstaller.java index 61b1dc866271a912b92ca5b8401e3c5228010532..3a33fb12b738e4621ba0356741367bfa59214da6 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstaller.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstaller.java @@ -1,46 +1,11 @@ package org.skywalking.apm.collector.core.module; -import java.util.LinkedHashMap; import java.util.Map; import org.skywalking.apm.collector.core.framework.DefineException; -import org.skywalking.apm.collector.core.util.DefinitionLoader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @author pengys5 */ -public class ModuleInstaller { - - private final Logger logger = LoggerFactory.getLogger(ModuleInstaller.class); - - private final Map moduleDefineMap; - - protected ModuleInstaller() { - moduleDefineMap = new LinkedHashMap<>(); - ModuleDefinitionFile definitionFile = new ModuleDefinitionFile(); - logger.info("definition file name: {}", definitionFile.fileName()); - DefinitionLoader definitionLoader = DefinitionLoader.load(ModuleDefine.class, definitionFile); - for (ModuleDefine moduleDefine : definitionLoader) { - logger.info("loaded module class: {}", moduleDefine.getClass().getName()); - moduleDefineMap.put(moduleDefine.getName(), moduleDefine); - } - } - - public void install(String moduleName, Map moduleConfig) throws ModuleException { - Map module = (LinkedHashMap)moduleConfig; - module.entrySet().forEach(subModuleConfig -> { - String subMoudleName = moduleName + "." + subModuleConfig.getKey(); - logger.info("install sub module {}", subMoudleName); - try { - if (moduleDefineMap.containsKey(subMoudleName)) { - moduleDefineMap.get(subMoudleName).initialize(subModuleConfig.getValue()); - } else { - logger.error("could not found the module definition, module name: {}", subMoudleName); - } - } catch (DefineException e) { - logger.error(e.getMessage(), e); - } - }); - } +public interface ModuleInstaller { + void install(Map moduleConfig, Map moduleDefineMap) throws DefineException; } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstallerAdapter.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstallerAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..ec9389dfc882e4955eac1bb529ab367bc826d0df --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleInstallerAdapter.java @@ -0,0 +1,27 @@ +package org.skywalking.apm.collector.core.module; + +import java.util.Map; +import org.skywalking.apm.collector.core.cluster.ClusterModuleInstaller; +import org.skywalking.apm.collector.core.framework.DefineException; +import org.skywalking.apm.collector.core.worker.WorkerModuleInstaller; + +/** + * @author pengys5 + */ +public class ModuleInstallerAdapter implements ModuleInstaller { + + private ModuleInstaller moduleInstaller; + + public ModuleInstallerAdapter(ModuleGroup moduleGroup) { + if (ModuleGroup.Cluster.equals(moduleGroup)) { + moduleInstaller = new ClusterModuleInstaller(); + } else if (ModuleGroup.Worker.equals(moduleGroup)) { + moduleInstaller = new WorkerModuleInstaller(); + } + } + + @Override public void install(Map moduleConfig, + Map moduleDefineMap) throws DefineException { + moduleInstaller.install(moduleConfig, moduleDefineMap); + } +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleRegistration.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleRegistration.java new file mode 100644 index 0000000000000000000000000000000000000000..2b52b414b6b2fe05e07e273394832e887402b6fa --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleRegistration.java @@ -0,0 +1,11 @@ +package org.skywalking.apm.collector.core.module; + +/** + * @author pengys5 + */ +public abstract class ModuleRegistration { + + protected static final String SEPARATOR = "|"; + + protected abstract String buildValue(); +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/CollectionUtils.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/CollectionUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..c959bc70c9a114b53f2bf6699d8b6490f9ac9747 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/CollectionUtils.java @@ -0,0 +1,14 @@ +package org.skywalking.apm.collector.core.util; + +import com.sun.istack.internal.Nullable; +import java.util.Map; + +/** + * @author pengys5 + */ +public class CollectionUtils { + + public static boolean isEmpty(@Nullable Map map) { + return (map == null || map.size() == 0); + } +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/DefinitionLoader.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/DefinitionLoader.java index bd5164620cc665d2446532f0c4b775c5fb4e40de..d728dcbc166af813a67ef302297f0568d8df2f2e 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/DefinitionLoader.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/util/DefinitionLoader.java @@ -5,11 +5,10 @@ import java.io.IOException; import java.net.URL; import java.util.Enumeration; import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map; +import java.util.LinkedList; +import java.util.List; import java.util.Objects; import java.util.Properties; -import org.skywalking.apm.collector.core.framework.Define; import org.skywalking.apm.collector.core.framework.DefinitionFile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +35,7 @@ public class DefinitionLoader implements Iterable { @Override public final Iterator iterator() { logger.info("load definition file: {}", definitionFile.get()); Properties properties = new Properties(); - Map definitionList = new LinkedHashMap<>(); + List definitionList = new LinkedList<>(); try { Enumeration urlEnumeration = this.getClass().getClassLoader().getResources(definitionFile.get()); while (urlEnumeration.hasMoreElements()) { @@ -46,16 +45,15 @@ public class DefinitionLoader implements Iterable { Enumeration defineItem = properties.propertyNames(); while (defineItem.hasMoreElements()) { - String key = (String)defineItem.nextElement(); - String fullNameClass = properties.getProperty(key); - definitionList.put(key, fullNameClass); + String fullNameClass = (String)defineItem.nextElement(); + definitionList.add(fullNameClass); } } } catch (IOException e) { e.printStackTrace(); } - Iterator> moduleDefineIterator = definitionList.entrySet().iterator(); + Iterator moduleDefineIterator = definitionList.iterator(); return new Iterator() { @Override public boolean hasNext() { @@ -63,16 +61,13 @@ public class DefinitionLoader implements Iterable { } @Override public D next() { - Map.Entry moduleDefineEntry = moduleDefineIterator.next(); - String definitionName = moduleDefineEntry.getKey(); - String definitionClass = moduleDefineEntry.getValue(); - logger.info("key: {}, definitionClass: {}", definitionName, definitionClass); + String definitionClass = moduleDefineIterator.next(); + logger.info("definitionClass: {}", definitionClass); try { Class c = Class.forName(definitionClass); - Define define = (Define)c.newInstance(); - define.setName(definitionName); - return (D)define; + return (D)c.newInstance(); } catch (Exception e) { + logger.error(e.getMessage(), e); } return null; } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/worker/WorkerModuleInstaller.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/worker/WorkerModuleInstaller.java new file mode 100644 index 0000000000000000000000000000000000000000..2978ac85e2a7852fa5acdcf1f2f40b6e2950447a --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/worker/WorkerModuleInstaller.java @@ -0,0 +1,24 @@ +package org.skywalking.apm.collector.core.worker; + +import java.util.Map; +import org.skywalking.apm.collector.core.framework.DefineException; +import org.skywalking.apm.collector.core.module.ModuleDefine; +import org.skywalking.apm.collector.core.module.ModuleInstaller; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class WorkerModuleInstaller implements ModuleInstaller { + + private final Logger logger = LoggerFactory.getLogger(WorkerModuleInstaller.class); + + @Override public void install(Map moduleConfig, + Map moduleDefineMap) throws DefineException { + logger.info("beginning worker module install"); + Map.Entry workerConfigEntry = moduleConfig.entrySet().iterator().next(); + ModuleDefine moduleDefine = moduleDefineMap.get(workerConfigEntry.getKey()); + moduleDefine.initialize(workerConfigEntry.getValue()); + } +} diff --git a/apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/module/ClusterModuleForTest.java b/apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/module/ClusterModuleForTest.java deleted file mode 100644 index b6ece374610b66e319fe68347eb335193cd92d3c..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/module/ClusterModuleForTest.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.skywalking.apm.collector.core.module; - -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class ClusterModuleForTest implements Module { - - private final Logger logger = LoggerFactory.getLogger(ModuleInstaller.class); - - @Override public void install(Map configuration) { - logger.debug(configuration.toString()); - } -} diff --git a/apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/module/ModuleInstallerTestCase.java b/apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/module/ModuleInstallerTestCase.java deleted file mode 100644 index 9032bb1451f2dae3a5154e0030ac723c80ce3b72..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/module/ModuleInstallerTestCase.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.skywalking.apm.collector.core.module; - -import org.junit.Before; -import org.junit.Test; - -/** - * @author pengys5 - */ -public class ModuleInstallerTestCase { - - @Before - public void init() { - } - - @Test - public void testInstall() { - ModuleInstaller installer = new ModuleInstaller(); - } -} diff --git a/apm-collector/apm-collector-queue/datacarrier-queue/pom.xml b/apm-collector/apm-collector-queue/datacarrier-queue/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..f6bc40abb23951af049929bde7747d87636a28eb --- /dev/null +++ b/apm-collector/apm-collector-queue/datacarrier-queue/pom.xml @@ -0,0 +1,14 @@ + + + + apm-collector-queue + org.skywalking + 3.2-2017 + + 4.0.0 + + datacarrier-queue + jar + \ No newline at end of file diff --git a/apm-collector/apm-collector-queue/disruptor-queue/pom.xml b/apm-collector/apm-collector-queue/disruptor-queue/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..e55ae5df0cdd8960a36a02da11a3bd1f3d2cc9b2 --- /dev/null +++ b/apm-collector/apm-collector-queue/disruptor-queue/pom.xml @@ -0,0 +1,22 @@ + + + + apm-collector-queue + org.skywalking + 3.2-2017 + + 4.0.0 + + disruptor-queue + jar + + + + com.lmax + disruptor + 3.3.6 + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-queue/pom.xml b/apm-collector/apm-collector-queue/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..e403002c9653883a93deb525ea8d95ca1d867fe2 --- /dev/null +++ b/apm-collector/apm-collector-queue/pom.xml @@ -0,0 +1,26 @@ + + + + apm-collector + org.skywalking + 3.2-2017 + + 4.0.0 + + apm-collector-queue + pom + + disruptor-queue + datacarrier-queue + + + + + org.skywalking + apm-collector-core + ${project.version} + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-server/google-rpc-server/pom.xml b/apm-collector/apm-collector-server/google-rpc-server/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..3f3faff1ff8816687707d2498b9548c2811f7346 --- /dev/null +++ b/apm-collector/apm-collector-server/google-rpc-server/pom.xml @@ -0,0 +1,22 @@ + + + + apm-collector-server + org.skywalking + 3.2-2017 + + 4.0.0 + + google-rpc-server + jar + + + + org.skywalking + apm-network + ${project.version} + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-server/google-rpc-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServer.java b/apm-collector/apm-collector-server/google-rpc-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServer.java new file mode 100644 index 0000000000000000000000000000000000000000..309901adeec020ed7d19cf09f6265e38a66b6b3b --- /dev/null +++ b/apm-collector/apm-collector-server/google-rpc-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServer.java @@ -0,0 +1,43 @@ +package org.skywalking.apm.collector.server.grpc; + +import io.grpc.netty.NettyServerBuilder; +import java.io.IOException; +import java.net.InetSocketAddress; +import org.skywalking.apm.collector.core.server.Server; +import org.skywalking.apm.collector.core.server.ServerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class GRPCServer implements Server { + + private final Logger logger = LoggerFactory.getLogger(GRPCServer.class); + + private final String host; + private final int port; + + public GRPCServer(String host, int port) { + this.host = host; + this.port = port; + } + + @Override public void initialize() throws ServerException { + InetSocketAddress address = new InetSocketAddress(host, port); + NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address); + try { + io.grpc.Server server = nettyServerBuilder.build().start(); + blockUntilShutdown(server); + } catch (InterruptedException | IOException e) { + throw new GRPCServerException(e.getMessage(), e); + } + logger.info("Server started, host {} listening on {}", host, port); + } + + private void blockUntilShutdown(io.grpc.Server server) throws InterruptedException { + if (server != null) { + server.awaitTermination(); + } + } +} diff --git a/apm-collector/apm-collector-server/google-rpc-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServerException.java b/apm-collector/apm-collector-server/google-rpc-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServerException.java new file mode 100644 index 0000000000000000000000000000000000000000..b1f7f9d9701be9a9b96775015675fd4bd5a6d24b --- /dev/null +++ b/apm-collector/apm-collector-server/google-rpc-server/src/main/java/org/skywalking/apm/collector/server/grpc/GRPCServerException.java @@ -0,0 +1,17 @@ +package org.skywalking.apm.collector.server.grpc; + +import org.skywalking.apm.collector.core.server.ServerException; + +/** + * @author pengys5 + */ +public class GRPCServerException extends ServerException { + + public GRPCServerException(String message) { + super(message); + } + + public GRPCServerException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/apm-collector/apm-collector-server/jetty-server/pom.xml b/apm-collector/apm-collector-server/jetty-server/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..ee78f518e9b4f3d80b59731b9a3c8c58e06adb94 --- /dev/null +++ b/apm-collector/apm-collector-server/jetty-server/pom.xml @@ -0,0 +1,31 @@ + + + + apm-collector-server + org.skywalking + 3.2-2017 + + 4.0.0 + + jetty-server + jar + + + 9.4.2.v20170220 + + + + + org.eclipse.jetty + jetty-server + ${jetty.version} + + + org.eclipse.jetty + jetty-servlet + ${jetty.version} + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-server/jetty-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyServer.java b/apm-collector/apm-collector-server/jetty-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyServer.java new file mode 100644 index 0000000000000000000000000000000000000000..984e76269d28ec8abf6a069367eba8d6462c57b4 --- /dev/null +++ b/apm-collector/apm-collector-server/jetty-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyServer.java @@ -0,0 +1,41 @@ +package org.skywalking.apm.collector.server.jetty; + +import java.net.InetSocketAddress; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.skywalking.apm.collector.core.server.Server; +import org.skywalking.apm.collector.core.server.ServerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class JettyServer implements Server { + + private final Logger logger = LoggerFactory.getLogger(JettyServer.class); + + private final String host; + private final int port; + private final String contextPath; + + public JettyServer(String host, int port, String contextPath) { + this.host = host; + this.port = port; + this.contextPath = contextPath; + } + + @Override public void initialize() throws ServerException { + org.eclipse.jetty.server.Server server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port)); + + ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + servletContextHandler.setContextPath(contextPath); + logger.info("http server root context path: {}", contextPath); + + server.setHandler(servletContextHandler); + try { + server.start(); + } catch (Exception e) { + throw new JettyServerException(e.getMessage(), e); + } + } +} diff --git a/apm-collector/apm-collector-server/jetty-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyServerException.java b/apm-collector/apm-collector-server/jetty-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyServerException.java new file mode 100644 index 0000000000000000000000000000000000000000..8c321d90ba551e39b73aa3171b8460bcd4f00142 --- /dev/null +++ b/apm-collector/apm-collector-server/jetty-server/src/main/java/org/skywalking/apm/collector/server/jetty/JettyServerException.java @@ -0,0 +1,17 @@ +package org.skywalking.apm.collector.server.jetty; + +import org.skywalking.apm.collector.core.server.ServerException; + +/** + * @author pengys5 + */ +public class JettyServerException extends ServerException { + + public JettyServerException(String message) { + super(message); + } + + public JettyServerException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/apm-collector/apm-collector-server/pom.xml b/apm-collector/apm-collector-server/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..52c0f4ac56b423b2dbf4951c9b95d91a147bf908 --- /dev/null +++ b/apm-collector/apm-collector-server/pom.xml @@ -0,0 +1,26 @@ + + + + apm-collector + org.skywalking + 3.2-2017 + + 4.0.0 + + apm-collector-server + pom + + google-rpc-server + jetty-server + + + + + org.skywalking + apm-collector-core + ${project.version} + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-storage/elasticsearch-storage/pom.xml b/apm-collector/apm-collector-storage/elasticsearch-storage/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..94b2517c0ad24089d0c7b5711d5614cca10823be --- /dev/null +++ b/apm-collector/apm-collector-storage/elasticsearch-storage/pom.xml @@ -0,0 +1,14 @@ + + + + apm-collector-storage + org.skywalking + 3.2-2017 + + 4.0.0 + + elasticsearch-storage + jar + \ No newline at end of file diff --git a/apm-collector/apm-collector-storage/h2-storage/pom.xml b/apm-collector/apm-collector-storage/h2-storage/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..8bc63b3f0b99363ebe6e8c5e8ce4dcc2dd411457 --- /dev/null +++ b/apm-collector/apm-collector-storage/h2-storage/pom.xml @@ -0,0 +1,14 @@ + + + + apm-collector-storage + org.skywalking + 3.2-2017 + + 4.0.0 + + h2-storage + jar + \ No newline at end of file diff --git a/apm-collector/apm-collector-storage/pom.xml b/apm-collector/apm-collector-storage/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..6ade3218b6801c305ce1547db1fc1b19d20db03a --- /dev/null +++ b/apm-collector/apm-collector-storage/pom.xml @@ -0,0 +1,18 @@ + + + + apm-collector + org.skywalking + 3.2-2017 + + 4.0.0 + + apm-collector-storage + pom + + h2-storage + elasticsearch-storage + + \ No newline at end of file diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/pom.xml b/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..7cb621ec5d3bb5a326defd1a2b293a65326dbcd4 --- /dev/null +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/pom.xml @@ -0,0 +1,14 @@ + + + + apm-collector-worker-new + org.skywalking + 3.2-2017 + + 4.0.0 + + apm-collector-worker-agent + jar + \ No newline at end of file diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentConfig.java b/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..5acf0b40aa838ee586ea4e288702408d8bc08da5 --- /dev/null +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentConfig.java @@ -0,0 +1,9 @@ +package org.skywalking.apm.collector.worker.agent; + +/** + * @author pengys5 + */ +public class WorkerAgentConfig { + public static String HOST; + public static int PORT; +} diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentConfigParser.java b/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentConfigParser.java new file mode 100644 index 0000000000000000000000000000000000000000..6f7073f11b5af08e3c46d0f8ffad4b4507aa556d --- /dev/null +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentConfigParser.java @@ -0,0 +1,27 @@ +package org.skywalking.apm.collector.worker.agent; + +import java.util.Map; +import org.skywalking.apm.collector.core.config.ConfigParseException; +import org.skywalking.apm.collector.core.module.ModuleConfigParser; +import org.skywalking.apm.collector.core.util.StringUtils; + +/** + * @author pengys5 + */ +public class WorkerAgentConfigParser implements ModuleConfigParser { + + private final String HOST = "host"; + private final String PORT = "port"; + + @Override public void parse(Map config) throws ConfigParseException { + if (StringUtils.isEmpty(config.get(HOST))) { + throw new ConfigParseException(""); + } + WorkerAgentConfig.HOST = (String)config.get(HOST); + + if (StringUtils.isEmpty(config.get(PORT))) { + throw new ConfigParseException(""); + } + WorkerAgentConfig.PORT = (Integer)config.get(PORT); + } +} diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentModuleDefine.java b/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentModuleDefine.java new file mode 100644 index 0000000000000000000000000000000000000000..f2bf5491abd3a0a2be1375db2a1db6fd938c4db7 --- /dev/null +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentModuleDefine.java @@ -0,0 +1,38 @@ +package org.skywalking.apm.collector.worker.agent; + +import org.skywalking.apm.collector.core.module.ModuleConfigParser; +import org.skywalking.apm.collector.core.module.ModuleGroup; +import org.skywalking.apm.collector.core.module.ModuleRegistration; +import org.skywalking.apm.collector.core.server.Server; +import org.skywalking.apm.collector.core.worker.WorkerModuleDefine; +import org.skywalking.apm.collector.server.grpc.GRPCServer; + +/** + * @author pengys5 + */ +public class WorkerAgentModuleDefine extends WorkerModuleDefine { + + @Override public ModuleGroup group() { + return ModuleGroup.Worker; + } + + @Override public String name() { + return "agent"; + } + + @Override public boolean defaultModule() { + return true; + } + + @Override public ModuleConfigParser configParser() { + return new WorkerAgentConfigParser(); + } + + @Override public Server server() { + return new GRPCServer(WorkerAgentConfig.HOST, WorkerAgentConfig.PORT); + } + + @Override protected ModuleRegistration registration() { + return new WorkerAgentModuleRegistration(); + } +} diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentModuleRegistration.java b/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentModuleRegistration.java new file mode 100644 index 0000000000000000000000000000000000000000..81945f0fde26b68261f9084a52d5c31aa3259fa8 --- /dev/null +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/java/org/skywalking/apm/collector/worker/agent/WorkerAgentModuleRegistration.java @@ -0,0 +1,13 @@ +package org.skywalking.apm.collector.worker.agent; + +import org.skywalking.apm.collector.core.module.ModuleRegistration; + +/** + * @author pengys5 + */ +public class WorkerAgentModuleRegistration extends ModuleRegistration { + + @Override protected String buildValue() { + return WorkerAgentConfig.HOST + ModuleRegistration.SEPARATOR + WorkerAgentConfig.PORT; + } +} diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/resources/META-INF/defines/module.define b/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/resources/META-INF/defines/module.define new file mode 100644 index 0000000000000000000000000000000000000000..adf3437a4c4aba8aec5a58b6ccc033722c258464 --- /dev/null +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-agent/src/main/resources/META-INF/defines/module.define @@ -0,0 +1 @@ +org.skywalking.apm.collector.worker.agent.WorkerAgentModuleDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/pom.xml b/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..6b53ce9f414a2ab7af076125fe2b66d51bb18396 --- /dev/null +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/pom.xml @@ -0,0 +1,42 @@ + + + + apm-collector-worker-new + org.skywalking + 3.2-2017 + + 4.0.0 + + apm-collector-worker-impl + jar + + + + org.skywalking + cluster-standalone + ${project.version} + + + org.skywalking + cluster-zookeeper + ${project.version} + + + org.skywalking + cluster-redis + ${project.version} + + + org.skywalking + apm-collector-worker-agent + ${project.version} + + + org.skywalking + apm-collector-worker-ui + ${project.version} + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/src/main/java/org/skywalking/apm/collector/worker/impl/CollectorBootStartUp.java b/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/src/main/java/org/skywalking/apm/collector/worker/impl/CollectorBootStartUp.java new file mode 100644 index 0000000000000000000000000000000000000000..8ec1270fbada5d0761975c53d03b41eb611abfd9 --- /dev/null +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/src/main/java/org/skywalking/apm/collector/worker/impl/CollectorBootStartUp.java @@ -0,0 +1,21 @@ +package org.skywalking.apm.collector.worker.impl; + +import org.skywalking.apm.collector.core.config.ConfigException; +import org.skywalking.apm.collector.core.framework.CollectorStarter; +import org.skywalking.apm.collector.core.framework.DefineException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class CollectorBootStartUp { + + private static final Logger logger = LoggerFactory.getLogger(CollectorBootStartUp.class); + + public static void main(String[] args) throws ConfigException, DefineException { + logger.info("collector starting..."); + CollectorStarter starter = new CollectorStarter(); + starter.start(); + } +} diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/src/main/resources/application.yml b/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/src/main/resources/application.yml new file mode 100644 index 0000000000000000000000000000000000000000..d8bfb7a21e15c6f92d824c4b33280eac093e6980 --- /dev/null +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-impl/src/main/resources/application.yml @@ -0,0 +1,12 @@ +cluster: + zookeeper: + hostPort: localhost:2181 + sessionTimeout: 1000 + redis: + host: localhost-rd + port: 2000 +worker: + ui: + host: localhost + port: 12800 + diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/pom.xml b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..d3b6cb57eb278488bb2434e8b7e0aa86bd3f34e7 --- /dev/null +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/pom.xml @@ -0,0 +1,14 @@ + + + + apm-collector-worker-new + org.skywalking + 3.2-2017 + + 4.0.0 + + apm-collector-worker-ui + jar + \ No newline at end of file diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIConfig.java b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..bcf4a528a0b027673a062a3bb0f4626df38a7a91 --- /dev/null +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIConfig.java @@ -0,0 +1,10 @@ +package org.skywalking.apm.collector.worker.ui; + +/** + * @author pengys5 + */ +public class WorkerUIConfig { + public static String HOST; + public static int PORT; + public static String CONTEXT_PATH; +} diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIConfigParser.java b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIConfigParser.java new file mode 100644 index 0000000000000000000000000000000000000000..fce4ae0f7b3dd4ffd8f2aedf0bef5249c367e815 --- /dev/null +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIConfigParser.java @@ -0,0 +1,34 @@ +package org.skywalking.apm.collector.worker.ui; + +import java.util.Map; +import org.skywalking.apm.collector.core.config.ConfigParseException; +import org.skywalking.apm.collector.core.module.ModuleConfigParser; +import org.skywalking.apm.collector.core.util.StringUtils; + +/** + * @author pengys5 + */ +public class WorkerUIConfigParser implements ModuleConfigParser { + + private final String HOST = "host"; + private final String PORT = "port"; + private final String CONTEXT_PATH = "context_path"; + + @Override public void parse(Map config) throws ConfigParseException { + if (StringUtils.isEmpty(config.get(HOST))) { + throw new ConfigParseException("HOST must be require"); + } + WorkerUIConfig.HOST = (String)config.get(HOST); + + if (StringUtils.isEmpty(config.get(PORT))) { + throw new ConfigParseException(""); + } + WorkerUIConfig.PORT = (Integer)config.get(PORT); + + if (StringUtils.isEmpty(config.get(CONTEXT_PATH))) { + WorkerUIConfig.CONTEXT_PATH = "/"; + } else { + WorkerUIConfig.CONTEXT_PATH = (String)config.get(CONTEXT_PATH); + } + } +} diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIModuleDefine.java b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIModuleDefine.java new file mode 100644 index 0000000000000000000000000000000000000000..93d39dec6e80e75648b8b7def8795f087442eb49 --- /dev/null +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIModuleDefine.java @@ -0,0 +1,38 @@ +package org.skywalking.apm.collector.worker.ui; + +import org.skywalking.apm.collector.core.module.ModuleConfigParser; +import org.skywalking.apm.collector.core.module.ModuleGroup; +import org.skywalking.apm.collector.core.module.ModuleRegistration; +import org.skywalking.apm.collector.core.server.Server; +import org.skywalking.apm.collector.core.worker.WorkerModuleDefine; +import org.skywalking.apm.collector.server.jetty.JettyServer; + +/** + * @author pengys5 + */ +public class WorkerUIModuleDefine extends WorkerModuleDefine { + + @Override public ModuleGroup group() { + return ModuleGroup.Worker; + } + + @Override public String name() { + return "ui"; + } + + @Override public boolean defaultModule() { + return true; + } + + @Override public ModuleConfigParser configParser() { + return new WorkerUIConfigParser(); + } + + @Override public Server server() { + return new JettyServer(WorkerUIConfig.HOST, WorkerUIConfig.PORT, WorkerUIConfig.CONTEXT_PATH); + } + + @Override protected ModuleRegistration registration() { + return new WorkerUIModuleRegistration(); + } +} diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIModuleRegistration.java b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIModuleRegistration.java new file mode 100644 index 0000000000000000000000000000000000000000..5ae6cc6d05fcb213ca58d7efd23a379c7670a1a0 --- /dev/null +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/java/org/skywalking/apm/collector/worker/ui/WorkerUIModuleRegistration.java @@ -0,0 +1,13 @@ +package org.skywalking.apm.collector.worker.ui; + +import org.skywalking.apm.collector.core.module.ModuleRegistration; + +/** + * @author pengys5 + */ +public class WorkerUIModuleRegistration extends ModuleRegistration { + + @Override protected String buildValue() { + return WorkerUIConfig.HOST + ModuleRegistration.SEPARATOR + WorkerUIConfig.PORT + ModuleRegistration.SEPARATOR + WorkerUIConfig.CONTEXT_PATH; + } +} diff --git a/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/resources/META-INF/defines/module.define b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/resources/META-INF/defines/module.define new file mode 100644 index 0000000000000000000000000000000000000000..4518d0c8bbdca0ceba9f5b657a8359492a2d1c50 --- /dev/null +++ b/apm-collector/apm-collector-worker-new/apm-collector-worker-ui/src/main/resources/META-INF/defines/module.define @@ -0,0 +1 @@ +org.skywalking.apm.collector.worker.ui.WorkerUIModuleDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-worker-new/pom.xml b/apm-collector/apm-collector-worker-new/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..37319c974ec57c52670c6e855fa5ec0b2b627fd8 --- /dev/null +++ b/apm-collector/apm-collector-worker-new/pom.xml @@ -0,0 +1,37 @@ + + + + apm-collector + org.skywalking + 3.2-2017 + + 4.0.0 + + apm-collector-worker-new + pom + + apm-collector-worker-agent + apm-collector-worker-ui + apm-collector-worker-impl + + + + + org.skywalking + apm-collector-core + ${project.version} + + + org.skywalking + jetty-server + ${project.version} + + + org.skywalking + google-rpc-server + ${project.version} + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-worker/pom.xml b/apm-collector/apm-collector-worker/pom.xml index bd242f2a6d0a0fff2c43e4d0f2dea6ea365ecd98..d4d90d94bc4d40de8d08bb17e4c7eaf8e142e288 100644 --- a/apm-collector/apm-collector-worker/pom.xml +++ b/apm-collector/apm-collector-worker/pom.xml @@ -25,11 +25,6 @@ apm-collector-cluster ${project.version} - - org.skywalking - apm-collector-commons - ${project.version} - org.skywalking apm-network diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/discovery/InstanceDiscoveryServiceImpl.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/discovery/InstanceDiscoveryServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..64d8ecd2fb35818e987f67f6ed2ca556be4594f7 --- /dev/null +++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/discovery/InstanceDiscoveryServiceImpl.java @@ -0,0 +1,32 @@ +package org.skywalking.apm.collector.worker.discovery; + +import io.grpc.stub.StreamObserver; +import org.skywalking.apm.collector.actor.ClusterWorkerContext; +import org.skywalking.apm.collector.actor.ProviderNotFoundException; +import org.skywalking.apm.collector.worker.grpcserver.WorkerCaller; +import org.skywalking.apm.network.proto.ApplicationInstance; +import org.skywalking.apm.network.proto.ApplicationInstanceMapping; +import org.skywalking.apm.network.proto.Downstream; +import org.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc; + +/** + * @author pengys5 + */ +public class InstanceDiscoveryServiceImpl extends InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceImplBase implements WorkerCaller { + + @Override public void preStart() throws ProviderNotFoundException { + } + + @Override public void inject(ClusterWorkerContext clusterWorkerContext) { + } + + @Override + public void register(ApplicationInstance request, StreamObserver responseObserver) { + super.register(request, responseObserver); + } + + @Override + public void registerRecover(ApplicationInstanceMapping request, StreamObserver responseObserver) { + super.registerRecover(request, responseObserver); + } +} diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/discovery/ServiceNameDisCoveryServiceImpl.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/discovery/ServiceNameDisCoveryServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..3e8d96ba43ab9d300149a65079e6cd2ae5151a8c --- /dev/null +++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/discovery/ServiceNameDisCoveryServiceImpl.java @@ -0,0 +1,25 @@ +package org.skywalking.apm.collector.worker.discovery; + +import io.grpc.stub.StreamObserver; +import org.skywalking.apm.collector.actor.ClusterWorkerContext; +import org.skywalking.apm.collector.actor.ProviderNotFoundException; +import org.skywalking.apm.collector.worker.grpcserver.WorkerCaller; +import org.skywalking.apm.network.proto.ServiceNameCollection; +import org.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc; +import org.skywalking.apm.network.proto.ServiceNameMappingCollection; + +/** + * @author pengys5 + */ +public class ServiceNameDisCoveryServiceImpl extends ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceImplBase implements WorkerCaller { + @Override public void preStart() throws ProviderNotFoundException { + } + + @Override public void inject(ClusterWorkerContext clusterWorkerContext) { + } + + @Override public void discovery(ServiceNameCollection request, + StreamObserver responseObserver) { + super.discovery(request, responseObserver); + } +} diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/register/ApplicationRegisterServiceImpl.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/register/ApplicationRegisterServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..ec06f767604e7808d90b894b44bfad8854b73c4d --- /dev/null +++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/register/ApplicationRegisterServiceImpl.java @@ -0,0 +1,38 @@ +package org.skywalking.apm.collector.worker.register; + +import io.grpc.stub.StreamObserver; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.skywalking.apm.collector.actor.AbstractWorker; +import org.skywalking.apm.collector.actor.ClusterWorkerContext; +import org.skywalking.apm.collector.actor.ProviderNotFoundException; +import org.skywalking.apm.collector.actor.WorkerRef; +import org.skywalking.apm.collector.worker.grpcserver.WorkerCaller; +import org.skywalking.apm.collector.worker.segment.SegmentReceiver; +import org.skywalking.apm.network.proto.Application; +import org.skywalking.apm.network.proto.ApplicationMapping; +import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc; + +/** + * @author pengys5 + */ +public class ApplicationRegisterServiceImpl extends ApplicationRegisterServiceGrpc.ApplicationRegisterServiceImplBase implements WorkerCaller { + + private Logger logger = LogManager.getFormatterLogger(ApplicationRegisterServiceImpl.class); + + private ClusterWorkerContext clusterWorkerContext; + private WorkerRef segmentReceiverWorkRef; + + @Override public void preStart() throws ProviderNotFoundException { + segmentReceiverWorkRef = clusterWorkerContext.findProvider(SegmentReceiver.WorkerRole.INSTANCE).create(AbstractWorker.noOwner()); + + } + + @Override public void inject(ClusterWorkerContext clusterWorkerContext) { + this.clusterWorkerContext = clusterWorkerContext; + } + + @Override public void register(Application request, StreamObserver responseObserver) { + + } +} diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/grpcserver/TraceSegmentServiceImpl.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/segment/TraceSegmentServiceImpl.java similarity index 95% rename from apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/grpcserver/TraceSegmentServiceImpl.java rename to apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/segment/TraceSegmentServiceImpl.java index 1ad8c9f34e8ca02b60f0c9438e46aef525dd9b7a..aa78b210a557e73abff0bd907e0265c086cffb0c 100644 --- a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/grpcserver/TraceSegmentServiceImpl.java +++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/segment/TraceSegmentServiceImpl.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.worker.grpcserver; +package org.skywalking.apm.collector.worker.segment; import io.grpc.stub.StreamObserver; import org.apache.logging.log4j.LogManager; @@ -8,7 +8,7 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext; import org.skywalking.apm.collector.actor.ProviderNotFoundException; import org.skywalking.apm.collector.actor.WorkerInvokeException; import org.skywalking.apm.collector.actor.WorkerRef; -import org.skywalking.apm.collector.worker.segment.SegmentReceiver; +import org.skywalking.apm.collector.worker.grpcserver.WorkerCaller; import org.skywalking.apm.network.proto.Downstream; import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc; import org.skywalking.apm.network.proto.UpstreamSegment; diff --git a/apm-collector/apm-collector-worker/src/main/resources/META-INF/services/io.grpc.BindableService b/apm-collector/apm-collector-worker/src/main/resources/META-INF/services/io.grpc.BindableService index 094c4208fb71d11d3b7533ec4494b190a8c87d5b..1f128953fb4bc91c249bd2aadf18490641db98ac 100644 --- a/apm-collector/apm-collector-worker/src/main/resources/META-INF/services/io.grpc.BindableService +++ b/apm-collector/apm-collector-worker/src/main/resources/META-INF/services/io.grpc.BindableService @@ -1 +1 @@ -org.skywalking.apm.collector.worker.grpcserver.TraceSegmentServiceImpl \ No newline at end of file +org.skywalking.apm.collector.worker.segment.TraceSegmentServiceImpl \ No newline at end of file diff --git a/apm-collector/pom.xml b/apm-collector/pom.xml index 00e1c96beab62265691af71a5405f71efcb293a4..4fbdf550328a4e9db22793bae0d5837b8ee74a65 100644 --- a/apm-collector/pom.xml +++ b/apm-collector/pom.xml @@ -5,7 +5,13 @@ apm-collector-cluster apm-collector-worker - apm-collector-commons + apm-collector-core + apm-collector-queue + apm-collector-worker-new + apm-collector-storage + apm-collector-cluster-new + apm-collector-client + apm-collector-server apm @@ -21,48 +27,5 @@ - - com.typesafe.akka - akka-cluster_2.11 - ${akka.version} - - - com.typesafe.akka - akka-slf4j_2.11 - ${akka.version} - - - org.slf4j - slf4j-api - - - - - org.apache.logging.log4j - log4j-core - ${log4j.version} - - - org.apache.logging.log4j - log4j-api - ${log4j.version} - - - org.apache.logging.log4j - log4j-slf4j-impl - ${log4j.version} - runtime - - - org.apache.logging.log4j - log4j-jcl - ${log4j.version} - - - com.typesafe.akka - akka-testkit_2.11 - ${akka.version} - test -