提交 8d404e71 编写于 作者: Z zhangxin10

上传Reduce部分代码

上级 ab4fada8
......@@ -47,6 +47,11 @@
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.2.2</version>
</dependency>
</dependencies>
<build>
......
......@@ -15,6 +15,8 @@ public class Config {
public static String TABLE_CALL_CHAIN_RELATIONSHIP = "sw_chain_relationship";
public static String TABLE_CHAIN_INFO = "sw_chain_info";
}
public static class TraceInfo {
......
......@@ -16,6 +16,7 @@ public class CopyAttrFilter extends SpanNodeProcessFilter {
node.setViewPoint(spanEntry.getViewPoint());
node.setUserId(spanEntry.getUserId());
node.setBusinessKey(spanEntry.getBusinessKey());
node.setStartDate(spanEntry.getStartDate());
this.doNext(spanEntry, node, costMap);
}
......
package com.ai.cloud.skywalking.analysis.model;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
public class CategorizedChainInfo {
private String chainToken;
private String chainStr;
private List<String> children_Token;
public CategorizedChainInfo(ChainInfo chainInfo) {
chainToken = chainInfo.getChainToken();
StringBuilder stringBuilder = new StringBuilder();
boolean flag = false;
for (ChainNode chainNode : chainInfo.getNodes()) {
if (flag) {
stringBuilder.append(";");
}
stringBuilder.append(chainNode.getTraceLevelId() + "-" + chainNode.getViewPoint());
flag = true;
}
chainStr = stringBuilder.toString();
children_Token = new ArrayList<String>();
}
public String getChainStr() {
return chainStr;
}
public boolean isContained(UncategorizeChainInfo uncategorizeChainInfo) {
Pattern pattern = Pattern.compile(uncategorizeChainInfo.getNodeRegEx());
return pattern.matcher(getChainStr()).find();
}
public void add(UncategorizeChainInfo uncategorizeChainInfo) {
children_Token.add(uncategorizeChainInfo.getChainToken());
}
}
......@@ -15,6 +15,7 @@ public class ChainInfo implements Writable {
private List<ChainNode> nodes;
private String userId = null;
private ChainNode firstChainNode;
private long startDate;
public ChainInfo() {
this.nodes = new ArrayList<ChainNode>();
......@@ -55,7 +56,7 @@ public class ChainInfo implements Writable {
chainNode.setParentLevelId(in.readLine());
chainNode.setLevelId(in.readInt());
chainNode.setBusinessKey(in.readLine());
nodes.add(chainNode);
}
}
......@@ -99,6 +100,7 @@ public class ChainInfo implements Writable {
if ((chainNode.getParentLevelId() == null || chainNode.getParentLevelId().length() == 0)
&& chainNode.getLevelId() == 0) {
firstChainNode = chainNode;
startDate = chainNode.getStartDate();
}
}
......@@ -137,6 +139,14 @@ public class ChainInfo implements Writable {
}
}
}
public void setChainToken(String chainToken) {
this.chainToken = chainToken;
}
public long getStartDate() {
return startDate;
}
}
......@@ -11,6 +11,7 @@ public class ChainNode {
private String parentLevelId;
private int levelId;
private String callType;
private long startDate;
// 不参与序列化
private String userId;
......@@ -79,14 +80,26 @@ public class ChainNode {
this.businessKey = businessKey;
}
public String getCallType() {
return callType;
}
public String getBusinessKey() {
return businessKey;
}
public String getTraceLevelId() {
StringBuilder stringBuilder = new StringBuilder();
if (getParentLevelId() != null && getParentLevelId().length() > 0) {
stringBuilder.append(getParentLevelId() + ".");
}
return stringBuilder.append(getLevelId()).toString();
}
public void setStartDate(long startDate) {
this.startDate = startDate;
}
public long getStartDate() {
return startDate;
}
public enum NodeStatus {
NORMAL('N'), ABNORMAL('A');
private char value;
......
package com.ai.cloud.skywalking.analysis.model;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import java.util.Map;
public class ChainNodeSummaryResult {
private String traceLevelId;
private Map<String, SummaryResult> summerResultMap;
public ChainNodeSummaryResult(String value) {
JsonObject jsonObject = new Gson().fromJson(value, JsonObject.class);
traceLevelId = jsonObject.get("traceLevelId").getAsString();
summerResultMap = new Gson().fromJson(jsonObject.get("summerResultMap").getAsString(),
new TypeToken<Map<String, SummaryResult>>() {
}.getType());
}
public String getTraceLevelId() {
return traceLevelId;
}
public void setTraceLevelId(String traceLevelId) {
this.traceLevelId = traceLevelId;
}
public Map<String, SummaryResult> getSummerResultMap() {
return summerResultMap;
}
public void setSummerResultMap(Map<String, SummaryResult> summerResultMap) {
this.summerResultMap = summerResultMap;
}
public void summary(ChainNode node) {
SummaryResult summaryResult = summerResultMap.get(generateKey(node.getStartDate()));
if (summaryResult == null) {
summaryResult = new SummaryResult();
summerResultMap.put(node.getTraceLevelId(), summaryResult);
}
summaryResult.summary(node);
}
private String generateKey(long startTime) {
return null;
}
}
package com.ai.cloud.skywalking.analysis.model;
import java.util.*;
public class ChainRelate {
private Map<String, CategorizedChainInfo> categorizedChainInfoList = new HashMap<String, CategorizedChainInfo>();
private List<UncategorizeChainInfo> uncategorizeChainInfoList = new ArrayList<UncategorizeChainInfo>();
private void categoryUncategorizedChainInfo(CategorizedChainInfo parentChains) {
if (uncategorizeChainInfoList != null && uncategorizeChainInfoList.size() > 0) {
Iterator<UncategorizeChainInfo> uncategorizeChainInfoIterator = uncategorizeChainInfoList.iterator();
while (uncategorizeChainInfoIterator.hasNext()) {
UncategorizeChainInfo uncategorizeChainInfo = uncategorizeChainInfoIterator.next();
if (parentChains.isContained(uncategorizeChainInfo)) {
parentChains.add(uncategorizeChainInfo);
uncategorizeChainInfoIterator.remove();
}
}
}
}
private void classifiedChains(UncategorizeChainInfo child) {
boolean isContained = false;
for (Map.Entry<String, CategorizedChainInfo> entry : categorizedChainInfoList.entrySet()) {
if (entry.getValue().isContained(child)) {
entry.getValue().add(child);
isContained = true;
}
}
if (!isContained) {
uncategorizeChainInfoList.add(child);
}
}
private CategorizedChainInfo addCategorizedChain(ChainInfo chainInfo) {
if (!categorizedChainInfoList.containsKey(chainInfo.getChainToken())) {
categorizedChainInfoList.put(chainInfo.getChainToken(),
new CategorizedChainInfo(chainInfo));
}
return categorizedChainInfoList.get(chainInfo.getChainToken());
}
public void addRelate(ChainInfo chainInfo) {
if (chainInfo.getChainStatus() == ChainInfo.ChainStatus.NORMAL) {
CategorizedChainInfo categorizedChainInfo = addCategorizedChain(chainInfo);
categoryUncategorizedChainInfo(categorizedChainInfo);
} else {
UncategorizeChainInfo uncategorizeChainInfo = new UncategorizeChainInfo(chainInfo);
classifiedChains(uncategorizeChainInfo);
}
}
public void save() {
}
}
package com.ai.cloud.skywalking.analysis.model;
import com.ai.cloud.skywalking.analysis.util.HBaseUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class ChainSummaryResult {
private Map<String, ChainNodeSummaryResult> chainNodeSummaryResultMap;
public ChainSummaryResult() {
chainNodeSummaryResultMap = new HashMap<String, ChainNodeSummaryResult>();
}
public static ChainSummaryResult load(String id) {
ChainSummaryResult result = null;
try {
result = HBaseUtil.selectChainSummaryResult(id);
} catch (IOException e) {
e.printStackTrace();
}
if (result == null) {
result = new ChainSummaryResult();
}
return result;
}
public void summaryResult(ChainInfo chainInfo) {
for (ChainNode node : chainInfo.getNodes()) {
ChainNodeSummaryResult chainNodeSummaryResult = chainNodeSummaryResultMap.get(node.getTraceLevelId());
chainNodeSummaryResult.summary(node);
}
}
public void addNodeSummaryResult(ChainNodeSummaryResult chainNodeSummaryResult) {
chainNodeSummaryResultMap.put(chainNodeSummaryResult.getTraceLevelId(), chainNodeSummaryResult);
}
}
......@@ -7,6 +7,7 @@ public class SpanEntry {
private Span clientSpan;
private Span serverSpan;
private String startTime;
public SpanEntry() {
......@@ -132,13 +133,13 @@ public class SpanEntry {
public Span getServerSpan() {
return serverSpan;
}
public void setSpan(Span span){
if (span.isReceiver()) {
this.serverSpan = span;
}else{
this.clientSpan = span;
}
public void setSpan(Span span) {
if (span.isReceiver()) {
this.serverSpan = span;
} else {
this.clientSpan = span;
}
}
public String getSpanType() {
......@@ -154,4 +155,11 @@ public class SpanEntry {
}
return serverSpan.getUserId();
}
public long getStartDate() {
if (clientSpan != null) {
return clientSpan.getStartDate();
}
return serverSpan.getStartDate();
}
}
package com.ai.cloud.skywalking.analysis.model;
import java.util.Map;
public class Summary {
private Map<String, ChainSummaryResult> summaryResultMap;
public void summary(ChainInfo chainInfo) {
String key = generateKey(chainInfo);
ChainSummaryResult chainSummaryResult = summaryResultMap.get(key);
if (chainSummaryResult == null) {
chainSummaryResult = ChainSummaryResult.load(key);
summaryResultMap.put(key, chainSummaryResult);
}
chainSummaryResult.summaryResult(chainInfo);
}
private String generateKey(ChainInfo chainInfo) {
return chainInfo.getChainToken() + ":" + (chainInfo.getStartDate() / (1000 * 60 * 60));
}
public void save() {
}
}
package com.ai.cloud.skywalking.analysis.model;
public class SummaryResult {
private long totalCall;
private long totalCostTime;
private long correctNumber;
public SummaryResult() {
totalCall = 0;
totalCostTime = 0;
correctNumber = 0;
}
public long getTotalCall() {
return totalCall;
}
public void setTotalCall(long totalCall) {
this.totalCall = totalCall;
}
public long getTotalCostTime() {
return totalCostTime;
}
public void setTotalCostTime(long totalCostTime) {
this.totalCostTime = totalCostTime;
}
public long getCorrectNumber() {
return correctNumber;
}
public void setCorrectNumber(long correctNumber) {
this.correctNumber = correctNumber;
}
public void summary(ChainNode node) {
totalCall++;
if (node.getStatus() == ChainNode.NodeStatus.NORMAL) {
correctNumber++;
}
totalCostTime += node.getCost();
}
}
package com.ai.cloud.skywalking.analysis.model;
public class UncategorizeChainInfo {
private String chainToken;
private String nodeRegEx;
public UncategorizeChainInfo(ChainInfo chainInfo) {
this.chainToken = chainInfo.getChainToken();
StringBuilder stringBuilder = new StringBuilder();
boolean flag = false;
for (ChainNode node : chainInfo.getNodes()) {
if (flag) {
stringBuilder.append("*");
}
stringBuilder.append(node.getTraceLevelId() + "-" + node.getViewPoint());
flag = true;
}
nodeRegEx = stringBuilder.toString();
}
public String getChainToken() {
return chainToken;
}
public void setChainToken(String chainToken) {
this.chainToken = chainToken;
}
public String getNodeRegEx() {
return nodeRegEx;
}
public void setNodeRegEx(String nodeRegEx) {
this.nodeRegEx = nodeRegEx;
}
}
package com.ai.cloud.skywalking.analysis.reduce;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.model.ChainRelate;
import com.ai.cloud.skywalking.analysis.model.Summary;
import com.ai.cloud.skywalking.analysis.util.HBaseUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.Iterator;
public class ChainInfoReduce extends TableReducer<Text, ChainInfo, Put> {
@Override
protected void reduce(Text key, Iterable<ChainInfo> values, Context context) throws IOException, InterruptedException {
String[] keyArray = key.toString().split(":");
String userId = keyArray[0];
String firstNode = keyArray[1];
//
ChainRelate chainRelate = HBaseUtil.selectCallChainRelationship(key.toString());
Summary summary = new Summary();
Iterator<ChainInfo> chainInfoIterator = values.iterator();
while (chainInfoIterator.hasNext()) {
ChainInfo chainInfo = chainInfoIterator.next();
chainRelate.addRelate(chainInfo);
summary.summary(chainInfo);
}
chainRelate.save();
// 入HBase库(关系表,Info表,汇总表)
summary.save();
// 入Mysql表
}
}
package com.ai.cloud.skywalking.analysis.util;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
public class ChainInfoUtil {
private ChainInfoUtil() {
// Non
}
public static ChainInfo generateUncategorizedChainInfo() {
ChainInfo chainInfo = new ChainInfo();
chainInfo.setChainToken("UNCATEGORIZED");
chainInfo.setUserId("-1");
return chainInfo;
}
}
......@@ -2,6 +2,9 @@ package com.ai.cloud.skywalking.analysis.util;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.model.ChainNodeSummaryResult;
import com.ai.cloud.skywalking.analysis.model.ChainRelate;
import com.ai.cloud.skywalking.analysis.model.ChainSummaryResult;
import com.ai.cloud.skywalking.protocol.Span;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
......@@ -76,7 +79,7 @@ public class HBaseUtil {
}
public static void selectById(String id) throws IOException {
public static ChainRelate selectCallChainRelationship(String id) throws IOException {
List<Span> entries = new ArrayList<Span>();
Table table = connection.getTable(TableName.valueOf(Config.HBase.TABLE_CALL_CHAIN_RELATIONSHIP));
Get g = new Get(Bytes.toBytes(id));
......@@ -85,6 +88,23 @@ public class HBaseUtil {
if (cell.getValueArray().length > 0)
entries.add(new Span(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())));
}
return null;
}
public static ChainSummaryResult selectChainSummaryResult(String key) throws IOException {
ChainSummaryResult result = null;
//TODO 初始化表
Table table = connection.getTable(TableName.valueOf(Config.HBase.TABLE_CHAIN_INFO));
Get g = new Get(Bytes.toBytes(key));
Result r = table.get(g);
for (Cell cell : r.rawCells()) {
result = new ChainSummaryResult();
if (cell.getValueArray().length > 0)
result.addNodeSummaryResult(new ChainNodeSummaryResult(Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength())));
}
return result;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册