提交 897cd242 编写于 作者: P pengys5

no message

上级 3773c468
package org.skywalking.apm.collector.agentjvm;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.MultipleModuleInstaller;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
/**
* @author pengys5
*/
public class AgentJVMModuleInstaller extends MultipleModuleInstaller {
public class AgentJVMCommonModuleInstaller extends MultipleCommonModuleInstaller {
@Override public String groupName() {
return AgentJVMModuleGroupDefine.GROUP_NAME;
......@@ -15,4 +16,8 @@ public class AgentJVMModuleInstaller extends MultipleModuleInstaller {
@Override public Context moduleContext() {
return new AgentJVMModuleContext(groupName());
}
@Override public List<String> dependenceModules() {
return null;
}
}
package org.skywalking.apm.collector.agentjvm;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.module.ModuleDefine;
......@@ -10,7 +9,7 @@ import org.skywalking.apm.collector.core.module.ModuleDefine;
*/
public abstract class AgentJVMModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
@Override protected final Client createClient(DataMonitor dataMonitor) {
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
}
......
......@@ -11,6 +11,12 @@ public class AgentJVMModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_jvm";
private final AgentJVMCommonModuleInstaller installer;
public AgentJVMModuleGroupDefine() {
installer = new AgentJVMCommonModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
}
......@@ -20,6 +26,6 @@ public class AgentJVMModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new AgentJVMModuleInstaller();
return installer;
}
}
......@@ -19,7 +19,7 @@ public class AgentJVMGRPCDataListener extends ClusterDataListener {
}
@Override public void serverQuitNotify() {
@Override public void serverQuitNotify(String serverAddress) {
}
}
package org.skywalking.apm.collector.agentregister;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.MultipleModuleInstaller;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
/**
* @author pengys5
*/
public class AgentRegisterModuleInstaller extends MultipleModuleInstaller {
public class AgentRegisterCommonModuleInstaller extends MultipleCommonModuleInstaller {
@Override public String groupName() {
return AgentRegisterModuleGroupDefine.GROUP_NAME;
......@@ -15,4 +16,8 @@ public class AgentRegisterModuleInstaller extends MultipleModuleInstaller {
@Override public Context moduleContext() {
return new AgentRegisterModuleContext(groupName());
}
@Override public List<String> dependenceModules() {
return null;
}
}
package org.skywalking.apm.collector.agentregister;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.module.ModuleDefine;
......@@ -14,7 +13,7 @@ public abstract class AgentRegisterModuleDefine extends ModuleDefine implements
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
}
......
......@@ -11,6 +11,12 @@ public class AgentRegisterModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_register";
private final AgentRegisterCommonModuleInstaller installer;
public AgentRegisterModuleGroupDefine() {
installer = new AgentRegisterCommonModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
}
......@@ -20,6 +26,6 @@ public class AgentRegisterModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new AgentRegisterModuleInstaller();
return installer;
}
}
......@@ -19,7 +19,7 @@ public class AgentRegisterGRPCDataListener extends ClusterDataListener {
}
@Override public void serverQuitNotify() {
@Override public void serverQuitNotify(String serverAddress) {
}
}
......@@ -19,7 +19,7 @@ public class AgentRegisterJettyDataListener extends ClusterDataListener {
}
@Override public void serverQuitNotify() {
@Override public void serverQuitNotify(String serverAddress) {
}
}
package org.skywalking.apm.collector.agentserver;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.MultipleModuleInstaller;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
/**
* @author pengys5
*/
public class AgentServerModuleInstaller extends MultipleModuleInstaller {
public class AgentServerCommonModuleInstaller extends MultipleCommonModuleInstaller {
@Override public String groupName() {
return AgentServerModuleGroupDefine.GROUP_NAME;
......@@ -15,4 +16,8 @@ public class AgentServerModuleInstaller extends MultipleModuleInstaller {
@Override public Context moduleContext() {
return new AgentServerModuleContext(groupName());
}
@Override public List<String> dependenceModules() {
return null;
}
}
package org.skywalking.apm.collector.agentserver;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.module.ModuleDefine;
......@@ -14,7 +13,7 @@ public abstract class AgentServerModuleDefine extends ModuleDefine implements Cl
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
}
}
......@@ -10,6 +10,11 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class AgentServerModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_server";
private final AgentServerCommonModuleInstaller installer;
public AgentServerModuleGroupDefine() {
installer = new AgentServerCommonModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
......@@ -20,6 +25,6 @@ public class AgentServerModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new AgentServerModuleInstaller();
return installer;
}
}
......@@ -17,7 +17,7 @@ public class AgentServerJettyDataListener extends ClusterDataListener {
}
@Override public void serverQuitNotify() {
@Override public void serverQuitNotify(String serverAddress) {
}
}
......@@ -5,8 +5,6 @@ import com.google.gson.JsonElement;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agentstream.grpc.AgentStreamGRPCDataListener;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
......@@ -22,10 +20,10 @@ public class AgentStreamGRPCServerHandler extends JettyHandler {
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
ClusterModuleRegistrationReader reader = CollectorContextHelper.INSTANCE.getClusterModuleContext().getReader();
Set<String> servers = reader.read(AgentStreamGRPCDataListener.PATH);
JsonArray serverArray = new JsonArray();
servers.forEach(server -> serverArray.add(server));
servers.forEach(serverArray::add);
return serverArray;
}
......
......@@ -5,8 +5,6 @@ import com.google.gson.JsonElement;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agentstream.jetty.AgentStreamJettyDataListener;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
......@@ -22,12 +20,10 @@ public class AgentStreamJettyServerHandler extends JettyHandler {
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
ClusterModuleRegistrationReader reader = CollectorContextHelper.INSTANCE.getClusterModuleContext().getReader();
Set<String> servers = reader.read(AgentStreamJettyDataListener.PATH);
JsonArray serverArray = new JsonArray();
servers.forEach(server -> {
serverArray.add(server);
});
servers.forEach(serverArray::add);
return serverArray;
}
......
......@@ -4,8 +4,6 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
......@@ -22,12 +20,10 @@ public class UIJettyServerHandler extends JettyHandler {
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
ClusterModuleRegistrationReader reader = CollectorContextHelper.INSTANCE.getClusterModuleContext().getReader();
Set<String> servers = reader.read(UIJettyDataListener.PATH);
JsonArray serverArray = new JsonArray();
servers.forEach(server -> {
serverArray.add(server);
});
servers.forEach(serverArray::add);
return serverArray;
}
......
package org.skywalking.apm.collector.agentstream;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.MultipleModuleInstaller;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerException;
/**
* @author pengys5
*/
public class AgentStreamModuleInstaller extends MultipleModuleInstaller {
public class AgentStreamCommonModuleInstaller extends MultipleCommonModuleInstaller {
@Override public String groupName() {
return AgentStreamModuleGroupDefine.GROUP_NAME;
......@@ -20,7 +22,11 @@ public class AgentStreamModuleInstaller extends MultipleModuleInstaller {
return new AgentStreamModuleContext(groupName());
}
@Override public void install() throws DefineException, ConfigException, ServerException {
@Override public List<String> dependenceModules() {
return null;
}
@Override public void install() throws DefineException, ConfigException, ServerException, ClientException {
super.install();
new PersistenceTimer().start();
}
......
package org.skywalking.apm.collector.agentstream;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.module.ModuleDefine;
......@@ -10,7 +9,7 @@ import org.skywalking.apm.collector.core.module.ModuleDefine;
*/
public abstract class AgentStreamModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
@Override protected final Client createClient(DataMonitor dataMonitor) {
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
}
......
......@@ -10,6 +10,11 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class AgentStreamModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_stream";
private final AgentStreamCommonModuleInstaller installer;
public AgentStreamModuleGroupDefine() {
installer = new AgentStreamCommonModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
......@@ -20,6 +25,6 @@ public class AgentStreamModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new AgentStreamModuleInstaller();
return installer;
}
}
......@@ -19,7 +19,7 @@ public class AgentStreamGRPCDataListener extends ClusterDataListener {
}
@Override public void serverQuitNotify() {
@Override public void serverQuitNotify(String serverAddress) {
}
}
......@@ -19,7 +19,7 @@ public class AgentStreamJettyDataListener extends ClusterDataListener {
}
@Override public void serverQuitNotify() {
@Override public void serverQuitNotify(String serverAddress) {
}
}
package org.skywalking.apm.collector.boot;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.CollectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -13,9 +11,10 @@ public class CollectorBootStartUp {
private static final Logger logger = LoggerFactory.getLogger(CollectorBootStartUp.class);
public static void main(String[] args) throws ConfigException, DefineException, ClientException {
public static void main(String[] args) throws CollectorException {
logger.info("collector starting...");
CollectorStarter starter = new CollectorStarter();
starter.start();
logger.info("collector start successful.");
}
}
package org.skywalking.apm.collector.boot;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.framework.Starter;
import org.skywalking.apm.collector.core.module.ModuleConfigLoader;
import org.skywalking.apm.collector.core.module.ModuleDefine;
......@@ -12,6 +10,7 @@ import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleGroupDefineLoader;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -21,24 +20,26 @@ import org.slf4j.LoggerFactory;
public class CollectorStarter implements Starter {
private final Logger logger = LoggerFactory.getLogger(CollectorStarter.class);
private Map<String, ModuleGroupDefine> moduleGroupDefineMap;
@Override public void start() throws ConfigException, DefineException, ClientException {
@Override public void start() throws CollectorException {
ModuleConfigLoader configLoader = new ModuleConfigLoader();
Map<String, Map> configuration = configLoader.load();
ModuleGroupDefineLoader groupDefineLoader = new ModuleGroupDefineLoader();
Map<String, ModuleGroupDefine> moduleGroupDefineMap = groupDefineLoader.load();
moduleGroupDefineMap = groupDefineLoader.load();
ModuleDefineLoader defineLoader = new ModuleDefineLoader();
Map<String, Map<String, ModuleDefine>> moduleDefineMap = defineLoader.load();
ServerHolder serverHolder = new ServerHolder();
// moduleGroupDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME).moduleInstaller().install(configuration.get(ClusterModuleGroupDefine.GROUP_NAME), moduleDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME), serverHolder);
// moduleGroupDefineMap.remove(ClusterModuleGroupDefine.GROUP_NAME);
for (ModuleGroupDefine moduleGroupDefine : moduleGroupDefineMap.values()) {
moduleGroupDefine.moduleInstaller().injectConfiguration(configuration.get(moduleGroupDefine.name()), moduleDefineMap.get(moduleGroupDefine.name()));
moduleGroupDefine.moduleInstaller().injectServerHolder(serverHolder);
moduleGroupDefine.moduleInstaller().preInstall();
}
for (ModuleGroupDefine moduleGroupDefine : moduleGroupDefineMap.values()) {
moduleGroupDefine.moduleInstaller().install();
}
......@@ -49,5 +50,26 @@ public class CollectorStarter implements Starter {
logger.error(e.getMessage(), e);
}
});
dependenceAfterInstall();
}
private void dependenceAfterInstall() throws CollectorException {
for (ModuleGroupDefine moduleGroupDefine : moduleGroupDefineMap.values()) {
moduleInstall(moduleGroupDefine);
}
}
private void moduleInstall(ModuleGroupDefine moduleGroupDefine) throws CollectorException {
if (CollectionUtils.isNotEmpty(moduleGroupDefine.moduleInstaller().dependenceModules())) {
for (String groupName : moduleGroupDefine.moduleInstaller().dependenceModules()) {
moduleInstall(moduleGroupDefineMap.get(groupName));
}
logger.info("after install module group: {}", moduleGroupDefine.name());
moduleGroupDefine.moduleInstaller().afterInstall();
} else {
logger.info("after install module group: {}", moduleGroupDefine.name());
moduleGroupDefine.moduleInstaller().afterInstall();
}
}
}
......@@ -64,6 +64,10 @@ public class ElasticSearchClient implements Client {
}
}
@Override public void shutdown() {
}
private List<AddressPairs> parseClusterNodes(String nodes) {
List<AddressPairs> pairsList = new LinkedList<>();
logger.info("elasticsearch cluster nodes: {}", nodes);
......
......@@ -25,6 +25,10 @@ public class GRPCClient implements Client {
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
}
@Override public void shutdown() {
channel.shutdownNow();
}
public ManagedChannel getChannel() {
return channel;
}
......
......@@ -27,6 +27,10 @@ public class H2Client implements Client {
}
}
@Override public void shutdown() {
}
public void execute(String sql) throws H2ClientException {
Statement statement = null;
try {
......
......@@ -23,6 +23,10 @@ public class RedisClient implements Client {
jedis = new Jedis(host, port);
}
@Override public void shutdown() {
}
public void setex(String key, int seconds, String value) {
jedis.setex(key, seconds, value);
}
......
......@@ -39,6 +39,10 @@ public class ZookeeperClient implements Client {
}
}
@Override public void shutdown() {
}
public void create(final String path, byte data[], List<ACL> acl,
CreateMode createMode) throws ZookeeperClientException {
try {
......
package org.skywalking.apm.collector.cluster;
import java.util.List;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
......@@ -20,17 +21,15 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
public static final String BASE_CATALOG = "skywalking";
private Client client;
private DataMonitor dataMonitor;
@Override protected void initializeOtherContext() {
try {
dataMonitor = dataMonitor();
client = createClient(dataMonitor);
client = createClient();
client.initialize();
dataMonitor.setClient(client);
ClusterModuleRegistrationReader reader = registrationReader(dataMonitor);
dataMonitor().setClient(client);
ClusterModuleRegistrationReader reader = registrationReader();
CollectorContextHelper.INSTANCE.getClusterModuleContext().setDataMonitor(dataMonitor);
CollectorContextHelper.INSTANCE.getClusterModuleContext().setDataMonitor(dataMonitor());
CollectorContextHelper.INSTANCE.getClusterModuleContext().setReader(reader);
} catch (ClientException e) {
throw new UnexpectedException(e.getMessage());
......@@ -42,11 +41,11 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
}
@Override public final Server server() {
throw new UnsupportedOperationException("");
return null;
}
@Override public final List<Handler> handlerList() {
throw new UnsupportedOperationException("");
return null;
}
@Override protected final ModuleRegistration registration() {
......@@ -55,6 +54,9 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
public abstract DataMonitor dataMonitor();
public abstract ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor);
public abstract ClusterModuleRegistrationReader registrationReader();
public void startMonitor() throws CollectorException {
dataMonitor().start();
}
}
......@@ -11,6 +11,11 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class ClusterModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "cluster";
private final ClusterModuleInstaller installer;
public ClusterModuleGroupDefine() {
installer = new ClusterModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
......@@ -21,6 +26,6 @@ public class ClusterModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new ClusterModuleInstaller();
return installer;
}
}
package org.skywalking.apm.collector.cluster;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.Context;
......@@ -19,4 +22,14 @@ public class ClusterModuleInstaller extends SingleModuleInstaller {
CollectorContextHelper.INSTANCE.putClusterContext(clusterModuleContext);
return clusterModuleContext;
}
@Override public List<String> dependenceModules() {
List<String> dependenceModules = new LinkedList<>();
dependenceModules.add("collector_inside");
return dependenceModules;
}
@Override public void onAfterInstall() throws CollectorException {
((ClusterModuleDefine)getModuleDefine()).startMonitor();
}
}
......@@ -35,11 +35,11 @@ public class ClusterRedisModuleDefine extends ClusterModuleDefine {
return null;
}
@Override protected Client createClient(DataMonitor dataMonitor) {
@Override protected Client createClient() {
return new RedisClient(ClusterRedisConfig.HOST, ClusterRedisConfig.PORT);
}
@Override public ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor) {
return new ClusterRedisModuleRegistrationReader(dataMonitor);
@Override public ClusterModuleRegistrationReader registrationReader() {
return null;
}
}
......@@ -35,11 +35,11 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine {
return null;
}
@Override protected Client createClient(DataMonitor dataMonitor) {
@Override protected Client createClient() {
return new H2Client();
}
@Override public ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor) {
return new ClusterStandaloneModuleRegistrationReader(dataMonitor);
@Override public ClusterModuleRegistrationReader registrationReader() {
return null;
}
}
......@@ -53,9 +53,12 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
String dataStr = new String(data);
if (stat.getCzxid() == stat.getMzxid()) {
logger.info("path children has been created, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr);
listeners.get(event.getPath()).addAddress(serverPath + dataStr);
listeners.get(event.getPath()).serverJoinNotify(serverPath + dataStr);
} else {
logger.info("path children has been changed, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr);
listeners.get(event.getPath()).removeAddress(serverPath + dataStr);
listeners.get(event.getPath()).serverQuitNotify(serverPath + dataStr);
}
}
}
......
package org.skywalking.apm.collector.cluster.zookeeper;
import org.apache.zookeeper.Watcher;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
......@@ -15,6 +14,11 @@ import org.skywalking.apm.collector.core.module.ModuleConfigParser;
public class ClusterZKModuleDefine extends ClusterModuleDefine {
public static final String MODULE_NAME = "zookeeper";
private final ClusterZKDataMonitor dataMonitor;
public ClusterZKModuleDefine() {
dataMonitor = new ClusterZKDataMonitor();
}
@Override protected String group() {
return ClusterModuleGroupDefine.GROUP_NAME;
......@@ -33,14 +37,14 @@ public class ClusterZKModuleDefine extends ClusterModuleDefine {
}
@Override public DataMonitor dataMonitor() {
return new ClusterZKDataMonitor();
return dataMonitor;
}
@Override protected Client createClient(DataMonitor dataMonitor) {
return new ZookeeperClient(ClusterZKConfig.HOST_PORT, ClusterZKConfig.SESSION_TIMEOUT, (Watcher)dataMonitor);
@Override protected Client createClient() {
return new ZookeeperClient(ClusterZKConfig.HOST_PORT, ClusterZKConfig.SESSION_TIMEOUT, dataMonitor);
}
@Override public ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor) {
@Override public ClusterModuleRegistrationReader registrationReader() {
return new ClusterZKModuleRegistrationReader(dataMonitor);
}
}
......@@ -5,4 +5,6 @@ package org.skywalking.apm.collector.core.client;
*/
public interface Client {
void initialize() throws ClientException;
void shutdown();
}
......@@ -7,7 +7,7 @@ import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public interface DataMonitor extends Starter{
public interface DataMonitor extends Starter {
void setClient(Client client);
void addListener(ClusterDataListener listener, ModuleRegistration registration) throws ClientException;
......
......@@ -31,5 +31,5 @@ public abstract class ClusterDataListener implements Listener {
public abstract void serverJoinNotify(String serverAddress);
public abstract void serverQuitNotify();
public abstract void serverQuitNotify(String serverAddress);
}
package org.skywalking.apm.collector.core.module;
import java.util.Map;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author pengys5
*/
public abstract class ModuleConfigContainer implements ModuleInstaller {
public abstract class CommonModuleInstaller implements ModuleInstaller {
private boolean isInstalled = false;
private Map<String, Map> moduleConfig;
private Map<String, ModuleDefine> moduleDefineMap;
......@@ -16,11 +18,20 @@ public abstract class ModuleConfigContainer implements ModuleInstaller {
this.moduleDefineMap = moduleDefineMap;
}
public final Map<String, Map> getModuleConfig() {
protected final Map<String, Map> getModuleConfig() {
return moduleConfig;
}
public final Map<String, ModuleDefine> getModuleDefineMap() {
protected final Map<String, ModuleDefine> getModuleDefineMap() {
return moduleDefineMap;
}
public abstract void onAfterInstall() throws CollectorException;
@Override public final void afterInstall() throws CollectorException {
if (!isInstalled) {
onAfterInstall();
}
isInstalled = true;
}
}
......@@ -2,7 +2,6 @@ package org.skywalking.apm.collector.core.module;
import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.framework.Define;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.server.Server;
......@@ -18,7 +17,7 @@ public abstract class ModuleDefine implements Define {
protected abstract ModuleConfigParser configParser();
protected abstract Client createClient(DataMonitor dataMonitor);
protected abstract Client createClient();
protected abstract Server server();
......
package org.skywalking.apm.collector.core.module;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.framework.DefineException;
......@@ -21,11 +22,11 @@ public class ModuleGroupDefineLoader implements Loader<Map<String, ModuleGroupDe
ModuleGroupDefineFile definitionFile = new ModuleGroupDefineFile();
logger.info("module group definition file name: {}", definitionFile.fileName());
DefinitionLoader<ModuleGroupDefine> definitionLoader = DefinitionLoader.load(ModuleGroupDefine.class, definitionFile);
for (ModuleGroupDefine moduleGroupDefine : definitionLoader) {
logger.info("loaded group module definition class: {}", moduleGroupDefine.getClass().getName());
String groupName = moduleGroupDefine.name().toLowerCase();
moduleGroupDefineMap.put(groupName, moduleGroupDefine);
Iterator<ModuleGroupDefine> defineIterator = definitionLoader.iterator();
while (defineIterator.hasNext()) {
ModuleGroupDefine groupDefine = defineIterator.next();
String groupName = groupDefine.name().toLowerCase();
moduleGroupDefineMap.put(groupName, groupDefine);
}
return moduleGroupDefineMap;
}
......
package org.skywalking.apm.collector.core.module;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Context;
......@@ -13,6 +15,8 @@ import org.skywalking.apm.collector.core.server.ServerHolder;
*/
public interface ModuleInstaller {
List<String> dependenceModules();
void injectServerHolder(ServerHolder serverHolder);
String groupName();
......@@ -24,4 +28,6 @@ public interface ModuleInstaller {
void preInstall() throws DefineException, ConfigException, ServerException;
void install() throws ClientException, DefineException, ConfigException, ServerException;
void afterInstall() throws CollectorException;
}
......@@ -4,22 +4,26 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class MultipleModuleInstaller extends ModuleConfigContainer {
public abstract class MultipleCommonModuleInstaller extends CommonModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(MultipleModuleInstaller.class);
private final Logger logger = LoggerFactory.getLogger(MultipleCommonModuleInstaller.class);
public MultipleModuleInstaller() {
public MultipleCommonModuleInstaller() {
moduleDefines = new LinkedList<>();
}
......@@ -31,6 +35,7 @@ public abstract class MultipleModuleInstaller extends ModuleConfigContainer {
}
@Override public final void preInstall() throws DefineException, ConfigException, ServerException {
logger.info("install module group: {}", groupName());
Map<String, Map> moduleConfig = getModuleConfig();
Map<String, ModuleDefine> moduleDefineMap = getModuleDefineMap();
......@@ -44,12 +49,22 @@ public abstract class MultipleModuleInstaller extends ModuleConfigContainer {
}
}
@Override public void install() throws DefineException, ConfigException, ServerException {
preInstall();
@Override public void install() throws DefineException, ConfigException, ServerException, ClientException {
CollectorContextHelper.INSTANCE.putContext(moduleContext());
moduleDefines.forEach(moduleDefine -> {
for (ModuleDefine moduleDefine : moduleDefines) {
moduleDefine.initializeOtherContext();
});
if (moduleDefine instanceof ClusterDataListenerDefine) {
ClusterDataListenerDefine listenerDefine = (ClusterDataListenerDefine)moduleDefine;
if (ObjectUtils.isNotEmpty(listenerDefine.listener()) && ObjectUtils.isNotEmpty(moduleDefine.registration())) {
logger.info("add group: {}, module: {}, listener into cluster data monitor", moduleDefine.group(), moduleDefine.name());
CollectorContextHelper.INSTANCE.getClusterModuleContext().getDataMonitor().addListener(listenerDefine.listener(), moduleDefine.registration());
}
}
}
}
@Override public void onAfterInstall() throws CollectorException {
}
}
......@@ -2,8 +2,10 @@ package org.skywalking.apm.collector.core.module;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
......@@ -11,13 +13,14 @@ import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class SingleModuleInstaller extends ModuleConfigContainer {
public abstract class SingleModuleInstaller extends CommonModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(SingleModuleInstaller.class);
......@@ -29,6 +32,7 @@ public abstract class SingleModuleInstaller extends ModuleConfigContainer {
}
@Override public final void preInstall() throws DefineException, ConfigException, ServerException {
logger.info("install module group: {}", groupName());
Map<String, Map> moduleConfig = getModuleConfig();
Map<String, ModuleDefine> moduleDefineMap = getModuleDefineMap();
if (CollectionUtils.isNotEmpty(moduleConfig)) {
......@@ -45,17 +49,17 @@ public abstract class SingleModuleInstaller extends ModuleConfigContainer {
}
} else {
logger.info("could not configure module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineIterator = moduleDefineMap.entrySet().iterator();
boolean hasDefaultModule = false;
while (moduleDefineEntry.hasNext()) {
if (moduleDefineEntry.next().getValue().defaultModule()) {
logger.info("module {} initialize", moduleDefine.getClass().getName());
while (moduleDefineIterator.hasNext()) {
Map.Entry<String, ModuleDefine> moduleDefineEntry = moduleDefineIterator.next();
if (moduleDefineEntry.getValue().defaultModule()) {
if (hasDefaultModule) {
throw new ClusterModuleException("single module, but configure multiple default module");
}
moduleDefine = moduleDefineEntry.next().getValue();
moduleDefine.configParser().parse(null);
this.moduleDefine = moduleDefineEntry.getValue();
this.moduleDefine.configParser().parse(null);
hasDefaultModule = true;
}
}
......@@ -64,13 +68,21 @@ public abstract class SingleModuleInstaller extends ModuleConfigContainer {
}
@Override public void install() throws ClientException, DefineException, ConfigException, ServerException {
preInstall();
if (!(moduleContext() instanceof ClusterModuleContext)) {
CollectorContextHelper.INSTANCE.putContext(moduleContext());
}
moduleDefine.initializeOtherContext();
CollectorContextHelper.INSTANCE.putContext(moduleContext());
if (moduleDefine instanceof ClusterDataListenerDefine) {
ClusterDataListenerDefine listenerDefine = (ClusterDataListenerDefine)moduleDefine;
CollectorContextHelper.INSTANCE.getClusterModuleContext().getDataMonitor().addListener(listenerDefine.listener(), moduleDefine.registration());
if (ObjectUtils.isNotEmpty(listenerDefine.listener()) && ObjectUtils.isNotEmpty(moduleDefine.registration())) {
CollectorContextHelper.INSTANCE.getClusterModuleContext().getDataMonitor().addListener(listenerDefine.listener(), moduleDefine.registration());
logger.info("add group: {}, module: {}, listener into cluster data monitor", moduleDefine.group(), moduleDefine.name());
}
}
}
protected ModuleDefine getModuleDefine() {
return moduleDefine;
}
}
......@@ -4,6 +4,7 @@ import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -21,6 +22,10 @@ public class ServerHolder {
}
public void holdServer(Server newServer, List<Handler> handlers) throws ServerException {
if (ObjectUtils.isEmpty(newServer) || CollectionUtils.isEmpty(handlers)) {
return;
}
boolean isNewServer = true;
for (Server server : servers) {
if (server.hostPort().equals(newServer.hostPort()) && server.serverClassify().equals(newServer.serverClassify())) {
......
......@@ -35,7 +35,6 @@ public class DefinitionLoader<D> implements Iterable<D> {
@Override public final Iterator<D> iterator() {
logger.info("load definition file: {}", definitionFile.get());
Properties properties = new Properties();
List<String> definitionList = new LinkedList<>();
try {
Enumeration<URL> urlEnumeration = this.getClass().getClassLoader().getResources(definitionFile.get());
......@@ -43,6 +42,7 @@ public class DefinitionLoader<D> implements Iterable<D> {
URL definitionFileURL = urlEnumeration.nextElement();
logger.info("definition file url: {}", definitionFileURL.getPath());
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(definitionFileURL.openStream()));
Properties properties = new Properties();
properties.load(bufferedReader);
Enumeration defineItem = properties.propertyNames();
......@@ -52,7 +52,7 @@ public class DefinitionLoader<D> implements Iterable<D> {
}
}
} catch (IOException e) {
e.printStackTrace();
logger.error(e.getMessage(), e);
}
Iterator<String> moduleDefineIterator = definitionList.iterator();
......
......@@ -4,7 +4,6 @@ import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
......@@ -13,12 +12,9 @@ import org.skywalking.apm.collector.core.server.Server;
* @author pengys5
*/
public abstract class QueueModuleDefine extends ModuleDefine {
@Override protected final ModuleConfigParser configParser() {
throw new UnsupportedOperationException("");
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
@Override protected Client createClient() {
return null;
}
@Override protected final ModuleRegistration registration() {
......@@ -26,10 +22,10 @@ public abstract class QueueModuleDefine extends ModuleDefine {
}
@Override protected final Server server() {
throw new UnsupportedOperationException("");
return null;
}
@Override public final List<Handler> handlerList() {
throw new UnsupportedOperationException("");
return null;
}
}
......@@ -10,6 +10,11 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class QueueModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "queue";
private final QueueModuleInstaller installer;
public QueueModuleGroupDefine() {
installer = new QueueModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
......@@ -20,6 +25,6 @@ public class QueueModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new QueueModuleInstaller();
return installer;
}
}
package org.skywalking.apm.collector.queue;
import java.util.List;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.core.module.SingleModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.queue.datacarrier.DataCarrierQueueCreator;
import org.skywalking.apm.collector.queue.datacarrier.QueueDataCarrierModuleDefine;
import org.skywalking.apm.collector.queue.disruptor.DisruptorQueueCreator;
import org.skywalking.apm.collector.queue.disruptor.QueueDisruptorModuleDefine;
/**
* @author pengys5
......@@ -22,8 +28,21 @@ public class QueueModuleInstaller extends SingleModuleInstaller {
return new QueueModuleContext(groupName());
}
@Override public List<String> dependenceModules() {
return null;
}
@Override public void install() throws ClientException, DefineException, ConfigException, ServerException {
super.install();
((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(groupName())).setQueueCreator(new DataCarrierQueueCreator());
if (getModuleDefine() instanceof QueueDataCarrierModuleDefine) {
((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(groupName())).setQueueCreator(new DataCarrierQueueCreator());
} else if (getModuleDefine() instanceof QueueDisruptorModuleDefine) {
((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(groupName())).setQueueCreator(new DisruptorQueueCreator());
} else {
throw new UnexpectedException("");
}
}
@Override public void onAfterInstall() throws CollectorException {
}
}
package org.skywalking.apm.collector.queue.datacarrier;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
/**
* @author pengys5
*/
public class DataCarrierQueueConfigParser implements ModuleConfigParser {
@Override public void parse(Map config) throws ConfigParseException {
}
}
package org.skywalking.apm.collector.queue.datacarrier;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.queue.QueueModuleContext;
import org.skywalking.apm.collector.queue.QueueModuleDefine;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
......@@ -22,6 +23,10 @@ public class QueueDataCarrierModuleDefine extends QueueModuleDefine {
return false;
}
@Override protected ModuleConfigParser configParser() {
return new DataCarrierQueueConfigParser();
}
@Override protected void initializeOtherContext() {
((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setQueueCreator(new DataCarrierQueueCreator());
}
......
......@@ -7,7 +7,9 @@ import org.skywalking.apm.collector.core.module.ModuleConfigParser;
/**
* @author pengys5
*/
public class QueueDisruptorConfigParser implements ModuleConfigParser {
public class DisruptorQueueConfigParser implements ModuleConfigParser {
@Override public void parse(Map config) throws ConfigParseException {
}
}
package org.skywalking.apm.collector.queue.disruptor;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.queue.QueueModuleContext;
import org.skywalking.apm.collector.queue.QueueModuleDefine;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
......@@ -22,6 +23,10 @@ public class QueueDisruptorModuleDefine extends QueueModuleDefine {
return true;
}
@Override protected ModuleConfigParser configParser() {
return new DisruptorQueueConfigParser();
}
@Override protected void initializeOtherContext() {
((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setQueueCreator(new DisruptorQueueCreator());
}
......
......@@ -14,20 +14,16 @@ import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.storage.StorageException;
import org.skywalking.apm.collector.core.storage.StorageInstaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class StorageModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(StorageModuleDefine.class);
@Override protected void initializeOtherContext() {
try {
StorageModuleContext context = (StorageModuleContext)CollectorContextHelper.INSTANCE.getContext(StorageModuleGroupDefine.GROUP_NAME);
Client client = createClient(null);
Client client = createClient();
client.initialize();
context.setClient(client);
injectClientIntoDAO(client);
......@@ -39,19 +35,19 @@ public abstract class StorageModuleDefine extends ModuleDefine implements Cluste
}
@Override public final List<Handler> handlerList() {
throw new UnsupportedOperationException("");
return null;
}
@Override protected final Server server() {
throw new UnsupportedOperationException("");
return null;
}
@Override protected final ModuleRegistration registration() {
throw new UnsupportedOperationException("");
return null;
}
@Override public final ClusterDataListener listener() {
throw new UnsupportedOperationException("");
return null;
}
@Override public final boolean defaultModule() {
......
......@@ -10,6 +10,11 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class StorageModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "storage";
private final StorageModuleInstaller installer;
public StorageModuleGroupDefine() {
installer = new StorageModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
......@@ -20,6 +25,6 @@ public class StorageModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new StorageModuleInstaller();
return installer;
}
}
package org.skywalking.apm.collector.storage;
import java.util.List;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.SingleModuleInstaller;
......@@ -15,4 +17,12 @@ public class StorageModuleInstaller extends SingleModuleInstaller {
@Override public Context moduleContext() {
return new StorageModuleContext(groupName());
}
@Override public List<String> dependenceModules() {
return null;
}
@Override public void onAfterInstall() throws CollectorException {
}
}
......@@ -3,7 +3,6 @@ package org.skywalking.apm.collector.storage.elasticsearch;
import java.util.List;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.storage.StorageInstaller;
......@@ -33,7 +32,7 @@ public class StorageElasticSearchModuleDefine extends StorageModuleDefine {
return new StorageElasticSearchConfigParser();
}
@Override protected Client createClient(DataMonitor dataMonitor) {
@Override protected Client createClient() {
return new ElasticSearchClient(StorageElasticSearchConfig.CLUSTER_NAME, StorageElasticSearchConfig.CLUSTER_TRANSPORT_SNIFFER, StorageElasticSearchConfig.CLUSTER_NODES);
}
......
......@@ -3,7 +3,6 @@ package org.skywalking.apm.collector.storage.h2;
import java.util.List;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.storage.StorageInstaller;
......@@ -33,7 +32,7 @@ public class StorageH2ModuleDefine extends StorageModuleDefine {
return new StorageH2ConfigParser();
}
@Override protected Client createClient(DataMonitor dataMonitor) {
@Override protected Client createClient() {
return new H2Client();
}
......
......@@ -10,6 +10,11 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class StreamModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "collector_inside";
private final StreamModuleInstaller installer;
public StreamModuleGroupDefine() {
installer = new StreamModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
......@@ -20,6 +25,6 @@ public class StreamModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new StreamModuleInstaller();
return installer;
}
}
package org.skywalking.apm.collector.stream;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.SingleModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -32,8 +31,13 @@ public class StreamModuleInstaller extends SingleModuleInstaller {
return new StreamModuleContext(groupName());
}
@Override public void install() throws ClientException, DefineException, ConfigException, ServerException {
super.install();
@Override public List<String> dependenceModules() {
List<String> dependenceModules = new LinkedList<>();
dependenceModules.add(QueueModuleGroupDefine.GROUP_NAME);
return dependenceModules;
}
@Override public void onAfterInstall() throws DefineException {
initializeWorker((StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(groupName()));
}
......
package org.skywalking.apm.collector.stream.grpc;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.client.grpc.GRPCClient;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
......@@ -9,6 +11,7 @@ import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.RemoteWorkerRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -26,6 +29,7 @@ public class StreamGRPCDataListener extends ClusterDataListener {
}
private Map<String, GRPCClient> clients = new HashMap<>();
private Map<String, List<RemoteWorkerRef>> remoteWorkerRefMap = new HashMap<>();
@Override public void serverJoinNotify(String serverAddress) {
String selfAddress = StreamGRPCConfig.HOST + ":" + StreamGRPCConfig.PORT;
......@@ -50,7 +54,11 @@ public class StreamGRPCDataListener extends ClusterDataListener {
} else {
context.getClusterWorkerContext().getProviders().forEach(provider -> {
logger.info("create remote worker reference, role: {}", provider.role().roleName());
provider.create(client);
RemoteWorkerRef remoteWorkerRef = provider.create(client);
if (!remoteWorkerRefMap.containsKey(serverAddress)) {
remoteWorkerRefMap.put(selfAddress, new LinkedList<>());
}
remoteWorkerRefMap.get(serverAddress).add(remoteWorkerRef);
});
}
} else {
......@@ -58,7 +66,17 @@ public class StreamGRPCDataListener extends ClusterDataListener {
}
}
@Override public void serverQuitNotify() {
@Override public void serverQuitNotify(String serverAddress) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
if (clients.containsKey(serverAddress)) {
clients.get(serverAddress).shutdown();
clients.remove(serverAddress);
}
if (remoteWorkerRefMap.containsKey(serverAddress)) {
for (RemoteWorkerRef remoteWorkerRef : remoteWorkerRefMap.get(serverAddress)) {
context.getClusterWorkerContext().remove(remoteWorkerRef);
}
}
}
}
......@@ -3,7 +3,6 @@ package org.skywalking.apm.collector.stream.grpc;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
......@@ -33,7 +32,7 @@ public class StreamGRPCModuleDefine extends StreamModuleDefine {
return new StreamGRPCConfigParser();
}
@Override protected Client createClient(DataMonitor dataMonitor) {
@Override protected Client createClient() {
return null;
}
......
package org.skywalking.apm.collector.ui;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.MultipleModuleInstaller;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
/**
* @author pengys5
*/
public class UIModuleInstaller extends MultipleModuleInstaller {
public class UICommonModuleInstaller extends MultipleCommonModuleInstaller {
@Override public String groupName() {
return UIModuleGroupDefine.GROUP_NAME;
......@@ -15,4 +16,8 @@ public class UIModuleInstaller extends MultipleModuleInstaller {
@Override public Context moduleContext() {
return new UIModuleContext(groupName());
}
@Override public List<String> dependenceModules() {
return null;
}
}
package org.skywalking.apm.collector.ui;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.module.ModuleDefine;
......@@ -10,7 +9,7 @@ import org.skywalking.apm.collector.core.module.ModuleDefine;
*/
public abstract class UIModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
@Override protected final Client createClient(DataMonitor dataMonitor) {
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
}
......
......@@ -10,6 +10,11 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class UIModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "ui";
private final UICommonModuleInstaller installer;
public UIModuleGroupDefine() {
installer = new UICommonModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
......@@ -20,6 +25,6 @@ public class UIModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new UIModuleInstaller();
return installer;
}
}
......@@ -19,7 +19,6 @@ public class UIJettyDataListener extends ClusterDataListener {
}
@Override public void serverQuitNotify() {
@Override public void serverQuitNotify(String serverAddress) {
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册