提交 3ba2f221 编写于 作者: Z zhangxin10

1.完成Map部分

上级 b68c0411
package com.ai.cloud.skywalking.analysis.mapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.ai.cloud.skywalking.analysis.filter.SpanNodeProcessChain;
import com.ai.cloud.skywalking.analysis.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.model.CostMap;
import com.ai.cloud.skywalking.analysis.model.SpanEntry;
import com.ai.cloud.skywalking.analysis.util.HBaseUtil;
import com.ai.cloud.skywalking.protocol.Span;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
......@@ -15,14 +17,8 @@ import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ai.cloud.skywalking.analysis.filter.SpanNodeProcessChain;
import com.ai.cloud.skywalking.analysis.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.model.CostMap;
import com.ai.cloud.skywalking.analysis.model.SpanEntry;
import com.ai.cloud.skywalking.analysis.util.HBaseUtil;
import com.ai.cloud.skywalking.protocol.Span;
import java.io.IOException;
import java.util.*;
public class CallChainMapper extends TableMapper<Text, ChainInfo> {
private Logger logger = LoggerFactory.getLogger(CallChainMapper.class.getName());
......@@ -30,35 +26,44 @@ public class CallChainMapper extends TableMapper<Text, ChainInfo> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException,
InterruptedException {
ChainInfo chainInfo = new ChainInfo();
List<Span> spanList = new ArrayList<Span>();
CostMap costMap = new CostMap();
for (Cell cell : value.rawCells()) {
Span span = new Span(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
spanList.add(span);
}
ChainInfo chainInfo = spanToChainInfo(key.toString(), spanList);
context.write(new Text(chainInfo.getUserId() + ":" + chainInfo.getEntranceNodeToken()), chainInfo);
}
//TODO: sort spanList
Map<String, SpanEntry> spanEntryMap = mergeSpanDataset(spanList);
public static ChainInfo spanToChainInfo(String key, List<Span> spanList) {
CostMap costMap = new CostMap();
ChainInfo chainInfo = new ChainInfo();
Collections.sort(spanList, new Comparator<Span>() {
@Override
public int compare(Span span1, Span span2) {
String span1TraceLevel = span1.getParentLevel() + "." + span1.getLevelId();
String span2TraceLevel = span2.getParentLevel() + "." + span2.getLevelId();
return span1TraceLevel.compareTo(span2TraceLevel);
}
});
Map<String, SpanEntry> spanEntryMap = mergeSpanDataSet(spanList);
for (Map.Entry<String, SpanEntry> entry : spanEntryMap.entrySet()) {
ChainNode chainNode = new ChainNode();
SpanNodeProcessFilter filter = SpanNodeProcessChain.getProcessChainByCallType(entry.getValue().getSpanType());
filter.doFilter(entry.getValue(), chainNode, costMap);
chainInfo.addNodes(chainNode);
}
chainInfo.generateChainToken();
HBaseUtil.saveData(key.toString(), chainInfo);
context.write(new Text(chainInfo.getUserId() + ":" + chainInfo.getEntranceNodeToken()), chainInfo);
chainInfo.generateChainToken();
HBaseUtil.saveData(key, chainInfo);
return chainInfo;
}
private Map<String, SpanEntry> mergeSpanDataset(List<Span> spanList) {
Map<String, SpanEntry> spanEntryMap = new HashMap<String, SpanEntry>();
for (Span span : spanList) {
private static Map<String, SpanEntry> mergeSpanDataSet(List<Span> spanList) {
Map<String, SpanEntry> spanEntryMap = new LinkedHashMap<String, SpanEntry>();
for (int i = spanList.size() - 1; i >= 0; i--) {
Span span = spanList.get(i);
SpanEntry spanEntry = spanEntryMap.get(span.getParentLevel() + "." + span.getLevelId());
if (spanEntry == null) {
spanEntry = new SpanEntry();
......
......@@ -67,10 +67,6 @@ public class ChainNode {
this.callType = callType;
}
public String getCallType() {
return callType;
}
public void setUserId(String userId) {
this.userId = userId;
}
......@@ -79,10 +75,6 @@ public class ChainNode {
return userId;
}
public String getBusinessKey() {
return businessKey;
}
public void setBusinessKey(String businessKey) {
this.businessKey = businessKey;
}
......@@ -110,5 +102,4 @@ public class ChainNode {
}
}
}
}
......@@ -2,7 +2,6 @@ package com.ai.cloud.skywalking.analysis.util;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
......@@ -32,41 +31,9 @@ public class HBaseUtil {
Put put = new Put(Bytes.toBytes(traceId));
for (ChainNode chainNode : chainInfo.getNodes()) {
put.addColumn(Bytes.toBytes(Config.HBase.TRACE_INFO_COLUMN_FAMILY),
Bytes.toBytes(Config.TraceInfo.USER_ID),
Bytes.toBytes(chainNode.getUserId()));
put.addColumn(Bytes.toBytes(Config.HBase.TRACE_INFO_COLUMN_FAMILY),
Bytes.toBytes(Config.TraceInfo.STATUS),
Bytes.toBytes(chainNode.getStatus().getValue()));
put.addColumn(Bytes.toBytes(Config.HBase.TRACE_INFO_COLUMN_FAMILY),
Bytes.toBytes(Config.TraceInfo.TRACE_INFO_COLUMN_CID),
Bytes.toBytes(chainInfo.getChainToken()));
put.addColumn(Bytes.toBytes(Config.HBase.TRACE_INFO_COLUMN_FAMILY),
Bytes.toBytes(Config.TraceInfo.TRACE_INFO_COLUMN_CID),
Bytes.toBytes(chainInfo.getChainToken()));
put.addColumn(Bytes.toBytes(Config.HBase.TRACE_INFO_COLUMN_FAMILY),
Bytes.toBytes(Config.TraceInfo.COST),
Bytes.toBytes(chainNode.getCost()));
put.addColumn(Bytes.toBytes(Config.HBase.TRACE_INFO_COLUMN_FAMILY),
Bytes.toBytes(Config.TraceInfo.PARENT_LEVEL_ID),
Bytes.toBytes(chainNode.getParentLevelId()));
put.addColumn(Bytes.toBytes(Config.HBase.TRACE_INFO_COLUMN_FAMILY),
Bytes.toBytes(Config.TraceInfo.LEVEL_ID),
Bytes.toBytes(chainNode.getLevelId()));
put.addColumn(Bytes.toBytes(Config.HBase.TRACE_INFO_COLUMN_FAMILY),
Bytes.toBytes(Config.TraceInfo.BUSINESS_KEY),
Bytes.toBytes(chainNode.getBusinessKey()));
}
put.addColumn(Bytes.toBytes(Config.HBase.TRACE_INFO_COLUMN_FAMILY),
Bytes.toBytes(Config.TraceInfo.TRACE_INFO_COLUMN_CID),
Bytes.toBytes(chainInfo.getChainToken()));
try {
table.put(put);
if (logger.isDebugEnabled()) {
......
......@@ -3,14 +3,7 @@ package com.ai.cloud.skywalking.analysis.mapper;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.ai.cloud.skywalking.analysis.filter.SpanNodeProcessChain;
import com.ai.cloud.skywalking.analysis.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.model.CostMap;
import com.ai.cloud.skywalking.analysis.model.SpanEntry;
import com.ai.cloud.skywalking.analysis.util.HBaseUtil;
import com.ai.cloud.skywalking.analysis.util.TokenGenerator;
import com.ai.cloud.skywalking.protocol.Span;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
......@@ -23,9 +16,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Created by astraea on 2016/1/15.
......@@ -43,78 +34,7 @@ public class CallChainMapperTest {
public void testMap() throws Exception {
ConfigInitializer.initialize();
List<Span> spanList = selectByTraceId(chain_Id);
ChainInfo chainInfo = new ChainInfo();
CostMap costMap = new CostMap();
Map<String, SpanEntry> spanEntryMap = mergeSpanDataset(spanList);
for (Map.Entry<String, SpanEntry> entry : spanEntryMap.entrySet()) {
ChainNode chainNode = new ChainNode();
SpanNodeProcessFilter filter = SpanNodeProcessChain.getProcessChainByCallType(entry.getValue().getSpanType());
filter.doFilter(entry.getValue(), chainNode, costMap);
chainInfo.getNodes().add(chainNode);
}
//
String firstNodeToken = null;
boolean status = true;
StringBuilder stringBuilder = new StringBuilder();
for (ChainNode node : chainInfo.getNodes()) {
if ((node.getParentLevelId() == null || node.getParentLevelId().length() == 0)
&& node.getLevelId() == 0) {
firstNodeToken = node.getNodeToken();
chainInfo.setUserId(node.getUserId());
}
// 状态轮询
if (node.getStatus() == ChainNode.NodeStatus.ABNORMAL) {
status = false;
}
// 设置时间
computeChainNodeCost(costMap, node);
stringBuilder.append(node.getParentLevelId() + "." + node.getLevelId() + "-" + node.getNodeToken() + ";");
}
// 设置状态
if (status) {
chainInfo.setChainStatus(ChainInfo.ChainStatus.NORMAL);
} else {
chainInfo.setChainStatus(ChainInfo.ChainStatus.ABNORMAL);
}
// 设置Token
chainInfo.setChainToken(TokenGenerator.generate(stringBuilder.toString()));
//SaveToHbase
HBaseUtil.saveData(chain_Id, chainInfo);
System.out.println(chainInfo);
}
private void computeChainNodeCost(CostMap costMap, ChainNode node) {
String levelId = node.getParentLevelId();
if (levelId != null && levelId.length() > 0) {
levelId += ".";
}
levelId += node.getLevelId() + "";
if (costMap.get(levelId) != null) {
node.setCost(node.getCost() - costMap.get(levelId));
}
}
private Map<String, SpanEntry> mergeSpanDataset(List<Span> spanList) {
Map<String, SpanEntry> spanEntryMap = new HashMap<String, SpanEntry>();
for (Span span : spanList) {
SpanEntry spanEntry = spanEntryMap.get(span.getParentLevel() + "." + span.getLevelId());
if (spanEntry == null) {
spanEntry = new SpanEntry();
spanEntryMap.put(span.getParentLevel() + "." + span.getLevelId(), spanEntry);
}
spanEntry.setSpan(span);
}
return spanEntryMap;
ChainInfo chainInfo = CallChainMapper.spanToChainInfo(chain_Id, spanList);
}
public static List<Span> selectByTraceId(String traceId) throws IOException {
......
......@@ -48,7 +48,7 @@ public class Span extends SpanData {
processNo = fieldValues[12].trim();
applicationId = fieldValues[13].trim();
userId = fieldValues[14].trim();
if (fieldValues.length > 14) {
if (fieldValues.length > 15) {
callType = fieldValues[15].trim();
}
this.originData = originData;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册