提交 c7caebe2 编写于 作者: A ascrutae

1.修复无法出统计结果的bug

2. 新增测试类
上级 d2dd8004
package com.ai.cloud.skywalking.analysis.chainbuild;
import java.io.IOException;
import java.util.Iterator;
import com.ai.cloud.skywalking.analysis.chainbuild.action.IStatisticsAction;
import com.ai.cloud.skywalking.analysis.chainbuild.po.SummaryType;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
......@@ -10,65 +11,64 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.ai.cloud.skywalking.analysis.chainbuild.action.IStatisticsAction;
import com.ai.cloud.skywalking.analysis.chainbuild.po.SummaryType;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import java.io.IOException;
import java.util.Iterator;
public class ChainBuildReducer extends Reducer<Text, Text, Text, IntWritable> {
private Logger logger = LogManager.getLogger(ChainBuildReducer.class);
private Logger logger = LogManager.getLogger(ChainBuildReducer.class);
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
ConfigInitializer.initialize();
Config.AnalysisServer.IS_ACCUMULATE_MODE = Boolean.parseBoolean(context
.getConfiguration().get("skywalking.analysis.mode", "false"));
logger.info("Skywalking analysis mode :[{}]",
Config.AnalysisServer.IS_ACCUMULATE_MODE ? "ACCUMULATE"
: "REWRITE");
}
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String reduceKey = key.toString();
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
ConfigInitializer.initialize();
Config.AnalysisServer.IS_ACCUMULATE_MODE = Boolean.parseBoolean(context
.getConfiguration().get("skywalking.analysis.mode", "false"));
logger.info("Skywalking analysis mode :[{}]",
Config.AnalysisServer.IS_ACCUMULATE_MODE ? "ACCUMULATE"
: "REWRITE");
}
int index = reduceKey.indexOf(":");
if (index == -1) {
return;
}
String summaryTypeAndDateStr = reduceKey.substring(0, index);
String entryKey = reduceKey.substring(index + 1);
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String reduceKey = Bytes.toString(key.getBytes());
int index = reduceKey.indexOf(":");
if (index == -1) {
return;
}
String summaryTypeAndDateStr = reduceKey.substring(0, index);
String entryKey = reduceKey.substring(index + 1);
logger.debug("begin to reduce for key: {}", reduceKey);
IStatisticsAction summaryAction = SummaryType.chooseSummaryAction(
summaryTypeAndDateStr, entryKey);
doReduceAction(reduceKey, summaryAction, values.iterator());
}
IStatisticsAction summaryAction = SummaryType.chooseSummaryAction(
summaryTypeAndDateStr, entryKey);
doReduceAction(entryKey, summaryAction, values.iterator());
}
public void doReduceAction(String reduceKey, IStatisticsAction summaryAction,
Iterator<Text> iterator) {
long dataCounter = 0;
while (iterator.hasNext()) {
String summaryData = iterator.next().toString();
try {
summaryAction.doAction(summaryData);
} catch (Exception e) {
logger.error(
"Failed to summary call chain, maybe illegal data:"
+ summaryData, e);
}finally{
dataCounter++;
}
if(dataCounter % 1000 == 0){
logger.debug("reduce for key: {}, count: {}", reduceKey, dataCounter);
}
}
public void doReduceAction(String reduceKey, IStatisticsAction summaryAction,
Iterator<Text> iterator) {
long dataCounter = 0;
while (iterator.hasNext()) {
String summaryData = iterator.next().toString();
try {
summaryAction.doAction(summaryData);
} catch (Exception e) {
logger.error(
"Failed to summary call chain, maybe illegal data:"
+ summaryData, e);
} finally {
dataCounter++;
}
if (dataCounter % 1000 == 0) {
logger.debug("reduce for key: {}, count: {}", reduceKey, dataCounter);
}
}
try {
summaryAction.doSave();
} catch (Exception e) {
logger.error("Failed to save summaryresult/chainTree.", e);
}
}
try {
summaryAction.doSave();
} catch (Exception e) {
e.printStackTrace();
logger.error("Failed to save summaryresult/chainTree.", e);
}
}
}
......@@ -26,24 +26,23 @@ public class TokenGenerator {
}
private static String generate(String originData) {
// StringBuilder result = new StringBuilder();
// if (originData != null) {
// try {
// MessageDigest md = MessageDigest.getInstance("MD5");
// byte bytes[] = md.digest(originData.getBytes());
// for (int i = 0; i < bytes.length; i++) {
// String str = Integer.toHexString(bytes[i] & 0xFF);
// if (str.length() == 1) {
// str += "F";
// }
// result.append(str);
// }
// } catch (NoSuchAlgorithmException e) {
// logger.error("Cannot found algorithm.", e);
// System.exit(-1);
// }
// }
// return result.toString().toUpperCase();
return originData;
StringBuilder result = new StringBuilder();
if (originData != null) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
byte bytes[] = md.digest(originData.getBytes());
for (int i = 0; i < bytes.length; i++) {
String str = Integer.toHexString(bytes[i] & 0xFF);
if (str.length() == 1) {
str += "F";
}
result.append(str);
}
} catch (NoSuchAlgorithmException e) {
logger.error("Cannot found algorithm.", e);
System.exit(-1);
}
}
return result.toString().toUpperCase();
}
}
\ No newline at end of file
......@@ -8,6 +8,8 @@ public class HBaseTableMetaData {
*/
public final static class TABLE_CALL_CHAIN {
public static final String TABLE_NAME = "sw-call-chain";
public static final String FAMILY_NAME = "call-chain";
}
......
......@@ -10,4 +10,26 @@ scan 'sw-chain-info', { FILTER
=>SingleColumnValueFilter.new(Bytes.toBytes('trace_info'),Bytes.toBytes('cid'),
CompareFilter::CompareOp.valueOf('EQUAL'),
BinaryComparator.new(Bytes.toBytes('CID_B4CAEE3953B3C0DBD7A43656B2EEE345')))}
```
* 限制查询某张表的条数
**以下只返回10条记录**
```java
scan "sw-treeId-cid-mapping",{LIMIT =>10}
```
* 根据TreeId查询某个月份合并过的CID
**以下是查询2016年4月份TREE_ID_BF32F7D57F6E69912C8AF2487F1FFCE1合并过的CID**
```java
get "sw-treeId-cid-mapping","TREE_ID_354AF3F3F58CE4AECFCC7782A188396B@2016-3"
```
* 根据CID查询所有归属TraceId
```java
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter. Filter;
scan 'sw-traceId-cid-mapping', { FILTER =>SingleColumnValueFilter.new(Bytes.toBytes('cid'),Bytes.toBytes('cid'), CompareFilter::CompareOp.valueOf('EQUAL'),BinaryComparator.new(Bytes.toBytes('CID_A4AD2F204E3DA016BC455AB95FAEFFF9')))}
```
\ No newline at end of file
......@@ -2,23 +2,15 @@ package com.ai.cloud.skywalking.analysis.mapper;
import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildMapper;
import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildReducer;
import com.ai.cloud.skywalking.analysis.chainbuild.action.IStatisticsAction;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainNode;
import com.ai.cloud.skywalking.analysis.chainbuild.po.SummaryType;
import com.ai.cloud.skywalking.analysis.chainbuild.util.TokenGenerator;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData;
import com.ai.cloud.skywalking.analysis.mapper.util.HBaseUtils;
import com.ai.cloud.skywalking.protocol.Span;
import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.junit.Test;
import java.io.IOException;
......@@ -26,30 +18,12 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* Created by astraea on 2016/1/15.
*/
public class CallChainMapperTest {
private static String ZK_QUORUM = "10.1.235.197,10.1.235.198,10.1.235.199";
// private static String ZK_QUORUM = "10.1.241.18,10.1.241.19,10.1.241.20";
private static String ZK_CLIENT_PORT = "29181";
private static String chain_Id = "1.0b.1461569643178.5b468e7.23292.118.68";
//private static String chain_Id = "1.0a2.1453429608422.2701d43.6468.56.1";
private static Configuration configuration = null;
private static Connection connection;
private static SimpleDateFormat hourSimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH");
private static SimpleDateFormat daySimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
private static SimpleDateFormat monthSimpleDateFormat = new SimpleDateFormat("yyyy-MM");
private static SimpleDateFormat yearSimpleDateFormat = new SimpleDateFormat("yyyy");
private static Connection connection = HBaseUtils.getConnection();
public static void main(String[] args) throws Exception {
ConfigInitializer.initialize();
initHBaseClient();
// 2016-04-22/23:57:03 to 2016-05-02/23:47:03
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd/HH:mm:ss");
Date startDate = simpleDateFormat.parse("2016-04-22/23:57:03");
Date endDate = simpleDateFormat.parse("2016-05-02/23:47:03");
......@@ -58,113 +32,9 @@ public class CallChainMapperTest {
Table table = connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CALL_CHAIN.TABLE_NAME));
ResultScanner result = table.getScanner(scan);
int count = 0;
for (Result result1 : result){
for (Result result1 : result) {
count++;
}
System.out.println(count);
// List<Span> spanList = selectByTraceId(chain_Id);
// ChainInfo chainInfo = ChainBuildMapper.spanToChainInfo(chain_Id, spanList);
// List<Text> chainNodeInfo = new ArrayList<>();
// for (ChainNode chainNode : chainInfo.getNodes()) {
// List<Text> value1 = new ArrayList<Text>();
// Text key =new Text(SummaryType.YEAR.getValue() + "-" + yearSimpleDateFormat.format(
// new Date(chainNode.getStartDate())
// ) + ":" + chainInfo.getCallEntrance());
// value1.add(new Text(new Gson().toJson(chainNode)));
//
// String reduceKey = Bytes.toString(key.getBytes());
// int index = reduceKey.indexOf(":");
// if (index == -1) {
// return;
// }
//
// String summaryTypeAndDateStr = reduceKey.substring(0, index);
// String entryKey = reduceKey.substring(index + 1);
// IStatisticsAction summaryAction = SummaryType.chooseSummaryAction(summaryTypeAndDateStr, entryKey);
//
// new ChainBuildReducer().doReduceAction(reduceKey, summaryAction, value1.iterator());
// }
}
public static List<Span> selectByTraceId(String traceId) throws IOException {
List<Span> entries = new ArrayList<Span>();
Table table = connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CALL_CHAIN.TABLE_NAME));
Get g = new Get(Bytes.toBytes(traceId));
Result r = table.get(g);
for (Cell cell : r.rawCells()) {
if (cell.getValueArray().length > 0)
entries.add(new Span(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())));
}
return entries;
}
@Test
public void validateSummaryResult() throws IOException, ParseException {
List<ChainInfo> chainInfoList = selectSpecificCallEntranceSpansOnSpecificCallTime("com.ai.aisse.core.service.impl.SynchAisseWorkDataServiceImpl.SynchAisseDataDel()", "2016-01-26/10:00:00", "2016-02-08/10:00:00");
System.out.println("size :" + chainInfoList.size());
Map<String, Integer> summaryResult = new HashMap<String, Integer>();
for (ChainInfo chainInfo : chainInfoList) {
String key = generateKey(chainInfo.getStartDate());
Integer total = summaryResult.get(key);
if (total == null) {
total = 0;
}
summaryResult.put(key, ++total);
}
for (Map.Entry<String, Integer> entry : summaryResult.entrySet()) {
System.out.println(entry.getKey() + " " + entry.getValue());
}
}
private String generateKey(long startDate) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date(startDate));
return calendar.get(Calendar.YEAR) + "-" + (calendar.get(Calendar.MONTH) + 1) + "-"
+ calendar.get(Calendar.DAY_OF_MONTH);
}
public static List<ChainInfo> selectSpecificCallEntranceSpansOnSpecificCallTime(String callEntrance, String startDate, String endDate) throws IOException, ParseException {
List<ChainInfo> chainInfos = new ArrayList<ChainInfo>();
Table table = connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CALL_CHAIN.TABLE_NAME));
Scan scan = new Scan();
scan.setTimeRange(new SimpleDateFormat("yyyy-MM-dd/HH:mm:ss").parse(startDate).getTime(), new SimpleDateFormat("yyyy-MM-dd/HH:mm:ss").parse(endDate).getTime());
ResultScanner resultScanner = table.getScanner(scan);
Iterator<Result> resultIterator = resultScanner.iterator();
while (resultIterator.hasNext()) {
Result result = resultIterator.next();
List<Span> entries = new ArrayList<Span>();
for (Cell cell : result.rawCells()) {
if (cell.getValueArray().length > 0)
entries.add(new Span(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())));
}
ChainInfo chainInfo = null;
try {
chainInfo = ChainBuildMapper.spanToChainInfo(Bytes.toString(result.getRow()), entries);
} catch (Exception e) {
continue;
}
// System.out.println(chainInfo.getCallEntrance());
if (callEntrance.equalsIgnoreCase(chainInfo.getCallEntrance())) {
chainInfos.add(chainInfo);
}
}
return chainInfos;
}
public static void initHBaseClient() throws IOException {
if (configuration == null) {
configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", ZK_QUORUM);
configuration.set("hbase.zookeeper.property.clientPort", ZK_CLIENT_PORT);
configuration.set("hbase.rpc.timeout", "600000");
connection = ConnectionFactory.createConnection(configuration);
}
ConfigInitializer.initialize();
}
}
\ No newline at end of file
package com.ai.cloud.skywalking.analysis.mapper;
import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildMapper;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainNode;
import com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData;
import com.ai.cloud.skywalking.analysis.mapper.util.HBaseUtils;
import com.ai.cloud.skywalking.protocol.Span;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ValidateMonthSummaryResult {
public static void main(String[] args) throws IOException {
Connection connection = HBaseUtils.getConnection();
Table table = connection.getTable(TableName.valueOf
(HBaseTableMetaData.TABLE_CALL_CHAIN.TABLE_NAME));
Scan scan = new Scan();
Filter filter = new SingleColumnValueFilter(HBaseTableMetaData.TABLE_CALL_CHAIN.FAMILY_NAME.getBytes(),
"0-S".getBytes(), CompareFilter.CompareOp.EQUAL,
"http://hire.asiainfo.com/Aisse-Mobile-Web/aisseWorkUser/queryProsonLoding".getBytes());
scan.setFilter(filter);
ResultScanner resultScanner = table.getScanner(scan);
List<ChainInfo> chainInfos = new ArrayList<ChainInfo>();
for (Result result : resultScanner) {
List<Span> spanList = new ArrayList<Span>();
for (Cell cell : result.rawCells()) {
spanList.add(new Span(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())));
}
chainInfos.add(ChainBuildMapper.spanToChainInfo(Bytes.toString(result.getRow()), spanList));
}
Map<String, Integer> totalCallSize = new HashMap<String, Integer>();
Map<String, Float> totalCallTimes = new HashMap<>();
for (ChainInfo chainInfo : chainInfos) {
for (ChainNode chainNode : chainInfo.getNodes()) {
Integer totalCall = totalCallSize.get(chainNode.getViewPoint());
if (totalCallSize == null) {
totalCall = 0;
}
totalCallSize.put(chainNode.getTraceLevelId(), ++totalCall);
Float totalCallTime = totalCallTimes.get(chainNode.getViewPoint());
if (totalCallTime == null) {
totalCallTime = 0F;
}
totalCallTimes.put(chainNode.getTraceLevelId(), (totalCallTime + chainNode.getCost()));
}
}
}
}
package com.ai.cloud.skywalking.analysis.mapper.util;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData;
import com.ai.cloud.skywalking.protocol.Span;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Created by xin on 16-5-4.
*/
public class HBaseUtils {
private static String ZK_QUORUM = "host-10-1-241-18,host-10-1-241-19,host-10-1-241-20";
private static String ZK_CLIENT_PORT = "29181";
private static Configuration configuration = null;
private static Connection connection;
static {
if (configuration == null) {
configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", ZK_QUORUM);
configuration.set("hbase.zookeeper.property.clientPort", ZK_CLIENT_PORT);
configuration.set("hbase.rpc.timeout", "600000");
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
e.printStackTrace();
System.exit(-1);
}
}
ConfigInitializer.initialize();
}
public static Connection getConnection() {
return connection;
}
public static List<Span> selectByTraceId(String traceId) throws IOException {
List<Span> entries = new ArrayList<Span>();
Table table = connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CALL_CHAIN.TABLE_NAME));
Get g = new Get(Bytes.toBytes(traceId));
Result r = table.get(g);
for (Cell cell : r.rawCells()) {
if (cell.getValueArray().length > 0)
entries.add(new Span(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())));
}
return entries;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册