提交 b0003f01 编写于 作者: P pengys5

Provide modular development framework

上级 a29a1c02
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-core</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.18</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.core;
/**
* @author pengys5
*/
public class CollectorException extends Exception {
public CollectorException(String message) {
super(message);
}
public CollectorException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.client;
/**
* @author pengys5
*/
public interface Client {
void initialize() throws ClientException;
void insert(String path) throws ClientException;
void update();
String select(String path) throws ClientException;
void delete();
boolean exist(String path) throws ClientException;
void listen(String path) throws ClientException;
}
package org.skywalking.apm.collector.core.client;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author pengys5
*/
public abstract class ClientException extends CollectorException {
public ClientException(String message) {
super(message);
}
public ClientException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.client;
import java.util.List;
/**
* @author pengys5
*/
public interface DataListener {
List<String> items();
void listen() throws ClientException;
}
package org.skywalking.apm.collector.core.cluster;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.DataInitializer;
/**
* @author pengys5
*/
public abstract class ClusterDataInitializer implements DataInitializer {
public static final String BASE_CATALOG = "collector.cluster";
public static final String FOR_UI_CATALOG = BASE_CATALOG + ".ui";
public static final String FOR_AGENT_CATALOG = BASE_CATALOG + ".agent";
@Override public final void initialize(Client client) throws ClientException {
if (!existItem(client, FOR_UI_CATALOG)) {
addItem(client, FOR_UI_CATALOG);
}
if (!existItem(client, FOR_AGENT_CATALOG)) {
addItem(client, FOR_AGENT_CATALOG);
}
}
}
package org.skywalking.apm.collector.core.cluster;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
/**
* @author pengys5
*/
public class ClusterDefinitionFile extends DefinitionFile {
@Override protected String fileName() {
return "cluster-configuration.define";
}
}
package org.skywalking.apm.collector.core.cluster;
import java.util.Map;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
/**
* @author pengys5
*/
public abstract class ClusterModuleDefine extends ModuleDefine {
@Override public final void initialize(Map config) throws ClusterModuleException {
try {
configParser().parse(config);
Client client = client();
client.initialize();
dataInitializer().initialize(client);
} catch (ConfigParseException | ClientException e) {
throw new ClusterModuleException(e.getMessage(), e);
}
}
@Override public final Server server() {
throw new UnsupportedOperationException("");
}
}
package org.skywalking.apm.collector.core.cluster;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class ClusterModuleException extends ModuleException {
public ClusterModuleException(String message) {
super(message);
}
public ClusterModuleException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.cluster;
/**
* @author pengys5
*/
public interface Discovery {
void discover();
}
package org.skywalking.apm.collector.core.cluster;
/**
* @author pengys5
*/
public interface Registration {
void register();
}
package org.skywalking.apm.collector.core.config;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author pengys5
*/
public abstract class ConfigException extends CollectorException {
public ConfigException(String message) {
super(message);
}
public ConfigException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.config;
import org.skywalking.apm.collector.core.framework.Loader;
/**
* @author pengys5
*/
public interface ConfigLoader extends Loader {
}
package org.skywalking.apm.collector.core.config;
/**
* @author pengys5
*/
public abstract class ConfigLoaderException extends ConfigException {
public ConfigLoaderException(String message) {
super(message);
}
public ConfigLoaderException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.config;
/**
* @author pengys5
*/
public class ConfigParseException extends ConfigException {
public ConfigParseException(String message) {
super(message);
}
public ConfigParseException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.framework;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
/**
* @author pengys5
*/
public interface DataInitializer {
void initialize(Client client) throws ClientException;
void addItem(Client client, String itemKey) throws ClientException;
boolean existItem(Client client, String itemKey) throws ClientException;
}
package org.skywalking.apm.collector.core.framework;
/**
* @author pengys5
*/
public interface Decision {
}
package org.skywalking.apm.collector.core.framework;
import java.util.Map;
/**
* @author pengys5
*/
public interface Define {
void initialize(Map config) throws DefineException;
String getName();
void setName(String name);
}
package org.skywalking.apm.collector.core.framework;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author pengys5
*/
public abstract class DefineException extends CollectorException {
public DefineException(String message) {
super(message);
}
public DefineException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.framework;
/**
* @author pengys5
*/
public abstract class DefinitionFile {
private final String CATALOG = "META-INF/defines/";
protected abstract String fileName();
public final String get() {
return CATALOG + fileName();
}
}
package org.skywalking.apm.collector.core.framework;
import org.skywalking.apm.collector.core.config.ConfigException;
/**
* @author pengys5
*/
public interface Loader {
void load() throws ConfigException;
}
package org.skywalking.apm.collector.core.framework;
import java.util.List;
/**
* @author pengys5
*/
public class PriorityDecision implements Decision {
public Object decide(List<Priority> source) {
return source.get(0);
}
public static class Priority {
private final int value;
private final Object object;
public Priority(int value, Object object) {
this.value = value;
this.object = object;
}
public int getValue() {
return value;
}
public Object getObject() {
return object;
}
}
}
package org.skywalking.apm.collector.core.framework;
/**
* @author pengys5
*/
public interface Provider<D> {
D create();
}
package org.skywalking.apm.collector.core.framework;
/**
* @author pengys5
*/
public interface Starter {
void start();
}
package org.skywalking.apm.collector.core.framework;
/**
* @author pengys5
*/
public interface Writer {
}
package org.skywalking.apm.collector.core.module;
import java.util.Map;
/**
* @author pengys5
*/
public interface Module {
void install(Map configuration);
}
package org.skywalking.apm.collector.core.module;
import java.io.FileNotFoundException;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigLoader;
import org.skywalking.apm.collector.core.util.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
/**
* @author pengys5
*/
public class ModuleConfigLoader implements ConfigLoader {
private final Logger logger = LoggerFactory.getLogger(ModuleConfigLoader.class);
@Override public void load() throws ModuleConfigLoaderException {
Yaml yaml = new Yaml();
ModuleInstaller installer = new ModuleInstaller();
Map<String, Map> configurations = null;
try {
configurations = (Map<String, Map>)yaml.load(ResourceUtils.read("application.yml"));
} catch (FileNotFoundException e) {
throw new ModuleConfigLoaderException(e.getMessage(), e);
}
configurations.forEach((moduleName, moduleConfig) -> {
logger.info("module name \"{}\" from application.yml", moduleName);
try {
installer.install(moduleName, moduleConfig);
} catch (ModuleException e) {
logger.error("module \"{}\" install failure", moduleName);
logger.error(e.getMessage(), e);
}
});
}
}
package org.skywalking.apm.collector.core.module;
import org.skywalking.apm.collector.core.config.ConfigLoaderException;
/**
* @author pengys5
*/
public class ModuleConfigLoaderException extends ConfigLoaderException {
public ModuleConfigLoaderException(String message) {
super(message);
}
public ModuleConfigLoaderException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.module;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
/**
* @author pengys5
*/
public interface ModuleConfigParser {
void parse(Map config) throws ConfigParseException;
}
package org.skywalking.apm.collector.core.module;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.framework.Define;
import org.skywalking.apm.collector.core.server.Server;
/**
* @author pengys5
*/
public abstract class ModuleDefine implements Define {
private String moduleName;
@Override public final String getName() {
return moduleName;
}
@Override public final void setName(String name) {
this.moduleName = name;
}
protected abstract ModuleGroup group();
protected abstract boolean defaultModule();
protected abstract ModuleConfigParser configParser();
protected abstract Client client();
protected abstract Server server();
protected abstract DataInitializer dataInitializer();
}
package org.skywalking.apm.collector.core.module;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
/**
* @author pengys5
*/
public class ModuleDefinitionFile extends DefinitionFile {
@Override protected String fileName() {
return "module.define";
}
}
package org.skywalking.apm.collector.core.module;
import org.skywalking.apm.collector.core.framework.DefineException;
/**
* @author pengys5
*/
public abstract class ModuleException extends DefineException {
public ModuleException(String message) {
super(message);
}
public ModuleException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.module;
/**
* @author pengys5
*/
public enum ModuleGroup {
Cluster, Worker, Queue
}
package org.skywalking.apm.collector.core.module;
import java.util.LinkedHashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(ModuleInstaller.class);
private final Map<String, ModuleDefine> moduleDefineMap;
protected ModuleInstaller() {
moduleDefineMap = new LinkedHashMap<>();
ModuleDefinitionFile definitionFile = new ModuleDefinitionFile();
logger.info("definition file name: {}", definitionFile.fileName());
DefinitionLoader<ModuleDefine> definitionLoader = DefinitionLoader.load(ModuleDefine.class, definitionFile);
for (ModuleDefine moduleDefine : definitionLoader) {
logger.info("loaded module class: {}", moduleDefine.getClass().getName());
moduleDefineMap.put(moduleDefine.getName(), moduleDefine);
}
}
public void install(String moduleName, Map moduleConfig) throws ModuleException {
Map<String, Map> module = (LinkedHashMap)moduleConfig;
module.entrySet().forEach(subModuleConfig -> {
String subMoudleName = moduleName + "." + subModuleConfig.getKey();
logger.info("install sub module {}", subMoudleName);
try {
if (moduleDefineMap.containsKey(subMoudleName)) {
moduleDefineMap.get(subMoudleName).initialize(subModuleConfig.getValue());
} else {
logger.error("could not found the module definition, module name: {}", subMoudleName);
}
} catch (DefineException e) {
logger.error(e.getMessage(), e);
}
});
}
}
package org.skywalking.apm.collector.core.server;
/**
* @author pengys5
*/
public interface Server {
void initialize() throws ServerException;
}
package org.skywalking.apm.collector.core.server;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author pengys5
*/
public abstract class ServerException extends CollectorException {
public ServerException(String message) {
super(message);
}
public ServerException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.server;
import org.skywalking.apm.collector.core.module.ModuleDefine;
/**
* @author pengys5
*/
public abstract class ServerModuleDefine extends ModuleDefine {
}
package org.skywalking.apm.collector.core.util;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.skywalking.apm.collector.core.framework.Define;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class DefinitionLoader<D> implements Iterable<D> {
private final Logger logger = LoggerFactory.getLogger(DefinitionLoader.class);
private final Class<D> definition;
private final DefinitionFile definitionFile;
protected DefinitionLoader(Class<D> svc, DefinitionFile definitionFile) {
this.definition = Objects.requireNonNull(svc, "definition interface cannot be null");
this.definitionFile = definitionFile;
}
public static <D> DefinitionLoader<D> load(Class<D> definition, DefinitionFile definitionFile) {
return new DefinitionLoader(definition, definitionFile);
}
@Override public final Iterator<D> iterator() {
logger.info("load definition file: {}", definitionFile.get());
Properties properties = new Properties();
Map<String, String> definitionList = new LinkedHashMap<>();
try {
Enumeration<URL> urlEnumeration = this.getClass().getClassLoader().getResources(definitionFile.get());
while (urlEnumeration.hasMoreElements()) {
URL definitionFileURL = urlEnumeration.nextElement();
logger.info("definition file url: {}", definitionFileURL.getPath());
properties.load(new FileReader(definitionFileURL.getPath()));
Enumeration defineItem = properties.propertyNames();
while (defineItem.hasMoreElements()) {
String key = (String)defineItem.nextElement();
String fullNameClass = properties.getProperty(key);
definitionList.put(key, fullNameClass);
}
}
} catch (IOException e) {
e.printStackTrace();
}
Iterator<Map.Entry<String, String>> moduleDefineIterator = definitionList.entrySet().iterator();
return new Iterator<D>() {
@Override public boolean hasNext() {
return moduleDefineIterator.hasNext();
}
@Override public D next() {
Map.Entry<String, String> moduleDefineEntry = moduleDefineIterator.next();
String definitionName = moduleDefineEntry.getKey();
String definitionClass = moduleDefineEntry.getValue();
logger.info("key: {}, definitionClass: {}", definitionName, definitionClass);
try {
Class c = Class.forName(definitionClass);
Define define = (Define)c.newInstance();
define.setName(definitionName);
return (D)define;
} catch (Exception e) {
}
return null;
}
};
}
}
package org.skywalking.apm.collector.core.util;
import com.sun.istack.internal.Nullable;
/**
* @author pengys5
*/
public class ObjectUtils {
public static boolean isEmpty(@Nullable Object obj) {
return obj == null;
}
}
package org.skywalking.apm.collector.core.util;
import java.io.FileNotFoundException;
import java.io.FileReader;
/**
* @author pengys5
*/
public class ResourceUtils {
private static final String PATH = ResourceUtils.class.getResource("/").getPath();
public static FileReader read(String fileName) throws FileNotFoundException {
return new FileReader(PATH + fileName);
}
}
package org.skywalking.apm.collector.core.util;
import com.sun.istack.internal.Nullable;
/**
* @author pengys5
*/
public class StringUtils {
public static final String EMPTY_STRING = "";
public static boolean isEmpty(@Nullable Object str) {
return (str == null || EMPTY_STRING.equals(str));
}
}
package org.skywalking.apm.collector.core.worker;
import java.util.Map;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleException;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
/**
* @author pengys5
*/
public abstract class WorkerModuleDefine extends ModuleDefine {
@Override public final void initialize(Map config) throws ModuleException {
try {
configParser().parse(config);
Server server = server();
server.initialize();
} catch (ConfigParseException | ServerException e) {
throw new WorkerModuleException(e.getMessage(), e);
}
}
@Override public final Client client() {
throw new UnsupportedOperationException();
}
@Override public final DataInitializer dataInitializer() {
throw new UnsupportedOperationException();
}
}
package org.skywalking.apm.collector.core.worker;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class WorkerModuleException extends ModuleException {
public WorkerModuleException(String message) {
super(message);
}
public WorkerModuleException(String message, Throwable cause) {
super(message, cause);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
</layout>
</appender>
<logger name="com.base22" level="TRACE"/>
<root level="debug">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
\ No newline at end of file
package org.skywalking.apm.collector.core.config;
import java.io.FileNotFoundException;
import org.junit.Test;
import org.skywalking.apm.collector.core.module.ModuleConfigLoader;
import org.skywalking.apm.collector.core.module.ModuleConfigLoaderException;
/**
* @author pengys5
*/
public class ModuleConfigLoaderTestCase {
@Test
public void testLoad() throws ModuleConfigLoaderException {
ModuleConfigLoader loader = new ModuleConfigLoader();
loader.load();
}
}
package org.skywalking.apm.collector.core.module;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterModuleForTest implements Module {
private final Logger logger = LoggerFactory.getLogger(ModuleInstaller.class);
@Override public void install(Map configuration) {
logger.debug(configuration.toString());
}
}
package org.skywalking.apm.collector.core.module;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class ModuleInstallerTestCase {
@Before
public void init() {
}
@Test
public void testInstall() {
ModuleInstaller installer = new ModuleInstaller();
}
}
cluster=org.skywalking.apm.collector.core.module.ClusterModuleForTest
\ No newline at end of file
cluster:
zookeeper:
host: localhost-zk
port: 1000
redis:
host: localhost-rd
port: 2000
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
</layout>
</appender>
<root level="debug">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册