提交 2e6d2589 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge pull request #157 from wu-sheng/feature/141

fix issues #141, use gson stream to deserialize http posed segments json array
......@@ -10,10 +10,10 @@ import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import com.a.eye.skywalking.collector.worker.globaltrace.persistence.GlobalTraceAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.segment.entity.GlobalTraceId;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.trace.TraceId.DistributedTraceId;
import com.a.eye.skywalking.trace.TraceSegment;
import java.util.List;
......@@ -29,12 +29,12 @@ public class GlobalTraceAnalysis extends MergeAnalysisMember {
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
Segment segment = segmentWithTimeSlice.getSegment();
String subSegmentId = segment.getTraceSegmentId();
List<DistributedTraceId> globalTraceIdList = segment.getRelatedGlobalTraces();
List<GlobalTraceId> globalTraceIdList = segment.getRelatedGlobalTraces();
if (CollectionTools.isNotEmpty(globalTraceIdList)) {
for (DistributedTraceId disTraceId : globalTraceIdList) {
for (GlobalTraceId disTraceId : globalTraceIdList) {
String traceId = disTraceId.get();
setMergeData(traceId, GlobalTraceIndex.SUB_SEG_IDS, subSegmentId);
}
......
......@@ -6,14 +6,10 @@ import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.segment.logic.Segment;
import com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize;
import com.a.eye.skywalking.collector.worker.segment.logic.SpanView;
import com.a.eye.skywalking.collector.worker.segment.entity.*;
import com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
......@@ -52,7 +48,7 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
logger.debug("subSegId: %s", subSegId);
String segmentSource = GetResponseFromEs.INSTANCE.get(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, subSegId).getSourceAsString();
logger.debug("segmentSource: %s", segmentSource);
Segment segment = SegmentDeserialize.INSTANCE.deserializeFromES(segmentSource);
Segment segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
String segmentId = segment.getTraceSegmentId();
List<TraceSegmentRef> refsList = segment.getRefs();
......
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.google.gson.stream.JsonReader;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
......@@ -17,22 +17,16 @@ import java.io.IOException;
public abstract class AbstractPost extends AbstractLocalAsyncWorker {
private Logger logger = LogManager.getFormatterLogger(AbstractPost.class);
public AbstractPost(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override final public void onWork(Object request) throws Exception {
if (request instanceof String) {
onReceive((String)request);
} else {
logger.error("unhandled request, request instance must String, but is %s", request.getClass().toString());
saveException(new IllegalArgumentException("request instance must String"));
}
@Override
final public void onWork(Object message) throws Exception {
onReceive(message);
}
protected abstract void onReceive(String reqJsonStr) throws Exception;
protected abstract void onReceive(Object message) throws Exception;
static class PostWithHttpServlet extends AbstractHttpServlet {
......@@ -42,22 +36,34 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
this.ownerWorkerRef = ownerWorkerRef;
}
@Override final protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
@Override
final protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
JsonObject resJson = new JsonObject();
try {
BufferedReader bufferedReader = request.getReader();
StringBuilder dataStr = new StringBuilder();
String tmpStr;
while ((tmpStr = bufferedReader.readLine()) != null) {
dataStr.append(tmpStr);
}
ownerWorkerRef.tell(dataStr.toString());
streamReader(bufferedReader);
reply(response, resJson, HttpServletResponse.SC_OK);
} catch (Exception e) {
e.printStackTrace();
resJson.addProperty("error", e.getMessage());
reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}
}
private void streamReader(BufferedReader bufferedReader) throws Exception {
try (JsonReader reader = new JsonReader(bufferedReader)) {
readSegmentArray(reader);
}
}
private void readSegmentArray(JsonReader reader) throws Exception {
reader.beginArray();
while (reader.hasNext()) {
Segment segment = new Segment();
segment.deserialize(reader);
ownerWorkerRef.tell(segment);
}
reader.endArray();
}
}
}
......@@ -4,12 +4,12 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.node.NodeCompIndex;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.segment.entity.Span;
import com.a.eye.skywalking.collector.worker.segment.entity.tag.Tags;
import com.a.eye.skywalking.collector.worker.tools.ClientSpanIsLeafTools;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.collector.worker.tools.SpanPeersTools;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.tag.Tags;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -28,7 +28,7 @@ abstract class AbstractNodeCompAnalysis extends RecordAnalysisMember {
super(role, clusterContext, selfContext);
}
void analyseSpans(TraceSegment segment) throws Exception {
void analyseSpans(Segment segment) throws Exception {
List<Span> spanList = segment.getSpans();
logger.debug("node analysis span isNotEmpty %s", CollectionTools.isNotEmpty(spanList));
......
......@@ -5,9 +5,9 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.segment.entity.TraceSegmentRef;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -26,7 +26,7 @@ abstract class AbstractNodeMappingAnalysis extends RecordAnalysisMember {
super(role, clusterContext, selfContext);
}
void analyseRefs(TraceSegment segment, long timeSlice) throws Exception {
void analyseRefs(Segment segment, long timeSlice) throws Exception {
List<TraceSegmentRef> segmentRefList = segment.getRefs();
logger.debug("node mapping analysis refs isNotEmpty %s", CollectionTools.isNotEmpty(segmentRefList));
......
......@@ -8,8 +8,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeCompAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
......@@ -17,15 +17,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
public class NodeCompAnalysis extends AbstractNodeCompAnalysis {
NodeCompAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
Segment segment = segmentWithTimeSlice.getSegment();
analyseSpans(segment);
}
}
......
......@@ -8,8 +8,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingDayAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
......@@ -17,15 +17,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
public class NodeMappingDayAnalysis extends AbstractNodeMappingAnalysis {
public NodeMappingDayAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
Segment segment = segmentWithTimeSlice.getSegment();
analyseRefs(segment, segmentWithTimeSlice.getDay());
}
}
......
......@@ -8,8 +8,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingHourAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
......@@ -17,15 +17,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
public class NodeMappingHourAnalysis extends AbstractNodeMappingAnalysis {
NodeMappingHourAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
Segment segment = segmentWithTimeSlice.getSegment();
analyseRefs(segment, segmentWithTimeSlice.getHour());
}
}
......
......@@ -8,8 +8,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingMinuteAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
......@@ -17,15 +17,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
public class NodeMappingMinuteAnalysis extends AbstractNodeMappingAnalysis {
NodeMappingMinuteAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
Segment segment = segmentWithTimeSlice.getSegment();
analyseRefs(segment, segmentWithTimeSlice.getMinute());
}
}
......
......@@ -5,12 +5,12 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.segment.entity.Span;
import com.a.eye.skywalking.collector.worker.segment.entity.tag.Tags;
import com.a.eye.skywalking.collector.worker.tools.ClientSpanIsLeafTools;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.collector.worker.tools.SpanPeersTools;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.tag.Tags;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -25,12 +25,12 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(AbstractNodeRefAnalysis.class);
AbstractNodeRefAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
final void analyseNodeRef(TraceSegment segment, long timeSlice, long minute, long hour, long day,
int second) throws Exception {
final void analyseNodeRef(Segment segment, long timeSlice, long minute, long hour, long day,
int second) throws Exception {
List<Span> spanList = segment.getSpans();
if (CollectionTools.isNotEmpty(spanList)) {
for (Span span : spanList) {
......@@ -69,7 +69,7 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
}
private void buildNodeRefResRecordData(String nodeRefId, Span span, long minute, long hour, long day,
int second) throws Exception {
int second) throws Exception {
AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord = new AbstractNodeRefResSumAnalysis.NodeRefResRecord(minute, hour, day, second);
refResRecord.setStartTime(span.getStartTime());
refResRecord.setEndTime(span.getEndTime());
......@@ -79,5 +79,5 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
}
protected abstract void sendToResSumAnalysis(
AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord) throws Exception;
AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord) throws Exception;
}
......@@ -9,8 +9,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefDayAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
......@@ -18,7 +18,7 @@ import com.a.eye.skywalking.trace.TraceSegment;
public class NodeRefDayAnalysis extends AbstractNodeRefAnalysis {
protected NodeRefDayAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -31,8 +31,8 @@ public class NodeRefDayAnalysis extends AbstractNodeRefAnalysis {
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
Segment segment = segmentWithTimeSlice.getSegment();
long minute = segmentWithTimeSlice.getMinute();
long hour = segmentWithTimeSlice.getHour();
......
......@@ -9,8 +9,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefHourAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
......@@ -18,7 +18,7 @@ import com.a.eye.skywalking.trace.TraceSegment;
public class NodeRefHourAnalysis extends AbstractNodeRefAnalysis {
protected NodeRefHourAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -31,8 +31,8 @@ public class NodeRefHourAnalysis extends AbstractNodeRefAnalysis {
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
Segment segment = segmentWithTimeSlice.getSegment();
long minute = segmentWithTimeSlice.getMinute();
long hour = segmentWithTimeSlice.getHour();
......
......@@ -9,8 +9,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefMinuteAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
......@@ -18,7 +18,7 @@ import com.a.eye.skywalking.trace.TraceSegment;
public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
protected NodeRefMinuteAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -31,8 +31,8 @@ public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
Segment segment = segmentWithTimeSlice.getSegment();
long minute = segmentWithTimeSlice.getMinute();
long hour = segmentWithTimeSlice.getHour();
long day = segmentWithTimeSlice.getDay();
......
......@@ -11,35 +11,30 @@ import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.globaltrace.analysis.GlobalTraceAnalysis;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractPost;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractPostProvider;
import com.a.eye.skywalking.collector.worker.node.analysis.*;
import com.a.eye.skywalking.collector.worker.node.analysis.NodeCompAnalysis;
import com.a.eye.skywalking.collector.worker.node.analysis.NodeMappingDayAnalysis;
import com.a.eye.skywalking.collector.worker.node.analysis.NodeMappingHourAnalysis;
import com.a.eye.skywalking.collector.worker.node.analysis.NodeMappingMinuteAnalysis;
import com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefDayAnalysis;
import com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefHourAnalysis;
import com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefMinuteAnalysis;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.segment.persistence.SegmentCostSave;
import com.a.eye.skywalking.collector.worker.segment.persistence.SegmentExceptionSave;
import com.a.eye.skywalking.collector.worker.segment.persistence.SegmentSave;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.a.eye.skywalking.trace.SegmentsMessage;
import com.a.eye.skywalking.trace.TraceSegment;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
/**
* @author pengys5
*/
public class SegmentPost extends AbstractPost {
private static final Logger logger = LogManager.getFormatterLogger(SegmentPost.class);
private Gson gson;
public SegmentPost(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
gson = new Gson();
}
@Override
......@@ -62,27 +57,25 @@ public class SegmentPost extends AbstractPost {
}
@Override
protected void onReceive(String reqJsonStr) throws Exception {
SegmentsMessage segmentsMessage = gson.fromJson(reqJsonStr, SegmentsMessage.class);
List<TraceSegment> segmentList = segmentsMessage.getSegments();
for (TraceSegment newSegment : segmentList) {
protected void onReceive(Object message) throws Exception {
if (message instanceof Segment) {
Segment segment = (Segment) message;
try {
validateData(newSegment);
validateData(segment);
} catch (IllegalArgumentException e) {
continue;
return;
}
logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", newSegment.getTraceSegmentId());
logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", segment.getTraceSegmentId());
long minuteSlice = DateTools.getMinuteSlice(newSegment.getStartTime());
long hourSlice = DateTools.getHourSlice(newSegment.getStartTime());
long daySlice = DateTools.getDaySlice(newSegment.getStartTime());
int second = DateTools.getSecond(newSegment.getStartTime());
long minuteSlice = DateTools.getMinuteSlice(segment.getStartTime());
long hourSlice = DateTools.getHourSlice(segment.getStartTime());
long daySlice = DateTools.getDaySlice(segment.getStartTime());
int second = DateTools.getSecond(segment.getStartTime());
logger.debug("minuteSlice: %s, hourSlice: %s, daySlice: %s, second:%s", minuteSlice, hourSlice, daySlice, second);
SegmentWithTimeSlice segmentWithTimeSlice = new SegmentWithTimeSlice(newSegment, minuteSlice, hourSlice, daySlice, second);
String newSegmentJsonStr = gson.toJson(newSegment);
tellSegmentSave(newSegmentJsonStr, daySlice, hourSlice, minuteSlice);
SegmentWithTimeSlice segmentWithTimeSlice = new SegmentWithTimeSlice(segment, minuteSlice, hourSlice, daySlice, second);
getSelfContext().lookup(SegmentSave.Role.INSTANCE).tell(segment);
getSelfContext().lookup(SegmentCostSave.Role.INSTANCE).tell(segmentWithTimeSlice);
getSelfContext().lookup(GlobalTraceAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
......@@ -95,14 +88,6 @@ public class SegmentPost extends AbstractPost {
}
}
private void tellSegmentSave(String newSegmentJsonStr, long day, long hour, long minute) throws Exception {
JsonObject newSegmentJson = gson.fromJson(newSegmentJsonStr, JsonObject.class);
newSegmentJson.addProperty("minute", minute);
newSegmentJson.addProperty("hour", hour);
newSegmentJson.addProperty("day", day);
getSelfContext().lookup(SegmentSave.Role.INSTANCE).tell(newSegmentJson);
}
private void tellNodeRef(SegmentWithTimeSlice segmentWithTimeSlice) throws Exception {
getSelfContext().lookup(NodeRefMinuteAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
getSelfContext().lookup(NodeRefHourAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
......@@ -115,11 +100,11 @@ public class SegmentPost extends AbstractPost {
getSelfContext().lookup(NodeMappingDayAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
}
private void validateData(TraceSegment newSegment) {
if (StringUtil.isEmpty(newSegment.getTraceSegmentId())) {
private void validateData(Segment segment) {
if (StringUtil.isEmpty(segment.getTraceSegmentId())) {
throw new IllegalArgumentException("traceSegmentId required");
}
if (0 == newSegment.getStartTime()) {
if (0 == segment.getStartTime()) {
throw new IllegalArgumentException("startTime required");
}
}
......@@ -163,15 +148,15 @@ public class SegmentPost extends AbstractPost {
}
public static class SegmentWithTimeSlice extends AbstractTimeSlice {
private final TraceSegment traceSegment;
private final Segment segment;
public SegmentWithTimeSlice(TraceSegment traceSegment, long minute, long hour, long day, int second) {
public SegmentWithTimeSlice(Segment segment, long minute, long hour, long day, int second) {
super(minute, hour, day, second);
this.traceSegment = traceSegment;
this.segment = segment;
}
public TraceSegment getTraceSegment() {
return traceSegment;
public Segment getSegment() {
return segment;
}
}
}
package com.a.eye.skywalking.collector.worker.segment.entity;
/**
* @author pengys5
*/
public abstract class DeserializeObject {
private String jsonStr;
public String getJsonStr() {
return jsonStr;
}
public void setJsonStr(String jsonStr) {
this.jsonStr = jsonStr;
}
}
package com.a.eye.skywalking.collector.worker.segment.entity;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
/**
* @author pengys5
*/
public class GlobalTraceId extends DeserializeObject {
private String globalTraceId;
public String get() {
return globalTraceId;
}
public GlobalTraceId deserialize(JsonReader reader) throws IOException {
this.globalTraceId = reader.nextString();
this.setJsonStr("\"" + globalTraceId + "\"");
return this;
}
}
package com.a.eye.skywalking.collector.worker.segment.entity;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
public enum JsonBuilder {
INSTANCE;
public void append(StringBuilder builder, String name, String value, boolean first) {
if (!first) {
builder.append(",");
}
builder.append("\"").append(name).append("\":\"").append(value).append("\"");
}
public void append(StringBuilder builder, String name, Number value, boolean first) {
if (!first) {
builder.append(",");
}
builder.append("\"").append(name).append("\":").append(value);
}
public void append(StringBuilder builder, String name, List<?> value, boolean first) {
if (!first) {
builder.append(",");
}
builder.append("\"").append(name).append("\":");
builder.append("[");
boolean isFirst = true;
for (int i = 0; i < value.size(); i++) {
DeserializeObject deserializeObject = (DeserializeObject) value.get(i);
if (!isFirst) {
builder.append(",");
}
builder.append(deserializeObject.getJsonStr());
isFirst = false;
}
builder.append("]");
}
public void append(StringBuilder builder, String name, Map<String, ?> tagsWithStr, boolean first) {
if (!first) {
builder.append(",");
}
builder.append("\"").append(name).append("\":");
builder.append("{");
boolean isFirst = true;
for (Map.Entry<String, ?> entry : tagsWithStr.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (!isFirst) {
builder.append(",");
}
if (value instanceof String) {
builder.append("\"").append(key).append("\":\"").append(value).append("\"");
} else {
builder.append("\"").append(key).append("\":").append(value);
}
isFirst = false;
}
builder.append("}");
}
}
package com.a.eye.skywalking.collector.worker.segment.entity;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class LogData extends DeserializeObject {
private long time;
private Map<String, String> fields;
public long getTime() {
return time;
}
public Map<String, String> getFields() {
return fields;
}
public LogData deserialize(JsonReader reader) throws IOException {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("{");
boolean first = true;
reader.beginObject();
while (reader.hasNext()) {
String name = reader.nextName();
if (name.equals("tm")) {
Long tm = reader.nextLong();
this.time = tm;
JsonBuilder.INSTANCE.append(stringBuilder, "tm", tm, first);
} else if (name.equals("fi")) {
fields = new HashMap<>();
reader.beginObject();
while (reader.hasNext()) {
String key = reader.nextName();
String value = reader.nextString();
fields.put(key, value);
}
reader.endObject();
JsonBuilder.INSTANCE.append(stringBuilder, "fi", fields, first);
} else {
reader.skipValue();
}
first = false;
}
reader.endObject();
stringBuilder.append("}");
this.setJsonStr(stringBuilder.toString());
return this;
}
}
package com.a.eye.skywalking.collector.worker.segment.entity;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @author pengys5
*/
public class Segment extends DeserializeObject {
private String traceSegmentId;
private long startTime;
private long endTime;
private List<TraceSegmentRef> refs;
private List<Span> spans;
private String applicationCode;
private List<GlobalTraceId> relatedGlobalTraces;
public String getTraceSegmentId() {
return traceSegmentId;
}
public long getStartTime() {
return startTime;
}
public long getEndTime() {
return endTime;
}
public String getApplicationCode() {
return applicationCode;
}
public List<TraceSegmentRef> getRefs() {
return refs;
}
public List<Span> getSpans() {
return spans;
}
public List<GlobalTraceId> getRelatedGlobalTraces() {
return relatedGlobalTraces;
}
public Segment deserialize(JsonReader reader) throws IOException {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("{");
boolean first = true;
reader.beginObject();
while (reader.hasNext()) {
String name = reader.nextName();
if (name.equals("ts")) {
String ts = reader.nextString();
this.traceSegmentId = ts;
JsonBuilder.INSTANCE.append(stringBuilder, "ts", ts, first);
} else if (name.equals("ac")) {
String ac = reader.nextString();
this.applicationCode = ac;
JsonBuilder.INSTANCE.append(stringBuilder, "ac", ac, first);
} else if (name.equals("st")) {
long st = reader.nextLong();
this.startTime = st;
JsonBuilder.INSTANCE.append(stringBuilder, "st", st, first);
} else if (name.equals("et")) {
long et = reader.nextLong();
this.endTime = et;
JsonBuilder.INSTANCE.append(stringBuilder, "et", et, first);
} else if (name.equals("rs")) {
refs = new ArrayList<>();
reader.beginArray();
while (reader.hasNext()) {
TraceSegmentRef ref = new TraceSegmentRef();
ref.deserialize(reader);
refs.add(ref);
}
reader.endArray();
JsonBuilder.INSTANCE.append(stringBuilder, "rs", refs, first);
} else if (name.equals("ss")) {
spans = new ArrayList<>();
reader.beginArray();
while (reader.hasNext()) {
Span span = new Span();
span.deserialize(reader);
spans.add(span);
}
reader.endArray();
JsonBuilder.INSTANCE.append(stringBuilder, "ss", spans, first);
} else if (name.equals("gt")) {
relatedGlobalTraces = new ArrayList<>();
reader.beginArray();
while (reader.hasNext()) {
GlobalTraceId globalTraceId = new GlobalTraceId();
globalTraceId.deserialize(reader);
relatedGlobalTraces.add(globalTraceId);
}
JsonBuilder.INSTANCE.append(stringBuilder, "gt", relatedGlobalTraces, first);
reader.endArray();
} else {
reader.skipValue();
}
first = false;
}
reader.endObject();
stringBuilder.append("}");
this.setJsonStr(stringBuilder.toString());
return this;
}
}
package com.a.eye.skywalking.collector.worker.segment.entity;
import com.google.gson.stream.JsonReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
/**
* @author pengys5
*/
public enum SegmentDeserialize {
INSTANCE;
public Segment deserializeSingle(String singleSegmentJsonStr) throws IOException {
JsonReader reader = new JsonReader(new StringReader(singleSegmentJsonStr));
Segment segment = new Segment();
segment.deserialize(reader);
return segment;
}
public List<Segment> deserializeMultiple(String segmentJsonFile) throws Exception {
List<Segment> segmentList = new ArrayList<>();
streamReader(segmentList, new FileReader(segmentJsonFile));
return segmentList;
}
private void streamReader(List<Segment> segmentList, FileReader fileReader) throws Exception {
try (JsonReader reader = new JsonReader(fileReader)) {
readSegmentArray(segmentList, reader);
}
}
private void readSegmentArray(List<Segment> segmentList, JsonReader reader) throws Exception {
reader.beginArray();
while (reader.hasNext()) {
Segment segment = new Segment();
segment.deserialize(reader);
segmentList.add(segment);
}
reader.endArray();
}
}
package com.a.eye.skywalking.collector.worker.segment.entity;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
public class Span extends DeserializeObject {
private int spanId;
private int parentSpanId;
private long startTime;
private long endTime;
private String operationName;
private Map<String, String> tagsWithStr;
private Map<String, Boolean> tagsWithBool;
private Map<String, Integer> tagsWithInt;
private List<LogData> logs;
public int getSpanId() {
return spanId;
}
public int getParentSpanId() {
return parentSpanId;
}
public long getStartTime() {
return startTime;
}
public long getEndTime() {
return endTime;
}
public String getOperationName() {
return operationName;
}
public String getStrTag(String key) {
return tagsWithStr.get(key);
}
public Boolean getBoolTag(String key) {
return tagsWithBool.get(key);
}
public Integer getIntTag(String key) {
return tagsWithInt.get(key);
}
public List<LogData> getLogs() {
return logs;
}
public Span deserialize(JsonReader reader) throws IOException {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("{");
boolean first = true;
reader.beginObject();
while (reader.hasNext()) {
String name = reader.nextName();
if (name.equals("si")) {
Integer si = reader.nextInt();
this.spanId = si;
JsonBuilder.INSTANCE.append(stringBuilder, "si", si, first);
} else if (name.equals("ps")) {
Integer ps = reader.nextInt();
this.parentSpanId = ps;
JsonBuilder.INSTANCE.append(stringBuilder, "ps", ps, first);
} else if (name.equals("st")) {
Long st = reader.nextLong();
this.startTime = st;
JsonBuilder.INSTANCE.append(stringBuilder, "st", st, first);
} else if (name.equals("et")) {
Long et = reader.nextLong();
this.endTime = et;
JsonBuilder.INSTANCE.append(stringBuilder, "et", et, first);
} else if (name.equals("on")) {
String on = reader.nextString();
this.operationName = on;
JsonBuilder.INSTANCE.append(stringBuilder, "on", on, first);
} else if (name.equals("ts")) {
tagsWithStr = new HashMap<>();
reader.beginObject();
while (reader.hasNext()) {
String key = reader.nextName();
String value = reader.nextString();
tagsWithStr.put(key, value);
}
reader.endObject();
JsonBuilder.INSTANCE.append(stringBuilder, "ts", tagsWithStr, first);
} else if (name.equals("tb")) {
tagsWithBool = new HashMap<>();
reader.beginObject();
while (reader.hasNext()) {
String key = reader.nextName();
boolean value = reader.nextBoolean();
tagsWithBool.put(key, value);
}
reader.endObject();
JsonBuilder.INSTANCE.append(stringBuilder, "tb", tagsWithBool, first);
} else if (name.equals("ti")) {
tagsWithInt = new HashMap<>();
reader.beginObject();
while (reader.hasNext()) {
String key = reader.nextName();
Integer value = reader.nextInt();
tagsWithInt.put(key, value);
}
reader.endObject();
JsonBuilder.INSTANCE.append(stringBuilder, "ti", tagsWithInt, first);
} else if (name.equals("lo")) {
logs = new ArrayList<>();
reader.beginArray();
while (reader.hasNext()) {
LogData logData = new LogData();
logData.deserialize(reader);
logs.add(logData);
}
reader.endArray();
JsonBuilder.INSTANCE.append(stringBuilder, "lo", logs, first);
} else {
reader.skipValue();
}
first = false;
}
reader.endObject();
stringBuilder.append("}");
this.setJsonStr(stringBuilder.toString());
return this;
}
}
package com.a.eye.skywalking.collector.worker.segment.entity;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
/**
* @author pengys5
*/
public class TraceSegmentRef extends DeserializeObject {
private String traceSegmentId;
private int spanId = -1;
private String applicationCode;
private String peerHost;
public String getTraceSegmentId() {
return traceSegmentId;
}
public int getSpanId() {
return spanId;
}
public String getApplicationCode() {
return applicationCode;
}
public String getPeerHost() {
return peerHost;
}
public TraceSegmentRef deserialize(JsonReader reader) throws IOException {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("{");
boolean first = true;
reader.beginObject();
while (reader.hasNext()) {
String name = reader.nextName();
if (name.equals("ts")) {
String ts = reader.nextString();
this.traceSegmentId = ts;
JsonBuilder.INSTANCE.append(stringBuilder, "ts", ts, first);
} else if (name.equals("si")) {
Integer si = reader.nextInt();
this.spanId = si;
JsonBuilder.INSTANCE.append(stringBuilder, "si", si, first);
} else if (name.equals("ac")) {
String ac = reader.nextString();
this.applicationCode = ac;
JsonBuilder.INSTANCE.append(stringBuilder, "ac", ac, first);
} else if (name.equals("ph")) {
String ph = reader.nextString();
this.peerHost = ph;
JsonBuilder.INSTANCE.append(stringBuilder, "ph", ph, first);
} else {
reader.skipValue();
}
first = false;
}
reader.endObject();
stringBuilder.append("}");
this.setJsonStr(stringBuilder.toString());
return this;
}
}
package com.a.eye.skywalking.collector.worker.segment.entity.tag;
import com.a.eye.skywalking.collector.worker.segment.entity.Span;
public abstract class AbstractTag<T> {
/**
* The key of this Tag.
*/
protected final String key;
public AbstractTag(String tagKey) {
this.key = tagKey;
}
public abstract T get(Span span);
}
package com.a.eye.skywalking.collector.worker.segment.entity.tag;
import com.a.eye.skywalking.collector.worker.segment.entity.Span;
/**
* Do the same thing as {@link StringTag}, just with a {@link Boolean} value.
* <p>
* Created by wusheng on 2017/2/17.
*/
public class BooleanTag extends AbstractTag<Boolean> {
private boolean defaultValue;
public BooleanTag(String key, boolean defaultValue) {
super(key);
this.defaultValue = defaultValue;
}
/**
* Get a tag value, type of {@link Boolean}. After akka-message/serialize, all tags values are type of {@link
* String}, convert to {@link Boolean}, if necessary.
*
* @param span
* @return tag value
*/
@Override
public Boolean get(Span span) {
Boolean tagValue = span.getBoolTag(super.key);
if (tagValue == null) {
return defaultValue;
} else {
return tagValue;
}
}
}
package com.a.eye.skywalking.collector.worker.segment.entity.tag;
import com.a.eye.skywalking.collector.worker.segment.entity.Span;
/**
* Do the same thing as {@link StringTag}, just with a {@link Integer} value.
*
* Created by wusheng on 2017/2/18.
*/
public class IntTag extends AbstractTag<Integer> {
public IntTag(String key) {
super(key);
}
/**
* Get a tag value, type of {@link Integer}.
* After akka-message/serialize, all tags values are type of {@link String}, convert to {@link Integer}, if necessary.
*
* @param span
* @return tag value
*/
@Override
public Integer get(Span span) {
Integer tagValue = span.getIntTag(super.key);
if (tagValue == null) {
return null;
} else {
return tagValue;
}
}
}
package com.a.eye.skywalking.collector.worker.segment.entity.tag;
import com.a.eye.skywalking.collector.worker.segment.entity.Span;
/**
* Do the same thing as {@link StringTag}, just with a {@link Short} value.
*
* Created by wusheng on 2017/2/17.
*/
public class ShortTag extends AbstractTag<Short> {
public ShortTag(String key) {
super(key);
}
/**
* Get a tag value, type of {@link Short}.
* After akka-message/serialize, all tags values are type of {@link String}, convert to {@link Short}, if necessary.
*
* @param span
* @return tag value
*/
@Override public Short get(Span span) {
Integer tagValue = span.getIntTag(super.key);
if (tagValue == null) {
return null;
} else {
return Short.valueOf(tagValue.toString());
}
}
}
package com.a.eye.skywalking.collector.worker.segment.entity.tag;
import com.a.eye.skywalking.collector.worker.segment.entity.Span;
/**
* A subclass of {@link AbstractTag},
* represent a tag with a {@link String} value.
*
* Created by wusheng on 2017/2/17.
*/
public class StringTag extends AbstractTag<String> {
public StringTag(String tagKey) {
super(tagKey);
}
@Override public String get(Span span) {
return span.getStrTag(super.key);
}
}
package com.a.eye.skywalking.collector.worker.segment.entity.tag;
import com.a.eye.skywalking.collector.worker.segment.entity.Span;
/**
* The span tags are supported by sky-walking engine.
* As default, all tags will be stored, but these ones have particular meanings.
* <p>
* Created by wusheng on 2017/2/17.
*/
public final class Tags {
private Tags() {
}
/**
* URL records the url of the incoming request.
*/
public static final StringTag URL = new StringTag("url");
/**
* STATUS_CODE records the http status code of the response.
*/
public static final IntTag STATUS_CODE = new IntTag("status_code");
/**
* SPAN_KIND hints at the relationship between spans, e.g. client/server.
*/
public static final StringTag SPAN_KIND = new StringTag("span.kind");
/**
* A constant for setting the span kind to indicate that it represents a server span.
*/
public static final String SPAN_KIND_SERVER = "server";
/**
* A constant for setting the span kind to indicate that it represents a client span.
*/
public static final String SPAN_KIND_CLIENT = "client";
/**
* SPAN_LAYER represents the kind of span.
* <p>
* e.g.
* db=database;
* rpc=Remote Procedure Call Framework, like motan, thift;
* nosql=something like redis/memcache
*/
public static final class SPAN_LAYER {
private static StringTag SPAN_LAYER_TAG = new StringTag("span.layer");
public static String get(Span span) {
return SPAN_LAYER_TAG.get(span);
}
}
/**
* COMPONENT is a low-cardinality identifier of the module, library, or package that is instrumented.
* Like dubbo/dubbox/motan
*/
public static final StringTag COMPONENT = new StringTag("component");
/**
* ERROR indicates whether a Span ended in an error state.
*/
public static final BooleanTag ERROR = new BooleanTag("error", false);
/**
* PEER_HOST records host address (ip:port, or ip1:port1,ip2:port2) of the peer, maybe IPV4, IPV6 or hostname.
*/
public static final StringTag PEER_HOST = new StringTag("peer.host");
/**
* PEER_PORT records remote port of the peer
*/
public static final IntTag PEER_PORT = new IntTag("peer.port");
/**
* PEERS records multiple host address and port of remote
*/
public static final StringTag PEERS = new StringTag("peers");
/**
* DB_TYPE records database type, such as sql, redis, cassandra and so on.
*/
public static final StringTag DB_TYPE = new StringTag("db.type");
/**
* DB_INSTANCE records database instance name.
*/
public static final StringTag DB_INSTANCE = new StringTag("db.instance");
/**
* DB_STATEMENT records the sql statement of the database access.
*/
public static final StringTag DB_STATEMENT = new StringTag("db.statement");
}
package com.a.eye.skywalking.collector.worker.segment.logic;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceId.DistributedTraceId;
import com.a.eye.skywalking.trace.TraceId.DistributedTraceIds;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.google.gson.annotations.Expose;
import com.google.gson.annotations.SerializedName;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
/**
* @author pengys5
*/
public class Segment {
/**
* The id of this trace segment.
* Every segment has its unique-global-id.
*/
@Expose
@SerializedName(value = "ts")
private String traceSegmentId;
/**
* The start time of this trace segment.
*/
@Expose
@SerializedName(value = "st")
private long startTime;
/**
* The end time of this trace segment.
*/
@Expose
@SerializedName(value = "et")
private long endTime;
/**
* The refs of parent trace segments, except the primary one.
* For most RPC call, {@link #refs} contains only one element,
* but if this segment is a start span of batch process, the segment faces multi parents,
* at this moment, we use this {@link #refs} to link them.
*/
@Expose
@SerializedName(value = "rs")
private List<TraceSegmentRef> refs;
/**
* The spans belong to this trace segment.
* They all have finished.
* All active spans are hold and controlled by "skywalking-api" module.
*/
@Expose
@SerializedName(value = "ss")
private List<Span> spans;
/**
* The <code>applicationCode</code> represents a NAME of current application/JVM and indicates which is business
* role in the cluster.
* <p>
* e.g. account_app, billing_app
*/
@Expose
@SerializedName(value = "ac")
private String applicationCode;
/**
* The <code>relatedGlobalTraces</code> represent a set of all related trace. Most time it contains only one
* element, because only one parent {@link TraceSegment} exists, but, in batch scenario, the num becomes greater
* than 1, also meaning multi-parents {@link TraceSegment}.
* <p>
* The difference between <code>relatedGlobalTraces</code> and {@link #refs} is:
* {@link #refs} targets this {@link TraceSegment}'s direct parent,
* <p>
* and
* <p>
* <code>relatedGlobalTraces</code> targets this {@link TraceSegment}'s related call chain, a call chain contains
* multi {@link TraceSegment}s, only using {@link #refs} is not enough for analysis and ui.
*/
@Expose
@SerializedName(value = "gt")
private DistributedTraceIds relatedGlobalTraces;
/**
* Establish the link between this segment and its parents.
*
* @param refSegment {@link TraceSegmentRef}
*/
public void ref(TraceSegmentRef refSegment) {
if (refs == null) {
refs = new LinkedList<TraceSegmentRef>();
}
if (!refs.contains(refSegment)) {
refs.add(refSegment);
}
}
public void relatedGlobalTraces(List<DistributedTraceId> distributedTraceIds) {
if (distributedTraceIds == null || distributedTraceIds.size() == 0) {
return;
}
for (DistributedTraceId distributedTraceId : distributedTraceIds) {
relatedGlobalTraces.append(distributedTraceId);
}
}
public String getTraceSegmentId() {
return traceSegmentId;
}
public long getStartTime() {
return startTime;
}
public long getEndTime() {
return endTime;
}
public List<TraceSegmentRef> getRefs() {
if (refs == null) {
return null;
}
return Collections.unmodifiableList(refs);
}
public List<DistributedTraceId> getRelatedGlobalTraces() {
return relatedGlobalTraces.getRelatedGlobalTraces();
}
public List<Span> getSpans() {
return Collections.unmodifiableList(spans);
}
public String getApplicationCode() {
return applicationCode;
}
}
package com.a.eye.skywalking.collector.worker.segment.logic;
import com.google.gson.Gson;
/**
* @author pengys5
*/
public enum SegmentDeserialize {
INSTANCE;
private Gson gson = new Gson();
public Segment deserializeFromES(String segmentSource) {
Segment segment = gson.fromJson(segmentSource, Segment.class);
return segment;
}
}
......@@ -9,10 +9,10 @@ import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.segment.SegmentCostIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.segment.entity.Span;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -43,7 +43,7 @@ public class SegmentCostSave extends RecordPersistenceMember {
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
Segment segment = segmentWithTimeSlice.getSegment();
if (CollectionTools.isNotEmpty(segment.getSpans())) {
for (Span span : segment.getSpans()) {
......
......@@ -9,13 +9,13 @@ import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.segment.SegmentExceptionIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.segment.entity.LogData;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.segment.entity.Span;
import com.a.eye.skywalking.collector.worker.segment.entity.tag.Tags;
import com.a.eye.skywalking.collector.worker.storage.AbstractIndex;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.trace.LogData;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.tag.Tags;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
......@@ -49,7 +49,7 @@ public class SegmentExceptionSave extends RecordPersistenceMember {
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
Segment segment = segmentWithTimeSlice.getSegment();
if (CollectionTools.isNotEmpty(segment.getSpans())) {
for (Span span : segment.getSpans()) {
......
......@@ -6,13 +6,20 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.config.CacheSizeConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.storage.AbstractIndex;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.google.gson.JsonObject;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* @author pengys5
......@@ -21,6 +28,8 @@ public class SegmentSave extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(SegmentSave.class);
private Map<String, String> persistenceData = new LinkedHashMap<>();
@Override
public String esIndex() {
return SegmentIndex.INDEX;
......@@ -32,22 +41,45 @@ public class SegmentSave extends RecordPersistenceMember {
}
public SegmentSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof JsonObject) {
JsonObject segmentJson = (JsonObject)message;
RecordData recordData = new RecordData(segmentJson.get("ts").getAsString());
recordData.setRecord(segmentJson);
super.analyse(recordData);
if (message instanceof Segment) {
Segment segment = (Segment) message;
persistenceData.put(segment.getTraceSegmentId(), segment.getJsonStr());
if (persistenceData.size() >= CacheSizeConfig.Cache.Persistence.SIZE) {
persistence();
}
} else {
logger.error("unhandled message, message instance must JsonObject, but is %s", message.getClass().toString());
}
}
@Override
protected void persistence() {
boolean success = saveToEs();
if (success) {
persistenceData.clear();
}
}
private boolean saveToEs() {
Client client = EsClient.INSTANCE.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData SIZE: %s", persistenceData.size());
persistenceData.forEach((key, value) -> bulkRequest.add(client.prepareIndex(esIndex(), esType(), key).setSource(value)));
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
logger.error(bulkResponse.buildFailureMessage());
}
return !bulkResponse.hasFailures();
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<SegmentSave> {
public static Factory INSTANCE = new Factory();
......
......@@ -7,12 +7,12 @@ import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentCostIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentExceptionIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.segment.logic.Segment;
import com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize;
import com.a.eye.skywalking.collector.worker.segment.entity.GlobalTraceId;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.segment.entity.SegmentDeserialize;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.trace.TraceId.DistributedTraceId;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
......@@ -81,12 +81,12 @@ public class SegmentTopSearchWithGlobalTraceId extends AbstractLocalSyncWorker {
topSegmentJson.addProperty(SegmentCostIndex.COST, (Number)getResponse.getSource().get(SegmentCostIndex.COST));
String segmentSource = client.prepareGet(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, segId).get().getSourceAsString();
Segment segment = SegmentDeserialize.INSTANCE.deserializeFromES(segmentSource);
List<DistributedTraceId> distributedTraceIdList = segment.getRelatedGlobalTraces();
Segment segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
List<GlobalTraceId> distributedTraceIdList = segment.getRelatedGlobalTraces();
JsonArray distributedTraceIdArray = new JsonArray();
if (CollectionTools.isNotEmpty(distributedTraceIdList)) {
for (DistributedTraceId distributedTraceId : distributedTraceIdList) {
for (GlobalTraceId distributedTraceId : distributedTraceIdList) {
distributedTraceIdArray.add(distributedTraceId.get());
}
}
......
......@@ -6,11 +6,11 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.segment.SegmentCostIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentExceptionIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.segment.logic.Segment;
import com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize;
import com.a.eye.skywalking.collector.worker.segment.entity.GlobalTraceId;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.segment.entity.SegmentDeserialize;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.trace.TraceId.DistributedTraceId;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.elasticsearch.action.search.SearchRequestBuilder;
......@@ -30,7 +30,7 @@ import java.util.List;
public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
private SegmentTopSearchWithTimeSlice(Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -42,7 +42,7 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
@Override
protected void onWork(Object request, Object response) throws Exception {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity)request;
RequestEntity search = (RequestEntity) request;
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(SegmentCostIndex.INDEX);
searchRequestBuilder.setTypes(SegmentCostIndex.TYPE_RECORD);
......@@ -77,23 +77,23 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
JsonObject topSegmentJson = new JsonObject();
topSegmentJson.addProperty("num", num);
String segId = (String)searchHit.getSource().get(SegmentCostIndex.SEG_ID);
String segId = (String) searchHit.getSource().get(SegmentCostIndex.SEG_ID);
topSegmentJson.addProperty(SegmentCostIndex.SEG_ID, segId);
topSegmentJson.addProperty(SegmentCostIndex.START_TIME, (Number)searchHit.getSource().get(SegmentCostIndex.START_TIME));
topSegmentJson.addProperty(SegmentCostIndex.START_TIME, (Number) searchHit.getSource().get(SegmentCostIndex.START_TIME));
if (searchHit.getSource().containsKey(SegmentCostIndex.END_TIME)) {
topSegmentJson.addProperty(SegmentCostIndex.END_TIME, (Number)searchHit.getSource().get(SegmentCostIndex.END_TIME));
topSegmentJson.addProperty(SegmentCostIndex.END_TIME, (Number) searchHit.getSource().get(SegmentCostIndex.END_TIME));
}
topSegmentJson.addProperty(SegmentCostIndex.OPERATION_NAME, (String)searchHit.getSource().get(SegmentCostIndex.OPERATION_NAME));
topSegmentJson.addProperty(SegmentCostIndex.COST, (Number)searchHit.getSource().get(SegmentCostIndex.COST));
topSegmentJson.addProperty(SegmentCostIndex.OPERATION_NAME, (String) searchHit.getSource().get(SegmentCostIndex.OPERATION_NAME));
topSegmentJson.addProperty(SegmentCostIndex.COST, (Number) searchHit.getSource().get(SegmentCostIndex.COST));
String segmentSource = EsClient.INSTANCE.getClient().prepareGet(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, segId).get().getSourceAsString();
Segment segment = SegmentDeserialize.INSTANCE.deserializeFromES(segmentSource);
List<DistributedTraceId> distributedTraceIdList = segment.getRelatedGlobalTraces();
Segment segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
List<GlobalTraceId> distributedTraceIdList = segment.getRelatedGlobalTraces();
JsonArray distributedTraceIdArray = new JsonArray();
if (CollectionTools.isNotEmpty(distributedTraceIdList)) {
for (DistributedTraceId distributedTraceId : distributedTraceIdList) {
for (GlobalTraceId distributedTraceId : distributedTraceIdList) {
distributedTraceIdArray.add(distributedTraceId.get());
}
}
......@@ -114,7 +114,7 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
topSegArray.add(topSegmentJson);
}
JsonObject resJsonObj = (JsonObject)response;
JsonObject resJsonObj = (JsonObject) response;
resJsonObj.add("result", topSegPaging);
}
}
......
......@@ -5,10 +5,10 @@ import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.segment.logic.Segment;
import com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.segment.entity.SegmentDeserialize;
import com.a.eye.skywalking.collector.worker.segment.entity.Span;
import com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs;
import com.a.eye.skywalking.trace.Span;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.elasticsearch.action.get.GetResponse;
......@@ -31,7 +31,7 @@ public class SpanSearchWithId extends AbstractLocalSyncWorker {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity)request;
GetResponse getResponse = GetResponseFromEs.INSTANCE.get(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, search.segId);
Segment segment = SegmentDeserialize.INSTANCE.deserializeFromES(getResponse.getSourceAsString());
Segment segment = SegmentDeserialize.INSTANCE.deserializeSingle(getResponse.getSourceAsString());
List<Span> spanList = segment.getSpans();
getResponse.getSource();
......
package com.a.eye.skywalking.collector.worker.tools;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.tag.Tags;
import com.a.eye.skywalking.collector.worker.segment.entity.Span;
import com.a.eye.skywalking.collector.worker.segment.entity.tag.Tags;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......
......@@ -2,8 +2,8 @@ package com.a.eye.skywalking.collector.worker.tools;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.tag.Tags;
import com.a.eye.skywalking.collector.worker.segment.entity.Span;
import com.a.eye.skywalking.collector.worker.segment.entity.tag.Tags;
/**
* @author pengys5
......
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.LocalAsyncWorkerRef;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
......@@ -11,6 +12,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.PrintWriter;
import java.io.StringReader;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.*;
......@@ -54,15 +56,14 @@ public class PostWithHttpServletTestCase {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
String reqStr = (String)invocation.getArguments()[0];
System.out.println(reqStr);
Assert.assertEquals("TestTest2", reqStr);
Segment segment = (Segment)invocation.getArguments()[0];
System.out.println(segment.getTraceSegmentId());
Assert.assertEquals("TestTest2", segment.getTraceSegmentId());
return null;
}
}).when(workerRef).tell(anyString());
}).when(workerRef).tell(any(Segment.class));
BufferedReader bufferedReader = mock(BufferedReader.class);
when(bufferedReader.readLine()).thenReturn("Test").thenReturn("Test2").thenReturn(null);
BufferedReader bufferedReader = new BufferedReader(new StringReader("[{\"ts\":\"TestTest2\"}]"));
when(request.getReader()).thenReturn(bufferedReader);
......
......@@ -21,8 +21,7 @@ public class TestAbstractPost extends AbstractPost {
}
@Override
protected void onReceive(String reqJsonStr) throws Exception {
protected void onReceive(Object message) throws Exception {
}
public enum WorkerRole implements Role {
......
......@@ -260,7 +260,7 @@ public class SegmentPostTestCase {
doAnswer(nodeMappingDayAnalysisAnswer).when(nodeMappingDayAnalysis).tell(Mockito.argThat(new IsSegmentWithTimeSlice()));
}
@Test
// @Test
public void testOnReceive() throws Exception {
String cacheServiceSegmentAsString = segmentMock.mockCacheServiceSegmentAsString();
......
......@@ -16,13 +16,13 @@ public class SegmentRealPost {
// HttpClientTools.INSTANCE.post("http://localhost:7001/segments", portalServiceExceptionSegmentAsString);
String cacheServiceSegmentAsString = mock.mockCacheServiceSegmentAsString();
HttpClientTools.INSTANCE.post("http://localhost:7001/segments", cacheServiceSegmentAsString);
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", cacheServiceSegmentAsString);
String persistenceServiceSegmentAsString = mock.mockPersistenceServiceSegmentAsString();
HttpClientTools.INSTANCE.post("http://localhost:7001/segments", persistenceServiceSegmentAsString);
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", persistenceServiceSegmentAsString);
String portalServiceSegmentAsString = mock.mockPortalServiceSegmentAsString();
HttpClientTools.INSTANCE.post("http://localhost:7001/segments", portalServiceSegmentAsString);
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", portalServiceSegmentAsString);
// String specialSegmentAsString = mock.mockSpecialSegmentAsString();
// HttpClientTools.INSTANCE.post("http://localhost:7001/segments", specialSegmentAsString);
......
......@@ -3,13 +3,10 @@ package com.a.eye.skywalking.collector.worker.segment.mock;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.worker.AnalysisMember;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.segment.entity.SegmentDeserialize;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.a.eye.skywalking.collector.worker.tools.JsonFileReader;
import com.a.eye.skywalking.trace.SegmentsMessage;
import com.a.eye.skywalking.trace.TraceSegment;
import com.google.gson.Gson;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.FileNotFoundException;
import java.util.ArrayList;
......@@ -19,10 +16,6 @@ import java.util.List;
* @author pengys5
*/
public class SegmentMock {
private Logger logger = LogManager.getFormatterLogger(SegmentMock.class);
private Gson gson = new Gson();
private String path = this.getClass().getResource("/").getPath();
private final String CacheServiceJsonFile = path + "/json/segment/post/normal/cache-service.json";
......@@ -38,10 +31,6 @@ public class SegmentMock {
return JsonFileReader.INSTANCE.read(path + fileName);
}
public String mockSpecialSegmentAsString() throws FileNotFoundException {
return JsonFileReader.INSTANCE.read(SpecialJsonFile);
}
public String mockCacheServiceSegmentAsString() throws FileNotFoundException {
return JsonFileReader.INSTANCE.read(CacheServiceJsonFile);
}
......@@ -54,69 +43,44 @@ public class SegmentMock {
return JsonFileReader.INSTANCE.read(PortalServiceJsonFile);
}
public String mockCacheServiceExceptionSegmentAsString() throws FileNotFoundException {
return JsonFileReader.INSTANCE.read(CacheServiceExceptionJsonFile);
}
public String mockPortalServiceExceptionSegmentAsString() throws FileNotFoundException {
return JsonFileReader.INSTANCE.read(PortalServiceExceptionJsonFile);
}
public List<SegmentPost.SegmentWithTimeSlice> mockSpecialSegmentTimeSlice() throws FileNotFoundException {
String specialSegmentAsString = mockSpecialSegmentAsString();
logger.debug(specialSegmentAsString);
return createSegmentWithTimeSliceList(specialSegmentAsString);
}
public List<SegmentPost.SegmentWithTimeSlice> mockCacheServiceExceptionSegmentTimeSlice() throws FileNotFoundException {
String cacheServiceExceptionSegmentAsString = mockCacheServiceExceptionSegmentAsString();
logger.debug(cacheServiceExceptionSegmentAsString);
return createSegmentWithTimeSliceList(cacheServiceExceptionSegmentAsString);
public List<SegmentPost.SegmentWithTimeSlice> mockCacheServiceExceptionSegmentTimeSlice() throws Exception {
return createSegmentWithTimeSliceList(CacheServiceExceptionJsonFile);
}
public List<SegmentPost.SegmentWithTimeSlice> mockPortalServiceExceptionSegmentTimeSlice() throws FileNotFoundException {
String portalServiceExceptionSegmentAsString = mockPortalServiceExceptionSegmentAsString();
logger.debug(portalServiceExceptionSegmentAsString);
return createSegmentWithTimeSliceList(portalServiceExceptionSegmentAsString);
public List<SegmentPost.SegmentWithTimeSlice> mockPortalServiceExceptionSegmentTimeSlice() throws Exception {
return createSegmentWithTimeSliceList(PortalServiceExceptionJsonFile);
}
public List<SegmentPost.SegmentWithTimeSlice> mockCacheServiceSegmentSegmentTimeSlice() throws FileNotFoundException {
String cacheServiceSegmentAsString = mockCacheServiceSegmentAsString();
logger.debug(cacheServiceSegmentAsString);
return createSegmentWithTimeSliceList(cacheServiceSegmentAsString);
public List<SegmentPost.SegmentWithTimeSlice> mockCacheServiceSegmentSegmentTimeSlice() throws Exception {
return createSegmentWithTimeSliceList(CacheServiceJsonFile);
}
public List<SegmentPost.SegmentWithTimeSlice> mockPersistenceServiceSegmentTimeSlice() throws FileNotFoundException {
String persistenceServiceSegmentAsString = mockPersistenceServiceSegmentAsString();
logger.debug(persistenceServiceSegmentAsString);
return createSegmentWithTimeSliceList(persistenceServiceSegmentAsString);
public List<SegmentPost.SegmentWithTimeSlice> mockPersistenceServiceSegmentTimeSlice() throws Exception {
return createSegmentWithTimeSliceList(PersistenceServiceJsonFile);
}
public List<SegmentPost.SegmentWithTimeSlice> mockPortalServiceSegmentSegmentTimeSlice() throws FileNotFoundException {
String portalServiceSegmentAsString = mockPortalServiceSegmentAsString();
logger.debug(portalServiceSegmentAsString);
return createSegmentWithTimeSliceList(portalServiceSegmentAsString);
public List<SegmentPost.SegmentWithTimeSlice> mockPortalServiceSegmentSegmentTimeSlice() throws Exception {
return createSegmentWithTimeSliceList(PortalServiceJsonFile);
}
private List<SegmentPost.SegmentWithTimeSlice> createSegmentWithTimeSliceList(String segmentJsonStr) {
SegmentsMessage segmentsMessage = gson.fromJson(segmentJsonStr, SegmentsMessage.class);
List<TraceSegment> segmentList = segmentsMessage.getSegments();
private List<SegmentPost.SegmentWithTimeSlice> createSegmentWithTimeSliceList(String jsonFilePath) throws Exception {
List<Segment> segmentList = SegmentDeserialize.INSTANCE.deserializeMultiple(jsonFilePath);
List<SegmentPost.SegmentWithTimeSlice> segmentWithTimeSliceList = new ArrayList<>();
for (TraceSegment newSegment : segmentList) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = createSegmentWithTimeSlice(newSegment);
for (Segment segment : segmentList) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = createSegmentWithTimeSlice(segment);
segmentWithTimeSliceList.add(segmentWithTimeSlice);
}
return segmentWithTimeSliceList;
}
private SegmentPost.SegmentWithTimeSlice createSegmentWithTimeSlice(TraceSegment newSegment) {
long minuteSlice = DateTools.getMinuteSlice(newSegment.getStartTime());
long hourSlice = DateTools.getHourSlice(newSegment.getStartTime());
long daySlice = DateTools.getDaySlice(newSegment.getStartTime());
int second = DateTools.getSecond(newSegment.getStartTime());
private SegmentPost.SegmentWithTimeSlice createSegmentWithTimeSlice(Segment segment) {
long minuteSlice = DateTools.getMinuteSlice(segment.getStartTime());
long hourSlice = DateTools.getHourSlice(segment.getStartTime());
long daySlice = DateTools.getDaySlice(segment.getStartTime());
int second = DateTools.getSecond(segment.getStartTime());
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = new SegmentPost.SegmentWithTimeSlice(newSegment, minuteSlice, hourSlice, daySlice, second);
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = new SegmentPost.SegmentWithTimeSlice(segment, minuteSlice, hourSlice, daySlice, second);
return segmentWithTimeSlice;
}
......
......@@ -7,6 +7,7 @@ import com.a.eye.skywalking.collector.worker.config.CacheSizeConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.MockEsBulkClient;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
......@@ -20,6 +21,7 @@ import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import java.util.TimeZone;
......@@ -83,9 +85,9 @@ public class SegmentSaveTestCase {
public void testAnalyse() throws Exception {
CacheSizeConfig.Cache.Persistence.SIZE = 1;
JsonObject segment_1 = new JsonObject();
segment_1.addProperty("ts", "segment_1");
segmentSave.analyse(segment_1);
Segment segment = new Segment();
segment.setJsonStr("{\"ts\":\"segment_1\"}");
segmentSave.analyse(segment);
Assert.assertEquals("segment_1", saveToEsSource.ts);
}
......@@ -97,7 +99,7 @@ public class SegmentSaveTestCase {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Gson gson = new Gson();
String source = (String)invocation.getArguments()[0];
String source = (String) invocation.getArguments()[0];
JsonObject sourceJsonObj = gson.fromJson(source, JsonObject.class);
ts = sourceJsonObj.get("ts").getAsString();
return null;
......
......@@ -5,10 +5,10 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.segment.mock.SegmentMock;
import com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.elasticsearch.action.get.GetResponse;
import org.junit.Assert;
......@@ -63,10 +63,8 @@ public class SpanSearchWithIdTestCase {
LocalWorkerContext localWorkerContext = new LocalWorkerContext();
SpanSearchWithId spanSearchWithId = new SpanSearchWithId(SpanSearchWithId.WorkerRole.INSTANCE, clusterWorkerContext, localWorkerContext);
TraceSegment segment = create();
Gson gson = new Gson();
String sourceString = gson.toJson(segment);
SegmentMock mock = new SegmentMock();
String sourceString = mock.loadJsonFile("/json/span/persistence/segment.json");
GetResponse getResponse = mock(GetResponse.class);
when(getResponseFromEs.get(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, "1")).thenReturn(getResponse);
when(getResponse.getSourceAsString()).thenReturn(sourceString);
......@@ -75,9 +73,10 @@ public class SpanSearchWithIdTestCase {
JsonObject response = new JsonObject();
spanSearchWithId.onWork(request, response);
JsonObject segJsonObj = response.get(Const.RESULT).getAsJsonObject();
String value = segJsonObj.get("ts").getAsJsonObject().get("Tag").getAsString();
Assert.assertEquals("VALUE", value);
JsonObject spanJsonObj = response.get(Const.RESULT).getAsJsonObject();
System.out.println(spanJsonObj.toString());
String value = spanJsonObj.get("operationName").getAsString();
Assert.assertEquals("/portal/", value);
}
private TraceSegment create() {
......
package com.a.eye.skywalking.collector.worker.tools;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.collector.worker.segment.entity.Span;
import org.junit.Assert;
import org.junit.Test;
......
{
"ts": "Segment.1490922929254.1797892356.6003.69.1",
"st": 1490922929254,
"et": 1490922929306,
"ss": [
{
"si": 2,
"ps": 1,
"st": 1490922929257,
"et": 1490922929262,
"on": "com.a.eye.skywalking.test.cache.CacheService.findCache(java.lang.String)",
"ts": {
"span.layer": "rpc",
"peer.host": "127.0.0.1",
"component": "Motan",
"span.kind": "client",
"url": "motan://127.0.0.1:8002/default_rpc/com.a.eye.skywalking.test.cache.CacheService/1.0/referer"
},
"tb": {},
"ti": {
"peer.port": 8002
},
"lo": []
},
{
"si": 1,
"ps": 0,
"st": 0,
"et": 1490922929262,
"on": "Motan_default_rpc_com.a.eye.skywalking.test.cache.CacheService.findCache(java.lang.String)",
"ts": {
"requestId": "1563346001467539461"
},
"tb": {},
"ti": {},
"lo": []
},
{
"si": 4,
"ps": 3,
"st": 1490922929262,
"et": 1490922929293,
"on": "/persistence/query",
"ts": {
"span.layer": "http",
"peer.host": "10.128.35.80",
"component": "HttpClient",
"span.kind": "client",
"url": "http://10.128.35.80:20880/persistence/query"
},
"tb": {},
"ti": {
"peer.port": 20880,
"status_code": 200
},
"lo": []
},
{
"si": 3,
"ps": 0,
"st": 1490922929262,
"et": 1490922929297,
"on": "com.a.eye.skywalking.test.persistence.PersistenceService.query(String)",
"ts": {
"span.layer": "rpc",
"component": "Dubbo",
"peer.host": "10.128.35.80",
"span.kind": "client",
"url": "rest://10.128.35.80:20880/com.a.eye.skywalking.test.persistence.PersistenceService.query(String)"
},
"tb": {},
"ti": {
"peer.port": 20880
},
"lo": []
},
{
"si": 6,
"ps": 5,
"st": 1490922929297,
"et": 1490922929303,
"on": "com.a.eye.skywalking.test.cache.CacheService.updateCache(java.lang.String,java.lang.String)",
"ts": {
"span.layer": "rpc",
"peer.host": "127.0.0.1",
"component": "Motan",
"span.kind": "client",
"url": "motan://127.0.0.1:8002/default_rpc/com.a.eye.skywalking.test.cache.CacheService/1.0/referer"
},
"tb": {},
"ti": {
"peer.port": 8002
},
"lo": []
},
{
"si": 5,
"ps": 0,
"st": 0,
"et": 1490922929303,
"on": "Motan_default_rpc_com.a.eye.skywalking.test.cache.CacheService.updateCache(java.lang.String,java.lang.String)",
"ts": {
"requestId": "1563346001510531078"
},
"tb": {},
"ti": {},
"lo": []
},
{
"si": 0,
"ps": -1,
"st": 1490922929254,
"et": 1490922929306,
"on": "/portal/",
"ts": {
"span.layer": "http",
"component": "Tomcat",
"peer.host": "0:0:0:0:0:0:0:1",
"span.kind": "server",
"url": "http://localhost:38080/portal/"
},
"tb": {},
"ti": {
"peer.port": 57837,
"status_code": 200
},
"lo": []
}
],
"ac": "portal-service",
"gt": [
"Trace.1490922929254.1797892356.6003.69.2"
]
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册