提交 4dab082c 编写于 作者: wu-sheng's avatar wu-sheng

Use gson replace protobuf in serialization. Build as small json as possible.(not finished yet.)

上级 a4619318
......@@ -56,6 +56,12 @@
</properties>
<dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
......
......@@ -23,10 +23,5 @@
<artifactId>skywalking-trace</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
</project>
package com.a.eye.skywalking.collector.commons.serializer;
import akka.serialization.JSerializer;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.proto.SegmentMessage;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* @author pengys5
*/
public class TraceSegmentSerializer extends JSerializer {
private static ILog logger = LogManager.getLogger(TraceSegmentSerializer.class);
@Override
public boolean includeManifest() {
return false;
}
@Override
public int identifier() {
return 30;
}
@Override
public byte[] toBinary(Object o) {
TraceSegment traceSegment = (TraceSegment) o;
return traceSegment.serialize().toByteArray();
}
@Override
public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
TraceSegment traceSegment = null;
try {
traceSegment = new TraceSegment(SegmentMessage.parseFrom(bytes));
} catch (InvalidProtocolBufferException e) {
logger.warn("Can't covert message from byte[] to SegmentMessage");
}
return traceSegment;
}
}
......@@ -9,9 +9,6 @@ import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilderFactory;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.proto.SegmentMessage;
import com.a.eye.skywalking.trace.proto.SegmentRefMessage;
import com.a.eye.skywalking.trace.tag.Tags;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
......@@ -36,25 +33,25 @@ public class StartUpTestCase {
TraceSegment dubboClientData = TraceSegmentBuilderFactory.INSTANCE.traceOf_Tomcat_DubboClient();
SegmentMessage.Builder clientBuilder = dubboClientData.serialize().toBuilder();
clientBuilder.setApplicationCode("Tomcat_DubboClient");
dubboClientData = new TraceSegment(clientBuilder.build());
TraceSegment dubboServerData = TraceSegmentBuilderFactory.INSTANCE.traceOf_DubboServer_MySQL();
SegmentMessage serializeServer = dubboServerData.serialize();
SegmentMessage.Builder builder = serializeServer.toBuilder();
SegmentRefMessage.Builder builderRef = builder.getRefs(0).toBuilder();
builderRef.setApplicationCode(dubboClientData.getApplicationCode());
builderRef.setPeerHost(Tags.PEER_HOST.get(dubboClientData.getSpans().get(1)));
builder.setApplicationCode("DubboServer_MySQL");
builder.addRefs(builderRef);
dubboServerData = new TraceSegment(builder.build());
// SegmentMessage.Builder clientBuilder = dubboClientData.serialize().toBuilder();
// clientBuilder.setApplicationCode("Tomcat_DubboClient");
//
// dubboClientData = new TraceSegment(clientBuilder.build());
//
// TraceSegment dubboServerData = TraceSegmentBuilderFactory.INSTANCE.traceOf_DubboServer_MySQL();
//
// SegmentMessage serializeServer = dubboServerData.serialize();
// SegmentMessage.Builder builder = serializeServer.toBuilder();
//
// SegmentRefMessage.Builder builderRef = builder.getRefs(0).toBuilder();
// builderRef.setApplicationCode(dubboClientData.getApplicationCode());
//
//
// builderRef.setPeerHost(Tags.PEER_HOST.get(dubboClientData.getSpans().get(1)));
//
// builder.setApplicationCode("DubboServer_MySQL");
// builder.addRefs(builderRef);
// dubboServerData = new TraceSegment(builder.build());
Thread.sleep(5000);
......@@ -62,7 +59,7 @@ public class StartUpTestCase {
for (int i = 0; i < 100; i++) {
selection.tell(dubboClientData, ActorRef.noSender());
selection.tell(dubboServerData, ActorRef.noSender());
// selection.tell(dubboServerData, ActorRef.noSender());
Thread.sleep(200);
}
......
......@@ -17,12 +17,6 @@
<artifactId>skywalking-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.1.0</version>
</dependency>
</dependencies>
<build>
......
package com.a.eye.skywalking.trace;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.trace.proto.KeyValue;
import com.a.eye.skywalking.trace.proto.LogDataMessage;
import com.google.gson.annotations.Expose;
import com.google.gson.annotations.SerializedName;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
......@@ -14,7 +11,12 @@ import java.util.Map;
* Created by wusheng on 2017/2/17.
*/
public class LogData {
@Expose
@SerializedName(value="ti")
private long time;
@Expose
@SerializedName(value="fi")
private Map<String, ?> fields;
LogData(long time, Map<String, ?> fields) {
......@@ -25,9 +27,7 @@ public class LogData {
this.fields = fields;
}
LogData(LogDataMessage message){
deserialize(message);
}
public LogData(){}
public long getTime() {
return time;
......@@ -37,35 +37,4 @@ public class LogData {
return Collections.unmodifiableMap(fields);
}
public LogDataMessage serialize() {
LogDataMessage.Builder logDataBuilder = LogDataMessage.newBuilder();
logDataBuilder.setTime(time);
if(fields != null){
for (Map.Entry<String, ?> entry : fields.entrySet()) {
KeyValue.Builder logEntryBuilder = KeyValue.newBuilder();
logEntryBuilder.setKey(entry.getKey());
String value = String.valueOf(entry.getValue());
if(!StringUtil.isEmpty(value)) {
logEntryBuilder.setValue(value);
}
logDataBuilder.addFields(logEntryBuilder);
}
}
return logDataBuilder.build();
}
public void deserialize(LogDataMessage message) {
time = message.getTime();
List<KeyValue> list = message.getFieldsList();
if(list != null){
HashMap initFields = new HashMap<String, String>();
for (KeyValue field : list) {
initFields.put(field.getKey(), field.getValue());
}
this.fields = initFields;
}
}
}
package com.a.eye.skywalking.trace;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.trace.proto.KeyValue;
import com.a.eye.skywalking.trace.proto.LogDataMessage;
import com.a.eye.skywalking.trace.proto.SpanMessage;
import com.google.gson.annotations.Expose;
import com.google.gson.annotations.SerializedName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
......@@ -23,18 +20,26 @@ import java.util.Map;
* Created by wusheng on 2017/2/17.
*/
public class Span{
@Expose
@SerializedName(value="si")
private int spanId;
@Expose
@SerializedName(value="ps")
private int parentSpanId;
/**
* The start time of this Span.
*/
@Expose
@SerializedName(value="st")
private long startTime;
/**
* The end time of this Span.
*/
@Expose
@SerializedName(value="et")
private long endTime;
/**
......@@ -42,6 +47,8 @@ public class Span{
* If you want to know, how to set an operation name,
* {@see https://github.com/opentracing/specification/blob/master/specification.md#start-a-new-span}
*/
@Expose
@SerializedName(value="on")
private String operationName;
/**
......@@ -49,6 +56,8 @@ public class Span{
*
* {@see https://github.com/opentracing/specification/blob/master/specification.md#set-a-span-tag}
*/
@Expose
@SerializedName(value="ta")
private final Map<String, Object> tags;
/**
......@@ -56,6 +65,8 @@ public class Span{
*
* {@see https://github.com/opentracing/specification/blob/master/specification.md#log-structured-data}
*/
@Expose
@SerializedName(value="ls")
private final List<LogData> logs;
/**
......@@ -82,12 +93,11 @@ public class Span{
* @param startTime given start timestamp.
*/
private Span(int spanId, int parentSpanId, String operationName, long startTime) {
this();
this.spanId = spanId;
this.parentSpanId = parentSpanId;
this.startTime = startTime;
this.operationName = operationName;
this.tags = new HashMap<String, Object>();
this.logs = new ArrayList<LogData>();
}
/**
......@@ -138,15 +148,11 @@ public class Span{
}
/**
* Create a new span, by given {@link SpanMessage}, which you can get from another {@link Span} object,
* by calling {@link Span#serialize()};
*
* @param spanMessage from another {@link Span#serialize()}
* Create a new/empty span.
*/
public Span(SpanMessage spanMessage) {
public Span() {
tags = new HashMap<String, Object>();
logs = new LinkedList<LogData>();
this.deserialize(spanMessage);
}
/**
......@@ -265,51 +271,6 @@ public class Span{
return log(exceptionFields);
}
public SpanMessage serialize() {
SpanMessage.Builder builder = SpanMessage.newBuilder();
builder.setSpanId(spanId);
builder.setParentSpanId(parentSpanId);
builder.setStartTime(startTime);
builder.setEndTime(endTime);
builder.setOperationName(operationName);
for (Map.Entry<String, Object> entry : tags.entrySet()) {
KeyValue.Builder tagEntryBuilder = KeyValue.newBuilder();
tagEntryBuilder.setKey(entry.getKey());
String value = String.valueOf(entry.getValue());
if (!StringUtil.isEmpty(value)) {
tagEntryBuilder.setValue(value);
}
builder.addTags(tagEntryBuilder);
}
for (LogData log : logs) {
builder.addLogs(log.serialize());
}
return builder.build();
}
public void deserialize(SpanMessage message) {
spanId = message.getSpanId();
parentSpanId = message.getParentSpanId();
startTime = message.getStartTime();
endTime = message.getEndTime();
operationName = message.getOperationName();
List<KeyValue> tagsList = message.getTagsList();
if(tagsList != null){
for (KeyValue tag : tagsList) {
tags.put(tag.getKey(), tag.getValue());
}
}
List<LogDataMessage> logsList = message.getLogsList();
if (logsList != null) {
for (LogDataMessage logDataMessage : logsList) {
logs.add(new LogData(logDataMessage));
}
}
}
private enum ThrowableTransformer {
INSTANCE;
......
package com.a.eye.skywalking.trace.TraceId;
import com.google.gson.TypeAdapter;
import com.google.gson.annotations.JsonAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
/**
* The <code>DistributedTraceId</code> presents a distributed call chain.
*
......@@ -13,6 +19,7 @@ package com.a.eye.skywalking.trace.TraceId;
*
* @author wusheng
*/
@JsonAdapter(DistributedTraceId.Serializer.class)
public abstract class DistributedTraceId {
private String id;
......@@ -40,4 +47,22 @@ public abstract class DistributedTraceId {
public int hashCode() {
return id != null ? id.hashCode() : 0;
}
public static class Serializer extends TypeAdapter<DistributedTraceId> {
@Override
public void write(JsonWriter out, DistributedTraceId value) throws IOException {
out.beginArray();
out.value(value.get());
out.endArray();
}
@Override
public DistributedTraceId read(JsonReader in) throws IOException {
in.beginArray();
PropagatedTraceId traceId = new PropagatedTraceId(in.nextString());
in.endArray();
return traceId;
}
}
}
......@@ -2,11 +2,8 @@ package com.a.eye.skywalking.trace;
import com.a.eye.skywalking.trace.TraceId.DistributedTraceId;
import com.a.eye.skywalking.trace.TraceId.NewDistributedTraceId;
import com.a.eye.skywalking.trace.TraceId.PropagatedTraceId;
import com.a.eye.skywalking.trace.proto.SegmentMessage;
import com.a.eye.skywalking.trace.proto.SegmentRefMessage;
import com.a.eye.skywalking.trace.proto.SpanMessage;
import com.google.protobuf.ProtocolStringList;
import com.google.gson.annotations.Expose;
import com.google.gson.annotations.SerializedName;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
......@@ -27,16 +24,22 @@ public class TraceSegment {
* The id of this trace segment.
* Every segment has its unique-global-id.
*/
@Expose
@SerializedName(value="ts")
private String traceSegmentId;
/**
* The start time of this trace segment.
*/
@Expose
@SerializedName(value="st")
private long startTime;
/**
* The end time of this trace segment.
*/
@Expose
@SerializedName(value="et")
private long endTime;
/**
......@@ -45,6 +48,8 @@ public class TraceSegment {
* but if this segment is a start span of batch process, the segment faces multi parents,
* at this moment, we use this {@link #refs} to link them.
*/
@Expose
@SerializedName(value="rs")
private List<TraceSegmentRef> refs;
/**
......@@ -52,6 +57,8 @@ public class TraceSegment {
* They all have finished.
* All active spans are hold and controlled by "skywalking-api" module.
*/
@Expose
@SerializedName(value="ss")
private List<Span> spans;
/**
......@@ -60,6 +67,8 @@ public class TraceSegment {
*
* e.g. account_app, billing_app
*/
@Expose
@SerializedName(value="ac")
private String applicationCode;
/**
......@@ -75,6 +84,8 @@ public class TraceSegment {
* <code>relatedGlobalTraces</code> targets this {@link TraceSegment}'s related call chain, a call chain contains
* multi {@link TraceSegment}s, only using {@link #refs} is not enough for analysis and ui.
*/
@Expose
@SerializedName(value="gt")
private LinkedList<DistributedTraceId> relatedGlobalTraces;
/**
......@@ -82,21 +93,19 @@ public class TraceSegment {
* This segmentId is generated by TraceSegmentRef, AKA, from tracer/agent module.
*/
public TraceSegment(String applicationCode) {
this();
this.traceSegmentId = GlobalIdGenerator.generate(ID_TYPE);
this.applicationCode = applicationCode;
this.startTime = System.currentTimeMillis();
this.spans = new LinkedList<Span>();
this.relatedGlobalTraces = new LinkedList<DistributedTraceId>();
this.relatedGlobalTraces.add(new NewDistributedTraceId());
}
/**
* Create a trace segment, by given {@link SegmentMessage}
*
* @param message from another {@link TraceSegment#serialize()}
* Create a default/empty trace segment
*/
public TraceSegment(SegmentMessage message) {
deserialize(message);
public TraceSegment() {
this.spans = new LinkedList<Span>();
this.relatedGlobalTraces = new LinkedList<DistributedTraceId>();
this.relatedGlobalTraces.add(new NewDistributedTraceId());
}
/**
......@@ -189,53 +198,4 @@ public class TraceSegment {
", relatedGlobalTraces=" + relatedGlobalTraces +
'}';
}
public SegmentMessage serialize() {
SegmentMessage.Builder segmentBuilder = SegmentMessage.newBuilder();
segmentBuilder.setTraceSegmentId(traceSegmentId);
segmentBuilder.setStartTime(startTime);
segmentBuilder.setEndTime(endTime);
segmentBuilder.setApplicationCode(applicationCode);
if (refs != null && refs.size() > 0) {
for (TraceSegmentRef ref : refs) {
segmentBuilder.addRefs(ref.serialize());
}
}
for (DistributedTraceId id : relatedGlobalTraces) {
segmentBuilder.addRelatedTraceIds(id.get());
}
for (Span span : spans) {
segmentBuilder.addSpans(span.serialize());
}
return segmentBuilder.build();
}
public void deserialize(SegmentMessage message) {
traceSegmentId = message.getTraceSegmentId();
startTime = message.getStartTime();
endTime = message.getEndTime();
applicationCode = message.getApplicationCode();
List<SegmentRefMessage> refsList = message.getRefsList();
if (refsList != null && refsList.size() > 0) {
this.refs = new LinkedList<TraceSegmentRef>();
for (SegmentRefMessage refMessage : refsList) {
TraceSegmentRef ref = new TraceSegmentRef();
ref.deserialize(refMessage);
refs.add(ref);
}
}
ProtocolStringList relatedTraceIdsList = message.getRelatedTraceIdsList();
this.relatedGlobalTraces = new LinkedList<DistributedTraceId>();
for (String id : relatedTraceIdsList) {
relatedGlobalTraces.add(new PropagatedTraceId(id));
}
List<SpanMessage> spansList = message.getSpansList();
if (spansList != null) {
this.spans = new LinkedList<Span>();
for (SpanMessage spanMessage : spansList) {
spans.add(new Span(spanMessage));
}
}
}
}
package com.a.eye.skywalking.trace;
import com.a.eye.skywalking.trace.proto.SegmentRefMessage;
import com.a.eye.skywalking.trace.tag.Tags;
import com.google.gson.annotations.Expose;
import com.google.gson.annotations.SerializedName;
/**
* {@link TraceSegmentRef} is like a pointer, which ref to another {@link TraceSegment},
......@@ -13,21 +14,29 @@ public class TraceSegmentRef{
/**
* {@link TraceSegment#traceSegmentId}
*/
@Expose
@SerializedName(value="ts")
private String traceSegmentId;
/**
* {@link Span#spanId}
*/
@Expose
@SerializedName(value="si")
private int spanId = -1;
/**
* {@link TraceSegment#applicationCode}
*/
@Expose
@SerializedName(value="ac")
private String applicationCode;
/**
* {@link Tags#PEER_HOST}
*/
@Expose
@SerializedName(value="ph")
private String peerHost;
/**
......@@ -77,23 +86,6 @@ public class TraceSegmentRef{
'}';
}
public SegmentRefMessage serialize() {
SegmentRefMessage.Builder builder = SegmentRefMessage.newBuilder();
builder.setTraceSegmentId(traceSegmentId);
builder.setSpanId(spanId);
builder.setApplicationCode(applicationCode);
if(peerHost != null) {
builder.setPeerHost(peerHost);
}
return builder.build();
}
public void deserialize(SegmentRefMessage message) {
traceSegmentId = message.getTraceSegmentId();
spanId = message.getSpanId();
applicationCode = message.getApplicationCode();
peerHost = message.getPeerHost();
}
@Override
public boolean equals(Object o) {
......
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.a.eye.skywalking.trace.proto";
message SegmentMessage {
string traceSegmentId = 1;
int64 startTime = 2;
int64 endTime = 3;
string applicationCode = 4;
repeated SegmentRefMessage refs = 5;
repeated string relatedTraceIds = 6;
repeated SpanMessage spans = 7;
}
message SegmentRefMessage {
string traceSegmentId = 1;
int32 spanId = 2;
string applicationCode = 3;
string peerHost = 4;
}
message SpanMessage {
int32 spanId = 1;
int32 parentSpanId = 2;
int64 startTime = 3;
int64 endTime = 4;
string operationName = 5;
repeated KeyValue tags = 6;
repeated LogDataMessage logs = 7;
}
message LogDataMessage {
int64 time = 1;
repeated KeyValue fields = 5;
}
message KeyValue {
string key = 1;
string value = 2;
}
package com.a.eye.skywalking.trace;
import com.a.eye.skywalking.trace.tag.Tags;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.junit.Assert;
import org.junit.Test;
......@@ -100,7 +102,15 @@ public class TraceSegmentTestCase {
span2.log(new NullPointerException());
segment.archive(span2);
TraceSegment newSegment = new TraceSegment(segment.serialize());
Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
String json = gson.toJson(segment);
System.out.println(json);
TraceSegment newSegment = gson.fromJson(json, TraceSegment.class);
Assert.assertEquals(segment.getSpans().size(), newSegment.getSpans().size());
Assert.assertEquals(segment.getRefs().get(0).getTraceSegmentId(), newSegment.getRefs().get(0).getTraceSegmentId());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册