提交 1cde27fa 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge pull request #409 from wu-sheng/feature/cluster

Feature/cluster
package org.skywalking.apm.collector.agentjvm;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
/**
* @author pengys5
*/
public class AgentJVMCommonModuleInstaller extends MultipleCommonModuleInstaller {
@Override public String groupName() {
return AgentJVMModuleGroupDefine.GROUP_NAME;
}
@Override public Context moduleContext() {
return new AgentJVMModuleContext(groupName());
}
@Override public List<String> dependenceModules() {
return null;
}
}
package org.skywalking.apm.collector.agentjvm;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentJVMModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(AgentJVMModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentJVMModuleException(e.getMessage(), e);
}
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
@Override protected void initializeOtherContext() {
}
@Override public final boolean defaultModule() {
return true;
}
public abstract List<Handler> handlerList();
}
......@@ -11,6 +11,12 @@ public class AgentJVMModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_jvm";
private final AgentJVMCommonModuleInstaller installer;
public AgentJVMModuleGroupDefine() {
installer = new AgentJVMCommonModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
}
......@@ -20,6 +26,6 @@ public class AgentJVMModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new AgentJVMModuleInstaller();
return installer;
}
}
package org.skywalking.apm.collector.agentjvm;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class AgentJVMModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentJVMModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent jvm module install");
AgentJVMModuleContext context = new AgentJVMModuleContext(AgentJVMModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
}
}
......@@ -15,7 +15,11 @@ public class AgentJVMGRPCDataListener extends ClusterDataListener {
return PATH;
}
@Override public void addressChangedNotify() {
@Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify(String serverAddress) {
}
}
package org.skywalking.apm.collector.agentregister;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
/**
* @author pengys5
*/
public class AgentRegisterCommonModuleInstaller extends MultipleCommonModuleInstaller {
@Override public String groupName() {
return AgentRegisterModuleGroupDefine.GROUP_NAME;
}
@Override public Context moduleContext() {
return new AgentRegisterModuleContext(groupName());
}
@Override public List<String> dependenceModules() {
return null;
}
}
package org.skywalking.apm.collector.agentregister;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentRegisterModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(AgentRegisterModuleDefine.class);
@Override protected void initializeOtherContext() {
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentRegisterModuleException(e.getMessage(), e);
}
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
}
@Override public final boolean defaultModule() {
return true;
}
public abstract List<Handler> handlerList();
}
......@@ -11,6 +11,12 @@ public class AgentRegisterModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_register";
private final AgentRegisterCommonModuleInstaller installer;
public AgentRegisterModuleGroupDefine() {
installer = new AgentRegisterCommonModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
}
......@@ -20,6 +26,6 @@ public class AgentRegisterModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new AgentRegisterModuleInstaller();
return installer;
}
}
package org.skywalking.apm.collector.agentregister;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class AgentRegisterModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentRegisterModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent register module install");
AgentRegisterModuleContext context = new AgentRegisterModuleContext(AgentRegisterModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
}
}
......@@ -15,6 +15,11 @@ public class AgentRegisterGRPCDataListener extends ClusterDataListener {
return PATH;
}
@Override public void addressChangedNotify() {
@Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify(String serverAddress) {
}
}
......@@ -15,6 +15,11 @@ public class AgentRegisterJettyDataListener extends ClusterDataListener {
return PATH;
}
@Override public void addressChangedNotify() {
@Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify(String serverAddress) {
}
}
package org.skywalking.apm.collector.agentserver;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
/**
* @author pengys5
*/
public class AgentServerCommonModuleInstaller extends MultipleCommonModuleInstaller {
@Override public String groupName() {
return AgentServerModuleGroupDefine.GROUP_NAME;
}
@Override public Context moduleContext() {
return new AgentServerModuleContext(groupName());
}
@Override public List<String> dependenceModules() {
return null;
}
}
package org.skywalking.apm.collector.agentserver;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentServerModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(AgentServerModuleDefine.class);
@Override protected void initializeOtherContext() {
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentServerModuleException(e.getMessage(), e);
}
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
}
public abstract List<Handler> handlerList();
}
......@@ -10,6 +10,11 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class AgentServerModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_server";
private final AgentServerCommonModuleInstaller installer;
public AgentServerModuleGroupDefine() {
installer = new AgentServerCommonModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
......@@ -20,6 +25,6 @@ public class AgentServerModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new AgentServerModuleInstaller();
return installer;
}
}
package org.skywalking.apm.collector.agentserver;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class AgentServerModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentServerModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent server module install");
AgentServerModuleContext context = new AgentServerModuleContext(AgentServerModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
logger.info("could not configure agent server module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
}
}
......@@ -13,7 +13,11 @@ public class AgentServerJettyDataListener extends ClusterDataListener {
return ClusterModuleDefine.BASE_CATALOG + "." + AgentServerModuleGroupDefine.GROUP_NAME + "." + AgentServerJettyModuleDefine.MODULE_NAME;
}
@Override public void addressChangedNotify() {
@Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify(String serverAddress) {
}
}
......@@ -2,11 +2,9 @@ package org.skywalking.apm.collector.agentserver.jetty.handler;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import java.util.List;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agentstream.grpc.AgentStreamGRPCDataListener;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
......@@ -22,10 +20,10 @@ public class AgentStreamGRPCServerHandler extends JettyHandler {
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
List<String> servers = reader.read(AgentStreamGRPCDataListener.PATH);
ClusterModuleRegistrationReader reader = CollectorContextHelper.INSTANCE.getClusterModuleContext().getReader();
Set<String> servers = reader.read(AgentStreamGRPCDataListener.PATH);
JsonArray serverArray = new JsonArray();
servers.forEach(server -> serverArray.add(server));
servers.forEach(serverArray::add);
return serverArray;
}
......
......@@ -2,11 +2,9 @@ package org.skywalking.apm.collector.agentserver.jetty.handler;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import java.util.List;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agentstream.jetty.AgentStreamJettyDataListener;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
......@@ -22,12 +20,10 @@ public class AgentStreamJettyServerHandler extends JettyHandler {
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
List<String> servers = reader.read(AgentStreamJettyDataListener.PATH);
ClusterModuleRegistrationReader reader = CollectorContextHelper.INSTANCE.getClusterModuleContext().getReader();
Set<String> servers = reader.read(AgentStreamJettyDataListener.PATH);
JsonArray serverArray = new JsonArray();
servers.forEach(server -> {
serverArray.add(server);
});
servers.forEach(serverArray::add);
return serverArray;
}
......
......@@ -2,10 +2,8 @@ package org.skywalking.apm.collector.agentserver.jetty.handler;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import java.util.List;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
......@@ -22,12 +20,10 @@ public class UIJettyServerHandler extends JettyHandler {
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
List<String> servers = reader.read(UIJettyDataListener.PATH);
ClusterModuleRegistrationReader reader = CollectorContextHelper.INSTANCE.getClusterModuleContext().getReader();
Set<String> servers = reader.read(UIJettyDataListener.PATH);
JsonArray serverArray = new JsonArray();
servers.forEach(server -> {
serverArray.add(server);
});
servers.forEach(serverArray::add);
return serverArray;
}
......
package org.skywalking.apm.collector.agentstream;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerException;
/**
* @author pengys5
*/
public class AgentStreamCommonModuleInstaller extends MultipleCommonModuleInstaller {
@Override public String groupName() {
return AgentStreamModuleGroupDefine.GROUP_NAME;
}
@Override public Context moduleContext() {
return new AgentStreamModuleContext(groupName());
}
@Override public List<String> dependenceModules() {
return null;
}
@Override public void install() throws DefineException, ConfigException, ServerException, ClientException {
super.install();
new PersistenceTimer().start();
}
}
package org.skywalking.apm.collector.agentstream;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentStreamModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(AgentStreamModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentStreamModuleException(e.getMessage(), e);
}
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
}
......@@ -47,5 +17,7 @@ public abstract class AgentStreamModuleDefine extends ModuleDefine implements Cl
return true;
}
public abstract List<Handler> handlerList();
@Override protected void initializeOtherContext() {
}
}
......@@ -10,6 +10,11 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class AgentStreamModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_stream";
private final AgentStreamCommonModuleInstaller installer;
public AgentStreamModuleGroupDefine() {
installer = new AgentStreamCommonModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
......@@ -20,6 +25,6 @@ public class AgentStreamModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new AgentStreamModuleInstaller();
return installer;
}
}
package org.skywalking.apm.collector.agentstream;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class AgentStreamModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentStreamModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent stream module install");
AgentStreamModuleContext context = new AgentStreamModuleContext(AgentStreamModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
new PersistenceTimer().start();
}
}
......@@ -15,6 +15,11 @@ public class AgentStreamGRPCDataListener extends ClusterDataListener {
return PATH;
}
@Override public void addressChangedNotify() {
@Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify(String serverAddress) {
}
}
......@@ -15,6 +15,11 @@ public class AgentStreamJettyDataListener extends ClusterDataListener {
return PATH;
}
@Override public void addressChangedNotify() {
@Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify(String serverAddress) {
}
}
package org.skywalking.apm.collector.boot;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.CollectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -13,9 +11,10 @@ public class CollectorBootStartUp {
private static final Logger logger = LoggerFactory.getLogger(CollectorBootStartUp.class);
public static void main(String[] args) throws ConfigException, DefineException, ClientException {
public static void main(String[] args) throws CollectorException {
logger.info("collector starting...");
CollectorStarter starter = new CollectorStarter();
starter.start();
logger.info("collector start successful.");
}
}
package org.skywalking.apm.collector.boot;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.framework.Starter;
import org.skywalking.apm.collector.core.module.ModuleConfigLoader;
import org.skywalking.apm.collector.core.module.ModuleDefine;
......@@ -13,6 +10,7 @@ import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleGroupDefineLoader;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -22,23 +20,27 @@ import org.slf4j.LoggerFactory;
public class CollectorStarter implements Starter {
private final Logger logger = LoggerFactory.getLogger(CollectorStarter.class);
private Map<String, ModuleGroupDefine> moduleGroupDefineMap;
@Override public void start() throws ConfigException, DefineException, ClientException {
@Override public void start() throws CollectorException {
ModuleConfigLoader configLoader = new ModuleConfigLoader();
Map<String, Map> configuration = configLoader.load();
ModuleGroupDefineLoader groupDefineLoader = new ModuleGroupDefineLoader();
Map<String, ModuleGroupDefine> moduleGroupDefineMap = groupDefineLoader.load();
moduleGroupDefineMap = groupDefineLoader.load();
ModuleDefineLoader defineLoader = new ModuleDefineLoader();
Map<String, Map<String, ModuleDefine>> moduleDefineMap = defineLoader.load();
ServerHolder serverHolder = new ServerHolder();
moduleGroupDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME).moduleInstaller().install(configuration.get(ClusterModuleGroupDefine.GROUP_NAME), moduleDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME), serverHolder);
moduleGroupDefineMap.remove(ClusterModuleGroupDefine.GROUP_NAME);
for (ModuleGroupDefine moduleGroupDefine : moduleGroupDefineMap.values()) {
moduleGroupDefine.moduleInstaller().injectConfiguration(configuration.get(moduleGroupDefine.name()), moduleDefineMap.get(moduleGroupDefine.name()));
moduleGroupDefine.moduleInstaller().injectServerHolder(serverHolder);
moduleGroupDefine.moduleInstaller().preInstall();
}
for (ModuleGroupDefine moduleGroupDefine : moduleGroupDefineMap.values()) {
moduleGroupDefine.moduleInstaller().install(configuration.get(moduleGroupDefine.name()), moduleDefineMap.get(moduleGroupDefine.name()), serverHolder);
moduleGroupDefine.moduleInstaller().install();
}
serverHolder.getServers().forEach(server -> {
......@@ -48,5 +50,26 @@ public class CollectorStarter implements Starter {
logger.error(e.getMessage(), e);
}
});
dependenceAfterInstall();
}
private void dependenceAfterInstall() throws CollectorException {
for (ModuleGroupDefine moduleGroupDefine : moduleGroupDefineMap.values()) {
moduleInstall(moduleGroupDefine);
}
}
private void moduleInstall(ModuleGroupDefine moduleGroupDefine) throws CollectorException {
if (CollectionUtils.isNotEmpty(moduleGroupDefine.moduleInstaller().dependenceModules())) {
for (String groupName : moduleGroupDefine.moduleInstaller().dependenceModules()) {
moduleInstall(moduleGroupDefineMap.get(groupName));
}
logger.info("after install module group: {}", moduleGroupDefine.name());
moduleGroupDefine.moduleInstaller().afterInstall();
} else {
logger.info("after install module group: {}", moduleGroupDefine.name());
moduleGroupDefine.moduleInstaller().afterInstall();
}
}
}
......@@ -2,34 +2,42 @@ cluster:
zookeeper:
hostPort: localhost:2181
sessionTimeout: 100000
#agent_server:
# jetty:
# host: localhost
# port: 10800
# context_path: /
#agent_register:
# grpc:
# host: localhost
# port: 11800
# jetty:
# host: localhost
# port: 12800
# context_path: /
#agent_stream:
# grpc:
# host: localhost
# port: 11800
# jetty:
# host: localhost
# port: 12800
# context_path: /
#ui:
# jetty:
# host: localhost
# port: 12800
# context_path: /
agent_server:
jetty:
host: localhost
port: 10800
context_path: /
agent_register:
grpc:
host: localhost
port: 11800
jetty:
host: localhost
port: 12800
context_path: /
agent_stream:
grpc:
host: localhost
port: 11800
jetty:
host: localhost
port: 12800
context_path: /
agent_jvm:
grpc:
host: localhost
port: 11800
ui:
jetty:
host: localhost
port: 12800
context_path: /
collector_inside:
grpc:
host: localhost
port: 11800
storage:
elasticsearch:
cluster_name: CollectorDBCluster
cluster_transport_sniffer: true
cluster_nodes: localhost:9300
\ No newline at end of file
cluster_nodes: localhost:9300
......@@ -64,6 +64,10 @@ public class ElasticSearchClient implements Client {
}
}
@Override public void shutdown() {
}
private List<AddressPairs> parseClusterNodes(String nodes) {
List<AddressPairs> pairsList = new LinkedList<>();
logger.info("elasticsearch cluster nodes: {}", nodes);
......
......@@ -4,16 +4,12 @@ import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class GRPCClient implements Client {
private final Logger logger = LoggerFactory.getLogger(GRPCClient.class);
private final String host;
private final int port;
......@@ -29,7 +25,15 @@ public class GRPCClient implements Client {
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
}
@Override public void shutdown() {
channel.shutdownNow();
}
public ManagedChannel getChannel() {
return channel;
}
@Override public String toString() {
return host + ":" + port;
}
}
......@@ -27,6 +27,10 @@ public class H2Client implements Client {
}
}
@Override public void shutdown() {
}
public void execute(String sql) throws H2ClientException {
Statement statement = null;
try {
......
......@@ -23,6 +23,10 @@ public class RedisClient implements Client {
jedis = new Jedis(host, port);
}
@Override public void shutdown() {
}
public void setex(String key, int seconds, String value) {
jedis.setex(key, seconds, value);
}
......
......@@ -39,6 +39,10 @@ public class ZookeeperClient implements Client {
}
}
@Override public void shutdown() {
}
public void create(final String path, byte data[], List<ACL> acl,
CreateMode createMode) throws ZookeeperClientException {
try {
......
package org.skywalking.apm.collector.cluster;
import java.util.Map;
import java.util.List;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleException;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerHolder;
/**
* @author pengys5
......@@ -23,21 +22,17 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
private Client client;
@Override public final void initialize(Map config, ServerHolder serverHolder) throws ClusterModuleException {
@Override protected void initializeOtherContext() {
try {
configParser().parse(config);
DataMonitor dataMonitor = dataMonitor();
client = createClient(dataMonitor);
client = createClient();
client.initialize();
dataMonitor.setClient(client);
ClusterModuleRegistrationReader reader = registrationReader(dataMonitor);
dataMonitor().setClient(client);
ClusterModuleRegistrationReader reader = registrationReader();
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setDataMonitor(dataMonitor);
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setReader(reader);
} catch (ConfigParseException | ClientException e) {
throw new ClusterModuleException(e.getMessage(), e);
CollectorContextHelper.INSTANCE.getClusterModuleContext().setDataMonitor(dataMonitor());
CollectorContextHelper.INSTANCE.getClusterModuleContext().setReader(reader);
} catch (ClientException e) {
throw new UnexpectedException(e.getMessage());
}
}
......@@ -46,7 +41,11 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
}
@Override public final Server server() {
throw new UnsupportedOperationException("");
return null;
}
@Override public final List<Handler> handlerList() {
return null;
}
@Override protected final ModuleRegistration registration() {
......@@ -55,5 +54,9 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
public abstract DataMonitor dataMonitor();
public abstract ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor);
public abstract ClusterModuleRegistrationReader registrationReader();
public void startMonitor() throws CollectorException {
dataMonitor().start();
}
}
......@@ -11,6 +11,11 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class ClusterModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "cluster";
private final ClusterModuleInstaller installer;
public ClusterModuleGroupDefine() {
installer = new ClusterModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
......@@ -21,6 +26,6 @@ public class ClusterModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new ClusterModuleInstaller();
return installer;
}
}
package org.skywalking.apm.collector.cluster;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.SingleModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterModuleInstaller extends SingleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(ClusterModuleInstaller.class);
@Override public String groupName() {
return ClusterModuleGroupDefine.GROUP_NAME;
}
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning cluster module install");
@Override public Context moduleContext() {
ClusterModuleContext clusterModuleContext = new ClusterModuleContext(ClusterModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putClusterContext(clusterModuleContext);
return clusterModuleContext;
}
ClusterModuleContext context = new ClusterModuleContext(ClusterModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
@Override public List<String> dependenceModules() {
List<String> dependenceModules = new LinkedList<>();
dependenceModules.add("collector_inside");
return dependenceModules;
}
installSingle(moduleConfig, moduleDefineMap, serverHolder);
@Override public void onAfterInstall() throws CollectorException {
((ClusterModuleDefine)getModuleDefine()).startMonitor();
}
}
......@@ -35,11 +35,11 @@ public class ClusterRedisModuleDefine extends ClusterModuleDefine {
return null;
}
@Override protected Client createClient(DataMonitor dataMonitor) {
@Override protected Client createClient() {
return new RedisClient(ClusterRedisConfig.HOST, ClusterRedisConfig.PORT);
}
@Override public ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor) {
return new ClusterRedisModuleRegistrationReader(dataMonitor);
@Override public ClusterModuleRegistrationReader registrationReader() {
return null;
}
}
......@@ -35,11 +35,11 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine {
return null;
}
@Override protected Client createClient(DataMonitor dataMonitor) {
@Override protected Client createClient() {
return new H2Client();
}
@Override public ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor) {
return new ClusterStandaloneModuleRegistrationReader(dataMonitor);
@Override public ClusterModuleRegistrationReader registrationReader() {
return null;
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
......@@ -7,10 +8,12 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClientException;
import org.skywalking.apm.collector.client.zookeeper.util.PathUtils;
import org.skywalking.apm.collector.cluster.ClusterNodeExistException;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
......@@ -30,25 +33,33 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
private ZookeeperClient client;
private Map<String, ClusterDataListener> listeners;
private Map<String, ModuleRegistration> registrations;
public ClusterZKDataMonitor() {
listeners = new LinkedHashMap<>();
registrations = new LinkedHashMap<>();
}
@Override public void process(WatchedEvent event) {
logger.debug("changed path {}", event.getPath());
logger.info("changed path {}, event type: {}", event.getPath(), event.getType().name());
if (listeners.containsKey(event.getPath())) {
List<String> paths = null;
List<String> paths;
try {
paths = client.getChildren(event.getPath(), true);
listeners.get(event.getPath()).clearData();
if (CollectionUtils.isNotEmpty(paths)) {
for (String serverPath : paths) {
byte[] data = client.getData(event.getPath() + "/" + serverPath, false, null);
Stat stat = new Stat();
byte[] data = client.getData(event.getPath() + "/" + serverPath, true, stat);
String dataStr = new String(data);
logger.debug("path children has been changed, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr);
listeners.get(event.getPath()).addAddress(serverPath + dataStr);
listeners.get(event.getPath()).addressChangedNotify();
if (stat.getCzxid() == stat.getMzxid()) {
logger.info("path children has been created, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr);
listeners.get(event.getPath()).addAddress(serverPath + dataStr);
listeners.get(event.getPath()).serverJoinNotify(serverPath + dataStr);
} else {
logger.info("path children has been changed, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr);
listeners.get(event.getPath()).removeAddress(serverPath + dataStr);
listeners.get(event.getPath()).serverQuitNotify(serverPath + dataStr);
}
}
}
} catch (ZookeeperClientException e) {
......@@ -61,25 +72,32 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
this.client = (ZookeeperClient)client;
}
@Override public void start() throws CollectorException {
Iterator<Map.Entry<String, ModuleRegistration>> entryIterator = registrations.entrySet().iterator();
while (entryIterator.hasNext()) {
Map.Entry<String, ModuleRegistration> next = entryIterator.next();
createPath(next.getKey());
ModuleRegistration.Value value = next.getValue().buildValue();
String contextPath = value.getContextPath() == null ? "" : value.getContextPath();
client.getChildren(next.getKey(), true);
String serverPath = next.getKey() + "/" + value.getHostPort();
if (client.exists(serverPath, false) == null) {
setData(serverPath, contextPath);
} else {
throw new ClusterNodeExistException("current address: " + value.getHostPort() + " has been registered, check the host and port configuration or wait a moment.");
}
}
}
@Override
public void addListener(ClusterDataListener listener, ModuleRegistration registration) throws ClientException {
String path = PathUtils.convertKey2Path(listener.path());
logger.info("listener path: {}", path);
listeners.put(path, listener);
createPath(path);
ModuleRegistration.Value value = registration.buildValue();
String contextPath = value.getContextPath() == null ? "" : value.getContextPath();
client.getChildren(path, true);
String serverPath = path + "/" + value.getHostPort();
listener.addAddress(value.getHostPort() + contextPath);
if (client.exists(serverPath, false) == null) {
setData(serverPath, contextPath);
} else {
throw new ClusterNodeExistException("current address: " + value.getHostPort() + " has been registered, check the host and port configuration or wait a moment.");
}
registrations.put(path, registration);
}
@Override public ClusterDataListener getListener(String path) {
......
package org.skywalking.apm.collector.cluster.zookeeper;
import org.apache.zookeeper.Watcher;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
......@@ -15,6 +14,11 @@ import org.skywalking.apm.collector.core.module.ModuleConfigParser;
public class ClusterZKModuleDefine extends ClusterModuleDefine {
public static final String MODULE_NAME = "zookeeper";
private final ClusterZKDataMonitor dataMonitor;
public ClusterZKModuleDefine() {
dataMonitor = new ClusterZKDataMonitor();
}
@Override protected String group() {
return ClusterModuleGroupDefine.GROUP_NAME;
......@@ -33,14 +37,14 @@ public class ClusterZKModuleDefine extends ClusterModuleDefine {
}
@Override public DataMonitor dataMonitor() {
return new ClusterZKDataMonitor();
return dataMonitor;
}
@Override protected Client createClient(DataMonitor dataMonitor) {
return new ZookeeperClient(ClusterZKConfig.HOST_PORT, ClusterZKConfig.SESSION_TIMEOUT, (Watcher)dataMonitor);
@Override protected Client createClient() {
return new ZookeeperClient(ClusterZKConfig.HOST_PORT, ClusterZKConfig.SESSION_TIMEOUT, dataMonitor);
}
@Override public ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor) {
@Override public ClusterModuleRegistrationReader registrationReader() {
return new ClusterZKModuleRegistrationReader(dataMonitor);
}
}
......@@ -5,4 +5,6 @@ package org.skywalking.apm.collector.core.client;
*/
public interface Client {
void initialize() throws ClientException;
void shutdown();
}
package org.skywalking.apm.collector.core.client;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Starter;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public interface DataMonitor {
public interface DataMonitor extends Starter {
void setClient(Client client);
void addListener(ClusterDataListener listener, ModuleRegistration registration) throws ClientException;
......
package org.skywalking.apm.collector.core.cluster;
import java.util.LinkedList;
import java.util.List;
import java.util.HashSet;
import java.util.Set;
import org.skywalking.apm.collector.core.framework.Listener;
/**
......@@ -9,10 +9,10 @@ import org.skywalking.apm.collector.core.framework.Listener;
*/
public abstract class ClusterDataListener implements Listener {
private List<String> addresses;
private Set<String> addresses;
public ClusterDataListener() {
addresses = new LinkedList<>();
addresses = new HashSet<>();
}
public abstract String path();
......@@ -21,13 +21,15 @@ public abstract class ClusterDataListener implements Listener {
addresses.add(address);
}
public final List<String> getAddresses() {
return addresses;
public final void removeAddress(String address) {
addresses.remove(address);
}
public final void clearData() {
addresses.clear();
public final Set<String> getAddresses() {
return addresses;
}
public abstract void addressChangedNotify();
public abstract void serverJoinNotify(String serverAddress);
public abstract void serverQuitNotify(String serverAddress);
}
package org.skywalking.apm.collector.core.cluster;
import java.util.List;
import java.util.Set;
import org.skywalking.apm.collector.core.client.DataMonitor;
/**
......@@ -14,7 +14,7 @@ public abstract class ClusterModuleRegistrationReader {
this.dataMonitor = dataMonitor;
}
public final List<String> read(String path) {
public final Set<String> read(String path) {
return dataMonitor.getListener(path).getAddresses();
}
}
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.core.framework;
import java.util.LinkedHashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
/**
* @author pengys5
......@@ -9,12 +10,17 @@ import java.util.Map;
public enum CollectorContextHelper {
INSTANCE;
private Map<String, Context> contexts = new LinkedHashMap();
private ClusterModuleContext clusterModuleContext;
private Map<String, Context> contexts = new LinkedHashMap<>();
public Context getContext(String moduleGroupName) {
return contexts.get(moduleGroupName);
}
public ClusterModuleContext getClusterModuleContext() {
return this.clusterModuleContext;
}
public void putContext(Context context) {
if (contexts.containsKey(context.getGroupName())) {
throw new UnsupportedOperationException("This module context was put, do not allow put a new one");
......@@ -22,4 +28,8 @@ public enum CollectorContextHelper {
contexts.put(context.getGroupName(), context);
}
}
public void putClusterContext(ClusterModuleContext clusterModuleContext) {
this.clusterModuleContext = clusterModuleContext;
}
}
package org.skywalking.apm.collector.core.framework;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.server.ServerHolder;
/**
* @author pengys5
*/
public interface Define {
void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException;
String name();
}
package org.skywalking.apm.collector.core.module;
import java.util.Map;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author pengys5
*/
public abstract class CommonModuleInstaller implements ModuleInstaller {
private boolean isInstalled = false;
private Map<String, Map> moduleConfig;
private Map<String, ModuleDefine> moduleDefineMap;
@Override
public final void injectConfiguration(Map<String, Map> moduleConfig, Map<String, ModuleDefine> moduleDefineMap) {
this.moduleConfig = moduleConfig;
this.moduleDefineMap = moduleDefineMap;
}
protected final Map<String, Map> getModuleConfig() {
return moduleConfig;
}
protected final Map<String, ModuleDefine> getModuleDefineMap() {
return moduleDefineMap;
}
public abstract void onAfterInstall() throws CollectorException;
@Override public final void afterInstall() throws CollectorException {
if (!isInstalled) {
onAfterInstall();
}
isInstalled = true;
}
}
package org.skywalking.apm.collector.core.module;
import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.framework.Define;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.server.Server;
/**
......@@ -16,9 +17,13 @@ public abstract class ModuleDefine implements Define {
protected abstract ModuleConfigParser configParser();
protected abstract Client createClient(DataMonitor dataMonitor);
protected abstract Client createClient();
protected abstract Server server();
public abstract List<Handler> handlerList();
protected abstract ModuleRegistration registration();
protected abstract void initializeOtherContext();
}
package org.skywalking.apm.collector.core.module;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.framework.DefineException;
......@@ -21,11 +22,11 @@ public class ModuleGroupDefineLoader implements Loader<Map<String, ModuleGroupDe
ModuleGroupDefineFile definitionFile = new ModuleGroupDefineFile();
logger.info("module group definition file name: {}", definitionFile.fileName());
DefinitionLoader<ModuleGroupDefine> definitionLoader = DefinitionLoader.load(ModuleGroupDefine.class, definitionFile);
for (ModuleGroupDefine moduleGroupDefine : definitionLoader) {
logger.info("loaded group module definition class: {}", moduleGroupDefine.getClass().getName());
String groupName = moduleGroupDefine.name().toLowerCase();
moduleGroupDefineMap.put(groupName, moduleGroupDefine);
Iterator<ModuleGroupDefine> defineIterator = definitionLoader.iterator();
while (defineIterator.hasNext()) {
ModuleGroupDefine groupDefine = defineIterator.next();
String groupName = groupDefine.name().toLowerCase();
moduleGroupDefineMap.put(groupName, groupDefine);
}
return moduleGroupDefineMap;
}
......
package org.skywalking.apm.collector.core.module;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
/**
* @author pengys5
*/
public interface ModuleInstaller {
void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException;
List<String> dependenceModules();
void injectServerHolder(ServerHolder serverHolder);
String groupName();
Context moduleContext();
void injectConfiguration(Map<String, Map> moduleConfig, Map<String, ModuleDefine> moduleDefineMap);
void preInstall() throws DefineException, ConfigException, ServerException;
void install() throws ClientException, DefineException, ConfigException, ServerException;
void afterInstall() throws CollectorException;
}
package org.skywalking.apm.collector.core.module;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class MultipleCommonModuleInstaller extends CommonModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(MultipleCommonModuleInstaller.class);
public MultipleCommonModuleInstaller() {
moduleDefines = new LinkedList<>();
}
private List<ModuleDefine> moduleDefines;
private ServerHolder serverHolder;
@Override public final void injectServerHolder(ServerHolder serverHolder) {
this.serverHolder = serverHolder;
}
@Override public final void preInstall() throws DefineException, ConfigException, ServerException {
logger.info("install module group: {}", groupName());
Map<String, Map> moduleConfig = getModuleConfig();
Map<String, ModuleDefine> moduleDefineMap = getModuleDefineMap();
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineIterator = moduleDefineMap.entrySet().iterator();
while (moduleDefineIterator.hasNext()) {
Map.Entry<String, ModuleDefine> moduleDefineEntry = moduleDefineIterator.next();
logger.info("module {} initialize", moduleDefineEntry.getKey());
moduleDefineEntry.getValue().configParser().parse(moduleConfig.get(moduleDefineEntry.getKey()));
moduleDefines.add(moduleDefineEntry.getValue());
serverHolder.holdServer(moduleDefineEntry.getValue().server(), moduleDefineEntry.getValue().handlerList());
}
}
@Override public void install() throws DefineException, ConfigException, ServerException, ClientException {
CollectorContextHelper.INSTANCE.putContext(moduleContext());
for (ModuleDefine moduleDefine : moduleDefines) {
moduleDefine.initializeOtherContext();
if (moduleDefine instanceof ClusterDataListenerDefine) {
ClusterDataListenerDefine listenerDefine = (ClusterDataListenerDefine)moduleDefine;
if (ObjectUtils.isNotEmpty(listenerDefine.listener()) && ObjectUtils.isNotEmpty(moduleDefine.registration())) {
logger.info("add group: {}, module: {}, listener into cluster data monitor", moduleDefine.group(), moduleDefine.name());
CollectorContextHelper.INSTANCE.getClusterModuleContext().getDataMonitor().addListener(listenerDefine.listener(), moduleDefine.registration());
}
}
}
}
@Override public void onAfterInstall() throws CollectorException {
}
}
......@@ -3,37 +3,85 @@ package org.skywalking.apm.collector.core.module;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class SingleModuleInstaller implements ModuleInstaller {
public abstract class SingleModuleInstaller extends CommonModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(SingleModuleInstaller.class);
protected void installSingle(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
ModuleDefine moduleDefine = null;
if (CollectionUtils.isEmpty(moduleConfig)) {
private ModuleDefine moduleDefine;
private ServerHolder serverHolder;
@Override public final void injectServerHolder(ServerHolder serverHolder) {
this.serverHolder = serverHolder;
}
@Override public final void preInstall() throws DefineException, ConfigException, ServerException {
logger.info("install module group: {}", groupName());
Map<String, Map> moduleConfig = getModuleConfig();
Map<String, ModuleDefine> moduleDefineMap = getModuleDefineMap();
if (CollectionUtils.isNotEmpty(moduleConfig)) {
if (moduleConfig.size() > 1) {
throw new ClusterModuleException("single module, but configure multiple modules");
}
Map.Entry<String, Map> configEntry = moduleConfig.entrySet().iterator().next();
if (moduleDefineMap.containsKey(configEntry.getKey())) {
moduleDefine = moduleDefineMap.get(configEntry.getKey());
moduleDefine.configParser().parse(configEntry.getValue());
} else {
throw new ClusterModuleException("module name incorrect, please check the module name in application.yml");
}
} else {
logger.info("could not configure module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
moduleDefine = moduleDefineEntry.next().getValue();
if (moduleDefine.defaultModule()) {
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize(null, serverHolder);
break;
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineIterator = moduleDefineMap.entrySet().iterator();
boolean hasDefaultModule = false;
while (moduleDefineIterator.hasNext()) {
Map.Entry<String, ModuleDefine> moduleDefineEntry = moduleDefineIterator.next();
if (moduleDefineEntry.getValue().defaultModule()) {
if (hasDefaultModule) {
throw new ClusterModuleException("single module, but configure multiple default module");
}
this.moduleDefine = moduleDefineEntry.getValue();
this.moduleDefine.configParser().parse(null);
hasDefaultModule = true;
}
}
} else {
Map.Entry<String, Map> configEntry = moduleConfig.entrySet().iterator().next();
moduleDefine = moduleDefineMap.get(configEntry.getKey());
moduleDefine.initialize(configEntry.getValue(), serverHolder);
}
serverHolder.holdServer(moduleDefine.server(), moduleDefine.handlerList());
}
@Override public void install() throws ClientException, DefineException, ConfigException, ServerException {
if (!(moduleContext() instanceof ClusterModuleContext)) {
CollectorContextHelper.INSTANCE.putContext(moduleContext());
}
moduleDefine.initializeOtherContext();
if (moduleDefine instanceof ClusterDataListenerDefine) {
ClusterDataListenerDefine listenerDefine = (ClusterDataListenerDefine)moduleDefine;
if (ObjectUtils.isNotEmpty(listenerDefine.listener()) && ObjectUtils.isNotEmpty(moduleDefine.registration())) {
CollectorContextHelper.INSTANCE.getClusterModuleContext().getDataMonitor().addListener(listenerDefine.listener(), moduleDefine.registration());
logger.info("add group: {}, module: {}, listener into cluster data monitor", moduleDefine.group(), moduleDefine.name());
}
}
}
protected ModuleDefine getModuleDefine() {
return moduleDefine;
}
}
......@@ -4,6 +4,7 @@ import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -21,6 +22,10 @@ public class ServerHolder {
}
public void holdServer(Server newServer, List<Handler> handlers) throws ServerException {
if (ObjectUtils.isEmpty(newServer) || CollectionUtils.isEmpty(handlers)) {
return;
}
boolean isNewServer = true;
for (Server server : servers) {
if (server.hostPort().equals(newServer.hostPort()) && server.serverClassify().equals(newServer.serverClassify())) {
......
......@@ -20,6 +20,10 @@ public class CollectionUtils {
return !isEmpty(list);
}
public static boolean isNotEmpty(Map map) {
return !isEmpty(map);
}
public static <T> boolean isNotEmpty(T[] array) {
return array != null && array.length > 0;
}
......
......@@ -35,7 +35,6 @@ public class DefinitionLoader<D> implements Iterable<D> {
@Override public final Iterator<D> iterator() {
logger.info("load definition file: {}", definitionFile.get());
Properties properties = new Properties();
List<String> definitionList = new LinkedList<>();
try {
Enumeration<URL> urlEnumeration = this.getClass().getClassLoader().getResources(definitionFile.get());
......@@ -43,6 +42,7 @@ public class DefinitionLoader<D> implements Iterable<D> {
URL definitionFileURL = urlEnumeration.nextElement();
logger.info("definition file url: {}", definitionFileURL.getPath());
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(definitionFileURL.openStream()));
Properties properties = new Properties();
properties.load(bufferedReader);
Enumeration defineItem = properties.propertyNames();
......@@ -52,7 +52,7 @@ public class DefinitionLoader<D> implements Iterable<D> {
}
}
} catch (IOException e) {
e.printStackTrace();
logger.error(e.getMessage(), e);
}
Iterator<String> moduleDefineIterator = definitionList.iterator();
......
package org.skywalking.apm.collector.queue;
import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
......@@ -11,12 +11,9 @@ import org.skywalking.apm.collector.core.server.Server;
* @author pengys5
*/
public abstract class QueueModuleDefine extends ModuleDefine {
@Override protected final ModuleConfigParser configParser() {
throw new UnsupportedOperationException("");
}
@Override protected Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
@Override protected Client createClient() {
return null;
}
@Override protected final ModuleRegistration registration() {
......@@ -24,6 +21,10 @@ public abstract class QueueModuleDefine extends ModuleDefine {
}
@Override protected final Server server() {
throw new UnsupportedOperationException("");
return null;
}
@Override public final List<Handler> handlerList() {
return null;
}
}
......@@ -10,6 +10,11 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class QueueModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "queue";
private final QueueModuleInstaller installer;
public QueueModuleGroupDefine() {
installer = new QueueModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
......@@ -20,6 +25,6 @@ public class QueueModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new QueueModuleInstaller();
return installer;
}
}
package org.skywalking.apm.collector.queue;
import java.util.Map;
import java.util.List;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.core.module.SingleModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.queue.datacarrier.DataCarrierQueueCreator;
import org.skywalking.apm.collector.queue.datacarrier.QueueDataCarrierModuleDefine;
import org.skywalking.apm.collector.queue.disruptor.DisruptorQueueCreator;
import org.skywalking.apm.collector.queue.disruptor.QueueDisruptorModuleDefine;
/**
* @author pengys5
*/
public class QueueModuleInstaller extends SingleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(QueueModuleInstaller.class);
@Override public String groupName() {
return QueueModuleGroupDefine.GROUP_NAME;
}
@Override public Context moduleContext() {
return new QueueModuleContext(groupName());
}
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning queue module install");
QueueModuleContext context = new QueueModuleContext(QueueModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
@Override public List<String> dependenceModules() {
return null;
}
@Override public void install() throws ClientException, DefineException, ConfigException, ServerException {
super.install();
if (getModuleDefine() instanceof QueueDataCarrierModuleDefine) {
((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(groupName())).setQueueCreator(new DataCarrierQueueCreator());
} else if (getModuleDefine() instanceof QueueDisruptorModuleDefine) {
((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(groupName())).setQueueCreator(new DisruptorQueueCreator());
} else {
throw new UnexpectedException("");
}
}
installSingle(moduleConfig, moduleDefineMap, serverHolder);
@Override public void onAfterInstall() throws CollectorException {
}
}
package org.skywalking.apm.collector.queue.datacarrier;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
/**
* @author pengys5
*/
public class DataCarrierQueueConfigParser implements ModuleConfigParser {
@Override public void parse(Map config) throws ConfigParseException {
}
}
package org.skywalking.apm.collector.queue.datacarrier;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.queue.QueueModuleContext;
import org.skywalking.apm.collector.queue.QueueModuleDefine;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
......@@ -26,8 +23,11 @@ public class QueueDataCarrierModuleDefine extends QueueModuleDefine {
return false;
}
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
@Override protected ModuleConfigParser configParser() {
return new DataCarrierQueueConfigParser();
}
@Override protected void initializeOtherContext() {
((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setQueueCreator(new DataCarrierQueueCreator());
}
}
......@@ -7,7 +7,9 @@ import org.skywalking.apm.collector.core.module.ModuleConfigParser;
/**
* @author pengys5
*/
public class QueueDisruptorConfigParser implements ModuleConfigParser {
public class DisruptorQueueConfigParser implements ModuleConfigParser {
@Override public void parse(Map config) throws ConfigParseException {
}
}
package org.skywalking.apm.collector.queue.disruptor;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.queue.QueueModuleContext;
import org.skywalking.apm.collector.queue.QueueModuleDefine;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
......@@ -26,8 +23,11 @@ public class QueueDisruptorModuleDefine extends QueueModuleDefine {
return true;
}
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
@Override protected ModuleConfigParser configParser() {
return new DisruptorQueueConfigParser();
}
@Override protected void initializeOtherContext() {
((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setQueueCreator(new DisruptorQueueCreator());
}
}
package org.skywalking.apm.collector.storage;
import java.util.Map;
import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.storage.StorageException;
import org.skywalking.apm.collector.core.storage.StorageInstaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class StorageModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(StorageModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
@Override protected void initializeOtherContext() {
try {
configParser().parse(config);
StorageModuleContext context = (StorageModuleContext)CollectorContextHelper.INSTANCE.getContext(StorageModuleGroupDefine.GROUP_NAME);
Client client = createClient(null);
Client client = createClient();
client.initialize();
context.setClient(client);
injectClientIntoDAO(client);
storageInstaller().install(client);
} catch (ConfigParseException | StorageException e) {
throw new StorageModuleException(e.getMessage(), e);
} catch (ClientException | StorageException | DefineException e) {
throw new UnexpectedException(e.getMessage());
}
}
@Override public final List<Handler> handlerList() {
return null;
}
@Override protected final Server server() {
throw new UnsupportedOperationException("");
return null;
}
@Override protected final ModuleRegistration registration() {
throw new UnsupportedOperationException("");
return null;
}
@Override public final ClusterDataListener listener() {
throw new UnsupportedOperationException("");
return null;
}
@Override public final boolean defaultModule() {
......
......@@ -10,6 +10,11 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class StorageModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "storage";
private final StorageModuleInstaller installer;
public StorageModuleGroupDefine() {
installer = new StorageModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
......@@ -20,6 +25,6 @@ public class StorageModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new StorageModuleInstaller();
return installer;
}
}
package org.skywalking.apm.collector.storage;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import java.util.List;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.SingleModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class StorageModuleInstaller extends SingleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(StorageModuleInstaller.class);
@Override public String groupName() {
return StorageModuleGroupDefine.GROUP_NAME;
}
@Override public Context moduleContext() {
return new StorageModuleContext(groupName());
}
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning storage module install");
@Override public List<String> dependenceModules() {
return null;
}
StorageModuleContext context = new StorageModuleContext(StorageModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
@Override public void onAfterInstall() throws CollectorException {
installSingle(moduleConfig, moduleDefineMap, serverHolder);
}
}
......@@ -34,7 +34,7 @@ public class NodeReferenceDataDefine extends DataDefine {
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
int applicationId = remoteData.getDataIntegers(0);
int frontApplicationId = remoteData.getDataIntegers(0);
int behindApplicationId = remoteData.getDataIntegers(1);
String behindPeer = remoteData.getDataStrings(1);
int s1LTE = remoteData.getDataIntegers(2);
......@@ -44,23 +44,23 @@ public class NodeReferenceDataDefine extends DataDefine {
int summary = remoteData.getDataIntegers(6);
int error = remoteData.getDataIntegers(7);
long timeBucket = remoteData.getDataLongs(0);
return new NodeReference(id, applicationId, behindApplicationId, behindPeer, s1LTE, s3LTE, s5LTE, s5GT, summary, error, timeBucket);
return new NodeReference(id, frontApplicationId, behindApplicationId, behindPeer, s1LTE, s3LTE, s5LTE, s5GT, summary, error, timeBucket);
}
@Override public RemoteData serialize(Object object) {
NodeReference nodeReference = (NodeReference)object;
Data data = (Data)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(nodeReference.getId());
builder.addDataIntegers(nodeReference.getFrontApplicationId());
builder.addDataIntegers(nodeReference.getBehindApplicationId());
builder.addDataStrings(nodeReference.getBehindPeer());
builder.addDataIntegers(nodeReference.getS1LTE());
builder.addDataIntegers(nodeReference.getS3LTE());
builder.addDataIntegers(nodeReference.getS5LTE());
builder.addDataIntegers(nodeReference.getS5GT());
builder.addDataIntegers(nodeReference.getSummary());
builder.addDataIntegers(nodeReference.getError());
builder.addDataLongs(nodeReference.getTimeBucket());
builder.addDataStrings(data.getDataString(0));
builder.addDataIntegers(data.getDataInteger(0));
builder.addDataIntegers(data.getDataInteger(1));
builder.addDataStrings(data.getDataString(1));
builder.addDataIntegers(data.getDataInteger(2));
builder.addDataIntegers(data.getDataInteger(3));
builder.addDataIntegers(data.getDataInteger(4));
builder.addDataIntegers(data.getDataInteger(5));
builder.addDataIntegers(data.getDataInteger(6));
builder.addDataIntegers(data.getDataInteger(7));
builder.addDataLongs(data.getDataLong(0));
return builder.build();
}
......
......@@ -43,6 +43,7 @@ public class InstanceDataDefine extends DataDefine {
builder.addDataStrings(instance.getId());
builder.addDataIntegers(instance.getApplicationId());
builder.addDataStrings(instance.getAgentUUID());
builder.addDataIntegers(instance.getInstanceId());
builder.addDataLongs(instance.getRegisterTime());
builder.addDataLongs(instance.getHeartBeatTime());
builder.addDataStrings(instance.getOsInfo());
......
......@@ -57,23 +57,23 @@ public class ServiceReferenceDataDefine extends DataDefine {
}
@Override public RemoteData serialize(Object object) {
ServiceReference serviceReference = (ServiceReference)object;
Data data = (Data)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(serviceReference.getId());
builder.addDataIntegers(serviceReference.getEntryServiceId());
builder.addDataStrings(serviceReference.getEntryServiceName());
builder.addDataIntegers(serviceReference.getFrontServiceId());
builder.addDataStrings(serviceReference.getFrontServiceName());
builder.addDataIntegers(serviceReference.getBehindServiceId());
builder.addDataStrings(serviceReference.getBehindServiceName());
builder.addDataLongs(serviceReference.getS1Lte());
builder.addDataLongs(serviceReference.getS3Lte());
builder.addDataLongs(serviceReference.getS5Lte());
builder.addDataLongs(serviceReference.getS5Gt());
builder.addDataLongs(serviceReference.getSummary());
builder.addDataLongs(serviceReference.getError());
builder.addDataLongs(serviceReference.getCostSummary());
builder.addDataLongs(serviceReference.getTimeBucket());
builder.addDataStrings(data.getDataString(0));
builder.addDataIntegers(data.getDataInteger(0));
builder.addDataStrings(data.getDataString(1));
builder.addDataIntegers(data.getDataInteger(1));
builder.addDataStrings(data.getDataString(2));
builder.addDataIntegers(data.getDataInteger(2));
builder.addDataStrings(data.getDataString(3));
builder.addDataLongs(data.getDataLong(0));
builder.addDataLongs(data.getDataLong(1));
builder.addDataLongs(data.getDataLong(2));
builder.addDataLongs(data.getDataLong(3));
builder.addDataLongs(data.getDataLong(4));
builder.addDataLongs(data.getDataLong(5));
builder.addDataLongs(data.getDataLong(6));
builder.addDataLongs(data.getDataLong(7));
return builder.build();
}
......
......@@ -3,7 +3,6 @@ package org.skywalking.apm.collector.storage.elasticsearch;
import java.util.List;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.storage.StorageInstaller;
......@@ -33,7 +32,7 @@ public class StorageElasticSearchModuleDefine extends StorageModuleDefine {
return new StorageElasticSearchConfigParser();
}
@Override protected Client createClient(DataMonitor dataMonitor) {
@Override protected Client createClient() {
return new ElasticSearchClient(StorageElasticSearchConfig.CLUSTER_NAME, StorageElasticSearchConfig.CLUSTER_TRANSPORT_SNIFFER, StorageElasticSearchConfig.CLUSTER_NODES);
}
......
......@@ -3,7 +3,6 @@ package org.skywalking.apm.collector.storage.h2;
import java.util.List;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.storage.StorageInstaller;
......@@ -33,7 +32,7 @@ public class StorageH2ModuleDefine extends StorageModuleDefine {
return new StorageH2ConfigParser();
}
@Override protected Client createClient(DataMonitor dataMonitor) {
@Override protected Client createClient() {
return new H2Client();
}
......
package org.skywalking.apm.collector.stream;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class StreamModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(StreamModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new StreamModuleException(e.getMessage(), e);
}
}
@Override public final boolean defaultModule() {
return true;
}
public abstract List<Handler> handlerList() throws DefineException;
@Override protected final void initializeOtherContext() {
}
}
......@@ -9,7 +9,12 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
*/
public class StreamModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "stream";
public static final String GROUP_NAME = "collector_inside";
private final StreamModuleInstaller installer;
public StreamModuleGroupDefine() {
installer = new StreamModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
......@@ -20,6 +25,6 @@ public class StreamModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new StreamModuleInstaller();
return installer;
}
}
package org.skywalking.apm.collector.stream;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.module.SingleModuleInstaller;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -22,25 +19,26 @@ import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class StreamModuleInstaller implements ModuleInstaller {
public class StreamModuleInstaller extends SingleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(StreamModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig, Map<String, ModuleDefine> moduleDefineMap,
ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning stream module install");
StreamModuleContext context = new StreamModuleContext(StreamModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
@Override public String groupName() {
return StreamModuleGroupDefine.GROUP_NAME;
}
initializeWorker(context);
@Override public Context moduleContext() {
return new StreamModuleContext(groupName());
}
logger.info("could not configure cluster module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
@Override public List<String> dependenceModules() {
List<String> dependenceModules = new LinkedList<>();
dependenceModules.add(QueueModuleGroupDefine.GROUP_NAME);
return dependenceModules;
}
@Override public void onAfterInstall() throws DefineException {
initializeWorker((StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(groupName()));
}
private void initializeWorker(StreamModuleContext context) throws DefineException {
......
package org.skywalking.apm.collector.stream.grpc;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.client.grpc.GRPCClient;
......@@ -28,46 +29,53 @@ public class StreamGRPCDataListener extends ClusterDataListener {
}
private Map<String, GRPCClient> clients = new HashMap<>();
private Map<String, RemoteWorkerRef> workerRefs = new HashMap<>();
private Map<String, List<RemoteWorkerRef>> remoteWorkerRefMap = new HashMap<>();
@Override public void addressChangedNotify() {
@Override public void serverJoinNotify(String serverAddress) {
String selfAddress = StreamGRPCConfig.HOST + ":" + StreamGRPCConfig.PORT;
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
List<String> addresses = getAddresses();
clients.keySet().forEach(address -> {
if (!addresses.contains(address)) {
context.getClusterWorkerContext().remove(workerRefs.get(address));
workerRefs.remove(address);
if (!clients.containsKey(serverAddress)) {
logger.info("new address: {}, create this address remote worker reference", serverAddress);
String[] hostPort = serverAddress.split(":");
GRPCClient client = new GRPCClient(hostPort[0], Integer.valueOf(hostPort[1]));
try {
client.initialize();
} catch (ClientException e) {
e.printStackTrace();
}
});
for (String address : addresses) {
if (!clients.containsKey(address)) {
logger.debug("new address: {}, create this address remote worker reference", address);
String[] hostPort = address.split(":");
GRPCClient client = new GRPCClient(hostPort[0], Integer.valueOf(hostPort[1]));
try {
client.initialize();
} catch (ClientException e) {
e.printStackTrace();
}
clients.put(address, client);
clients.put(serverAddress, client);
if (selfAddress.equals(address)) {
context.getClusterWorkerContext().getProviders().forEach(provider -> {
logger.debug("create remote worker self reference, role: {}", provider.role().roleName());
provider.create();
});
} else {
context.getClusterWorkerContext().getProviders().forEach(provider -> {
logger.debug("create remote worker reference, role: {}", provider.role().roleName());
RemoteWorkerRef workerRef = provider.create(client);
});
}
if (selfAddress.equals(serverAddress)) {
context.getClusterWorkerContext().getProviders().forEach(provider -> {
logger.info("create remote worker self reference, role: {}", provider.role().roleName());
provider.create();
});
} else {
logger.debug("address: {} had remote worker reference, ignore", address);
context.getClusterWorkerContext().getProviders().forEach(provider -> {
logger.info("create remote worker reference, role: {}", provider.role().roleName());
RemoteWorkerRef remoteWorkerRef = provider.create(client);
if (!remoteWorkerRefMap.containsKey(serverAddress)) {
remoteWorkerRefMap.put(serverAddress, new LinkedList<>());
}
remoteWorkerRefMap.get(serverAddress).add(remoteWorkerRef);
});
}
} else {
logger.info("address: {} had remote worker reference, ignore", serverAddress);
}
}
@Override public void serverQuitNotify(String serverAddress) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
if (clients.containsKey(serverAddress)) {
clients.get(serverAddress).shutdown();
clients.remove(serverAddress);
}
if (remoteWorkerRefMap.containsKey(serverAddress)) {
for (RemoteWorkerRef remoteWorkerRef : remoteWorkerRefMap.get(serverAddress)) {
context.getClusterWorkerContext().remove(remoteWorkerRef);
}
}
}
......
......@@ -3,9 +3,7 @@ package org.skywalking.apm.collector.stream.grpc;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
......@@ -20,7 +18,7 @@ import org.skywalking.apm.collector.stream.grpc.handler.RemoteCommonServiceHandl
*/
public class StreamGRPCModuleDefine extends StreamModuleDefine {
public static final String MODULE_NAME = "stream";
public static final String MODULE_NAME = "grpc";
@Override public String name() {
return MODULE_NAME;
......@@ -34,7 +32,7 @@ public class StreamGRPCModuleDefine extends StreamModuleDefine {
return new StreamGRPCConfigParser();
}
@Override protected Client createClient(DataMonitor dataMonitor) {
@Override protected Client createClient() {
return null;
}
......@@ -50,7 +48,7 @@ public class StreamGRPCModuleDefine extends StreamModuleDefine {
return new StreamGRPCDataListener();
}
@Override public List<Handler> handlerList() throws DefineException {
@Override public List<Handler> handlerList() {
List<Handler> handlers = new ArrayList<>();
handlers.add(new RemoteCommonServiceHandler());
return handlers;
......
......@@ -10,8 +10,6 @@ import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -30,10 +28,10 @@ public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCo
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
Role role = context.getClusterWorkerContext().getRole(roleName);
Object object = role.dataDefine().deserialize(remoteData);
try {
Object object = role.dataDefine().deserialize(remoteData);
context.getClusterWorkerContext().lookupInSide(roleName).tell(object);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
......
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.stream.worker;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.client.grpc.GRPCClient;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.remote.grpc.proto.Empty;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
......@@ -20,12 +21,14 @@ public class RemoteWorkerRef extends WorkerRef {
private final RemoteCommonServiceGrpc.RemoteCommonServiceStub stub;
private StreamObserver<RemoteMessage> streamObserver;
private final AbstractRemoteWorker remoteWorker;
private final String address;
public RemoteWorkerRef(Role role, AbstractRemoteWorker remoteWorker) {
super(role);
this.remoteWorker = remoteWorker;
this.acrossJVM = false;
this.stub = null;
this.address = Const.EMPTY_STRING;
}
public RemoteWorkerRef(Role role, GRPCClient client) {
......@@ -33,19 +36,23 @@ public class RemoteWorkerRef extends WorkerRef {
this.remoteWorker = null;
this.acrossJVM = true;
this.stub = RemoteCommonServiceGrpc.newStub(client.getChannel());
this.address = client.toString();
createStreamObserver();
}
@Override
public void tell(Object message) throws WorkerInvokeException {
if (acrossJVM) {
RemoteData remoteData = getRole().dataDefine().serialize(message);
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setWorkerRole(getRole().roleName());
builder.setRemoteData(remoteData);
streamObserver.onNext(builder.build());
try {
RemoteData remoteData = getRole().dataDefine().serialize(message);
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setWorkerRole(getRole().roleName());
builder.setRemoteData(remoteData);
streamObserver.onNext(builder.build());
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
} else {
remoteWorker.allocateJob(message);
}
......@@ -113,4 +120,11 @@ public class RemoteWorkerRef extends WorkerRef {
}
}
}
@Override public String toString() {
StringBuilder toString = new StringBuilder();
toString.append("acrossJVM: ").append(acrossJVM);
toString.append(", address: ").append(address);
return toString.toString();
}
}
......@@ -29,6 +29,11 @@ public class WorkerRefs<T extends WorkerRef> {
public void tell(Object message) throws WorkerInvokeException {
logger.debug("WorkerSelector instance of {}", workerSelector.getClass());
workerRefs.forEach(workerRef -> {
if (workerRef instanceof RemoteWorkerRef) {
logger.debug("message hashcode: {}, select workers: {}", message.hashCode(), workerRef.toString());
}
});
workerSelector.select(workerRefs, message).tell(message);
}
}
......@@ -122,7 +122,7 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
Data data = (Data)message;
if (dataCache.containsKey(data.id())) {
getRole().dataDefine().mergeData(data, dataCache.get(data.id()));
getRole().dataDefine().mergeData(dataCache.get(data.id()), data);
} else {
dataCache.put(data.id(), data);
}
......
package org.skywalking.apm.collector.ui;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
/**
* @author pengys5
*/
public class UICommonModuleInstaller extends MultipleCommonModuleInstaller {
@Override public String groupName() {
return UIModuleGroupDefine.GROUP_NAME;
}
@Override public Context moduleContext() {
return new UIModuleContext(groupName());
}
@Override public List<String> dependenceModules() {
return null;
}
}
package org.skywalking.apm.collector.ui;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class UIModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(UIModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new UIModuleException(e.getMessage(), e);
}
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
}
......@@ -47,5 +17,7 @@ public abstract class UIModuleDefine extends ModuleDefine implements ClusterData
return true;
}
public abstract List<Handler> handlerList();
@Override protected final void initializeOtherContext() {
}
}
......@@ -10,6 +10,11 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class UIModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "ui";
private final UICommonModuleInstaller installer;
public UIModuleGroupDefine() {
installer = new UICommonModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
......@@ -20,6 +25,6 @@ public class UIModuleGroupDefine implements ModuleGroupDefine {
}
@Override public ModuleInstaller moduleInstaller() {
return new UIModuleInstaller();
return installer;
}
}
package org.skywalking.apm.collector.ui;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class UIModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(UIModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning ui module install");
UIModuleContext context = new UIModuleContext(UIModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
}
}
......@@ -15,6 +15,10 @@ public class UIJettyDataListener extends ClusterDataListener {
return PATH;
}
@Override public void addressChangedNotify() {
@Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify(String serverAddress) {
}
}
......@@ -3,7 +3,7 @@ package org.skywalking.apm.network.trace.component;
/**
* @author wusheng
*/
public class OfficialComponent implements Component{
public class OfficialComponent implements Component {
private int id;
private String name;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册