提交 9f10f8e4 编写于 作者: P pengys5

Add worker module framework

上级 972b6665
......@@ -9,13 +9,8 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-cluster-new</artifactId>
<packaging>pom</packaging>
<modules>
<module>cluster-zookeeper</module>
<module>cluster-redis</module>
<module>cluster-standalone</module>
</modules>
<artifactId>apm-collector-agentstream</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
......@@ -23,5 +18,10 @@
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-server</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.worker.agent;
package org.skywalking.apm.collector.agent.stream.server.grpc;
/**
* @author pengys5
*/
public class WorkerAgentConfig {
public class AgentStreamGRPCConfig {
public static String HOST;
public static int PORT;
}
package org.skywalking.apm.collector.worker.agent;
package org.skywalking.apm.collector.agent.stream.server.grpc;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
......@@ -8,20 +8,22 @@ import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class WorkerAgentConfigParser implements ModuleConfigParser {
public class AgentStreamGRPCConfigParser implements ModuleConfigParser {
private final String HOST = "host";
private final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
if (StringUtils.isEmpty(config.get(HOST))) {
AgentStreamGRPCConfig.HOST = (String)config.get(HOST);
if (StringUtils.isEmpty(AgentStreamGRPCConfig.HOST)) {
throw new ConfigParseException("");
}
WorkerAgentConfig.HOST = (String)config.get(HOST);
if (StringUtils.isEmpty(config.get(PORT))) {
throw new ConfigParseException("");
} else {
AgentStreamGRPCConfig.PORT = (Integer)config.get(PORT);
}
WorkerAgentConfig.PORT = (Integer)config.get(PORT);
}
}
package org.skywalking.apm.collector.worker.agent;
package org.skywalking.apm.collector.agent.stream.server.grpc;
import org.skywalking.apm.collector.core.agentstream.AgentStreamModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleGroup;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.worker.WorkerModuleDefine;
import org.skywalking.apm.collector.server.grpc.GRPCServer;
/**
* @author pengys5
*/
public class WorkerAgentModuleDefine extends WorkerModuleDefine {
public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine {
@Override public ModuleGroup group() {
return ModuleGroup.Worker;
@Override protected ModuleGroup group() {
return ModuleGroup.AgentStream;
}
@Override public String name() {
return "agent";
return "grpc";
}
@Override public boolean defaultModule() {
return true;
}
@Override public ModuleConfigParser configParser() {
return new WorkerAgentConfigParser();
@Override protected ModuleConfigParser configParser() {
return new AgentStreamGRPCConfigParser();
}
@Override public Server server() {
return new GRPCServer(WorkerAgentConfig.HOST, WorkerAgentConfig.PORT);
@Override protected Server server() {
return new GRPCServer(AgentStreamGRPCConfig.HOST, AgentStreamGRPCConfig.PORT);
}
@Override protected ModuleRegistration registration() {
return new WorkerAgentModuleRegistration();
return new AgentStreamGRPCModuleRegistration();
}
}
package org.skywalking.apm.collector.worker.agent;
package org.skywalking.apm.collector.agent.stream.server.grpc;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class WorkerAgentModuleRegistration extends ModuleRegistration {
public class AgentStreamGRPCModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
return new Value(WorkerAgentConfig.HOST, WorkerAgentConfig.PORT, null);
return new Value(AgentStreamGRPCConfig.HOST, AgentStreamGRPCConfig.PORT, null);
}
}
package org.skywalking.apm.collector.worker.segment;
package org.skywalking.apm.collector.agent.stream.server.grpc.impl;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
......@@ -12,13 +12,15 @@ import org.skywalking.apm.collector.worker.grpcserver.WorkerCaller;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class TraceSegmentServiceImpl extends TraceSegmentServiceGrpc.TraceSegmentServiceImplBase implements WorkerCaller {
private Logger logger = LogManager.getFormatterLogger(TraceSegmentServiceImpl.class);
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceImpl.class);
private ClusterWorkerContext clusterWorkerContext;
private WorkerRef segmentReceiverWorkRef;
......
package org.skywalking.apm.collector.worker.ui;
package org.skywalking.apm.collector.agent.stream.server.jetty;
/**
* @author pengys5
*/
public class WorkerUIConfig {
public class AgentStreamJettyConfig {
public static String HOST;
public static int PORT;
public static String CONTEXT_PATH;
......
package org.skywalking.apm.collector.worker.ui;
package org.skywalking.apm.collector.agent.stream.server.jetty;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
......@@ -8,26 +8,26 @@ import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class WorkerUIConfigParser implements ModuleConfigParser {
public class AgentStreamJettyConfigParser implements ModuleConfigParser {
private final String HOST = "host";
private final String PORT = "port";
private final String CONTEXT_PATH = "context_path";
private static final String HOST = "host";
private static final String PORT = "port";
public static final String CONTEXT_PATH = "contextPath";
@Override public void parse(Map config) throws ConfigParseException {
if (StringUtils.isEmpty(config.get(HOST))) {
throw new ConfigParseException("HOST must be require");
}
WorkerUIConfig.HOST = (String)config.get(HOST);
AgentStreamJettyConfig.HOST = (String)config.get(HOST);
AgentStreamJettyConfig.CONTEXT_PATH = "/";
if (StringUtils.isEmpty(AgentStreamJettyConfig.HOST)) {
throw new ConfigParseException("");
}
if (StringUtils.isEmpty(config.get(PORT))) {
throw new ConfigParseException("");
} else {
AgentStreamJettyConfig.PORT = (Integer)config.get(PORT);
}
WorkerUIConfig.PORT = (Integer)config.get(PORT);
WorkerUIConfig.CONTEXT_PATH = "/";
if (!StringUtils.isEmpty(config.get(CONTEXT_PATH))) {
WorkerUIConfig.CONTEXT_PATH = (String)config.get(CONTEXT_PATH);
AgentStreamJettyConfig.CONTEXT_PATH = (String)config.get(CONTEXT_PATH);
}
}
}
package org.skywalking.apm.collector.worker.ui;
package org.skywalking.apm.collector.agent.stream.server.jetty;
import org.skywalking.apm.collector.core.agentstream.AgentStreamModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleGroup;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.worker.WorkerModuleDefine;
import org.skywalking.apm.collector.server.jetty.JettyServer;
/**
* @author pengys5
*/
public class WorkerUIModuleDefine extends WorkerModuleDefine {
public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine {
@Override public ModuleGroup group() {
return ModuleGroup.Worker;
@Override protected ModuleGroup group() {
return ModuleGroup.AgentStream;
}
@Override public String name() {
return "ui";
return "jetty";
}
@Override public boolean defaultModule() {
return true;
return false;
}
@Override public ModuleConfigParser configParser() {
return new WorkerUIConfigParser();
@Override protected ModuleConfigParser configParser() {
return new AgentStreamJettyConfigParser();
}
@Override public Server server() {
return new JettyServer(WorkerUIConfig.HOST, WorkerUIConfig.PORT, WorkerUIConfig.CONTEXT_PATH);
@Override protected Server server() {
return new JettyServer(AgentStreamJettyConfig.HOST, AgentStreamJettyConfig.PORT, AgentStreamJettyConfig.CONTEXT_PATH);
}
@Override protected ModuleRegistration registration() {
return new WorkerUIModuleRegistration();
return new AgentStreamJettyModuleRegistration();
}
}
package org.skywalking.apm.collector.worker.ui;
package org.skywalking.apm.collector.agent.stream.server.jetty;
import com.google.gson.JsonObject;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
......@@ -6,11 +6,11 @@ import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class WorkerUIModuleRegistration extends ModuleRegistration {
public class AgentStreamJettyModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
JsonObject data = new JsonObject();
data.addProperty("context_path", WorkerUIConfig.CONTEXT_PATH);
return new Value(WorkerUIConfig.HOST, WorkerUIConfig.PORT, data);
data.addProperty(AgentStreamJettyConfigParser.CONTEXT_PATH, AgentStreamJettyConfig.CONTEXT_PATH);
return new Value(AgentStreamJettyConfig.HOST, AgentStreamJettyConfig.PORT, data);
}
}
org.skywalking.apm.collector.agent.stream.server.grpc.AgentStreamGRPCModuleDefine
org.skywalking.apm.collector.agent.stream.server.jetty.AgentStreamJettyModuleDefine
\ No newline at end of file
......@@ -9,28 +9,28 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-worker-new</artifactId>
<packaging>pom</packaging>
<modules>
<module>apm-collector-worker-agent</module>
<module>apm-collector-worker-ui</module>
<module>apm-collector-worker-impl</module>
</modules>
<artifactId>apm-collector-boot</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<artifactId>apm-collector-cluster-new</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>jetty-server</artifactId>
<artifactId>apm-collector-queue</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>google-rpc-server</artifactId>
<artifactId>apm-collector-ui</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-agentstream</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
......
package org.skywalking.apm.collector.worker.impl;
package org.skywalking.apm.collector.boot;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
......
......@@ -5,8 +5,22 @@ cluster:
# redis:
# host: localhost
# port: 6379
worker:
ui:
queue:
disruptor: on
data_carrier: off
agentstream:
grpc:
host: localhost
port: 1000
jetty:
host: localhost
port: 2000
context_path: /
discovery:
grpc: localhost
port: 1000
ui:
jetty:
host: localhost
port: 12800
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-client</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>client-h2</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.196</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-client</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>client-redis</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-client</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>client-zookeeper</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -10,12 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-client</artifactId>
<packaging>pom</packaging>
<modules>
<module>client-zookeeper</module>
<module>client-redis</module>
<module>client-h2</module>
</modules>
<packaging>jar</packaging>
<dependencies>
<dependency>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-cluster-new</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cluster-redis</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>client-redis</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
org.skywalking.apm.collector.cluster.redis.ClusterRedisModuleDefine
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-cluster-new</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cluster-standalone</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>client-h2</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
org.skywalking.apm.collector.cluster.standalone.ClusterStandaloneModuleDefine
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-cluster-new</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cluster-zookeeper</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>client-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
org.skywalking.apm.collector.cluster.zookeeper.ClusterZKModuleDefine
\ No newline at end of file
package org.skywalking.apm.collector.cluster.zookeeper;
import java.io.FileNotFoundException;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.skywalking.apm.collector.core.cluster.ClusterModuleException;
import org.yaml.snakeyaml.Yaml;
/**
* @author pengys5
*/
/**
 * Verifies that {@link ClusterZKModuleDefine} can be initialized from a
 * yaml-parsed settings map containing the zookeeper connection options.
 *
 * @author pengys5
 */
public class ClusterZKModuleDefineTestCase {

    private Map config;

    @Before
    public void before() throws FileNotFoundException {
        // Same yaml document the module would receive from the collector config file.
        String settings = "hostPort: localhost:2181" + System.lineSeparator() + "sessionTimeout: 2000";
        config = (Map)new Yaml().load(settings);
    }

    @Test
    public void testInitialize() throws ClusterModuleException {
        new ClusterZKModuleDefine().initialize(config);
    }
}
package org.skywalking.apm.collector.cluster.zookeeper;
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Test;
/**
* @author pengys5
*/
/**
 * Manual integration check of basic ZooKeeper CRUD operations (create, read,
 * update, list, delete) against a local server at {@code localhost:2181}.
 * Requires a running ZooKeeper instance; it is not a self-contained unit test.
 *
 * @author pengys5
 */
public class ZookeeperTestCase {

    @Test
    public void test() throws IOException, KeeperException, InterruptedException {
        String hostPort = "localhost:2181";
        // Watcher just logs every triggered event; session timeout is 1000 ms.
        ZooKeeper zk = new ZooKeeper(hostPort, 1000, new Watcher() {
            @Override public void process(WatchedEvent event) {
                String path = event.getPath();
                System.out.println("已经触发了" + event.getType() + "事件!");
                System.out.println("path: " + path);
            }
        });
        // Create the root node with open ACL.
        zk.create("/testRootPath", "testRootData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
        // Create a child node.
        zk.create("/testRootPath/testChildPathOne", "testChildDataOne".getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(new String(zk.getData("/testRootPath", false, null)));
        // List the children of the root node.
        System.out.println(zk.getChildren("/testRootPath", true));
        // Update the child node's data (-1 means "any version").
        zk.setData("/testRootPath/testChildPathOne", "modifyChildDataOne".getBytes(), -1);
        System.out.println("目录节点状态:[" + zk.exists("/testRootPath", true) + "]");
        // Create a second child node.
        zk.create("/testRootPath/testChildPathTwo", "testChildDataTwo".getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // Delete the child nodes, then the root node (children must go first).
        zk.delete("/testRootPath/testChildPathTwo", -1);
        zk.delete("/testRootPath/testChildPathOne", -1);
        zk.delete("/testRootPath", -1);
        zk.close();
    }
}
......@@ -12,19 +12,16 @@
<artifactId>apm-collector-cluster</artifactId>
<packaging>jar</packaging>
<properties>
</properties>
<dependencies>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.6</version>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-util</artifactId>
<artifactId>apm-collector-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
</project>
\ No newline at end of file
package org.skywalking.apm.collector;
import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.util.StringUtil;
import org.skywalking.apm.collector.cluster.ClusterConfig;
import org.skywalking.apm.collector.cluster.Const;
/**
* @author pengys5
*/
/**
 * Singleton factory for the collector's akka {@link ActorSystem}, configured
 * from {@link ClusterConfig} (bind host/port, log level, optional seed nodes).
 *
 * @author pengys5
 */
public enum AkkaSystem {
    INSTANCE;

    private final Logger logger = LogManager.getFormatterLogger(AkkaSystem.class);

    /**
     * Build the actor system. Explicit settings (host, port, logger, log level)
     * take precedence over {@code application.conf} via the fallback chain.
     *
     * @return a started {@link ActorSystem} named {@link Const#SYSTEM_NAME}
     */
    public ActorSystem create() {
        Level logLevel = logger.getLevel();
        Config config = ConfigFactory.parseString("akka.remote.netty.tcp.HOSTNAME=" + ClusterConfig.Cluster.Current.HOSTNAME)
            .withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.PORT=" + ClusterConfig.Cluster.Current.PORT))
            .withFallback(ConfigFactory.parseString("akka.loggers=[\"akka.event.slf4j.Slf4jLogger\"]"))
            .withFallback(ConfigFactory.parseString("akka.loglevel=\"" + logLevel.name() + "\""))
            .withFallback(ConfigFactory.load("application.conf"));
        if (!StringUtil.isEmpty(ClusterConfig.Cluster.SEED_NODES)) {
            // BUG FIX: Config is immutable — the original discarded the result of
            // withFallback(), so the seed-node list was never applied to the system.
            config = config.withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + generateSeedNodes()));
        }
        return ActorSystem.create(Const.SYSTEM_NAME, config);
    }

    /**
     * Turn the comma-separated {@link ClusterConfig.Cluster#SEED_NODES} value
     * into an akka address list, e.g. {@code ["akka.tcp://ClusterSystem@ip:port", ...]}.
     */
    private String generateSeedNodes() {
        String[] seedNodes = ClusterConfig.Cluster.SEED_NODES.split(",");
        StringBuilder akkaSeedNodes = new StringBuilder("[");
        for (int i = 0; i < seedNodes.length; i++) {
            if (i > 0) {
                akkaSeedNodes.append(",");
            }
            akkaSeedNodes.append("\"akka.tcp://").append(Const.SYSTEM_NAME).append("@").append(seedNodes[i]).append("\"");
        }
        akkaSeedNodes.append("]");
        logger.info("config seedNodes: %s, generate seedNodes: %s", ClusterConfig.Cluster.SEED_NODES, akkaSeedNodes.toString());
        return akkaSeedNodes.toString();
    }
}
package org.skywalking.apm.collector;
import akka.actor.ActorSystem;
import akka.actor.Props;
import java.io.IOException;
import java.util.ServiceLoader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.AbstractClusterWorkerProvider;
import org.skywalking.apm.collector.actor.AbstractLocalWorkerProvider;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LookUp;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.UsedRoleNameException;
import org.skywalking.apm.collector.cluster.WorkersListener;
import org.skywalking.apm.collector.config.ConfigInitializer;
import org.skywalking.apm.collector.rpc.RPCAddressListener;
/**
* @author pengys5
*/
/**
 * Bootstraps the collector runtime: initializes configuration, creates the akka
 * actor system, starts the cluster/RPC listener actors, and instantiates workers
 * discovered through the java {@link ServiceLoader} mechanism.
 *
 * @author pengys5
 */
public class CollectorSystem {
    private static final Logger logger = LogManager.getFormatterLogger(CollectorSystem.class);

    // Shared context holding the akka system plus all registered providers/worker refs.
    private ClusterWorkerContext clusterContext;

    /** Exposes the worker context for role look-ups after {@link #boot()} has run. */
    public LookUp getClusterContext() {
        return clusterContext;
    }

    /**
     * Starts the collector. The call order is significant: configuration must be
     * initialized before the akka system is created (AkkaSystem reads ClusterConfig),
     * and local providers are loaded before cluster workers are created.
     *
     * @throws UsedRoleNameException if two local providers declare the same role name
     * @throws ProviderNotFoundException if a cluster worker needs a missing provider
     * @throws IOException propagated from configuration initialization
     * @throws IllegalAccessException propagated from configuration initialization
     */
    public void boot() throws UsedRoleNameException, ProviderNotFoundException, IOException, IllegalAccessException {
        ConfigInitializer.INSTANCE.initialize();
        createAkkaSystem();
        createListener();
        loadLocalProviders();
        createClusterWorkers();
    }

    // Create the actor system and wrap it in the shared worker context.
    private void createAkkaSystem() {
        ActorSystem akkaSystem = AkkaSystem.INSTANCE.create();
        clusterContext = new ClusterWorkerContext(akkaSystem);
    }

    // Start the two well-known listener actors under their registered names.
    private void createListener() {
        clusterContext.getAkkaSystem().actorOf(Props.create(WorkersListener.class, clusterContext), WorkersListener.WORK_NAME);
        clusterContext.getAkkaSystem().actorOf(Props.create(RPCAddressListener.class, clusterContext), RPCAddressListener.WORK_NAME);
    }

    /**
     * Instantiate every cluster worker advertised via ServiceLoader; each provider
     * creates {@code workerNum()} instances.
     */
    private void createClusterWorkers() throws ProviderNotFoundException {
        ServiceLoader<AbstractClusterWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractClusterWorkerProvider.class);
        for (AbstractClusterWorkerProvider provider : clusterServiceLoader) {
            // NOTE(review): format mixes '{}' and '%s'; getFormatterLogger is printf-style,
            // so the braces print literally — confirm this is intended.
            logger.info("create {%s} worker using java service loader", provider.workerNum());
            provider.setClusterContext(clusterContext);
            for (int i = 1; i <= provider.workerNum(); i++) {
                provider.create(AbstractWorker.noOwner());
            }
        }
    }

    // Register every local worker provider with the shared context (no instances yet).
    private void loadLocalProviders() throws UsedRoleNameException {
        ServiceLoader<AbstractLocalWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractLocalWorkerProvider.class);
        for (AbstractLocalWorkerProvider provider : clusterServiceLoader) {
            logger.info("loadLocalProviders provider name: %s", provider.getClass().getName());
            provider.setClusterContext(clusterContext);
            clusterContext.putProvider(provider);
        }
    }
}
package org.skywalking.apm.collector.actor;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.cluster.WorkerListenerMessage;
import org.skywalking.apm.collector.cluster.WorkersListener;
import org.skywalking.apm.collector.rpc.RPCAddress;
import org.skywalking.apm.collector.rpc.RPCAddressListener;
import org.skywalking.apm.collector.rpc.RPCAddressListenerMessage;
import org.skywalking.apm.collector.log.LogManager;
/**
* The <code>AbstractClusterWorker</code> implementations represent workers,
* which receive remote messages.
* <p>
* Usually, the implementations are doing persistent, or aggregate works.
*
* @author pengys5
* @since v3.0-2017
*/
/**
 * The <code>AbstractClusterWorker</code> implementations represent workers,
 * which receive remote messages.
 * <p>
 * Usually, the implementations are doing persistent, or aggregate works.
 *
 * @author pengys5
 * @since v3.0-2017
 */
public abstract class AbstractClusterWorker extends AbstractWorker {

    /**
     * Construct an <code>AbstractClusterWorker</code> with the worker role and context.
     *
     * @param role If multi-workers are for load balance, they should be more likely called worker instance. Meaning,
     *             each worker have multi instances.
     * @param clusterContext See {@link ClusterWorkerContext}
     * @param selfContext See {@link LocalWorkerContext}
     */
    protected AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
        super(role, clusterContext, selfContext);
    }

    /**
     * This method use for message producer to call for send message.
     *
     * @param message The persistence data or metric data.
     * @throws WorkerException the exception raised by {@link #onWork(Object)}
     */
    final public void allocateJob(Object message) throws WorkerException {
        onWork(message);
    }

    /**
     * This method use for message receiver to analyse message.
     *
     * @param message Cast the message object to an expected subclass.
     * @throws WorkerException Don't handle the exception, throw it.
     */
    protected abstract void onWork(Object message) throws WorkerException;

    /**
     * Akka actor adapter: forwards application messages to its owning
     * {@link AbstractClusterWorker} and handles cluster membership events
     * to register the worker with the cluster listeners.
     */
    static class WorkerWithAkka extends UntypedActor {
        private Logger logger = LogManager.INSTANCE.getFormatterLogger(WorkerWithAkka.class);

        private Cluster cluster;
        // The worker instance that actually processes application messages.
        private final AbstractClusterWorker ownerWorker;
        // Address advertised to the RPC address listener; may be null, in which case
        // no RPC registration is performed (see register(Member)).
        private final RPCAddress RPCAddress;

        public WorkerWithAkka(AbstractClusterWorker ownerWorker, RPCAddress RPCAddress) {
            this.ownerWorker = ownerWorker;
            cluster = Cluster.get(getContext().system());
            this.RPCAddress = RPCAddress;
        }

        @Override
        public void preStart() {
            // Subscribing also delivers an initial CurrentClusterState snapshot,
            // handled in onReceive below.
            cluster.subscribe(getSelf(), ClusterEvent.MemberUp.class);
        }

        @Override
        public void postStop() {
            cluster.unsubscribe(getSelf());
        }

        /**
         * Listening {@link ClusterEvent.MemberUp} and {@link ClusterEvent.CurrentClusterState}
         * cluster event, when event send from the member of {@link WorkersListener} then tell
         * the sender to register self.
         */
        @Override
        public void onReceive(Object message) throws WorkerException {
            if (message instanceof ClusterEvent.CurrentClusterState) {
                // Initial snapshot: register against every member that is already up.
                ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState)message;
                for (Member member : state.getMembers()) {
                    if (member.status().equals(MemberStatus.up())) {
                        register(member);
                    }
                }
            } else if (message instanceof ClusterEvent.MemberUp) {
                ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp)message;
                logger.info("receive ClusterEvent.MemberUp message, address: %s", memberUp.member().address().toString());
                register(memberUp.member());
            } else {
                // Anything that is not a membership event is application data.
                logger.debug("worker class: %s, message class: %s", this.getClass().getName(), message.getClass().getName());
                ownerWorker.allocateJob(message);
            }
        }

        /**
         * When member role is {@link WorkersListener#WORK_NAME} then Select actor from context
         * and send register message to {@link WorkersListener}
         *
         * @param member is the new created or restart worker
         */
        void register(Member member) {
            if (member.hasRole(WorkersListener.WORK_NAME)) {
                WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(ownerWorker.getRole());
                logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
                getContext().actorSelection(member.address() + "/user/" + WorkersListener.WORK_NAME).tell(registerMessage, getSelf());
            }
            if (member.hasRole(RPCAddressListener.WORK_NAME) && RPCAddress != null) {
                RPCAddressListenerMessage.ConfigMessage configMessage = new RPCAddressListenerMessage.ConfigMessage(RPCAddress);
                logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
                getContext().actorSelection(member.address() + "/user/" + RPCAddressListener.WORK_NAME).tell(configMessage, getSelf());
            }
        }
    }
}
package org.skywalking.apm.collector.actor;
import akka.actor.ActorRef;
/**
* @author pengys5
*/
/**
 * A {@link WorkerRef} backed by an akka {@link ActorRef}; messages are
 * forwarded to the actor with no sender.
 *
 * @author pengys5
 */
public class ClusterWorkerRef extends WorkerRef {

    private final ActorRef delegate;

    public ClusterWorkerRef(ActorRef actorRef, Role role) {
        super(role);
        this.delegate = actorRef;
    }

    @Override
    public void tell(Object message) {
        delegate.tell(message, ActorRef.noSender());
    }
}
package org.skywalking.apm.collector.cluster;
/**
 * A static holder for the cluster configuration values.
 * <p>
 * {@link Cluster.Current#HOSTNAME} is the IP address of the server which starts this process.
 * {@link Cluster.Current#PORT} is the port this server binds to.
 * {@link Cluster.Current#ROLES} lists the worker roles to create in this process.
 * {@link Cluster#SEED_NODES} is the comma-separated seed node list of the cluster,
 * e.g. SEED_NODES = "ip:port,ip:port".
 *
 * @author pengys5
 */
public class ClusterConfig {
    public static class Cluster {
        public static class Current {
            // All values default to empty and are populated from system properties
            // by ClusterConfigProvider.cliArgs().
            public static String HOSTNAME = "";
            public static String PORT = "";
            public static String ROLES = "";
        }

        public static String SEED_NODES = "";
    }
}
package org.skywalking.apm.collector.cluster;
import org.skywalking.apm.util.StringUtil;
import org.skywalking.apm.collector.config.ConfigProvider;
/**
* @author pengys5
*/
/**
 * Populates {@link ClusterConfig} from JVM system properties
 * ({@code -Dcluster.current.HOSTNAME=...} etc.); properties that are absent or
 * empty leave the existing config value untouched.
 *
 * @author pengys5
 */
public class ClusterConfigProvider implements ConfigProvider {

    @Override
    public Class configClass() {
        return ClusterConfig.class;
    }

    @Override
    public void cliArgs() {
        // Read each property exactly once: the original looked each one up twice,
        // which is redundant and could observe two different values.
        String hostname = System.getProperty("cluster.current.HOSTNAME");
        if (!StringUtil.isEmpty(hostname)) {
            ClusterConfig.Cluster.Current.HOSTNAME = hostname;
        }
        String port = System.getProperty("cluster.current.PORT");
        if (!StringUtil.isEmpty(port)) {
            ClusterConfig.Cluster.Current.PORT = port;
        }
        String roles = System.getProperty("cluster.current.ROLES");
        if (!StringUtil.isEmpty(roles)) {
            ClusterConfig.Cluster.Current.ROLES = roles;
        }
        String seedNodes = System.getProperty("cluster.SEED_NODES");
        if (!StringUtil.isEmpty(seedNodes)) {
            ClusterConfig.Cluster.SEED_NODES = seedNodes;
        }
    }
}
package org.skywalking.apm.collector.cluster;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstallMode;
/**
* @author pengys5
*/
/**
 * Declares the "cluster" module group; {@link ModuleInstallMode#Single} means
 * only one cluster implementation may be installed at a time.
 *
 * @author pengys5
 */
public class ClusterModuleGroupDefine implements ModuleGroupDefine {

    private static final String GROUP_NAME = "cluster";

    @Override
    public String name() {
        return GROUP_NAME;
    }

    @Override
    public ModuleInstallMode mode() {
        return ModuleInstallMode.Single;
    }
}
package org.skywalking.apm.collector.cluster;
/**
* @author pengys5
*/
/**
 * Shared constants for the collector cluster module.
 *
 * @author pengys5
 */
public class Const {

    /** Name of the akka actor system that all cluster members join. */
    public static final String SYSTEM_NAME = "ClusterSystem";

    // Utility holder — not instantiable (the original exposed a default constructor).
    private Const() {
    }
}
package org.skywalking.apm.collector.cluster;
import java.io.Serializable;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.Role;
/**
* <code>WorkerListenerMessage</code> is a message just for the worker
* implementation of the {@link AbstractWorker}
* to register.
*
* @author pengys5
*/
/**
 * <code>WorkerListenerMessage</code> is a message just for the worker
 * implementation of the {@link AbstractWorker} to register.
 *
 * @author pengys5
 */
public class WorkerListenerMessage {

    /** Serializable envelope carrying the registering worker's {@link Role}. */
    public static class RegisterMessage implements Serializable {

        private final Role role;

        public RegisterMessage(Role role) {
            this.role = role;
        }

        public Role getRole() {
            return this.role;
        }
    }
}
package org.skywalking.apm.collector.cluster;

import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.ClusterWorkerRef;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * <code>WorkersListener</code> listening the register message from workers
 * implementation of the {@link AbstractWorker}
 * and terminated message from akka cluster.
 * <p>
 * when listened register message then begin to watch the state for this worker
 * and register to {@link ClusterWorkerContext} and {@link #relation}.
 * <p>
 * when listened terminate message then unregister from {@link ClusterWorkerContext} and {@link #relation} .
 *
 * @author pengys5
 */
public class WorkersListener extends UntypedActor {

    public static final String WORK_NAME = "WorkersListener";

    private final Logger logger = LogManager.getFormatterLogger(WorkersListener.class);

    private final ClusterWorkerContext clusterContext;

    private final Cluster cluster = Cluster.get(getContext().system());

    /** Reverse index actor -> worker ref, used for cleanup on termination. */
    private final Map<ActorRef, ClusterWorkerRef> relation = new ConcurrentHashMap<>();

    public WorkersListener(ClusterWorkerContext clusterContext) {
        this.clusterContext = clusterContext;
    }

    @Override
    public void preStart() throws Exception {
        cluster.subscribe(getSelf(), ClusterEvent.UnreachableMember.class);
    }

    @Override
    public void onReceive(Object message) throws Throwable {
        if (message instanceof WorkerListenerMessage.RegisterMessage) {
            WorkerListenerMessage.RegisterMessage register = (WorkerListenerMessage.RegisterMessage)message;
            ActorRef sender = getSender();
            logger.info("register worker of role: %s, path: %s", register.getRole().roleName(), sender.toString());

            // Watch the registering actor; without this, akka never delivers
            // the Terminated message handled below and dead workers linger.
            getContext().watch(sender);

            ClusterWorkerRef workerRef = new ClusterWorkerRef(sender, register.getRole());
            relation.put(sender, workerRef);
            // Reuse the same ref instead of constructing a duplicate instance.
            clusterContext.put(workerRef);
        } else if (message instanceof Terminated) {
            Terminated terminated = (Terminated)message;
            ClusterWorkerRef workerRef = relation.remove(terminated.getActor());
            // Guard: the actor may never have registered with us.
            if (workerRef != null) {
                clusterContext.remove(workerRef);
            }
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            ClusterEvent.UnreachableMember unreachableMember = (ClusterEvent.UnreachableMember)message;
            // Drop every worker hosted on the member that became unreachable.
            Iterator<Map.Entry<ActorRef, ClusterWorkerRef>> iterator = relation.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<ActorRef, ClusterWorkerRef> next = iterator.next();
                if (next.getKey().path().address().equals(unreachableMember.member().address())) {
                    clusterContext.remove(next.getValue());
                    iterator.remove();
                }
            }
        } else {
            unhandled(message);
        }
    }
}
......@@ -14,12 +14,13 @@ public class ClusterZKConfigParser implements ModuleConfigParser {
private final String SESSION_TIMEOUT = "sessionTimeout";
@Override public void parse(Map config) throws ConfigParseException {
if (StringUtils.isEmpty(config.get(HOST_PORT))) {
throw new ConfigParseException("");
}
ClusterZKConfig.HOST_PORT = (String)config.get(HOST_PORT);
ClusterZKConfig.SESSION_TIMEOUT = 1000;
if (StringUtils.isEmpty(ClusterZKConfig.HOST_PORT)) {
throw new ConfigParseException("");
}
if (!StringUtils.isEmpty(config.get(SESSION_TIMEOUT))) {
ClusterZKConfig.SESSION_TIMEOUT = (Integer)config.get(SESSION_TIMEOUT);
}
......
package org.skywalking.apm.collector.config;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.ServiceLoader;

/**
 * Loads /collector.config from the classpath, pushes its properties into each
 * {@link ConfigProvider}'s config class, then lets every provider apply its
 * command-line overrides (so -D system properties win over file values).
 *
 * @author pengys5
 */
public enum ConfigInitializer {
    INSTANCE;

    private final Logger logger = LogManager.getFormatterLogger(ConfigInitializer.class);

    /**
     * Run the two-phase initialization: config file first, CLI overrides second.
     *
     * @throws IOException when the config file is missing or unreadable
     * @throws IllegalAccessException when a config class field cannot be set
     */
    public void initialize() throws IOException, IllegalAccessException {
        // try-with-resources: the stream was previously never closed.
        try (InputStream configFileStream = ConfigInitializer.class.getResourceAsStream("/collector.config")) {
            if (configFileStream == null) {
                // Fail fast with a clear message instead of an NPE inside Properties.load.
                throw new IOException("/collector.config not found on the classpath");
            }
            initializeConfigFile(configFileStream);
        }
        ServiceLoader<ConfigProvider> configProviders = ServiceLoader.load(ConfigProvider.class);
        for (ConfigProvider provider : configProviders) {
            provider.cliArgs();
        }
    }

    // Populate every provider's config class from the loaded properties.
    private void initializeConfigFile(InputStream configFileStream) throws IOException, IllegalAccessException {
        ServiceLoader<ConfigProvider> configProviders = ServiceLoader.load(ConfigProvider.class);
        Properties properties = new Properties();
        properties.load(configFileStream);
        for (ConfigProvider provider : configProviders) {
            logger.info("configProvider provider name: %s", provider.getClass().getName());
            Class configClass = provider.configClass();
            org.skywalking.apm.util.ConfigInitializer.initialize(properties, configClass);
        }
    }
}
package org.skywalking.apm.collector.config;
/**
 * Service-provider hook discovered through {@link java.util.ServiceLoader}
 * by {@link ConfigInitializer}.
 *
 * @author pengys5
 */
public interface ConfigProvider {
    /**
     * @return the class whose static fields receive values from the loaded
     * collector.config properties.
     */
    Class configClass();
    /**
     * Apply command-line overrides (System.getProperty values) after the
     * config file has been loaded, so -D flags win over file values.
     */
    void cliArgs();
}
package org.skywalking.apm.collector.log;
import org.apache.logging.log4j.Logger;
/**
 * Singleton indirection over log4j2's static LogManager, making logger
 * acquisition replaceable in tests (see MockLog).
 *
 * @author pengys5
 */
public enum LogManager {
    INSTANCE;
    /**
     * @param clazz the class the logger is named after
     * @return a log4j2 formatter logger (printf-style %s placeholders)
     */
    public Logger getFormatterLogger(final Class<?> clazz) {
        return org.apache.logging.log4j.LogManager.getFormatterLogger(clazz);
    }
}
package org.skywalking.apm.collector.rpc;
/**
* @author pengys5
*/
public class RPCAddress {
private final String address;
private final int port;
public RPCAddress(String address, int port) {
this.address = address;
this.port = port;
}
public String getAddress() {
return address;
}
public int getPort() {
return port;
}
}
package org.skywalking.apm.collector.rpc;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Thread-safe registry mapping a cluster member (by its owner address string)
 * to the RPC endpoint it exposes.
 *
 * @author pengys5
 */
public class RPCAddressContext {

    private final Map<String, RPCAddress> rpcAddresses = new ConcurrentHashMap<>();

    /** @return all currently known RPC endpoints. */
    public Collection<RPCAddress> rpcAddressCollection() {
        return rpcAddresses.values();
    }

    /** Record (or replace) the endpoint exposed by the given member. */
    public void putAddress(String ownerAddress, RPCAddress rpcAddress) {
        rpcAddresses.put(ownerAddress, rpcAddress);
    }

    /** Forget the endpoint of the given member. */
    public void removeAddress(String ownerAddress) {
        rpcAddresses.remove(ownerAddress);
    }
}
package org.skywalking.apm.collector.rpc;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
/**
 * Actor that maintains the cluster-wide {@link RPCAddressContext}: records a
 * member's RPC endpoint when it announces a
 * {@link RPCAddressListenerMessage.ConfigMessage}, and forgets the endpoint
 * when the owning actor terminates or its member becomes unreachable.
 *
 * @author pengys5
 */
public class RPCAddressListener extends UntypedActor {
    private final Logger logger = LogManager.getFormatterLogger(RPCAddressListener.class);
    public static final String WORK_NAME = "RPCAddressListener";
    private final ClusterWorkerContext clusterContext;
    private Cluster cluster = Cluster.get(getContext().system());
    public RPCAddressListener(ClusterWorkerContext clusterContext) {
        this.clusterContext = clusterContext;
    }
    @Override
    public void preStart() throws Exception {
        // Only subscribes to unreachable-member events; see the NOTE below
        // regarding Terminated delivery.
        cluster.subscribe(getSelf(), ClusterEvent.UnreachableMember.class);
    }
    @Override
    public void onReceive(Object message) throws Throwable {
        if (message instanceof RPCAddressListenerMessage.ConfigMessage) {
            RPCAddressListenerMessage.ConfigMessage configMessage = (RPCAddressListenerMessage.ConfigMessage)message;
            ActorRef sender = getSender();
            logger.info("address: %s, port: %s", configMessage.getConfig().getAddress(), configMessage.getConfig().getPort());
            // Key the endpoint by the sender's akka host:port so it can be
            // removed again when that member goes away.
            String ownerAddress = sender.path().address().hostPort();
            clusterContext.getRpcContext().putAddress(ownerAddress, configMessage.getConfig());
        } else if (message instanceof Terminated) {
            // NOTE(review): this actor never calls getContext().watch(sender),
            // so Terminated may never be delivered here — confirm a watch is
            // established elsewhere; otherwise cleanup relies solely on the
            // UnreachableMember branch below.
            Terminated terminated = (Terminated)message;
            clusterContext.getRpcContext().removeAddress(terminated.getActor().path().address().hostPort());
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            ClusterEvent.UnreachableMember unreachableMember = (ClusterEvent.UnreachableMember)message;
            clusterContext.getRpcContext().removeAddress(unreachableMember.member().address().hostPort());
        } else {
            unhandled(message);
        }
    }
}
package org.skywalking.apm.collector.rpc;

import java.io.Serializable;

/**
 * Message envelope used by members to announce their RPC endpoint to the
 * {@link RPCAddressListener}.
 *
 * @author pengys5
 */
public class RPCAddressListenerMessage {

    /**
     * Carries the announcing member's {@link RPCAddress}; implements
     * {@link Serializable} so it can cross the wire.
     */
    public static class ConfigMessage implements Serializable {

        // Explicit UID keeps the serialized form stable across recompiles.
        private static final long serialVersionUID = 1L;

        private final RPCAddress config;

        public ConfigMessage(RPCAddress config) {
            this.config = config;
        }

        public RPCAddress getConfig() {
            return config;
        }
    }
}
org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine
\ No newline at end of file
org.skywalking.apm.collector.cluster.zookeeper.ClusterZKModuleDefine
org.skywalking.apm.collector.cluster.standalone.ClusterStandaloneModuleDefine
org.skywalking.apm.collector.cluster.redis.ClusterRedisModuleDefine
\ No newline at end of file
package org.skywalking.apm.collector.actor;
import akka.actor.ActorSystem;
import org.apache.logging.log4j.Logger;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.log.LogManager;
/**
 * Disabled test scaffold for {@link AbstractClusterWorkerProvider#onCreate}
 * — the runner and @Test annotations are commented out, so nothing runs.
 *
 * @author pengys5
 */
//@RunWith(PowerMockRunner.class)
//@PrepareForTest({LogManager.class})
public class AbstractClusterWorkerProviderTestCase {
//    @Test
    public void testOnCreate() throws ProviderNotFoundException {
        // Swap the LogManager singleton for a mock so no log4j setup is needed.
        LogManager logManager = Mockito.mock(LogManager.class);
        Whitebox.setInternalState(LogManager.class, "INSTANCE", logManager);
        Logger logger = Mockito.mock(Logger.class);
        Mockito.when(logManager.getFormatterLogger(Mockito.any())).thenReturn(logger);
        ActorSystem actorSystem = Mockito.mock(ActorSystem.class);
        // NOTE(review): built but never handed to Impl or onCreate — presumably
        // intended for provider wiring; confirm before re-enabling this test.
        ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(actorSystem);
        Impl impl = new Impl();
        impl.onCreate(null);
    }
    // Minimal concrete provider under test.
    class Impl extends AbstractClusterWorkerProvider<AbstractClusterWorkerTestCase.Impl> {
        @Override
        public Role role() {
            return Role.INSTANCE;
        }
        @Override
        public AbstractClusterWorkerTestCase.Impl workerInstance(ClusterWorkerContext clusterContext) {
            return new AbstractClusterWorkerTestCase.Impl(role(), clusterContext, new LocalWorkerContext());
        }
        @Override
        public int workerNum() {
            return 0;
        }
    }
    // Role stub selecting workers round-robin.
    enum Role implements org.skywalking.apm.collector.actor.Role {
        INSTANCE;
        @Override
        public String roleName() {
            return AbstractClusterWorkerTestCase.Impl.class.getSimpleName();
        }
        @Override
        public WorkerSelector workerSelector() {
            return new RollingSelector();
        }
    }
}
package org.skywalking.apm.collector.actor;
import akka.actor.Address;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import org.apache.logging.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import static org.mockito.Mockito.*;
/**
 * Unit tests for {@link AbstractClusterWorker} and its akka adapter
 * {@code WorkerWithAkka}: job allocation delegates to onWork, a MemberUp
 * cluster event triggers registration, and any other message is forwarded to
 * the owner worker as a job.
 *
 * @author pengys5
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest( {ClusterEvent.MemberUp.class, Address.class})
@PowerMockIgnore( {"javax.management.*"})
public class AbstractClusterWorkerTestCase {
    // CALLS_REAL_METHODS so WorkerWithAkka.onReceive executes its real logic
    // while the fields below are replaced with mocks.
    private AbstractClusterWorker.WorkerWithAkka workerWithAkka = mock(AbstractClusterWorker.WorkerWithAkka.class, CALLS_REAL_METHODS);
    private AbstractClusterWorker worker = PowerMockito.spy(new Impl(WorkerRole.INSTANCE, null, null));
    @Before
    public void init() {
        // Inject mocks into the adapter's private fields so no actor system is needed.
        Logger logger = mock(Logger.class);
        Whitebox.setInternalState(workerWithAkka, "logger", logger);
        Whitebox.setInternalState(workerWithAkka, "ownerWorker", worker);
    }
    @Test
    public void testAllocateJob() throws Exception {
        String jobStr = "TestJob";
        worker.allocateJob(jobStr);
        // allocateJob is expected to hand the message straight to onWork.
        verify(worker).onWork(jobStr);
    }
    @Test
    public void testMemberUp() throws Throwable {
        // Simulate an akka MemberUp cluster event...
        ClusterEvent.MemberUp memberUp = mock(ClusterEvent.MemberUp.class);
        Address address = mock(Address.class);
        when(address.toString()).thenReturn("address");
        Member member = mock(Member.class);
        when(member.address()).thenReturn(address);
        when(memberUp.member()).thenReturn(member);
        workerWithAkka.onReceive(memberUp);
        // ...and expect the adapter to register itself with the new member.
        verify(workerWithAkka).register(member);
    }
    @Test
    public void testMessage() throws Throwable {
        String message = "test";
        workerWithAkka.onReceive(message);
        // Non-cluster messages are treated as jobs for the owner worker.
        verify(worker).allocateJob(message);
    }
    // Minimal concrete worker with no-op lifecycle and work methods.
    static class Impl extends AbstractClusterWorker {
        @Override
        public void preStart() throws ProviderNotFoundException {
        }
        public Impl(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
            super(role, clusterContext, selfContext);
        }
        @Override
        protected void onWork(Object message) throws IllegalArgumentException {
        }
    }
    // Role stub selecting workers round-robin.
    public enum WorkerRole implements Role {
        INSTANCE;
        @Override
        public String roleName() {
            return Impl.class.getSimpleName();
        }
        @Override
        public WorkerSelector workerSelector() {
            return new RollingSelector();
        }
    }
}
package org.skywalking.apm.collector.actor;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.skywalking.apm.collector.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.queue.MessageHolder;
import static org.mockito.Mockito.*;
/**
 * Unit tests for {@link AbstractLocalAsyncWorker}'s disruptor adapter:
 * queued events are forwarded to onWork, and the end of a batch additionally
 * delivers an {@link EndOfBatchCommand}.
 *
 * @author pengys5
 */
public class AbstractLocalAsyncWorkerTestCase {
    @Test
    public void testAllocateJob() throws Exception {
        // NOTE(review): verifying onWork after calling allocateJob on a plain
        // mock presumes allocateJob runs its real (e.g. final, unmockable)
        // implementation — confirm against AbstractLocalAsyncWorker.
        AbstractLocalAsyncWorker worker = mock(AbstractLocalAsyncWorker.class);
        String message = "Test";
        worker.allocateJob(message);
        verify(worker).onWork(message);
    }
    @Test
    public void testOnEventWhenNotEnd() throws Exception {
        AbstractLocalAsyncWorker worker = mock(AbstractLocalAsyncWorker.class);
        AbstractLocalAsyncWorker.WorkerWithDisruptor disruptor = new AbstractLocalAsyncWorker.WorkerWithDisruptor(null, worker);
        MessageHolder holder = new MessageHolder();
        String message = "Test";
        holder.setMessage(message);
        // endOfBatch=false: only the message itself reaches the worker.
        disruptor.onEvent(holder, 0, false);
        verify(worker).onWork(message);
    }
    @Test
    public void testOnEventWhenEnd() throws Exception {
        AbstractLocalAsyncWorker worker = mock(AbstractLocalAsyncWorker.class);
        AbstractLocalAsyncWorker.WorkerWithDisruptor disruptor = new AbstractLocalAsyncWorker.WorkerWithDisruptor(null, worker);
        MessageHolder holder = new MessageHolder();
        String message = "Test";
        holder.setMessage(message);
        // endOfBatch=true: the message is delivered, then an EndOfBatchCommand.
        disruptor.onEvent(holder, 0, true);
        verify(worker, times(1)).onWork(message);
        verify(worker, times(1)).onWork(argThat(new IsEndOfBatchCommandClass()));
    }
    // Matches any argument whose runtime class is exactly EndOfBatchCommand.
    class IsEndOfBatchCommandClass extends ArgumentMatcher<EndOfBatchCommand> {
        public boolean matches(Object para) {
            return para.getClass() == EndOfBatchCommand.class;
        }
    }
}
package org.skywalking.apm.collector.actor;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
/**
 * Tests for {@link AbstractWorkerProvider#create}: it must reject a null
 * worker instance, accept a null owner, forward the owner's self context to
 * onCreate when present, and reject an owner whose self context is null.
 * NOTE(review): stubbing/verifying on plain mocks here presumes create(...)
 * runs its real implementation — confirm against AbstractWorkerProvider.
 *
 * @author pengys5
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest( {AbstractWorker.class})
@PowerMockIgnore( {"javax.management.*"})
public class AbstractWorkerProviderTestCase {
    // create() must fail fast when workerInstance() yields null.
    @Test(expected = IllegalArgumentException.class)
    public void testNullWorkerInstanceCreate() throws ProviderNotFoundException {
        AbstractWorkerProvider provider = mock(AbstractWorkerProvider.class);
        when(provider.workerInstance(null)).thenReturn(null);
        AbstractWorker worker = mock(AbstractWorker.class);
        provider.create(worker);
    }
    @Test
    public void testNoneWorkerOwner() throws ProviderNotFoundException {
        AbstractWorkerProvider provider = mock(AbstractWorkerProvider.class);
        ClusterWorkerContext context = mock(ClusterWorkerContext.class);
        provider.setClusterContext(context);
        AbstractWorker worker = mock(AbstractWorker.class);
        when(provider.workerInstance(context)).thenReturn(worker);
        provider.create(null);
        // With no owner, onCreate receives a null local context.
        Mockito.verify(provider).onCreate(null);
    }
    @Test
    public void testHasWorkerOwner() throws ProviderNotFoundException {
        AbstractWorkerProvider provider = mock(AbstractWorkerProvider.class);
        ClusterWorkerContext context = mock(ClusterWorkerContext.class);
        provider.setClusterContext(context);
        AbstractWorker worker = mock(AbstractWorker.class);
        when(provider.workerInstance(context)).thenReturn(worker);
        AbstractWorker workerOwner = mock(AbstractWorker.class);
        LocalWorkerContext localWorkerContext = mock(LocalWorkerContext.class);
        when(workerOwner.getSelfContext()).thenReturn(localWorkerContext);
        provider.create(workerOwner);
        // The owner's self context is handed through to onCreate.
        Mockito.verify(provider).onCreate(localWorkerContext);
    }
    // An owner without a self context is an illegal argument.
    @Test(expected = IllegalArgumentException.class)
    public void testHasWorkerOwnerButNoneContext() throws ProviderNotFoundException {
        AbstractWorkerProvider provider = mock(AbstractWorkerProvider.class);
        ClusterWorkerContext context = mock(ClusterWorkerContext.class);
        provider.setClusterContext(context);
        AbstractWorker worker = mock(AbstractWorker.class);
        when(provider.workerInstance(context)).thenReturn(worker);
        AbstractWorker workerOwner = mock(AbstractWorker.class);
        when(workerOwner.getSelfContext()).thenReturn(null);
        provider.create(workerOwner);
    }
}
package org.skywalking.apm.collector.actor.selector;

import org.junit.Assert;
import org.junit.Test;

/**
 * Verifies that {@link AbstractHashMessage} derives its hash code from the
 * key supplied at construction time.
 *
 * @author pengys5
 */
public class AbstractHashMessageTestCase {

    @Test
    public void testGetHashCode() {
        final String key = "key";
        final Impl message = new Impl(key);
        Assert.assertEquals(key.hashCode(), message.getHashCode());
    }

    // Minimal concrete subclass exposing the protected constructor.
    class Impl extends AbstractHashMessage {
        public Impl(String key) {
            super(key);
        }
    }
}
package org.skywalking.apm.collector.actor.selector;
import org.junit.Assert;
import org.junit.Test;
import org.skywalking.apm.collector.actor.WorkerRef;
import java.util.ArrayList;
import java.util.List;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
/**
 * Tests for {@link HashCodeSelector}. The expectations below (hashes 9, 10,
 * 11 against three members mapping to members 1, 2, 3) suggest the selector
 * picks members.get(hashCode % size) — TODO confirm against HashCodeSelector.
 *
 * @author pengys5
 */
public class HashCodeSelectorTestCase {
    @Test
    public void testSelect() {
        List<WorkerRef> members = new ArrayList<>();
        WorkerRef workerRef_1 = mock(WorkerRef.class);
        WorkerRef workerRef_2 = mock(WorkerRef.class);
        WorkerRef workerRef_3 = mock(WorkerRef.class);
        members.add(workerRef_1);
        members.add(workerRef_2);
        members.add(workerRef_3);
        // Three messages with consecutive hash codes 9, 10, 11.
        AbstractHashMessage message_1 = mock(AbstractHashMessage.class);
        when(message_1.getHashCode()).thenReturn(9);
        AbstractHashMessage message_2 = mock(AbstractHashMessage.class);
        when(message_2.getHashCode()).thenReturn(10);
        AbstractHashMessage message_3 = mock(AbstractHashMessage.class);
        when(message_3.getHashCode()).thenReturn(11);
        HashCodeSelector selector = new HashCodeSelector();
        WorkerRef select_1 = selector.select(members, message_1);
        Assert.assertEquals(workerRef_1.hashCode(), select_1.hashCode());
        WorkerRef select_2 = selector.select(members, message_2);
        Assert.assertEquals(workerRef_2.hashCode(), select_2.hashCode());
        WorkerRef select_3 = selector.select(members, message_3);
        Assert.assertEquals(workerRef_3.hashCode(), select_3.hashCode());
    }
    // A message that is not an AbstractHashMessage must be rejected.
    @Test(expected = IllegalArgumentException.class)
    public void testSelectError() {
        HashCodeSelector selector = new HashCodeSelector();
        selector.select(null, new Object());
    }
}
package org.skywalking.apm.collector.actor.selector;
import org.junit.Assert;
import org.junit.Test;
import org.skywalking.apm.collector.actor.WorkerRef;
import java.util.ArrayList;
import java.util.List;
import static org.powermock.api.mockito.PowerMockito.mock;
/**
 * Tests for {@link RollingSelector}: successive calls should rotate through
 * the member list. Note the first selection is expected to be the SECOND
 * member, implying the rolling index is presumably incremented before use —
 * TODO confirm against RollingSelector.
 *
 * @author pengys5
 */
public class RollingSelectorTestCase {
    @Test
    public void testSelect() {
        List<WorkerRef> members = new ArrayList<>();
        WorkerRef workerRef_1 = mock(WorkerRef.class);
        WorkerRef workerRef_2 = mock(WorkerRef.class);
        WorkerRef workerRef_3 = mock(WorkerRef.class);
        members.add(workerRef_1);
        members.add(workerRef_2);
        members.add(workerRef_3);
        Object message = new Object();
        RollingSelector selector = new RollingSelector();
        // Rotation: member 2, member 3, then wrap back to member 1.
        WorkerRef selected_1 = selector.select(members, message);
        Assert.assertEquals(workerRef_2.hashCode(), selected_1.hashCode());
        WorkerRef selected_2 = selector.select(members, message);
        Assert.assertEquals(workerRef_3.hashCode(), selected_2.hashCode());
        WorkerRef selected_3 = selector.select(members, message);
        Assert.assertEquals(workerRef_1.hashCode(), selected_3.hashCode());
    }
}
package org.skywalking.apm.collector.config;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.skywalking.apm.collector.cluster.ClusterConfig;
/**
 * Tests that {@link ConfigInitializer} loads collector.config values into
 * ClusterConfig, and that -D system properties override the file values.
 *
 * @author pengys5
 */
public class ConfigInitializerTestCase {
    // Clear the override properties so each test starts from the file values only.
    @Before
    public void clear() {
        System.clearProperty("cluster.current.HOSTNAME");
        System.clearProperty("cluster.current.PORT");
        System.clearProperty("cluster.current.ROLES");
        System.clearProperty("cluster.SEED_NODES");
    }
    // Expected values mirror src/test/resources/collector.config.
    @Test
    public void testInitialize() throws Exception {
        ConfigInitializer.INSTANCE.initialize();
        Assert.assertEquals("127.0.0.1", ClusterConfig.Cluster.Current.HOSTNAME);
        Assert.assertEquals("1000", ClusterConfig.Cluster.Current.PORT);
        Assert.assertEquals("WorkersListener", ClusterConfig.Cluster.Current.ROLES);
        Assert.assertEquals("127.0.0.1:1000", ClusterConfig.Cluster.SEED_NODES);
    }
    // System properties set before initialize() must win over the file values.
    @Test
    public void testInitializeWithCli() throws Exception {
        System.setProperty("cluster.current.HOSTNAME", "127.0.0.2");
        System.setProperty("cluster.current.PORT", "1001");
        System.setProperty("cluster.current.ROLES", "Test1, Test2");
        System.setProperty("cluster.SEED_NODES", "127.0.0.1:1000, 127.0.0.1:1001");
        ConfigInitializer.INSTANCE.initialize();
        Assert.assertEquals("127.0.0.2", ClusterConfig.Cluster.Current.HOSTNAME);
        Assert.assertEquals("1001", ClusterConfig.Cluster.Current.PORT);
        Assert.assertEquals("Test1, Test2", ClusterConfig.Cluster.Current.ROLES);
        Assert.assertEquals("127.0.0.1:1000, 127.0.0.1:1001", ClusterConfig.Cluster.SEED_NODES);
    }
}
package org.skywalking.apm.collector.log;
import org.apache.logging.log4j.Logger;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
/**
 * Test helper producing a Mockito-mocked {@link Logger}.
 *
 * @author pengys5
 */
public class MockLog {
    /**
     * @return a mocked Logger that a mocked LogManager would hand out.
     * NOTE(review): the mocked LogManager is local and never installed into
     * the LogManager singleton (e.g. via Whitebox.setInternalState), so
     * production code still obtains the real logger — confirm that callers
     * install the mock themselves.
     */
    public Logger mockito() {
        LogManager logManager = PowerMockito.mock(LogManager.class);
        Logger logger = Mockito.mock(Logger.class);
        Mockito.when(logManager.getFormatterLogger(Mockito.any())).thenReturn(logger);
        return logger;
    }
}
cluster.current.hostname=127.0.0.1
cluster.current.port=1000
cluster.current.roles=WorkersListener
cluster.seed_nodes=127.0.0.1:1000
es.cluster.name=CollectorDBCluster
es.cluster.nodes=127.0.0.1:9300
es.cluster.transport.sniffer=true
es.index.shards.number=2
es.index.replicas.number=0
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="[%d{yyyy-MM-dd HH:mm:ss:SSS}] [%p] - %l - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
package org.skywalking.apm.collector.core.worker;
package org.skywalking.apm.collector.core.agentstream;
import java.util.Map;
import org.skywalking.apm.collector.core.client.Client;
......@@ -7,17 +7,17 @@ import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleException;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
/**
* @author pengys5
*/
public abstract class WorkerModuleDefine extends ModuleDefine {
public abstract class AgentStreamModuleDefine extends ModuleDefine {
@Override public final void initialize(Map config) throws ModuleException, ClientException {
@Override public final void initialize(Map config) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
......@@ -26,15 +26,15 @@ public abstract class WorkerModuleDefine extends ModuleDefine {
String key = ClusterDataInitializer.BASE_CATALOG + "." + name();
ClusterModuleContext.writer.write(key, registration().buildValue());
} catch (ConfigParseException | ServerException e) {
throw new WorkerModuleException(e.getMessage(), e);
throw new AgentStreamModuleException(e.getMessage(), e);
}
}
@Override public final Client createClient() {
throw new UnsupportedOperationException();
@Override protected final DataInitializer dataInitializer() {
throw new UnsupportedOperationException("");
}
@Override public final DataInitializer dataInitializer() {
throw new UnsupportedOperationException();
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
}
}
package org.skywalking.apm.collector.core.worker;
package org.skywalking.apm.collector.core.agentstream;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class WorkerModuleException extends ModuleException {
public WorkerModuleException(String message) {
public class AgentStreamModuleException extends ModuleException{
public AgentStreamModuleException(String message) {
super(message);
}
public WorkerModuleException(String message, Throwable cause) {
public AgentStreamModuleException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -5,4 +5,6 @@ package org.skywalking.apm.collector.core.cluster;
*/
public class ClusterModuleContext {
public static ClusterModuleRegistrationWriter writer;
public static ClusterModuleRegistrationReader reader;
}
......@@ -39,4 +39,6 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
}
protected abstract ClusterModuleRegistrationWriter registrationWriter();
protected abstract ClusterModuleRegistrationReader registrationReader();
}
......@@ -7,7 +7,10 @@ import org.skywalking.apm.collector.core.module.ModuleConfigLoader;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleDefineLoader;
import org.skywalking.apm.collector.core.module.ModuleGroup;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleGroupDefineLoader;
import org.skywalking.apm.collector.core.module.ModuleInstallerAdapter;
import org.skywalking.apm.collector.core.remote.SerializedDefineLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -19,12 +22,19 @@ public class CollectorStarter implements Starter {
private final Logger logger = LoggerFactory.getLogger(CollectorStarter.class);
@Override public void start() throws ConfigException, DefineException, ClientException {
Context context = new Context();
ModuleConfigLoader configLoader = new ModuleConfigLoader();
Map<String, Map> configuration = configLoader.load();
SerializedDefineLoader serializedDefineLoader = new SerializedDefineLoader();
serializedDefineLoader.load();
ModuleDefineLoader defineLoader = new ModuleDefineLoader();
Map<String, Map<String, ModuleDefine>> moduleDefineMap = defineLoader.load();
ModuleGroupDefineLoader groupDefineLoader = new ModuleGroupDefineLoader();
Map<String, ModuleGroupDefine> moduleGroupDefineMap = groupDefineLoader.load();
ModuleInstallerAdapter moduleInstallerAdapter = new ModuleInstallerAdapter(ModuleGroup.Cluster);
moduleInstallerAdapter.install(configuration.get(ModuleGroup.Cluster.name().toLowerCase()), moduleDefineMap.get(ModuleGroup.Cluster.name().toLowerCase()));
......
package org.skywalking.apm.collector.core.framework;
/**
* @author pengys5
*/
public class Context {
}
package org.skywalking.apm.collector.core.module;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientConfig;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.framework.Define;
import org.skywalking.apm.collector.core.server.Server;
......
......@@ -19,10 +19,10 @@ public class ModuleDefineLoader implements Loader<Map<String, Map<String, Module
Map<String, Map<String, ModuleDefine>> moduleDefineMap = new LinkedHashMap<>();
ModuleDefinitionFile definitionFile = new ModuleDefinitionFile();
logger.info("definition file name: {}", definitionFile.fileName());
logger.info("module definition file name: {}", definitionFile.fileName());
DefinitionLoader<ModuleDefine> definitionLoader = DefinitionLoader.load(ModuleDefine.class, definitionFile);
for (ModuleDefine moduleDefine : definitionLoader) {
logger.info("loaded module class: {}", moduleDefine.getClass().getName());
logger.info("loaded module definition class: {}", moduleDefine.getClass().getName());
String groupName = moduleDefine.group().name().toLowerCase();
if (!moduleDefineMap.containsKey(groupName)) {
......
......@@ -4,5 +4,5 @@ package org.skywalking.apm.collector.core.module;
* @author pengys5
*/
public enum ModuleGroup {
Cluster, Worker
Cluster, Queue, AgentStream
}
package org.skywalking.apm.collector.core.module;
/**
 * Describes a group of related modules (e.g. "cluster") and whether a single
 * or multiple module implementations of the group may be installed.
 *
 * @author pengys5
 */
public interface ModuleGroupDefine {
    /** @return the group's name; used lower-cased as a lookup key by the loader. */
    String name();
    /** @return whether one or many modules of this group may be installed. */
    ModuleInstallMode mode();
}
package org.skywalking.apm.collector.core.module;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
/**
 * Points the definition loader at the "group.define" resource, which lists
 * the {@link ModuleGroupDefine} implementation classes to load.
 *
 * @author pengys5
 */
public class ModuleGroupDefineFile extends DefinitionFile {
    @Override protected String fileName() {
        return "group.define";
    }
}
package org.skywalking.apm.collector.core.module;

import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Loads every {@link ModuleGroupDefine} declared in the "group.define"
 * definition file and indexes them by lower-cased group name, preserving
 * declaration order.
 *
 * @author pengys5
 */
public class ModuleGroupDefineLoader implements Loader<Map<String, ModuleGroupDefine>> {

    private final Logger logger = LoggerFactory.getLogger(ModuleGroupDefineLoader.class);

    @Override public Map<String, ModuleGroupDefine> load() throws ConfigException {
        // LinkedHashMap keeps the groups in the order they were declared.
        Map<String, ModuleGroupDefine> moduleGroupDefineMap = new LinkedHashMap<>();
        ModuleGroupDefineFile definitionFile = new ModuleGroupDefineFile();
        logger.info("module group definition file name: {}", definitionFile.fileName());
        DefinitionLoader<ModuleGroupDefine> definitionLoader = DefinitionLoader.load(ModuleGroupDefine.class, definitionFile);
        for (ModuleGroupDefine moduleGroupDefine : definitionLoader) {
            logger.info("loaded group module definition class: {}", moduleGroupDefine.getClass().getName());
            // Locale.ROOT: locale-independent lower-casing, so lookup keys are
            // not corrupted on locales with special casing (e.g. Turkish i).
            String groupName = moduleGroupDefine.name().toLowerCase(Locale.ROOT);
            moduleGroupDefineMap.put(groupName, moduleGroupDefine);
        }
        return moduleGroupDefineMap;
    }
}
package org.skywalking.apm.collector.core.module;
/**
 * How many module implementations of a group may be installed at once:
 * {@link #Single} exactly one, {@link #Multiple} any number.
 *
 * @author pengys5
 */
public enum ModuleInstallMode {
    Single, Multiple
}
package org.skywalking.apm.collector.core.queue;
import org.skywalking.apm.collector.core.worker.AbstractLocalAsyncWorker;
/**
 * Factory for queue event handlers bound to a local asynchronous worker.
 *
 * @author pengys5
 */
public interface Creator {
    /**
     * @param queueSize capacity of the backing queue
     * @param localAsyncWorker the worker that will consume queued events
     * @return a handler that delivers queue events to the worker
     */
    QueueEventHandler create(int queueSize, AbstractLocalAsyncWorker localAsyncWorker);
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册