提交 dd6ae268 编写于 作者: P pengys5

1. remote worker use common service, send data object with bytes and data...

1. remote worker use common service, send data object with bytes and data define id, use define id to find the grpc deserialize object.
2. merge the metric data and record data to data object.
上级 2042709b
package org.skywalking.apm.collector.agentstream.worker;
/**
 * Base table definition holding column names shared by every storage table.
 *
 * @author pengys5
 */
public class CommonTable {
    // Column holding the aggregation value shared across metric tables.
    public static final String COLUMN_AGG = "agg";
    // Column holding the time bucket the row belongs to.
    public static final String COLUMN_TIME_BUCKET = "time_bucket";
}
package org.skywalking.apm.collector.agentstream.worker;
/**
 * An immutable time range tagged with the granularity (slice type) it
 * represents.
 *
 * @author pengys5
 */
public abstract class TimeSlice {

    // Granularity label of this slice, e.g. minute/hour/day.
    private String sliceType;
    private long startTime;
    private long endTime;

    /**
     * @param sliceType granularity label of the slice.
     * @param startTime inclusive start of the range.
     * @param endTime   end of the range.
     */
    public TimeSlice(String sliceType, long startTime, long endTime) {
        this.sliceType = sliceType;
        this.startTime = startTime;
        this.endTime = endTime;
    }

    public String getSliceType() {
        return this.sliceType;
    }

    public long getStartTime() {
        return this.startTime;
    }

    public long getEndTime() {
        return this.endTime;
    }
}
package org.skywalking.apm.collector.agentstream.worker.config;
/**
 * Cache size limits for the agent-stream workers.
 * NOTE(review): these overlap with {@code WorkerConfig.Queue} sizes — confirm
 * which one each worker actually reads.
 *
 * @author pengys5
 */
public class CacheSizeConfig {
    public static class Cache {
        // Max entries buffered by an analysis worker before flushing.
        public static class Analysis {
            public static int SIZE = 1024;
        }
        // Max entries buffered by a persistence worker before flushing.
        public static class Persistence {
            public static int SIZE = 5000;
        }
    }
}
package org.skywalking.apm.collector.agentstream.worker.config;
/**
 * Static configuration of the stream worker framework: how many instances of
 * each aggregation worker to create ({@link WorkerNum}) and the queue size
 * used by each analysis worker ({@link Queue}).
 *
 * @author pengys5
 */
public class WorkerConfig {
    // Number of worker instances created per aggregation worker role in one JVM.
    public static class WorkerNum {
        public static class Node {
            public static class NodeCompAgg {
                public static int VALUE = 2;
            }
            public static class NodeMappingDayAgg {
                public static int VALUE = 2;
            }
            public static class NodeMappingHourAgg {
                public static int VALUE = 2;
            }
            public static class NodeMappingMinuteAgg {
                public static int VALUE = 2;
            }
        }
        public static class NodeRef {
            public static class NodeRefDayAgg {
                public static int VALUE = 2;
            }
            public static class NodeRefHourAgg {
                public static int VALUE = 2;
            }
            public static class NodeRefMinuteAgg {
                public static int VALUE = 2;
            }
            public static class NodeRefResSumDayAgg {
                public static int VALUE = 2;
            }
            public static class NodeRefResSumHourAgg {
                public static int VALUE = 2;
            }
            public static class NodeRefResSumMinuteAgg {
                public static int VALUE = 2;
            }
        }
        public static class GlobalTrace {
            public static class GlobalTraceAgg {
                public static int VALUE = 2;
            }
        }
    }
    // Queue buffer sizes (entries) for the analysis workers.
    public static class Queue {
        public static class GlobalTrace {
            public static class GlobalTraceAnalysis {
                public static int SIZE = 1024;
            }
        }
        public static class Segment {
            public static class SegmentAnalysis {
                public static int SIZE = 1024;
            }
            // Segment cost/exception analysis use larger buffers than the rest.
            public static class SegmentCostAnalysis {
                public static int SIZE = 4096;
            }
            public static class SegmentExceptionAnalysis {
                public static int SIZE = 4096;
            }
        }
        public static class Node {
            public static class NodeCompAnalysis {
                public static int SIZE = 1024;
            }
            public static class NodeMappingDayAnalysis {
                public static int SIZE = 1024;
            }
            public static class NodeMappingHourAnalysis {
                public static int SIZE = 1024;
            }
            public static class NodeMappingMinuteAnalysis {
                public static int SIZE = 1024;
            }
        }
        public static class NodeRef {
            public static class NodeRefDayAnalysis {
                public static int SIZE = 1024;
            }
            public static class NodeRefHourAnalysis {
                public static int SIZE = 1024;
            }
            public static class NodeRefMinuteAnalysis {
                public static int SIZE = 1024;
            }
            public static class NodeRefResSumDayAnalysis {
                public static int SIZE = 1024;
            }
            public static class NodeRefResSumHourAnalysis {
                public static int SIZE = 1024;
            }
            public static class NodeRefResSumMinuteAnalysis {
                public static int SIZE = 1024;
            }
        }
    }
}
package org.skywalking.apm.collector.agentstream.worker.node.aggregation;
import org.skywalking.apm.collector.stream.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.LocalWorkerContext;
import org.skywalking.apm.collector.stream.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.Role;
import org.skywalking.apm.collector.stream.impl.AggregationWorker;
/**
 * Aggregates node component data at day granularity.
 * TODO(review): {@link #sendToNext()} is an empty stub — aggregated data is
 * currently never forwarded.
 *
 * @author pengys5
 */
public class NodeComponentAggDayWorker extends AggregationWorker {

    public NodeComponentAggDayWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
        super(role, clusterContext, selfContext);
    }

    @Override public void preStart() throws ProviderNotFoundException {
        super.preStart();
    }

    @Override protected void sendToNext() {
    }
}
package org.skywalking.apm.collector.agentstream.worker.node.define;
import org.skywalking.apm.collector.stream.impl.data.Attribute;
import org.skywalking.apm.collector.stream.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.impl.data.operate.NonOperation;
/**
 * Data layout of the node component table: define id 0, four string
 * attributes. The define id is used to look up this layout when deserializing
 * remote messages.
 *
 * @author pengys5
 */
public class NodeComponentDataDefine extends DataDefine {

    @Override protected int defineId() {
        return 0;
    }

    // Total number of attributes declared in attributeDefine().
    @Override protected int initialCapacity() {
        return 4;
    }

    @Override protected void attributeDefine() {
        // Position 0 is the row id; it is never overwritten on merge.
        addAttribute(0, new Attribute("id", AttributeType.STRING, new NonOperation()));
        // The remaining columns take the newest value when two rows merge.
        addAttribute(1, new Attribute("name", AttributeType.STRING, new CoverOperation()));
        addAttribute(2, new Attribute("peers", AttributeType.STRING, new CoverOperation()));
        addAttribute(3, new Attribute("aggregation", AttributeType.STRING, new CoverOperation()));
    }
}
package org.skywalking.apm.collector.agentstream.worker.node.define;
import org.skywalking.apm.collector.agentstream.worker.node.NodeComponentTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
......@@ -28,5 +27,6 @@ public class NodeComponentEsTableDefine extends ElasticSearchTableDefine {
// Declares the Elasticsearch columns of the node component index; all three
// are exact-match Keyword fields.
@Override public void initialize() {
    addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
    addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_PEERS, ElasticSearchColumnDefine.Type.Keyword.name()));
    addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.node.define;
import org.skywalking.apm.collector.agentstream.worker.node.NodeComponentTable;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
......@@ -16,5 +15,6 @@ public class NodeComponentH2TableDefine extends H2TableDefine {
// Declares the H2 columns of the node component table as VARCHARs, mirroring
// the Elasticsearch definition of the same table.
@Override public void initialize() {
    addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_NAME, H2ColumnDefine.Type.Varchar.name()));
    addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_PEERS, H2ColumnDefine.Type.Varchar.name()));
    addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_AGG, H2ColumnDefine.Type.Varchar.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.node;
package org.skywalking.apm.collector.agentstream.worker.node.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class NodeComponentTable {
public class NodeComponentTable extends CommonTable {
public static final String TABLE = "node_component";
public static final String COLUMN_NAME = "name";
public static final String COLUMN_PEERS = "peers";
......
package org.skywalking.apm.collector.agentstream.worker.node.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
 * Elasticsearch index definition for the node mapping table.
 *
 * @author pengys5
 */
public class NodeMappingEsTableDefine extends ElasticSearchTableDefine {

    public NodeMappingEsTableDefine() {
        super(NodeMappingTable.TABLE);
    }

    // NOTE(review): 0 here — confirm whether this means "ES default" or
    // "refresh disabled" in the ElasticSearchTableDefine contract.
    @Override public int refreshInterval() {
        return 0;
    }

    @Override public int numberOfShards() {
        return 2;
    }

    @Override public int numberOfReplicas() {
        return 0;
    }

    @Override public void initialize() {
        addColumn(new ElasticSearchColumnDefine(NodeMappingTable.COLUMN_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
        addColumn(new ElasticSearchColumnDefine(NodeMappingTable.COLUMN_PEERS, ElasticSearchColumnDefine.Type.Keyword.name()));
        addColumn(new ElasticSearchColumnDefine(NodeMappingTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
        addColumn(new ElasticSearchColumnDefine(NodeMappingTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
    }
}
package org.skywalking.apm.collector.agentstream.worker.node.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
 * H2 table definition for the node mapping table, mirroring the Elasticsearch
 * definition of the same table.
 *
 * @author pengys5
 */
public class NodeMappingH2TableDefine extends H2TableDefine {

    public NodeMappingH2TableDefine() {
        super(NodeMappingTable.TABLE);
    }

    @Override public void initialize() {
        addColumn(new H2ColumnDefine(NodeMappingTable.COLUMN_NAME, H2ColumnDefine.Type.Varchar.name()));
        addColumn(new H2ColumnDefine(NodeMappingTable.COLUMN_PEERS, H2ColumnDefine.Type.Varchar.name()));
        addColumn(new H2ColumnDefine(NodeMappingTable.COLUMN_AGG, H2ColumnDefine.Type.Varchar.name()));
        // Time bucket is stored as a number, hence BIGINT rather than VARCHAR.
        addColumn(new H2ColumnDefine(NodeMappingTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
    }
}
package org.skywalking.apm.collector.agentstream.worker.node.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
 * Column names of the node mapping table; the agg and time bucket columns are
 * inherited from {@link CommonTable}.
 *
 * @author pengys5
 */
public class NodeMappingTable extends CommonTable {
    public static final String TABLE = "node_mapping";
    public static final String COLUMN_NAME = "name";
    public static final String COLUMN_PEERS = "peers";
}
// Serialized form of the node component data object; fields mirror the
// attributes declared in NodeComponentDataDefine (id, name, peers, aggregation).
syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.skywalking.apm.collector.agentstream.worker.node.define.proto";

message Message {
    string id = 1;          // row id
    string name = 2;        // component name
    string peers = 3;       // peer addresses
    string aggregation = 4; // aggregation column value
}
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.define.NodeComponentDataDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.define.NodeComponentEsTableDefine
org.skywalking.apm.collector.agentstream.worker.node.define.NodeComponentH2TableDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.define.NodeComponentH2TableDefine
org.skywalking.apm.collector.agentstream.worker.node.define.NodeMappingEsTableDefine
org.skywalking.apm.collector.agentstream.worker.node.define.NodeMappingH2TableDefine
\ No newline at end of file
......@@ -37,6 +37,10 @@
<artifactId>snakeyaml</artifactId>
<groupId>org.yaml</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
......@@ -52,6 +56,10 @@
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
......
......@@ -7,6 +7,7 @@ import java.util.List;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
......@@ -99,4 +100,8 @@ public class ElasticSearchClient implements Client {
IndicesExistsResponse response = adminClient.prepareExists(indexName).get();
return response.isExists();
}
/**
 * Prepares an index request builder for the given index.
 * TODO(review): stub — currently returns {@code null}; callers will NPE until
 * this delegates to the underlying Elasticsearch client.
 */
public IndexRequestBuilder prepareIndex(String indexName) {
    return null;
}
}
package org.skywalking.apm.collector.core.storage;
import java.util.List;
import java.util.Map;
/**
 * Builds a batch of storage requests from the last cached data.
 *
 * @param <B> the batch request type built for the storage backend.
 * @param <C> the storage client type.
 * @param <D> the cached data type, keyed by row id.
 *
 * @author pengys5
 */
public abstract class StorageBatchBuilder<B, C, D> {
    public abstract List<B> build(C client, Map<String, D> lastData);
}
......@@ -29,6 +29,11 @@
<artifactId>apm-collector-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
......
package org.skywalking.apm.collector.remote;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class RemoteModuleDefine extends ModuleDefine {
public abstract class RemoteModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(RemoteModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new RemoteModuleException(e.getMessage(), e);
}
}
public abstract List<Handler> handlerList() throws DefineException;
}
package org.skywalking.apm.collector.remote;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
 * Exception thrown when the remote module fails to initialize.
 *
 * @author pengys5
 */
public class RemoteModuleException extends ModuleException {

    public RemoteModuleException(String message) {
        super(message);
    }

    public RemoteModuleException(String message, Throwable cause) {
        super(message, cause);
    }
}
package org.skywalking.apm.collector.remote.grpc;
/**
 * Runtime gRPC settings of the remote module; populated by
 * {@code RemoteGRPCConfigParser} at startup and read when the server is
 * created and registered with the cluster.
 *
 * @author pengys5
 */
public class RemoteGRPCConfig {
    public static String HOST;
    public static int PORT;
}
package org.skywalking.apm.collector.remote.grpc;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
 * Parses the remote module's gRPC settings ({@code host}, {@code port}) into
 * {@link RemoteGRPCConfig}, defaulting to localhost:11800 when absent.
 *
 * @author pengys5
 */
public class RemoteGRPCConfigParser implements ModuleConfigParser {

    private static final String HOST = "host";
    private static final String PORT = "port";

    @Override public void parse(Map config) throws ConfigParseException {
        if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
            RemoteGRPCConfig.HOST = "localhost";
        } else {
            // Bug fix: a configured host was previously ignored, leaving HOST unset.
            RemoteGRPCConfig.HOST = (String)config.get(HOST);
        }
        if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
            RemoteGRPCConfig.PORT = 11800;
        } else {
            RemoteGRPCConfig.PORT = (Integer)config.get(PORT);
        }
    }
}
package org.skywalking.apm.collector.remote.grpc;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.remote.RemoteModuleGroupDefine;
/**
 * Cluster data listener watching the registration path of the remote gRPC
 * module, so peer collector addresses can be observed.
 *
 * @author pengys5
 */
public class RemoteGRPCDataListener extends ClusterDataListener {

    // Catalog path: <base>.<group>.<module>, e.g. "...remote.remote".
    public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + RemoteModuleGroupDefine.GROUP_NAME + "." + RemoteGRPCModuleDefine.MODULE_NAME;

    @Override public String path() {
        return PATH;
    }
}
package org.skywalking.apm.collector.remote.grpc;
import java.util.Map;
import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.remote.RemoteModuleDefine;
import org.skywalking.apm.collector.remote.RemoteModuleGroupDefine;
import org.skywalking.apm.collector.remote.grpc.handler.RemoteHandlerDefineException;
import org.skywalking.apm.collector.remote.grpc.handler.RemoteHandlerDefineLoader;
import org.skywalking.apm.collector.server.grpc.GRPCServer;
/**
* @author pengys5
*/
public class RemoteGRPCModuleDefine extends RemoteModuleDefine {
// Module name used both for lookup and in the cluster registration path.
public static final String MODULE_NAME = "remote";

@Override public String name() {
    return MODULE_NAME;
}

@Override protected String group() {
    return RemoteModuleGroupDefine.GROUP_NAME;
}
// This is the default remote implementation.
// Fix: removed an unreachable leftover "return false;" (merge residue) that
// made the method invalid Java.
@Override public boolean defaultModule() {
    return true;
}

@Override protected ModuleConfigParser configParser() {
    return new RemoteGRPCConfigParser();
}
@Override protected Client createClient(DataMonitor dataMonitor) {
......@@ -33,18 +44,25 @@ public class RemoteGRPCModuleDefine extends RemoteModuleDefine {
}
// Fix: removed unreachable leftover "return null;" statements (merge residue)
// that made both methods invalid Java.
@Override protected Server server() {
    // Bind the remote gRPC server to the parsed host/port.
    return new GRPCServer(RemoteGRPCConfig.HOST, RemoteGRPCConfig.PORT);
}

@Override protected ModuleRegistration registration() {
    return new RemoteGRPCModuleRegistration();
}
@Override public void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
@Override public ClusterDataListener listener() {
return new RemoteGRPCDataListener();
}
@Override public String name() {
return null;
@Override public List<Handler> handlerList() throws DefineException {
RemoteHandlerDefineLoader loader = new RemoteHandlerDefineLoader();
List<Handler> handlers = null;
try {
handlers = loader.load();
} catch (ConfigException e) {
throw new RemoteHandlerDefineException(e.getMessage(), e);
}
return handlers;
}
}
package org.skywalking.apm.collector.remote.grpc;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
 * Publishes this node's remote gRPC address (host, port) to the cluster.
 *
 * @author pengys5
 */
public class RemoteGRPCModuleRegistration extends ModuleRegistration {

    @Override public Value buildValue() {
        // NOTE(review): the third Value argument is null here — confirm what
        // extra payload the Value contract expects.
        return new Value(RemoteGRPCConfig.HOST, RemoteGRPCConfig.PORT, null);
    }
}
package org.skywalking.apm.collector.remote.grpc.handler;
import org.skywalking.apm.collector.core.framework.DefineException;
/**
 * Exception thrown when the remote handler definition file cannot be loaded.
 *
 * @author pengys5
 */
public class RemoteHandlerDefineException extends DefineException {

    public RemoteHandlerDefineException(String message) {
        super(message);
    }

    public RemoteHandlerDefineException(String message, Throwable cause) {
        super(message, cause);
    }
}
package org.skywalking.apm.collector.remote.grpc.handler;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Loads the remote handler implementations listed in the handler definition
 * file and returns them ready to be attached to the module server.
 *
 * @author pengys5
 */
public class RemoteHandlerDefineLoader implements Loader<List<Handler>> {

    private final Logger logger = LoggerFactory.getLogger(RemoteHandlerDefineLoader.class);

    @Override public List<Handler> load() throws ConfigException {
        RemoteHandlerDefinitionFile definitionFile = new RemoteHandlerDefinitionFile();
        DefinitionLoader<Handler> definitionLoader = DefinitionLoader.load(Handler.class, definitionFile);

        List<Handler> loadedHandlers = new ArrayList<>();
        for (Handler handler : definitionLoader) {
            logger.info("loaded remote handler definition class: {}", handler.getClass().getName());
            loadedHandlers.add(handler);
        }
        return loadedHandlers;
    }
}
package org.skywalking.apm.collector.remote.grpc.handler;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
/**
 * Points the definition loader at the file listing remote handler classes.
 *
 * @author pengys5
 */
public class RemoteHandlerDefinitionFile extends DefinitionFile {

    @Override protected String fileName() {
        return "remote_handler.define";
    }
}
......@@ -10,8 +10,8 @@ service RemoteCommonService {
// Envelope for forwarding a serialized data object to a peer collector node.
// Fix: removed leftover old fields (objectId = 2, objectBytes = 3, merge
// residue) that duplicated field numbers 2 and 3 — invalid proto.
message Message {
    string workerRole = 1;  // role name of the worker that should process the data
    int32 dataDefineId = 2; // id used to look up the DataDefine for deserialization
    bytes dataBytes = 3;    // serialized data object
}
message Empty {
......
......@@ -23,5 +23,10 @@
<artifactId>apm-collector-queue</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-remote</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.stream;
/**
* The <code>AbstractClusterWorker</code> implementations represent workers,
* The <code>AbstractRemoteWorker</code> implementations represent workers,
* which receive remote messages.
* <p>
* Usually, the implementations are doing persistent, or aggregate works.
......@@ -9,17 +9,17 @@ package org.skywalking.apm.collector.stream;
* @author pengys5
* @since v3.0-2017
*/
public abstract class AbstractClusterWorker extends AbstractWorker {
public abstract class AbstractRemoteWorker extends AbstractWorker {
/**
* Construct an <code>AbstractClusterWorker</code> with the worker role and context.
* Construct an <code>AbstractRemoteWorker</code> with the worker role and context.
*
* @param role If multi-workers are for load balance, they should be more likely called worker instance. Meaning,
* each worker have multi instances.
* @param clusterContext See {@link ClusterWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
*/
protected AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
protected AbstractRemoteWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......
package org.skywalking.apm.collector.stream;
/**
* The <code>AbstractClusterWorkerProvider</code> implementations represent providers,
* which create instance of cluster workers whose implemented {@link AbstractClusterWorker}.
* The <code>AbstractRemoteWorkerProvider</code> implementations represent providers,
* which create instance of cluster workers whose implemented {@link AbstractRemoteWorker}.
* <p>
*
* @author pengys5
* @since v3.0-2017
*/
public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWorker> extends AbstractWorkerProvider<T> {
public abstract class AbstractRemoteWorkerProvider<T extends AbstractRemoteWorker> extends AbstractWorkerProvider<T> {
/**
* Create how many worker instance of {@link AbstractClusterWorker} in one jvm.
* Create how many worker instance of {@link AbstractRemoteWorker} in one jvm.
*
* @return The worker instance number.
*/
......@@ -21,7 +21,7 @@ public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWor
* Create the worker instance into akka system, the akka system will control the cluster worker life cycle.
*
* @param localContext Not used, will be null.
* @return The created worker reference. See {@link ClusterWorkerRef}
* @return The created worker reference. See {@link RemoteWorkerRef}
* @throws ProviderNotFoundException This worker instance attempted to find a provider which use to create another
* worker instance, when the worker provider not find then Throw this Exception.
*/
......@@ -30,7 +30,7 @@ public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWor
T clusterWorker = workerInstance(getClusterContext());
clusterWorker.preStart();
ClusterWorkerRef workerRef = new ClusterWorkerRef(role(), clusterWorker);
RemoteWorkerRef workerRef = new RemoteWorkerRef(role(), clusterWorker);
getClusterContext().put(workerRef);
return workerRef;
}
......
......@@ -19,21 +19,21 @@ public abstract class AbstractWorker implements Executor {
this.selfContext = selfContext;
}
@Override public final void execute(Object message) {
}
public abstract void preStart() throws ProviderNotFoundException;
final public LookUp getSelfContext() {
final public LocalWorkerContext getSelfContext() {
return selfContext;
}
final public LookUp getClusterContext() {
final public ClusterWorkerContext getClusterContext() {
return clusterContext;
}
final public Role getRole() {
return role;
}
final public static AbstractWorker noOwner() {
return null;
}
}
......@@ -3,11 +3,11 @@ package org.skywalking.apm.collector.stream;
/**
* @author pengys5
*/
public class ClusterWorkerRef extends WorkerRef {
public class RemoteWorkerRef extends WorkerRef {
private AbstractClusterWorker clusterWorker;
private AbstractRemoteWorker clusterWorker;
public ClusterWorkerRef(Role role, AbstractClusterWorker clusterWorker) {
public RemoteWorkerRef(Role role, AbstractRemoteWorker clusterWorker) {
super(role);
this.clusterWorker = clusterWorker;
}
......
......@@ -4,12 +4,15 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.skywalking.apm.collector.stream.impl.data.DataDefine;
/**
* @author pengys5
*/
public abstract class WorkerContext implements Context {
private Map<Integer, DataDefine> dataDefineMap;
private Map<String, List<WorkerRef>> roleWorkers;
public WorkerContext() {
......@@ -20,8 +23,11 @@ public abstract class WorkerContext implements Context {
return this.roleWorkers;
}
@Override
final public WorkerRefs lookup(Role role) throws WorkerNotFoundException {
public final DataDefine getDataDefine(int defineId) {
return dataDefineMap.get(defineId);
}
@Override final public WorkerRefs lookup(Role role) throws WorkerNotFoundException {
if (getRoleWorkers().containsKey(role.roleName())) {
WorkerRefs refs = new WorkerRefs(getRoleWorkers().get(role.roleName()), role.workerSelector());
return refs;
......@@ -30,16 +36,14 @@ public abstract class WorkerContext implements Context {
}
}
@Override
final public void put(WorkerRef workerRef) {
@Override final public void put(WorkerRef workerRef) {
if (!getRoleWorkers().containsKey(workerRef.getRole().roleName())) {
getRoleWorkers().putIfAbsent(workerRef.getRole().roleName(), new ArrayList<WorkerRef>());
}
getRoleWorkers().get(workerRef.getRole().roleName()).add(workerRef);
}
@Override
final public void remove(WorkerRef workerRef) {
@Override final public void remove(WorkerRef workerRef) {
getRoleWorkers().remove(workerRef.getRole().roleName());
}
}
package org.skywalking.apm.collector.stream;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Installs the stream worker module from its configuration section.
 *
 * @author pengys5
 */
public class WorkerModuleInstaller implements ModuleInstaller {

    private final Logger logger = LoggerFactory.getLogger(WorkerModuleInstaller.class);

    @Override public void install(Map<String, Map> moduleConfig,
        Map<String, ModuleDefine> moduleDefineMap) throws DefineException, ClientException {
        logger.info("beginning worker module install");
        // NOTE(review): only the FIRST configured entry is installed — confirm a
        // single worker module is expected here.
        Map.Entry<String, Map> workerConfigEntry = moduleConfig.entrySet().iterator().next();
        ModuleDefine moduleDefine = moduleDefineMap.get(workerConfigEntry.getKey());
        moduleDefine.initialize(workerConfigEntry.getValue());
    }
}
package org.skywalking.apm.collector.stream.impl;
import org.skywalking.apm.collector.core.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.stream.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.LocalWorkerContext;
import org.skywalking.apm.collector.stream.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.Role;
import org.skywalking.apm.collector.stream.WorkerException;
import org.skywalking.apm.collector.stream.impl.data.Data;
import org.skywalking.apm.collector.stream.impl.data.DataCache;
/**
 * Base class for workers that merge streamed data rows in memory and forward
 * them downstream in batches.
 *
 * @author pengys5
 */
public abstract class AggregationWorker extends AbstractLocalAsyncWorker {

    // In-memory cache merging data rows by id until the batch is flushed.
    private DataCache dataCache;

    public AggregationWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
        super(role, clusterContext, selfContext);
        dataCache = new DataCache();
    }

    // Messages aggregated since the last flush; reset on every flush.
    private int messageNum;

    @Override public void preStart() throws ProviderNotFoundException {
        super.preStart();
    }

    /**
     * Aggregates incoming data; flushes to the next worker either when an
     * {@link EndOfBatchCommand} arrives or after every 100 messages.
     */
    @Override protected final void onWork(Object message) throws WorkerException {
        if (message instanceof EndOfBatchCommand) {
            sendToNext();
        } else {
            messageNum++;
            aggregate(message);
            if (messageNum >= 100) {
                sendToNext();
                messageNum = 0;
            }
        }
    }

    /** Forwards the aggregated batch downstream; implemented by subclasses. */
    protected abstract void sendToNext();

    /**
     * Merges the message into the cache: rows with an existing id are merged via
     * their {@code DataDefine}; new ids are inserted.
     * NOTE(review): {@code dataCache.hold()} is never called here, so the cache's
     * locked collection may be null on first use — confirm hold/release is driven
     * elsewhere.
     */
    protected final void aggregate(Object message) {
        Data data = (Data)message;
        if (dataCache.containsKey(data.id())) {
            getClusterContext().getDataDefine(data.getDefineId()).mergeData(data, dataCache.get(data.id()));
        } else {
            dataCache.put(data.id(), data);
        }
    }
}
package org.skywalking.apm.collector.stream.impl;
/**
 * String constants shared by stream workers for building and parsing row ids
 * and peer lists.
 *
 * @author pengys5
 */
public class Const {

    // Constants holder: not meant to be instantiated.
    private Const() {
    }

    /** Separator inserted between id segments. */
    public static final String ID_SPLIT = "..-..";
    /** Regex form of {@link #ID_SPLIT}, usable with {@link String#split(String)}. */
    public static final String IDS_SPLIT = "\\.\\.-\\.\\.";
    public static final String PEERS_FRONT_SPLIT = "[";
    public static final String PEERS_BEHIND_SPLIT = "]";
    /** Placeholder component name for spans originating from user code. */
    public static final String USER_CODE = "User";
    public static final String RESULT = "result";
}
package org.skywalking.apm.collector.stream.impl;
import org.skywalking.apm.collector.stream.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.LocalWorkerContext;
import org.skywalking.apm.collector.stream.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.Role;
import org.skywalking.apm.collector.stream.WorkerException;
/**
 * A remote worker whose messages travel over gRPC.
 * TODO(review): {@link #preStart()} and {@link #onWork(Object)} are empty
 * stubs in this revision.
 *
 * @author pengys5
 */
public class GRPCRemoteWorker extends AbstractRemoteWorker {

    protected GRPCRemoteWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
        super(role, clusterContext, selfContext);
    }

    @Override public void preStart() throws ProviderNotFoundException {
    }

    @Override protected final void onWork(Object message) throws WorkerException {
    }
}
package org.skywalking.apm.collector.stream.impl;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.remote.grpc.proto.Empty;
import org.skywalking.apm.collector.remote.grpc.proto.Message;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * gRPC handler receiving serialized data objects forwarded by peer collector
 * nodes. The data define id identifies the layout used to deserialize the bytes.
 *
 * @author pengys5
 */
public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCommonServiceImplBase implements GRPCHandler {

    private final Logger logger = LoggerFactory.getLogger(RemoteCommonServiceHandler.class);

    @Override public void call(Message request, StreamObserver<Empty> responseObserver) {
        String workerRole = request.getWorkerRole();
        int dataDefineId = request.getDataDefineId();
        ByteString bytesData = request.getDataBytes();
        // TODO: look up the DataDefine by dataDefineId, deserialize bytesData and
        // dispatch the resulting data object to the worker identified by workerRole.
        logger.debug("received remote message, workerRole: {}, dataDefineId: {}", workerRole, dataDefineId);

        // Fix: complete the unary RPC; without onNext/onCompleted the client's
        // call would never finish.
        responseObserver.onNext(Empty.newBuilder().build());
        responseObserver.onCompleted();
    }
}
package org.skywalking.apm.collector.stream.impl.data;
/**
 * One column of a {@code DataDefine}: a name, a value type, and the merge
 * operation applied when two rows with the same id are combined.
 *
 * @author pengys5
 */
public class Attribute {

    private final String name;
    private final AttributeType type;
    // Merge behavior, e.g. NonOperation (keep old) or CoverOperation (take new).
    private final Operation operation;

    public Attribute(String name, AttributeType type, Operation operation) {
        this.name = name;
        this.type = type;
        this.operation = operation;
    }

    public String getName() {
        return name;
    }

    public AttributeType getType() {
        return type;
    }

    public Operation getOperation() {
        return operation;
    }
}
package org.skywalking.apm.collector.stream.impl.data;
/**
 * The value types an {@link Attribute} can carry; each maps to one of the typed
 * slot arrays in {@link Data} (strings, longs, floats).
 *
 * @author pengys5
 */
public enum AttributeType {
    STRING, LONG, FLOAT
}
package org.skywalking.apm.collector.stream.impl.data;
/**
 * A generic row of collector data: three typed slot arrays (string, long, float)
 * sized by its {@link DataDefine}, plus the define id linking the row back to its
 * definition so remote peers can pick the right deserializer.
 *
 * Convention: slot 0 of the string array is the row id (see {@link #id()}).
 *
 * @author pengys5
 */
public class Data {

    // Identifier of the definition this row conforms to.
    private final int defineId;
    private final String[] stringSlots;
    private final Long[] longSlots;
    private final Float[] floatSlots;

    public Data(int defineId, int stringCapacity, int longCapacity, int floatCapacity) {
        this.defineId = defineId;
        this.stringSlots = new String[stringCapacity];
        this.longSlots = new Long[longCapacity];
        this.floatSlots = new Float[floatCapacity];
    }

    public void setDataString(int position, String value) {
        stringSlots[position] = value;
    }

    public void setDataLong(int position, Long value) {
        longSlots[position] = value;
    }

    public void setDataFloat(int position, Float value) {
        floatSlots[position] = value;
    }

    public String getDataString(int position) {
        return stringSlots[position];
    }

    public Long getDataLong(int position) {
        return longSlots[position];
    }

    public Float getDataFloat(int position) {
        return floatSlots[position];
    }

    /** Row id — by convention the first string slot. */
    public String id() {
        return stringSlots[0];
    }

    public int getDefineId() {
        return defineId;
    }
}
package org.skywalking.apm.collector.stream.impl.data;
/**
 * A {@link Window}-backed cache of {@link Data} rows. Intended protocol:
 * call {@link #hold()} to pin the current window collection, then use
 * containsKey/get/put against it, then {@link #release()} when done.
 *
 * NOTE(review): containsKey/get/put dereference lockedDataCollection without a
 * null check, so calling them outside a hold()/release() cycle throws
 * NullPointerException — confirm all callers follow the protocol.
 *
 * @author pengys5
 */
public class DataCache extends Window {
    // Collection pinned by hold(); null outside a hold/release cycle.
    private DataCollection lockedDataCollection;
    public boolean containsKey(String id) {
        return lockedDataCollection.containsKey(id);
    }
    public Data get(String id) {
        return lockedDataCollection.get(id);
    }
    public void put(String id, Data data) {
        lockedDataCollection.put(id, data);
    }
    // Pin the window's current collection and mark it held.
    public void hold() {
        lockedDataCollection = getCurrentAndHold();
    }
    // Unmark the pinned collection and drop the local reference.
    public void release() {
        lockedDataCollection.release();
        lockedDataCollection = null;
    }
}
package org.skywalking.apm.collector.stream.impl.data;
import java.util.HashMap;
import java.util.Map;
/**
 * A holdable map of {@link Data} rows keyed by row id. A collection is "held"
 * while a consumer is draining it; the flag tells the owning {@link Window}
 * machinery whether this side of the double buffer is currently in use.
 *
 * @author pengys5
 */
public class DataCollection {

    // Backing store for the rows.
    private final Map<String, Data> rows;

    // volatile for cross-thread visibility of the held flag only.
    private volatile boolean held;

    public DataCollection() {
        rows = new HashMap<>();
        held = false;
    }

    /** Mark this collection as no longer in use by a consumer. */
    public void release() {
        held = false;
    }

    /** Mark this collection as in use by a consumer. */
    public void hold() {
        held = true;
    }

    public boolean isHolding() {
        return held;
    }

    public boolean containsKey(String key) {
        return rows.containsKey(key);
    }

    public void put(String key, Data value) {
        rows.put(key, value);
    }

    public Data get(String key) {
        return rows.get(key);
    }

    public int size() {
        return rows.size();
    }

    public void clear() {
        rows.clear();
    }

    /** Direct view of the backing map; mutations by callers are visible here. */
    public Map<String, Data> asMap() {
        return rows;
    }
}
package org.skywalking.apm.collector.stream.impl.data;
/**
 * Describes the column layout of one kind of {@link Data} row: the attributes,
 * their types, and the per-column merge {@link Operation} applied when two rows
 * with the same id collide. The define id identifies this layout when remote
 * payloads are deserialized.
 *
 * Expected initialization order: {@link #define()} (allocate attribute slots),
 * then {@link #addAttribute(int, Attribute)} for each slot, then
 * {@link #initial()} (compute per-type capacities).
 *
 * @author pengys5
 */
public abstract class DataDefine {
    private Attribute[] attributes;
    private int stringCapacity;
    private int longCapacity;
    private int floatCapacity;

    public DataDefine() {
        stringCapacity = 0;
        longCapacity = 0;
        floatCapacity = 0;
    }

    /**
     * Count how many string/long/float slots a {@link Data} row of this
     * definition needs. Counters are reset first so that calling this more than
     * once cannot double-count the capacities (the original accumulated).
     */
    public final void initial() {
        stringCapacity = 0;
        longCapacity = 0;
        floatCapacity = 0;
        for (Attribute attribute : attributes) {
            if (AttributeType.STRING.equals(attribute.getType())) {
                stringCapacity++;
            } else if (AttributeType.LONG.equals(attribute.getType())) {
                longCapacity++;
            } else if (AttributeType.FLOAT.equals(attribute.getType())) {
                floatCapacity++;
            }
        }
    }

    /** Register the attribute for the given slot; call after {@link #define()}. */
    public final void addAttribute(int position, Attribute attribute) {
        attributes[position] = attribute;
    }

    /** Allocate the attribute slots; must run before {@link #addAttribute(int, Attribute)}. */
    public final void define() {
        attributes = new Attribute[initialCapacity()];
    }

    /** Stable identifier of this definition, transmitted with serialized payloads. */
    protected abstract int defineId();

    /** Total number of attributes, all types combined. */
    protected abstract int initialCapacity();

    /** Subclass hook where concrete attributes are registered. */
    protected abstract void attributeDefine();

    public int getStringCapacity() {
        return stringCapacity;
    }

    public int getLongCapacity() {
        return longCapacity;
    }

    public int getFloatCapacity() {
        return floatCapacity;
    }

    /** Create an empty {@link Data} row sized for this definition. */
    public Data build() {
        return new Data(defineId(), getStringCapacity(), getLongCapacity(), getFloatCapacity());
    }

    /**
     * Merge {@code oldData} into {@code newData} column by column using each
     * attribute's {@link Operation}. After the call {@code newData} holds the
     * combined row.
     */
    public void mergeData(Data newData, Data oldData) {
        int stringPosition = 0;
        int longPosition = 0;
        int floatPosition = 0;
        for (int i = 0; i < initialCapacity(); i++) {
            Attribute attribute = attributes[i];
            if (AttributeType.STRING.equals(attribute.getType())) {
                // Bug fix: the operation result was previously discarded, which made
                // the whole merge a no-op. Write the merged value back into newData.
                newData.setDataString(stringPosition,
                    attribute.getOperation().operate(newData.getDataString(stringPosition), oldData.getDataString(stringPosition)));
                stringPosition++;
            } else if (AttributeType.LONG.equals(attribute.getType())) {
                newData.setDataLong(longPosition,
                    attribute.getOperation().operate(newData.getDataLong(longPosition), oldData.getDataLong(longPosition)));
                longPosition++;
            } else if (AttributeType.FLOAT.equals(attribute.getType())) {
                newData.setDataFloat(floatPosition,
                    attribute.getOperation().operate(newData.getDataFloat(floatPosition), oldData.getDataFloat(floatPosition)));
                floatPosition++;
            }
        }
    }
}
package org.skywalking.apm.collector.stream.impl.data;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Loads every {@link DataDefine} implementation listed in the
 * <code>data.define</code> definition file and indexes it by define id, so a
 * remote payload's data define id can be resolved to its deserializer.
 *
 * @author pengys5
 */
public class DataDefineLoader implements Loader<Map<Integer, DataDefine>> {

    private final Logger logger = LoggerFactory.getLogger(DataDefineLoader.class);

    /**
     * @return define id -> definition instance for every loaded class
     * @throws ConfigException when the definition file cannot be loaded
     */
    @Override public Map<Integer, DataDefine> load() throws ConfigException {
        Map<Integer, DataDefine> dataDefineMap = new HashMap<>();
        DataDefinitionFile definitionFile = new DataDefinitionFile();
        DefinitionLoader<DataDefine> definitionLoader = DefinitionLoader.load(DataDefine.class, definitionFile);
        for (DataDefine dataDefine : definitionLoader) {
            int defineId = dataDefine.defineId();
            // Bug guard: a silent overwrite would make deserialization pick an
            // arbitrary definition for the colliding id, so report it loudly.
            if (dataDefineMap.containsKey(defineId)) {
                logger.error("duplicate data define id {}: {} replaces {}", defineId,
                    dataDefine.getClass().getName(), dataDefineMap.get(defineId).getClass().getName());
            }
            logger.info("loaded data definition class: {}", dataDefine.getClass().getName());
            dataDefineMap.put(defineId, dataDefine);
        }
        return dataDefineMap;
    }
}
package org.skywalking.apm.collector.stream.impl.data;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
/**
 * Points the definition loader at the <code>data.define</code> resource file,
 * which lists the {@link DataDefine} implementation classes to load.
 *
 * @author pengys5
 */
public class DataDefinitionFile extends DefinitionFile {
    // Resource name scanned by the DefinitionLoader.
    @Override protected String fileName() {
        return "data.define";
    }
}
package org.skywalking.apm.collector.stream.impl.data;
/**
 * Per-column merge strategy: given the new and the old value of one column when
 * two {@link Data} rows with the same id are merged, return the value to keep.
 *
 * @author pengys5
 */
public interface Operation {
    // Merge a string column.
    String operate(String newValue, String oldValue);
    // Merge a long column.
    Long operate(Long newValue, Long oldValue);
    // Merge a float column.
    Float operate(Float newValue, Float oldValue);
}
package org.skywalking.apm.collector.stream.impl.data;
/**
 * Double-buffered pair of {@link DataCollection}s. Writers fill the collection
 * the pointer currently designates; {@link #switchPointer()} flips the pointer so
 * the other buffer becomes current, and {@link #getLast()} returns the buffer the
 * pointer is NOT on.
 *
 * @author pengys5
 */
public abstract class Window {

    // Always references exactly one of the two buffers below.
    private DataCollection pointer;
    private final DataCollection windowDataA;
    private final DataCollection windowDataB;

    public Window() {
        windowDataA = new DataCollection();
        windowDataB = new DataCollection();
        pointer = windowDataA;
    }

    /** Flip the pointer to the other buffer. */
    public void switchPointer() {
        pointer = (pointer == windowDataA) ? windowDataB : windowDataA;
    }

    /** Return the current buffer after marking it held. */
    protected DataCollection getCurrentAndHold() {
        DataCollection current = pointer;
        current.hold();
        return current;
    }

    /** The buffer the pointer is not currently on. */
    public DataCollection getLast() {
        return (pointer == windowDataA) ? windowDataB : windowDataA;
    }
}
package org.skywalking.apm.collector.stream.impl.data.operate;
import org.skywalking.apm.collector.stream.impl.data.Operation;
/**
 * Merge strategy that always takes the incoming value: whatever the new row
 * carries overwrites ("covers") what was stored before.
 *
 * @author pengys5
 */
public class CoverOperation implements Operation {

    @Override public String operate(String incoming, String existing) {
        return incoming;
    }

    @Override public Long operate(Long incoming, Long existing) {
        return incoming;
    }

    @Override public Float operate(Float incoming, Float existing) {
        return incoming;
    }
}
package org.skywalking.apm.collector.stream.impl.data.operate;
import org.skywalking.apm.collector.stream.impl.data.Operation;
/**
 * Merge strategy that keeps the stored value and ignores the incoming one —
 * effectively "first write wins" for the column.
 *
 * @author pengys5
 */
public class NonOperation implements Operation {

    @Override public String operate(String incoming, String existing) {
        return existing;
    }

    @Override public Long operate(Long incoming, Long existing) {
        return existing;
    }

    @Override public Float operate(Float incoming, Float existing) {
        return existing;
    }
}
org.skywalking.apm.collector.stream.impl.RemoteCommonServiceHandler
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册