提交 889fcb90 编写于 作者: wu-sheng's avatar wu-sheng

1.修改部分分析代码。

上级 c90b484f
package com.ai.cloud.skywalking.analysis.chainbuild;
import com.ai.cloud.skywalking.analysis.chainbuild.filter.SpanNodeProcessChain;
import com.ai.cloud.skywalking.analysis.chainbuild.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainNode;
import com.ai.cloud.skywalking.analysis.chainbuild.util.SubLevelSpanCostCounter;
import com.ai.cloud.skywalking.analysis.chainbuild.util.VersionIdentifier;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.ai.cloud.skywalking.protocol.Span;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
......@@ -17,90 +17,99 @@ import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
public class ChainBuildMapper extends TableMapper<Text, ChainInfo> {
import com.ai.cloud.skywalking.analysis.chainbuild.exception.Tid2CidECovertException;
import com.ai.cloud.skywalking.analysis.chainbuild.filter.SpanNodeProcessChain;
import com.ai.cloud.skywalking.analysis.chainbuild.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainNode;
import com.ai.cloud.skywalking.analysis.chainbuild.util.SubLevelSpanCostCounter;
import com.ai.cloud.skywalking.analysis.chainbuild.util.VersionIdentifier;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.ai.cloud.skywalking.protocol.Span;
import com.google.gson.Gson;
private Logger logger = LoggerFactory
.getLogger(ChainBuildMapper.class.getName());
public class ChainBuildMapper extends TableMapper<Text, Text> {
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
ConfigInitializer.initialize();
}
private Logger logger = LoggerFactory.getLogger(ChainBuildMapper.class);
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
ConfigInitializer.initialize();
}
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
if (!VersionIdentifier.enableAnaylsis(Bytes.toString(key.get()))) {
return;
}
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
if (!VersionIdentifier.enableAnaylsis(Bytes.toString(key.get()))) {
return;
}
List<Span> spanList = new ArrayList<Span>();
ChainInfo chainInfo = null;
try {
for (Cell cell : value.rawCells()) {
Span span = new Span(Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength()));
spanList.add(span);
}
List<Span> spanList = new ArrayList<Span>();
ChainInfo chainInfo = null;
try {
for (Cell cell : value.rawCells()) {
Span span = new Span(Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength()));
spanList.add(span);
}
if (spanList.size() == 0) {
throw new Tid2CidECovertException("tid["
+ Bytes.toString(key.get()) + "] has no span data.");
}
chainInfo = spanToChainInfo(Bytes.toString(key.get()), spanList);
logger.info("Success convert span to chain info...."
+ chainInfo.getCID() + " TraceId : " + Bytes.toString(key.get()));
context.write(
new Text(chainInfo.getUserId() + ":"
+ chainInfo.getEntranceNodeToken()), chainInfo);
} catch (Exception e) {
logger.error("Failed to mapper call chain[" + key.toString() + "]",
e);
}
}
chainInfo = spanToChainInfo(Bytes.toString(key.get()), spanList);
logger.debug("convert tid[" + Bytes.toString(key.get())
+ "] to chain with cid[" + chainInfo.getCID() + "].");
context.write(
new Text(chainInfo.getEntranceNodeToken()), new Text(new Gson().toJson(chainInfo)));
} catch (Exception e) {
logger.error("Failed to mapper call chain[" + key.toString() + "]",
e);
}
}
public static ChainInfo spanToChainInfo(String key, List<Span> spanList) {
SubLevelSpanCostCounter costMap = new SubLevelSpanCostCounter();
ChainInfo chainInfo = new ChainInfo();
Collections.sort(spanList, new Comparator<Span>() {
@Override
public int compare(Span span1, Span span2) {
String span1TraceLevel = span1.getParentLevel() + "."
+ span1.getLevelId();
String span2TraceLevel = span2.getParentLevel() + "."
+ span2.getLevelId();
return span1TraceLevel.compareTo(span2TraceLevel);
}
});
public static ChainInfo spanToChainInfo(String tid, List<Span> spanList) {
SubLevelSpanCostCounter costMap = new SubLevelSpanCostCounter();
ChainInfo chainInfo = new ChainInfo(tid);
Collections.sort(spanList, new Comparator<Span>() {
@Override
public int compare(Span span1, Span span2) {
String span1TraceLevel = span1.getParentLevel() + "."
+ span1.getLevelId();
String span2TraceLevel = span2.getParentLevel() + "."
+ span2.getLevelId();
return span1TraceLevel.compareTo(span2TraceLevel);
}
});
Map<String, SpanEntry> spanEntryMap = mergeSpanDataSet(spanList);
for (Map.Entry<String, SpanEntry> entry : spanEntryMap.entrySet()) {
ChainNode chainNode = new ChainNode();
SpanNodeProcessFilter filter = SpanNodeProcessChain
.getProcessChainByCallType(entry.getValue().getSpanType());
filter.doFilter(entry.getValue(), chainNode, costMap);
chainInfo.addNodes(chainNode);
}
chainInfo.generateChainToken();
//HBaseUtil.saveCidTidMapping(key, chainInfo);
return chainInfo;
}
Map<String, SpanEntry> spanEntryMap = mergeSpanDataSet(spanList);
for (Map.Entry<String, SpanEntry> entry : spanEntryMap.entrySet()) {
ChainNode chainNode = new ChainNode();
SpanNodeProcessFilter filter = SpanNodeProcessChain
.getProcessChainByCallType(entry.getValue().getSpanType());
filter.doFilter(entry.getValue(), chainNode, costMap);
chainInfo.addNodes(chainNode);
}
chainInfo.generateChainToken();
// HBaseUtil.saveCidTidMapping(key, chainInfo);
return chainInfo;
}
private static Map<String, SpanEntry> mergeSpanDataSet(List<Span> spanList) {
Map<String, SpanEntry> spanEntryMap = new LinkedHashMap<String, SpanEntry>();
for (int i = spanList.size() - 1; i >= 0; i--) {
Span span = spanList.get(i);
SpanEntry spanEntry = spanEntryMap.get(span.getParentLevel() + "."
+ span.getLevelId());
if (spanEntry == null) {
spanEntry = new SpanEntry();
spanEntryMap.put(
span.getParentLevel() + "." + span.getLevelId(),
spanEntry);
}
spanEntry.setSpan(span);
}
return spanEntryMap;
}
private static Map<String, SpanEntry> mergeSpanDataSet(List<Span> spanList) {
Map<String, SpanEntry> spanEntryMap = new LinkedHashMap<String, SpanEntry>();
for (int i = spanList.size() - 1; i >= 0; i--) {
Span span = spanList.get(i);
SpanEntry spanEntry = spanEntryMap.get(span.getParentLevel() + "."
+ span.getLevelId());
if (spanEntry == null) {
spanEntry = new SpanEntry();
spanEntryMap.put(
span.getParentLevel() + "." + span.getLevelId(),
spanEntry);
}
spanEntry.setSpan(span);
}
return spanEntryMap;
}
}
......@@ -3,6 +3,9 @@ package com.ai.cloud.skywalking.analysis.chainbuild;
import com.ai.cloud.skywalking.analysis.chainbuild.entity.CallChainTree;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.google.gson.Gson;
import com.google.gson.JsonParser;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
......@@ -13,35 +16,42 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
public class ChainBuildReducer extends Reducer<Text, ChainInfo, Text, IntWritable> {
private Logger logger = LoggerFactory
.getLogger(ChainBuildReducer.class.getName());
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
ConfigInitializer.initialize();
}
@Override
protected void reduce(Text key, Iterable<ChainInfo> values, Context context) throws IOException,
InterruptedException {
doReduceAction(Bytes.toString(key.getBytes()), values.iterator());
}
public static void doReduceAction(String key, Iterator<ChainInfo> chainInfoIterator) throws IOException, InterruptedException {
CallChainTree chainTree = CallChainTree.load(key);
while (chainInfoIterator.hasNext()) {
ChainInfo chainInfo = chainInfoIterator.next();
if (chainInfo.getChainStatus() == ChainInfo.ChainStatus.NORMAL) {
chainTree.processMerge(chainInfo);
}
//合并数据
chainTree.summary(chainInfo);
}
chainTree.saveToHbase();
}
public class ChainBuildReducer extends Reducer<Text, Text, Text, IntWritable> {
private Logger logger = LoggerFactory.getLogger(ChainBuildReducer.class);
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
ConfigInitializer.initialize();
}
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
doReduceAction(Bytes.toString(key.getBytes()), values.iterator());
}
public void doReduceAction(String key, Iterator<Text> chainInfoIterator)
throws IOException, InterruptedException {
CallChainTree chainTree = CallChainTree.load(key);
while (chainInfoIterator.hasNext()) {
String callChainData = chainInfoIterator.next().toString();
ChainInfo chainInfo = null;
try {
chainInfo = new Gson().fromJson(callChainData, ChainInfo.class);
if (chainInfo.getChainStatus() == ChainInfo.ChainStatus.NORMAL) {
chainTree.processMerge(chainInfo);
}
// 合并数据
chainTree.summary(chainInfo);
} catch (Exception e) {
logger.error(
"Failed to summary call chain, maybe illegal data:"
+ callChainData, e);
}
}
chainTree.saveToHbase();
}
}
package com.ai.cloud.skywalking.analysis.chainbuild.exception;
public class Tid2CidECovertException extends Exception{
private static final long serialVersionUID = -4679233837335940374L;
public Tid2CidECovertException(String msg){
super(msg);
}
public Tid2CidECovertException(String msg, Exception cause){
super(msg, cause);
}
}
package com.ai.cloud.skywalking.analysis.chainbuild.po;
import com.ai.cloud.skywalking.analysis.chainbuild.util.TokenGenerator;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class ChainInfo implements Writable {
private String callEntrance;
import org.apache.hadoop.hbase.client.Put;
import com.ai.cloud.skywalking.analysis.chainbuild.exception.Tid2CidECovertException;
import com.ai.cloud.skywalking.analysis.chainbuild.util.TokenGenerator;
public class ChainInfo implements Serializable {
private static final long serialVersionUID = -7194044877533469817L;
/**
* 0节点的viewpoint,用于明文标识入口
*/
private String callEntrance;
private String cid;
private ChainStatus chainStatus = ChainStatus.NORMAL;
private List<ChainNode> nodes;
private List<ChainNode> nodes = new ArrayList<ChainNode>();
private String userId = null;
private ChainNode firstChainNode;
private long startDate;
private String tid;
public ChainInfo(String userId) {
public ChainInfo(String tid) {
super();
this.userId = userId;
}
public ChainInfo() {
this.nodes = new ArrayList<ChainNode>();
}
@Override
public void write(DataOutput out) throws IOException {
out.write(new Gson().toJson(this).getBytes());
}
@Override
public void readFields(DataInput in) throws IOException {
JsonObject jsonObject = (JsonObject) new JsonParser().parse(in
.readLine());
cid = jsonObject.get("cid").getAsString();
chainStatus = ChainStatus.convert(jsonObject.get("chainStatus")
.getAsCharacter());
nodes = new Gson().fromJson(jsonObject.get("nodes"),
new TypeToken<List<ChainNode>>() {
}.getType());
userId = jsonObject.get("userId").getAsString();
this.tid = tid;
}
public String getCID() {
return cid;
}
public String getEntranceNodeToken() {
public String getEntranceNodeToken() throws Tid2CidECovertException {
if (firstChainNode == null) {
return "";
throw new Tid2CidECovertException("tid[" + tid + "] can't find span node with level=0.");
} else {
return firstChainNode.getNodeToken();
return this.getUserId() + ":"
+ firstChainNode.getNodeToken();
}
}
......@@ -106,7 +86,7 @@ public class ChainInfo implements Writable {
}
public void saveToHBase(Put put) {
//TODO: @zhangxin,未完成的入库代码
}
public enum ChainStatus {
......
package com.ai.cloud.skywalking.analysis.mapper;
import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildMapper;
import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildReducer;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData;
import com.ai.cloud.skywalking.protocol.Span;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildMapper;
import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildReducer;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData;
import com.ai.cloud.skywalking.protocol.Span;
import com.google.gson.Gson;
/**
* Created by astraea on 2016/1/15.
......@@ -39,10 +46,10 @@ public class CallChainMapperTest {
List<Span> spanList = selectByTraceId(chain_Id);
ChainInfo chainInfo = ChainBuildMapper.spanToChainInfo(chain_Id, spanList);
List<ChainInfo> chainInfos = new ArrayList<ChainInfo>();
chainInfos.add(chainInfo);
List<Text> chainInfos = new ArrayList<Text>();
chainInfos.add(new Text(new Gson().toJson(chainInfo)));
ChainBuildReducer.doReduceAction(chainInfo.getCallEntrance(), chainInfos.iterator());
new ChainBuildReducer().doReduceAction(chainInfo.getCallEntrance(), chainInfos.iterator());
}
public static List<Span> selectByTraceId(String traceId) throws IOException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册