提交 fb309caa 编写于 作者: Z zhangxin10

提交保存逻辑

上级 dbdb9ee4
......@@ -3,6 +3,8 @@ package com.ai.cloud.skywalking.analysis.config;
public class Config {
public static class HBase {
public static String CHAIN_SUMMARY_COLUMN_FAMILY = "chain_summary";
public static String TRACE_INFO_COLUMN_FAMILY = "trace_info";
public static String CALL_CHAIN_TABLE_NAME;
......@@ -11,12 +13,16 @@ public class Config {
public static String ZK_CLIENT_PORT;
public static String TRACE_INFO_TABLE_NAME;
public static String TRACE_INFO_TABLE_NAME = "trace_info";
public static String TABLE_CALL_CHAIN_RELATIONSHIP = "sw_chain_relationship";
public static String CHAIN_RELATIONSHIP_COLUMN_FAMILY = "chain_relationship";
public static String TABLE_CHAIN_INFO = "sw_chain_info";
public static String TABLE_CHAIN_SUMMARY = "sw_chain_summary";
}
public static class TraceInfo {
......@@ -37,4 +43,8 @@ public class Config {
public static class Filter {
public static String FILTER_PACKAGE_NAME;
}
public class ChainNodeSummary {
public static final long INTERVAL = 5L;
}
}
package com.ai.cloud.skywalking.analysis.config;
public class Constants {
public static String UNCATEGORIZED_QUALIFIER_NAME = "UNCATEGORIZED_CALL_CHAIN";
}
package com.ai.cloud.skywalking.analysis.model;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
......@@ -27,11 +31,25 @@ public class CategorizedChainInfo {
children_Token = new ArrayList<String>();
}
public CategorizedChainInfo(String value) {
JsonObject jsonObject = new Gson().fromJson(value, JsonObject.class);
chainToken = jsonObject.get("chainToken").getAsString();
chainStr = jsonObject.get("chainStr").getAsString();
children_Token = new Gson().fromJson(jsonObject.get("children_Token"),
new TypeToken<List<String>>() {
}.getType());
}
public String getChainStr() {
return chainStr;
}
public boolean isContained(UncategorizeChainInfo uncategorizeChainInfo) {
if (children_Token.contains(uncategorizeChainInfo.getChainToken())) {
return false;
}
Pattern pattern = Pattern.compile(uncategorizeChainInfo.getNodeRegEx());
return pattern.matcher(getChainStr()).find();
}
......@@ -39,4 +57,9 @@ public class CategorizedChainInfo {
public void add(UncategorizeChainInfo uncategorizeChainInfo) {
children_Token.add(uncategorizeChainInfo.getChainToken());
}
@Override
public String toString() {
return new Gson().toJson(this);
}
}
package com.ai.cloud.skywalking.analysis.model;
/**
* Created by astraea on 2016/1/21.
*/
public class ChainDetail {
public ChainDetail(ChainInfo chainInfo) {
}
public ChainDetail(UncategorizeChainInfo child) {
}
}
package com.ai.cloud.skywalking.analysis.model;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
......@@ -13,13 +14,13 @@ public class ChainNodeSpecificTimeWindowSummary {
private Map<String, SummaryResult> summerResultMap;
public static ChainNodeSpecificTimeWindowSummary newInstance(String traceLevelId){
public static ChainNodeSpecificTimeWindowSummary newInstance(String traceLevelId) {
ChainNodeSpecificTimeWindowSummary cns = new ChainNodeSpecificTimeWindowSummary();
cns.traceLevelId = traceLevelId;
return cns;
}
private ChainNodeSpecificTimeWindowSummary(){
private ChainNodeSpecificTimeWindowSummary() {
summerResultMap = new HashMap<String, SummaryResult>();
}
......@@ -57,7 +58,12 @@ public class ChainNodeSpecificTimeWindowSummary {
}
private String generateKey(long startTime) {
long minutes = (startTime % (1000 * 60 * 60)) / (1000 * 60);
return String.valueOf(minutes / Config.ChainNodeSummary.INTERVAL);
}
return null;
@Override
public String toString() {
return new Gson().toJson(this);
}
}
package com.ai.cloud.skywalking.analysis.model;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.config.Constants;
import com.ai.cloud.skywalking.analysis.util.HBaseUtil;
import com.google.gson.Gson;
import org.apache.hadoop.hbase.client.Put;
import java.io.IOException;
import java.util.*;
public class ChainRelate {
private Map<String, CategorizedChainInfo> categorizedChainInfoList = new HashMap<String, CategorizedChainInfo>();
private String key;
private Map<String, CategorizedChainInfo> categorizedChainInfoMap = new HashMap<String, CategorizedChainInfo>();
private List<UncategorizeChainInfo> uncategorizeChainInfoList = new ArrayList<UncategorizeChainInfo>();
private Map<String, ChainDetail> chainDetailMap = new HashMap<String, ChainDetail>();
public ChainRelate(String key) {
this.key = key;
}
private void categoryUncategorizedChainInfo(CategorizedChainInfo parentChains) {
if (uncategorizeChainInfoList != null && uncategorizeChainInfoList.size() > 0) {
......@@ -21,24 +34,34 @@ public class ChainRelate {
private void classifiedChains(UncategorizeChainInfo child) {
boolean isContained = false;
for (Map.Entry<String, CategorizedChainInfo> entry : categorizedChainInfoList.entrySet()) {
for (Map.Entry<String, CategorizedChainInfo> entry : categorizedChainInfoMap.entrySet()) {
if (entry.getValue().isContained(child)) {
entry.getValue().add(child);
isContained = true;
}
}
if (!isContained) {
uncategorizeChainInfoList.add(child);
if (!uncategorizeChainInfoList.contains(child)){
chainDetailMap.put(child.getChainToken(), new ChainDetail(child));
}
}
}
private CategorizedChainInfo addCategorizedChain(ChainInfo chainInfo) {
if (!categorizedChainInfoList.containsKey(chainInfo.getChainToken())) {
categorizedChainInfoList.put(chainInfo.getChainToken(),
if (!categorizedChainInfoMap.containsKey(chainInfo.getChainToken())) {
categorizedChainInfoMap.put(chainInfo.getChainToken(),
new CategorizedChainInfo(chainInfo));
chainDetailMap.put(chainInfo.getChainToken(), new ChainDetail(chainInfo));
}
return categorizedChainInfoList.get(chainInfo.getChainToken());
return categorizedChainInfoMap.get(chainInfo.getChainToken());
}
public void addRelate(ChainInfo chainInfo) {
......@@ -52,6 +75,45 @@ public class ChainRelate {
}
public void save() {
Put put = new Put(getKey().getBytes());
put.addColumn(Config.HBase.CHAIN_RELATIONSHIP_COLUMN_FAMILY.getBytes(), Constants.UNCATEGORIZED_QUALIFIER_NAME.getBytes()
, new Gson().toJson(getUncategorizeChainInfoList()).getBytes());
for (Map.Entry<String, CategorizedChainInfo> entry : getCategorizedChainInfoMap().entrySet()) {
put.addColumn(Config.HBase.CHAIN_RELATIONSHIP_COLUMN_FAMILY.getBytes(), entry.getKey().getBytes()
, entry.getValue().toString().getBytes());
}
try {
HBaseUtil.saveChainRelate(put);
} catch (IOException e) {
//TODO
e.printStackTrace();
}
}
public void addUncategorizeChain(UncategorizeChainInfo uncategorizeChainInfo) {
uncategorizeChainInfoList.add(uncategorizeChainInfo);
}
public void addCategorizeChain(String qualifierName, CategorizedChainInfo categorizedChainInfo) {
categorizedChainInfoMap.put(qualifierName, categorizedChainInfo);
}
public String getKey() {
return key;
}
public Map<String, CategorizedChainInfo> getCategorizedChainInfoMap() {
return categorizedChainInfoMap;
}
public List<UncategorizeChainInfo> getUncategorizeChainInfoList() {
return uncategorizeChainInfoList;
}
public void addUncategorizeChain(List<UncategorizeChainInfo> uncategorizeChainInfos) {
uncategorizeChainInfoList.addAll(uncategorizeChainInfos);
}
}
package com.ai.cloud.skywalking.analysis.model;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.util.HBaseUtil;
import org.apache.hadoop.hbase.client.Put;
import java.io.IOException;
import java.util.HashMap;
......@@ -28,18 +30,23 @@ public class ChainSpecificTimeWindowSummary {
return result;
}
public void summaryResult(ChainInfo chainInfo) {
for (ChainNode node : chainInfo.getNodes()) {
public void addNodeSummaryResult(ChainNodeSpecificTimeWindowSummary chainNodeSummaryResult) {
chainNodeSummaryResultMap.put(chainNodeSummaryResult.getTraceLevelId(), chainNodeSummaryResult);
}
public void summaryResult(ChainNode node) {
String tlid = node.getTraceLevelId();
ChainNodeSpecificTimeWindowSummary chainNodeSummaryResult = chainNodeSummaryResultMap.get(tlid);
if(chainNodeSummaryResult == null){
if (chainNodeSummaryResult == null) {
chainNodeSummaryResult = ChainNodeSpecificTimeWindowSummary.newInstance(tlid);
chainNodeSummaryResultMap.put(tlid, chainNodeSummaryResult);
}
chainNodeSummaryResult.summary(node);
}
}
public void addNodeSummaryResult(ChainNodeSpecificTimeWindowSummary chainNodeSummaryResult) {
chainNodeSummaryResultMap.put(chainNodeSummaryResult.getTraceLevelId(), chainNodeSummaryResult);
public void save(Put put) {
for (Map.Entry<String, ChainNodeSpecificTimeWindowSummary> entry : chainNodeSummaryResultMap.entrySet()) {
put.addColumn(Config.HBase.CHAIN_SUMMARY_COLUMN_FAMILY.getBytes(), entry.getKey().getBytes(), entry.getValue().toString().getBytes());
}
}
}
package com.ai.cloud.skywalking.analysis.model;
import com.ai.cloud.skywalking.analysis.util.HBaseUtil;
import org.apache.hadoop.hbase.client.Put;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class Summary {
private Map<String, ChainSpecificTimeWindowSummary> summaryResultMap;
public Summary() {
summaryResultMap = new HashMap<String, ChainSpecificTimeWindowSummary>();
}
public void summary(ChainInfo chainInfo) {
String csk = generateChainSummaryKey(chainInfo);
for (ChainNode node : chainInfo.getNodes()) {
String csk = generateChainSummaryKey(chainInfo.getChainToken(), node.getStartDate());
ChainSpecificTimeWindowSummary chainSummaryResult = summaryResultMap.get(csk);
if (chainSummaryResult == null) {
chainSummaryResult = ChainSpecificTimeWindowSummary.load(csk);
summaryResultMap.put(csk, chainSummaryResult);
}
chainSummaryResult.summaryResult(chainInfo);
chainSummaryResult.summaryResult(node);
}
}
private String generateChainSummaryKey(ChainInfo chainInfo) {
return chainInfo.getChainToken() + ":" + (chainInfo.getStartDate() / (1000 * 60 * 60));
private String generateChainSummaryKey(String chainToken, long startDate) {
return chainToken + "-" + (startDate / (1000 * 60 * 60));
}
public void save() {
public void save() throws IOException, InterruptedException {
List<Put> puts = new ArrayList<Put>();
for (Map.Entry<String, ChainSpecificTimeWindowSummary> entry : summaryResultMap.entrySet()) {
Put put = new Put(entry.getKey().getBytes());
entry.getValue().save(put);
puts.add(put);
}
HBaseUtil.batchSaveChainSpecificTimeWindowSummary(puts);
}
}
......@@ -36,12 +36,10 @@ public class SummaryResult {
}
public void summary(ChainNode node) {
totalCall++;
if (node.getStatus() == ChainNode.NodeStatus.NORMAL) {
correctNumber++;
}
totalCostTime += node.getCost();
}
}
package com.ai.cloud.skywalking.analysis.model;
import com.google.gson.Gson;
public class UncategorizeChainInfo {
private String chainToken;
private String nodeRegEx;
public UncategorizeChainInfo() {
}
public UncategorizeChainInfo(ChainInfo chainInfo) {
this.chainToken = chainInfo.getChainToken();
StringBuilder stringBuilder = new StringBuilder();
......@@ -34,4 +39,9 @@ public class UncategorizeChainInfo {
public void setNodeRegEx(String nodeRegEx) {
this.nodeRegEx = nodeRegEx;
}
@Override
public String toString() {
return new Gson().toJson(this);
}
}
......@@ -14,7 +14,6 @@ import java.util.Iterator;
public class ChainInfoReduce extends TableReducer<Text, ChainInfo, Put> {
@Override
protected void reduce(Text key, Iterable<ChainInfo> values, Context context) throws IOException, InterruptedException {
ChainRelate chainRelate = HBaseUtil.selectCallChainRelationship(key.toString());
Summary summary = new Summary();
Iterator<ChainInfo> chainInfoIterator = values.iterator();
......@@ -29,5 +28,6 @@ public class ChainInfoReduce extends TableReducer<Text, ChainInfo, Put> {
summary.save();
// 入Mysql表
}
}
package com.ai.cloud.skywalking.analysis.util;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.model.ChainNodeSpecificTimeWindowSummary;
import com.ai.cloud.skywalking.analysis.model.ChainRelate;
import com.ai.cloud.skywalking.analysis.model.ChainSpecificTimeWindowSummary;
import com.ai.cloud.skywalking.protocol.Span;
import com.ai.cloud.skywalking.analysis.config.Constants;
import com.ai.cloud.skywalking.analysis.model.*;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
......@@ -14,7 +13,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class HBaseUtil {
......@@ -53,18 +51,27 @@ public class HBaseUtil {
static {
try {
initHBaseClient();
Admin admin = connection.getAdmin();
if (!admin.isTableAvailable(TableName.valueOf(Config.HBase.TRACE_INFO_TABLE_NAME))) {
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(Config.HBase.TRACE_INFO_TABLE_NAME));
tableDesc.addFamily(new HColumnDescriptor(Config.HBase.TRACE_INFO_COLUMN_FAMILY));
admin.createTable(tableDesc);
logger.info("Create table [{}] ok!", Config.HBase.TRACE_INFO_TABLE_NAME);
}
//
createTableIfNeed(Config.HBase.TRACE_INFO_TABLE_NAME, Config.HBase.TRACE_INFO_COLUMN_FAMILY);
//
createTableIfNeed(Config.HBase.TABLE_CALL_CHAIN_RELATIONSHIP, Config.HBase.CHAIN_RELATIONSHIP_COLUMN_FAMILY);
createTableIfNeed(Config.HBase.TABLE_CHAIN_SUMMARY, Config.HBase.CHAIN_SUMMARY_COLUMN_FAMILY);
} catch (IOException e) {
logger.error("Create table[{}] failed", Config.HBase.TRACE_INFO_TABLE_NAME, e);
}
}
private static void createTableIfNeed(String tableName, String familyName) throws IOException {
Admin admin = connection.getAdmin();
if (!admin.isTableAvailable(TableName.valueOf(tableName))) {
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
tableDesc.addFamily(new HColumnDescriptor(familyName.getBytes()));
admin.createTable(tableDesc);
logger.info("Create table [{}] ok!", tableName);
}
}
private static void initHBaseClient() throws IOException {
if (configuration == null) {
configuration = HBaseConfiguration.create();
......@@ -79,21 +86,34 @@ public class HBaseUtil {
}
public static ChainRelate selectCallChainRelationship(String id) throws IOException {
List<Span> entries = new ArrayList<Span>();
public static ChainRelate selectCallChainRelationship(String key) throws IOException {
ChainRelate chainRelate = new ChainRelate(key);
Table table = connection.getTable(TableName.valueOf(Config.HBase.TABLE_CALL_CHAIN_RELATIONSHIP));
Get g = new Get(Bytes.toBytes(id));
Get g = new Get(Bytes.toBytes(key));
Result r = table.get(g);
for (Cell cell : r.rawCells()) {
if (cell.getValueArray().length > 0)
entries.add(new Span(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())));
if (cell.getValueArray().length > 0) {
String qualifierName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
if (Constants.UNCATEGORIZED_QUALIFIER_NAME.equals(qualifierName)) {
List<UncategorizeChainInfo> uncategorizeChainInfoList = new Gson().fromJson(Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength()),
new TypeToken<List<UncategorizeChainInfo>>() {
}.getType());
chainRelate.addUncategorizeChain(uncategorizeChainInfoList);
} else {
chainRelate.addCategorizeChain(qualifierName, new CategorizedChainInfo(
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
));
}
}
}
return null;
return chainRelate;
}
public static ChainSpecificTimeWindowSummary selectChainSummaryResult(String key) throws IOException {
ChainSpecificTimeWindowSummary result = null;
//TODO 初始化表
Table table = connection.getTable(TableName.valueOf(Config.HBase.TABLE_CHAIN_INFO));
Get g = new Get(Bytes.toBytes(key));
Result r = table.get(g);
......@@ -107,4 +127,25 @@ public class HBaseUtil {
return result;
}
public static void saveChainRelate(Put put) throws IOException {
Table table = connection.getTable(TableName.valueOf(Config.HBase.TABLE_CALL_CHAIN_RELATIONSHIP));
table.put(put);
if (logger.isDebugEnabled()) {
logger.debug("Insert data[RowKey:{}] success.", put.getId());
}
}
public static void batchSaveChainSpecificTimeWindowSummary(List<Put> puts) throws IOException, InterruptedException {
Table table = connection.getTable(TableName.valueOf(Config.HBase.TABLE_CHAIN_SUMMARY));
Object[] resultArrays = new Object[puts.size()];
table.batch(puts, resultArrays);
for (Object result : resultArrays) {
if (result == null) {
//TODO
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册