提交 97adb363 编写于 作者: Z zhang.xin

1.完成CallChain的数据存入Mysql表中

上级 83659644
...@@ -53,7 +53,11 @@ public class ChainBuildReducer extends Reducer<Text, Text, Text, IntWritable> { ...@@ -53,7 +53,11 @@ public class ChainBuildReducer extends Reducer<Text, Text, Text, IntWritable> {
+ callChainData, e); + callChainData, e);
} }
} }
container.saveToHBase(); try {
chainTree.saveToHbase(); container.saveToHBase();
chainTree.saveToHbase();
} catch (Exception e) {
logger.error("Failed to save summaryresult/chainTree.", e);
}
} }
} }
package com.ai.cloud.skywalking.analysis.chainbuild; package com.ai.cloud.skywalking.analysis.chainbuild;
import com.ai.cloud.skywalking.analysis.chainbuild.entity.CallChainDetail; import com.ai.cloud.skywalking.analysis.chainbuild.entity.CallChainDetailForMysql;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainNode; import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainNode;
import com.ai.cloud.skywalking.analysis.config.Config; import com.ai.cloud.skywalking.analysis.config.Config;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -30,16 +30,16 @@ public class DBCallChainInfoDao { ...@@ -30,16 +30,16 @@ public class DBCallChainInfoDao {
} }
} }
public synchronized static void saveChainDetail(CallChainDetail callChainDetail) public synchronized static void saveChainDetail(CallChainDetailForMysql callChainDetailForMysql)
throws SQLException { throws SQLException {
PreparedStatement preparedStatement = null; PreparedStatement preparedStatement = null;
try { try {
preparedStatement = connection preparedStatement = connection
.prepareStatement("INSERT INTO sw_chain_detail(cid,uid,traceLevelId,viewpoint,create_time)" .prepareStatement("INSERT INTO sw_chain_detail(cid,uid,traceLevelId,viewpoint,create_time)"
+ " VALUES(?,?,?,?,?)"); + " VALUES(?,?,?,?,?)");
for (ChainNode chainNode : callChainDetail.getChainNodes()) { for (ChainNode chainNode : callChainDetailForMysql.getChainNodes()) {
preparedStatement.setString(1, callChainDetail.getChainToken()); preparedStatement.setString(1, callChainDetailForMysql.getChainToken());
preparedStatement.setString(2, callChainDetail.getUserId()); preparedStatement.setString(2, callChainDetailForMysql.getUserId());
preparedStatement.setString(3, chainNode.getTraceLevelId()); preparedStatement.setString(3, chainNode.getTraceLevelId());
preparedStatement.setString(4, chainNode.getViewPoint() + ":" preparedStatement.setString(4, chainNode.getViewPoint() + ":"
+ chainNode.getBusinessKey()); + chainNode.getBusinessKey());
...@@ -51,32 +51,7 @@ public class DBCallChainInfoDao { ...@@ -51,32 +51,7 @@ public class DBCallChainInfoDao {
for (int i : result) { for (int i : result) {
if (i != 1) { if (i != 1) {
logger.error("Failed to save chain detail [" logger.error("Failed to save chain detail ["
+ callChainDetail.getChainToken() + "]"); + callChainDetailForMysql.getChainToken() + "]");
}
}
} finally {
if (preparedStatement != null)
preparedStatement.close();
}
connection.commit();
}
public synchronized static void updateChainLastActiveTime(Map<String, Timestamp> updateChainInfo)
throws SQLException {
PreparedStatement preparedStatement = null;
try {
preparedStatement = connection
.prepareStatement("UPDATE sw_chain_detail SET update_time = ? WHERE cid = ?");
for (Map.Entry<String, Timestamp> entry : updateChainInfo
.entrySet()) {
preparedStatement.setTimestamp(1, entry.getValue());
preparedStatement.setString(2, entry.getKey());
preparedStatement.addBatch();
}
int[] result = preparedStatement.executeBatch();
for (int i : result) {
if (i != 1) {
logger.error("Failed to update chain detail");
} }
} }
} finally { } finally {
......
...@@ -12,20 +12,17 @@ import java.util.Collection; ...@@ -12,20 +12,17 @@ import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
public class CallChainDetail { public class CallChainDetailForMysql {
private boolean isNormal = true;
private String chainToken; private String chainToken;
private Map<String, ChainNode> chainNodeMap = new HashMap<String, ChainNode>(); private Map<String, ChainNode> chainNodeMap = new HashMap<String, ChainNode>();
private String userId; private String userId;
public CallChainDetail(ChainInfo chainInfo, boolean isNormal) { public CallChainDetailForMysql(ChainInfo chainInfo) {
chainToken = chainInfo.getCID(); chainToken = chainInfo.getCID();
for (ChainNode chainNode : chainInfo.getNodes()) { for (ChainNode chainNode : chainInfo.getNodes()) {
chainNodeMap.put(chainNode.getTraceLevelId(), chainNode); chainNodeMap.put(chainNode.getTraceLevelId(), chainNode);
} }
userId = chainInfo.getUserId(); userId = chainInfo.getUserId();
this.isNormal = isNormal;
} }
@Override @Override
...@@ -33,14 +30,8 @@ public class CallChainDetail { ...@@ -33,14 +30,8 @@ public class CallChainDetail {
return new Gson().toJson(this); return new Gson().toJson(this);
} }
public void save(Put put) throws SQLException { public void saveToMysql() throws SQLException {
for (Map.Entry<String, ChainNode> entry : chainNodeMap.entrySet()){ DBCallChainInfoDao.saveChainDetail(this);
put.addColumn(HBaseTableMetaData.TABLE_CHAIN_DETAIL.COLUMN_FAMILY_NAME.getBytes(),entry.getKey().getBytes(),
entry.getValue().toString().getBytes());
}
if (isNormal) {
DBCallChainInfoDao.saveChainDetail(this);
}
} }
public Collection<ChainNode> getChainNodes() { public Collection<ChainNode> getChainNodes() {
......
package com.ai.cloud.skywalking.analysis.chainbuild.po; package com.ai.cloud.skywalking.analysis.chainbuild.po;
import com.ai.cloud.skywalking.analysis.chainbuild.entity.CallChainDetailForMysql;
import com.ai.cloud.skywalking.analysis.chainbuild.util.HBaseUtil; import com.ai.cloud.skywalking.analysis.chainbuild.util.HBaseUtil;
import com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData; import com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData;
import com.google.gson.Gson; import com.google.gson.Gson;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import java.io.IOException; import java.io.IOException;
import java.sql.SQLException;
import java.util.*; import java.util.*;
public class SpecificTimeCallTreeMergedChainIdContainer { public class SpecificTimeCallTreeMergedChainIdContainer {
...@@ -14,6 +16,8 @@ public class SpecificTimeCallTreeMergedChainIdContainer { ...@@ -14,6 +16,8 @@ public class SpecificTimeCallTreeMergedChainIdContainer {
private Map<String, List<String>> hasBeenMergedChainIds; private Map<String, List<String>> hasBeenMergedChainIds;
private Map<String, CallChainDetailForMysql> callChainDetailMap;
// 本次Reduce合并过的调用链 // 本次Reduce合并过的调用链
private Map<String, ChainInfo> combineChains; private Map<String, ChainInfo> combineChains;
...@@ -21,6 +25,7 @@ public class SpecificTimeCallTreeMergedChainIdContainer { ...@@ -21,6 +25,7 @@ public class SpecificTimeCallTreeMergedChainIdContainer {
this.treeToken = treeToken; this.treeToken = treeToken;
hasBeenMergedChainIds = new HashMap<String, List<String>>(); hasBeenMergedChainIds = new HashMap<String, List<String>>();
combineChains = new HashMap<String, ChainInfo>(); combineChains = new HashMap<String, ChainInfo>();
callChainDetailMap = new HashMap<String, CallChainDetailForMysql>();
} }
public void addMergedChainIfNotContain(ChainInfo chainInfo) throws IOException { public void addMergedChainIfNotContain(ChainInfo chainInfo) throws IOException {
...@@ -34,6 +39,13 @@ public class SpecificTimeCallTreeMergedChainIdContainer { ...@@ -34,6 +39,13 @@ public class SpecificTimeCallTreeMergedChainIdContainer {
if (!cIds.contains(chainInfo.getCID())) { if (!cIds.contains(chainInfo.getCID())) {
cIds.add(chainInfo.getCID()); cIds.add(chainInfo.getCID());
combineChains.put(chainInfo.getCID(), chainInfo); combineChains.put(chainInfo.getCID(), chainInfo);
//
if (chainInfo.getChainStatus() == ChainInfo.ChainStatus.NORMAL) {
callChainDetailMap.put(chainInfo.getCID(), new CallChainDetailForMysql(chainInfo));
}
}else{
} }
} }
...@@ -44,14 +56,21 @@ public class SpecificTimeCallTreeMergedChainIdContainer { ...@@ -44,14 +56,21 @@ public class SpecificTimeCallTreeMergedChainIdContainer {
return treeToken + "@" + calendar.get(Calendar.YEAR) + "-" + calendar.get(Calendar.MONTH); return treeToken + "@" + calendar.get(Calendar.YEAR) + "-" + calendar.get(Calendar.MONTH);
} }
public void saveToHBase() throws IOException, InterruptedException { public void saveToHBase() throws IOException, InterruptedException, SQLException {
batchSaveCurrentHasBeenMergedChainInfo(); batchSaveCurrentHasBeenMergedChainInfo();
batchSaveMergedChainId(); batchSaveMergedChainId();
batchSaveToMysql();
}
private void batchSaveToMysql() throws SQLException {
for (Map.Entry<String, CallChainDetailForMysql> entry : callChainDetailMap.entrySet()){
entry.getValue().saveToMysql();
}
} }
/** /**
* 保存被合并的cid信息列表 * 保存被合并的cid信息列表
* *
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
*/ */
...@@ -69,7 +88,7 @@ public class SpecificTimeCallTreeMergedChainIdContainer { ...@@ -69,7 +88,7 @@ public class SpecificTimeCallTreeMergedChainIdContainer {
/** /**
* 保存已经合并的调用链信息,包含调用链明细 * 保存已经合并的调用链信息,包含调用链明细
* *
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
*/ */
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册