Commit 3de64ff0 authored by wu-sheng, committed by GitHub

Merge pull request #326 from wu-sheng/feature/266

Feature/266 collector support modularization development
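Every module group added below (agent_jvm, agent_register, agent_server, agent_stream) follows the same wiring: a ModuleGroupDefine names the group and supplies a ModuleInstaller, the installer puts the group Context into CollectorContextHelper and initializes each ModuleDefine with that module's config section (or null when the section is absent), and the define contributes a config parser, a server, its handlers and a cluster data listener. A minimal sketch of that installer loop, using toy stand-in types rather than the collector's real classes:

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Toy stand-ins for ModuleDefine / ModuleInstaller (not the collector's real classes),
 * showing how a group installer hands every module its own config section, or null
 * when that section is absent, exactly like the installers in this change do.
 */
public class ModuleWiringSketch {
    interface ModuleDefine {
        String name();
        void initialize(Map<String, Object> config);
    }

    static class GroupInstaller {
        void install(Map<String, Map<String, Object>> moduleConfig, Map<String, ModuleDefine> moduleDefineMap) {
            for (ModuleDefine define : moduleDefineMap.values()) {
                Map<String, Object> section = moduleConfig == null ? null : moduleConfig.get(define.name());
                define.initialize(section); // null means "fall back to the parser defaults"
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> moduleConfig = new HashMap<>();
        Map<String, Object> grpc = new HashMap<>();
        grpc.put("host", "localhost");
        grpc.put("port", 11800);
        moduleConfig.put("grpc", grpc);

        Map<String, ModuleDefine> defines = new LinkedHashMap<>();
        defines.put("grpc", new ModuleDefine() {
            @Override public String name() { return "grpc"; }
            @Override public void initialize(Map<String, Object> config) {
                System.out.println("grpc module initialized with " + config);
            }
        });

        new GroupInstaller().install(moduleConfig, defines);
    }
}

Keying the per-module config by module name is what lets a group grow extra transports (gRPC, Jetty) later without touching its installer.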
@@ -18,5 +18,5 @@ script:
- mvn clean install
after_success:
- mvn clean cobertura:cobertura coveralls:report
- mvn clean test jacoco:report coveralls:report
- bash ./travis/push_image.sh
@@ -9,6 +9,24 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-discovery</artifactId>
<artifactId>apm-collector-agentjvm</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.agentjvm;
import org.skywalking.apm.collector.core.framework.Context;
/**
* @author pengys5
*/
public class AgentJVMModuleContext extends Context {
public AgentJVMModuleContext(String groupName) {
super(groupName);
}
}
package org.skywalking.apm.collector.agentjvm;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentJVMModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(AgentJVMModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentJVMModuleException(e.getMessage(), e);
}
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
}
@Override public final boolean defaultModule() {
return true;
}
public abstract List<Handler> handlerList();
}
package org.skywalking.apm.collector.agentjvm;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class AgentJVMModuleException extends ModuleException {
public AgentJVMModuleException(String message) {
super(message);
}
public AgentJVMModuleException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.agentjvm;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
/**
* @author pengys5
*/
public class AgentJVMModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_jvm";
@Override public String name() {
return GROUP_NAME;
}
@Override public Context groupContext() {
return new AgentJVMModuleContext(GROUP_NAME);
}
@Override public ModuleInstaller moduleInstaller() {
return new AgentJVMModuleInstaller();
}
}
package org.skywalking.apm.collector.agentjvm;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class AgentJVMModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentJVMModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent jvm module install");
AgentJVMModuleContext context = new AgentJVMModuleContext(AgentJVMModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
}
}
package org.skywalking.apm.collector.agentjvm.grpc;
/**
* @author pengys5
*/
public class AgentJVMGRPCConfig {
public static String HOST;
public static int PORT;
}
package org.skywalking.apm.collector.agentjvm.grpc;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class AgentJVMGRPCConfigParser implements ModuleConfigParser {
private static final String HOST = "host";
private static final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
AgentJVMGRPCConfig.HOST = "localhost";
} else {
AgentJVMGRPCConfig.HOST = (String)config.get(HOST);
}
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
AgentJVMGRPCConfig.PORT = 11800;
} else {
AgentJVMGRPCConfig.PORT = (Integer)config.get(PORT);
}
}
}
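A usage sketch for the parser above, assuming (as AgentJVMModuleInstaller does) that the module receives its raw config section as a Map, or null when the section is missing from the collector configuration, in which case the defaults localhost:11800 apply:

import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.agentjvm.grpc.AgentJVMGRPCConfig;
import org.skywalking.apm.collector.agentjvm.grpc.AgentJVMGRPCConfigParser;
import org.skywalking.apm.collector.core.config.ConfigParseException;

// hypothetical usage class, not part of this change
public class AgentJVMGRPCConfigParserUsage {
    public static void main(String[] args) throws ConfigParseException {
        // no config section supplied: the parser keeps the defaults localhost:11800
        new AgentJVMGRPCConfigParser().parse(null);
        System.out.println(AgentJVMGRPCConfig.HOST + ":" + AgentJVMGRPCConfig.PORT);

        // explicit section: host and port are read from the map
        Map<String, Object> section = new HashMap<>();
        section.put("host", "0.0.0.0");
        section.put("port", 21800);
        new AgentJVMGRPCConfigParser().parse(section);
        System.out.println(AgentJVMGRPCConfig.HOST + ":" + AgentJVMGRPCConfig.PORT);
    }
}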
package org.skywalking.apm.collector.agentjvm.grpc;
import org.skywalking.apm.collector.agentjvm.AgentJVMModuleGroupDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
/**
* @author pengys5
*/
public class AgentJVMGRPCDataListener extends ClusterDataListener {
public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + AgentJVMModuleGroupDefine.GROUP_NAME + "." + AgentJVMGRPCModuleDefine.MODULE_NAME;
@Override public String path() {
return PATH;
}
@Override public void addressChangedNotify() {
}
}
package org.skywalking.apm.collector.agentjvm.grpc;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.agentjvm.AgentJVMModuleDefine;
import org.skywalking.apm.collector.agentjvm.AgentJVMModuleGroupDefine;
import org.skywalking.apm.collector.agentjvm.grpc.handler.JVMMetricsServiceHandler;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.server.grpc.GRPCServer;
/**
* @author pengys5
*/
public class AgentJVMGRPCModuleDefine extends AgentJVMModuleDefine {
public static final String MODULE_NAME = "grpc";
@Override protected String group() {
return AgentJVMModuleGroupDefine.GROUP_NAME;
}
@Override public String name() {
return MODULE_NAME;
}
@Override protected ModuleConfigParser configParser() {
return new AgentJVMGRPCConfigParser();
}
@Override protected Server server() {
return new GRPCServer(AgentJVMGRPCConfig.HOST, AgentJVMGRPCConfig.PORT);
}
@Override protected ModuleRegistration registration() {
return new AgentJVMGRPCModuleRegistration();
}
@Override public ClusterDataListener listener() {
return new AgentJVMGRPCDataListener();
}
@Override public List<Handler> handlerList() {
List<Handler> handlers = new LinkedList<>();
handlers.add(new JVMMetricsServiceHandler());
return handlers;
}
}
package org.skywalking.apm.collector.agentjvm.grpc;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class AgentJVMGRPCModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
return new Value(AgentJVMGRPCConfig.HOST, AgentJVMGRPCConfig.PORT, null);
}
}
package org.skywalking.apm.collector.agentjvm.grpc.handler;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.JVMMetrics;
import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
/**
* @author pengys5
*/
public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase implements GRPCHandler {
@Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
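// placeholder only: the generated base implementation answers this call with an UNIMPLEMENTED status until JVM metric handling is added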
super.collect(request, responseObserver);
}
}
org.skywalking.apm.collector.agentjvm.AgentJVMModuleGroupDefine
\ No newline at end of file
org.skywalking.apm.collector.agentjvm.grpc.AgentJVMGRPCModuleDefine
\ No newline at end of file
@@ -3,12 +3,35 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-storage</artifactId>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>elasticsearch-storage</artifactId>
<artifactId>apm-collector-agentregister</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-agentstream</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.agentregister;
import org.skywalking.apm.collector.core.framework.Context;
/**
* @author pengys5
*/
public class AgentRegisterModuleContext extends Context {
public AgentRegisterModuleContext(String groupName) {
super(groupName);
}
}
package org.skywalking.apm.collector.agentregister;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentRegisterModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(AgentRegisterModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentRegisterModuleException(e.getMessage(), e);
}
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
}
@Override public final boolean defaultModule() {
return true;
}
public abstract List<Handler> handlerList();
}
package org.skywalking.apm.collector.agentregister;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class AgentRegisterModuleException extends ModuleException {
public AgentRegisterModuleException(String message) {
super(message);
}
public AgentRegisterModuleException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.agentregister;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
/**
* @author pengys5
*/
public class AgentRegisterModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_register";
@Override public String name() {
return GROUP_NAME;
}
@Override public Context groupContext() {
return new AgentRegisterModuleContext(GROUP_NAME);
}
@Override public ModuleInstaller moduleInstaller() {
return new AgentRegisterModuleInstaller();
}
}
package org.skywalking.apm.collector.agentregister;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class AgentRegisterModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentRegisterModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent register module install");
AgentRegisterModuleContext context = new AgentRegisterModuleContext(AgentRegisterModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
}
}
package org.skywalking.apm.collector.agentregister.application;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ApplicationIDService {
private final Logger logger = LoggerFactory.getLogger(ApplicationIDService.class);
public int getOrCreate(String applicationCode) {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
int applicationId = dao.getApplicationId(applicationCode);
if (applicationId == 0) {
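// id 0 means the application is not persisted yet: ask the register worker to create it asynchronously and keep returning 0 until it exists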
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
ApplicationDataDefine.Application application = new ApplicationDataDefine.Application(applicationCode, applicationCode, 0);
try {
context.getClusterWorkerContext().lookup(ApplicationRegisterRemoteWorker.WorkerRole.INSTANCE).tell(application);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
}
return applicationId;
}
}
package org.skywalking.apm.collector.agentregister.grpc;
/**
* @author pengys5
*/
public class AgentRegisterGRPCConfig {
public static String HOST;
public static int PORT;
}
package org.skywalking.apm.collector.agentregister.grpc;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class AgentRegisterGRPCConfigParser implements ModuleConfigParser {
private static final String HOST = "host";
private static final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
AgentRegisterGRPCConfig.HOST = "localhost";
} else {
AgentRegisterGRPCConfig.HOST = (String)config.get(HOST);
}
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
AgentRegisterGRPCConfig.PORT = 11800;
} else {
AgentRegisterGRPCConfig.PORT = (Integer)config.get(PORT);
}
}
}
package org.skywalking.apm.collector.agentregister.grpc;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
/**
* @author pengys5
*/
public class AgentRegisterGRPCDataListener extends ClusterDataListener {
public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + AgentRegisterModuleGroupDefine.GROUP_NAME + "." + AgentRegisterGRPCModuleDefine.MODULE_NAME;
@Override public String path() {
return PATH;
}
@Override public void addressChangedNotify() {
}
}
package org.skywalking.apm.collector.agentregister.grpc;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleDefine;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine;
import org.skywalking.apm.collector.agentregister.grpc.handler.ApplicationRegisterServiceHandler;
import org.skywalking.apm.collector.agentregister.grpc.handler.InstanceDiscoveryServiceHandler;
import org.skywalking.apm.collector.agentregister.grpc.handler.ServiceNameDiscoveryServiceHandler;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.server.grpc.GRPCServer;
/**
* @author pengys5
*/
public class AgentRegisterGRPCModuleDefine extends AgentRegisterModuleDefine {
public static final String MODULE_NAME = "grpc";
@Override protected String group() {
return AgentRegisterModuleGroupDefine.GROUP_NAME;
}
@Override public String name() {
return MODULE_NAME;
}
@Override protected ModuleConfigParser configParser() {
return new AgentRegisterGRPCConfigParser();
}
@Override protected Server server() {
return new GRPCServer(AgentRegisterGRPCConfig.HOST, AgentRegisterGRPCConfig.PORT);
}
@Override protected ModuleRegistration registration() {
return new AgentRegisterGRPCModuleRegistration();
}
@Override public ClusterDataListener listener() {
return new AgentRegisterGRPCDataListener();
}
@Override public List<Handler> handlerList() {
List<Handler> handlers = new LinkedList<>();
handlers.add(new ApplicationRegisterServiceHandler());
handlers.add(new InstanceDiscoveryServiceHandler());
handlers.add(new ServiceNameDiscoveryServiceHandler());
return handlers;
}
}
package org.skywalking.apm.collector.agentregister.grpc;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class AgentRegisterGRPCModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
return new Value(AgentRegisterGRPCConfig.HOST, AgentRegisterGRPCConfig.PORT, null);
}
}
package org.skywalking.apm.collector.agentregister.grpc.handler;
import com.google.protobuf.ProtocolStringList;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agentregister.application.ApplicationIDService;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.skywalking.apm.network.proto.KeyWithIntegerValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ApplicationRegisterServiceHandler extends ApplicationRegisterServiceGrpc.ApplicationRegisterServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServiceHandler.class);
private ApplicationIDService applicationIDService = new ApplicationIDService();
@Override public void register(Application request, StreamObserver<ApplicationMapping> responseObserver) {
logger.debug("register application");
ProtocolStringList applicationCodes = request.getApplicationCodeList();
ApplicationMapping.Builder mapping = ApplicationMapping.newBuilder();
for (int i = 0; i < applicationCodes.size(); i++) {
String applicationCode = applicationCodes.get(i);
int applicationId = applicationIDService.getOrCreate(applicationCode);
KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(applicationCode).setValue(applicationId).build();
mapping.addApplication(i, value);
}
// register is a unary RPC, so all mappings go out in a single response
responseObserver.onNext(mapping.build());
responseObserver.onCompleted();
}
}
package org.skywalking.apm.collector.agentregister.grpc.handler;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agentregister.instance.InstanceIDService;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ApplicationInstance;
import org.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
import org.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.skywalking.apm.network.proto.ApplicationInstanceRecover;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServiceHandler.class);
private InstanceIDService instanceIDService = new InstanceIDService();
@Override
public void register(ApplicationInstance request, StreamObserver<ApplicationInstanceMapping> responseObserver) {
int instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), request.getRegisterTime());
ApplicationInstanceMapping.Builder builder = ApplicationInstanceMapping.newBuilder();
builder.setApplicationId(request.getApplicationId());
builder.setApplicationInstanceId(instanceId);
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
@Override public void heartbeat(ApplicationInstanceHeartbeat request, StreamObserver<Downstream> responseObserver) {
instanceIDService.heartBeat(request.getApplicationInstanceId(), request.getHeartbeatTime());
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
@Override
public void registerRecover(ApplicationInstanceRecover request, StreamObserver<Downstream> responseObserver) {
instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), request.getRegisterTime());
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
}
package org.skywalking.apm.collector.agentregister.grpc.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agentregister.servicename.ServiceNameService;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ServiceNameCollection;
import org.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.ServiceNameElement;
import org.skywalking.apm.network.proto.ServiceNameMappingCollection;
import org.skywalking.apm.network.proto.ServiceNameMappingElement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceNameDiscoveryServiceHandler extends ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);
private ServiceNameService serviceNameService = new ServiceNameService();
@Override public void discovery(ServiceNameCollection request,
StreamObserver<ServiceNameMappingCollection> responseObserver) {
List<ServiceNameElement> serviceNameElementList = request.getElementsList();
ServiceNameMappingCollection.Builder builder = ServiceNameMappingCollection.newBuilder();
for (ServiceNameElement serviceNameElement : serviceNameElementList) {
int applicationId = serviceNameElement.getApplicationId();
String serviceName = serviceNameElement.getServiceName();
int serviceId = serviceNameService.getOrCreate(applicationId, serviceName);
ServiceNameMappingElement.Builder mappingElement = ServiceNameMappingElement.newBuilder();
mappingElement.setServiceId(serviceId);
mappingElement.setElement(serviceNameElement);
builder.addElements(mappingElement);
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
}
package org.skywalking.apm.collector.agentregister.instance;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class InstanceIDService {
private final Logger logger = LoggerFactory.getLogger(InstanceIDService.class);
public int getOrCreate(int applicationId, String agentUUID, long registerTime) {
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
int instanceId = dao.getInstanceId(applicationId, agentUUID);
if (instanceId == 0) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance("0", applicationId, agentUUID, registerTime, 0);
try {
context.getClusterWorkerContext().lookup(ApplicationRegisterRemoteWorker.WorkerRole.INSTANCE).tell(instance);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
}
return instanceId;
}
public void heartBeat(int instanceId, long heartbeatTime) {
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
dao.updateHeartbeatTime(instanceId, heartbeatTime);
}
public void recover(int instanceId, int applicationId, long registerTime) {
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance(String.valueOf(instanceId), applicationId, "", registerTime, instanceId);
dao.save(instance);
}
}
package org.skywalking.apm.collector.agentregister.servicename;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterRemoteWorker;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceNameService {
private final Logger logger = LoggerFactory.getLogger(ServiceNameService.class);
public int getOrCreate(int applicationId, String serviceName) {
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
int serviceId = dao.getServiceId(applicationId, serviceName);
if (serviceId == 0) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
ServiceNameDataDefine.ServiceName service = new ServiceNameDataDefine.ServiceName("0", serviceName, applicationId, 0);
try {
context.getClusterWorkerContext().lookup(ServiceNameRegisterRemoteWorker.WorkerRole.INSTANCE).tell(service);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
}
return serviceId;
}
}
org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine
\ No newline at end of file
org.skywalking.apm.collector.agentregister.grpc.AgentRegisterGRPCModuleDefine
\ No newline at end of file
package org.skywalking.apm.collector.agentregister.grpc.handler;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
/**
* @author pengys5
*/
public class ApplicationRegisterServiceHandlerTestCase {
private ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub stub;
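// manual check against a collector gRPC server listening on localhost:11800; without an @Test annotation the build does not execute it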
public void testRegister() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
stub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
Application application = Application.newBuilder().addApplicationCode("test141").build();
ApplicationMapping mapping = stub.register(application);
System.out.println(mapping.getApplication(0).getKey() + ", " + mapping.getApplication(0).getValue());
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-agentserver</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-agentstream</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-agentregister</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-ui</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.agentserver;
import org.skywalking.apm.collector.core.framework.Context;
/**
* @author pengys5
*/
public class AgentServerModuleContext extends Context {
public AgentServerModuleContext(String groupName) {
super(groupName);
}
}
package org.skywalking.apm.collector.agentserver;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentServerModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(AgentServerModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentServerModuleException(e.getMessage(), e);
}
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
}
public abstract List<Handler> handlerList();
}
package org.skywalking.apm.collector.agentserver;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class AgentServerModuleException extends ModuleException {
public AgentServerModuleException(String message) {
super(message);
}
public AgentServerModuleException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.agentserver;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
/**
* @author pengys5
*/
public class AgentServerModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_server";
@Override public String name() {
return GROUP_NAME;
}
@Override public Context groupContext() {
return new AgentServerModuleContext(GROUP_NAME);
}
@Override public ModuleInstaller moduleInstaller() {
return new AgentServerModuleInstaller();
}
}
package org.skywalking.apm.collector.agentserver;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class AgentServerModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentServerModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent server module install");
AgentServerModuleContext context = new AgentServerModuleContext(AgentServerModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
logger.info("could not configure agent server module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
}
}
package org.skywalking.apm.collector.agentserver.jetty;
/**
* @author pengys5
*/
public class AgentServerJettyConfig {
public static String HOST;
public static int PORT;
public static String CONTEXT_PATH;
}
package org.skywalking.apm.collector.agentserver.jetty;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class AgentServerJettyConfigParser implements ModuleConfigParser {
private static final String HOST = "host";
private static final String PORT = "port";
public static final String CONTEXT_PATH = "contextPath";
@Override public void parse(Map config) throws ConfigParseException {
AgentServerJettyConfig.CONTEXT_PATH = "/";
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
AgentServerJettyConfig.HOST = "localhost";
} else {
AgentServerJettyConfig.HOST = (String)config.get(HOST);
}
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
AgentServerJettyConfig.PORT = 10800;
} else {
AgentServerJettyConfig.PORT = (Integer)config.get(PORT);
}
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CONTEXT_PATH))) {
AgentServerJettyConfig.CONTEXT_PATH = (String)config.get(CONTEXT_PATH);
}
}
}
package org.skywalking.apm.collector.agentserver.jetty;
import org.skywalking.apm.collector.agentserver.AgentServerModuleGroupDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
/**
* @author pengys5
*/
public class AgentServerJettyDataListener extends ClusterDataListener {
@Override public String path() {
return ClusterModuleDefine.BASE_CATALOG + "." + AgentServerModuleGroupDefine.GROUP_NAME + "." + AgentServerJettyModuleDefine.MODULE_NAME;
}
@Override public void addressChangedNotify() {
}
}
package org.skywalking.apm.collector.agentserver.jetty;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.agentserver.AgentServerModuleDefine;
import org.skywalking.apm.collector.agentserver.AgentServerModuleGroupDefine;
import org.skywalking.apm.collector.agentserver.jetty.handler.AgentStreamGRPCServerHandler;
import org.skywalking.apm.collector.agentserver.jetty.handler.AgentStreamJettyServerHandler;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.server.jetty.JettyServer;
/**
* @author pengys5
*/
public class AgentServerJettyModuleDefine extends AgentServerModuleDefine {
public static final String MODULE_NAME = "jetty";
@Override protected String group() {
return AgentServerModuleGroupDefine.GROUP_NAME;
}
@Override public String name() {
return MODULE_NAME;
}
@Override public boolean defaultModule() {
return true;
}
@Override protected ModuleConfigParser configParser() {
return new AgentServerJettyConfigParser();
}
@Override protected Server server() {
return new JettyServer(AgentServerJettyConfig.HOST, AgentServerJettyConfig.PORT, AgentServerJettyConfig.CONTEXT_PATH);
}
@Override protected ModuleRegistration registration() {
return new AgentServerJettyModuleRegistration();
}
@Override public ClusterDataListener listener() {
return new AgentServerJettyDataListener();
}
@Override public List<Handler> handlerList() {
List<Handler> handlers = new LinkedList<>();
handlers.add(new AgentStreamGRPCServerHandler());
handlers.add(new AgentStreamJettyServerHandler());
return handlers;
}
}
package org.skywalking.apm.collector.agentserver.jetty;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class AgentServerJettyModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
return new Value(AgentServerJettyConfig.HOST, AgentServerJettyConfig.PORT, AgentServerJettyConfig.CONTEXT_PATH);
}
}
package org.skywalking.apm.collector.agentserver.jetty.handler;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agentstream.grpc.AgentStreamGRPCDataListener;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
/**
* @author pengys5
*/
public class AgentStreamGRPCServerHandler extends JettyHandler {
@Override public String pathSpec() {
return "/agentstream/grpc";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
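// return the agent-stream gRPC server addresses currently registered in the cluster as a flat JSON array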
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
List<String> servers = reader.read(AgentStreamGRPCDataListener.PATH);
JsonArray serverArray = new JsonArray();
servers.forEach(server -> {
serverArray.add(server);
});
return serverArray;
}
}
package org.skywalking.apm.collector.agentserver.jetty.handler;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agentstream.jetty.AgentStreamJettyDataListener;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
/**
* @author pengys5
*/
public class AgentStreamJettyServerHandler extends JettyHandler {
@Override public String pathSpec() {
return "/agentstream/jetty";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
List<String> servers = reader.read(AgentStreamJettyDataListener.PATH);
JsonArray serverArray = new JsonArray();
servers.forEach(server -> {
serverArray.add(server);
});
return serverArray;
}
}
org.skywalking.apm.collector.agentserver.AgentServerModuleGroupDefine
\ No newline at end of file
org.skywalking.apm.collector.agentserver.jetty.AgentServerJettyModuleDefine
\ No newline at end of file
@@ -15,7 +15,12 @@
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<artifactId>apm-collector-stream</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -23,5 +28,10 @@
<artifactId>apm-collector-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-storage</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.agentstream;
import org.skywalking.apm.collector.core.framework.Context;
/**
* @author pengys5
*/
public class AgentStreamModuleContext extends Context {
public AgentStreamModuleContext(String groupName) {
super(groupName);
}
}
package org.skywalking.apm.collector.core.agentstream;
package org.skywalking.apm.collector.agentstream;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentStreamModuleDefine extends ModuleDefine {
public abstract class AgentStreamModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
@Override public final void initialize(Map config) throws DefineException, ClientException {
private final Logger logger = LoggerFactory.getLogger(AgentStreamModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
server.initialize();
serverHolder.holdServer(server, handlerList());
String key = ClusterDataInitializer.BASE_CATALOG + "." + name();
ClusterModuleContext.WRITER.write(key, registration().buildValue());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentStreamModuleException(e.getMessage(), e);
}
}
@Override protected final DataInitializer dataInitializer() {
@Override protected final Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
}
@Override protected final Client createClient() {
throw new UnsupportedOperationException("");
@Override public final boolean defaultModule() {
return true;
}
public abstract List<Handler> handlerList();
}
package org.skywalking.apm.collector.agentstream;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
/**
* @author pengys5
*/
public class AgentStreamModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_stream";
@Override public String name() {
return GROUP_NAME;
}
@Override public Context groupContext() {
return new AgentStreamModuleContext(GROUP_NAME);
}
@Override public ModuleInstaller moduleInstaller() {
return new AgentStreamModuleInstaller();
}
}
package org.skywalking.apm.collector.agentstream;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class AgentStreamModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentStreamModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent stream module install");
AgentStreamModuleContext context = new AgentStreamModuleContext(AgentStreamModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
new PersistenceTimer().start();
}
}
package org.skywalking.apm.collector.agent.stream.server.grpc;
package org.skywalking.apm.collector.agentstream.grpc;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
@@ -14,16 +15,16 @@ public class AgentStreamGRPCConfigParser implements ModuleConfigParser {
private static final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
AgentStreamGRPCConfig.HOST = (String)config.get(HOST);
if (StringUtils.isEmpty(AgentStreamGRPCConfig.HOST)) {
throw new ConfigParseException("");
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
AgentStreamGRPCConfig.HOST = "localhost";
} else {
AgentStreamGRPCConfig.HOST = (String)config.get(HOST);
}
if (StringUtils.isEmpty(config.get(PORT))) {
throw new ConfigParseException("");
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
AgentStreamGRPCConfig.PORT = 11800;
} else {
AgentStreamGRPCConfig.PORT = (Integer)config.get(PORT);
}
}
}
package org.skywalking.apm.collector.agentstream.grpc;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
/**
* @author pengys5
*/
public class AgentStreamGRPCDataListener extends ClusterDataListener {
public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + AgentStreamModuleGroupDefine.GROUP_NAME + "." + AgentStreamGRPCModuleDefine.MODULE_NAME;
@Override public String path() {
return PATH;
}
@Override public void addressChangedNotify() {
}
}
package org.skywalking.apm.collector.agent.stream.server.grpc;
import org.skywalking.apm.collector.core.agentstream.AgentStreamModuleDefine;
package org.skywalking.apm.collector.agentstream.grpc;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine;
import org.skywalking.apm.collector.agentstream.grpc.handler.TraceSegmentServiceHandler;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleGroup;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.server.grpc.GRPCServer;
@@ -12,16 +17,14 @@ import org.skywalking.apm.collector.server.grpc.GRPCServer;
*/
public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine {
@Override protected ModuleGroup group() {
return ModuleGroup.AgentStream;
}
public static final String MODULE_NAME = "grpc";
@Override public String name() {
return "grpc";
@Override protected String group() {
return AgentStreamModuleGroupDefine.GROUP_NAME;
}
@Override public boolean defaultModule() {
return true;
@Override public String name() {
return MODULE_NAME;
}
@Override protected ModuleConfigParser configParser() {
@@ -35,4 +38,14 @@ public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine {
@Override protected ModuleRegistration registration() {
return new AgentStreamGRPCModuleRegistration();
}
@Override public ClusterDataListener listener() {
return new AgentStreamGRPCDataListener();
}
@Override public List<Handler> handlerList() {
List<Handler> handlers = new LinkedList<>();
handlers.add(new TraceSegmentServiceHandler());
return handlers;
}
}
package org.skywalking.apm.collector.agentstream.grpc.handler;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.segment.SegmentParse;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UniqueId;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSegmentServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
return new StreamObserver<UpstreamSegment>() {
@Override public void onNext(UpstreamSegment segment) {
logger.debug("receive segment");
SegmentParse segmentParse = new SegmentParse();
try {
List<UniqueId> traceIds = segment.getGlobalTraceIdsList();
TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getSegment());
segmentParse.parse(traceIds, segmentObject);
} catch (InvalidProtocolBufferException e) {
logger.error(e.getMessage(), e);
}
}
@Override public void onError(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
@Override public void onCompleted() {
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
};
}
}
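For context, a minimal client-side sketch that drives the handler above. It assumes the stubs and builders generated from the apm-network protos follow the usual grpc-java conventions (TraceSegmentServiceGrpc.newStub, addGlobalTraceIds, setSegment); the address and id values are illustrative.
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UniqueId;
import org.skywalking.apm.network.proto.UpstreamSegment;

public class TraceSegmentClientSketch {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800)
            .usePlaintext(true).build();
        TraceSegmentServiceGrpc.TraceSegmentServiceStub stub = TraceSegmentServiceGrpc.newStub(channel);

        // collect() is client-streaming: the server only answers with a Downstream
        // after the client closes the upstream.
        StreamObserver<UpstreamSegment> upstream = stub.collect(new StreamObserver<Downstream>() {
            @Override public void onNext(Downstream downstream) {
            }

            @Override public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override public void onCompleted() {
                System.out.println("segments acknowledged");
            }
        });

        // One segment with a three-part global trace id and an empty segment body.
        UpstreamSegment segment = UpstreamSegment.newBuilder()
            .addGlobalTraceIds(UniqueId.newBuilder().addIdParts(1L).addIdParts(2L).addIdParts(3L))
            .setSegment(TraceSegmentObject.newBuilder().build().toByteString())
            .build();
        upstream.onNext(segment);
        upstream.onCompleted();

        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }
}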
package org.skywalking.apm.collector.agentstream.jetty;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
......@@ -15,18 +16,20 @@ public class AgentStreamJettyConfigParser implements ModuleConfigParser {
public static final String CONTEXT_PATH = "contextPath";
@Override public void parse(Map config) throws ConfigParseException {
AgentStreamJettyConfig.CONTEXT_PATH = "/";
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
AgentStreamJettyConfig.HOST = "localhost";
} else {
AgentStreamJettyConfig.HOST = (String)config.get(HOST);
}
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
AgentStreamJettyConfig.PORT = 12800;
} else {
AgentStreamJettyConfig.PORT = (Integer)config.get(PORT);
}
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CONTEXT_PATH))) {
AgentStreamJettyConfig.CONTEXT_PATH = (String)config.get(CONTEXT_PATH);
}
}
......
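The Jetty parser follows the same pattern; a small sketch, again assuming the "host"/"port" key literals, shows how the 12800 and "/" defaults kick in when keys are missing.
package org.skywalking.apm.collector.agentstream.jetty;

import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;

public class JettyConfigParseSketch {
    public static void main(String[] args) throws ConfigParseException {
        Map<String, Object> jettySection = new HashMap<>();
        jettySection.put("host", "0.0.0.0");               // "port" and "contextPath" omitted on purpose
        new AgentStreamJettyConfigParser().parse(jettySection);
        // HOST comes from the map, PORT falls back to 12800, CONTEXT_PATH stays "/".
        System.out.println(AgentStreamJettyConfig.HOST + ":" + AgentStreamJettyConfig.PORT
            + AgentStreamJettyConfig.CONTEXT_PATH);        // 0.0.0.0:12800/
    }
}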
package org.skywalking.apm.collector.agentstream.jetty;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
/**
* @author pengys5
*/
public class AgentStreamJettyDataListener extends ClusterDataListener {
public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + AgentStreamModuleGroupDefine.GROUP_NAME + "." + AgentStreamJettyModuleDefine.MODULE_NAME;
@Override public String path() {
return PATH;
}
@Override public void addressChangedNotify() {
}
}
package org.skywalking.apm.collector.agentstream.jetty;
import java.util.List;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleGroup;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.server.jetty.JettyServer;
......@@ -12,16 +15,14 @@ import org.skywalking.apm.collector.server.jetty.JettyServer;
*/
public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine {
public static final String MODULE_NAME = "jetty";
@Override protected String group() {
return AgentStreamModuleGroupDefine.GROUP_NAME;
}
@Override public String name() {
return MODULE_NAME;
}
@Override protected ModuleConfigParser configParser() {
......@@ -35,4 +36,12 @@ public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine {
@Override protected ModuleRegistration registration() {
return new AgentStreamJettyModuleRegistration();
}
@Override public ClusterDataListener listener() {
return new AgentStreamJettyDataListener();
}
@Override public List<Handler> handlerList() {
return null;
}
}
package org.skywalking.apm.collector.agentstream.jetty;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
......@@ -9,8 +8,6 @@ import org.skywalking.apm.collector.core.module.ModuleRegistration;
public class AgentStreamJettyModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
return new Value(AgentStreamJettyConfig.HOST, AgentStreamJettyConfig.PORT, AgentStreamJettyConfig.CONTEXT_PATH);
}
}
package org.skywalking.apm.collector.agentstream.worker;
import org.skywalking.apm.collector.core.framework.DefineException;
/**
* @author pengys5
*/
public class AgentStreamModuleDefineException extends DefineException {
public AgentStreamModuleDefineException(String message) {
super(message);
}
public AgentStreamModuleDefineException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.agentstream.worker;
/**
* @author pengys5
*/
public class CommonTable {
public static final String TABLE_TYPE = "type";
public static final String COLUMN_ID = "id";
public static final String COLUMN_AGG = "agg";
public static final String COLUMN_TIME_BUCKET = "time_bucket";
}
package org.skywalking.apm.collector.agentstream.worker;
/**
* @author pengys5
*/
public class Const {
public static final String ID_SPLIT = "..-..";
public static final String IDS_SPLIT = "\\.\\.-\\.\\.";
public static final String PEERS_FRONT_SPLIT = "[";
public static final String PEERS_BEHIND_SPLIT = "]";
public static final String USER_CODE = "User";
public static final String SEGMENT_SPAN_SPLIT = "S";
}
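A small sketch of how these constants pair up: ID_SPLIT joins the pieces of a row id, and IDS_SPLIT is the escaped regex that splits it back. All values below are illustrative.
package org.skywalking.apm.collector.agentstream.worker;

public class ConstSplitSketch {
    public static void main(String[] args) {
        long timeBucket = 201707121658L;                       // illustrative minute bucket
        String peers = Const.PEERS_FRONT_SPLIT + "localhost:8080" + Const.PEERS_BEHIND_SPLIT;

        // Row ids are built by joining the pieces with ID_SPLIT ...
        String id = timeBucket + Const.ID_SPLIT + 3 + Const.ID_SPLIT + peers;
        System.out.println(id);                                // 201707121658..-..3..-..[localhost:8080]

        // ... and taken apart again with the escaped IDS_SPLIT regex.
        String[] parts = id.split(Const.IDS_SPLIT);
        System.out.println(parts.length);                      // 3
    }
}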
package org.skywalking.apm.collector.agentstream.worker.config;
/**
* @author pengys5
*/
public class CacheSizeConfig {
public static class Cache {
public static class Analysis {
public static int SIZE = 1024;
}
public static class Persistence {
public static int SIZE = 5000;
}
}
}
package org.skywalking.apm.collector.agentstream.worker.config;
/**
* @author pengys5
*/
public class WorkerConfig {
public static class WorkerNum {
public static class Node {
public static class NodeCompAgg {
public static int VALUE = 2;
}
public static class NodeMappingDayAgg {
public static int VALUE = 2;
}
public static class NodeMappingHourAgg {
public static int VALUE = 2;
}
public static class NodeMappingMinuteAgg {
public static int VALUE = 2;
}
}
public static class NodeRef {
public static class NodeRefDayAgg {
public static int VALUE = 2;
}
public static class NodeRefHourAgg {
public static int VALUE = 2;
}
public static class NodeRefMinuteAgg {
public static int VALUE = 2;
}
public static class NodeRefResSumDayAgg {
public static int VALUE = 2;
}
public static class NodeRefResSumHourAgg {
public static int VALUE = 2;
}
public static class NodeRefResSumMinuteAgg {
public static int VALUE = 2;
}
}
public static class GlobalTrace {
public static class GlobalTraceAgg {
public static int VALUE = 2;
}
}
}
public static class Queue {
public static class GlobalTrace {
public static class GlobalTraceAnalysis {
public static int SIZE = 1024;
}
}
public static class Segment {
public static class SegmentAnalysis {
public static int SIZE = 1024;
}
public static class SegmentCostAnalysis {
public static int SIZE = 4096;
}
public static class SegmentExceptionAnalysis {
public static int SIZE = 4096;
}
}
public static class Node {
public static class NodeCompAnalysis {
public static int SIZE = 1024;
}
public static class NodeMappingDayAnalysis {
public static int SIZE = 1024;
}
public static class NodeMappingHourAnalysis {
public static int SIZE = 1024;
}
public static class NodeMappingMinuteAnalysis {
public static int SIZE = 1024;
}
}
public static class NodeRef {
public static class NodeRefDayAnalysis {
public static int SIZE = 1024;
}
public static class NodeRefHourAnalysis {
public static int SIZE = 1024;
}
public static class NodeRefMinuteAnalysis {
public static int SIZE = 1024;
}
public static class NodeRefResSumDayAnalysis {
public static int SIZE = 1024;
}
public static class NodeRefResSumHourAnalysis {
public static int SIZE = 1024;
}
public static class NodeRefResSumMinuteAnalysis {
public static int SIZE = 1024;
}
}
}
}
package org.skywalking.apm.collector.agentstream.worker.global;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.global.dao.IGlobalTraceDAO;
import org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class GlobalTracePersistenceWorker extends PersistenceWorker {
public GlobalTracePersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
IGlobalTraceDAO dao = (IGlobalTraceDAO)DAOContainer.INSTANCE.get(IGlobalTraceDAO.class.getName());
return dao.prepareBatch(dataMap);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<GlobalTracePersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public GlobalTracePersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new GlobalTracePersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return GlobalTracePersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
@Override public DataDefine dataDefine() {
return new GlobalTraceDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.global;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.GlobalTraceIdsListener;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.UniqueId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class GlobalTraceSpanListener implements FirstSpanListener, GlobalTraceIdsListener {
private final Logger logger = LoggerFactory.getLogger(GlobalTraceSpanListener.class);
private List<String> globalTraceIds = new ArrayList<>();
private String segmentId;
private long timeBucket;
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
this.timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
this.segmentId = segmentId;
}
@Override public void parseGlobalTraceId(UniqueId uniqueId) {
StringBuilder globalTraceIdBuilder = new StringBuilder();
uniqueId.getIdPartsList().forEach(globalTraceIdBuilder::append);
globalTraceIds.add(globalTraceIdBuilder.toString());
}
@Override public void build() {
logger.debug("global trace listener build");
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
for (String globalTraceId : globalTraceIds) {
GlobalTraceDataDefine.GlobalTrace globalTrace = new GlobalTraceDataDefine.GlobalTrace();
globalTrace.setGlobalTraceId(globalTraceId);
globalTrace.setId(segmentId + globalTraceId);
globalTrace.setSegmentId(segmentId);
globalTrace.setTimeBucket(timeBucket);
try {
logger.debug("send to global trace persistence worker, id: {}", globalTrace.getId());
context.getClusterWorkerContext().lookup(GlobalTracePersistenceWorker.WorkerRole.INSTANCE).tell(globalTrace.transform());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
}
\ No newline at end of file
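To make the id layout above concrete, a short sketch of what parseGlobalTraceId and build() produce for one segment; the values are illustrative and the UniqueId builder methods are assumed to follow the usual protobuf conventions.
package org.skywalking.apm.collector.agentstream.worker.global;

import org.skywalking.apm.network.proto.UniqueId;

public class GlobalTraceIdSketch {
    public static void main(String[] args) {
        UniqueId uniqueId = UniqueId.newBuilder()
            .addIdParts(1L).addIdParts(2L).addIdParts(3L).build();

        // parseGlobalTraceId concatenates the id parts into one string.
        StringBuilder globalTraceId = new StringBuilder();
        uniqueId.getIdPartsList().forEach(globalTraceId::append);
        System.out.println(globalTraceId);                     // 123

        // build() keys each global_trace row by segmentId + globalTraceId.
        String segmentId = "segment-1";                        // illustrative
        System.out.println(segmentId + globalTraceId);         // segment-1123
    }
}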
package org.skywalking.apm.collector.agentstream.worker.global.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class GlobalTraceEsDAO extends EsDAO implements IGlobalTraceDAO {
private final Logger logger = LoggerFactory.getLogger(GlobalTraceEsDAO.class);
@Override public List<?> prepareBatch(Map<String, Data> dataMap) {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
logger.debug("global trace prepareBatch, id: {}", id);
Map<String, Object> source = new HashMap<>();
source.put(GlobalTraceTable.COLUMN_SEGMENT_ID, data.getDataString(1));
source.put(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, data.getDataString(2));
source.put(GlobalTraceTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
logger.debug("global trace source: {}", source.toString());
IndexRequestBuilder builder = getClient().prepareIndex(GlobalTraceTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
}
}
package org.skywalking.apm.collector.agentstream.worker.global.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class GlobalTraceH2DAO extends H2DAO implements IGlobalTraceDAO {
@Override public List<?> prepareBatch(Map map) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.global.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface IGlobalTraceDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.global.define;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class GlobalTraceDataDefine extends DataDefine {
@Override public int defineId() {
return 403;
}
@Override protected int initialCapacity() {
return 4;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(GlobalTraceTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(GlobalTraceTable.COLUMN_SEGMENT_ID, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(GlobalTraceTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
String segmentId = remoteData.getDataStrings(1);
String globalTraceId = remoteData.getDataStrings(2);
Long timeBucket = remoteData.getDataLongs(0);
return new GlobalTrace(id, segmentId, globalTraceId, timeBucket);
}
@Override public RemoteData serialize(Object object) {
GlobalTrace globalTrace = (GlobalTrace)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(globalTrace.getId());
builder.addDataStrings(globalTrace.getSegmentId());
builder.addDataStrings(globalTrace.getGlobalTraceId());
builder.addDataLongs(globalTrace.getTimeBucket());
return builder.build();
}
public static class GlobalTrace implements TransformToData {
private String id;
private String segmentId;
private String globalTraceId;
private long timeBucket;
GlobalTrace(String id, String segmentId, String globalTraceId, long timeBucket) {
this.id = id;
this.segmentId = segmentId;
this.globalTraceId = globalTraceId;
this.timeBucket = timeBucket;
}
public GlobalTrace() {
}
@Override public Data transform() {
GlobalTraceDataDefine define = new GlobalTraceDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.segmentId);
data.setDataString(2, this.globalTraceId);
data.setDataLong(0, this.timeBucket);
return data;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getSegmentId() {
return segmentId;
}
public void setSegmentId(String segmentId) {
this.segmentId = segmentId;
}
public String getGlobalTraceId() {
return globalTraceId;
}
public void setGlobalTraceId(String globalTraceId) {
this.globalTraceId = globalTraceId;
}
public long getTimeBucket() {
return timeBucket;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
}
}
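A sketch of the slot layout this define fixes: transform(), serialize() and the ES DAO all rely on the same ordering (string 0 = id, 1 = segment id, 2 = global trace id, long 0 = time bucket). It assumes DataDefine initializes its attributes on construction; the ids and time bucket are illustrative.
package org.skywalking.apm.collector.agentstream.worker.global.define;

import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;

public class GlobalTraceDataSketch {
    public static void main(String[] args) {
        GlobalTraceDataDefine define = new GlobalTraceDataDefine();

        GlobalTraceDataDefine.GlobalTrace globalTrace = new GlobalTraceDataDefine.GlobalTrace();
        globalTrace.setSegmentId("segment-1");                 // illustrative ids
        globalTrace.setGlobalTraceId("trace-1");
        globalTrace.setId("segment-1" + "trace-1");
        globalTrace.setTimeBucket(201707121658L);

        // transform() fills the slots the persistence DAO later reads back.
        Data data = globalTrace.transform();
        System.out.println(data.getDataString(1));             // segment-1
        System.out.println(data.getDataLong(0));               // 201707121658

        // serialize()/deserialize() keep the same ordering for remote workers.
        RemoteData remoteData = define.serialize(globalTrace);
        GlobalTraceDataDefine.GlobalTrace copy =
            (GlobalTraceDataDefine.GlobalTrace)define.deserialize(remoteData);
        System.out.println(copy.getGlobalTraceId());           // trace-1
    }
}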
package org.skywalking.apm.collector.agentstream.worker.global.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class GlobalTraceEsTableDefine extends ElasticSearchTableDefine {
public GlobalTraceEsTableDefine() {
super(GlobalTraceTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(GlobalTraceTable.COLUMN_SEGMENT_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(GlobalTraceTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.global.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class GlobalTraceH2TableDefine extends H2TableDefine {
public GlobalTraceH2TableDefine() {
super(GlobalTraceTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(GlobalTraceTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(GlobalTraceTable.COLUMN_SEGMENT_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(GlobalTraceTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.global.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class GlobalTraceTable extends CommonTable {
public static final String TABLE = "global_trace";
public static final String COLUMN_SEGMENT_ID = "segment_id";
public static final String COLUMN_GLOBAL_TRACE_ID = "global_trace_id";
}
package org.skywalking.apm.collector.agentstream.worker.node.component;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class NodeComponentAggregationWorker extends AggregationWorker {
public NodeComponentAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException {
return getClusterContext().lookup(NodeComponentRemoteWorker.WorkerRole.INSTANCE);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeComponentAggregationWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeComponentAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeComponentAggregationWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeComponentAggregationWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new NodeComponentDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.node.component.dao.INodeComponentDAO;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class NodeComponentPersistenceWorker extends PersistenceWorker {
public NodeComponentPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
INodeComponentDAO dao = (INodeComponentDAO)DAOContainer.INSTANCE.get(INodeComponentDAO.class.getName());
return dao.prepareBatch(dataMap);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeComponentPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeComponentPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeComponentPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeComponentPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new NodeComponentDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class NodeComponentRemoteWorker extends AbstractRemoteWorker {
protected NodeComponentRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
}
@Override protected void onWork(Object message) throws WorkerException {
getClusterContext().lookup(NodeComponentPersistenceWorker.WorkerRole.INSTANCE).tell(message);
}
public static class Factory extends AbstractRemoteWorkerProvider<NodeComponentRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeComponentRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeComponentRemoteWorker(role(), clusterContext);
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeComponentRemoteWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new NodeComponentDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener {
private final Logger logger = LoggerFactory.getLogger(NodeComponentSpanListener.class);
private List<String> nodeComponents = new ArrayList<>();
private long timeBucket;
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String peers = spanObject.getPeer();
if (spanObject.getPeerId() != 0) {
peers = String.valueOf(spanObject.getPeerId());
}
String agg = spanObject.getComponentId() + Const.ID_SPLIT + peers;
nodeComponents.add(agg);
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String peers = String.valueOf(applicationId);
String agg = spanObject.getComponentId() + Const.ID_SPLIT + peers;
nodeComponents.add(agg);
}
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
}
@Override public void build() {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
for (String agg : nodeComponents) {
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setId(timeBucket + Const.ID_SPLIT + agg);
nodeComponent.setAgg(agg);
nodeComponent.setTimeBucket(timeBucket);
try {
logger.debug("send to node component aggregation worker, id: {}", nodeComponent.getId());
context.getClusterWorkerContext().lookup(NodeComponentAggregationWorker.WorkerRole.INSTANCE).tell(nodeComponent.transform());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
}
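For reference, a short sketch of the aggregation key and row id this listener emits for an exit span; the component id and peer are illustrative.
package org.skywalking.apm.collector.agentstream.worker.node.component;

import org.skywalking.apm.collector.agentstream.worker.Const;

public class NodeComponentAggSketch {
    public static void main(String[] args) {
        long timeBucket = 201707121658L;                       // minute bucket taken from the first span
        int componentId = 9;                                   // illustrative component id
        String peer = "192.168.1.1:3306";                      // illustrative exit-span peer

        // parseExit/parseEntry build "componentId..-..peer" aggregation keys ...
        String agg = componentId + Const.ID_SPLIT + peer;

        // ... and build() prefixes the minute bucket to form the row id sent to the aggregation worker.
        String id = timeBucket + Const.ID_SPLIT + agg;
        System.out.println(id);                                // 201707121658..-..9..-..192.168.1.1:3306
    }
}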
package org.skywalking.apm.collector.agentstream.worker.node.component.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface INodeComponentDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.node.component.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO {
@Override public List<?> prepareBatch(Map<String, Data> dataMap) {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
Map<String, Object> source = new HashMap<>();
source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
IndexRequestBuilder builder = getClient().prepareIndex(NodeComponentTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class NodeComponentH2DAO extends H2DAO implements INodeComponentDAO {
@Override public List<?> prepareBatch(Map map) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component.define;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class NodeComponentDataDefine extends DataDefine {
@Override public int defineId() {
return 101;
}
@Override protected int initialCapacity() {
return 3;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(NodeComponentTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(NodeComponentTable.COLUMN_AGG, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(NodeComponentTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
return null;
}
@Override public RemoteData serialize(Object object) {
return null;
}
public static class NodeComponent implements TransformToData {
private String id;
private String agg;
private long timeBucket;
public NodeComponent(String id, String agg, long timeBucket) {
this.id = id;
this.agg = agg;
this.timeBucket = timeBucket;
}
public NodeComponent() {
}
@Override public Data transform() {
NodeComponentDataDefine define = new NodeComponentDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.agg);
data.setDataLong(0, this.timeBucket);
return data;
}
public String getId() {
return id;
}
public String getAgg() {
return agg;
}
public long getTimeBucket() {
return timeBucket;
}
public void setId(String id) {
this.id = id;
}
public void setAgg(String agg) {
this.agg = agg;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class NodeComponentEsTableDefine extends ElasticSearchTableDefine {
public NodeComponentEsTableDefine() {
super(NodeComponentTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class NodeComponentH2TableDefine extends H2TableDefine {
public NodeComponentH2TableDefine() {
super(NodeComponentTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_AGG, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class NodeComponentTable extends CommonTable {
public static final String TABLE = "node_component";
}
package org.skywalking.apm.collector.agentstream.worker.node.mapping;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class NodeMappingAggregationWorker extends AggregationWorker {
public NodeMappingAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException {
return getClusterContext().lookup(NodeMappingRemoteWorker.WorkerRole.INSTANCE);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMappingAggregationWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeMappingAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeMappingAggregationWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeMappingAggregationWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new NodeMappingDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.node.mapping;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.INodeMappingDAO;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class NodeMappingPersistenceWorker extends PersistenceWorker {
public NodeMappingPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
INodeMappingDAO dao = (INodeMappingDAO)DAOContainer.INSTANCE.get(INodeMappingDAO.class.getName());
return dao.prepareBatch(dataMap);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMappingPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeMappingPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeMappingPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeMappingPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new NodeMappingDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.node.mapping;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class NodeMappingRemoteWorker extends AbstractRemoteWorker {
protected NodeMappingRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
}
@Override protected void onWork(Object message) throws WorkerException {
getClusterContext().lookup(NodeMappingPersistenceWorker.WorkerRole.INSTANCE).tell(message);
}
public static class Factory extends AbstractRemoteWorkerProvider<NodeMappingRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeMappingRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeMappingRemoteWorker(role(), clusterContext);
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeMappingRemoteWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new NodeMappingDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.node.mapping;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class NodeMappingSpanListener implements RefsListener, FirstSpanListener {
private final Logger logger = LoggerFactory.getLogger(NodeMappingSpanListener.class);
private List<String> nodeMappings = new ArrayList<>();
private long timeBucket;
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
logger.debug("node mapping listener parse reference");
String peers = Const.PEERS_FRONT_SPLIT + reference.getNetworkAddressId() + Const.PEERS_BEHIND_SPLIT;
if (reference.getNetworkAddressId() == 0) {
peers = Const.PEERS_FRONT_SPLIT + reference.getNetworkAddress() + Const.PEERS_BEHIND_SPLIT;
}
String agg = applicationId + Const.ID_SPLIT + peers;
nodeMappings.add(agg);
}
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
}
@Override public void build() {
logger.debug("node mapping listener build");
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
for (String agg : nodeMappings) {
NodeMappingDataDefine.NodeMapping nodeMapping = new NodeMappingDataDefine.NodeMapping();
nodeMapping.setId(timeBucket + Const.ID_SPLIT + agg);
nodeMapping.setAgg(agg);
nodeMapping.setTimeBucket(timeBucket);
try {
logger.debug("send to node mapping aggregation worker, id: {}", nodeMapping.getId());
context.getClusterWorkerContext().lookup(NodeMappingAggregationWorker.WorkerRole.INSTANCE).tell(nodeMapping.transform());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
}
package org.skywalking.apm.collector.agentstream.worker.node.mapping.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface INodeMappingDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.node.mapping.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public class NodeMappingEsDAO extends EsDAO implements INodeMappingDAO {
@Override public List<?> prepareBatch(Map<String, Data> dataMap) {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
Map<String, Object> source = new HashMap<>();
source.put(NodeMappingTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeMappingTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
IndexRequestBuilder builder = getClient().prepareIndex(NodeMappingTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
}
}
package org.skywalking.apm.collector.agentstream.worker.node.mapping.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class NodeMappingH2DAO extends H2DAO implements INodeMappingDAO {
@Override public List<?> prepareBatch(Map map) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.node.mapping.define;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class NodeMappingDataDefine extends DataDefine {
@Override public int defineId() {
return 102;
}
@Override protected int initialCapacity() {
return 3;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(NodeMappingTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(NodeMappingTable.COLUMN_AGG, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(NodeMappingTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
return null;
}
@Override public RemoteData serialize(Object object) {
return null;
}
public static class NodeMapping implements TransformToData {
private String id;
private String agg;
private long timeBucket;
public NodeMapping(String id, String agg, long timeBucket) {
this.id = id;
this.agg = agg;
this.timeBucket = timeBucket;
}
public NodeMapping() {
}
@Override public Data transform() {
NodeMappingDataDefine define = new NodeMappingDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.agg);
data.setDataLong(0, this.timeBucket);
return data;
}
public String getId() {
return id;
}
public String getAgg() {
return agg;
}
public long getTimeBucket() {
return timeBucket;
}
public void setId(String id) {
this.id = id;
}
public void setAgg(String agg) {
this.agg = agg;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
}
}