提交 fe8911d4 编写于 作者: wu-sheng's avatar wu-sheng

1.修改大量类名和方法名

2.增加reduce的入口日志,以及大数据量批量运行计数器
3.修复部分编译告警
上级 fe2ef6a7
package com.ai.cloud.skywalking.analysis.chainbuild;
import com.ai.cloud.skywalking.analysis.chainbuild.action.ISummaryAction;
import com.ai.cloud.skywalking.analysis.chainbuild.po.SummaryType;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
......@@ -11,81 +10,65 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.Iterator;
import com.ai.cloud.skywalking.analysis.chainbuild.action.IStatisticsAction;
import com.ai.cloud.skywalking.analysis.chainbuild.po.SummaryType;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
public class ChainBuildReducer extends Reducer<Text, Text, Text, IntWritable> {
private Logger logger = LogManager.getLogger(ChainBuildReducer.class);
private Logger logger = LogManager.getLogger(ChainBuildReducer.class);
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
ConfigInitializer.initialize();
Config.AnalysisServer.IS_ACCUMULATE_MODE = Boolean.parseBoolean(context.getConfiguration()
.get("skywalking.analysis.mode", "false"));
logger.info("Skywalking analysis mode :[{}]",
Config.AnalysisServer.IS_ACCUMULATE_MODE ? "ACCUMULATE" : "REWRITE");
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
ConfigInitializer.initialize();
Config.AnalysisServer.IS_ACCUMULATE_MODE = Boolean.parseBoolean(context
.getConfiguration().get("skywalking.analysis.mode", "false"));
logger.info("Skywalking analysis mode :[{}]",
Config.AnalysisServer.IS_ACCUMULATE_MODE ? "ACCUMULATE"
: "REWRITE");
}
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String reduceKey = Bytes.toString(key.getBytes());
int index = reduceKey.indexOf(":");
if (index == -1) {
return;
}
String summaryTypeAndDateStr = reduceKey.substring(0, index - 1);
String entryKey = reduceKey.substring(index + 1);
ISummaryAction summaryAction = SummaryType.chooseSummaryAction(summaryTypeAndDateStr, entryKey);
doReduceAction(summaryAction, values.iterator());
}
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String reduceKey = Bytes.toString(key.getBytes());
int index = reduceKey.indexOf(":");
if (index == -1) {
return;
}
String summaryTypeAndDateStr = reduceKey.substring(0, index - 1);
String entryKey = reduceKey.substring(index + 1);
logger.debug("begin to reduce for key: {}", reduceKey);
IStatisticsAction summaryAction = SummaryType.chooseSummaryAction(
summaryTypeAndDateStr, entryKey);
doReduceAction(reduceKey, summaryAction, values.iterator());
}
public void doReduceAction(ISummaryAction summaryAction, Iterator<Text> iterator) {
while (iterator.hasNext()) {
String summaryData = iterator.next().toString();
try {
summaryAction.doAction(summaryData);
} catch (Exception e) {
logger.error(
"Failed to summary call chain, maybe illegal data:"
+ summaryData, e);
}
}
public void doReduceAction(String reduceKey, IStatisticsAction summaryAction,
Iterator<Text> iterator) {
long dataCounter = 0;
while (iterator.hasNext()) {
String summaryData = iterator.next().toString();
try {
summaryAction.doAction(summaryData);
} catch (Exception e) {
logger.error(
"Failed to summary call chain, maybe illegal data:"
+ summaryData, e);
}finally{
dataCounter++;
}
if(dataCounter % 1000 == 0){
logger.debug("reduce for key: {}, count: {}", reduceKey, dataCounter);
}
}
try {
summaryAction.doSave();
} catch (Exception e) {
logger.error("Failed to save summaryresult/chainTree.", e);
}
}
//
// public void doReduceAction(String key, SummaryType summaryType, String summaryDateStr, Iterator<Text> chainInfoIterator)
// throws IOException, InterruptedException {
// CallChainTree chainTree = CallChainTree.load(key);
// SpecificTimeCallTreeMergedChainIdContainer container
// = new SpecificTimeCallTreeMergedChainIdContainer(chainTree.getTreeToken());
// while (chainInfoIterator.hasNext()) {
// String chainNodeData = chainInfoIterator.next().toString();
//// ChainInfo chainInfo = null;
// ChainNode chainNode = null;
// try {
//// chainInfo = new Gson().fromJson(callChainData, ChainInfo.class);
// chainNode = new Gson().fromJson(chainNodeData, ChainNode.class);
// // container.addMergedChainNodeIdIfNotContain(chainNode);
// //container.addMergedChainIfNotContain(chainInfo);
// //chainTree.summary(chainInfo, summaryType);
// } catch (Exception e) {
// logger.error(
// "Failed to summary call chain, maybe illegal data:"
// + chainNodeData, e);
// }
// }
// try {
// container.saveToHBase(summaryType);
// chainTree.saveToHBase();
// } catch (Exception e) {
// logger.error("Failed to save summaryresult/chainTree.", e);
// }
// }
try {
summaryAction.doSave();
} catch (Exception e) {
logger.error("Failed to save summaryresult/chainTree.", e);
}
}
}
......@@ -4,7 +4,7 @@ package com.ai.cloud.skywalking.analysis.chainbuild.action;
import java.io.IOException;
import java.sql.SQLException;
public interface ISummaryAction {
public interface IStatisticsAction {
void doAction(String summaryData) throws IOException;
void doSave() throws InterruptedException, SQLException, IOException;
......
package com.ai.cloud.skywalking.analysis.chainbuild.action.impl;
import com.ai.cloud.skywalking.analysis.chainbuild.action.ISummaryAction;
import java.io.IOException;
import java.sql.SQLException;
import com.ai.cloud.skywalking.analysis.chainbuild.action.IStatisticsAction;
import com.ai.cloud.skywalking.analysis.chainbuild.entity.CallChainTree;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainNode;
import com.ai.cloud.skywalking.analysis.chainbuild.po.SpecificTimeCallTreeMergedChainIdContainer;
import com.ai.cloud.skywalking.analysis.chainbuild.po.SpecificTimeCallChainTreeContainer;
import com.google.gson.Gson;
import java.io.IOException;
import java.sql.SQLException;
public class RelationShipAction implements ISummaryAction {
public class CallChainRelationshipAction implements IStatisticsAction {
private CallChainTree chainTree;
private SpecificTimeCallTreeMergedChainIdContainer container;
public RelationShipAction(String entryKey) throws IOException {
chainTree = CallChainTree.load(entryKey);
container = new SpecificTimeCallTreeMergedChainIdContainer(chainTree.getTreeToken());
private SpecificTimeCallChainTreeContainer container;
public CallChainRelationshipAction(String entryKey) throws IOException {
chainTree = CallChainTree.create(entryKey);
container = new SpecificTimeCallChainTreeContainer(chainTree.getTreeToken());
}
@Override
public void doAction(String summaryData) throws IOException {
ChainInfo chainInfo = new Gson().fromJson(summaryData, ChainInfo.class);
container.addMergedChainIfNotContain(chainInfo);
container.addChainIfNew(chainInfo);
}
@Override
......
package com.ai.cloud.skywalking.analysis.chainbuild.action.impl;
import com.ai.cloud.skywalking.analysis.chainbuild.action.ISummaryAction;
import com.ai.cloud.skywalking.analysis.chainbuild.action.IStatisticsAction;
import com.ai.cloud.skywalking.analysis.chainbuild.entity.CallChainTree;
import com.ai.cloud.skywalking.analysis.chainbuild.entity.CallChainTreeNode;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainNode;
......@@ -10,13 +10,13 @@ import com.google.gson.Gson;
import java.io.IOException;
import java.sql.SQLException;
public class DateDetailSummaryAction implements ISummaryAction {
public class NumberOfCalledStatisticsAction implements IStatisticsAction {
private CallChainTree callChainTree;
private String summaryDate;
private SummaryType summaryType;
public DateDetailSummaryAction(String entryKey, String summaryDate) throws IOException {
callChainTree = CallChainTree.load(entryKey);
public NumberOfCalledStatisticsAction(String entryKey, String summaryDate) throws IOException {
callChainTree = CallChainTree.create(entryKey);
this.summaryDate = summaryDate;
}
......
package com.ai.cloud.skywalking.analysis.chainbuild.entity;
import com.ai.cloud.skywalking.analysis.chainbuild.DBCallChainInfoDao;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainNode;
import com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData;
import com.google.gson.Gson;
import org.apache.hadoop.hbase.client.Put;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import com.ai.cloud.skywalking.analysis.chainbuild.DBCallChainInfoDao;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainNode;
import com.google.gson.Gson;
public class CallChainDetailForMysql {
private String chainToken;
private String treeToken;
......
......@@ -4,13 +4,12 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainNode;
import com.ai.cloud.skywalking.analysis.chainbuild.po.SummaryType;
import com.ai.cloud.skywalking.analysis.chainbuild.util.TokenGenerator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.ai.cloud.skywalking.analysis.chainbuild.po.SummaryType;
import com.ai.cloud.skywalking.analysis.chainbuild.util.TokenGenerator;
public class CallChainTree {
private Logger logger = LogManager.getLogger(CallChainTree.class);
......@@ -31,7 +30,7 @@ public class CallChainTree {
logger.info("CallEntrance:[{}] == TreeToken[{}]",callEntrance, treeToken);
}
public static CallChainTree load(String callEntrance) throws IOException {
public static CallChainTree create(String callEntrance) throws IOException {
CallChainTree chain = new CallChainTree(callEntrance);
return chain;
}
......
......@@ -21,220 +21,269 @@ import java.util.*;
* @author wusheng
*/
public class CallChainTreeNode {
private Logger logger = LogManager.getLogger(CallChainTreeNode.class);
@Expose
private String traceLevelId;
@Expose
private String viewPointId;
/**
* key: treeId + 小时
* value: 当前树的当前小时范围内的,所有分钟和节点的统计数据
*/
private Map<String, ChainNodeSpecificMinSummary> chainNodeSpecificMinSummaryContainer;
/**
* key: treeId + 天
* value: 当前树的当前天范围内的,所有小时和节点的统计数据
*/
private Map<String, ChainNodeSpecificHourSummary> chainNodeSpecificHourSummaryContainer;
/**
* key: treeId + 月
* value: 当前树的当前月范围内的,所有天和节点的统计数据
*/
private Map<String, ChainNodeSpecificDaySummary> chainNodeSpecificDaySummaryContainer;
/**
* key: treeId + 年
* value: 当前树的当前年范围内的,所有月份和节点的统计数据
*/
private Map<String, ChainNodeSpecificMonthSummary> chainNodeSpecificMonthSummaryContainer;
public CallChainTreeNode(ChainNode node) {
this.traceLevelId = node.getTraceLevelId();
chainNodeSpecificMinSummaryContainer = new HashMap<String, ChainNodeSpecificMinSummary>();
chainNodeSpecificHourSummaryContainer = new HashMap<String, ChainNodeSpecificHourSummary>();
chainNodeSpecificDaySummaryContainer = new HashMap<String, ChainNodeSpecificDaySummary>();
chainNodeSpecificMonthSummaryContainer = new HashMap<String, ChainNodeSpecificMonthSummary>();
this.viewPointId = node.getViewPoint();
}
public void summary(String treeId, ChainNode node, SummaryType summaryType, String summaryDate) throws IOException {
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date(node.getStartDate()));
switch (summaryType) {
case HOUR: {
summaryMinResult(treeId, node, calendar, summaryDate);
break;
}
case DAY: {
summaryHourResult(treeId, node, calendar, summaryDate);
break;
}
case MONTH: {
summaryDayResult(treeId, node, calendar, summaryDate);
break;
}
case YEAR: {
summaryMonthResult(treeId, node, calendar, summaryDate);
break;
}
}
}
private void summaryMonthResult(String treeId, ChainNode node, Calendar calendar, String summaryDate) throws IOException {
String keyOfMonthSummaryTable = generateRowKey(treeId, summaryDate);
ChainNodeSpecificMonthSummary monthSummary = chainNodeSpecificMonthSummaryContainer.get(keyOfMonthSummaryTable);
if (monthSummary == null) {
if (Config.AnalysisServer.IS_ACCUMULATE_MODE) {
monthSummary = HBaseUtil.loadSpecificMonthSummary(keyOfMonthSummaryTable, getTreeNodeId());
} else {
monthSummary = new ChainNodeSpecificMonthSummary();
}
chainNodeSpecificMonthSummaryContainer.put(keyOfMonthSummaryTable, monthSummary);
}
monthSummary.summary(String.valueOf(calendar.get(Calendar.MONTH) + 1), node);
}
private void summaryDayResult(String treeId, ChainNode node, Calendar calendar, String summaryDate) throws IOException {
String keyOfDaySummaryTable = generateRowKey(treeId, summaryDate);
ChainNodeSpecificDaySummary daySummary = chainNodeSpecificDaySummaryContainer.get(keyOfDaySummaryTable);
if (daySummary == null) {
if (Config.AnalysisServer.IS_ACCUMULATE_MODE) {
daySummary = HBaseUtil.loadSpecificDaySummary(keyOfDaySummaryTable, getTreeNodeId());
} else {
daySummary = new ChainNodeSpecificDaySummary();
}
chainNodeSpecificDaySummaryContainer.put(keyOfDaySummaryTable, daySummary);
}
daySummary.summary(String.valueOf(calendar.get(Calendar.DAY_OF_MONTH)), node);
}
private void summaryHourResult(String treeId, ChainNode node, Calendar calendar, String summaryDate) throws IOException {
String keyOfHourSummaryTable = generateRowKey(treeId, summaryDate);
ChainNodeSpecificHourSummary hourSummary = chainNodeSpecificHourSummaryContainer.get(keyOfHourSummaryTable);
if (hourSummary == null) {
if (Config.AnalysisServer.IS_ACCUMULATE_MODE) {
hourSummary = HBaseUtil.loadSpecificHourSummary(keyOfHourSummaryTable, getTreeNodeId());
} else {
hourSummary = new ChainNodeSpecificHourSummary();
}
chainNodeSpecificHourSummaryContainer.put(keyOfHourSummaryTable, hourSummary);
}
hourSummary.summary(String.valueOf(calendar.get(Calendar.HOUR)), node);
}
/**
* 按分钟维度进行汇总<br/>
* chainNodeContainer以treeId和时间(精确到分钟)为key,value为当前时间范围内的所有分钟的汇总数据
*/
private void summaryMinResult(String treeId, ChainNode node, Calendar calendar, String summaryDate) throws IOException {
String keyOfMinSummaryTable = generateRowKey(treeId, summaryDate);
ChainNodeSpecificMinSummary minSummary = chainNodeSpecificMinSummaryContainer.get(keyOfMinSummaryTable);
if (minSummary == null) {
if (Config.AnalysisServer.IS_ACCUMULATE_MODE) {
minSummary = HBaseUtil.loadSpecificMinSummary(keyOfMinSummaryTable, getTreeNodeId());
} else {
minSummary = new ChainNodeSpecificMinSummary();
}
chainNodeSpecificMinSummaryContainer.put(keyOfMinSummaryTable, minSummary);
}
minSummary.summary(String.valueOf(calendar.get(Calendar.MINUTE)), node);
}
private String generateRowKey(String treeId, String dateKey) {
return treeId + "/" + dateKey;
}
@Override
public String toString() {
return new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create().toJson(this);
}
/**
* 存储入库时 <br/>
* hbase的key 为 treeId + 小时 <br/>
* 列族中,列为节点id,规则为:traceLevelId + "@" + viewPointId <br/>
* 列的值,为当前节点按小时内各分钟的汇总 <br/>
*
* @throws IOException
* @throws InterruptedException
*/
public void saveSummaryResultToHBase(SummaryType summaryType) throws IOException, InterruptedException {
switch (summaryType) {
case HOUR: {
batchSaveMinSummaryResult();
break;
}
case DAY: {
batchSaveHourSummaryResult();
break;
}
case MONTH: {
batchSaveDaySummaryResult();
break;
}
case YEAR: {
batchSaveMonthSummaryResult();
break;
}
}
}
private void batchSaveMonthSummaryResult() throws IOException, InterruptedException {
List<Put> puts = new ArrayList<Put>();
for (Map.Entry<String, ChainNodeSpecificMonthSummary> entry : chainNodeSpecificMonthSummaryContainer.entrySet()) {
Put put = new Put(entry.getKey().getBytes());
put.addColumn(HBaseTableMetaData.TABLE_CHAIN_ONE_MONTH_SUMMARY.COLUMN_FAMILY_NAME.getBytes()
, getTreeNodeId().getBytes(), entry.getValue().toString().getBytes());
puts.add(put);
}
HBaseUtil.batchSaveMonthSummaryResult(puts);
}
private void batchSaveDaySummaryResult() throws IOException, InterruptedException {
List<Put> puts = new ArrayList<Put>();
for (Map.Entry<String, ChainNodeSpecificDaySummary> entry : chainNodeSpecificDaySummaryContainer.entrySet()) {
Put put = new Put(entry.getKey().getBytes());
put.addColumn(HBaseTableMetaData.TABLE_CHAIN_ONE_DAY_SUMMARY.COLUMN_FAMILY_NAME.getBytes()
, getTreeNodeId().getBytes(), entry.getValue().toString().getBytes());
puts.add(put);
}
HBaseUtil.batchSaveDaySummaryResult(puts);
}
private void batchSaveHourSummaryResult() throws IOException, InterruptedException {
List<Put> puts = new ArrayList<Put>();
for (Map.Entry<String, ChainNodeSpecificHourSummary> entry : chainNodeSpecificHourSummaryContainer.entrySet()) {
Put put = new Put(entry.getKey().getBytes());
put.addColumn(HBaseTableMetaData.TABLE_CHAIN_ONE_HOUR_SUMMARY.COLUMN_FAMILY_NAME.getBytes()
, getTreeNodeId().getBytes(), entry.getValue().toString().getBytes());
puts.add(put);
}
HBaseUtil.batchSaveHourSummaryResult(puts);
}
private void batchSaveMinSummaryResult() throws IOException, InterruptedException {
List<Put> puts = new ArrayList<Put>();
for (Map.Entry<String, ChainNodeSpecificMinSummary> entry : chainNodeSpecificMinSummaryContainer.entrySet()) {
Put put = new Put(entry.getKey().getBytes());
put.addColumn(HBaseTableMetaData.TABLE_CHAIN_ONE_MINUTE_SUMMARY.COLUMN_FAMILY_NAME.getBytes()
, getTreeNodeId().getBytes(), entry.getValue().toString().getBytes());
puts.add(put);
}
HBaseUtil.batchSaveMinSummaryResult(puts);
}
public String getTreeNodeId() {
return traceLevelId + "@" + viewPointId;
}
private Logger logger = LogManager.getLogger(CallChainTreeNode.class);
@Expose
private String traceLevelId;
@Expose
private String viewPointId;
/**
* key: treeId + 小时 value: 当前树的当前小时范围内的,所有分钟和节点的统计数据
*/
private Map<String, ChainNodeSpecificMinSummary> chainNodeSpecificMinSummaryContainer;
/**
* key: treeId + 天 value: 当前树的当前天范围内的,所有小时和节点的统计数据
*/
private Map<String, ChainNodeSpecificHourSummary> chainNodeSpecificHourSummaryContainer;
/**
* key: treeId + 月 value: 当前树的当前月范围内的,所有天和节点的统计数据
*/
private Map<String, ChainNodeSpecificDaySummary> chainNodeSpecificDaySummaryContainer;
/**
* key: treeId + 年 value: 当前树的当前年范围内的,所有月份和节点的统计数据
*/
private Map<String, ChainNodeSpecificMonthSummary> chainNodeSpecificMonthSummaryContainer;
public CallChainTreeNode(ChainNode node) {
this.traceLevelId = node.getTraceLevelId();
chainNodeSpecificMinSummaryContainer = new HashMap<String, ChainNodeSpecificMinSummary>();
chainNodeSpecificHourSummaryContainer = new HashMap<String, ChainNodeSpecificHourSummary>();
chainNodeSpecificDaySummaryContainer = new HashMap<String, ChainNodeSpecificDaySummary>();
chainNodeSpecificMonthSummaryContainer = new HashMap<String, ChainNodeSpecificMonthSummary>();
this.viewPointId = node.getViewPoint();
}
/**
* 针对节点、汇总类型,进行数据汇总。<br/>
* 已汇总数据加载使用lazy load模式,只有此次汇总过程中触发的数据节点统计数量,才会被读取<br/>
*
* @param treeId
* @param node
* @param summaryType
* @param summaryDate
* @throws IOException
*/
public void summary(String treeId, ChainNode node, SummaryType summaryType,
String summaryDate) throws IOException {
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date(node.getStartDate()));
switch (summaryType) {
case HOUR: {
summaryMinResult(treeId, node, calendar, summaryDate);
break;
}
case DAY: {
summaryHourResult(treeId, node, calendar, summaryDate);
break;
}
case MONTH: {
summaryDayResult(treeId, node, calendar, summaryDate);
break;
}
case YEAR: {
summaryMonthResult(treeId, node, calendar, summaryDate);
break;
}
default: {
logger.error("unknown summary type :{}", summaryType);
break;
}
}
}
private void summaryMonthResult(String treeId, ChainNode node,
Calendar calendar, String summaryDate) throws IOException {
String keyOfMonthSummaryTable = generateRowKey(treeId, summaryDate);
ChainNodeSpecificMonthSummary monthSummary = chainNodeSpecificMonthSummaryContainer
.get(keyOfMonthSummaryTable);
if (monthSummary == null) {
if (Config.AnalysisServer.IS_ACCUMULATE_MODE) {
monthSummary = HBaseUtil.loadSpecificMonthSummary(
keyOfMonthSummaryTable, getTreeNodeId());
} else {
monthSummary = new ChainNodeSpecificMonthSummary();
}
chainNodeSpecificMonthSummaryContainer.put(keyOfMonthSummaryTable,
monthSummary);
}
monthSummary.summary(String.valueOf(calendar.get(Calendar.MONTH) + 1),
node);
}
private void summaryDayResult(String treeId, ChainNode node,
Calendar calendar, String summaryDate) throws IOException {
String keyOfDaySummaryTable = generateRowKey(treeId, summaryDate);
ChainNodeSpecificDaySummary daySummary = chainNodeSpecificDaySummaryContainer
.get(keyOfDaySummaryTable);
if (daySummary == null) {
if (Config.AnalysisServer.IS_ACCUMULATE_MODE) {
daySummary = HBaseUtil.loadSpecificDaySummary(
keyOfDaySummaryTable, getTreeNodeId());
} else {
daySummary = new ChainNodeSpecificDaySummary();
}
chainNodeSpecificDaySummaryContainer.put(keyOfDaySummaryTable,
daySummary);
}
daySummary.summary(String.valueOf(calendar.get(Calendar.DAY_OF_MONTH)),
node);
}
private void summaryHourResult(String treeId, ChainNode node,
Calendar calendar, String summaryDate) throws IOException {
String keyOfHourSummaryTable = generateRowKey(treeId, summaryDate);
ChainNodeSpecificHourSummary hourSummary = chainNodeSpecificHourSummaryContainer
.get(keyOfHourSummaryTable);
if (hourSummary == null) {
if (Config.AnalysisServer.IS_ACCUMULATE_MODE) {
hourSummary = HBaseUtil.loadSpecificHourSummary(
keyOfHourSummaryTable, getTreeNodeId());
} else {
hourSummary = new ChainNodeSpecificHourSummary();
}
chainNodeSpecificHourSummaryContainer.put(keyOfHourSummaryTable,
hourSummary);
}
hourSummary.summary(String.valueOf(calendar.get(Calendar.HOUR)), node);
}
/**
* 按分钟维度进行汇总<br/>
* chainNodeContainer以treeId和时间(精确到分钟)为key,value为当前时间范围内的所有分钟的汇总数据
*/
private void summaryMinResult(String treeId, ChainNode node,
Calendar calendar, String summaryDate) throws IOException {
String keyOfMinSummaryTable = generateRowKey(treeId, summaryDate);
ChainNodeSpecificMinSummary minSummary = chainNodeSpecificMinSummaryContainer
.get(keyOfMinSummaryTable);
if (minSummary == null) {
if (Config.AnalysisServer.IS_ACCUMULATE_MODE) {
minSummary = HBaseUtil.loadSpecificMinSummary(
keyOfMinSummaryTable, getTreeNodeId());
} else {
minSummary = new ChainNodeSpecificMinSummary();
}
chainNodeSpecificMinSummaryContainer.put(keyOfMinSummaryTable,
minSummary);
}
minSummary.summary(String.valueOf(calendar.get(Calendar.MINUTE)), node);
}
private String generateRowKey(String treeId, String dateKey) {
return treeId + "/" + dateKey;
}
@Override
public String toString() {
return new GsonBuilder().excludeFieldsWithoutExposeAnnotation()
.create().toJson(this);
}
/**
* 存储入库时 <br/>
* hbase的key 为 treeId + 小时 <br/>
* 列族中,列为节点id,规则为:traceLevelId + "@" + viewPointId <br/>
* 列的值,为当前节点按小时内各分钟的汇总 <br/>
*
* @throws IOException
* @throws InterruptedException
*/
public void saveSummaryResultToHBase(SummaryType summaryType)
throws IOException, InterruptedException {
switch (summaryType) {
case HOUR: {
batchSaveMinSummaryResult();
break;
}
case DAY: {
batchSaveHourSummaryResult();
break;
}
case MONTH: {
batchSaveDaySummaryResult();
break;
}
case YEAR: {
batchSaveMonthSummaryResult();
break;
}
default: {
logger.error("unknown summary type :{}", summaryType);
break;
}
}
}
private void batchSaveMonthSummaryResult() throws IOException,
InterruptedException {
List<Put> puts = new ArrayList<Put>();
for (Map.Entry<String, ChainNodeSpecificMonthSummary> entry : chainNodeSpecificMonthSummaryContainer
.entrySet()) {
Put put = new Put(entry.getKey().getBytes());
put.addColumn(
HBaseTableMetaData.TABLE_CHAIN_ONE_MONTH_SUMMARY.COLUMN_FAMILY_NAME
.getBytes(), getTreeNodeId().getBytes(), entry
.getValue().toString().getBytes());
puts.add(put);
}
HBaseUtil.batchSaveMonthSummaryResult(puts);
}
private void batchSaveDaySummaryResult() throws IOException,
InterruptedException {
List<Put> puts = new ArrayList<Put>();
for (Map.Entry<String, ChainNodeSpecificDaySummary> entry : chainNodeSpecificDaySummaryContainer
.entrySet()) {
Put put = new Put(entry.getKey().getBytes());
put.addColumn(
HBaseTableMetaData.TABLE_CHAIN_ONE_DAY_SUMMARY.COLUMN_FAMILY_NAME
.getBytes(), getTreeNodeId().getBytes(), entry
.getValue().toString().getBytes());
puts.add(put);
}
HBaseUtil.batchSaveDaySummaryResult(puts);
}
private void batchSaveHourSummaryResult() throws IOException,
InterruptedException {
List<Put> puts = new ArrayList<Put>();
for (Map.Entry<String, ChainNodeSpecificHourSummary> entry : chainNodeSpecificHourSummaryContainer
.entrySet()) {
Put put = new Put(entry.getKey().getBytes());
put.addColumn(
HBaseTableMetaData.TABLE_CHAIN_ONE_HOUR_SUMMARY.COLUMN_FAMILY_NAME
.getBytes(), getTreeNodeId().getBytes(), entry
.getValue().toString().getBytes());
puts.add(put);
}
HBaseUtil.batchSaveHourSummaryResult(puts);
}
private void batchSaveMinSummaryResult() throws IOException,
InterruptedException {
List<Put> puts = new ArrayList<Put>();
for (Map.Entry<String, ChainNodeSpecificMinSummary> entry : chainNodeSpecificMinSummaryContainer
.entrySet()) {
Put put = new Put(entry.getKey().getBytes());
put.addColumn(
HBaseTableMetaData.TABLE_CHAIN_ONE_MINUTE_SUMMARY.COLUMN_FAMILY_NAME
.getBytes(), getTreeNodeId().getBytes(), entry
.getValue().toString().getBytes());
puts.add(put);
}
HBaseUtil.batchSaveMinSummaryResult(puts);
}
public String getTreeNodeId() {
return traceLevelId + "@" + viewPointId;
}
}
......@@ -10,25 +10,25 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.*;
public class SpecificTimeCallTreeMergedChainIdContainer {
public class SpecificTimeCallChainTreeContainer {
private String treeToken;
private Map<String, List<String>> hasBeenMergedChainIds;
private Map<String, CallChainDetailForMysql> callChainDetailMap;
private Map<String, CallChainDetailForMysql> newChain4DB;
// 本次Reduce合并过的调用链
private Map<String, ChainInfo> combineChains;
private Map<String, ChainInfo> newChains;
public SpecificTimeCallTreeMergedChainIdContainer(String treeToken) {
public SpecificTimeCallChainTreeContainer(String treeToken) {
this.treeToken = treeToken;
hasBeenMergedChainIds = new HashMap<String, List<String>>();
combineChains = new HashMap<String, ChainInfo>();
callChainDetailMap = new HashMap<String, CallChainDetailForMysql>();
newChains = new HashMap<String, ChainInfo>();
newChain4DB = new HashMap<String, CallChainDetailForMysql>();
}
public void addMergedChainIfNotContain(ChainInfo chainInfo) throws IOException {
public void addChainIfNew(ChainInfo chainInfo) throws IOException {
String key = generateKey(chainInfo.getStartDate());
List<String> cIds = hasBeenMergedChainIds.get(key);
if (cIds == null) {
......@@ -38,11 +38,10 @@ public class SpecificTimeCallTreeMergedChainIdContainer {
if (!cIds.contains(chainInfo.getCID())) {
cIds.add(chainInfo.getCID());
combineChains.put(chainInfo.getCID(), chainInfo);
newChains.put(chainInfo.getCID(), chainInfo);
//
if (chainInfo.getChainStatus() == ChainInfo.ChainStatus.NORMAL) {
callChainDetailMap.put(chainInfo.getCID(), new CallChainDetailForMysql(chainInfo, treeToken));
newChain4DB.put(chainInfo.getCID(), new CallChainDetailForMysql(chainInfo, treeToken));
}
}
}
......@@ -55,13 +54,13 @@ public class SpecificTimeCallTreeMergedChainIdContainer {
}
public void saveToHBase() throws IOException, InterruptedException, SQLException {
batchSaveCurrentHasBeenMergedChainInfo();
batchSaveNewChainsInfo();
batchSaveMergedChainId();
batchSaveToMysql();
}
private void batchSaveToMysql() throws SQLException {
for (Map.Entry<String, CallChainDetailForMysql> entry : callChainDetailMap.entrySet()) {
for (Map.Entry<String, CallChainDetailForMysql> entry : newChain4DB.entrySet()) {
entry.getValue().saveToMysql();
}
}
......@@ -90,9 +89,9 @@ public class SpecificTimeCallTreeMergedChainIdContainer {
* @throws IOException
* @throws InterruptedException
*/
private void batchSaveCurrentHasBeenMergedChainInfo() throws IOException, InterruptedException {
private void batchSaveNewChainsInfo() throws IOException, InterruptedException {
List<Put> chainInfoPuts = new ArrayList<Put>();
for (Map.Entry<String, ChainInfo> entry : combineChains.entrySet()) {
for (Map.Entry<String, ChainInfo> entry : newChains.entrySet()) {
Put put = new Put(entry.getKey().getBytes());
entry.getValue().saveToHBase(put);
chainInfoPuts.add(put);
......
package com.ai.cloud.skywalking.analysis.chainbuild.po;
import com.ai.cloud.skywalking.analysis.chainbuild.action.ISummaryAction;
import com.ai.cloud.skywalking.analysis.chainbuild.action.impl.DateDetailSummaryAction;
import com.ai.cloud.skywalking.analysis.chainbuild.action.impl.RelationShipAction;
import com.ai.cloud.skywalking.analysis.chainbuild.action.IStatisticsAction;
import com.ai.cloud.skywalking.analysis.chainbuild.action.impl.NumberOfCalledStatisticsAction;
import com.ai.cloud.skywalking.analysis.chainbuild.action.impl.CallChainRelationshipAction;
import java.io.IOException;
......@@ -19,7 +19,7 @@ public enum SummaryType {
return value;
}
public static ISummaryAction chooseSummaryAction(String summaryTypeAndDateStr, String entryKey) throws IOException {
public static IStatisticsAction chooseSummaryAction(String summaryTypeAndDateStr, String entryKey) throws IOException {
char valueChar = summaryTypeAndDateStr.charAt(0);
// HOUR : 2016-05-02/12
// DAY : 2016-05-02
......@@ -42,11 +42,11 @@ public enum SummaryType {
type = YEAR;
break;
case 'R':
return new RelationShipAction(entryKey);
return new CallChainRelationshipAction(entryKey);
default:
throw new RuntimeException("Can not find the summary type[" + valueChar + "]");
}
DateDetailSummaryAction summaryAction = new DateDetailSummaryAction(entryKey, summaryDateStr);
NumberOfCalledStatisticsAction summaryAction = new NumberOfCalledStatisticsAction(entryKey, summaryDateStr);
summaryAction.setSummaryType(type);
return summaryAction;
}
......
......@@ -3,7 +3,7 @@ package com.ai.cloud.skywalking.analysis.mapper;
import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildMapper;
import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildReducer;
import com.ai.cloud.skywalking.analysis.chainbuild.action.ISummaryAction;
import com.ai.cloud.skywalking.analysis.chainbuild.action.IStatisticsAction;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.chainbuild.po.SummaryType;
import com.ai.cloud.skywalking.analysis.chainbuild.util.TokenGenerator;
......@@ -62,9 +62,9 @@ public class CallChainMapperTest {
String summaryTypeAndDateStr = reduceKey.substring(0, index - 1);
String entryKey = reduceKey.substring(index + 1);
ISummaryAction summaryAction = SummaryType.chooseSummaryAction(summaryTypeAndDateStr, entryKey);
IStatisticsAction summaryAction = SummaryType.chooseSummaryAction(summaryTypeAndDateStr, entryKey);
new ChainBuildReducer().doReduceAction(summaryAction, chainNodeInfo.iterator());
new ChainBuildReducer().doReduceAction(reduceKey, summaryAction, chainNodeInfo.iterator());
}
public static List<Span> selectByTraceId(String traceId) throws IOException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册