Commit 54488d64 authored by pengys5

Merge remote-tracking branch 'origin/feature/3.0' into feature/collector

# Conflicts:
#	skywalking-collector/skywalking-collector-commons/src/main/java/com/a/eye/skywalking/collector/commons/serializer/TraceSegmentSerializer.java
......@@ -56,6 +56,12 @@
</properties>
<dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
......
......@@ -2,6 +2,7 @@ package com.a.eye.skywalking.collector;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.Const;
......@@ -39,8 +40,10 @@ public class CollectorSystem {
private void createAkkaSystem() {
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)).
withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.seed_nodes)).
withFallback(ConfigFactory.load("application.conf"));
if (!StringUtil.isEmpty(ClusterConfig.Cluster.seed_nodes)) {
config.withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.seed_nodes));
}
ActorSystem akkaSystem = ActorSystem.create(Const.SystemName, config);
clusterContext = new ClusterWorkerContext(akkaSystem);
......
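A note on the hunk above: com.typesafe.config.Config is immutable, so withFallback returns a new Config instance and the conditional call as committed discards its result, meaning the seed-nodes setting never reaches the ActorSystem. A minimal sketch of the conditional override, assuming the returned instance is meant to be kept (the reassignment is an assumption, not part of this commit):

    Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname)
        .withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port))
        .withFallback(ConfigFactory.load("application.conf"));
    if (!StringUtil.isEmpty(ClusterConfig.Cluster.seed_nodes)) {
        // Assumed fix: keep the returned Config; the parsed seed-nodes value takes precedence over application.conf.
        config = ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.seed_nodes).withFallback(config);
    }
    ActorSystem akkaSystem = ActorSystem.create(Const.SystemName, config);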
......@@ -17,7 +17,7 @@ public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWor
T clusterWorker = (T) workerInstance(getClusterContext());
clusterWorker.preStart();
ActorRef actorRef = getClusterContext().getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker), role() + "_" + num);
ActorRef actorRef = getClusterContext().getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker), role().roleName() + "_" + num);
ClusterWorkerRef workerRef = new ClusterWorkerRef(actorRef, role());
getClusterContext().put(workerRef);
......
......@@ -19,6 +19,6 @@ public class ClusterConfig {
public static String roles = "";
}
public static String seed_nodes = "127.0.0.1:2551";
public static String seed_nodes = "";
}
}
......@@ -23,10 +23,5 @@
<artifactId>skywalking-trace</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
</project>
package com.a.eye.skywalking.collector.commons.serializer;
import akka.serialization.JSerializer;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.proto.SegmentMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class TraceSegmentSerializer extends JSerializer {
private Logger logger = LogManager.getFormatterLogger(TraceSegmentSerializer.class);
@Override
public boolean includeManifest() {
return false;
}
@Override
public int identifier() {
return 30;
}
@Override
public byte[] toBinary(Object o) {
TraceSegment traceSegment = (TraceSegment) o;
return traceSegment.serialize().toByteArray();
}
@Override
public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
TraceSegment traceSegment = null;
try {
traceSegment = new TraceSegment(SegmentMessage.parseFrom(bytes));
} catch (InvalidProtocolBufferException e) {
logger.warn("Can't covert message from byte[] to SegmentMessage");
}
return traceSegment;
}
}
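For reference, a minimal sketch of round-tripping a segment through this serializer; the TraceSegment instance is assumed to come from a test or from the agent, and the manifest argument can be null because includeManifest() returns false:

    static TraceSegment roundTrip(TraceSegment original) {
        TraceSegmentSerializer serializer = new TraceSegmentSerializer();
        byte[] bytes = serializer.toBinary(original);                 // TraceSegment -> SegmentMessage bytes
        return (TraceSegment) serializer.fromBinaryJava(bytes, null); // bytes -> TraceSegment; no manifest is used
    }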
......@@ -5,14 +5,14 @@ akka {
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
TraceSegment = "com.a.eye.skywalking.collector.worker.TraceSegmentSerializer"
// TraceSegment = "com.a.eye.skywalking.collector.worker.TraceSegmentSerializer"
json = "com.a.eye.skywalking.collector.commons.serializer.JsonSerializer"
}
serialization-bindings {
"java.lang.String" = java
"com.google.protobuf.Message" = proto
"com.a.eye.skywalking.trace.TraceSegment" = TraceSegment
// "com.a.eye.skywalking.trace.TraceSegment" = TraceSegment
"com.google.gson.JsonObject" = json
}
......
......@@ -8,7 +8,7 @@ import com.a.eye.skywalking.collector.worker.application.analysis.NodeInstanceAn
import com.a.eye.skywalking.collector.worker.application.analysis.ResponseCostAnalysis;
import com.a.eye.skywalking.collector.worker.application.analysis.ResponseSummaryAnalysis;
import com.a.eye.skywalking.collector.worker.application.persistence.TraceSegmentRecordPersistence;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
......@@ -40,9 +40,9 @@ public class ApplicationMain extends AbstractLocalSyncWorker {
@Override
protected void onWork(Object request, Object response) throws Exception {
if (request instanceof TraceSegmentReceiver.TraceSegmentTimeSlice) {
if (request instanceof SegmentPost.SegmentWithTimeSlice) {
logger.debug("begin translate TraceSegment Object to JsonObject");
TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment = (TraceSegmentReceiver.TraceSegmentTimeSlice) request;
SegmentPost.SegmentWithTimeSlice traceSegment = (SegmentPost.SegmentWithTimeSlice) request;
getSelfContext().lookup(TraceSegmentRecordPersistence.Role.INSTANCE).tell(traceSegment);
......@@ -81,7 +81,7 @@ public class ApplicationMain extends AbstractLocalSyncWorker {
}
}
private void sendToDAGNodePersistence(TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment) throws Exception {
private void sendToDAGNodePersistence(SegmentPost.SegmentWithTimeSlice traceSegment) throws Exception {
String code = traceSegment.getTraceSegment().getApplicationCode();
String component = null;
......@@ -93,11 +93,11 @@ public class ApplicationMain extends AbstractLocalSyncWorker {
}
}
DAGNodeAnalysis.Metric node = new DAGNodeAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, component, layer);
getSelfContext().lookup(DAGNodeAnalysis.Role.INSTANCE).tell(node);
// DAGNodeAnalysis.Metric node = new DAGNodeAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, component, layer);
// getSelfContext().lookup(DAGNodeAnalysis.Role.INSTANCE).tell(node);
}
private void sendToNodeInstanceAnalysis(TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment) throws Exception {
private void sendToNodeInstanceAnalysis(SegmentPost.SegmentWithTimeSlice traceSegment) throws Exception {
TraceSegment segment = traceSegment.getTraceSegment();
List<TraceSegmentRef> refs = segment.getRefs();
......@@ -106,13 +106,13 @@ public class ApplicationMain extends AbstractLocalSyncWorker {
String code = segment.getApplicationCode();
String address = ref.getPeerHost();
NodeInstanceAnalysis.Metric property = new NodeInstanceAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, address);
getSelfContext().lookup(NodeInstanceAnalysis.Role.INSTANCE).tell(property);
// NodeInstanceAnalysis.Metric property = new NodeInstanceAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, address);
// getSelfContext().lookup(NodeInstanceAnalysis.Role.INSTANCE).tell(property);
}
}
}
private void sendToResponseCostPersistence(TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment) throws Exception {
private void sendToResponseCostPersistence(SegmentPost.SegmentWithTimeSlice traceSegment) throws Exception {
String code = traceSegment.getTraceSegment().getApplicationCode();
long startTime = -1;
......@@ -127,11 +127,11 @@ public class ApplicationMain extends AbstractLocalSyncWorker {
}
}
ResponseCostAnalysis.Metric cost = new ResponseCostAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, isError, startTime, endTime);
getSelfContext().lookup(ResponseCostAnalysis.Role.INSTANCE).tell(cost);
// ResponseCostAnalysis.Metric cost = new ResponseCostAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, isError, startTime, endTime);
// getSelfContext().lookup(ResponseCostAnalysis.Role.INSTANCE).tell(cost);
}
private void sendToResponseSummaryPersistence(TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment) throws Exception {
private void sendToResponseSummaryPersistence(SegmentPost.SegmentWithTimeSlice traceSegment) throws Exception {
String code = traceSegment.getTraceSegment().getApplicationCode();
boolean isError = false;
......@@ -141,7 +141,7 @@ public class ApplicationMain extends AbstractLocalSyncWorker {
}
}
ResponseSummaryAnalysis.Metric summary = new ResponseSummaryAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, isError);
getSelfContext().lookup(ResponseSummaryAnalysis.Role.INSTANCE).tell(summary);
// ResponseSummaryAnalysis.Metric summary = new ResponseSummaryAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, isError);
// getSelfContext().lookup(ResponseSummaryAnalysis.Role.INSTANCE).tell(summary);
}
}
......@@ -9,7 +9,7 @@ import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.application.receiver.DAGNodeReceiver;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.a.eye.skywalking.collector.worker.storage.index.AbstractIndex;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -31,7 +31,7 @@ public class DAGNodeAnalysis extends RecordAnalysisMember {
Metric metric = (Metric) message;
JsonObject propertyJsonObj = new JsonObject();
propertyJsonObj.addProperty("code", metric.code);
propertyJsonObj.addProperty(DateTools.Time_Slice_Column_Name, metric.getMinute());
propertyJsonObj.addProperty(AbstractIndex.Time_Slice_Column_Name, metric.getMinute());
propertyJsonObj.addProperty("component", metric.component);
propertyJsonObj.addProperty("layer", metric.layer);
......@@ -89,8 +89,8 @@ public class DAGNodeAnalysis extends RecordAnalysisMember {
private final String component;
private final String layer;
public Metric(long minute, int second, String code, String component, String layer) {
super(minute, second);
public Metric(long minute, long hour, long day, int second, String code, String component, String layer) {
super(minute, hour, day, second);
this.code = code;
this.component = component;
this.layer = layer;
......
......@@ -10,7 +10,7 @@ import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.receiver.NodeInstanceReceiver;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.a.eye.skywalking.collector.worker.storage.index.AbstractIndex;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -32,7 +32,7 @@ public class NodeInstanceAnalysis extends RecordAnalysisMember {
Metric metric = (Metric) message;
JsonObject propertyJsonObj = new JsonObject();
propertyJsonObj.addProperty("code", metric.code);
propertyJsonObj.addProperty(DateTools.Time_Slice_Column_Name, metric.getMinute());
propertyJsonObj.addProperty(AbstractIndex.Time_Slice_Column_Name, metric.getMinute());
propertyJsonObj.addProperty("address", metric.address);
String id = metric.getMinute() + "-" + metric.address;
......@@ -88,8 +88,8 @@ public class NodeInstanceAnalysis extends RecordAnalysisMember {
private final String code;
private final String address;
public Metric(long minute, int second, String code, String address) {
super(minute, second);
public Metric(long minute, long hour, long day, int second, String code, String address) {
super(minute, hour, day, second);
this.code = code;
this.address = address;
}
......
......@@ -84,8 +84,8 @@ public class ResponseCostAnalysis extends MetricAnalysisMember {
private final Long startTime;
private final Long endTime;
public Metric(long minute, int second, String code, Boolean isError, Long startTime, Long endTime) {
super(minute, second);
public Metric(long minute, long hour, long day, int second, String code, Boolean isError, Long startTime, Long endTime) {
super(minute, hour, day, second);
this.code = code;
this.isError = isError;
this.startTime = startTime;
......
......@@ -79,8 +79,8 @@ public class ResponseSummaryAnalysis extends MetricAnalysisMember {
private final String code;
private final Boolean isError;
public Metric(long minute, int second, String code, Boolean isError) {
super(minute, second);
public Metric(long minute, long hour, long day, int second, String code, Boolean isError) {
super(minute, hour, day, second);
this.code = code;
this.isError = isError;
}
......
......@@ -8,9 +8,9 @@ import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.a.eye.skywalking.collector.worker.storage.index.AbstractIndex;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
......@@ -45,8 +45,8 @@ public class TraceSegmentRecordPersistence extends RecordPersistenceMember {
@Override
public void analyse(Object message) throws Exception {
if (message instanceof TraceSegmentReceiver.TraceSegmentTimeSlice) {
TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment = (TraceSegmentReceiver.TraceSegmentTimeSlice) message;
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice traceSegment = (SegmentPost.SegmentWithTimeSlice) message;
JsonObject jsonObject = parseTraceSegment(traceSegment.getTraceSegment(), traceSegment.getMinute());
RecordData recordData = new RecordData(traceSegment.getTraceSegment().getTraceSegmentId());
......@@ -91,7 +91,7 @@ public class TraceSegmentRecordPersistence extends RecordPersistenceMember {
private JsonObject parseTraceSegment(TraceSegment traceSegment, long minute) {
JsonObject traceJsonObj = new JsonObject();
traceJsonObj.addProperty("segmentId", traceSegment.getTraceSegmentId());
traceJsonObj.addProperty(DateTools.Time_Slice_Column_Name, minute);
traceJsonObj.addProperty(AbstractIndex.Time_Slice_Column_Name, minute);
traceJsonObj.addProperty("startTime", traceSegment.getStartTime());
traceJsonObj.addProperty("endTime", traceSegment.getEndTime());
traceJsonObj.addProperty("appCode", traceSegment.getApplicationCode());
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.TimeSlice;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.NodeInstanceIndex;
import com.a.eye.skywalking.collector.worker.nodeinst.NodeInstIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
......@@ -40,7 +40,7 @@ public class NodeInstanceSearchPersistence extends AbstractLocalSyncWorker {
}
public JsonArray search(String type, long timeSlice) {
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeInstanceIndex.Index);
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeInstIndex.Index);
searchRequestBuilder.setTypes(type);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.multiMatchQuery(timeSlice, "timeSlice"));
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.TimeSlice;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.NodeInstanceIndex;
import com.a.eye.skywalking.collector.worker.nodeinst.NodeInstIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
......@@ -40,7 +40,7 @@ public class NodeInstanceSummarySearchPersistence extends AbstractLocalSyncWorke
}
public JsonArray search(String type, long timeSlice) {
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeInstanceIndex.Index);
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeInstIndex.Index);
searchRequestBuilder.setTypes(type);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.multiMatchQuery(timeSlice, "timeSlice"));
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.TimeSlice;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.NodeRefIndex;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.TimeSlice;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.ServerNodeIndex;
import com.a.eye.skywalking.collector.worker.node.NodeIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
......@@ -40,7 +40,7 @@ public class ServerNodeSearchPersistence extends AbstractLocalSyncWorker {
}
public JsonArray search(String type, long timeSlice) {
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(ServerNodeIndex.Index);
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeIndex.Index);
searchRequestBuilder.setTypes(type);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery("timeSlice", timeSlice));
......
......@@ -7,8 +7,8 @@ import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.dagnode.persistence.ClientNodeSearchPersistence;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcher;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcherProvider;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGet;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGetProvider;
import com.a.eye.skywalking.collector.worker.tools.ParameterTools;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
......@@ -20,11 +20,11 @@ import java.util.Map;
/**
* @author pengys5
*/
public class ClientNodeWithTimeSliceSearcher extends AbstractSearcher {
public class ClientNodeWithTimeSliceGet extends AbstractGet {
private Logger logger = LogManager.getFormatterLogger(ClientNodeWithTimeSliceSearcher.class);
private Logger logger = LogManager.getFormatterLogger(ClientNodeWithTimeSliceGet.class);
private ClientNodeWithTimeSliceSearcher(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private ClientNodeWithTimeSliceGet(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -52,7 +52,7 @@ public class ClientNodeWithTimeSliceSearcher extends AbstractSearcher {
getSelfContext().lookup(ClientNodeSearchPersistence.WorkerRole.INSTANCE).ask(requestEntity, response);
}
public static class Factory extends AbstractSearcherProvider<ClientNodeWithTimeSliceSearcher> {
public static class Factory extends AbstractGetProvider<ClientNodeWithTimeSliceGet> {
@Override
public Role role() {
......@@ -60,13 +60,13 @@ public class ClientNodeWithTimeSliceSearcher extends AbstractSearcher {
}
@Override
public ClientNodeWithTimeSliceSearcher workerInstance(ClusterWorkerContext clusterContext) {
return new ClientNodeWithTimeSliceSearcher(role(), clusterContext, new LocalWorkerContext());
public ClientNodeWithTimeSliceGet workerInstance(ClusterWorkerContext clusterContext) {
return new ClientNodeWithTimeSliceGet(role(), clusterContext, new LocalWorkerContext());
}
@Override
public String servletPath() {
return "/dagNode/search/clientNodeWithTimeSlice";
return "/search/dagNode/clientNodeWithTimeSlice";
}
}
......@@ -75,7 +75,7 @@ public class ClientNodeWithTimeSliceSearcher extends AbstractSearcher {
@Override
public String roleName() {
return ClientNodeWithTimeSliceSearcher.class.getSimpleName();
return ClientNodeWithTimeSliceGet.class.getSimpleName();
}
@Override
......
......@@ -7,8 +7,8 @@ import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.dagnode.persistence.NodeInstanceSearchPersistence;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcher;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcherProvider;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGet;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGetProvider;
import com.a.eye.skywalking.collector.worker.tools.ParameterTools;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
......@@ -20,11 +20,11 @@ import java.util.Map;
/**
* @author pengys5
*/
public class NodeInstanceWithTimeSliceSearcher extends AbstractSearcher {
public class NodeInstanceWithTimeSliceGet extends AbstractGet {
private Logger logger = LogManager.getFormatterLogger(NodeInstanceWithTimeSliceSearcher.class);
private Logger logger = LogManager.getFormatterLogger(NodeInstanceWithTimeSliceGet.class);
private NodeInstanceWithTimeSliceSearcher(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private NodeInstanceWithTimeSliceGet(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -52,7 +52,7 @@ public class NodeInstanceWithTimeSliceSearcher extends AbstractSearcher {
getSelfContext().lookup(NodeInstanceSearchPersistence.WorkerRole.INSTANCE).ask(requestEntity, response);
}
public static class Factory extends AbstractSearcherProvider<NodeInstanceWithTimeSliceSearcher> {
public static class Factory extends AbstractGetProvider<NodeInstanceWithTimeSliceGet> {
@Override
public Role role() {
......@@ -60,8 +60,8 @@ public class NodeInstanceWithTimeSliceSearcher extends AbstractSearcher {
}
@Override
public NodeInstanceWithTimeSliceSearcher workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstanceWithTimeSliceSearcher(role(), clusterContext, new LocalWorkerContext());
public NodeInstanceWithTimeSliceGet workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstanceWithTimeSliceGet(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -75,7 +75,7 @@ public class NodeInstanceWithTimeSliceSearcher extends AbstractSearcher {
@Override
public String roleName() {
return NodeInstanceWithTimeSliceSearcher.class.getSimpleName();
return NodeInstanceWithTimeSliceGet.class.getSimpleName();
}
@Override
......
......@@ -7,8 +7,8 @@ import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.dagnode.persistence.NodeRefSearchPersistence;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcher;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcherProvider;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGet;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGetProvider;
import com.a.eye.skywalking.collector.worker.tools.ParameterTools;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
......@@ -20,11 +20,11 @@ import java.util.Map;
/**
* @author pengys5
*/
public class NodeRefWithTimeSliceSearcher extends AbstractSearcher {
public class NodeRefWithTimeSliceGet extends AbstractGet {
private Logger logger = LogManager.getFormatterLogger(NodeRefWithTimeSliceSearcher.class);
private Logger logger = LogManager.getFormatterLogger(NodeRefWithTimeSliceGet.class);
private NodeRefWithTimeSliceSearcher(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private NodeRefWithTimeSliceGet(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -52,7 +52,7 @@ public class NodeRefWithTimeSliceSearcher extends AbstractSearcher {
getSelfContext().lookup(NodeRefSearchPersistence.WorkerRole.INSTANCE).ask(requestEntity, response);
}
public static class Factory extends AbstractSearcherProvider<NodeRefWithTimeSliceSearcher> {
public static class Factory extends AbstractGetProvider<NodeRefWithTimeSliceGet> {
@Override
public Role role() {
......@@ -60,8 +60,8 @@ public class NodeRefWithTimeSliceSearcher extends AbstractSearcher {
}
@Override
public NodeRefWithTimeSliceSearcher workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefWithTimeSliceSearcher(role(), clusterContext, new LocalWorkerContext());
public NodeRefWithTimeSliceGet workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefWithTimeSliceGet(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -75,7 +75,7 @@ public class NodeRefWithTimeSliceSearcher extends AbstractSearcher {
@Override
public String roleName() {
return NodeRefWithTimeSliceSearcher.class.getSimpleName();
return NodeRefWithTimeSliceGet.class.getSimpleName();
}
@Override
......
......@@ -7,8 +7,8 @@ import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.dagnode.persistence.ServerNodeSearchPersistence;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcher;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcherProvider;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGet;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGetProvider;
import com.a.eye.skywalking.collector.worker.tools.ParameterTools;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
......@@ -20,11 +20,11 @@ import java.util.Map;
/**
* @author pengys5
*/
public class ServerNodeWithTimeSliceSearcher extends AbstractSearcher {
public class ServerNodeWithTimeSliceGet extends AbstractGet {
private Logger logger = LogManager.getFormatterLogger(ServerNodeWithTimeSliceSearcher.class);
private Logger logger = LogManager.getFormatterLogger(ServerNodeWithTimeSliceGet.class);
private ServerNodeWithTimeSliceSearcher(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private ServerNodeWithTimeSliceGet(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -52,7 +52,7 @@ public class ServerNodeWithTimeSliceSearcher extends AbstractSearcher {
getSelfContext().lookup(ServerNodeSearchPersistence.WorkerRole.INSTANCE).ask(requestEntity, response);
}
public static class Factory extends AbstractSearcherProvider<ServerNodeWithTimeSliceSearcher> {
public static class Factory extends AbstractGetProvider<ServerNodeWithTimeSliceGet> {
@Override
public Role role() {
......@@ -60,8 +60,8 @@ public class ServerNodeWithTimeSliceSearcher extends AbstractSearcher {
}
@Override
public ServerNodeWithTimeSliceSearcher workerInstance(ClusterWorkerContext clusterContext) {
return new ServerNodeWithTimeSliceSearcher(role(), clusterContext, new LocalWorkerContext());
public ServerNodeWithTimeSliceGet workerInstance(ClusterWorkerContext clusterContext) {
return new ServerNodeWithTimeSliceGet(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -75,7 +75,7 @@ public class ServerNodeWithTimeSliceSearcher extends AbstractSearcher {
@Override
public String roleName() {
return ServerNodeWithTimeSliceSearcher.class.getSimpleName();
return ServerNodeWithTimeSliceGet.class.getSimpleName();
}
@Override
......
......@@ -12,9 +12,9 @@ import java.util.Map;
/**
* @author pengys5
*/
public abstract class AbstractSearcher extends AbstractLocalSyncWorker {
public abstract class AbstractGet extends AbstractLocalSyncWorker {
public AbstractSearcher(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
public AbstractGet(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -26,11 +26,11 @@ public abstract class AbstractSearcher extends AbstractLocalSyncWorker {
protected abstract void onSearch(Map<String, String[]> request, JsonObject response) throws Exception;
static class SearchWithHttpServlet extends AbstractHttpServlet {
static class GetWithHttpServlet extends AbstractHttpServlet {
private final LocalSyncWorkerRef ownerWorkerRef;
SearchWithHttpServlet(LocalSyncWorkerRef ownerWorkerRef) {
GetWithHttpServlet(LocalSyncWorkerRef ownerWorkerRef) {
this.ownerWorkerRef = ownerWorkerRef;
}
......
......@@ -7,13 +7,13 @@ import org.eclipse.jetty.servlet.ServletHolder;
/**
* @author pengys5
*/
public abstract class AbstractSearcherProvider<T extends AbstractLocalSyncWorker> extends AbstractLocalSyncWorkerProvider<T> {
public abstract class AbstractGetProvider<T extends AbstractLocalSyncWorker> extends AbstractLocalSyncWorkerProvider<T> {
public abstract String servletPath();
final protected void createSearcher(ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
final protected void create(ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef) super.create(AbstractWorker.noOwner());
AbstractSearcher.SearchWithHttpServlet searchWithHttpServlet = new AbstractSearcher.SearchWithHttpServlet(workerRef);
context.addServlet(new ServletHolder(searchWithHttpServlet), servletPath());
AbstractGet.GetWithHttpServlet getWithHttpServlet = new AbstractGet.GetWithHttpServlet(workerRef);
context.addServlet(new ServletHolder(getWithHttpServlet), servletPath());
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.IOException;
/**
* @author pengys5
*/
public abstract class AbstractReceiver extends AbstractLocalAsyncWorker {
public abstract class AbstractPost extends AbstractLocalAsyncWorker {
public AbstractReceiver(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private Logger logger = LogManager.getFormatterLogger(AbstractPost.class);
public AbstractPost(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
final public void onWork(Object request) throws Exception {
if (request instanceof ReceiverMessage) {
ReceiverMessage receiverMessage = (ReceiverMessage) request;
onReceive(receiverMessage.request);
if (request instanceof String) {
onReceive((String) request);
} else if (request instanceof EndOfBatchCommand) {
} else {
logger.error("unhandled request, request instance must String, but is %s", request.getClass().toString());
}
}
protected abstract void onReceive(JsonObject request) throws Exception;
protected abstract void onReceive(String reqJsonStr) throws Exception;
static class ReceiveWithHttpServlet extends AbstractHttpServlet {
static class PostWithHttpServlet extends AbstractHttpServlet {
private final LocalAsyncWorkerRef ownerWorkerRef;
public ReceiveWithHttpServlet(LocalAsyncWorkerRef ownerWorkerRef) {
protected PostWithHttpServlet(LocalAsyncWorkerRef ownerWorkerRef) {
this.ownerWorkerRef = ownerWorkerRef;
}
@Override
final protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
JsonObject reqJson = new JsonObject();
final protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
JsonObject resJson = new JsonObject();
try {
request.getParameter("json");
ownerWorkerRef.tell(new ReceiverMessage(reqJson));
BufferedReader bufferedReader = request.getReader();
StringBuffer dataStr = new StringBuffer();
String tmpStr;
while ((tmpStr = bufferedReader.readLine()) != null) {
dataStr.append(tmpStr);
}
ownerWorkerRef.tell(dataStr.toString());
reply(response, resJson, HttpServletResponse.SC_OK);
} catch (Exception e) {
resJson.addProperty("error", e.getMessage());
......@@ -49,12 +62,4 @@ public abstract class AbstractReceiver extends AbstractLocalAsyncWorker {
}
}
}
static class ReceiverMessage {
private final JsonObject request;
public ReceiverMessage(JsonObject request) {
this.request = request;
}
}
}
......@@ -7,23 +7,13 @@ import org.eclipse.jetty.servlet.ServletHolder;
/**
* @author pengys5
*/
public abstract class AbstractReceiverProvider<T extends AbstractLocalAsyncWorker> extends AbstractLocalAsyncWorkerProvider<T> {
public abstract class AbstractPostProvider<T extends AbstractLocalAsyncWorker> extends AbstractLocalAsyncWorkerProvider<T> {
public abstract String servletPath();
final protected void createReceiver(ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
final protected void create(ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
LocalAsyncWorkerRef workerRef = (LocalAsyncWorkerRef) super.create(AbstractWorker.noOwner());
AbstractReceiver.ReceiveWithHttpServlet receiveWithHttpServlet = new AbstractReceiver.ReceiveWithHttpServlet(workerRef);
context.addServlet(new ServletHolder(receiveWithHttpServlet), servletPath());
}
@Override
final public T workerInstance(ClusterWorkerContext clusterContext) {
return null;
}
@Override
final public Role role() {
return null;
AbstractPost.PostWithHttpServlet postWithHttpServlet = new AbstractPost.PostWithHttpServlet(workerRef);
context.addServlet(new ServletHolder(postWithHttpServlet), servletPath());
}
}
......@@ -18,7 +18,7 @@ public enum HttpServer {
public void boot(ClusterWorkerContext clusterContext) throws Exception {
Server server = new Server(7001);
String contextPath = "/skywalking";
String contextPath = "/";
ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
servletContextHandler.setContextPath(contextPath);
logger.info("http server root context path: %s", contextPath);
......
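A hedged client-side sketch for exercising a post endpoint registered this way, using the port 7001 and root context path shown above; the "/segments" servlet path is purely illustrative, since the concrete path served by SegmentPost is not part of this diff (imports assumed: java.net.HttpURLConnection, java.net.URL, java.io.OutputStream, java.nio.charset.StandardCharsets):

    // Sketch only: POST a raw JSON body; PostWithHttpServlet reads it line by line and tells the worker.
    HttpURLConnection conn = (HttpURLConnection) new URL("http://127.0.0.1:7001/segments").openConnection();
    conn.setRequestMethod("POST");
    conn.setDoOutput(true);
    conn.setRequestProperty("Content-Type", "application/json");
    try (OutputStream out = conn.getOutputStream()) {
        out.write("{}".getBytes(StandardCharsets.UTF_8)); // placeholder body; real clients send the segment JSON
    }
    int status = conn.getResponseCode(); // 200 when the worker accepted the body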
......@@ -17,18 +17,18 @@ public enum ServletsCreator {
private Logger logger = LogManager.getFormatterLogger(ServletsCreator.class);
public void boot(ServletContextHandler servletContextHandler, ClusterWorkerContext clusterContext) throws IllegalArgumentException, ProviderNotFoundException {
ServiceLoader<AbstractReceiverProvider> receiverLoader = java.util.ServiceLoader.load(AbstractReceiverProvider.class);
for (AbstractReceiverProvider provider : receiverLoader) {
ServiceLoader<AbstractPostProvider> receiverLoader = java.util.ServiceLoader.load(AbstractPostProvider.class);
for (AbstractPostProvider provider : receiverLoader) {
provider.setClusterContext(clusterContext);
provider.createReceiver(servletContextHandler);
logger.info("add receiver servlet mapping path: %s ", provider.servletPath());
provider.create(servletContextHandler);
logger.info("add post servlet mapping path: %s ", provider.servletPath());
}
ServiceLoader<AbstractSearcherProvider> searcherLoader = java.util.ServiceLoader.load(AbstractSearcherProvider.class);
for (AbstractSearcherProvider provider : searcherLoader) {
ServiceLoader<AbstractGetProvider> searcherLoader = java.util.ServiceLoader.load(AbstractGetProvider.class);
for (AbstractGetProvider provider : searcherLoader) {
provider.setClusterContext(clusterContext);
provider.createSearcher(servletContextHandler);
logger.info("add searcher servlet mapping path: %s ", provider.servletPath());
provider.create(servletContextHandler);
logger.info("add get servlet mapping path: %s ", provider.servletPath());
}
}
}
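Both provider types above are discovered through java.util.ServiceLoader, so each concrete Factory still has to be registered under META-INF/services in the standard way: one file named after the fully qualified com.a.eye.skywalking.collector.worker.httpserver.AbstractGetProvider class and a matching file for AbstractPostProvider, each listing one fully qualified Factory class per line. A sketch of the expected layout, with the entry shown being illustrative rather than an actual class in this module:

    src/main/resources/META-INF/services/com.a.eye.skywalking.collector.worker.httpserver.AbstractGetProvider
        <fully.qualified.SomeWorkerGet$Factory>    (illustrative; one provider class name per line)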
package com.a.eye.skywalking.collector.worker.storage.index;
package com.a.eye.skywalking.collector.worker.node;
import com.a.eye.skywalking.collector.worker.storage.index.AbstractIndex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.xcontent.XContentBuilder;
......@@ -10,18 +11,22 @@ import java.io.IOException;
/**
* @author pengys5
*/
public class ServerNodeIndex extends AbstractIndex {
public class NodeIndex extends AbstractIndex {
private Logger logger = LogManager.getFormatterLogger(ServerNodeIndex.class);
public static final String Index = "server_node_idx";
private Logger logger = LogManager.getFormatterLogger(NodeIndex.class);
public static final String Index = "node_idx";
@Override
public String index() {
return Index;
}
@Override
public boolean isRecord() {
return false;
}
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
......
package com.a.eye.skywalking.collector.worker.node.analysis;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.storage.index.AbstractIndex;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.tag.Tags;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
/**
* @author pengys5
*/
public abstract class AbstractNodeAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(AbstractNodeAnalysis.class);
public AbstractNodeAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
public void analyseSpans(TraceSegment segment, long timeSlice) throws Exception {
List<Span> spanList = segment.getSpans();
if (spanList != null && spanList.size() > 0) {
for (Span span : spanList) {
JsonObject dataJsonObj = new JsonObject();
String kind = Tags.SPAN_KIND.get(span);
String layer = null;
String component = null;
String code = null;
if (Tags.SPAN_KIND_CLIENT.equals(kind)) {
layer = Tags.SPAN_LAYER.get(span);
component = Tags.COMPONENT.get(span);
code = Tags.PEERS.get(span) + "-" + component;
} else if (Tags.SPAN_KIND_SERVER.equals(kind)) {
code = segment.getApplicationCode();
layer = Tags.SPAN_LAYER.get(span);
component = Tags.COMPONENT.get(span);
} else {
logger.error("The span kind value is incorrect which segment record id is %s, the value must client or server", segment.getTraceSegmentId());
return;
}
dataJsonObj.addProperty("code", code);
dataJsonObj.addProperty("component", component);
dataJsonObj.addProperty("layer", layer);
dataJsonObj.addProperty("kind", kind);
dataJsonObj.addProperty(AbstractIndex.Time_Slice_Column_Name, timeSlice);
String id = timeSlice + "-" + code;
logger.debug("node: %s", dataJsonObj.toString());
setRecord(id, dataJsonObj);
}
}
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.worker.node.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeDayAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
*/
public class NodeDayAnalysis extends AbstractNodeAnalysis {
public NodeDayAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseSpans(segment, segmentWithTimeSlice.getDay());
}
}
@Override
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
getClusterContext().lookup(NodeDayAgg.Role.INSTANCE).tell(oneRecord);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeDayAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeDayAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new NodeDayAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return 1024;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeDayAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.node.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeHourAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
*/
public class NodeHourAnalysis extends AbstractNodeAnalysis {
public NodeHourAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseSpans(segment, segmentWithTimeSlice.getHour());
}
}
@Override
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
getClusterContext().lookup(NodeHourAgg.Role.INSTANCE).tell(oneRecord);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeHourAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeHourAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new NodeHourAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return 1024;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeHourAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.applicationref;
package com.a.eye.skywalking.collector.worker.node.analysis;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.applicationref.analysis.DAGNodeRefAnalysis;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeMinuteAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import java.util.List;
/**
* @author pengys5
*/
public class ApplicationRefMain extends AbstractLocalSyncWorker {
private DAGNodeRefAnalysis dagNodeRefAnalysis;
public class NodeMinuteAnalysis extends AbstractNodeAnalysis {
public ApplicationRefMain(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
public NodeMinuteAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(DAGNodeRefAnalysis.Role.INSTANCE).create(this);
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseSpans(segment, segmentWithTimeSlice.getMinute());
}
}
@Override
protected void onWork(Object request, Object response) throws Exception {
TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment = (TraceSegmentReceiver.TraceSegmentTimeSlice) request;
TraceSegment segment = traceSegment.getTraceSegment();
List<TraceSegmentRef> refs = segment.getRefs();
if (refs != null) {
for (TraceSegmentRef ref : refs) {
String front = ref.getApplicationCode();
String behind = segment.getApplicationCode();
DAGNodeRefAnalysis.Metric nodeRef = new DAGNodeRefAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), front, behind);
getSelfContext().lookup(DAGNodeRefAnalysis.Role.INSTANCE).tell(nodeRef);
}
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
getClusterContext().lookup(NodeMinuteAgg.Role.INSTANCE).tell(oneRecord);
}
}
public static class Factory extends AbstractLocalSyncWorkerProvider<ApplicationRefMain> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMinuteAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
......@@ -52,8 +45,13 @@ public class ApplicationRefMain extends AbstractLocalSyncWorker {
}
@Override
public ApplicationRefMain workerInstance(ClusterWorkerContext clusterContext) {
return new ApplicationRefMain(role(), clusterContext, new LocalWorkerContext());
public NodeMinuteAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new NodeMinuteAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return 1024;
}
}
......@@ -62,7 +60,7 @@ public class ApplicationRefMain extends AbstractLocalSyncWorker {
@Override
public String roleName() {
return ApplicationRefMain.class.getSimpleName();
return NodeMinuteAnalysis.class.getSimpleName();
}
@Override
......
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeDayAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeDayAgg.class);
public NodeDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeDaySave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeDaySave.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<NodeDayAgg> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeDayAgg workerInstance(ClusterWorkerContext clusterContext) {
return new NodeDayAgg(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodeReceiver.Num;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeDayAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeIndex;
/**
* @author pengys5
*/
public class NodeDaySave extends RecordPersistenceMember {
public NodeDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeIndex.Index;
}
@Override
public String esType() {
return NodeIndex.Type_Day;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeDaySave> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeDaySave workerInstance(ClusterWorkerContext clusterContext) {
return new NodeDaySave(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeDaySave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeHourAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeHourAgg.class);
public NodeHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeHourSave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeHourSave.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<NodeHourAgg> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeHourAgg workerInstance(ClusterWorkerContext clusterContext) {
return new NodeHourAgg(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodeReceiver.Num;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeHourAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeIndex;
/**
* @author pengys5
*/
public class NodeHourSave extends RecordPersistenceMember {
public NodeHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeIndex.Index;
}
@Override
public String esType() {
return NodeIndex.Type_Hour;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeHourSave> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeHourSave workerInstance(ClusterWorkerContext clusterContext) {
return new NodeHourSave(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeHourSave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeMinuteAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeMinuteAgg.class);
public NodeMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeMinuteSave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeMinuteSave.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<NodeMinuteAgg> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeMinuteAgg workerInstance(ClusterWorkerContext clusterContext) {
return new NodeMinuteAgg(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodeReceiver.Num;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeMinuteAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeIndex;
/**
* @author pengys5
*/
public class NodeMinuteSave extends RecordPersistenceMember {
public NodeMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeIndex.Index;
}
@Override
public String esType() {
return NodeIndex.Type_Minute;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMinuteSave> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeMinuteSave workerInstance(ClusterWorkerContext clusterContext) {
return new NodeMinuteSave(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeMinuteSave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.storage.index;
package com.a.eye.skywalking.collector.worker.nodeinst;
import com.a.eye.skywalking.collector.worker.storage.index.AbstractIndex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.xcontent.XContentBuilder;
......@@ -10,17 +11,22 @@ import java.io.IOException;
/**
* @author pengys5
*/
public class NodeInstanceIndex extends AbstractIndex {
public class NodeInstIndex extends AbstractIndex {
private static Logger logger = LogManager.getFormatterLogger(NodeInstanceIndex.class);
private static Logger logger = LogManager.getFormatterLogger(NodeInstIndex.class);
public static final String Index = "node_instance_idx";
public static final String Index = "node_inst_idx";
@Override
public String index() {
return Index;
}
@Override
public boolean isRecord() {
return false;
}
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
......
package com.a.eye.skywalking.collector.worker.nodeinst.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeInstDayAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeInstDayAgg.class);
public NodeInstDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeInstDaySave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeInstDaySave.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<NodeInstDayAgg> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeInstDayAgg workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstDayAgg(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return WorkerConfig.Worker.NodeInstanceReceiver.Num;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeInstDayAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.nodeinst.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.nodeinst.NodeInstIndex;
/**
* @author pengys5
*/
public class NodeInstDaySave extends RecordPersistenceMember {
public NodeInstDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeInstIndex.Index;
}
@Override
public String esType() {
return NodeInstIndex.Type_Day;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeInstDaySave> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeInstDaySave workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstDaySave(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.NodeInstancePersistence.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeInstDaySave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.nodeinst.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeInstHourAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeInstHourAgg.class);
public NodeInstHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeInstHourSave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeInstHourSave.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<NodeInstHourAgg> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeInstHourAgg workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstHourAgg(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return WorkerConfig.Worker.NodeInstanceReceiver.Num;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeInstHourAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.nodeinst.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.nodeinst.NodeInstIndex;
/**
* @author pengys5
*/
public class NodeInstHourSave extends RecordPersistenceMember {
public NodeInstHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeInstIndex.Index;
}
@Override
public String esType() {
return NodeInstIndex.Type_Hour;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeInstHourSave> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeInstHourSave workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstHourSave(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.NodeInstancePersistence.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeInstHourSave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.nodeinst.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeInstMinuteAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeInstMinuteAgg.class);
public NodeInstMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeInstMinuteSave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeInstMinuteSave.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<NodeInstMinuteAgg> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeInstMinuteAgg workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstMinuteAgg(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return WorkerConfig.Worker.NodeInstanceReceiver.Num;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeInstMinuteAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.nodeinst.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.nodeinst.NodeInstIndex;
/**
* @author pengys5
*/
public class NodeInstMinuteSave extends RecordPersistenceMember {
public NodeInstMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeInstIndex.Index;
}
@Override
public String esType() {
return NodeInstIndex.Type_Minute;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeInstMinuteSave> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeInstMinuteSave workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstMinuteSave(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.NodeInstancePersistence.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeInstMinuteSave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.storage.index;
package com.a.eye.skywalking.collector.worker.noderef;
import com.a.eye.skywalking.collector.worker.storage.index.AbstractIndex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.xcontent.XContentBuilder;
......@@ -21,16 +22,21 @@ public class NodeRefIndex extends AbstractIndex {
return Index;
}
@Override
public boolean isRecord() {
return false;
}
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject("client")
.startObject("front")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("server")
.startObject("behind")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
......
package com.a.eye.skywalking.collector.worker.noderef.analysis;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.storage.index.AbstractIndex;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.a.eye.skywalking.trace.tag.Tags;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
/**
* @author pengys5
*/
public abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(AbstractNodeRefAnalysis.class);
public AbstractNodeRefAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
void analyseRefs(TraceSegment segment, long timeSlice) throws Exception {
List<TraceSegmentRef> segmentRefList = segment.getRefs();
if (segmentRefList != null && segmentRefList.size() > 0) {
for (TraceSegmentRef segmentRef : segmentRefList) {
String front = segmentRef.getApplicationCode();
String behind = segment.getApplicationCode();
String id = timeSlice + "-" + front + "-" + behind;
JsonObject dataJsonObj = new JsonObject();
dataJsonObj.addProperty("front", front);
dataJsonObj.addProperty("behind", behind);
dataJsonObj.addProperty(AbstractIndex.Time_Slice_Column_Name, timeSlice);
logger.debug("dag node ref: %s", dataJsonObj.toString());
setRecord(id, dataJsonObj);
}
}
}
void analyseSpans(TraceSegment segment, long timeSlice) throws Exception {
List<Span> spanList = segment.getSpans();
if (spanList != null && spanList.size() > 0) {
for (Span span : spanList) {
if (Tags.SPAN_KIND_CLIENT.equals(Tags.SPAN_KIND.get(span))) {
JsonObject dataJsonObj = new JsonObject();
String front = segment.getApplicationCode();
dataJsonObj.addProperty("front", front);
String component = Tags.COMPONENT.get(span);
String peers = Tags.PEERS.get(span);
String behind = component + "-" + peers;
dataJsonObj.addProperty("behind", behind);
dataJsonObj.addProperty(AbstractIndex.Time_Slice_Column_Name, timeSlice);
String id = timeSlice + "-" + front + "-" + behind;
logger.debug("dag node ref: %s", dataJsonObj.toString());
setRecord(id, dataJsonObj);
}
}
}
}
}
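For illustration, a minimal, self-contained sketch of the record analyseRefs builds, using a hypothetical minute slice 201703211520 and application codes portal and billing (none of these values come from the commit):
import com.google.gson.JsonObject;
public class NodeRefRecordSketch {
    public static void main(String[] args) {
        long timeSlice = 201703211520L;  // hypothetical minute slice (yyyyMMddHHmm)
        String front = "portal";         // caller application code (hypothetical)
        String behind = "billing";       // callee application code (hypothetical)
        JsonObject dataJsonObj = new JsonObject();
        dataJsonObj.addProperty("front", front);
        dataJsonObj.addProperty("behind", behind);
        dataJsonObj.addProperty("timeSlice", timeSlice);
        String id = timeSlice + "-" + front + "-" + behind;
        // Prints: 201703211520-portal-billing {"front":"portal","behind":"billing","timeSlice":201703211520}
        System.out.println(id + " " + dataJsonObj);
    }
}
analyseSpans emits the same record shape for client spans, with behind built from component + "-" + peers.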
package com.a.eye.skywalking.collector.worker.applicationref.analysis;
package com.a.eye.skywalking.collector.worker.noderef.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.applicationref.receiver.DAGNodeRefReceiver;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefDayAgg;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.google.gson.JsonObject;
import com.a.eye.skywalking.trace.TraceSegment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class DAGNodeRefAnalysis extends RecordAnalysisMember {
public class NodeRefDayAnalysis extends AbstractNodeRefAnalysis {
private Logger logger = LogManager.getFormatterLogger(DAGNodeRefAnalysis.class);
private Logger logger = LogManager.getFormatterLogger(NodeRefDayAnalysis.class);
public DAGNodeRefAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
public NodeRefDayAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof Metric) {
Metric metric = (Metric) message;
JsonObject propertyJsonObj = new JsonObject();
propertyJsonObj.addProperty("frontCode", metric.frontCode);
propertyJsonObj.addProperty("behindCode", metric.behindCode);
propertyJsonObj.addProperty(DateTools.Time_Slice_Column_Name, metric.getMinute());
String id = metric.getMinute() + "-" + metric.frontCode + "-" + metric.behindCode;
setRecord(id, propertyJsonObj);
logger.debug("dag node ref: %s", propertyJsonObj.toString());
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseRefs(segment, segmentWithTimeSlice.getDay());
analyseSpans(segment, segmentWithTimeSlice.getDay());
}
}
......@@ -45,11 +38,11 @@ public class DAGNodeRefAnalysis extends RecordAnalysisMember {
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
getClusterContext().lookup(DAGNodeRefReceiver.Role.INSTANCE).tell(oneRecord);
getClusterContext().lookup(NodeRefDayAgg.Role.INSTANCE).tell(oneRecord);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<DAGNodeRefAnalysis> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefDayAnalysis> {
public static Factory INSTANCE = new Factory();
......@@ -59,8 +52,8 @@ public class DAGNodeRefAnalysis extends RecordAnalysisMember {
}
@Override
public DAGNodeRefAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new DAGNodeRefAnalysis(role(), clusterContext, new LocalWorkerContext());
public NodeRefDayAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefDayAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -74,23 +67,12 @@ public class DAGNodeRefAnalysis extends RecordAnalysisMember {
@Override
public String roleName() {
return DAGNodeRefAnalysis.class.getSimpleName();
return NodeRefDayAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
public static class Metric extends AbstractTimeSlice {
private final String frontCode;
private final String behindCode;
public Metric(long minute, int second, String frontCode, String behindCode) {
super(minute, second);
this.frontCode = frontCode;
this.behindCode = behindCode;
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.noderef.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefHourAgg;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeRefHourAnalysis extends AbstractNodeRefAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeRefHourAnalysis.class);
public NodeRefHourAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseRefs(segment, segmentWithTimeSlice.getHour());
analyseSpans(segment, segmentWithTimeSlice.getHour());
}
}
@Override
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
getClusterContext().lookup(NodeRefHourAgg.Role.INSTANCE).tell(oneRecord);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefHourAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeRefHourAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefHourAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.DAGNodeRefAnalysis.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefHourAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.noderef.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefMinuteAgg;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeRefMinuteAnalysis.class);
public NodeRefMinuteAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseRefs(segment, segmentWithTimeSlice.getMinute());
analyseSpans(segment, segmentWithTimeSlice.getMinute());
}
}
@Override
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
getClusterContext().lookup(NodeRefMinuteAgg.Role.INSTANCE).tell(oneRecord);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefMinuteAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeRefMinuteAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefMinuteAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.DAGNodeRefAnalysis.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefMinuteAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.applicationref.receiver;
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.applicationref.persistence.DAGNodeRefPersistence;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -12,31 +11,29 @@ import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class DAGNodeRefReceiver extends AbstractClusterWorker {
public class NodeRefDayAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(DAGNodeRefReceiver.class);
private Logger logger = LogManager.getFormatterLogger(NodeRefDayAgg.class);
private DAGNodeRefPersistence persistence;
public DAGNodeRefReceiver(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
public NodeRefDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(DAGNodeRefPersistence.Role.INSTANCE).create(this);
getClusterContext().findProvider(NodeRefDaySave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(DAGNodeRefPersistence.Role.INSTANCE).tell(message);
getSelfContext().lookup(NodeRefDaySave.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<DAGNodeRefReceiver> {
public static class Factory extends AbstractClusterWorkerProvider<NodeRefDayAgg> {
public static Factory INSTANCE = new Factory();
@Override
......@@ -45,8 +42,8 @@ public class DAGNodeRefReceiver extends AbstractClusterWorker {
}
@Override
public DAGNodeRefReceiver workerInstance(ClusterWorkerContext clusterContext) {
return new DAGNodeRefReceiver(role(), clusterContext, new LocalWorkerContext());
public NodeRefDayAgg workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefDayAgg(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -60,12 +57,12 @@ public class DAGNodeRefReceiver extends AbstractClusterWorker {
@Override
public String roleName() {
return DAGNodeRefReceiver.class.getSimpleName();
return NodeRefDayAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
/**
* @author pengys5
*/
public class NodeRefDaySave extends RecordPersistenceMember {
public NodeRefDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeRefIndex.Index;
}
@Override
public String esType() {
return NodeRefIndex.Type_Day;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefDaySave> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeRefDaySave workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefDaySave(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodeRefPersistence.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefDaySave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeRefHourAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefHourAgg.class);
public NodeRefHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeRefHourSave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeRefHourSave.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<NodeRefHourAgg> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeRefHourAgg workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefHourAgg(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodeRefReceiver.Num;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefHourAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
/**
* @author pengys5
*/
public class NodeRefHourSave extends RecordPersistenceMember {
public NodeRefHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeRefIndex.Index;
}
@Override
public String esType() {
return NodeRefIndex.Type_Hour;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefHourSave> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeRefHourSave workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefHourSave(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodeRefPersistence.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefHourSave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeRefMinuteAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefMinuteAgg.class);
public NodeRefMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeRefMinuteSave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeRefMinuteSave.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<NodeRefMinuteAgg> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeRefMinuteAgg workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefMinuteAgg(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodeRefReceiver.Num;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefMinuteAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.applicationref.persistence;
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
/**
* @author pengys5
*/
public class DAGNodeRefPersistence extends RecordPersistenceMember {
public class NodeRefMinuteSave extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(DAGNodeRefPersistence.class);
public DAGNodeRefPersistence(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
public NodeRefMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return "node_ref";
return NodeRefIndex.Index;
}
@Override
public String esType() {
return "node_ref";
return NodeRefIndex.Type_Minute;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<DAGNodeRefPersistence> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefMinuteSave> {
public static Factory INSTANCE = new Factory();
......@@ -41,8 +39,8 @@ public class DAGNodeRefPersistence extends RecordPersistenceMember {
}
@Override
public DAGNodeRefPersistence workerInstance(ClusterWorkerContext clusterContext) {
return new DAGNodeRefPersistence(role(), clusterContext, new LocalWorkerContext());
public NodeRefMinuteSave workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefMinuteSave(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -56,12 +54,12 @@ public class DAGNodeRefPersistence extends RecordPersistenceMember {
@Override
public String roleName() {
return DAGNodeRefPersistence.class.getSimpleName();
return NodeRefMinuteSave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.receiver;
/**
* @author pengys5
*/
public class TraceSegmentKeyMapping {
public static class TraceSegment {
public static String traceSegmentId = "ts";
}
}
package com.a.eye.skywalking.collector.worker.receiver;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.worker.application.ApplicationMain;
import com.a.eye.skywalking.collector.worker.applicationref.ApplicationRefMain;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractReceiver;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractReceiverProvider;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.a.eye.skywalking.trace.TraceSegment;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class TraceSegmentReceiver extends AbstractReceiver {
private Logger logger = LogManager.getFormatterLogger(TraceSegmentReceiver.class);
public TraceSegmentReceiver(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(ApplicationMain.Role.INSTANCE).create(this);
getClusterContext().findProvider(ApplicationRefMain.Role.INSTANCE).create(this);
}
@Override
protected void onReceive(JsonObject request) throws Exception {
logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", request.get("ts"));
// long timeSlice = DateTools.timeStampToTimeSlice(traceSegment.getStartTime());
// int second = DateTools.timeStampToSecond(traceSegment.getStartTime());
//
// TraceSegmentTimeSlice segmentTimeSlice = new TraceSegmentTimeSlice(timeSlice, second, traceSegment);
// getSelfContext().lookup(ApplicationMain.Role.INSTANCE).tell(segmentTimeSlice);
// getSelfContext().lookup(ApplicationRefMain.Role.INSTANCE).tell(segmentTimeSlice);
}
public static class Factory extends AbstractReceiverProvider<TraceSegmentReceiver> {
public static Factory INSTANCE = new Factory();
@Override
public String servletPath() {
return "/receiver/traceSegment";
}
@Override
public int queueSize() {
return 128;
}
}
public static class TraceSegmentTimeSlice extends AbstractTimeSlice {
private final TraceSegment traceSegment;
public TraceSegmentTimeSlice(long timeSliceMinute, int second, TraceSegment traceSegment) {
super(timeSliceMinute, second);
this.traceSegment = traceSegment;
}
public TraceSegment getTraceSegment() {
return traceSegment;
}
}
}
package com.a.eye.skywalking.collector.worker.segment;
import com.a.eye.skywalking.collector.worker.storage.index.AbstractIndex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
/**
* @author pengys5
*/
public class SegmentIndex extends AbstractIndex {
private Logger logger = LogManager.getFormatterLogger(SegmentIndex.class);
public static final String Index = "segment_idx";
@Override
public String index() {
return Index;
}
@Override
public boolean isRecord() {
return true;
}
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject("traceSegmentId")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("startTime")
.field("type", "date")
.field("index", "not_analyzed")
.endObject()
.startObject("endTime")
.field("type", "date")
.field("index", "not_analyzed")
.endObject()
.startObject("applicationCode")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("minute")
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject("hour")
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject("day")
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startArray("refs")
.startObject("traceSegmentId")
.field("type", "String")
.field("index", "not_analyzed")
.endObject()
.startObject("spanId")
.field("type", "integer")
.field("index", "not_analyzed")
.endObject()
.startObject("applicationCode")
.field("type", "String")
.field("index", "not_analyzed")
.endObject()
.startObject("peerHost")
.field("type", "String")
.field("index", "not_analyzed")
.endObject()
.endArray()
.startArray("refs")
.startObject("spanId")
.field("type", "integer")
.field("index", "not_analyzed")
.endObject()
.startObject("parentSpanId")
.field("type", "integer")
.field("index", "not_analyzed")
.endObject()
.startObject("startTime")
.field("type", "date")
.field("index", "not_analyzed")
.endObject()
.startObject("endTime")
.field("type", "date")
.field("index", "not_analyzed")
.endObject()
.startObject("operationName")
.field("type", "String")
.field("index", "not_analyzed")
.endObject()
.endArray()
.startArray("relatedGlobalTraces")
.startObject("id")
.field("type", "String")
.field("index", "not_analyzed")
.endObject()
.endArray()
.endObject()
.endObject();
return mappingBuilder;
}
}
package com.a.eye.skywalking.collector.worker.segment;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractPost;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractPostProvider;
import com.a.eye.skywalking.collector.worker.node.analysis.NodeDayAnalysis;
import com.a.eye.skywalking.collector.worker.node.analysis.NodeHourAnalysis;
import com.a.eye.skywalking.collector.worker.node.analysis.NodeMinuteAnalysis;
import com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefDayAnalysis;
import com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefHourAnalysis;
import com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefMinuteAnalysis;
import com.a.eye.skywalking.collector.worker.segment.persistence.SegmentSave;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.a.eye.skywalking.trace.TraceSegment;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class SegmentPost extends AbstractPost {
private Logger logger = LogManager.getFormatterLogger(SegmentPost.class);
private Gson gson = new Gson();
public SegmentPost(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(SegmentSave.Role.INSTANCE).create(this);
getClusterContext().findProvider(NodeRefMinuteAnalysis.Role.INSTANCE).create(this);
getClusterContext().findProvider(NodeRefHourAnalysis.Role.INSTANCE).create(this);
getClusterContext().findProvider(NodeRefDayAnalysis.Role.INSTANCE).create(this);
getClusterContext().findProvider(NodeMinuteAnalysis.Role.INSTANCE).create(this);
getClusterContext().findProvider(NodeHourAnalysis.Role.INSTANCE).create(this);
getClusterContext().findProvider(NodeDayAnalysis.Role.INSTANCE).create(this);
}
@Override
protected void onReceive(String reqJsonStr) throws Exception {
TraceSegment newSegment = gson.fromJson(reqJsonStr, TraceSegment.class);
validateData(newSegment);
logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", newSegment.getTraceSegmentId());
long minuteSlice = DateTools.getMinuteSlice(newSegment.getStartTime());
long hourSlice = DateTools.getHourSlice(newSegment.getStartTime());
long daySlice = DateTools.getDaySlice(newSegment.getStartTime());
int second = DateTools.getSecond(newSegment.getStartTime());
SegmentWithTimeSlice segmentWithTimeSlice = new SegmentWithTimeSlice(newSegment, minuteSlice, hourSlice, daySlice, second);
tellSegmentSave(reqJsonStr, daySlice, hourSlice, minuteSlice);
tellNodeRef(segmentWithTimeSlice);
tellNode(segmentWithTimeSlice);
}
private void tellSegmentSave(String reqJsonStr, long day, long hour, long minute) throws Exception {
JsonObject newSegmentJson = gson.fromJson(reqJsonStr, JsonObject.class);
newSegmentJson.addProperty("minute", minute);
newSegmentJson.addProperty("hour", hour);
newSegmentJson.addProperty("day", day);
getSelfContext().lookup(SegmentSave.Role.INSTANCE).tell(newSegmentJson);
}
private void tellNodeRef(SegmentWithTimeSlice segmentWithTimeSlice) throws Exception {
getSelfContext().lookup(NodeRefMinuteAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
getSelfContext().lookup(NodeRefHourAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
getSelfContext().lookup(NodeRefDayAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
}
private void tellNode(SegmentWithTimeSlice segmentWithTimeSlice) throws Exception {
getSelfContext().lookup(NodeMinuteAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
getSelfContext().lookup(NodeHourAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
getSelfContext().lookup(NodeDayAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
}
private void validateData(TraceSegment newSegment) {
if (StringUtil.isEmpty(newSegment.getTraceSegmentId())) {
throw new IllegalArgumentException("traceSegmentId required");
}
if (0 == newSegment.getStartTime()) {
throw new IllegalArgumentException("startTime required");
}
}
public static class Factory extends AbstractPostProvider<SegmentPost> {
public static Factory INSTANCE = new Factory();
@Override
public String servletPath() {
return "/segment";
}
@Override
public int queueSize() {
return 128;
}
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public SegmentPost workerInstance(ClusterWorkerContext clusterContext) {
return new SegmentPost(role(), clusterContext, new LocalWorkerContext());
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return SegmentPost.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
public static class SegmentWithTimeSlice extends AbstractTimeSlice {
private final TraceSegment traceSegment;
public SegmentWithTimeSlice(TraceSegment traceSegment, long minute, long hour, long day, int second) {
super(minute, hour, day, second);
this.traceSegment = traceSegment;
}
public TraceSegment getTraceSegment() {
return traceSegment;
}
}
}
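A minimal usage sketch of this endpoint, assuming the collector's HTTP server is reachable at http://localhost:7001 (hypothetical host and port) and that the body is a TraceSegment serialized with its Gson field names; only the "ts" id key is confirmed by this commit, so the payload shape below is an assumption:
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
public class SegmentPostSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical collector address; the "/segment" path comes from Factory.servletPath().
        URL url = new URL("http://localhost:7001/segment");
        // Assumed payload shape: a Gson-serialized TraceSegment with id and start time.
        String segmentJson = "{\"ts\":\"segment-1\",\"startTime\":1490080845000}";
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("POST");
        connection.setDoOutput(true);
        connection.setRequestProperty("Content-Type", "application/json");
        try (OutputStream out = connection.getOutputStream()) {
            out.write(segmentJson.getBytes(StandardCharsets.UTF_8));
        }
        // SegmentPost.onReceive parses the body, computes minute/hour/day slices from the
        // start time, and fans the segment out to SegmentSave and the Node/NodeRef analyses.
        System.out.println("HTTP " + connection.getResponseCode());
    }
}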
package com.a.eye.skywalking.collector.worker.segment.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.storage.index.AbstractIndex;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class SegmentSave extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(SegmentSave.class);
@Override
public String esIndex() {
return SegmentIndex.Index;
}
@Override
public String esType() {
return AbstractIndex.Type_Record;
}
public SegmentSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof JsonObject) {
JsonObject segmentJson = (JsonObject) message;
RecordData recordData = new RecordData(segmentJson.get("ts").getAsString());
recordData.setRecord(segmentJson);
super.analyse(recordData);
} else {
logger.error("unhandled message, message instance must JsonObject, but is %s", message.getClass().toString());
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<SegmentSave> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.TraceSegmentRecordAnalysis.Size;
}
@Override
public SegmentSave workerInstance(ClusterWorkerContext clusterContext) {
return new SegmentSave(role(), clusterContext, new LocalWorkerContext());
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return SegmentSave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
......@@ -5,10 +5,14 @@ package com.a.eye.skywalking.collector.worker.storage;
*/
public abstract class AbstractTimeSlice {
private final long minute;
private final long hour;
private final long day;
private final int second;
public AbstractTimeSlice(long minute, int second) {
public AbstractTimeSlice(long minute, long hour, long day, int second) {
this.minute = minute;
this.hour = hour;
this.day = day;
this.second = second;
}
......@@ -16,6 +20,14 @@ public abstract class AbstractTimeSlice {
return minute;
}
public long getHour() {
return hour;
}
public long getDay() {
return day;
}
public int getSecond() {
return second;
}
......
......@@ -5,6 +5,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
......@@ -24,6 +25,10 @@ public abstract class AbstractIndex {
public static final String Type_Hour = "hour";
public static final String Type_Day = "day";
public static final String Type_Record = "record";
public static final String Time_Slice_Column_Name = "timeSlice";
final public XContentBuilder createSettingBuilder() throws IOException {
XContentBuilder settingsBuilder = XContentFactory.jsonBuilder()
.startObject()
......@@ -33,6 +38,8 @@ public abstract class AbstractIndex {
return settingsBuilder;
}
public abstract boolean isRecord();
public abstract XContentBuilder createMappingBuilder() throws IOException;
final public void createIndex() {
......@@ -53,11 +60,18 @@ public abstract class AbstractIndex {
}
Settings settings = Settings.builder().loadFromSource(settingSource).build();
IndicesAdminClient client = EsClient.getClient().admin().indices();
CreateIndexResponse response = client.prepareCreate(index()).setSettings(settings).addMapping(Type_Minute, mappingBuilder).get();
client.preparePutMapping(index()).setType(Type_Hour).setSource(mappingBuilder).get();
client.preparePutMapping(index()).setType(Type_Day).setSource(mappingBuilder).get();
logger.info("create %s index with type of %s finished, isAcknowledged: %s", index(), "aaa", response.isAcknowledged());
if (isRecord()) {
CreateIndexResponse response = client.prepareCreate(index()).setSettings(settings).addMapping(Type_Record, mappingBuilder).get();
logger.info("create %s index with type of %s finished, isAcknowledged: %s", index(), Type_Record, response.isAcknowledged());
} else {
CreateIndexResponse response = client.prepareCreate(index()).setSettings(settings).addMapping(Type_Minute, mappingBuilder).get();
logger.info("create %s index with type of %s finished, isAcknowledged: %s", index(), Type_Minute, response.isAcknowledged());
PutMappingResponse putMappingResponse = client.preparePutMapping(index()).setType(Type_Hour).setSource(mappingBuilder).get();
logger.info("create %s index with type of %s finished, isAcknowledged: %s", index(), Type_Hour, putMappingResponse.isAcknowledged());
putMappingResponse = client.preparePutMapping(index()).setType(Type_Day).setSource(mappingBuilder).get();
logger.info("create %s index with type of %s finished, isAcknowledged: %s", index(), Type_Day, putMappingResponse.isAcknowledged());
}
}
final public boolean deleteIndex() {
......
......@@ -21,6 +21,11 @@ public class ClientNodeIndex extends AbstractIndex {
return Index;
}
@Override
public boolean isRecord() {
return false;
}
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
......
......@@ -8,20 +8,34 @@ import java.util.Calendar;
*/
public class DateTools {
private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
private static final SimpleDateFormat dayDateFormat = new SimpleDateFormat("yyyyMMdd");
private static final SimpleDateFormat hourDateFormat = new SimpleDateFormat("yyyyMMddHH");
private static final SimpleDateFormat minuteDateFormat = new SimpleDateFormat("yyyyMMddHHmm");
public static final String Time_Slice_Column_Name = "timeSlice";
public static int timeStampToSecond(long time) {
public static int getSecond(long time) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
return calendar.get(Calendar.SECOND);
}
public static long timeStampToTimeSlice(long time) {
public static long getMinuteSlice(long time) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
String timeStr = minuteDateFormat.format(calendar.getTime());
return Long.valueOf(timeStr);
}
public static long getHourSlice(long time) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
String timeStr = hourDateFormat.format(calendar.getTime()) + "00";
return Long.valueOf(timeStr);
}
public static long getDaySlice(long time) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
String timeStr = sdf.format(calendar.getTime());
String timeStr = dayDateFormat.format(calendar.getTime()) + "0000";
return Long.valueOf(timeStr);
}
}
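As a quick illustration (not part of the commit), the three slice helpers collapse a millisecond timestamp into progressively coarser yyyyMMddHHmm values; for a timestamp inside 2017-03-10 12:01 they yield the same slice values the test cases below use.
// Illustrative only: assume timeMillis falls within 2017-03-10 12:01 (local time).
long timeMillis = System.currentTimeMillis();
DateTools.getMinuteSlice(timeMillis);   // 201703101201L for the assumed timestamp
DateTools.getHourSlice(timeMillis);     // 201703101200L -- minutes padded with "00"
DateTools.getDaySlice(timeMillis);      // 201703100000L -- hours and minutes padded with "0000"
DateTools.getSecond(timeMillis);        // the second-of-minute component, 0..59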
package com.a.eye.skywalking.collector.worker.tools;
import org.apache.http.Consts;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.net.URI;
import java.util.List;
/**
* @author pengys5
*/
public enum HttpClientTools {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(HttpClientTools.class);
public String get(String url, List<NameValuePair> params) throws IOException {
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
HttpGet httpget = new HttpGet(url);
String paramStr = EntityUtils.toString(new UrlEncodedFormEntity(params));
httpget.setURI(new URI(httpget.getURI().toString() + "?" + paramStr));
logger.debug("executing get request %s", httpget.getURI());
try (CloseableHttpResponse response = httpClient.execute(httpget)) {
HttpEntity entity = response.getEntity();
logger.debug("get response status %s", response.getStatusLine());
if (entity != null) {
return EntityUtils.toString(entity);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
httpClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return null;
}
public String post(String url, String data) throws IOException {
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
HttpPost httppost = new HttpPost(url);
httppost.setEntity(new StringEntity(data, Consts.UTF_8));
logger.debug("executing post request %s", httppost.getURI());
try (CloseableHttpResponse response = httpClient.execute(httppost)) {
HttpEntity entity = response.getEntity();
if (entity != null) {
return EntityUtils.toString(entity);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
httpClient.close();
} catch (Exception e) {
e.printStackTrace();
}
}
return null;
}
}
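A hedged usage sketch; the endpoint URL and parameter names are assumptions, but the calls mirror the get/post signatures defined above (the segment test case later in this commit posts JSON the same way).
// Illustrative only; URL and parameter names are placeholders.
// Requires java.util.ArrayList and org.apache.http.message.BasicNameValuePair; both methods throw IOException.
List<NameValuePair> params = new ArrayList<NameValuePair>();
params.add(new BasicNameValuePair("timeSliceType", "minute"));
String getResult = HttpClientTools.INSTANCE.get("http://localhost:7001/some/search", params);
String postResult = HttpClientTools.INSTANCE.post("http://localhost:7001/segment", "{\"ts\":\"...\"}");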
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefDayAgg$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefHourAgg$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefMinuteAgg$Factory
com.a.eye.skywalking.collector.worker.node.persistence.NodeDayAgg$Factory
com.a.eye.skywalking.collector.worker.node.persistence.NodeHourAgg$Factory
com.a.eye.skywalking.collector.worker.node.persistence.NodeMinuteAgg$Factory
com.a.eye.skywalking.collector.worker.nodeinst.persistence.NodeInstDayAgg$Factory
com.a.eye.skywalking.collector.worker.nodeinst.persistence.NodeInstHourAgg$Factory
com.a.eye.skywalking.collector.worker.nodeinst.persistence.NodeInstMinuteAgg$Factory
\ No newline at end of file
com.a.eye.skywalking.collector.worker.dagnode.persistence.ClientNodeSearchPersistence$Factory
com.a.eye.skywalking.collector.worker.dagnode.persistence.NodeInstanceSearchPersistence$Factory
com.a.eye.skywalking.collector.worker.dagnode.persistence.NodeRefSearchPersistence$Factory
com.a.eye.skywalking.collector.worker.dagnode.persistence.ServerNodeSearchPersistence$Factory
\ No newline at end of file
com.a.eye.skywalking.collector.worker.segment.persistence.SegmentSave$Factory
com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefDayAnalysis$Factory
com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefHourAnalysis$Factory
com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefMinuteAnalysis$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefDaySave$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefHourSave$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefMinuteSave$Factory
com.a.eye.skywalking.collector.worker.node.analysis.NodeDayAnalysis$Factory
com.a.eye.skywalking.collector.worker.node.analysis.NodeHourAnalysis$Factory
com.a.eye.skywalking.collector.worker.node.analysis.NodeMinuteAnalysis$Factory
com.a.eye.skywalking.collector.worker.node.persistence.NodeDaySave$Factory
com.a.eye.skywalking.collector.worker.node.persistence.NodeHourSave$Factory
com.a.eye.skywalking.collector.worker.node.persistence.NodeMinuteSave$Factory
com.a.eye.skywalking.collector.worker.nodeinst.persistence.NodeInstDaySave$Factory
com.a.eye.skywalking.collector.worker.nodeinst.persistence.NodeInstHourSave$Factory
com.a.eye.skywalking.collector.worker.nodeinst.persistence.NodeInstMinuteSave$Factory
\ No newline at end of file
com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver$Factory
com.a.eye.skywalking.collector.worker.application.receiver.DAGNodeReceiver$Factory
com.a.eye.skywalking.collector.worker.application.receiver.NodeInstanceReceiver$Factory
com.a.eye.skywalking.collector.worker.application.receiver.ResponseCostReceiver$Factory
com.a.eye.skywalking.collector.worker.application.receiver.ResponseSummaryReceiver$Factory
com.a.eye.skywalking.collector.worker.applicationref.receiver.DAGNodeRefReceiver$Factory
\ No newline at end of file
com.a.eye.skywalking.collector.worker.dagnode.searcher.ClientNodeWithTimeSliceSearcher$Factory
com.a.eye.skywalking.collector.worker.dagnode.searcher.NodeInstanceWithTimeSliceSearcher$Factory
com.a.eye.skywalking.collector.worker.dagnode.searcher.NodeRefWithTimeSliceSearcher$Factory
com.a.eye.skywalking.collector.worker.dagnode.searcher.ServerNodeWithTimeSliceSearcher$Factory
cluster.current.hostname=127.0.0.1
cluster.current.port=1000
cluster.current.roles=[WorkersListener, TraceSegmentReceiver, NodeInstancePersistence]
cluster.seed_nodes=["akka.tcp://CollectorSystem@127.0.0.1:1000", "akka.tcp://CollectorSystem@127.0.0.1:1001", "akka.tcp://CollectorSystem@127.0.0.1:1002"]
cluster.current.roles=[WorkersListener]
#cluster.seed_nodes=["akka.tcp://CollectorSystem@127.0.0.1:1000"]
......@@ -5,13 +5,10 @@ import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilderFactory;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.proto.SegmentMessage;
import com.a.eye.skywalking.trace.proto.SegmentRefMessage;
import com.a.eye.skywalking.trace.tag.Tags;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
......@@ -21,7 +18,7 @@ import com.typesafe.config.ConfigFactory;
public class StartUpTestCase {
public void test() throws Exception {
System.out.println(TraceSegmentReceiver.class.getSimpleName());
System.out.println(SegmentPost.class.getSimpleName());
ClusterConfigInitializer.initialize("collector.config");
System.out.println(ClusterConfig.Cluster.Current.roles);
......@@ -36,25 +33,25 @@ public class StartUpTestCase {
TraceSegment dubboClientData = TraceSegmentBuilderFactory.INSTANCE.traceOf_Tomcat_DubboClient();
SegmentMessage.Builder clientBuilder = dubboClientData.serialize().toBuilder();
clientBuilder.setApplicationCode("Tomcat_DubboClient");
dubboClientData = new TraceSegment(clientBuilder.build());
TraceSegment dubboServerData = TraceSegmentBuilderFactory.INSTANCE.traceOf_DubboServer_MySQL();
SegmentMessage serializeServer = dubboServerData.serialize();
SegmentMessage.Builder builder = serializeServer.toBuilder();
SegmentRefMessage.Builder builderRef = builder.getRefs(0).toBuilder();
builderRef.setApplicationCode(dubboClientData.getApplicationCode());
builderRef.setPeerHost(Tags.PEER_HOST.get(dubboClientData.getSpans().get(1)));
builder.setApplicationCode("DubboServer_MySQL");
builder.addRefs(builderRef);
dubboServerData = new TraceSegment(builder.build());
// SegmentMessage.Builder clientBuilder = dubboClientData.serialize().toBuilder();
// clientBuilder.setApplicationCode("Tomcat_DubboClient");
//
// dubboClientData = new TraceSegment(clientBuilder.build());
//
// TraceSegment dubboServerData = TraceSegmentBuilderFactory.INSTANCE.traceOf_DubboServer_MySQL();
//
// SegmentMessage serializeServer = dubboServerData.serialize();
// SegmentMessage.Builder builder = serializeServer.toBuilder();
//
// SegmentRefMessage.Builder builderRef = builder.getRefs(0).toBuilder();
// builderRef.setApplicationCode(dubboClientData.getApplicationCode());
//
//
// builderRef.setPeerHost(Tags.PEER_HOST.get(dubboClientData.getSpans().get(1)));
//
// builder.setApplicationCode("DubboServer_MySQL");
// builder.addRefs(builderRef);
// dubboServerData = new TraceSegment(builder.build());
Thread.sleep(5000);
......@@ -62,7 +59,7 @@ public class StartUpTestCase {
for (int i = 0; i < 100; i++) {
selection.tell(dubboClientData, ActorRef.noSender());
selection.tell(dubboServerData, ActorRef.noSender());
// selection.tell(dubboServerData, ActorRef.noSender());
Thread.sleep(200);
}
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.dagnode.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.LocalSyncWorkerRef;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.NodeInstanceIndex;
import com.a.eye.skywalking.collector.worker.nodeinst.NodeInstIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.elasticsearch.action.index.IndexResponse;
......@@ -23,23 +23,23 @@ public class NodeInstanceTestCase {
@Before
public void initIndex() throws UnknownHostException {
EsClient.boot();
NodeInstanceIndex index = new NodeInstanceIndex();
NodeInstIndex index = new NodeInstIndex();
index.deleteIndex();
index.createIndex();
}
@Test
public void testLoadNodeInstance() throws Exception {
loadNodeInstance(201703101201l, NodeInstanceIndex.Type_Minute);
loadNodeInstance(201703101200l, NodeInstanceIndex.Type_Hour);
loadNodeInstance(201703100000l, NodeInstanceIndex.Type_Day);
loadNodeInstance(201703101201l, NodeInstIndex.Type_Minute);
loadNodeInstance(201703101200l, NodeInstIndex.Type_Hour);
loadNodeInstance(201703100000l, NodeInstIndex.Type_Day);
}
public void loadNodeInstance(long timeSlice, String type) throws Exception {
LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef) NodeInstanceSearchPersistence.Factory.INSTANCE.create(AbstractWorker.noOwner());
insertData(timeSlice, type);
EsClient.indexRefresh(NodeInstanceIndex.Index);
EsClient.indexRefresh(NodeInstIndex.Index);
NodeInstanceSearchPersistence.RequestEntity requestEntity = new NodeInstanceSearchPersistence.RequestEntity(type, timeSlice);
JsonObject resJsonObj = new JsonObject();
......@@ -58,7 +58,7 @@ public class NodeInstanceTestCase {
json.put("timeSlice", timeSlice);
String _id = timeSlice + "-WebApplication-" + "10.218.9.86:8080";
IndexResponse response = EsClient.getClient().prepareIndex(NodeInstanceIndex.Index, type, _id).setSource(json).get();
IndexResponse response = EsClient.getClient().prepareIndex(NodeInstIndex.Index, type, _id).setSource(json).get();
RestStatus status = response.status();
status.getStatus();
......@@ -68,7 +68,7 @@ public class NodeInstanceTestCase {
json.put("timeSlice", timeSlice);
_id = timeSlice + "-MotanServiceApplication-" + "10.20.3.15:3000";
response = EsClient.getClient().prepareIndex(NodeInstanceIndex.Index, type, _id).setSource(json).get();
response = EsClient.getClient().prepareIndex(NodeInstIndex.Index, type, _id).setSource(json).get();
status = response.status();
status.getStatus();
}
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.dagnode.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.LocalSyncWorkerRef;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.NodeRefIndex;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.elasticsearch.action.index.IndexResponse;
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.dagnode.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.LocalSyncWorkerRef;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.ServerNodeIndex;
import com.a.eye.skywalking.collector.worker.node.NodeIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.elasticsearch.action.index.IndexResponse;
......@@ -23,23 +23,23 @@ public class ServerNodeTestCase {
@Before
public void initIndex() throws UnknownHostException {
EsClient.boot();
ServerNodeIndex index = new ServerNodeIndex();
NodeIndex index = new NodeIndex();
index.deleteIndex();
index.createIndex();
}
@Test
public void testLoadServerNode() throws Exception {
loadNode(201703101201l, ServerNodeIndex.Type_Minute);
loadNode(201703101200l, ServerNodeIndex.Type_Hour);
loadNode(201703100000l, ServerNodeIndex.Type_Day);
loadNode(201703101201l, NodeIndex.Type_Minute);
loadNode(201703101200l, NodeIndex.Type_Hour);
loadNode(201703100000l, NodeIndex.Type_Day);
}
public void loadNode(long timeSlice, String type) throws Exception {
LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef) ServerNodeSearchPersistence.Factory.INSTANCE.create(AbstractWorker.noOwner());
insertData(timeSlice, type);
EsClient.indexRefresh(ServerNodeIndex.Index);
EsClient.indexRefresh(NodeIndex.Index);
ServerNodeSearchPersistence.RequestEntity requestEntity = new ServerNodeSearchPersistence.RequestEntity(type, timeSlice);
JsonObject resJsonObj = new JsonObject();
......@@ -59,7 +59,7 @@ public class ServerNodeTestCase {
json.put("layer", "http");
String _id = timeSlice + "-WebApplication";
IndexResponse response = EsClient.getClient().prepareIndex(ServerNodeIndex.Index, type, _id).setSource(json).get();
IndexResponse response = EsClient.getClient().prepareIndex(NodeIndex.Index, type, _id).setSource(json).get();
RestStatus status = response.status();
status.getStatus();
......@@ -70,7 +70,7 @@ public class ServerNodeTestCase {
json.put("layer", "rpc");
_id = timeSlice + "-MotanServiceApplication";
response = EsClient.getClient().prepareIndex(ServerNodeIndex.Index, type, _id).setSource(json).get();
response = EsClient.getClient().prepareIndex(NodeIndex.Index, type, _id).setSource(json).get();
status = response.status();
status.getStatus();
}
......
package com.a.eye.skywalking.collector.worker.segment;
import com.a.eye.skywalking.collector.worker.tools.HttpClientTools;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.a.eye.skywalking.trace.tag.Tags;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.junit.Test;
import java.io.IOException;
import java.util.Date;
/**
* @author pengys5
*/
public class SegmentPostTestCase {
@Test
public void testPostSegment() throws Exception {
Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();
TraceSegment webSegment = new TraceSegment("WebApplication");
Span span = new Span(0, "/Web/GetUser", new Date().getTime());
Tags.SPAN_LAYER.asHttp(span);
Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_SERVER);
Tags.URL.set(span, "http://10.218.9.86:8080/Web/GetUser");
Tags.STATUS_CODE.set(span, 200);
Tags.COMPONENT.set(span, "Tomcat");
Tags.PEERS.set(span, "202.135.4.12");
Thread.sleep(300);
webSegment.archive(span);
Thread.sleep(300);
Span span1 = new Span(1, span, "com.a.eye.skywalking.demo.services.GetUserService.findUser(String, String)", new Date().getTime());
Tags.SPAN_LAYER.asRPCFramework(span1);
Tags.URL.set(span1, "motan://10.20.3.15:3000/com.a.eye.skywalking.demo.services.GetUserService.findUser(String, String)");
Tags.SPAN_KIND.set(span1, Tags.SPAN_KIND_CLIENT);
Tags.COMPONENT.set(span1, "Motan");
Tags.PEERS.set(span1, "10.20.3.15:3000");
Thread.sleep(300);
webSegment.archive(span1);
Thread.sleep(300);
Thread.sleep(300);
webSegment.finish();
Thread.sleep(300);
String webJsonStr = gson.toJson(webSegment);
HttpClientTools.INSTANCE.post("http://localhost:7001/segment", webJsonStr);
TraceSegment motanSegment = new TraceSegment("MotanServiceApplication");
TraceSegmentRef segmentRef = new TraceSegmentRef();
segmentRef.setApplicationCode("WebApplication");
segmentRef.setPeerHost("10.20.3.15:3000");
segmentRef.setTraceSegmentId(webSegment.getTraceSegmentId());
motanSegment.ref(segmentRef);
Span span2 = new Span(0, "com.a.eye.skywalking.demo.services.GetUserService.findUser(String, String)", new Date().getTime());
Tags.SPAN_LAYER.asRPCFramework(span2);
Tags.SPAN_KIND.set(span2, Tags.SPAN_KIND_SERVER);
Tags.URL.set(span2, "motan://10.20.3.15:3000/com.a.eye.skywalking.demo.services.GetUserService.findUser(String, String)");
Tags.COMPONENT.set(span2, "Motan");
Tags.PEERS.set(span2, "10.218.9.86");
Thread.sleep(300);
motanSegment.archive(span2);
Thread.sleep(300);
Span span3 = new Span(1, span2, "com.a.eye.skywalking.demo.services.GetUserService.findUser(String, String)", new Date().getTime());
Tags.SPAN_LAYER.asDB(span3);
Tags.SPAN_KIND.set(span3, Tags.SPAN_KIND_CLIENT);
Tags.COMPONENT.set(span3, "Mysql");
Tags.PEERS.set(span3, "10.5.34.18");
Thread.sleep(300);
motanSegment.archive(span3);
Thread.sleep(300);
Thread.sleep(300);
motanSegment.finish();
Thread.sleep(300);
String motanJsonStr = gson.toJson(motanSegment);
HttpClientTools.INSTANCE.post("http://localhost:7001/segment", motanJsonStr);
}
}
......@@ -17,12 +17,6 @@
<artifactId>skywalking-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.1.0</version>
</dependency>
</dependencies>
<build>
......
package com.a.eye.skywalking.trace;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.trace.proto.KeyValue;
import com.a.eye.skywalking.trace.proto.LogDataMessage;
import com.google.gson.annotations.Expose;
import com.google.gson.annotations.SerializedName;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
......@@ -14,7 +11,12 @@ import java.util.Map;
* Created by wusheng on 2017/2/17.
*/
public class LogData {
@Expose
@SerializedName(value="ti")
private long time;
@Expose
@SerializedName(value="fi")
private Map<String, ?> fields;
LogData(long time, Map<String, ?> fields) {
......@@ -25,9 +27,7 @@ public class LogData {
this.fields = fields;
}
LogData(LogDataMessage message){
deserialize(message);
}
public LogData(){}
public long getTime() {
return time;
......@@ -37,35 +37,4 @@ public class LogData {
return Collections.unmodifiableMap(fields);
}
public LogDataMessage serialize() {
LogDataMessage.Builder logDataBuilder = LogDataMessage.newBuilder();
logDataBuilder.setTime(time);
if(fields != null){
for (Map.Entry<String, ?> entry : fields.entrySet()) {
KeyValue.Builder logEntryBuilder = KeyValue.newBuilder();
logEntryBuilder.setKey(entry.getKey());
String value = String.valueOf(entry.getValue());
if(!StringUtil.isEmpty(value)) {
logEntryBuilder.setValue(value);
}
logDataBuilder.addFields(logEntryBuilder);
}
}
return logDataBuilder.build();
}
public void deserialize(LogDataMessage message) {
time = message.getTime();
List<KeyValue> list = message.getFieldsList();
if(list != null){
HashMap initFields = new HashMap<String, String>();
for (KeyValue field : list) {
initFields.put(field.getKey(), field.getValue());
}
this.fields = initFields;
}
}
}
package com.a.eye.skywalking.trace;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.trace.proto.KeyValue;
import com.a.eye.skywalking.trace.proto.LogDataMessage;
import com.a.eye.skywalking.trace.proto.SpanMessage;
import com.google.gson.annotations.Expose;
import com.google.gson.annotations.SerializedName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
......@@ -23,18 +20,26 @@ import java.util.Map;
* Created by wusheng on 2017/2/17.
*/
public class Span{
@Expose
@SerializedName(value="si")
private int spanId;
@Expose
@SerializedName(value="ps")
private int parentSpanId;
/**
* The start time of this Span.
*/
@Expose
@SerializedName(value="st")
private long startTime;
/**
* The end time of this Span.
*/
@Expose
@SerializedName(value="et")
private long endTime;
/**
......@@ -42,6 +47,8 @@ public class Span{
* If you want to know, how to set an operation name,
* {@see https://github.com/opentracing/specification/blob/master/specification.md#start-a-new-span}
*/
@Expose
@SerializedName(value="on")
private String operationName;
/**
......@@ -49,6 +56,8 @@ public class Span{
*
* {@see https://github.com/opentracing/specification/blob/master/specification.md#set-a-span-tag}
*/
@Expose
@SerializedName(value="ta")
private final Map<String, Object> tags;
/**
......@@ -56,6 +65,8 @@ public class Span{
*
* {@see https://github.com/opentracing/specification/blob/master/specification.md#log-structured-data}
*/
@Expose
@SerializedName(value="lo")
private final List<LogData> logs;
/**
......@@ -82,12 +93,11 @@ public class Span{
* @param startTime given start timestamp.
*/
private Span(int spanId, int parentSpanId, String operationName, long startTime) {
this();
this.spanId = spanId;
this.parentSpanId = parentSpanId;
this.startTime = startTime;
this.operationName = operationName;
this.tags = new HashMap<String, Object>();
this.logs = new ArrayList<LogData>();
}
/**
......@@ -138,15 +148,11 @@ public class Span{
}
/**
* Create a new span, by given {@link SpanMessage}, which you can get from another {@link Span} object,
* by calling {@link Span#serialize()};
*
* @param spanMessage from another {@link Span#serialize()}
* Create a new/empty span.
*/
public Span(SpanMessage spanMessage) {
public Span() {
tags = new HashMap<String, Object>();
logs = new LinkedList<LogData>();
this.deserialize(spanMessage);
}
/**
......@@ -265,51 +271,6 @@ public class Span{
return log(exceptionFields);
}
public SpanMessage serialize() {
SpanMessage.Builder builder = SpanMessage.newBuilder();
builder.setSpanId(spanId);
builder.setParentSpanId(parentSpanId);
builder.setStartTime(startTime);
builder.setEndTime(endTime);
builder.setOperationName(operationName);
for (Map.Entry<String, Object> entry : tags.entrySet()) {
KeyValue.Builder tagEntryBuilder = KeyValue.newBuilder();
tagEntryBuilder.setKey(entry.getKey());
String value = String.valueOf(entry.getValue());
if (!StringUtil.isEmpty(value)) {
tagEntryBuilder.setValue(value);
}
builder.addTags(tagEntryBuilder);
}
for (LogData log : logs) {
builder.addLogs(log.serialize());
}
return builder.build();
}
public void deserialize(SpanMessage message) {
spanId = message.getSpanId();
parentSpanId = message.getParentSpanId();
startTime = message.getStartTime();
endTime = message.getEndTime();
operationName = message.getOperationName();
List<KeyValue> tagsList = message.getTagsList();
if(tagsList != null){
for (KeyValue tag : tagsList) {
tags.put(tag.getKey(), tag.getValue());
}
}
List<LogDataMessage> logsList = message.getLogsList();
if (logsList != null) {
for (LogDataMessage logDataMessage : logsList) {
logs.add(new LogData(logDataMessage));
}
}
}
private enum ThrowableTransformer {
INSTANCE;
......
package com.a.eye.skywalking.trace.TraceId;
import com.google.gson.TypeAdapter;
import com.google.gson.annotations.JsonAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
/**
* The <code>DistributedTraceId</code> presents a distributed call chain.
*
......@@ -13,6 +19,7 @@ package com.a.eye.skywalking.trace.TraceId;
*
* @author wusheng
*/
@JsonAdapter(DistributedTraceId.Serializer.class)
public abstract class DistributedTraceId {
private String id;
......@@ -40,4 +47,22 @@ public abstract class DistributedTraceId {
public int hashCode() {
return id != null ? id.hashCode() : 0;
}
public static class Serializer extends TypeAdapter<DistributedTraceId> {
@Override
public void write(JsonWriter out, DistributedTraceId value) throws IOException {
out.beginArray();
out.value(value.get());
out.endArray();
}
@Override
public DistributedTraceId read(JsonReader in) throws IOException {
in.beginArray();
PropagatedTraceId traceId = new PropagatedTraceId(in.nextString());
in.endArray();
return traceId;
}
}
}
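A small sketch (not part of the commit) of what the @JsonAdapter above produces and consumes: a distributed trace id is written as a single-element JSON array and read back as a PropagatedTraceId. The sample id string is an assumption.
// Illustrative only.
Gson gson = new Gson();
DistributedTraceId id = new PropagatedTraceId("Trace.1490000000000.1234.100"); // assumed sample id
String json = gson.toJson(id, DistributedTraceId.class);                       // e.g. ["Trace.1490000000000.1234.100"]
DistributedTraceId back = gson.fromJson(json, DistributedTraceId.class);       // PropagatedTraceId with the same value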
......@@ -2,11 +2,8 @@ package com.a.eye.skywalking.trace;
import com.a.eye.skywalking.trace.TraceId.DistributedTraceId;
import com.a.eye.skywalking.trace.TraceId.NewDistributedTraceId;
import com.a.eye.skywalking.trace.TraceId.PropagatedTraceId;
import com.a.eye.skywalking.trace.proto.SegmentMessage;
import com.a.eye.skywalking.trace.proto.SegmentRefMessage;
import com.a.eye.skywalking.trace.proto.SpanMessage;
import com.google.protobuf.ProtocolStringList;
import com.google.gson.annotations.Expose;
import com.google.gson.annotations.SerializedName;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
......@@ -27,16 +24,22 @@ public class TraceSegment {
* The id of this trace segment.
* Every segment has its unique-global-id.
*/
@Expose
@SerializedName(value="ts")
private String traceSegmentId;
/**
* The start time of this trace segment.
*/
@Expose
@SerializedName(value="st")
private long startTime;
/**
* The end time of this trace segment.
*/
@Expose
@SerializedName(value="et")
private long endTime;
/**
......@@ -45,6 +48,8 @@ public class TraceSegment {
* but if this segment is a start span of batch process, the segment faces multi parents,
* at this moment, we use this {@link #refs} to link them.
*/
@Expose
@SerializedName(value="rs")
private List<TraceSegmentRef> refs;
/**
......@@ -52,6 +57,8 @@ public class TraceSegment {
* They all have finished.
* All active spans are hold and controlled by "skywalking-api" module.
*/
@Expose
@SerializedName(value="ss")
private List<Span> spans;
/**
......@@ -60,6 +67,8 @@ public class TraceSegment {
*
* e.g. account_app, billing_app
*/
@Expose
@SerializedName(value="ac")
private String applicationCode;
/**
......@@ -75,6 +84,8 @@ public class TraceSegment {
* <code>relatedGlobalTraces</code> targets this {@link TraceSegment}'s related call chain, a call chain contains
* multi {@link TraceSegment}s, only using {@link #refs} is not enough for analysis and ui.
*/
@Expose
@SerializedName(value="gt")
private LinkedList<DistributedTraceId> relatedGlobalTraces;
/**
......@@ -82,21 +93,19 @@ public class TraceSegment {
* This segmentId is generated by TraceSegmentRef, AKA, from tracer/agent module.
*/
public TraceSegment(String applicationCode) {
this();
this.traceSegmentId = GlobalIdGenerator.generate(ID_TYPE);
this.applicationCode = applicationCode;
this.startTime = System.currentTimeMillis();
this.spans = new LinkedList<Span>();
this.relatedGlobalTraces = new LinkedList<DistributedTraceId>();
this.relatedGlobalTraces.add(new NewDistributedTraceId());
}
/**
* Create a trace segment, by given {@link SegmentMessage}
*
* @param message from another {@link TraceSegment#serialize()}
* Create a default/empty trace segment
*/
public TraceSegment(SegmentMessage message) {
deserialize(message);
public TraceSegment() {
this.spans = new LinkedList<Span>();
this.relatedGlobalTraces = new LinkedList<DistributedTraceId>();
this.relatedGlobalTraces.add(new NewDistributedTraceId());
}
/**
......@@ -189,53 +198,4 @@ public class TraceSegment {
", relatedGlobalTraces=" + relatedGlobalTraces +
'}';
}
public SegmentMessage serialize() {
SegmentMessage.Builder segmentBuilder = SegmentMessage.newBuilder();
segmentBuilder.setTraceSegmentId(traceSegmentId);
segmentBuilder.setStartTime(startTime);
segmentBuilder.setEndTime(endTime);
segmentBuilder.setApplicationCode(applicationCode);
if (refs != null && refs.size() > 0) {
for (TraceSegmentRef ref : refs) {
segmentBuilder.addRefs(ref.serialize());
}
}
for (DistributedTraceId id : relatedGlobalTraces) {
segmentBuilder.addRelatedTraceIds(id.get());
}
for (Span span : spans) {
segmentBuilder.addSpans(span.serialize());
}
return segmentBuilder.build();
}
public void deserialize(SegmentMessage message) {
traceSegmentId = message.getTraceSegmentId();
startTime = message.getStartTime();
endTime = message.getEndTime();
applicationCode = message.getApplicationCode();
List<SegmentRefMessage> refsList = message.getRefsList();
if (refsList != null && refsList.size() > 0) {
this.refs = new LinkedList<TraceSegmentRef>();
for (SegmentRefMessage refMessage : refsList) {
TraceSegmentRef ref = new TraceSegmentRef();
ref.deserialize(refMessage);
refs.add(ref);
}
}
ProtocolStringList relatedTraceIdsList = message.getRelatedTraceIdsList();
this.relatedGlobalTraces = new LinkedList<DistributedTraceId>();
for (String id : relatedTraceIdsList) {
relatedGlobalTraces.add(new PropagatedTraceId(id));
}
List<SpanMessage> spansList = message.getSpansList();
if (spansList != null) {
this.spans = new LinkedList<Span>();
for (SpanMessage spanMessage : spansList) {
spans.add(new Span(spanMessage));
}
}
}
}
package com.a.eye.skywalking.trace;
import com.a.eye.skywalking.trace.proto.SegmentRefMessage;
import com.a.eye.skywalking.trace.tag.Tags;
import com.google.gson.annotations.Expose;
import com.google.gson.annotations.SerializedName;
/**
* {@link TraceSegmentRef} is like a pointer, which ref to another {@link TraceSegment},
......@@ -13,21 +14,29 @@ public class TraceSegmentRef{
/**
* {@link TraceSegment#traceSegmentId}
*/
@Expose
@SerializedName(value="ts")
private String traceSegmentId;
/**
* {@link Span#spanId}
*/
@Expose
@SerializedName(value="si")
private int spanId = -1;
/**
* {@link TraceSegment#applicationCode}
*/
@Expose
@SerializedName(value="ac")
private String applicationCode;
/**
* {@link Tags#PEER_HOST}
*/
@Expose
@SerializedName(value="ph")
private String peerHost;
/**
......@@ -77,23 +86,6 @@ public class TraceSegmentRef{
'}';
}
public SegmentRefMessage serialize() {
SegmentRefMessage.Builder builder = SegmentRefMessage.newBuilder();
builder.setTraceSegmentId(traceSegmentId);
builder.setSpanId(spanId);
builder.setApplicationCode(applicationCode);
if(peerHost != null) {
builder.setPeerHost(peerHost);
}
return builder.build();
}
public void deserialize(SegmentRefMessage message) {
traceSegmentId = message.getTraceSegmentId();
spanId = message.getSpanId();
applicationCode = message.getApplicationCode();
peerHost = message.getPeerHost();
}
@Override
public boolean equals(Object o) {
......
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.a.eye.skywalking.trace.proto";
message SegmentMessage {
string traceSegmentId = 1;
int64 startTime = 2;
int64 endTime = 3;
string applicationCode = 4;
repeated SegmentRefMessage refs = 5;
repeated string relatedTraceIds = 6;
repeated SpanMessage spans = 7;
}
message SegmentRefMessage {
string traceSegmentId = 1;
int32 spanId = 2;
string applicationCode = 3;
string peerHost = 4;
}
message SpanMessage {
int32 spanId = 1;
int32 parentSpanId = 2;
int64 startTime = 3;
int64 endTime = 4;
string operationName = 5;
repeated KeyValue tags = 6;
repeated LogDataMessage logs = 7;
}
message LogDataMessage {
int64 time = 1;
repeated KeyValue fields = 5;
}
message KeyValue {
string key = 1;
string value = 2;
}
package com.a.eye.skywalking.trace;
import com.a.eye.skywalking.trace.tag.Tags;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.junit.Assert;
import org.junit.Test;
......@@ -100,7 +102,15 @@ public class TraceSegmentTestCase {
span2.log(new NullPointerException());
segment.archive(span2);
TraceSegment newSegment = new TraceSegment(segment.serialize());
Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
String json = gson.toJson(segment);
System.out.println(json);
TraceSegment newSegment = gson.fromJson(json, TraceSegment.class);
Assert.assertEquals(segment.getSpans().size(), newSegment.getSpans().size());
Assert.assertEquals(segment.getRefs().get(0).getTraceSegmentId(), newSegment.getRefs().get(0).getTraceSegmentId());
......