Commit b04b2435 authored by wu-sheng, committed by GitHub

Merge pull request #293 from wu-sheng/feature/266

Feature/266
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-agentstream</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-server</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
package org.skywalking.apm.collector.agent.stream.server.grpc;
/**
* @author pengys5
*/
public class AgentStreamGRPCConfig {
public static String HOST;
public static int PORT;
}
package org.skywalking.apm.collector.agent.stream.server.grpc;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class AgentStreamGRPCConfigParser implements ModuleConfigParser {
private static final String HOST = "host";
private static final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
AgentStreamGRPCConfig.HOST = (String)config.get(HOST);
if (StringUtils.isEmpty(AgentStreamGRPCConfig.HOST)) {
throw new ConfigParseException("agentstream grpc host is required");
}
if (StringUtils.isEmpty(config.get(PORT))) {
throw new ConfigParseException("agentstream grpc port is required");
} else {
AgentStreamGRPCConfig.PORT = (Integer)config.get(PORT);
}
}
}
package org.skywalking.apm.collector.agent.stream.server.grpc;
import org.skywalking.apm.collector.core.agentstream.AgentStreamModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleGroup;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.server.grpc.GRPCServer;
/**
* @author pengys5
*/
public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine {
@Override protected ModuleGroup group() {
return ModuleGroup.AgentStream;
}
@Override public String name() {
return "grpc";
}
@Override public boolean defaultModule() {
return true;
}
@Override protected ModuleConfigParser configParser() {
return new AgentStreamGRPCConfigParser();
}
@Override protected Server server() {
return new GRPCServer(AgentStreamGRPCConfig.HOST, AgentStreamGRPCConfig.PORT);
}
@Override protected ModuleRegistration registration() {
return new AgentStreamGRPCModuleRegistration();
}
}
package org.skywalking.apm.collector.agent.stream.server.grpc;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class AgentStreamGRPCModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
return new Value(AgentStreamGRPCConfig.HOST, AgentStreamGRPCConfig.PORT, null);
}
}
package org.skywalking.apm.collector.agent.stream.server.jetty;
/**
* @author pengys5
*/
public class AgentStreamJettyConfig {
public static String HOST;
public static int PORT;
public static String CONTEXT_PATH;
}
package org.skywalking.apm.collector.agent.stream.server.jetty;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class AgentStreamJettyConfigParser implements ModuleConfigParser {
private static final String HOST = "host";
private static final String PORT = "port";
public static final String CONTEXT_PATH = "contextPath";
@Override public void parse(Map config) throws ConfigParseException {
AgentStreamJettyConfig.HOST = (String)config.get(HOST);
AgentStreamJettyConfig.CONTEXT_PATH = "/";
if (StringUtils.isEmpty(AgentStreamJettyConfig.HOST)) {
throw new ConfigParseException("agentstream jetty host is required");
}
if (StringUtils.isEmpty(config.get(PORT))) {
throw new ConfigParseException("agentstream jetty port is required");
} else {
AgentStreamJettyConfig.PORT = (Integer)config.get(PORT);
}
if (!StringUtils.isEmpty(config.get(CONTEXT_PATH))) {
AgentStreamJettyConfig.CONTEXT_PATH = (String)config.get(CONTEXT_PATH);
}
}
}
package org.skywalking.apm.collector.agent.stream.server.jetty;
import org.skywalking.apm.collector.core.agentstream.AgentStreamModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleGroup;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.server.jetty.JettyServer;
/**
* @author pengys5
*/
public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine {
@Override protected ModuleGroup group() {
return ModuleGroup.AgentStream;
}
@Override public String name() {
return "jetty";
}
@Override public boolean defaultModule() {
return false;
}
@Override protected ModuleConfigParser configParser() {
return new AgentStreamJettyConfigParser();
}
@Override protected Server server() {
return new JettyServer(AgentStreamJettyConfig.HOST, AgentStreamJettyConfig.PORT, AgentStreamJettyConfig.CONTEXT_PATH);
}
@Override protected ModuleRegistration registration() {
return new AgentStreamJettyModuleRegistration();
}
}
package org.skywalking.apm.collector.agent.stream.server.jetty;
import com.google.gson.JsonObject;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class AgentStreamJettyModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
JsonObject data = new JsonObject();
data.addProperty(AgentStreamJettyConfigParser.CONTEXT_PATH, AgentStreamJettyConfig.CONTEXT_PATH);
return new Value(AgentStreamJettyConfig.HOST, AgentStreamJettyConfig.PORT, data);
}
}
org.skywalking.apm.collector.agent.stream.server.grpc.AgentStreamGRPCModuleDefine
org.skywalking.apm.collector.agent.stream.server.jetty.AgentStreamJettyModuleDefine
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-boot</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-queue</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-ui</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-agentstream</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
package org.skywalking.apm.collector.boot;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorStarter;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class CollectorBootStartUp {
private static final Logger logger = LoggerFactory.getLogger(CollectorBootStartUp.class);
public static void main(String[] args) throws ConfigException, DefineException, ClientException {
logger.info("collector starting...");
CollectorStarter starter = new CollectorStarter();
starter.start();
}
}
cluster:
#  zookeeper:
#    hostPort: localhost:2181
#    sessionTimeout: 1000
#  redis:
#    host: localhost
#    port: 6379
queue:
  disruptor: on
  data_carrier: off
agentstream:
  grpc:
    host: localhost
    port: 1000
  jetty:
    host: localhost
    port: 2000
    context_path: /
discovery:
  grpc: localhost
  port: 1000
ui:
  jetty:
    host: localhost
    port: 12800
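Each module's section above is parsed by that module's ModuleConfigParser (see AgentStreamGRPCConfigParser earlier in this change). A minimal sketch, assuming the YAML loader yields a plain java.util.Map per section (the map below is built by hand purely for illustration):
java.util.Map<String, Object> grpcSection = new java.util.HashMap<>();
grpcSection.put("host", "localhost");
grpcSection.put("port", 1000);
// fills AgentStreamGRPCConfig.HOST / AgentStreamGRPCConfig.PORT;
// declares ConfigParseException when host or port is missing
new AgentStreamGRPCConfigParser().parse(grpcSection);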
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-client</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.196</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
</dependencies>
</project>
package org.skywalking.apm.collector.client.h2;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.skywalking.apm.collector.core.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class H2Client implements Client {
private final Logger logger = LoggerFactory.getLogger(H2Client.class);
private Connection conn;
@Override public void initialize() throws H2ClientException {
try {
Class.forName("org.h2.Driver");
conn = DriverManager.getConnection("jdbc:h2:mem:collector");
} catch (ClassNotFoundException | SQLException e) {
throw new H2ClientException(e.getMessage(), e);
}
}
public void execute(String sql) throws H2ClientException {
Statement statement = null;
try {
statement = conn.createStatement();
statement.execute(sql);
statement.closeOnCompletion();
} catch (SQLException e) {
throw new H2ClientException(e.getMessage(), e);
}
}
public void executeQuery(String sql) throws H2ClientException {
Statement statement = null;
try {
statement = conn.createStatement();
ResultSet rs = statement.executeQuery(sql);
while (rs.next()) {
logger.debug(rs.getString("ADDRESS") + "," + rs.getString("DATA"));
}
statement.closeOnCompletion();
} catch (SQLException e) {
throw new H2ClientException(e.getMessage(), e);
}
}
}
package org.skywalking.apm.collector.client.h2;
import org.skywalking.apm.collector.core.client.ClientException;
/**
* @author pengys5
*/
public class H2ClientException extends ClientException {
public H2ClientException(String message) {
super(message);
}
public H2ClientException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.client.redis;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import redis.clients.jedis.Jedis;
/**
* @author pengys5
*/
public class RedisClient implements Client {
private Jedis jedis;
private final String host;
private final int port;
public RedisClient(String host, int port) {
this.host = host;
this.port = port;
}
@Override public void initialize() throws ClientException {
jedis = new Jedis(host, port);
}
public void setex(String key, int seconds, String value) {
jedis.setex(key, seconds, value);
}
}
package org.skywalking.apm.collector.client.redis;
import org.skywalking.apm.collector.core.client.ClientException;
/**
* @author pengys5
*/
public class RedisClientException extends ClientException {
public RedisClientException(String message) {
super(message);
}
public RedisClientException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.client.zookeeper;
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.skywalking.apm.collector.core.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ZookeeperClient implements Client {
private final Logger logger = LoggerFactory.getLogger(ZookeeperClient.class);
private ZooKeeper zk;
private final String hostPort;
private final int sessionTimeout;
public ZookeeperClient(String hostPort, int sessionTimeout) {
this.hostPort = hostPort;
this.sessionTimeout = sessionTimeout;
}
@Override public void initialize() throws ZookeeperClientException {
try {
zk = new ZooKeeper(hostPort, sessionTimeout, new ZookeeperDataListener(this));
} catch (IOException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
public void create(final String path, byte data[], List<ACL> acl,
CreateMode createMode) throws ZookeeperClientException {
try {
zk.create(path, data, acl, createMode);
} catch (KeeperException | InterruptedException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
public Stat exists(final String path, boolean watch) throws ZookeeperClientException {
try {
return zk.exists(path, watch);
} catch (KeeperException | InterruptedException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
public byte[] getData(String path, boolean watch, Stat stat) throws ZookeeperClientException {
try {
return zk.getData(path, watch, stat);
} catch (KeeperException | InterruptedException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
public Stat setData(final String path, byte data[], int version) throws ZookeeperClientException {
try {
return zk.setData(path, data, version);
} catch (KeeperException | InterruptedException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
}
package org.skywalking.apm.collector.client.zookeeper;
import org.skywalking.apm.collector.core.client.ClientException;
/**
* @author pengys5
*/
public class ZookeeperClientException extends ClientException {
public ZookeeperClientException(String message) {
super(message);
}
public ZookeeperClientException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.client.zookeeper;
import java.util.LinkedList;
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataListener;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ZookeeperDataListener implements DataListener, Watcher {
private final Logger logger = LoggerFactory.getLogger(ZookeeperDataListener.class);
private ZookeeperClient client;
public ZookeeperDataListener(Client client) {
this.client = (ZookeeperClient)client;
}
@Override public void process(WatchedEvent event) {
logger.debug("path {}", event.getPath());
if (StringUtils.isEmpty(event.getPath())) {
return;
}
try {
String data = new String(client.getData(event.getPath(), false, null));
logger.debug("data {}", data);
} catch (ClientException e) {
logger.error(e.getMessage(), e);
}
}
@Override public void listen() throws ClientException {
for (String itemKey : items()) {
String[] catalogs = itemKey.split("\\.");
StringBuilder pathBuilder = new StringBuilder();
for (String catalog : catalogs) {
pathBuilder.append("/").append(catalog);
}
client.exists(pathBuilder.toString(), true);
}
}
@Override public List<String> items() {
List<String> items = new LinkedList<>();
items.add(ClusterDataInitializer.FOR_AGENT_CATALOG);
items.add(ClusterDataInitializer.FOR_UI_CATALOG);
return items;
}
}
package org.skywalking.apm.collector.client.zookeeper.util;
/**
* @author pengys5
*/
public class PathUtils {
public static String convertKey2Path(String key) {
String[] keys = key.split("\\.");
StringBuilder pathBuilder = new StringBuilder();
for (String subPath : keys) {
pathBuilder.append("/").append(subPath);
}
return pathBuilder.toString();
}
}
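A brief usage example for the helper above (the key is illustrative): it turns a dotted item key into a zookeeper-style path.
// PathUtils.convertKey2Path("a.b.c") returns "/a/b/c"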
@@ -12,19 +12,16 @@
<artifactId>apm-collector-cluster</artifactId>
<packaging>jar</packaging>
<properties>
</properties>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
package org.skywalking.apm.collector;
import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.util.StringUtil;
import org.skywalking.apm.collector.cluster.ClusterConfig;
import org.skywalking.apm.collector.cluster.Const;
/**
* @author pengys5
*/
public enum AkkaSystem {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(AkkaSystem.class);
public ActorSystem create() {
Level logLevel = logger.getLevel();
Config config = ConfigFactory.parseString("akka.remote.netty.tcp.HOSTNAME=" + ClusterConfig.Cluster.Current.HOSTNAME).
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.PORT=" + ClusterConfig.Cluster.Current.PORT)).
withFallback(ConfigFactory.parseString("akka.loggers=[\"akka.event.slf4j.Slf4jLogger\"]")).
withFallback(ConfigFactory.parseString("akka.loglevel=\"" + logLevel.name() + "\"")).
withFallback(ConfigFactory.load("application.conf"));
if (!StringUtil.isEmpty(ClusterConfig.Cluster.SEED_NODES)) {
// Config is immutable: withFallback returns a new instance, so keep the result
config = config.withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + generateSeedNodes()));
}
return ActorSystem.create(Const.SYSTEM_NAME, config);
}
private String generateSeedNodes() {
String[] seedNodes = ClusterConfig.Cluster.SEED_NODES.split(",");
String akkaSeedNodes = "";
for (int i = 0; i < seedNodes.length; i++) {
String akkaNodeName = "\"akka.tcp://" + Const.SYSTEM_NAME + "@" + seedNodes[i] + "\"";
if (i > 0) {
akkaSeedNodes += ",";
}
akkaSeedNodes += akkaNodeName;
}
akkaSeedNodes = "[" + akkaSeedNodes + "]";
logger.info("config seedNodes: %s, generate seedNodes: %s", ClusterConfig.Cluster.SEED_NODES, akkaSeedNodes);
return akkaSeedNodes;
}
}
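For illustration, with SEED_NODES = "10.0.0.1:2551,10.0.0.2:2551" (hypothetical addresses), generateSeedNodes() above produces:
// ["akka.tcp://ClusterSystem@10.0.0.1:2551","akka.tcp://ClusterSystem@10.0.0.2:2551"]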
package org.skywalking.apm.collector;
import akka.actor.ActorSystem;
import akka.actor.Props;
import java.io.IOException;
import java.util.ServiceLoader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.AbstractClusterWorkerProvider;
import org.skywalking.apm.collector.actor.AbstractLocalWorkerProvider;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LookUp;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.UsedRoleNameException;
import org.skywalking.apm.collector.cluster.WorkersListener;
import org.skywalking.apm.collector.config.ConfigInitializer;
import org.skywalking.apm.collector.rpc.RPCAddressListener;
/**
* @author pengys5
*/
public class CollectorSystem {
private static final Logger logger = LogManager.getFormatterLogger(CollectorSystem.class);
private ClusterWorkerContext clusterContext;
public LookUp getClusterContext() {
return clusterContext;
}
public void boot() throws UsedRoleNameException, ProviderNotFoundException, IOException, IllegalAccessException {
ConfigInitializer.INSTANCE.initialize();
createAkkaSystem();
createListener();
loadLocalProviders();
createClusterWorkers();
}
private void createAkkaSystem() {
ActorSystem akkaSystem = AkkaSystem.INSTANCE.create();
clusterContext = new ClusterWorkerContext(akkaSystem);
}
private void createListener() {
clusterContext.getAkkaSystem().actorOf(Props.create(WorkersListener.class, clusterContext), WorkersListener.WORK_NAME);
clusterContext.getAkkaSystem().actorOf(Props.create(RPCAddressListener.class, clusterContext), RPCAddressListener.WORK_NAME);
}
private void createClusterWorkers() throws ProviderNotFoundException {
ServiceLoader<AbstractClusterWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractClusterWorkerProvider.class);
for (AbstractClusterWorkerProvider provider : clusterServiceLoader) {
logger.info("create {%s} worker using java service loader", provider.workerNum());
provider.setClusterContext(clusterContext);
for (int i = 1; i <= provider.workerNum(); i++) {
provider.create(AbstractWorker.noOwner());
}
}
}
private void loadLocalProviders() throws UsedRoleNameException {
ServiceLoader<AbstractLocalWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractLocalWorkerProvider.class);
for (AbstractLocalWorkerProvider provider : clusterServiceLoader) {
logger.info("loadLocalProviders provider name: %s", provider.getClass().getName());
provider.setClusterContext(clusterContext);
clusterContext.putProvider(provider);
}
}
}
package org.skywalking.apm.collector.actor;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.cluster.WorkerListenerMessage;
import org.skywalking.apm.collector.cluster.WorkersListener;
import org.skywalking.apm.collector.rpc.RPCAddress;
import org.skywalking.apm.collector.rpc.RPCAddressListener;
import org.skywalking.apm.collector.rpc.RPCAddressListenerMessage;
import org.skywalking.apm.collector.log.LogManager;
/**
* Implementations of <code>AbstractClusterWorker</code> represent workers
* that receive remote messages.
* <p>
* Usually, an implementation does persistence or aggregation work.
*
* @author pengys5
* @since v3.0-2017
*/
public abstract class AbstractClusterWorker extends AbstractWorker {
/**
* Construct an <code>AbstractClusterWorker</code> with the worker role and context.
*
* @param role If multiple workers are used for load balancing, they are better thought of as worker instances;
* that is, each worker role may have several instances.
* @param clusterContext See {@link ClusterWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
*/
protected AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
/**
* Called by a message producer to send a message to this worker.
*
* @param message The persistence data or metric data.
* @throws WorkerException The exception thrown by {@link #onWork(Object)}.
*/
final public void allocateJob(Object message) throws WorkerException {
onWork(message);
}
/**
* Called by the message receiver to process the message.
*
* @param message The message object; cast it to the expected subclass.
* @throws WorkerException Don't handle the exception here; throw it.
*/
protected abstract void onWork(Object message) throws WorkerException;
static class WorkerWithAkka extends UntypedActor {
private Logger logger = LogManager.INSTANCE.getFormatterLogger(WorkerWithAkka.class);
private Cluster cluster;
private final AbstractClusterWorker ownerWorker;
private final RPCAddress rpcAddress;
public WorkerWithAkka(AbstractClusterWorker ownerWorker, RPCAddress rpcAddress) {
this.ownerWorker = ownerWorker;
cluster = Cluster.get(getContext().system());
this.rpcAddress = rpcAddress;
}
@Override
public void preStart() {
cluster.subscribe(getSelf(), ClusterEvent.MemberUp.class);
}
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
}
/**
* Listens for {@link ClusterEvent.MemberUp} and {@link ClusterEvent.CurrentClusterState}
* cluster events; when an event comes from the member running {@link WorkersListener},
* tells the sender to register this worker.
*/
@Override
public void onReceive(Object message) throws WorkerException {
if (message instanceof ClusterEvent.CurrentClusterState) {
ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState)message;
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
} else if (message instanceof ClusterEvent.MemberUp) {
ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp)message;
logger.info("receive ClusterEvent.MemberUp message, address: %s", memberUp.member().address().toString());
register(memberUp.member());
} else {
logger.debug("worker class: %s, message class: %s", this.getClass().getName(), message.getClass().getName());
ownerWorker.allocateJob(message);
}
}
/**
* When the member has the {@link WorkersListener#WORK_NAME} role, selects the listener actor
* from the member's address and sends a register message to {@link WorkersListener}.
*
* @param member the newly created or restarted member
*/
void register(Member member) {
if (member.hasRole(WorkersListener.WORK_NAME)) {
WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(ownerWorker.getRole());
logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
getContext().actorSelection(member.address() + "/user/" + WorkersListener.WORK_NAME).tell(registerMessage, getSelf());
}
if (member.hasRole(RPCAddressListener.WORK_NAME) && rpcAddress != null) {
RPCAddressListenerMessage.ConfigMessage configMessage = new RPCAddressListenerMessage.ConfigMessage(rpcAddress);
logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
getContext().actorSelection(member.address() + "/user/" + RPCAddressListener.WORK_NAME).tell(configMessage, getSelf());
}
}
}
}
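A minimal sketch of the contract described in the javadoc above: producers call allocateJob(...), which delegates synchronously to the subclass's onWork(...). The class name below is hypothetical and mirrors the Impl subclass used in the test cases further down.
class MetricPersistenceWorker extends AbstractClusterWorker {
    MetricPersistenceWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
        super(role, clusterContext, selfContext);
    }
    @Override public void preStart() throws ProviderNotFoundException {
        // no extra setup needed for this sketch
    }
    @Override protected void onWork(Object message) throws WorkerException {
        // persist or aggregate the received message here
    }
}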
package org.skywalking.apm.collector.actor;
import akka.actor.ActorRef;
/**
* @author pengys5
*/
public class ClusterWorkerRef extends WorkerRef {
private ActorRef actorRef;
public ClusterWorkerRef(ActorRef actorRef, Role role) {
super(role);
this.actorRef = actorRef;
}
@Override
public void tell(Object message) {
actorRef.tell(message, ActorRef.noSender());
}
}
package org.skywalking.apm.collector.cluster;
/**
* A static holder for the cluster config values.
* {@link Cluster.Current#HOSTNAME} is the IP address of the server running this process.
* {@link Cluster.Current#PORT} is the port this server binds to.
* {@link Cluster.Current#ROLES} lists the roles of the workers to create in this process.
* {@link Cluster#SEED_NODES} is the comma-separated list of cluster seed nodes,
* e.g. SEED_NODES = "ip:PORT,ip:PORT".
*
* @author pengys5
*/
public class ClusterConfig {
public static class Cluster {
public static class Current {
public static String HOSTNAME = "";
public static String PORT = "";
public static String ROLES = "";
}
public static String SEED_NODES = "";
}
}
package org.skywalking.apm.collector.cluster;
import org.skywalking.apm.util.StringUtil;
import org.skywalking.apm.collector.config.ConfigProvider;
/**
* @author pengys5
*/
public class ClusterConfigProvider implements ConfigProvider {
@Override
public Class configClass() {
return ClusterConfig.class;
}
@Override
public void cliArgs() {
if (!StringUtil.isEmpty(System.getProperty("cluster.current.HOSTNAME"))) {
ClusterConfig.Cluster.Current.HOSTNAME = System.getProperty("cluster.current.HOSTNAME");
}
if (!StringUtil.isEmpty(System.getProperty("cluster.current.PORT"))) {
ClusterConfig.Cluster.Current.PORT = System.getProperty("cluster.current.PORT");
}
if (!StringUtil.isEmpty(System.getProperty("cluster.current.ROLES"))) {
ClusterConfig.Cluster.Current.ROLES = System.getProperty("cluster.current.ROLES");
}
if (!StringUtil.isEmpty(System.getProperty("cluster.SEED_NODES"))) {
ClusterConfig.Cluster.SEED_NODES = System.getProperty("cluster.SEED_NODES");
}
}
}
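Usage note: because cliArgs() above reads JVM system properties, the same values can be overridden on the collector's command line, for example (illustrative values):
// -Dcluster.current.HOSTNAME=10.0.0.1 -Dcluster.current.PORT=2551
// -Dcluster.current.ROLES=WorkerRoleA,WorkerRoleB -Dcluster.SEED_NODES=10.0.0.1:2551,10.0.0.2:2551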
package org.skywalking.apm.collector.cluster;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstallMode;
/**
* @author pengys5
*/
public class ClusterModuleGroupDefine implements ModuleGroupDefine {
@Override public String name() {
return "cluster";
}
@Override public ModuleInstallMode mode() {
return ModuleInstallMode.Single;
}
}
package org.skywalking.apm.collector.cluster;
/**
* @author pengys5
*/
public class Const {
public static final String SYSTEM_NAME = "ClusterSystem";
}
package org.skywalking.apm.collector.cluster;
import java.io.Serializable;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.Role;
/**
* <code>WorkerListenerMessage</code> is the message a worker implementation
* of {@link AbstractWorker} sends in order to register itself.
*
* @author pengys5
*/
public class WorkerListenerMessage {
public static class RegisterMessage implements Serializable {
private final Role role;
public RegisterMessage(Role role) {
this.role = role;
}
public Role getRole() {
return role;
}
}
}
package org.skywalking.apm.collector.cluster;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.ClusterWorkerRef;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* <code>WorkersListener</code> listens for register messages from worker
* implementations of {@link AbstractWorker}
* and for terminated messages from the akka cluster.
* <p>
* When a register message arrives, it starts to watch the state of that worker
* and registers it with {@link ClusterWorkerContext} and {@link #relation}.
* <p>
* When a terminated message arrives, it unregisters the worker from {@link ClusterWorkerContext} and {@link #relation}.
*
* @author pengys5
*/
public class WorkersListener extends UntypedActor {
public static final String WORK_NAME = "WorkersListener";
private final Logger logger = LogManager.getFormatterLogger(WorkersListener.class);
private final ClusterWorkerContext clusterContext;
private Cluster cluster = Cluster.get(getContext().system());
private Map<ActorRef, ClusterWorkerRef> relation = new ConcurrentHashMap<>();
public WorkersListener(ClusterWorkerContext clusterContext) {
this.clusterContext = clusterContext;
}
@Override
public void preStart() throws Exception {
cluster.subscribe(getSelf(), ClusterEvent.UnreachableMember.class);
}
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof WorkerListenerMessage.RegisterMessage) {
WorkerListenerMessage.RegisterMessage register = (WorkerListenerMessage.RegisterMessage) message;
ActorRef sender = getSender();
logger.info("register worker of role: %s, path: %s", register.getRole().roleName(), sender.toString());
ClusterWorkerRef workerRef = new ClusterWorkerRef(sender, register.getRole());
relation.put(sender, workerRef);
clusterContext.put(new ClusterWorkerRef(sender, register.getRole()));
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
clusterContext.remove(relation.get(terminated.getActor()));
relation.remove(terminated.getActor());
} else if (message instanceof ClusterEvent.UnreachableMember) {
ClusterEvent.UnreachableMember unreachableMember = (ClusterEvent.UnreachableMember) message;
Iterator<Map.Entry<ActorRef, ClusterWorkerRef>> iterator = relation.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<ActorRef, ClusterWorkerRef> next = iterator.next();
if (next.getKey().path().address().equals(unreachableMember.member().address())) {
clusterContext.remove(next.getValue());
iterator.remove();
}
}
} else {
unhandled(message);
}
}
}
package org.skywalking.apm.collector.cluster.redis;
/**
* @author pengys5
*/
public class ClusterRedisConfig {
public static String HOST;
public static int PORT;
}
package org.skywalking.apm.collector.cluster.redis;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class ClusterRedisConfigParser implements ModuleConfigParser {
private static final String HOST = "host";
private static final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
ClusterRedisConfig.HOST = (String)config.get(HOST);
ClusterRedisConfig.PORT = (Integer)config.get(PORT);
if (StringUtils.isEmpty(ClusterRedisConfig.HOST) || ClusterRedisConfig.PORT == 0) {
throw new ConfigParseException("cluster redis host and port are required");
}
}
}
package org.skywalking.apm.collector.cluster.redis;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterRedisDataInitializer extends ClusterDataInitializer {
private final Logger logger = LoggerFactory.getLogger(ClusterRedisDataInitializer.class);
@Override public void addItem(Client client, String itemKey) throws ClientException {
logger.info("add the redis item key \"{}\" exist", itemKey);
}
@Override public boolean existItem(Client client, String itemKey) throws ClientException {
logger.info("assess the redis item key \"{}\" exist", itemKey);
return false;
}
}
package org.skywalking.apm.collector.cluster.redis;
import org.skywalking.apm.collector.client.redis.RedisClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleGroup;
/**
* @author pengys5
*/
public class ClusterRedisModuleDefine extends ClusterModuleDefine {
@Override public ModuleGroup group() {
return ModuleGroup.Cluster;
}
@Override public String name() {
return "redis";
}
@Override public boolean defaultModule() {
return false;
}
@Override protected ModuleConfigParser configParser() {
return new ClusterRedisConfigParser();
}
@Override protected Client createClient() {
return new RedisClient(ClusterRedisConfig.HOST, ClusterRedisConfig.PORT);
}
@Override protected DataInitializer dataInitializer() {
return new ClusterRedisDataInitializer();
}
@Override protected ClusterModuleRegistrationWriter registrationWriter() {
return new ClusterRedisModuleRegistrationWriter(getClient());
}
@Override protected ClusterModuleRegistrationReader registrationReader() {
return null;
}
}
package org.skywalking.apm.collector.cluster.redis;
import org.skywalking.apm.collector.client.redis.RedisClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterRedisModuleRegistrationWriter extends ClusterModuleRegistrationWriter {
private final Logger logger = LoggerFactory.getLogger(ClusterRedisModuleRegistrationWriter.class);
public ClusterRedisModuleRegistrationWriter(Client client) {
super(client);
}
@Override public void write(String key, ModuleRegistration.Value value) {
logger.debug("key {}, value {}", key, value.getHost());
key = key + "." + value.getHost() + ":" + value.getPort();
value.getData().addProperty("host", value.getHost());
value.getData().addProperty("port", value.getPort());
((RedisClient)client).setex(key, 120, value.getData().toString());
}
}
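For illustration, calling write("collector.ui", value) above (the key is hypothetical) with host "localhost", port 12800 and an empty JSON object as data amounts to roughly:
// SETEX collector.ui.localhost:12800 120 {"host":"localhost","port":12800}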
package org.skywalking.apm.collector.cluster.standalone;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
/**
* @author pengys5
*/
public class ClusterStandaloneConfigParser implements ModuleConfigParser {
@Override public void parse(Map config) throws ConfigParseException {
}
}
package org.skywalking.apm.collector.cluster.standalone;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterStandaloneDataInitializer extends ClusterDataInitializer {
private final Logger logger = LoggerFactory.getLogger(ClusterStandaloneDataInitializer.class);
@Override public void addItem(Client client, String itemKey) throws ClientException {
logger.info("add the h2 item key \"{}\" exist", itemKey);
itemKey = itemKey.replaceAll("\\.", "_");
String sql = "CREATE TABLE " + itemKey + "(ADDRESS VARCHAR(100) PRIMARY KEY,DATA VARCHAR(255));";
try {
((H2Client)client).execute(sql);
} catch (H2ClientException e) {
logger.error(e.getMessage(), e);
}
}
@Override public boolean existItem(Client client, String itemKey) throws ClientException {
return false;
}
}
package org.skywalking.apm.collector.cluster.standalone;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleGroup;
/**
* @author pengys5
*/
public class ClusterStandaloneModuleDefine extends ClusterModuleDefine {
@Override public ModuleGroup group() {
return ModuleGroup.Cluster;
}
@Override public String name() {
return "standalone";
}
@Override public boolean defaultModule() {
return true;
}
@Override protected ModuleConfigParser configParser() {
return new ClusterStandaloneConfigParser();
}
@Override public Client createClient() {
return new H2Client();
}
@Override protected DataInitializer dataInitializer() {
return new ClusterStandaloneDataInitializer();
}
@Override protected ClusterModuleRegistrationWriter registrationWriter() {
return new ClusterStandaloneModuleRegistrationWriter(getClient());
}
@Override protected ClusterModuleRegistrationReader registrationReader() {
return null;
}
}
package org.skywalking.apm.collector.cluster.standalone;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterStandaloneModuleRegistrationWriter extends ClusterModuleRegistrationWriter {
private final Logger logger = LoggerFactory.getLogger(ClusterStandaloneModuleRegistrationWriter.class);
public ClusterStandaloneModuleRegistrationWriter(Client client) {
super(client);
}
@Override public void write(String key, ModuleRegistration.Value value) {
key = key.replaceAll("\\.", "_");
String hostPort = value.getHost() + ":" + value.getPort();
String sql = "INSERT INTO " + key + " VALUES('" + hostPort + "', '" + value.getData().toString() + "');";
String sql2 = "SELECT * FROM " + key;
try {
((H2Client)client).execute(sql);
((H2Client)client).executeQuery(sql2);
} catch (H2ClientException e) {
logger.error(e.getMessage(), e);
}
}
}
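Likewise, for the standalone (H2) writer above, write("collector.ui", value) with host "localhost" and port 12800 (hypothetical key) executes roughly the following against the ADDRESS/DATA table created by ClusterStandaloneDataInitializer and read back by H2Client.executeQuery:
// INSERT INTO collector_ui VALUES('localhost:12800', '{...value.getData() as JSON...}');
// SELECT * FROM collector_ui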
package org.skywalking.apm.collector.cluster.zookeeper;
/**
* @author pengys5
*/
public class ClusterZKConfig {
public static String HOST_PORT;
public static int SESSION_TIMEOUT;
}
package org.skywalking.apm.collector.cluster.zookeeper;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class ClusterZKConfigParser implements ModuleConfigParser {
private static final String HOST_PORT = "hostPort";
private static final String SESSION_TIMEOUT = "sessionTimeout";
@Override public void parse(Map config) throws ConfigParseException {
ClusterZKConfig.HOST_PORT = (String)config.get(HOST_PORT);
ClusterZKConfig.SESSION_TIMEOUT = 1000;
if (StringUtils.isEmpty(ClusterZKConfig.HOST_PORT)) {
throw new ConfigParseException("cluster zookeeper hostPort is required");
}
if (!StringUtils.isEmpty(config.get(SESSION_TIMEOUT))) {
ClusterZKConfig.SESSION_TIMEOUT = (Integer)config.get(SESSION_TIMEOUT);
}
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterZKDataInitializer extends ClusterDataInitializer {
private final Logger logger = LoggerFactory.getLogger(ClusterZKDataInitializer.class);
@Override public void addItem(Client client, String itemKey) throws ClientException {
logger.info("add the zookeeper item key \"{}\" exist", itemKey);
ZookeeperClient zkClient = (ZookeeperClient)client;
String[] catalogs = itemKey.split("\\.");
StringBuilder pathBuilder = new StringBuilder();
for (String catalog : catalogs) {
pathBuilder.append("/").append(catalog);
if (zkClient.exists(pathBuilder.toString(), false) == null) {
zkClient.create(pathBuilder.toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
}
@Override public boolean existItem(Client client, String itemKey) throws ClientException {
logger.info("assess the zookeeper item key \"{}\" exist", itemKey);
ZookeeperClient zkClient = (ZookeeperClient)client;
String[] catalogs = itemKey.split("\\.");
StringBuilder pathBuilder = new StringBuilder();
for (String catalog : catalogs) {
pathBuilder.append("/").append(catalog);
}
// if (zkClient.exists(pathBuilder.toString(), false) == null) {
// return false;
// } else {
return true;
// }
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleGroup;
/**
* @author pengys5
*/
public class ClusterZKModuleDefine extends ClusterModuleDefine {
@Override protected ModuleGroup group() {
return ModuleGroup.Cluster;
}
@Override public String name() {
return "zookeeper";
}
@Override public boolean defaultModule() {
return false;
}
@Override protected ModuleConfigParser configParser() {
return new ClusterZKConfigParser();
}
@Override protected Client createClient() {
return new ZookeeperClient(ClusterZKConfig.HOST_PORT, ClusterZKConfig.SESSION_TIMEOUT);
}
@Override protected ClusterDataInitializer dataInitializer() {
return new ClusterZKDataInitializer();
}
@Override protected ClusterModuleRegistrationWriter registrationWriter() {
return new ClusterZKModuleRegistrationWriter(getClient());
}
@Override protected ClusterModuleRegistrationReader registrationReader() {
return null;
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import java.util.List;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
/**
* @author pengys5
*/
public class ClusterZKModuleRegistrationReader implements ClusterModuleRegistrationReader {
@Override public List<String> read(String key) {
return null;
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.client.zookeeper.util.PathUtils;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterZKModuleRegistrationWriter extends ClusterModuleRegistrationWriter {
private final Logger logger = LoggerFactory.getLogger(ClusterZKModuleRegistrationWriter.class);
public ClusterZKModuleRegistrationWriter(Client client) {
super(client);
}
@Override public void write(String key, ModuleRegistration.Value value) throws ClientException {
logger.info("cluster zookeeper register key: {}, value: {}", key, value);
String workerUIPath = PathUtils.convertKey2Path(key) + "/" + value.getHost() + ":" + value.getPort();
Stat stat = ((ZookeeperClient)client).exists(workerUIPath, false);
if (stat == null) {
((ZookeeperClient)client).create(workerUIPath, value.getData() == null ? null : value.getData().toString().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} else {
((ZookeeperClient)client).setData(workerUIPath, value.getData() == null ? null : value.getData().toString().getBytes(), -1);
}
}
}
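And for the zookeeper writer above, the same hypothetical registration ("collector.ui", host "localhost", port 12800) ends up as an ephemeral znode:
// /collector/ui/localhost:12800  (data = value.getData() serialized to bytes, or null)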
package org.skywalking.apm.collector.config;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.ServiceLoader;
/**
* @author pengys5
*/
public enum ConfigInitializer {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(ConfigInitializer.class);
public void initialize() throws IOException, IllegalAccessException {
InputStream configFileStream = ConfigInitializer.class.getResourceAsStream("/collector.config");
initializeConfigFile(configFileStream);
ServiceLoader<ConfigProvider> configProviders = ServiceLoader.load(ConfigProvider.class);
for (ConfigProvider provider : configProviders) {
provider.cliArgs();
}
}
private void initializeConfigFile(InputStream configFileStream) throws IOException, IllegalAccessException {
ServiceLoader<ConfigProvider> configProviders = ServiceLoader.load(ConfigProvider.class);
Properties properties = new Properties();
properties.load(configFileStream);
for (ConfigProvider provider : configProviders) {
logger.info("configProvider provider name: %s", provider.getClass().getName());
Class configClass = provider.configClass();
org.skywalking.apm.util.ConfigInitializer.initialize(properties, configClass);
}
}
}
package org.skywalking.apm.collector.config;
/**
* @author pengys5
*/
public interface ConfigProvider {
Class configClass();
void cliArgs();
}
package org.skywalking.apm.collector.log;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public enum LogManager {
INSTANCE;
public Logger getFormatterLogger(final Class<?> clazz) {
return org.apache.logging.log4j.LogManager.getFormatterLogger(clazz);
}
}
package org.skywalking.apm.collector.rpc;
/**
* @author pengys5
*/
public class RPCAddress {
private final String address;
private final int port;
public RPCAddress(String address, int port) {
this.address = address;
this.port = port;
}
public String getAddress() {
return address;
}
public int getPort() {
return port;
}
}
package org.skywalking.apm.collector.rpc;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author pengys5
*/
public class RPCAddressContext {
private Map<String, RPCAddress> rpcAddresses = new ConcurrentHashMap<>();
public Collection<RPCAddress> rpcAddressCollection() {
return rpcAddresses.values();
}
public void putAddress(String ownerAddress, RPCAddress rpcAddress) {
rpcAddresses.put(ownerAddress, rpcAddress);
}
public void removeAddress(String ownerAddress) {
rpcAddresses.remove(ownerAddress);
}
}
package org.skywalking.apm.collector.rpc;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
/**
* @author pengys5
*/
public class RPCAddressListener extends UntypedActor {
private final Logger logger = LogManager.getFormatterLogger(RPCAddressListener.class);
public static final String WORK_NAME = "RPCAddressListener";
private final ClusterWorkerContext clusterContext;
private Cluster cluster = Cluster.get(getContext().system());
public RPCAddressListener(ClusterWorkerContext clusterContext) {
this.clusterContext = clusterContext;
}
@Override
public void preStart() throws Exception {
cluster.subscribe(getSelf(), ClusterEvent.UnreachableMember.class);
}
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof RPCAddressListenerMessage.ConfigMessage) {
RPCAddressListenerMessage.ConfigMessage configMessage = (RPCAddressListenerMessage.ConfigMessage)message;
ActorRef sender = getSender();
logger.info("address: %s, port: %s", configMessage.getConfig().getAddress(), configMessage.getConfig().getPort());
String ownerAddress = sender.path().address().hostPort();
clusterContext.getRpcContext().putAddress(ownerAddress, configMessage.getConfig());
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated)message;
clusterContext.getRpcContext().removeAddress(terminated.getActor().path().address().hostPort());
} else if (message instanceof ClusterEvent.UnreachableMember) {
ClusterEvent.UnreachableMember unreachableMember = (ClusterEvent.UnreachableMember)message;
clusterContext.getRpcContext().removeAddress(unreachableMember.member().address().hostPort());
} else {
unhandled(message);
}
}
}
package org.skywalking.apm.collector.rpc;
import java.io.Serializable;
/**
* @author pengys5
*/
public class RPCAddressListenerMessage {
public static class ConfigMessage implements Serializable {
private final RPCAddress config;
public ConfigMessage(RPCAddress config) {
this.config = config;
}
public RPCAddress getConfig() {
return config;
}
}
}
org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine
org.skywalking.apm.collector.cluster.zookeeper.ClusterZKModuleDefine
org.skywalking.apm.collector.cluster.standalone.ClusterStandaloneModuleDefine
org.skywalking.apm.collector.cluster.redis.ClusterRedisModuleDefine
package org.skywalking.apm.collector.actor;
import akka.actor.ActorSystem;
import org.apache.logging.log4j.Logger;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.log.LogManager;
/**
* @author pengys5
*/
//@RunWith(PowerMockRunner.class)
//@PrepareForTest({LogManager.class})
public class AbstractClusterWorkerProviderTestCase {
// @Test
public void testOnCreate() throws ProviderNotFoundException {
LogManager logManager = Mockito.mock(LogManager.class);
Whitebox.setInternalState(LogManager.class, "INSTANCE", logManager);
Logger logger = Mockito.mock(Logger.class);
Mockito.when(logManager.getFormatterLogger(Mockito.any())).thenReturn(logger);
ActorSystem actorSystem = Mockito.mock(ActorSystem.class);
ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(actorSystem);
Impl impl = new Impl();
impl.onCreate(null);
}
class Impl extends AbstractClusterWorkerProvider<AbstractClusterWorkerTestCase.Impl> {
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public AbstractClusterWorkerTestCase.Impl workerInstance(ClusterWorkerContext clusterContext) {
return new AbstractClusterWorkerTestCase.Impl(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return 0;
}
}
enum Role implements org.skywalking.apm.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return AbstractClusterWorkerTestCase.Impl.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package org.skywalking.apm.collector.actor;
import akka.actor.Address;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import org.apache.logging.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest( {ClusterEvent.MemberUp.class, Address.class})
@PowerMockIgnore( {"javax.management.*"})
public class AbstractClusterWorkerTestCase {
private AbstractClusterWorker.WorkerWithAkka workerWithAkka = mock(AbstractClusterWorker.WorkerWithAkka.class, CALLS_REAL_METHODS);
private AbstractClusterWorker worker = PowerMockito.spy(new Impl(WorkerRole.INSTANCE, null, null));
@Before
public void init() {
Logger logger = mock(Logger.class);
Whitebox.setInternalState(workerWithAkka, "logger", logger);
Whitebox.setInternalState(workerWithAkka, "ownerWorker", worker);
}
@Test
public void testAllocateJob() throws Exception {
String jobStr = "TestJob";
worker.allocateJob(jobStr);
verify(worker).onWork(jobStr);
}
@Test
public void testMemberUp() throws Throwable {
ClusterEvent.MemberUp memberUp = mock(ClusterEvent.MemberUp.class);
Address address = mock(Address.class);
when(address.toString()).thenReturn("address");
Member member = mock(Member.class);
when(member.address()).thenReturn(address);
when(memberUp.member()).thenReturn(member);
workerWithAkka.onReceive(memberUp);
verify(workerWithAkka).register(member);
}
@Test
public void testMessage() throws Throwable {
String message = "test";
workerWithAkka.onReceive(message);
verify(worker).allocateJob(message);
}
static class Impl extends AbstractClusterWorker {
@Override
public void preStart() throws ProviderNotFoundException {
}
public Impl(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
protected void onWork(Object message) throws IllegalArgumentException {
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return Impl.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package org.skywalking.apm.collector.actor;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.skywalking.apm.collector.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.queue.MessageHolder;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
public class AbstractLocalAsyncWorkerTestCase {
@Test
public void testAllocateJob() throws Exception {
AbstractLocalAsyncWorker worker = mock(AbstractLocalAsyncWorker.class);
String message = "Test";
worker.allocateJob(message);
verify(worker).onWork(message);
}
@Test
public void testOnEventWhenNotEnd() throws Exception {
AbstractLocalAsyncWorker worker = mock(AbstractLocalAsyncWorker.class);
AbstractLocalAsyncWorker.WorkerWithDisruptor disruptor = new AbstractLocalAsyncWorker.WorkerWithDisruptor(null, worker);
MessageHolder holder = new MessageHolder();
String message = "Test";
holder.setMessage(message);
disruptor.onEvent(holder, 0, false);
verify(worker).onWork(message);
}
@Test
public void testOnEventWhenEnd() throws Exception {
AbstractLocalAsyncWorker worker = mock(AbstractLocalAsyncWorker.class);
AbstractLocalAsyncWorker.WorkerWithDisruptor disruptor = new AbstractLocalAsyncWorker.WorkerWithDisruptor(null, worker);
MessageHolder holder = new MessageHolder();
String message = "Test";
holder.setMessage(message);
disruptor.onEvent(holder, 0, true);
verify(worker, times(1)).onWork(message);
verify(worker, times(1)).onWork(argThat(new IsEndOfBatchCommandClass()));
}
class IsEndOfBatchCommandClass extends ArgumentMatcher<EndOfBatchCommand> {
public boolean matches(Object para) {
return para.getClass() == EndOfBatchCommand.class;
}
}
}
package org.skywalking.apm.collector.actor;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest( {AbstractWorker.class})
@PowerMockIgnore( {"javax.management.*"})
public class AbstractWorkerProviderTestCase {
@Test(expected = IllegalArgumentException.class)
public void testNullWorkerInstanceCreate() throws ProviderNotFoundException {
AbstractWorkerProvider provider = mock(AbstractWorkerProvider.class);
when(provider.workerInstance(null)).thenReturn(null);
AbstractWorker worker = mock(AbstractWorker.class);
provider.create(worker);
}
@Test
public void testNoneWorkerOwner() throws ProviderNotFoundException {
AbstractWorkerProvider provider = mock(AbstractWorkerProvider.class);
ClusterWorkerContext context = mock(ClusterWorkerContext.class);
provider.setClusterContext(context);
AbstractWorker worker = mock(AbstractWorker.class);
when(provider.workerInstance(context)).thenReturn(worker);
provider.create(null);
Mockito.verify(provider).onCreate(null);
}
@Test
public void testHasWorkerOwner() throws ProviderNotFoundException {
AbstractWorkerProvider provider = mock(AbstractWorkerProvider.class);
ClusterWorkerContext context = mock(ClusterWorkerContext.class);
provider.setClusterContext(context);
AbstractWorker worker = mock(AbstractWorker.class);
when(provider.workerInstance(context)).thenReturn(worker);
AbstractWorker workerOwner = mock(AbstractWorker.class);
LocalWorkerContext localWorkerContext = mock(LocalWorkerContext.class);
when(workerOwner.getSelfContext()).thenReturn(localWorkerContext);
provider.create(workerOwner);
Mockito.verify(provider).onCreate(localWorkerContext);
}
@Test(expected = IllegalArgumentException.class)
public void testHasWorkerOwnerButNoneContext() throws ProviderNotFoundException {
AbstractWorkerProvider provider = mock(AbstractWorkerProvider.class);
ClusterWorkerContext context = mock(ClusterWorkerContext.class);
provider.setClusterContext(context);
AbstractWorker worker = mock(AbstractWorker.class);
when(provider.workerInstance(context)).thenReturn(worker);
AbstractWorker workerOwner = mock(AbstractWorker.class);
when(workerOwner.getSelfContext()).thenReturn(null);
provider.create(workerOwner);
}
}
package org.skywalking.apm.collector.actor.selector;
import org.junit.Assert;
import org.junit.Test;
/**
* @author pengys5
*/
public class AbstractHashMessageTestCase {
@Test
public void testGetHashCode() {
String key = "key";
Impl impl = new Impl(key);
Assert.assertEquals(key.hashCode(), impl.getHashCode());
}
class Impl extends AbstractHashMessage {
public Impl(String key) {
super(key);
}
}
}
package org.skywalking.apm.collector.actor.selector;
import org.junit.Assert;
import org.junit.Test;
import org.skywalking.apm.collector.actor.WorkerRef;
import java.util.ArrayList;
import java.util.List;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
/**
* @author pengys5
*/
public class HashCodeSelectorTestCase {
@Test
public void testSelect() {
List<WorkerRef> members = new ArrayList<>();
WorkerRef workerRef_1 = mock(WorkerRef.class);
WorkerRef workerRef_2 = mock(WorkerRef.class);
WorkerRef workerRef_3 = mock(WorkerRef.class);
members.add(workerRef_1);
members.add(workerRef_2);
members.add(workerRef_3);
AbstractHashMessage message_1 = mock(AbstractHashMessage.class);
when(message_1.getHashCode()).thenReturn(9);
AbstractHashMessage message_2 = mock(AbstractHashMessage.class);
when(message_2.getHashCode()).thenReturn(10);
AbstractHashMessage message_3 = mock(AbstractHashMessage.class);
when(message_3.getHashCode()).thenReturn(11);
HashCodeSelector selector = new HashCodeSelector();
WorkerRef select_1 = selector.select(members, message_1);
Assert.assertEquals(workerRef_1.hashCode(), select_1.hashCode());
WorkerRef select_2 = selector.select(members, message_2);
Assert.assertEquals(workerRef_2.hashCode(), select_2.hashCode());
WorkerRef select_3 = selector.select(members, message_3);
Assert.assertEquals(workerRef_3.hashCode(), select_3.hashCode());
}
@Test(expected = IllegalArgumentException.class)
public void testSelectError() {
HashCodeSelector selector = new HashCodeSelector();
selector.select(null, new Object());
}
}
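The expectations above (hash codes 9, 10 and 11 landing on the first, second and third worker) are consistent with a simple index = hashCode % members.size() rule. The fragment below is only a sketch of that inferred rule, not the project's HashCodeSelector; the class name is invented for illustration.
// Sketch of the modulo routing rule the test values imply; the real HashCodeSelector may differ.
package org.skywalking.apm.collector.actor.selector.example;

import java.util.List;

public class ModuloSelectionSketch {
    public static <T> T select(List<T> members, int hashCode) {
        if (members == null || members.isEmpty()) {
            throw new IllegalArgumentException("members must not be empty");
        }
        // 9 % 3 = 0, 10 % 3 = 1, 11 % 3 = 2, matching the assertions in the test above.
        return members.get(Math.abs(hashCode) % members.size());
    }
}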
package org.skywalking.apm.collector.actor.selector;
import org.junit.Assert;
import org.junit.Test;
import org.skywalking.apm.collector.actor.WorkerRef;
import java.util.ArrayList;
import java.util.List;
import static org.powermock.api.mockito.PowerMockito.mock;
/**
* @author pengys5
*/
public class RollingSelectorTestCase {
@Test
public void testSelect() {
List<WorkerRef> members = new ArrayList<>();
WorkerRef workerRef_1 = mock(WorkerRef.class);
WorkerRef workerRef_2 = mock(WorkerRef.class);
WorkerRef workerRef_3 = mock(WorkerRef.class);
members.add(workerRef_1);
members.add(workerRef_2);
members.add(workerRef_3);
Object message = new Object();
RollingSelector selector = new RollingSelector();
WorkerRef selected_1 = selector.select(members, message);
Assert.assertEquals(workerRef_2.hashCode(), selected_1.hashCode());
WorkerRef selected_2 = selector.select(members, message);
Assert.assertEquals(workerRef_3.hashCode(), selected_2.hashCode());
WorkerRef selected_3 = selector.select(members, message);
Assert.assertEquals(workerRef_1.hashCode(), selected_3.hashCode());
}
}
package org.skywalking.apm.collector.config;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.skywalking.apm.collector.cluster.ClusterConfig;
/**
* @author pengys5
*/
public class ConfigInitializerTestCase {
@Before
public void clear() {
System.clearProperty("cluster.current.HOSTNAME");
System.clearProperty("cluster.current.PORT");
System.clearProperty("cluster.current.ROLES");
System.clearProperty("cluster.SEED_NODES");
}
@Test
public void testInitialize() throws Exception {
ConfigInitializer.INSTANCE.initialize();
Assert.assertEquals("127.0.0.1", ClusterConfig.Cluster.Current.HOSTNAME);
Assert.assertEquals("1000", ClusterConfig.Cluster.Current.PORT);
Assert.assertEquals("WorkersListener", ClusterConfig.Cluster.Current.ROLES);
Assert.assertEquals("127.0.0.1:1000", ClusterConfig.Cluster.SEED_NODES);
}
@Test
public void testInitializeWithCli() throws Exception {
System.setProperty("cluster.current.HOSTNAME", "127.0.0.2");
System.setProperty("cluster.current.PORT", "1001");
System.setProperty("cluster.current.ROLES", "Test1, Test2");
System.setProperty("cluster.SEED_NODES", "127.0.0.1:1000, 127.0.0.1:1001");
ConfigInitializer.INSTANCE.initialize();
Assert.assertEquals("127.0.0.2", ClusterConfig.Cluster.Current.HOSTNAME);
Assert.assertEquals("1001", ClusterConfig.Cluster.Current.PORT);
Assert.assertEquals("Test1, Test2", ClusterConfig.Cluster.Current.ROLES);
Assert.assertEquals("127.0.0.1:1000, 127.0.0.1:1001", ClusterConfig.Cluster.SEED_NODES);
}
}
package org.skywalking.apm.collector.log;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
/**
* @author pengys5
*/
public class MockLog {
public Logger mockito() {
LogManager logManager = PowerMockito.mock(LogManager.class);
Logger logger = Mockito.mock(Logger.class);
Mockito.when(logManager.getFormatterLogger(Mockito.any())).thenReturn(logger);
return logger;
}
}
cluster.current.hostname=127.0.0.1
cluster.current.port=1000
cluster.current.roles=WorkersListener
cluster.seed_nodes=127.0.0.1:1000
es.cluster.name=CollectorDBCluster
es.cluster.nodes=127.0.0.1:9300
es.cluster.transport.sniffer=true
es.index.shards.number=2
es.index.replicas.number=0
\ No newline at end of file
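The properties above are the test defaults exercised by ConfigInitializerTestCase: the file supplies baseline values and -D system properties override them. The fragment below is a minimal, hypothetical sketch of that resolution order; it is not the project's ConfigInitializer and the class name is invented.
// Hypothetical sketch only: illustrates file defaults overridden by -D system properties,
// in the spirit of the ConfigInitializer tests above; not the project's implementation.
package org.skywalking.apm.collector.config.example;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertyResolutionSketch {
    public static Properties load(String resource) throws IOException {
        Properties properties = new Properties();
        try (InputStream input = PropertyResolutionSketch.class.getClassLoader().getResourceAsStream(resource)) {
            if (input != null) {
                properties.load(input);
            }
        }
        // Command-line definitions such as -Dcluster.current.port=1001 win over the file values.
        for (String key : properties.stringPropertyNames()) {
            String override = System.getProperty(key);
            if (override != null) {
                properties.setProperty(key, override);
            }
        }
        return properties;
    }
}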
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="[%d{yyyy-MM-dd HH:mm:ss:SSS}] [%p] - %l - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
package org.skywalking.apm.collector.commons.config;
/**
* @author pengys5
*/
public enum SeedNodesFormatter {
INSTANCE;
public String formatter(String seedNodes) {
return null;
}
}
package org.skywalking.apm.collector.commons.role;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public enum TraceSegmentReceiverRole implements Role {
INSTANCE;
@Override
public String roleName() {
return "TraceSegmentReceiver";
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
// TraceSegment = "org.skywalking.apm.collector.worker.TraceSegmentSerializer"
// json = "org.skywalking.apm.collector.commons.serializer.JsonSerializer"
}
serialization-bindings {
"java.lang.String" = java
"com.google.protobuf.Message" = proto
// "TraceSegment" = TraceSegment
// "com.google.gson.JsonObject" = json
}
warn-about-java-serializer-usage = on
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
}
}
cluster {
auto-down-unreachable-after = off
metrics.enabled = off
roles = ["WorkersListener"]
}
}
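This application.conf fragment turns the collector into an Akka cluster node with the WorkersListener role. Below is a minimal sketch of how such a HOCON file is typically picked up, assuming the standard Typesafe Config and Akka APIs; the actor-system name is an illustrative assumption, not taken from this diff.
// Illustrative sketch, not part of this commit: loading the HOCON block above with
// the standard Typesafe Config / Akka APIs.
package org.skywalking.apm.collector.cluster.example;

import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ActorSystemBootstrapSketch {
    public static ActorSystem start() {
        // ConfigFactory.load() reads application.conf from the classpath, so the
        // akka { ... } settings above are applied automatically.
        Config config = ConfigFactory.load();
        // "CollectorSystem" is an assumed name used only for this illustration.
        return ActorSystem.create("CollectorSystem", config);
    }
}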
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-core</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.18</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.core;
/**
* @author pengys5
*/
public class CollectorException extends Exception {
public CollectorException(String message) {
super(message);
}
public CollectorException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.agentstream;
import java.util.Map;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
/**
* @author pengys5
*/
public abstract class AgentStreamModuleDefine extends ModuleDefine {
@Override public final void initialize(Map config) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
server.initialize();
String key = ClusterDataInitializer.BASE_CATALOG + "." + name();
ClusterModuleContext.WRITER.write(key, registration().buildValue());
} catch (ConfigParseException | ServerException e) {
throw new AgentStreamModuleException(e.getMessage(), e);
}
}
@Override protected final DataInitializer dataInitializer() {
throw new UnsupportedOperationException("AgentStream module does not need a data initializer.");
}
@Override protected final Client createClient() {
throw new UnsupportedOperationException("AgentStream module does not need to create a client.");
}
}
package org.skywalking.apm.collector.core.agentstream;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class AgentStreamModuleException extends ModuleException {
public AgentStreamModuleException(String message) {
super(message);
}
public AgentStreamModuleException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.client;
/**
* @author pengys5
*/
public interface Client {
void initialize() throws ClientException;
}
package org.skywalking.apm.collector.core.client;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author pengys5
*/
public abstract class ClientException extends CollectorException {
public ClientException(String message) {
super(message);
}
public ClientException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.client;
import java.util.List;
/**
* @author pengys5
*/
public interface DataListener {
List<String> items();
void listen() throws ClientException;
}
package org.skywalking.apm.collector.core.cluster;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.DataInitializer;
/**
* @author pengys5
*/
public abstract class ClusterDataInitializer implements DataInitializer {
public static final String BASE_CATALOG = "skywalking";
public static final String FOR_UI_CATALOG = BASE_CATALOG + ".ui";
public static final String FOR_AGENT_CATALOG = BASE_CATALOG + ".agent";
@Override public final void initialize(Client client) throws ClientException {
if (!existItem(client, FOR_UI_CATALOG)) {
addItem(client, FOR_UI_CATALOG);
}
if (!existItem(client, FOR_AGENT_CATALOG)) {
addItem(client, FOR_AGENT_CATALOG);
}
}
}
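ClusterDataInitializer fixes the catalog layout (skywalking, skywalking.ui, skywalking.agent) and the create-if-missing flow, leaving addItem and existItem to concrete storage implementations. The subclass below is a hypothetical in-memory stand-in that only illustrates the contract; it is not one of the project's cluster implementations.
// Hypothetical in-memory initializer, shown only to illustrate the abstract contract.
package org.skywalking.apm.collector.core.cluster.example;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;

public class InMemoryDataInitializerSketch extends ClusterDataInitializer {
    private final Set<String> items = ConcurrentHashMap.newKeySet();

    @Override public void addItem(Client client, String itemKey) throws ClientException {
        // A real implementation would create the catalog entry through the client.
        items.add(itemKey);
    }

    @Override public boolean existItem(Client client, String itemKey) throws ClientException {
        return items.contains(itemKey);
    }
}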
package org.skywalking.apm.collector.core.cluster;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
/**
* @author pengys5
*/
public class ClusterDefinitionFile extends DefinitionFile {
@Override protected String fileName() {
return "cluster-configuration.define";
}
}
package org.skywalking.apm.collector.core.cluster;
/**
* @author pengys5
*/
public class ClusterModuleContext {
public static ClusterModuleRegistrationWriter WRITER;
public static ClusterModuleRegistrationReader READER;
}
package org.skywalking.apm.collector.core.cluster;
import java.util.Map;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
/**
* @author pengys5
*/
public abstract class ClusterModuleDefine extends ModuleDefine {
private Client client;
@Override public final void initialize(Map config) throws ClusterModuleException {
try {
configParser().parse(config);
client = createClient();
client.initialize();
dataInitializer().initialize(client);
} catch (ConfigParseException | ClientException e) {
throw new ClusterModuleException(e.getMessage(), e);
}
}
public final Client getClient() {
return this.client;
}
@Override public final Server server() {
throw new UnsupportedOperationException("Cluster module does not need a server.");
}
@Override protected final ModuleRegistration registration() {
throw new UnsupportedOperationException("Cluster module does not need module registration.");
}
protected abstract ClusterModuleRegistrationWriter registrationWriter();
protected abstract ClusterModuleRegistrationReader registrationReader();
}
package org.skywalking.apm.collector.core.cluster;
/**
* @author pengys5
*/
public interface ClusterModuleDiscovery {
void discover();
}
package org.skywalking.apm.collector.core.cluster;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class ClusterModuleException extends ModuleException {
public ClusterModuleException(String message) {
super(message);
}
public ClusterModuleException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.cluster;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(ClusterModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException {
logger.info("beginning cluster module install");
ModuleDefine moduleDefine = null;
if (CollectionUtils.isEmpty(moduleConfig)) {
logger.info("could not configure cluster module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
moduleDefine = moduleDefineEntry.next().getValue();
if (moduleDefine.defaultModule()) {
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize(null);
break;
}
}
} else {
Map.Entry<String, Map> clusterConfigEntry = moduleConfig.entrySet().iterator().next();
moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey());
moduleDefine.initialize(clusterConfigEntry.getValue());
}
ClusterModuleContext.WRITER = ((ClusterModuleDefine)moduleDefine).registrationWriter();
}
}
package org.skywalking.apm.collector.core.cluster;
import java.util.List;
/**
* @author pengys5
*/
public interface ClusterModuleRegistrationReader {
List<String> read(String key);
}
package org.skywalking.apm.collector.core.cluster;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public abstract class ClusterModuleRegistrationWriter {
protected final Client client;
public ClusterModuleRegistrationWriter(Client client) {
this.client = client;
}
public abstract void write(String key, ModuleRegistration.Value value) throws ClientException;
}
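ClusterModuleRegistrationWriter only fixes the constructor and the write contract; concrete writers publish a module's registration value into the chosen cluster store through the client. The subclass below is a hypothetical in-memory stand-in used purely to illustrate that contract; it is not one of the project's implementations.
// Hypothetical in-memory writer, for illustration of the abstract contract only.
package org.skywalking.apm.collector.core.cluster.example;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.module.ModuleRegistration;

public class InMemoryRegistrationWriterSketch extends ClusterModuleRegistrationWriter {
    private final Map<String, ModuleRegistration.Value> store = new ConcurrentHashMap<>();

    public InMemoryRegistrationWriterSketch(Client client) {
        super(client);
    }

    @Override public void write(String key, ModuleRegistration.Value value) throws ClientException {
        // A real writer would publish the value through this.client (for example to ZooKeeper);
        // here it is simply kept in memory.
        store.put(key, value);
    }
}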
package org.skywalking.apm.collector.core.config;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author pengys5
*/
public abstract class ConfigException extends CollectorException {
public ConfigException(String message) {
super(message);
}
public ConfigException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.config;
import org.skywalking.apm.collector.core.framework.Loader;
/**
* @author pengys5
*/
public interface ConfigLoader<T> extends Loader<T> {
}
package org.skywalking.apm.collector.core.config;
/**
* @author pengys5
*/
public abstract class ConfigLoaderException extends ConfigException {
public ConfigLoaderException(String message) {
super(message);
}
public ConfigLoaderException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.config;
/**
* @author pengys5
*/
public class ConfigParseException extends ConfigException {
public ConfigParseException(String message) {
super(message);
}
public ConfigParseException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.framework;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.module.ModuleConfigLoader;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleDefineLoader;
import org.skywalking.apm.collector.core.module.ModuleGroup;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleGroupDefineLoader;
import org.skywalking.apm.collector.core.module.ModuleInstallerAdapter;
import org.skywalking.apm.collector.core.remote.SerializedDefineLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class CollectorStarter implements Starter {
private final Logger logger = LoggerFactory.getLogger(CollectorStarter.class);
@Override public void start() throws ConfigException, DefineException, ClientException {
Context context = new Context();
ModuleConfigLoader configLoader = new ModuleConfigLoader();
Map<String, Map> configuration = configLoader.load();
SerializedDefineLoader serializedDefineLoader = new SerializedDefineLoader();
serializedDefineLoader.load();
ModuleDefineLoader defineLoader = new ModuleDefineLoader();
Map<String, Map<String, ModuleDefine>> moduleDefineMap = defineLoader.load();
ModuleGroupDefineLoader groupDefineLoader = new ModuleGroupDefineLoader();
Map<String, ModuleGroupDefine> moduleGroupDefineMap = groupDefineLoader.load();
ModuleInstallerAdapter moduleInstallerAdapter = new ModuleInstallerAdapter(ModuleGroup.Cluster);
moduleInstallerAdapter.install(configuration.get(ModuleGroup.Cluster.name().toLowerCase()), moduleDefineMap.get(ModuleGroup.Cluster.name().toLowerCase()));
ModuleGroup[] moduleGroups = ModuleGroup.values();
for (ModuleGroup moduleGroup : moduleGroups) {
if (!ModuleGroup.Cluster.equals(moduleGroup)) {
moduleInstallerAdapter = new ModuleInstallerAdapter(moduleGroup);
logger.info("module group {}, configuration {}", moduleGroup.name().toLowerCase(), configuration.get(moduleGroup.name().toLowerCase()));
moduleInstallerAdapter.install(configuration.get(moduleGroup.name().toLowerCase()), moduleDefineMap.get(moduleGroup.name().toLowerCase()));
}
}
}
}
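CollectorStarter drives the whole bootstrap: load the module configuration, load the module and group definitions, install the cluster group first, then every remaining group. A minimal, hypothetical entry point that invokes it could look like the sketch below; no such main class is part of this diff and the class name is invented.
// Hypothetical entry point, shown only to illustrate how CollectorStarter is driven.
package org.skywalking.apm.collector.core.framework.example;

import org.skywalking.apm.collector.core.framework.CollectorStarter;

public class CollectorBootstrapSketch {
    public static void main(String[] args) {
        try {
            // start() loads the configuration, loads the module defines, and installs
            // the cluster module group before every other module group.
            new CollectorStarter().start();
        } catch (Exception e) {
            // ConfigException, DefineException and ClientException all extend CollectorException.
            e.printStackTrace();
            System.exit(1);
        }
    }
}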
package org.skywalking.apm.collector.core.framework;
/**
* @author pengys5
*/
public class Context {
}
package org.skywalking.apm.collector.core.framework;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
/**
* @author pengys5
*/
public interface DataInitializer {
void initialize(Client client) throws ClientException;
void addItem(Client client, String itemKey) throws ClientException;
boolean existItem(Client client, String itemKey) throws ClientException;
}
package org.skywalking.apm.collector.core.framework;
/**
* @author pengys5
*/
public interface Decision {
}
package org.skywalking.apm.collector.core.framework;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
/**
* @author pengys5
*/
public interface Define {
void initialize(Map config) throws DefineException, ClientException;
String name();
}
package org.skywalking.apm.collector.core.framework;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author pengys5
*/
public abstract class DefineException extends CollectorException {
public DefineException(String message) {
super(message);
}
public DefineException(String message, Throwable cause) {
super(message, cause);
}
}