提交 1dd34903 编写于 作者: wu-sheng's avatar wu-sheng

1.提供大量修改方法,以提高mr任务的效率,降低处理复杂度和数据量

2.需要大规模降低对象符合关系,降低GC压力和对象创建压力
3.通过更细粒度的任务,提高mr进度的精度,并大幅度减少内存需求
上级 c7caebe2
......@@ -68,6 +68,22 @@ public class ChainBuildMapper extends TableMapper<Text, Text> {
if (chainInfo.getCallEntrance() != null && chainInfo.getCallEntrance().length() > 0) {
for (ChainNode chainNode : chainInfo.getNodes()) {
/**
* TODO: 进一步提高运行速度所需的性能提升
* 此处修改原因,
* 1.更细粒度划分reduce任务,提高性能。
* 2.减少数据传输量,以及处理复杂度。
* 3.请避免使用gson序列化,提高程序处理性能
*
* hour/day/month/year,
* key 修改为:类型+时间字符+callEntrance+levelId+viewpoint,
* value 为ChainNodeSpecificTimeWindowSummaryValue中所需的明确的值组成的简单串
* value包含:
* 1.是否正确调用,由NodeStatus获取,值为N/A/I
* 2.调用所需时间,由cost获取
*
*/
context.write(new Text(SummaryType.HOUR.getValue() + "-" + hourSimpleDateFormat.format(
new Date(chainNode.getStartDate())
) + ":" + chainInfo.getCallEntrance()), new Text(new Gson().toJson(chainNode)));
......@@ -84,6 +100,17 @@ public class ChainBuildMapper extends TableMapper<Text, Text> {
new Date(chainNode.getStartDate())
) + ":" + chainInfo.getCallEntrance()), new Text(new Gson().toJson(chainNode)));
}
/**
* TODO:通过对本地的调用链进行缓存,每个map任务中的调用链,在一个JVM内只会被传递一次,大幅度降低reduce任务的数据量。
*
* 1.使用静态变量,缓存MAP中的调用链。每个典型调用链ID只传递一次
* 2.注意缓存需要限制容量,初期规划,缓存1W个典型调用链KEY(可通过配置扩展)。仅缓存典型调用链ID,非链路明细。
* 3.对于节点数量大于2K条的调用量,暂不进行关系处理
*
* 注意:CallChainRelationshipAction暂时不做修改,此处的修改会大规模降低reduce的处理数据量,提高总体运行速度
*
*/
// Reduce key : R-CallEntrance
context.write(new Text(SummaryType.RELATIONSHIP + "-" + TokenGenerator.generateTreeToken(chainInfo.getCallEntrance())
+ ":" + chainInfo.getCallEntrance()),
......
package com.ai.cloud.skywalking.analysis.chainbuild.action.impl;
import java.io.IOException;
import java.sql.SQLException;
import com.ai.cloud.skywalking.analysis.chainbuild.action.IStatisticsAction;
import com.ai.cloud.skywalking.analysis.chainbuild.entity.CallChainTree;
import com.ai.cloud.skywalking.analysis.chainbuild.entity.CallChainTreeNode;
import com.ai.cloud.skywalking.analysis.chainbuild.entity.ChainNodeSpecificTimeWindowSummaryValue;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainNode;
import com.ai.cloud.skywalking.analysis.chainbuild.po.SummaryType;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.google.gson.Gson;
import java.io.IOException;
import java.sql.SQLException;
public class NumberOfCalledStatisticsAction implements IStatisticsAction {
private CallChainTree callChainTree;
private String summaryDate;
private SummaryType summaryType;
/**
* 统计任务的主要存储值
*
* TODO:配置本次修改
*/
private ChainNodeSpecificTimeWindowSummaryValue summaryValue = new ChainNodeSpecificTimeWindowSummaryValue();
public NumberOfCalledStatisticsAction(String entryKey, String summaryDate) throws IOException {
callChainTree = CallChainTree.create(entryKey);
......@@ -22,6 +31,21 @@ public class NumberOfCalledStatisticsAction implements IStatisticsAction {
@Override
public void doAction(String summaryData) throws IOException {
/**
* TODO:根据MAP的优化,大幅度减少对象逻辑复杂度
*
* 每次传递来的数据,根据NodeStatus的值在内存中快速累加到ChainNodeSpecificTimeWindowSummaryValue中
*
* ChainNodeSpecificTimeWindowSummaryValue执行以下逻辑:
* 1.totalCall默认+1
* 2.totalCostTime根据cost累加
* 3.correctNumber和humanInterruptionNumber根据状态累加
*
* 注意:
* 1.减少对象创建和嵌套
* 2.此过程中不要操作任何hbase数据
*/
ChainNode chainNode = new Gson().fromJson(summaryData, ChainNode.class);
CallChainTreeNode newCallChainTreeNode = new CallChainTreeNode(chainNode);
CallChainTreeNode callChainTreeNode = callChainTree.getNodes().get(newCallChainTreeNode.getTreeNodeId());
......@@ -34,6 +58,16 @@ public class NumberOfCalledStatisticsAction implements IStatisticsAction {
@Override
public void doSave() throws InterruptedException, SQLException, IOException {
/**
* TODO:新的写入逻辑,快速完成
*
* 根据Config.AnalysisServer.IS_ACCUMULATE_MODE决定,在需要时,精确读取指定rowkey的指定列值。
* 和ChainNodeSpecificTimeWindowSummaryValue值进行简单相加,然后汇总写入。
*
* 注意:
* 1.减少对象创建和嵌套
* 2。确保任务的告诉高效完成
*/
callChainTree.saveToHBase(summaryType);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册