提交 d37dbc8d 编写于 作者: wu-sheng's avatar wu-sheng

FInish codes about TraceSegment to UpstreamSegment

上级 4310daf6
......@@ -5,6 +5,8 @@ import java.util.List;
import org.skywalking.apm.agent.core.context.util.KeyValuePair;
import org.skywalking.apm.agent.core.context.util.ThrowableTransformer;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
import org.skywalking.apm.network.trace.component.Component;
/**
......@@ -152,4 +154,38 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
this.componentName = componentName;
return this;
}
public SpanObject.Builder transform(){
SpanObject.Builder spanBuilder = SpanObject.newBuilder();
spanBuilder.setSpanId(this.spanId);
spanBuilder.setParentSpanId(parentSpanId);
spanBuilder.setStartTime(startTime);
spanBuilder.setEndTime(endTime);
if (operationId == DictionaryUtil.nullValue()) {
spanBuilder.setOperationNameId(operationId);
} else {
spanBuilder.setOperationName(operationName);
}
spanBuilder.setSpanType(SpanType.Entry);
spanBuilder.setSpanLayerValue(this.layer.getCode());
if (componentId == DictionaryUtil.nullValue()) {
spanBuilder.setComponentId(componentId);
} else {
spanBuilder.setComponent(componentName);
}
spanBuilder.setIsError(errorOccurred);
if (this.tags != null) {
for (KeyValuePair tag : this.tags) {
spanBuilder.addTags(tag.transform());
}
}
if (this.logs != null) {
for (LogDataEntity log : this.logs) {
spanBuilder.addLogs(log.transform());
}
}
return spanBuilder;
}
}
package org.skywalking.apm.agent.core.context.trace;
import org.skywalking.apm.agent.core.context.util.KeyValuePair;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
import org.skywalking.apm.network.trace.component.Component;
/**
......
package org.skywalking.apm.agent.core.context.trace;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.trace.component.Component;
/**
......@@ -102,6 +103,16 @@ public class ExitSpan extends AbstractTracingSpan {
return this;
}
@Override public SpanObject.Builder transform() {
SpanObject.Builder spanBuilder = super.transform();
if (peerId == DictionaryUtil.nullValue()) {
spanBuilder.setPeerId(peerId);
} else {
spanBuilder.setPeer(peer);
}
return spanBuilder;
}
public int getPeerId() {
return peerId;
}
......
package org.skywalking.apm.agent.core.context.trace;
import org.skywalking.apm.network.proto.SpanObject;
/**
* The <code>LocalSpan</code> represents a normal tracing point, such as a local method.
*
......@@ -27,6 +29,10 @@ public class LocalSpan extends AbstractTracingSpan {
return this;
}
@Override public SpanObject transform() {
return null;
}
@Override public boolean isEntry() {
return false;
}
......
......@@ -3,6 +3,8 @@ package org.skywalking.apm.agent.core.context.trace;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.agent.core.context.util.KeyValuePair;
import org.skywalking.apm.network.proto.KeyWithStringValue;
import org.skywalking.apm.network.proto.LogMessage;
/**
* The <code>LogDataEntity</code> represents a collection of {@link KeyValuePair},
......@@ -39,4 +41,12 @@ public class LogDataEntity {
return new LogDataEntity(logs);
}
}
public LogMessage transform() {
LogMessage.Builder logMessageBuilder = LogMessage.newBuilder();
for (KeyValuePair log : logs) {
logMessageBuilder.addData(log.transform());
}
return logMessageBuilder.build();
}
}
......@@ -4,10 +4,20 @@ package org.skywalking.apm.agent.core.context.trace;
* @author wusheng
*/
public enum SpanLayer {
DB,
RPC_FRAMEWORK,
HTTP,
MQ;
DB(0),
RPC_FRAMEWORK(1),
HTTP(2),
MQ(3);
private int code;
SpanLayer(int code) {
this.code = code;
}
public int getCode() {
return code;
}
public static void asDB(AbstractSpan span) {
span.setLayer(SpanLayer.DB);
......
......@@ -3,6 +3,7 @@ package org.skywalking.apm.agent.core.context.trace;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.agent.core.context.ids.DistributedTraceId;
import org.skywalking.apm.agent.core.context.ids.DistributedTraceIds;
import org.skywalking.apm.agent.core.context.ids.GlobalIdGenerator;
......@@ -11,6 +12,9 @@ import org.skywalking.apm.agent.core.dictionary.DictionaryManager;
import org.skywalking.apm.agent.core.dictionary.PossibleFound;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.skywalking.apm.network.proto.UpstreamSegment;
/**
* {@link TraceSegment} is a segment or fragment of the distributed trace.
......@@ -33,16 +37,6 @@ public class TraceSegment {
*/
private String traceSegmentId;
/**
* The start time of this trace segment.
*/
private long startTime;
/**
* The end time of this trace segment.
*/
private long endTime;
/**
* The refs of parent trace segments, except the primary one.
* For most RPC call, {@link #refs} contains only one element,
......@@ -103,7 +97,6 @@ public class TraceSegment {
}
}
);
this.startTime = System.currentTimeMillis();
this.traceSegmentId = GlobalIdGenerator.generate(ID_TYPE);
this.spans = new LinkedList<AbstractTracingSpan>();
this.relatedGlobalTraces = new DistributedTraceIds();
......@@ -154,7 +147,6 @@ public class TraceSegment {
* return this, for chaining
*/
public TraceSegment finish() {
this.endTime = System.currentTimeMillis();
return this;
}
......@@ -162,14 +154,6 @@ public class TraceSegment {
return traceSegmentId;
}
public long getStartTime() {
return startTime;
}
public long getEndTime() {
return endTime;
}
public int getApplicationId() {
return applicationId;
}
......@@ -198,12 +182,46 @@ public class TraceSegment {
this.ignore = ignore;
}
/**
* This is a high CPU cost method, only called when sending to collector or test cases.
*
* @return the segment as GRPC service parameter
*/
public UpstreamSegment transform() {
UpstreamSegment.Builder upstreamBuilder = UpstreamSegment.newBuilder();
for (DistributedTraceId distributedTraceId : getRelatedGlobalTraces()) {
upstreamBuilder = upstreamBuilder.addGlobalTraceIds(distributedTraceId.get());
}
TraceSegmentObject.Builder traceSegmentBuilder = TraceSegmentObject.newBuilder();
/**
* Trace Segment
*/
traceSegmentBuilder.setTraceSegmentId(this.traceSegmentId);
// TraceSegmentReference
if (this.refs != null) {
for (TraceSegmentRef ref : this.refs) {
traceSegmentBuilder.addRefs(ref.transform());
}
}
// globalTraceIds
for (DistributedTraceId distributedTraceId : getRelatedGlobalTraces()) {
traceSegmentBuilder.addGlobalTraceIds(distributedTraceId.get());
}
// SpanObject
for (AbstractTracingSpan span : this.spans) {
traceSegmentBuilder.addSpans(span.transform());
}
traceSegmentBuilder.setApplicationId(this.applicationId);
traceSegmentBuilder.setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_ID);
upstreamBuilder.setSegment(traceSegmentBuilder.build().toByteString());
return upstreamBuilder.build();
}
@Override
public String toString() {
return "TraceSegment{" +
"traceSegmentId='" + traceSegmentId + '\'' +
", startTime=" + startTime +
", endTime=" + endTime +
", refs=" + refs +
", spans=" + spans +
", applicationId='" + applicationId + '\'' +
......
......@@ -4,6 +4,8 @@ import java.util.List;
import org.skywalking.apm.agent.core.context.ContextCarrier;
import org.skywalking.apm.agent.core.context.ids.DistributedTraceId;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.skywalking.apm.network.proto.UpstreamSegment;
/**
* {@link TraceSegmentRef} is like a pointer, which ref to another {@link TraceSegment},
......@@ -26,11 +28,6 @@ public class TraceSegmentRef {
private int operationId = DictionaryUtil.nullValue();
/**
* {@link DistributedTraceId}
*/
private List<DistributedTraceId> distributedTraceIds;
/**
* Transform a {@link ContextCarrier} to the <code>TraceSegmentRef</code>
*
......@@ -52,8 +49,6 @@ public class TraceSegmentRef {
} else {
this.operationId = Integer.parseInt(entryOperationName);
}
this.distributedTraceIds = carrier.getDistributedTraceIds();
}
public String getOperationName() {
......@@ -64,6 +59,24 @@ public class TraceSegmentRef {
return operationId;
}
public TraceSegmentReference transform() {
TraceSegmentReference.Builder refBuilder = TraceSegmentReference.newBuilder();
refBuilder.setParentTraceSegmentId(traceSegmentId);
refBuilder.setParentSpanId(spanId);
refBuilder.setParentApplicationId(applicationId);
if (peerId == DictionaryUtil.nullValue()) {
refBuilder.setNetworkAddress(peerHost);
} else {
refBuilder.setNetworkAddressId(peerId);
}
if (operationId == DictionaryUtil.nullValue()) {
refBuilder.setEntryServiceName(operationName);
} else {
refBuilder.setEntryServiceId(operationId);
}
return refBuilder.build();
}
@Override
public boolean equals(Object o) {
if (this == o)
......
package org.skywalking.apm.agent.core.context.util;
import org.skywalking.apm.network.proto.KeyWithStringValue;
/**
* The <code>KeyValuePair</code> represents a object which contains a string key and a string value.
*
......@@ -21,4 +23,11 @@ public class KeyValuePair {
public String getValue() {
return value;
}
public KeyWithStringValue transform() {
KeyWithStringValue.Builder keyValueBuilder = KeyWithStringValue.newBuilder();
keyValueBuilder.setKey(key);
keyValueBuilder.setValue(value);
return keyValueBuilder.build();
}
}
......@@ -13,9 +13,9 @@ import org.skywalking.apm.agent.core.datacarrier.buffer.BufferStrategy;
import org.skywalking.apm.agent.core.datacarrier.consumer.IConsumer;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
import org.skywalking.apm.network.collecor.proto.Downstream;
import org.skywalking.apm.network.trace.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.trace.proto.UpstreamSegment;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UpstreamSegment;
import static org.skywalking.apm.agent.core.conf.Config.Buffer.BUFFER_SIZE;
import static org.skywalking.apm.agent.core.conf.Config.Buffer.CHANNEL_SIZE;
......@@ -78,14 +78,13 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
}
});
try {
for (TraceSegment segment : data) {
//TODO
// segment to PROTOBUFF object
upstreamSegmentStreamObserver.onNext(null);
for (TraceSegment segment : data) {
try {
UpstreamSegment upstreamSegment = segment.transform();
upstreamSegmentStreamObserver.onNext(upstreamSegment);
} catch (Throwable t) {
logger.error(t, "Transform and send UpstreamSegment to collector fail.");
}
} catch (Throwable t) {
logger.error(t, "Send UpstreamSegment to collector fail.");
}
upstreamSegmentStreamObserver.onCompleted();
......@@ -113,6 +112,9 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
@Override
public void afterFinished(TraceSegment traceSegment) {
if (traceSegment.isIgnore()) {
return;
}
if (!carrier.produce(traceSegment)) {
if (logger.isDebugEnable()) {
logger.debug("One trace segment has been abandoned, cause by buffer is full.");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册