提交 6284fcbf 编写于 作者: P pengys5

temporary storage

上级 7885b4b9
......@@ -33,11 +33,12 @@ public class GlobalTraceAnalysis extends JoinAndSplitAnalysisMember {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
Segment segment = segmentWithTimeSlice.getSegment();
String subSegmentId = segment.getTraceSegmentId();
List<GlobalTraceId> globalTraceIdList = segment.getRelatedGlobalTraces();
List<GlobalTraceId> globalTraceIdList = null;
// List<GlobalTraceId> globalTraceIdList = segment.getRelatedGlobalTraces();
if (CollectionTools.isNotEmpty(globalTraceIdList)) {
for (GlobalTraceId disTraceId : globalTraceIdList) {
String traceId = disTraceId.get();
set(traceId, GlobalTraceIndex.SUB_SEG_IDS, subSegmentId);
// String traceId = disTraceId.get();
// set(traceId, GlobalTraceIndex.SUB_SEG_IDS, subSegmentId);
}
}
} else {
......
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.stream.JsonReader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import java.io.BufferedReader;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalAsyncWorkerRef;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.SegmentAndJson;
/**
* @author pengys5
......@@ -23,8 +27,7 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
super(role, clusterContext, selfContext);
}
@Override
final public void onWork(Object message) throws Exception {
@Override final public void onWork(Object message) throws Exception {
onReceive(message);
}
......@@ -34,15 +37,16 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
private Logger logger = LogManager.getFormatterLogger(PostWithHttpServlet.class);
private final Gson gson = new Gson();
private final LocalAsyncWorkerRef ownerWorkerRef;
PostWithHttpServlet(LocalAsyncWorkerRef ownerWorkerRef) {
this.ownerWorkerRef = ownerWorkerRef;
}
@Override
final protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
@Override final protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
JsonObject resJson = new JsonObject();
try {
BufferedReader bufferedReader = request.getReader();
......@@ -56,19 +60,34 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
}
private void streamReader(BufferedReader bufferedReader) throws Exception {
try (JsonReader reader = new JsonReader(bufferedReader)) {
readSegmentArray(reader);
}
}
Segment segment;
do {
int character;
StringBuilder builder = new StringBuilder();
while ((character = bufferedReader.read()) != ' ') {
if (character == -1) {
return;
}
builder.append((char)character);
}
int length = Integer.valueOf(builder.toString());
builder = new StringBuilder();
char[] buffer = new char[length];
int readLength = bufferedReader.read(buffer, 0, length);
if (readLength != length) {
logger.error("The actual data length was different from the length in data head! ");
return;
}
builder.append(buffer);
String segmentJsonStr = builder.toString();
segment = gson.fromJson(segmentJsonStr, Segment.class);
private void readSegmentArray(JsonReader reader) throws Exception {
reader.beginArray();
while (reader.hasNext()) {
Segment segment = new Segment();
segment.deserialize(reader);
ownerWorkerRef.tell(segment);
ownerWorkerRef.tell(new SegmentAndJson(segment, segmentJsonStr));
}
reader.endArray();
while (segment != null);
}
}
}
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.worker.segment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.worker.segment.entity.SegmentAndJson;
import org.skywalking.apm.util.StringUtil;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
......@@ -58,8 +59,9 @@ public class SegmentPost extends AbstractPost {
@Override
protected void onReceive(Object message) throws Exception {
if (message instanceof Segment) {
Segment segment = (Segment) message;
if (message instanceof SegmentAndJson) {
SegmentAndJson segmentAndJson = (SegmentAndJson) message;
Segment segment = segmentAndJson.getSegment();
try {
validateData(segment);
} catch (IllegalArgumentException e) {
......@@ -75,7 +77,7 @@ public class SegmentPost extends AbstractPost {
logger.debug("minuteSlice: %s, hourSlice: %s, daySlice: %s, second:%s", minuteSlice, hourSlice, daySlice, second);
SegmentWithTimeSlice segmentWithTimeSlice = new SegmentWithTimeSlice(segment, minuteSlice, hourSlice, daySlice, second);
getSelfContext().lookup(SegmentAnalysis.Role.INSTANCE).tell(segment);
getSelfContext().lookup(SegmentAnalysis.Role.INSTANCE).tell(segmentAndJson);
getSelfContext().lookup(SegmentCostAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
getSelfContext().lookup(GlobalTraceAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
......
......@@ -8,6 +8,7 @@ import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.RecordAnalysisMember;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.SegmentAndJson;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentSave;
/**
......@@ -29,8 +30,8 @@ public class SegmentAnalysis extends RecordAnalysisMember {
@Override
public void analyse(Object message) throws Exception {
if (message instanceof Segment) {
Segment segment = (Segment) message;
getSelfContext().lookup(SegmentSave.Role.INSTANCE).tell(segment);
SegmentAndJson segmentAndJson = (SegmentAndJson) message;
getSelfContext().lookup(SegmentSave.Role.INSTANCE).tell(segmentAndJson);
} else {
logger.error("unhandled message, message instance must Segment, but is %s", message.getClass().toString());
}
......
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.TypeAdapter;
import com.google.gson.annotations.JsonAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
/**
* @author pengys5
*/
public class GlobalTraceId extends DeserializeObject {
private String globalTraceId;
@JsonAdapter(GlobalTraceId.Serializer.class)
public class GlobalTraceId {
public GlobalTraceId() {
globalTraceIds = new LinkedList<>();
}
public String get() {
return globalTraceId;
private LinkedList<String> globalTraceIds;
public LinkedList<String> get() {
return globalTraceIds;
}
public GlobalTraceId deserialize(JsonReader reader) throws IOException {
this.globalTraceId = reader.nextString();
this.setJsonStr("\"" + globalTraceId + "\"");
return this;
public static class Serializer extends TypeAdapter<GlobalTraceId> {
@Override public void write(JsonWriter out, GlobalTraceId value) throws IOException {
List<String> globalTraceIds = value.globalTraceIds;
if (globalTraceIds.size() > 0) {
out.beginArray();
for (String globalTraceId : globalTraceIds) {
out.value(globalTraceId);
}
out.endArray();
}
}
@Override public GlobalTraceId read(JsonReader in) throws IOException {
GlobalTraceId globalTraceId = new GlobalTraceId();
in.beginArray();
try {
while (in.hasNext()) {
globalTraceId.get().add(in.nextString());
}
} finally {
in.endArray();
}
return globalTraceId;
}
}
}
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import java.util.HashMap;
import com.google.gson.annotations.SerializedName;
import java.util.Map;
/**
* @author pengys5
*/
public class LogData extends DeserializeObject {
public class LogData {
@SerializedName("tm")
private long time;
@SerializedName("fi")
private Map<String, String> fields;
public long getTime() {
......@@ -20,40 +21,4 @@ public class LogData extends DeserializeObject {
public Map<String, String> getFields() {
return fields;
}
public LogData deserialize(JsonReader reader) throws IOException {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("{");
boolean first = true;
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case "tm":
Long tm = reader.nextLong();
this.time = tm;
JsonBuilder.INSTANCE.append(stringBuilder, "tm", tm, first);
break;
case "fi":
fields = new HashMap<>();
reader.beginObject();
while (reader.hasNext()) {
String key = reader.nextName();
String value = reader.nextString();
fields.put(key, value);
}
reader.endObject();
JsonBuilder.INSTANCE.append(stringBuilder, "fi", fields, first);
break;
default:
reader.skipValue();
}
first = false;
}
reader.endObject();
stringBuilder.append("}");
this.setJsonStr(stringBuilder.toString());
return this;
}
}
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import java.util.ArrayList;
import com.google.gson.annotations.SerializedName;
import java.util.List;
/**
* @author pengys5
*/
public class Segment extends DeserializeObject {
public class Segment {
@SerializedName("ts")
private String traceSegmentId;
@SerializedName("st")
private long startTime;
@SerializedName("et")
private long endTime;
@SerializedName("rs")
private List<TraceSegmentRef> refs;
@SerializedName("ss")
private List<Span> spans;
@SerializedName("ac")
private String applicationCode;
private List<GlobalTraceId> relatedGlobalTraces;
@SerializedName("gt")
private GlobalTraceId relatedGlobalTraces;
public String getTraceSegmentId() {
return traceSegmentId;
......@@ -42,86 +53,7 @@ public class Segment extends DeserializeObject {
return spans;
}
public List<GlobalTraceId> getRelatedGlobalTraces() {
public GlobalTraceId getRelatedGlobalTraces() {
return relatedGlobalTraces;
}
public Segment deserialize(JsonReader reader) throws IOException {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("{");
boolean first = true;
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case "ts":
String ts = reader.nextString();
this.traceSegmentId = ts;
JsonBuilder.INSTANCE.append(stringBuilder, "ts", ts, first);
break;
case "ac":
String ac = reader.nextString();
this.applicationCode = ac;
JsonBuilder.INSTANCE.append(stringBuilder, "ac", ac, first);
break;
case "st":
long st = reader.nextLong();
this.startTime = st;
JsonBuilder.INSTANCE.append(stringBuilder, "st", st, first);
break;
case "et":
long et = reader.nextLong();
this.endTime = et;
JsonBuilder.INSTANCE.append(stringBuilder, "et", et, first);
break;
case "rs":
refs = new ArrayList<>();
reader.beginArray();
while (reader.hasNext()) {
TraceSegmentRef ref = new TraceSegmentRef();
ref.deserialize(reader);
refs.add(ref);
}
reader.endArray();
JsonBuilder.INSTANCE.append(stringBuilder, "rs", refs, first);
break;
case "ss":
spans = new ArrayList<>();
reader.beginArray();
while (reader.hasNext()) {
Span span = new Span();
span.deserialize(reader);
spans.add(span);
}
reader.endArray();
JsonBuilder.INSTANCE.append(stringBuilder, "ss", spans, first);
break;
case "gt":
relatedGlobalTraces = new ArrayList<>();
reader.beginArray();
while (reader.hasNext()) {
GlobalTraceId globalTraceId = new GlobalTraceId();
globalTraceId.deserialize(reader);
relatedGlobalTraces.add(globalTraceId);
}
JsonBuilder.INSTANCE.append(stringBuilder, "gt", relatedGlobalTraces, first);
reader.endArray();
break;
default:
reader.skipValue();
}
first = false;
}
reader.endObject();
stringBuilder.append("}");
this.setJsonStr(stringBuilder.toString());
return this;
}
}
......@@ -3,14 +3,21 @@ package org.skywalking.apm.collector.worker.segment.entity;
/**
* @author pengys5
*/
public abstract class DeserializeObject {
private String jsonStr;
public class SegmentAndJson {
public String getJsonStr() {
return jsonStr;
}
private final Segment segment;
private final String jsonStr;
public void setJsonStr(String jsonStr) {
public SegmentAndJson(Segment segment, String jsonStr) {
this.segment = segment;
this.jsonStr = jsonStr;
}
public Segment getSegment() {
return segment;
}
public String getJsonStr() {
return jsonStr;
}
}
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.Gson;
import com.google.gson.stream.JsonReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
......@@ -14,10 +13,10 @@ import java.util.List;
public enum SegmentDeserialize {
INSTANCE;
private final Gson gson = new Gson();
public Segment deserializeSingle(String singleSegmentJsonStr) throws IOException {
JsonReader reader = new JsonReader(new StringReader(singleSegmentJsonStr));
Segment segment = new Segment();
segment.deserialize(reader);
Segment segment = gson.fromJson(singleSegmentJsonStr, Segment.class);
return segment;
}
......@@ -37,7 +36,7 @@ public enum SegmentDeserialize {
reader.beginArray();
while (reader.hasNext()) {
Segment segment = new Segment();
segment.deserialize(reader);
// segment.deserialize(reader);
segmentList.add(segment);
}
reader.endArray();
......
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import com.google.gson.annotations.SerializedName;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
public class Span extends DeserializeObject {
public class Span {
@SerializedName("si")
private int spanId;
@SerializedName("ps")
private int parentSpanId;
@SerializedName("st")
private long startTime;
@SerializedName("et")
private long endTime;
@SerializedName("on")
private String operationName;
@SerializedName("ts")
private Map<String, String> tagsWithStr;
@SerializedName("tb")
private Map<String, Boolean> tagsWithBool;
@SerializedName("ti")
private Map<String, Integer> tagsWithInt;
@SerializedName("lo")
private List<LogData> logs;
public int getSpanId() {
......@@ -57,97 +71,4 @@ public class Span extends DeserializeObject {
public List<LogData> getLogs() {
return logs;
}
public Span deserialize(JsonReader reader) throws IOException {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("{");
boolean first = true;
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case "si":
Integer si = reader.nextInt();
this.spanId = si;
JsonBuilder.INSTANCE.append(stringBuilder, "si", si, first);
break;
case "ps":
Integer ps = reader.nextInt();
this.parentSpanId = ps;
JsonBuilder.INSTANCE.append(stringBuilder, "ps", ps, first);
break;
case "st":
Long st = reader.nextLong();
this.startTime = st;
JsonBuilder.INSTANCE.append(stringBuilder, "st", st, first);
break;
case "et":
Long et = reader.nextLong();
this.endTime = et;
JsonBuilder.INSTANCE.append(stringBuilder, "et", et, first);
break;
case "on":
String on = reader.nextString();
this.operationName = on;
JsonBuilder.INSTANCE.append(stringBuilder, "on", on, first);
break;
case "ts":
tagsWithStr = new HashMap<>();
reader.beginObject();
while (reader.hasNext()) {
String key = reader.nextName();
String value = reader.nextString();
tagsWithStr.put(key, value);
}
reader.endObject();
JsonBuilder.INSTANCE.append(stringBuilder, "ts", tagsWithStr, first);
break;
case "tb":
tagsWithBool = new HashMap<>();
reader.beginObject();
while (reader.hasNext()) {
String key = reader.nextName();
boolean value = reader.nextBoolean();
tagsWithBool.put(key, value);
}
reader.endObject();
JsonBuilder.INSTANCE.append(stringBuilder, "tb", tagsWithBool, first);
break;
case "ti":
tagsWithInt = new HashMap<>();
reader.beginObject();
while (reader.hasNext()) {
String key = reader.nextName();
Integer value = reader.nextInt();
tagsWithInt.put(key, value);
}
reader.endObject();
JsonBuilder.INSTANCE.append(stringBuilder, "ti", tagsWithInt, first);
break;
case "lo":
logs = new ArrayList<>();
reader.beginArray();
while (reader.hasNext()) {
LogData logData = new LogData();
logData.deserialize(reader);
logs.add(logData);
}
reader.endArray();
JsonBuilder.INSTANCE.append(stringBuilder, "lo", logs, first);
break;
default:
reader.skipValue();
}
first = false;
}
reader.endObject();
stringBuilder.append("}");
this.setJsonStr(stringBuilder.toString());
return this;
}
}
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import com.google.gson.annotations.SerializedName;
/**
* @author pengys5
*/
public class TraceSegmentRef extends DeserializeObject {
public class TraceSegmentRef {
@SerializedName("ts")
private String traceSegmentId;
@SerializedName("si")
private int spanId = -1;
@SerializedName("ac")
private String applicationCode;
@SerializedName("ph")
private String peerHost;
public String getTraceSegmentId() {
......@@ -32,44 +34,4 @@ public class TraceSegmentRef extends DeserializeObject {
public String getPeerHost() {
return peerHost;
}
public TraceSegmentRef deserialize(JsonReader reader) throws IOException {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("{");
boolean first = true;
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case "ts":
String ts = reader.nextString();
this.traceSegmentId = ts;
JsonBuilder.INSTANCE.append(stringBuilder, "ts", ts, first);
break;
case "si":
Integer si = reader.nextInt();
this.spanId = si;
JsonBuilder.INSTANCE.append(stringBuilder, "si", si, first);
break;
case "ac":
String ac = reader.nextString();
this.applicationCode = ac;
JsonBuilder.INSTANCE.append(stringBuilder, "ac", ac, first);
break;
case "ph":
String ph = reader.nextString();
this.peerHost = ph;
JsonBuilder.INSTANCE.append(stringBuilder, "ph", ph, first);
break;
default:
reader.skipValue();
}
first = false;
}
reader.endObject();
stringBuilder.append("}");
this.setJsonStr(stringBuilder.toString());
return this;
}
}
package org.skywalking.apm.collector.worker.segment.persistence;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
......@@ -10,12 +13,12 @@ import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.PersistenceMember;
import org.skywalking.apm.collector.worker.config.CacheSizeConfig;
import org.skywalking.apm.collector.worker.segment.SegmentIndex;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.storage.*;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.worker.segment.entity.SegmentAndJson;
import org.skywalking.apm.collector.worker.storage.AbstractIndex;
import org.skywalking.apm.collector.worker.storage.EsClient;
import org.skywalking.apm.collector.worker.storage.PersistenceWorkerListener;
import org.skywalking.apm.collector.worker.storage.SegmentData;
import org.skywalking.apm.collector.worker.storage.SegmentPersistenceData;
/**
* @author pengys5
......@@ -33,7 +36,7 @@ public class SegmentSave extends PersistenceMember<SegmentPersistenceData, Segme
}
public SegmentSave(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -42,13 +45,12 @@ public class SegmentSave extends PersistenceMember<SegmentPersistenceData, Segme
return new SegmentPersistenceData();
}
@Override
final public void analyse(Object message) throws Exception {
if (message instanceof Segment) {
Segment segment = (Segment) message;
@Override final public void analyse(Object message) throws Exception {
if (message instanceof SegmentAndJson) {
SegmentAndJson segmentAndJson = (SegmentAndJson)message;
SegmentPersistenceData data = getPersistenceData();
data.hold();
data.getOrCreate(segment.getTraceSegmentId()).setSegmentStr(segment.getJsonStr());
data.getOrCreate(segmentAndJson.getSegment().getTraceSegmentId()).setSegmentStr(segmentAndJson.getJsonStr());
if (data.size() >= CacheSizeConfig.Cache.Persistence.SIZE) {
persistence(data.asMap());
}
......@@ -69,8 +71,7 @@ public class SegmentSave extends PersistenceMember<SegmentPersistenceData, Segme
dataMap.clear();
}
@Override
final protected void prepareIndex(List<IndexRequestBuilder> builderList) {
@Override final protected void prepareIndex(List<IndexRequestBuilder> builderList) {
Map<String, SegmentData> lastData = getPersistenceData().getLast().asMap();
Client client = EsClient.INSTANCE.getClient();
......
......@@ -82,12 +82,13 @@ public class SegmentTopSearchWithGlobalTraceId extends AbstractLocalSyncWorker {
String segmentSource = client.prepareGet(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, segId).get().getSourceAsString();
Segment segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
List<GlobalTraceId> distributedTraceIdList = segment.getRelatedGlobalTraces();
List<GlobalTraceId> distributedTraceIdList = null;
// List<GlobalTraceId> distributedTraceIdList = segment.getRelatedGlobalTraces();
JsonArray distributedTraceIdArray = new JsonArray();
if (CollectionTools.isNotEmpty(distributedTraceIdList)) {
for (GlobalTraceId distributedTraceId : distributedTraceIdList) {
distributedTraceIdArray.add(distributedTraceId.get());
// distributedTraceIdArray.add(distributedTraceId.get());
}
}
topSegmentJson.add("traceIds", distributedTraceIdArray);
......
......@@ -90,12 +90,13 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
String segmentSource = EsClient.INSTANCE.getClient().prepareGet(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, segId).get().getSourceAsString();
logger().debug("segmentSource:" + segmentSource);
Segment segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
List<GlobalTraceId> distributedTraceIdList = segment.getRelatedGlobalTraces();
// List<GlobalTraceId> distributedTraceIdList = segment.getRelatedGlobalTraces();
List<GlobalTraceId> distributedTraceIdList = null;
JsonArray distributedTraceIdArray = new JsonArray();
if (CollectionTools.isNotEmpty(distributedTraceIdList)) {
for (GlobalTraceId distributedTraceId : distributedTraceIdList) {
distributedTraceIdArray.add(distributedTraceId.get());
// distributedTraceIdArray.add(distributedTraceId.get());
}
}
topSegmentJson.add("traceIds", distributedTraceIdArray);
......
......@@ -39,7 +39,6 @@ public class SpanSearchWithId extends AbstractLocalSyncWorker {
for (Span span : spanList) {
if (String.valueOf(span.getSpanId()).equals(search.spanId)) {
span.setJsonStr("");
String spanJsonStr = gson.toJson(span);
dataJson = gson.fromJson(spanJsonStr, JsonObject.class);
}
......
package org.skywalking.apm.collector.worker.tools;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.io.FileNotFoundException;
import java.io.FileReader;
......@@ -15,6 +15,15 @@ public enum JsonFileReader {
public String read(String path) throws FileNotFoundException {
JsonParser jsonParser = new JsonParser();
JsonElement jsonElement = jsonParser.parse(new FileReader(path));
return jsonElement.toString();
StringBuilder segmentBuilder = new StringBuilder();
JsonArray segments = jsonElement.getAsJsonArray();
for (int i = 0; i < segments.size(); i++) {
JsonElement segment = segments.get(i);
String segmentStr = segment.toString();
segmentBuilder.append(segmentStr.length()).append(" ").append(segmentStr);
}
return segmentBuilder.toString();
}
}
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonObject;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.Writer;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -10,16 +17,21 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.worker.segment.mock.SegmentMock;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest( {TestAbstractPost.class})
@PowerMockIgnore( {"javax.management.*"})
@PrepareForTest({TestAbstractPost.class})
@PowerMockIgnore({"javax.management.*"})
public class AbstractPostTestCase {
private TestAbstractPost post;
......@@ -43,4 +55,23 @@ public class AbstractPostTestCase {
post.onWork(new JsonObject());
PowerMockito.verifyPrivate(post).invoke("saveException", any(IllegalArgumentException.class));
}
@Test
public void testPostWithHttpServlet() throws Exception {
SegmentMock segmentMock = new SegmentMock();
// BufferedReader reader = new BufferedReader(new StringReader(segmentMock.mockCacheServiceExceptionSegmentAsString()));
BufferedReader reader = new BufferedReader(new StringReader(segmentMock.mockCacheServiceSegmentAsString()));
HttpServletRequest request = mock(HttpServletRequest.class);
when(request.getReader()).thenReturn(reader);
Writer writer = mock(Writer.class);
PrintWriter printWriter = new PrintWriter(writer);
HttpServletResponse response = mock(HttpServletResponse.class);
when(response.getWriter()).thenReturn(printWriter);
AbstractPost.PostWithHttpServlet servlet = new AbstractPost.PostWithHttpServlet(null);
servlet.doPost(request, response);
}
}
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.stream.JsonReader;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
/**
* @author pengys5
*/
public class LogDataTestCase {
@Test
public void deserialize() throws IOException {
LogData logData = new LogData();
JsonReader reader = new JsonReader(new StringReader("{\"tm\":1, \"fi\": {\"test1\":\"test1\",\"test2\":\"test2\"}, \"skip\":\"skip\"}"));
logData.deserialize(reader);
Assert.assertEquals(1L, logData.getTime());
Map<String, String> fields = logData.getFields();
Assert.assertEquals("test1", fields.get("test1"));
Assert.assertEquals("test2", fields.get("test2"));
Assert.assertEquals(false, fields.containsKey("skip"));
}
}
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.stream.JsonReader;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.io.StringReader;
/**
* @author pengys5
*/
public class TraceSegmentRefTestCase {
@Test
public void deserialize() throws IOException {
TraceSegmentRef traceSegmentRef = new TraceSegmentRef();
JsonReader reader = new JsonReader(new StringReader("{\"ts\" :\"ts\",\"si\":0,\"ac\":\"ac\",\"ph\":\"ph\", \"skip\":\"skip\"}"));
traceSegmentRef.deserialize(reader);
Assert.assertEquals("ts", traceSegmentRef.getTraceSegmentId());
Assert.assertEquals("ac", traceSegmentRef.getApplicationCode());
Assert.assertEquals("ph", traceSegmentRef.getPeerHost());
Assert.assertEquals(0, traceSegmentRef.getSpanId());
}
}
......@@ -35,6 +35,10 @@ public class SegmentMock {
return JsonFileReader.INSTANCE.read(CacheServiceJsonFile);
}
public String mockCacheServiceExceptionSegmentAsString() throws FileNotFoundException {
return JsonFileReader.INSTANCE.read(CacheServiceExceptionJsonFile);
}
public String mockPersistenceServiceSegmentAsString() throws FileNotFoundException {
return JsonFileReader.INSTANCE.read(PersistenceServiceJsonFile);
}
......
......@@ -366,7 +366,7 @@
],
"ac": "cache-service",
"gt": [
"Trace.1490922929254.1797892356.6003.69.2"
"Trace.1490922929254.1797892356.6003.69.2,Trace.1490922929254.1797892356.6003.69.3"
]
}
]
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册