提交 6f08371c 编写于 作者: A ascrutae

1. 添加TraceSpanTree序列化方法

2. 将Mapper的Keyout对象修改成TraceSpanTree
上级 06445fc5
package com.ai.cloud.skywalking.analysis.chainbuild;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.ai.cloud.skywalking.analysis.chainbuild.entity.TraceSpanTree;
import com.ai.cloud.skywalking.analysis.chainbuild.util.VersionIdentifier;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.ai.cloud.skywalking.protocol.Span;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
......@@ -13,12 +13,11 @@ import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ai.cloud.skywalking.analysis.chainbuild.entity.TraceSpanTree;
import com.ai.cloud.skywalking.analysis.chainbuild.util.VersionIdentifier;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.ai.cloud.skywalking.protocol.Span;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ChainBuildMapper extends TableMapper<Text, Text> {
public class ChainBuildMapper extends TableMapper<Text, TraceSpanTree> {
private Logger logger = LoggerFactory
.getLogger(ChainBuildMapper.class);
......@@ -31,7 +30,7 @@ public class ChainBuildMapper extends TableMapper<Text, Text> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
if(!VersionIdentifier.enableAnaylsis(Bytes.toString(key.get()))){
if (!VersionIdentifier.enableAnaylsis(Bytes.toString(key.get()))) {
return;
}
......@@ -46,6 +45,7 @@ public class ChainBuildMapper extends TableMapper<Text, Text> {
TraceSpanTree tree = new TraceSpanTree();
tree.build(spanList);
context.write(new Text(tree.getTreeRoot().getNodeRefToken()), tree);
} catch (Throwable e) {
logger.error("Failed to mapper call chain[" + key.toString() + "]",
e);
......
package com.ai.cloud.skywalking.analysis.chainbuild.entity;
import java.util.List;
import com.ai.cloud.skywalking.analysis.chainbuild.exception.TraceSpanTreeSerializeException;
import com.ai.cloud.skywalking.analysis.chainbuild.util.StringUtil;
import com.ai.cloud.skywalking.analysis.chainbuild.util.TokenGenerator;
import com.ai.cloud.skywalking.protocol.CallType;
import com.ai.cloud.skywalking.protocol.Span;
import java.util.List;
public class TraceSpanNode {
protected TraceSpanNode prev = null;
......@@ -79,9 +80,9 @@ public class TraceSpanNode {
this.cost = span.getCost();
this.callTimes = 1;
this.statusCode = span.getStatusCode();
if(span.isReceiver()){
if (span.isReceiver()) {
this.exceptionStack = "server stack:";
}else{
} else {
this.exceptionStack = "client stack:";
}
this.exceptionStack += span.getExceptionStack();
......@@ -89,61 +90,63 @@ public class TraceSpanNode {
this.businessKey = span.getBusinessKey();
this.applicationId = span.getApplicationId();
//TODO: to set nodeToken
//nodeToken : MD5(parentLevelId + levelId + viewpoint)
nodeRefToken = TokenGenerator.generateNodeToken(parentLevel + "-" + levelId + "-" + viewPointId);
}
protected TraceSpanNode(TraceSpanNode parent, TraceSpanNode sub, TraceSpanNode prev, TraceSpanNode next, List<TraceSpanNode> spanContainer){
protected TraceSpanNode(TraceSpanNode parent, TraceSpanNode sub, TraceSpanNode prev, TraceSpanNode next, List<TraceSpanNode> spanContainer) {
this.visualNode = true;
this.setParent(parent);
if(parent != null){
if (parent != null) {
parent.setSub(this);
}
this.setSub(sub);
if(sub != null){
if (sub != null) {
sub.setParent(this);
}
this.setPrev(prev);
if(prev != null){
if (prev != null) {
prev.setNext(this);
}
this.setNext(next);
if(next != null){
if (next != null) {
next.setPrev(this);
}
spanContainer.add(this);
}
protected TraceSpanNode(TraceSpanNode parent, TraceSpanNode sub, TraceSpanNode prev, TraceSpanNode next, String parentLevelId, int levelId, List<TraceSpanNode> spanContainer){
protected TraceSpanNode(TraceSpanNode parent, TraceSpanNode sub, TraceSpanNode prev, TraceSpanNode next, String parentLevelId, int levelId, List<TraceSpanNode> spanContainer) {
this(parent, sub, prev, next, spanContainer);
this.parentLevel = parentLevelId;
this.levelId = levelId;
this.callTimes = 0;
}
boolean hasNext(){
if(this.next != null){
boolean hasNext() {
if (this.next != null) {
return true;
}else{
} else {
return false;
}
}
boolean hasSub(){
if(this.sub != null){
boolean hasSub() {
if (this.sub != null) {
return true;
}else{
} else {
return false;
}
}
void mergeSpan(Span span){
if(CallType.convert(span.getCallType()) == CallType.ASYNC){
void mergeSpan(Span span) {
if (CallType.convert(span.getCallType()) == CallType.ASYNC) {
this.cost += span.getCost();
}
if(span.getStatusCode() != 0 && !StringUtil.isBlank(span.getExceptionStack())){
if(span.isReceiver()){
if (span.getStatusCode() != 0 && !StringUtil.isBlank(span.getExceptionStack())) {
if (span.isReceiver()) {
this.exceptionStack += "server stack:";
}else{
} else {
this.exceptionStack += "client stack:";
}
this.exceptionStack += span.getExceptionStack();
......@@ -223,23 +226,23 @@ public class TraceSpanNode {
}
public String getNodeRefToken() throws TraceSpanTreeSerializeException {
if(StringUtil.isBlank(nodeRefToken)){
if (StringUtil.isBlank(nodeRefToken)) {
throw new TraceSpanTreeSerializeException("parentLevel=" + parentLevel + ", levelId=" + levelId + ", viewPointId=" + viewPointId + ", node ref token is null.");
}
return nodeRefToken;
}
void serializeRef() throws TraceSpanTreeSerializeException{
if(prev != null){
void serializeRef() throws TraceSpanTreeSerializeException {
if (prev != null) {
prevNodeRefToken = prev.getNodeRefToken();
}
if(parent != null){
if (parent != null) {
parentNodeRefToken = parent.getNodeRefToken();
}
if(next != null){
if (next != null) {
nextNodeRefToken = next.getNodeRefToken();
}
if(sub != null){
if (sub != null) {
subNodeRefToken = sub.getNodeRefToken();
}
}
......
package com.ai.cloud.skywalking.analysis.chainbuild.entity;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ai.cloud.skywalking.analysis.chainbuild.exception.BuildTraceSpanTreeException;
import com.ai.cloud.skywalking.analysis.chainbuild.exception.TraceSpanTreeSerializeException;
import com.ai.cloud.skywalking.analysis.chainbuild.util.StringUtil;
import com.ai.cloud.skywalking.analysis.chainbuild.util.TokenGenerator;
import com.ai.cloud.skywalking.protocol.Span;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
public class TraceSpanTree {
public class TraceSpanTree implements Writable {
private Logger logger = LoggerFactory.getLogger(TraceSpanTree.class);
private String userId = null;
......@@ -136,10 +143,43 @@ public class TraceSpanTree {
}
private void beforeSerialize() throws TraceSpanTreeSerializeException{
for(TraceSpanNode treeNode : spanContainer){
private void beforeSerialize() throws TraceSpanTreeSerializeException {
for (TraceSpanNode treeNode : spanContainer) {
treeNode.serializeRef();
}
}
public String serialize() throws TraceSpanTreeSerializeException {
beforeSerialize();
return new Gson().toJson(this);
}
@Override
public void write(DataOutput out) throws IOException {
try {
out.write(serialize().getBytes());
} catch (TraceSpanTreeSerializeException e) {
logger.error("Failed to serialize Chain Id[" + cid + "]", e);
}
}
@Override
public void readFields(DataInput in) throws IOException {
String value = in.readLine();
try {
JsonObject jsonObject = (JsonObject) new JsonParser().parse(value);
userId = jsonObject.get("userId").getAsString();
cid = jsonObject.get("cid").getAsString();
treeRoot = new Gson().fromJson(jsonObject.get("treeRoot"), TraceSpanNode.class);
spanContainer = new Gson().fromJson(jsonObject.get("spanContainer"),
new TypeToken<List<TraceSpanNode>>() {
}.getType());
} catch (Exception e) {
logger.error("Failed to parse the value[" + value + "] to TraceSpanTree Object", e);
}
}
public TraceSpanNode getTreeRoot() {
return treeRoot;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册