提交 dae5c692 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge pull request #428 from wu-sheng/fix/419

Fix/419 make agent jvm, agent register as agent stream sub maven module.
package org.skywalking.apm.collector.agentjvm;
import org.skywalking.apm.collector.core.framework.Context;
/**
* @author pengys5
*/
public class AgentJVMModuleContext extends Context {
public AgentJVMModuleContext(String groupName) {
super(groupName);
}
}
package org.skywalking.apm.collector.agentjvm;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.module.ModuleDefine;
/**
* @author pengys5
*/
public abstract class AgentJVMModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
}
@Override protected void initializeOtherContext() {
}
@Override public final boolean defaultModule() {
return true;
}
}
package org.skywalking.apm.collector.agentjvm;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class AgentJVMModuleException extends ModuleException {
public AgentJVMModuleException(String message) {
super(message);
}
public AgentJVMModuleException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.agentjvm;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
/**
* @author pengys5
*/
public class AgentJVMModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_jvm";
private final AgentJVMCommonModuleInstaller installer;
public AgentJVMModuleGroupDefine() {
installer = new AgentJVMCommonModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
}
@Override public Context groupContext() {
return new AgentJVMModuleContext(GROUP_NAME);
}
@Override public ModuleInstaller moduleInstaller() {
return installer;
}
}
package org.skywalking.apm.collector.agentjvm.grpc;
/**
* @author pengys5
*/
public class AgentJVMGRPCConfig {
public static String HOST;
public static int PORT;
}
package org.skywalking.apm.collector.agentjvm.grpc;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class AgentJVMGRPCConfigParser implements ModuleConfigParser {
private static final String HOST = "host";
private static final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
AgentJVMGRPCConfig.HOST = "localhost";
} else {
AgentJVMGRPCConfig.HOST = (String)config.get(HOST);
}
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
AgentJVMGRPCConfig.PORT = 11800;
} else {
AgentJVMGRPCConfig.PORT = (Integer)config.get(PORT);
}
}
}
package org.skywalking.apm.collector.agentjvm.grpc;
import org.skywalking.apm.collector.agentjvm.AgentJVMModuleGroupDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
/**
* @author pengys5
*/
public class AgentJVMGRPCDataListener extends ClusterDataListener {
public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + AgentJVMModuleGroupDefine.GROUP_NAME + "." + AgentJVMGRPCModuleDefine.MODULE_NAME;
@Override public String path() {
return PATH;
}
@Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify(String serverAddress) {
}
}
package org.skywalking.apm.collector.agentjvm.grpc;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.agentjvm.AgentJVMModuleDefine;
import org.skywalking.apm.collector.agentjvm.AgentJVMModuleGroupDefine;
import org.skywalking.apm.collector.agentjvm.grpc.handler.JVMMetricsServiceHandler;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.server.grpc.GRPCServer;
/**
* @author pengys5
*/
public class AgentJVMGRPCModuleDefine extends AgentJVMModuleDefine {
public static final String MODULE_NAME = "grpc";
@Override protected String group() {
return AgentJVMModuleGroupDefine.GROUP_NAME;
}
@Override public String name() {
return MODULE_NAME;
}
@Override protected ModuleConfigParser configParser() {
return new AgentJVMGRPCConfigParser();
}
@Override protected Server server() {
return new GRPCServer(AgentJVMGRPCConfig.HOST, AgentJVMGRPCConfig.PORT);
}
@Override protected ModuleRegistration registration() {
return new AgentJVMGRPCModuleRegistration();
}
@Override public ClusterDataListener listener() {
return new AgentJVMGRPCDataListener();
}
@Override public List<Handler> handlerList() {
List<Handler> handlers = new LinkedList<>();
handlers.add(new JVMMetricsServiceHandler());
return handlers;
}
}
package org.skywalking.apm.collector.agentjvm.grpc;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class AgentJVMGRPCModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
return new Value(AgentJVMGRPCConfig.HOST, AgentJVMGRPCConfig.PORT, null);
}
}
......@@ -17,14 +17,6 @@ public class CpuMetricEsTableDefine extends ElasticSearchTableDefine {
return 1;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(CpuMetricTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(CpuMetricTable.COLUMN_USAGE_PERCENT, ElasticSearchColumnDefine.Type.Double.name()));
......
......@@ -17,14 +17,6 @@ public class GCMetricEsTableDefine extends ElasticSearchTableDefine {
return 1;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_PHRASE, ElasticSearchColumnDefine.Type.Integer.name()));
......
......@@ -17,14 +17,6 @@ public class MemoryMetricEsTableDefine extends ElasticSearchTableDefine {
return 1;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_IS_HEAP, ElasticSearchColumnDefine.Type.Boolean.name()));
......
......@@ -17,14 +17,6 @@ public class MemoryPoolMetricEsTableDefine extends ElasticSearchTableDefine {
return 1;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_POOL_TYPE, ElasticSearchColumnDefine.Type.Integer.name()));
......
org.skywalking.apm.collector.agentjvm.AgentJVMModuleGroupDefine
\ No newline at end of file
org.skywalking.apm.collector.agentjvm.grpc.AgentJVMGRPCModuleDefine
\ No newline at end of file
......@@ -15,22 +15,22 @@
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
<artifactId>apm-collector-stream</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-server</artifactId>
<artifactId>apm-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-network</artifactId>
<artifactId>apm-collector-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-agentstream</artifactId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
......
package org.skywalking.apm.collector.agentregister;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
/**
* @author pengys5
*/
public class AgentRegisterCommonModuleInstaller extends MultipleCommonModuleInstaller {
@Override public String groupName() {
return AgentRegisterModuleGroupDefine.GROUP_NAME;
}
@Override public Context moduleContext() {
return new AgentRegisterModuleContext(groupName());
}
@Override public List<String> dependenceModules() {
return null;
}
}
package org.skywalking.apm.collector.agentregister;
import org.skywalking.apm.collector.core.framework.Context;
/**
* @author pengys5
*/
public class AgentRegisterModuleContext extends Context {
public AgentRegisterModuleContext(String groupName) {
super(groupName);
}
}
package org.skywalking.apm.collector.agentregister;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.module.ModuleDefine;
/**
* @author pengys5
*/
public abstract class AgentRegisterModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
@Override protected void initializeOtherContext() {
}
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
}
@Override public final boolean defaultModule() {
return true;
}
}
package org.skywalking.apm.collector.agentregister;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class AgentRegisterModuleException extends ModuleException {
public AgentRegisterModuleException(String message) {
super(message);
}
public AgentRegisterModuleException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.agentregister;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
/**
* @author pengys5
*/
public class AgentRegisterModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_register";
private final AgentRegisterCommonModuleInstaller installer;
public AgentRegisterModuleGroupDefine() {
installer = new AgentRegisterCommonModuleInstaller();
}
@Override public String name() {
return GROUP_NAME;
}
@Override public Context groupContext() {
return new AgentRegisterModuleContext(GROUP_NAME);
}
@Override public ModuleInstaller moduleInstaller() {
return installer;
}
}
package org.skywalking.apm.collector.agentregister.application;
import org.skywalking.apm.collector.agentstream.worker.cache.ApplicationCache;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.agentregister.worker.application.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.agentregister.worker.cache.ApplicationCache;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
......
package org.skywalking.apm.collector.agentregister.grpc;
/**
* @author pengys5
*/
public class AgentRegisterGRPCConfig {
public static String HOST;
public static int PORT;
}
package org.skywalking.apm.collector.agentregister.grpc;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class AgentRegisterGRPCConfigParser implements ModuleConfigParser {
private static final String HOST = "host";
private static final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
AgentRegisterGRPCConfig.HOST = "localhost";
} else {
AgentRegisterGRPCConfig.HOST = (String)config.get(HOST);
}
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
AgentRegisterGRPCConfig.PORT = 11800;
} else {
AgentRegisterGRPCConfig.PORT = (Integer)config.get(PORT);
}
}
}
package org.skywalking.apm.collector.agentregister.grpc;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
/**
* @author pengys5
*/
public class AgentRegisterGRPCDataListener extends ClusterDataListener {
public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + AgentRegisterModuleGroupDefine.GROUP_NAME + "." + AgentRegisterGRPCModuleDefine.MODULE_NAME;
@Override public String path() {
return PATH;
}
@Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify(String serverAddress) {
}
}
package org.skywalking.apm.collector.agentregister.grpc;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleDefine;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine;
import org.skywalking.apm.collector.agentregister.grpc.handler.ApplicationRegisterServiceHandler;
import org.skywalking.apm.collector.agentregister.grpc.handler.InstanceDiscoveryServiceHandler;
import org.skywalking.apm.collector.agentregister.grpc.handler.ServiceNameDiscoveryServiceHandler;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.server.grpc.GRPCServer;
/**
* @author pengys5
*/
public class AgentRegisterGRPCModuleDefine extends AgentRegisterModuleDefine {
public static final String MODULE_NAME = "grpc";
@Override protected String group() {
return AgentRegisterModuleGroupDefine.GROUP_NAME;
}
@Override public String name() {
return MODULE_NAME;
}
@Override protected ModuleConfigParser configParser() {
return new AgentRegisterGRPCConfigParser();
}
@Override protected Server server() {
return new GRPCServer(AgentRegisterGRPCConfig.HOST, AgentRegisterGRPCConfig.PORT);
}
@Override protected ModuleRegistration registration() {
return new AgentRegisterGRPCModuleRegistration();
}
@Override public ClusterDataListener listener() {
return new AgentRegisterGRPCDataListener();
}
@Override public List<Handler> handlerList() {
List<Handler> handlers = new LinkedList<>();
handlers.add(new ApplicationRegisterServiceHandler());
handlers.add(new InstanceDiscoveryServiceHandler());
handlers.add(new ServiceNameDiscoveryServiceHandler());
return handlers;
}
}
package org.skywalking.apm.collector.agentregister.grpc;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class AgentRegisterGRPCModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
return new Value(AgentRegisterGRPCConfig.HOST, AgentRegisterGRPCConfig.PORT, null);
}
}
package org.skywalking.apm.collector.agentregister.instance;
import org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterRemoteWorker;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.agentregister.worker.instance.InstanceRegisterRemoteWorker;
import org.skywalking.apm.collector.agentregister.worker.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
......
package org.skywalking.apm.collector.agentregister.jetty;
/**
* @author pengys5
*/
public class AgentRegisterJettyConfig {
public static String HOST;
public static int PORT;
public static String CONTEXT_PATH;
}
package org.skywalking.apm.collector.agentregister.jetty;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class AgentRegisterJettyConfigParser implements ModuleConfigParser {
private static final String HOST = "host";
private static final String PORT = "port";
public static final String CONTEXT_PATH = "contextPath";
@Override public void parse(Map config) throws ConfigParseException {
AgentRegisterJettyConfig.CONTEXT_PATH = "/";
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
AgentRegisterJettyConfig.HOST = "localhost";
} else {
AgentRegisterJettyConfig.HOST = (String)config.get(HOST);
}
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
AgentRegisterJettyConfig.PORT = 12800;
} else {
AgentRegisterJettyConfig.PORT = (Integer)config.get(PORT);
}
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CONTEXT_PATH))) {
AgentRegisterJettyConfig.CONTEXT_PATH = (String)config.get(CONTEXT_PATH);
}
}
}
package org.skywalking.apm.collector.agentregister.jetty;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
/**
* @author pengys5
*/
public class AgentRegisterJettyDataListener extends ClusterDataListener {
public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + AgentRegisterModuleGroupDefine.GROUP_NAME + "." + AgentRegisterJettyModuleDefine.MODULE_NAME;
@Override public String path() {
return PATH;
}
@Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify(String serverAddress) {
}
}
package org.skywalking.apm.collector.agentregister.jetty;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleDefine;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine;
import org.skywalking.apm.collector.agentregister.jetty.handler.ApplicationRegisterServletHandler;
import org.skywalking.apm.collector.agentregister.jetty.handler.InstanceDiscoveryServletHandler;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.server.jetty.JettyServer;
/**
* @author pengys5
*/
public class AgentRegisterJettyModuleDefine extends AgentRegisterModuleDefine {
public static final String MODULE_NAME = "jetty";
@Override protected String group() {
return AgentRegisterModuleGroupDefine.GROUP_NAME;
}
@Override public String name() {
return MODULE_NAME;
}
@Override protected ModuleConfigParser configParser() {
return new AgentRegisterJettyConfigParser();
}
@Override protected Server server() {
return new JettyServer(AgentRegisterJettyConfig.HOST, AgentRegisterJettyConfig.PORT, AgentRegisterJettyConfig.CONTEXT_PATH);
}
@Override protected ModuleRegistration registration() {
return new AgentRegisterJettyModuleRegistration();
}
@Override public ClusterDataListener listener() {
return new AgentRegisterJettyDataListener();
}
@Override public List<Handler> handlerList() {
List<Handler> handlers = new LinkedList<>();
handlers.add(new ApplicationRegisterServletHandler());
handlers.add(new InstanceDiscoveryServletHandler());
return handlers;
}
}
package org.skywalking.apm.collector.agentregister.jetty;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class AgentRegisterJettyModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
return new Value(AgentRegisterJettyConfig.HOST, AgentRegisterJettyConfig.PORT, AgentRegisterJettyConfig.CONTEXT_PATH);
}
}
package org.skywalking.apm.collector.agentregister.jetty.handler;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agentregister.servicename.ServiceNameService;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceNameDiscoveryServiceHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);
private ServiceNameService serviceNameService = new ServiceNameService();
private Gson gson = new Gson();
@Override public String pathSpec() {
return "/servicename/discovery";
}
private static final String APPLICATION_ID = "ai";
private static final String SERVICE_NAME = "sn";
private static final String SERVICE_ID = "si";
private static final String ELEMENT = "el";
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
JsonArray responseArray = new JsonArray();
try {
JsonArray services = gson.fromJson(req.getReader(), JsonArray.class);
for (JsonElement service : services) {
int applicationId = service.getAsJsonObject().get(APPLICATION_ID).getAsInt();
String serviceName = service.getAsJsonObject().get(SERVICE_NAME).getAsString();
int serviceId = serviceNameService.getOrCreate(applicationId, serviceName);
if (serviceId != 0) {
JsonObject responseJson = new JsonObject();
responseJson.addProperty(SERVICE_ID, serviceId);
responseJson.add(ELEMENT, service);
responseArray.add(responseJson);
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return responseArray;
}
}
package org.skywalking.apm.collector.agentregister.servicename;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterRemoteWorker;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.agentregister.worker.servicename.ServiceNameRegisterRemoteWorker;
import org.skywalking.apm.collector.agentregister.worker.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.StreamModuleContext;
......
package org.skywalking.apm.collector.agentstream.worker.register.application;
package org.skywalking.apm.collector.agentregister.worker.application;
import org.skywalking.apm.collector.storage.define.register.ApplicationTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.register.ApplicationTable;
/**
* @author pengys5
......@@ -17,14 +17,6 @@ public class ApplicationEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ApplicationTable.COLUMN_APPLICATION_CODE, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ApplicationTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
......
package org.skywalking.apm.collector.agentstream.worker.register.application;
package org.skywalking.apm.collector.agentregister.worker.application;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
......
package org.skywalking.apm.collector.agentstream.worker.register.application;
package org.skywalking.apm.collector.agentregister.worker.application;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
......
package org.skywalking.apm.collector.agentstream.worker.register.application;
package org.skywalking.apm.collector.agentregister.worker.application;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.agentregister.worker.IdAutoIncrement;
import org.skywalking.apm.collector.agentregister.worker.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
......
package org.skywalking.apm.collector.agentstream.worker.register.application.dao;
package org.skywalking.apm.collector.agentregister.worker.application.dao;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.client.h2.H2Client;
......
package org.skywalking.apm.collector.agentstream.worker.register.application.dao;
package org.skywalking.apm.collector.agentregister.worker.application.dao;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
......
package org.skywalking.apm.collector.agentstream.worker.cache;
package org.skywalking.apm.collector.agentregister.worker.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.agentregister.worker.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
/**
......
package org.skywalking.apm.collector.agentstream.worker.register.instance;
package org.skywalking.apm.collector.agentregister.worker.instance;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
......@@ -17,14 +17,6 @@ public class InstanceEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_AGENT_UUID, ElasticSearchColumnDefine.Type.Keyword.name()));
......
package org.skywalking.apm.collector.agentstream.worker.register.instance;
package org.skywalking.apm.collector.agentregister.worker.instance;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
......
package org.skywalking.apm.collector.agentstream.worker.register.instance;
package org.skywalking.apm.collector.agentregister.worker.instance;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
......
package org.skywalking.apm.collector.agentstream.worker.register.instance;
package org.skywalking.apm.collector.agentregister.worker.instance;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.agentregister.worker.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
......
package org.skywalking.apm.collector.agentstream.worker.register.instance.dao;
package org.skywalking.apm.collector.agentregister.worker.instance.dao;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
......
package org.skywalking.apm.collector.agentstream.worker.register.instance.dao;
package org.skywalking.apm.collector.agentregister.worker.instance.dao;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
......
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
package org.skywalking.apm.collector.agentregister.worker.servicename;
import org.skywalking.apm.collector.storage.define.register.ServiceNameTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.register.ServiceNameTable;
/**
* @author pengys5
......@@ -17,14 +17,6 @@ public class ServiceNameEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_SERVICE_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
......
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
package org.skywalking.apm.collector.agentregister.worker.servicename;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
......
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
package org.skywalking.apm.collector.agentregister.worker.servicename;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
......
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
package org.skywalking.apm.collector.agentregister.worker.servicename;
import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.agentregister.worker.IdAutoIncrement;
import org.skywalking.apm.collector.agentregister.worker.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.DataDefine;
......
package org.skywalking.apm.collector.agentstream.worker.register.servicename.dao;
package org.skywalking.apm.collector.agentregister.worker.servicename.dao;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
......
package org.skywalking.apm.collector.agentstream.worker.register.servicename.dao;
package org.skywalking.apm.collector.agentregister.worker.servicename.dao;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
......
org.skywalking.apm.collector.agentregister.worker.application.dao.ApplicationEsDAO
org.skywalking.apm.collector.agentregister.worker.instance.dao.InstanceEsDAO
org.skywalking.apm.collector.agentregister.worker.servicename.dao.ServiceNameEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine
\ No newline at end of file
org.skywalking.apm.collector.agentregister.worker.application.dao.ApplicationH2DAO
org.skywalking.apm.collector.agentregister.worker.instance.dao.InstanceH2DAO
org.skywalking.apm.collector.agentregister.worker.servicename.dao.ServiceNameH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentregister.worker.application.ApplicationRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentregister.worker.instance.InstanceRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentregister.worker.servicename.ServiceNameRegisterSerialWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentregister.grpc.AgentRegisterGRPCModuleDefine
org.skywalking.apm.collector.agentregister.jetty.AgentRegisterJettyModuleDefine
\ No newline at end of file
org.skywalking.apm.collector.agentregister.worker.application.ApplicationEsTableDefine
org.skywalking.apm.collector.agentregister.worker.application.ApplicationH2TableDefine
org.skywalking.apm.collector.agentregister.worker.instance.InstanceEsTableDefine
org.skywalking.apm.collector.agentregister.worker.instance.InstanceH2TableDefine
org.skywalking.apm.collector.agentregister.worker.servicename.ServiceNameEsTableDefine
org.skywalking.apm.collector.agentregister.worker.servicename.ServiceNameH2TableDefine
\ No newline at end of file
......@@ -33,5 +33,15 @@
<artifactId>apm-collector-storage</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-agentjvm</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-agentregister</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -10,10 +10,10 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class AgentStreamModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_stream";
private final AgentStreamCommonModuleInstaller installer;
private final AgentStreamModuleInstaller installer;
public AgentStreamModuleGroupDefine() {
installer = new AgentStreamCommonModuleInstaller();
installer = new AgentStreamModuleInstaller();
}
@Override public String name() {
......
package org.skywalking.apm.collector.agentjvm;
package org.skywalking.apm.collector.agentstream;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerException;
/**
* @author pengys5
*/
public class AgentJVMCommonModuleInstaller extends MultipleCommonModuleInstaller {
public class AgentStreamModuleInstaller extends MultipleCommonModuleInstaller {
@Override public String groupName() {
return AgentJVMModuleGroupDefine.GROUP_NAME;
return AgentStreamModuleGroupDefine.GROUP_NAME;
}
@Override public Context moduleContext() {
return new AgentJVMModuleContext(groupName());
return new AgentStreamModuleContext(groupName());
}
@Override public List<String> dependenceModules() {
return null;
}
@Override public void install() throws DefineException, ConfigException, ServerException, ClientException {
super.install();
new PersistenceTimer().start();
}
}
......@@ -2,6 +2,10 @@ package org.skywalking.apm.collector.agentstream.grpc;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.agentjvm.grpc.handler.JVMMetricsServiceHandler;
import org.skywalking.apm.collector.agentregister.grpc.handler.ApplicationRegisterServiceHandler;
import org.skywalking.apm.collector.agentregister.grpc.handler.InstanceDiscoveryServiceHandler;
import org.skywalking.apm.collector.agentregister.grpc.handler.ServiceNameDiscoveryServiceHandler;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine;
import org.skywalking.apm.collector.agentstream.grpc.handler.TraceSegmentServiceHandler;
......@@ -46,6 +50,10 @@ public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine {
@Override public List<Handler> handlerList() {
List<Handler> handlers = new LinkedList<>();
handlers.add(new TraceSegmentServiceHandler());
handlers.add(new ApplicationRegisterServiceHandler());
handlers.add(new InstanceDiscoveryServiceHandler());
handlers.add(new ServiceNameDiscoveryServiceHandler());
handlers.add(new JVMMetricsServiceHandler());
return handlers;
}
}
......@@ -2,6 +2,9 @@ package org.skywalking.apm.collector.agentstream.jetty;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.agentregister.jetty.handler.ApplicationRegisterServletHandler;
import org.skywalking.apm.collector.agentregister.jetty.handler.InstanceDiscoveryServletHandler;
import org.skywalking.apm.collector.agentregister.jetty.handler.ServiceNameDiscoveryServiceHandler;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine;
import org.skywalking.apm.collector.agentstream.jetty.handler.TraceSegmentServletHandler;
......@@ -46,6 +49,9 @@ public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine {
@Override public List<Handler> handlerList() {
List<Handler> handlers = new LinkedList<>();
handlers.add(new TraceSegmentServletHandler());
handlers.add(new ApplicationRegisterServletHandler());
handlers.add(new InstanceDiscoveryServletHandler());
handlers.add(new ServiceNameDiscoveryServiceHandler());
return handlers;
}
}
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.agentregister.worker.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
/**
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.agentregister.worker.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......
package org.skywalking.apm.collector.agentstream.worker.config;
/**
* @author pengys5
*/
public class CacheSizeConfig {
public static class Cache {
public static class Analysis {
public static int SIZE = 1024;
}
public static class Persistence {
public static int SIZE = 5000;
}
}
}
package org.skywalking.apm.collector.agentstream.worker.config;
/**
* @author pengys5
*/
public class WorkerConfig {
public static class WorkerNum {
public static class Node {
public static class NodeCompAgg {
public static int VALUE = 2;
}
public static class NodeMappingDayAgg {
public static int VALUE = 2;
}
public static class NodeMappingHourAgg {
public static int VALUE = 2;
}
public static class NodeMappingMinuteAgg {
public static int VALUE = 2;
}
}
public static class NodeRef {
public static class NodeRefDayAgg {
public static int VALUE = 2;
}
public static class NodeRefHourAgg {
public static int VALUE = 2;
}
public static class NodeRefMinuteAgg {
public static int VALUE = 2;
}
public static class NodeRefResSumDayAgg {
public static int VALUE = 2;
}
public static class NodeRefResSumHourAgg {
public static int VALUE = 2;
}
public static class NodeRefResSumMinuteAgg {
public static int VALUE = 2;
}
}
public static class GlobalTrace {
public static class GlobalTraceAgg {
public static int VALUE = 2;
}
}
}
public static class Queue {
public static class GlobalTrace {
public static class GlobalTraceAnalysis {
public static int SIZE = 1024;
}
}
public static class Segment {
public static class SegmentAnalysis {
public static int SIZE = 1024;
}
public static class SegmentCostAnalysis {
public static int SIZE = 4096;
}
public static class SegmentExceptionAnalysis {
public static int SIZE = 4096;
}
}
public static class Node {
public static class NodeCompAnalysis {
public static int SIZE = 1024;
}
public static class NodeMappingDayAnalysis {
public static int SIZE = 1024;
}
public static class NodeMappingHourAnalysis {
public static int SIZE = 1024;
}
public static class NodeMappingMinuteAnalysis {
public static int SIZE = 1024;
}
}
public static class NodeRef {
public static class NodeRefDayAnalysis {
public static int SIZE = 1024;
}
public static class NodeRefHourAnalysis {
public static int SIZE = 1024;
}
public static class NodeRefMinuteAnalysis {
public static int SIZE = 1024;
}
public static class NodeRefResSumDayAnalysis {
public static int SIZE = 1024;
}
public static class NodeRefResSumHourAnalysis {
public static int SIZE = 1024;
}
public static class NodeRefResSumMinuteAnalysis {
public static int SIZE = 1024;
}
}
}
}
......@@ -17,14 +17,6 @@ public class GlobalTraceEsTableDefine extends ElasticSearchTableDefine {
return 5;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(GlobalTraceTable.COLUMN_SEGMENT_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
......
......@@ -17,14 +17,6 @@ public class InstPerformanceEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
......
......@@ -17,14 +17,6 @@ public class NodeComponentEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
......
......@@ -17,14 +17,6 @@ public class NodeMappingEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NodeMappingTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeMappingTable.COLUMN_ADDRESS_ID, ElasticSearchColumnDefine.Type.Integer.name()));
......
......@@ -17,14 +17,6 @@ public class NodeReferenceEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
......
......@@ -17,14 +17,6 @@ public class SegmentCostEsTableDefine extends ElasticSearchTableDefine {
return 5;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_SEGMENT_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_SERVICE_NAME, ElasticSearchColumnDefine.Type.Text.name()));
......
......@@ -17,14 +17,6 @@ public class SegmentEsTableDefine extends ElasticSearchTableDefine {
return 10;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(SegmentTable.COLUMN_DATA_BINARY, ElasticSearchColumnDefine.Type.Binary.name()));
}
......
......@@ -17,14 +17,6 @@ public class ServiceEntryEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceEntryTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
......
......@@ -17,14 +17,6 @@ public class ServiceReferenceEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
......
org.skywalking.apm.collector.agentstream.worker.register.application.dao.ApplicationEsDAO
org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceEsDAO
org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.ServiceNameEsDAO
org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentEsDAO
org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingEsDAO
org.skywalking.apm.collector.agentstream.worker.noderef.dao.NodeReferenceEsDAO
......
org.skywalking.apm.collector.agentstream.worker.register.application.dao.ApplicationH2DAO
org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceH2DAO
org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.ServiceNameH2DAO
org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentH2DAO
org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingH2DAO
org.skywalking.apm.collector.agentstream.worker.noderef.dao.NodeReferenceH2DAO
......
......@@ -17,8 +17,4 @@ org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenc
org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.global.GlobalTracePersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.instance.performance.InstPerformancePersistenceWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterRemoteWorker$Factory
org.skywalking.apm.collector.agentregister.worker.application.ApplicationRegisterRemoteWorker$Factory
org.skywalking.apm.collector.agentregister.worker.instance.InstanceRegisterRemoteWorker$Factory
org.skywalking.apm.collector.agentregister.worker.servicename.ServiceNameRegisterRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingRemoteWorker$Factory
......
......@@ -7,15 +7,6 @@ org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingH
org.skywalking.apm.collector.agentstream.worker.noderef.define.NodeReferenceEsTableDefine
org.skywalking.apm.collector.agentstream.worker.noderef.define.NodeReferenceH2TableDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationEsTableDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationH2TableDefine
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceEsTableDefine
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceH2TableDefine
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameEsTableDefine
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameH2TableDefine
org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentEsTableDefine
org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentH2TableDefine
......
......@@ -5,9 +5,9 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import org.skywalking.apm.collector.agentstream.HttpClientTools;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.ApplicationEsDAO;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceEsDAO;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.ServiceNameEsDAO;
import org.skywalking.apm.collector.agentregister.worker.application.dao.ApplicationEsDAO;
import org.skywalking.apm.collector.agentregister.worker.instance.dao.InstanceEsDAO;
import org.skywalking.apm.collector.agentregister.worker.servicename.dao.ServiceNameEsDAO;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
......
......@@ -7,14 +7,6 @@ agent_server:
host: localhost
port: 10800
context_path: /
agent_register:
grpc:
host: localhost
port: 11800
jetty:
host: localhost
port: 12800
context_path: /
agent_stream:
grpc:
host: localhost
......@@ -23,10 +15,6 @@ agent_stream:
host: localhost
port: 12800
context_path: /
agent_jvm:
grpc:
host: localhost
port: 11800
ui:
jetty:
host: localhost
......@@ -41,3 +29,5 @@ storage:
cluster_name: CollectorDBCluster
cluster_transport_sniffer: true
cluster_nodes: localhost:9300
index_shards_number: 2
index_replicas_number: 0
......@@ -7,4 +7,6 @@ public class StorageElasticSearchConfig {
public static String CLUSTER_NAME;
public static Boolean CLUSTER_TRANSPORT_SNIFFER;
public static String CLUSTER_NODES;
public static Integer INDEX_SHARDS_NUMBER;
public static Integer INDEX_REPLICAS_NUMBER;
}
......@@ -14,6 +14,8 @@ public class StorageElasticSearchConfigParser implements ModuleConfigParser {
private static final String CLUSTER_NAME = "cluster_name";
private static final String CLUSTER_TRANSPORT_SNIFFER = "cluster_transport_sniffer";
private static final String CLUSTER_NODES = "cluster_nodes";
private static final String INDEX_SHARDS_NUMBER = "index_shards_number";
private static final String INDEX_REPLICAS_NUMBER = "index_replicas_number";
@Override public void parse(Map config) throws ConfigParseException {
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CLUSTER_NAME))) {
......@@ -25,5 +27,15 @@ public class StorageElasticSearchConfigParser implements ModuleConfigParser {
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CLUSTER_NODES))) {
StorageElasticSearchConfig.CLUSTER_NODES = (String)config.get(CLUSTER_NODES);
}
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(INDEX_SHARDS_NUMBER))) {
StorageElasticSearchConfig.INDEX_SHARDS_NUMBER = (Integer)config.get(INDEX_SHARDS_NUMBER);
} else {
StorageElasticSearchConfig.INDEX_SHARDS_NUMBER = 2;
}
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(INDEX_REPLICAS_NUMBER))) {
StorageElasticSearchConfig.INDEX_REPLICAS_NUMBER = (Integer)config.get(INDEX_REPLICAS_NUMBER);
} else {
StorageElasticSearchConfig.INDEX_REPLICAS_NUMBER = 0;
}
}
}
......@@ -11,6 +11,7 @@ import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.storage.ColumnDefine;
import org.skywalking.apm.collector.core.storage.StorageInstaller;
import org.skywalking.apm.collector.core.storage.TableDefine;
import org.skywalking.apm.collector.storage.elasticsearch.StorageElasticSearchConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -51,8 +52,8 @@ public class ElasticSearchStorageInstaller extends StorageInstaller {
private Settings createSettingBuilder(ElasticSearchTableDefine tableDefine) {
return Settings.builder()
.put("index.number_of_shards", tableDefine.numberOfShards())
.put("index.number_of_replicas", tableDefine.numberOfReplicas())
.put("index.number_of_shards", StorageElasticSearchConfig.INDEX_SHARDS_NUMBER)
.put("index.number_of_replicas", StorageElasticSearchConfig.INDEX_REPLICAS_NUMBER)
.put("index.refresh_interval", String.valueOf(tableDefine.refreshInterval()) + "s")
.put("analysis.analyzer.collector_analyzer.tokenizer", "collector_tokenizer")
......
......@@ -16,8 +16,4 @@ public abstract class ElasticSearchTableDefine extends TableDefine {
}
public abstract int refreshInterval();
public abstract int numberOfShards();
public abstract int numberOfReplicas();
}
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.ui.dao;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.elasticsearch.action.search.SearchRequestBuilder;
......@@ -14,6 +15,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.skywalking.apm.collector.core.util.ColumnNameUtils;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
......@@ -112,13 +114,12 @@ public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO
Map<String, JsonObject> serviceReferenceMap = new LinkedHashMap<>();
JsonArray serviceReferenceArray = new JsonArray();
SearchResponse searchResponse = searchRequestBuilder.get();
Terms frontServiceIdTerms = searchResponse.getAggregations().get(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID);
for (Terms.Bucket frontServiceBucket : frontServiceIdTerms.getBuckets()) {
int frontServiceId = frontServiceBucket.getKeyAsNumber().intValue();
if (frontServiceId != 0) {
parseSubAggregate(serviceReferenceMap, serviceReferenceArray, frontServiceBucket, frontServiceId);
parseSubAggregate(serviceReferenceMap, frontServiceBucket, frontServiceId);
}
}
......@@ -128,15 +129,23 @@ public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO
if (StringUtils.isNotEmpty(frontServiceName)) {
String[] serviceNames = frontServiceName.split(Const.ID_SPLIT);
int frontServiceId = ServiceIdCache.getForUI(Integer.parseInt(serviceNames[0]), serviceNames[1]);
parseSubAggregate(serviceReferenceMap, serviceReferenceArray, frontServiceBucket, frontServiceId);
parseSubAggregate(serviceReferenceMap, frontServiceBucket, frontServiceId);
}
}
serviceReferenceMap.values().forEach(serviceReferenceArray::add);
JsonArray serviceReferenceArray = new JsonArray();
JsonObject rootServiceReference = findRoot(serviceReferenceMap);
if (ObjectUtils.isNotEmpty(rootServiceReference)) {
String id = rootServiceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID)) + Const.ID_SPLIT + rootServiceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID));
serviceReferenceMap.remove(id);
int rootServiceId = rootServiceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID)).getAsInt();
sortAsTree(rootServiceId, serviceReferenceArray, serviceReferenceMap);
}
return serviceReferenceArray;
}
private void parseSubAggregate(Map<String, JsonObject> serviceReferenceMap, JsonArray serviceReferenceArray,
private void parseSubAggregate(Map<String, JsonObject> serviceReferenceMap,
Terms.Bucket frontServiceBucket,
int frontServiceId) {
Terms behindServiceIdTerms = frontServiceBucket.getAggregations().get(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID);
......@@ -232,4 +241,29 @@ public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO
long newValue = newReference.get(key).getAsLong();
oldReference.addProperty(key, oldValue + newValue);
}
private JsonObject findRoot(Map<String, JsonObject> serviceReferenceMap) {
for (JsonObject serviceReference : serviceReferenceMap.values()) {
int behindServiceId = serviceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID)).getAsInt();
if (behindServiceId == 1) {
return serviceReference;
}
}
return null;
}
private void sortAsTree(int serviceId, JsonArray serviceReferenceArray,
Map<String, JsonObject> serviceReferenceMap) {
Iterator<JsonObject> iterator = serviceReferenceMap.values().iterator();
while (iterator.hasNext()) {
JsonObject serviceReference = iterator.next();
int frontServiceId = serviceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID)).getAsInt();
if (serviceId == frontServiceId) {
serviceReferenceArray.add(serviceReference);
int behindServiceId = serviceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID)).getAsInt();
sortAsTree(behindServiceId, serviceReferenceArray, serviceReferenceMap);
}
}
}
}
......@@ -28,11 +28,12 @@ public class InstanceHealthService {
IInstanceDAO instanceDAO = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
List<InstanceDataDefine.Instance> instanceList = instanceDAO.getInstances(applicationId, halfHourBeforeTimeBucket);
JsonArray instances = new JsonArray();
response.add("instances", instances);
instanceList.forEach(instance -> {
JsonArray instances = new JsonArray();
response.addProperty("applicationCode", ApplicationCache.getForUI(applicationId));
response.addProperty("applicationId", applicationId);
response.add("instances", instances);
IInstPerformanceDAO instPerformanceDAO = (IInstPerformanceDAO)DAOContainer.INSTANCE.get(IInstPerformanceDAO.class.getName());
IInstPerformanceDAO.InstPerformance performance = instPerformanceDAO.get(timeBuckets, instance.getInstanceId());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册