提交 2ba67bdb 编写于 作者: A ascrutae

1.修复MapReduce的出现的NullPointException

2. 增加CallTreeId的生成
3. 修改Filter的包路径
4. 修复测试类编译问题
上级 94e2819d
...@@ -27,8 +27,12 @@ public class ChainBuildReducer extends Reducer<Text, ChainInfo, Text, IntWritabl ...@@ -27,8 +27,12 @@ public class ChainBuildReducer extends Reducer<Text, ChainInfo, Text, IntWritabl
@Override @Override
protected void reduce(Text key, Iterable<ChainInfo> values, Context context) throws IOException, protected void reduce(Text key, Iterable<ChainInfo> values, Context context) throws IOException,
InterruptedException { InterruptedException {
CallChainTree chainTree = CallChainTree.load(Bytes.toString(key.getBytes())); doReduceAction(Bytes.toString(key.getBytes()), values.iterator());
Iterator<ChainInfo> chainInfoIterator = values.iterator(); }
public static void doReduceAction(String key, Iterator<ChainInfo> chainInfoIterator) throws IOException, InterruptedException {
CallChainTree chainTree = CallChainTree.load(key);
while (chainInfoIterator.hasNext()) { while (chainInfoIterator.hasNext()) {
ChainInfo chainInfo = chainInfoIterator.next(); ChainInfo chainInfo = chainInfoIterator.next();
if (chainInfo.getChainStatus() == ChainInfo.ChainStatus.NORMAL) { if (chainInfo.getChainStatus() == ChainInfo.ChainStatus.NORMAL) {
......
...@@ -93,7 +93,7 @@ public class ChainInfo implements Writable { ...@@ -93,7 +93,7 @@ public class ChainInfo implements Writable {
&& chainNode.getLevelId() == 0) { && chainNode.getLevelId() == 0) {
firstChainNode = chainNode; firstChainNode = chainNode;
startDate = chainNode.getStartDate(); startDate = chainNode.getStartDate();
cid = firstChainNode.getViewPoint(); callEntrance = firstChainNode.getViewPoint();
} }
} }
...@@ -142,4 +142,8 @@ public class ChainInfo implements Writable { ...@@ -142,4 +142,8 @@ public class ChainInfo implements Writable {
public long getStartDate() { public long getStartDate() {
return startDate; return startDate;
} }
public String getCallEntrance() {
return callEntrance;
}
} }
...@@ -34,6 +34,9 @@ public class HBaseUtil { ...@@ -34,6 +34,9 @@ public class HBaseUtil {
createTableIfNeed(HBaseTableMetaData.TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING.TABLE_NAME, createTableIfNeed(HBaseTableMetaData.TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING.TABLE_NAME,
HBaseTableMetaData.TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING.COLUMN_FAMILY_NAME); HBaseTableMetaData.TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING.COLUMN_FAMILY_NAME);
createTableIfNeed(HBaseTableMetaData.TABLE_CHAIN_ONE_MINUTE_SUMMARY.TABLE_NAME,
HBaseTableMetaData.TABLE_CHAIN_ONE_MINUTE_SUMMARY.COLUMN_FAMILY_NAME);
} catch (IOException e) { } catch (IOException e) {
logger.error("Create tables failed", e); logger.error("Create tables failed", e);
} }
...@@ -69,13 +72,13 @@ public class HBaseUtil { ...@@ -69,13 +72,13 @@ public class HBaseUtil {
Result r = table.get(g); Result r = table.get(g);
if (r.rawCells().length == 0) { if (r.rawCells().length == 0) {
return null; return new ChainNodeSpecificMinSummary();
} }
Cell cell = r.getColumnLatestCell(HBaseTableMetaData.TABLE_CHAIN_ONE_MINUTE_SUMMARY.COLUMN_FAMILY_NAME.getBytes(), Cell cell = r.getColumnLatestCell(HBaseTableMetaData.TABLE_CHAIN_ONE_MINUTE_SUMMARY.COLUMN_FAMILY_NAME.getBytes(),
qualifier.getBytes()); qualifier.getBytes());
if (cell.getValueArray().length > 0) { if (cell != null && cell.getValueArray().length > 0) {
result = new ChainNodeSpecificMinSummary(Bytes.toString(cell.getValueArray(), result = new ChainNodeSpecificMinSummary(Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength())); cell.getValueOffset(), cell.getValueLength()));
} else { } else {
...@@ -91,7 +94,7 @@ public class HBaseUtil { ...@@ -91,7 +94,7 @@ public class HBaseUtil {
Get g = new Get(Bytes.toBytes(callEntrance)); Get g = new Get(Bytes.toBytes(callEntrance));
Result r = table.get(g); Result r = table.get(g);
if (r.rawCells().length == 0) { if (r.rawCells().length == 0) {
return null; return new CallChainTree(callEntrance);
} }
result = new CallChainTree(callEntrance); result = new CallChainTree(callEntrance);
for (Cell cell : r.rawCells()) { for (Cell cell : r.rawCells()) {
...@@ -125,11 +128,12 @@ public class HBaseUtil { ...@@ -125,11 +128,12 @@ public class HBaseUtil {
Get g = new Get(Bytes.toBytes(treeId)); Get g = new Get(Bytes.toBytes(treeId));
Result r = table.get(g); Result r = table.get(g);
if (r.rawCells().length == 0) { if (r.rawCells().length == 0) {
return null; return new ArrayList<>();
} }
for (Cell cell : r.rawCells()) { for (Cell cell : r.rawCells()) {
if (cell.getValueArray().length > 0) { if (cell.getValueArray().length > 0) {
List<String> hasBeenMergedCIds = new Gson().fromJson("", List<String> hasBeenMergedCIds = new Gson().fromJson(Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength()),
new TypeToken<List<String>>() { new TypeToken<List<String>>() {
}.getType()); }.getType());
result.addAll(hasBeenMergedCIds); result.addAll(hasBeenMergedCIds);
......
...@@ -17,10 +17,14 @@ public class TokenGenerator { ...@@ -17,10 +17,14 @@ public class TokenGenerator {
return "CID_" + generate(originData); return "CID_" + generate(originData);
} }
public static String generateNodeToken(String originData){ public static String generateNodeToken(String originData) {
return "C_NID_" + generate(originData); return "C_NID_" + generate(originData);
} }
public static String generateTreeToken(String originData) {
return "TREE_ID_" + generate(originData);
}
private static String generate(String originData) { private static String generate(String originData) {
StringBuilder result = new StringBuilder(); StringBuilder result = new StringBuilder();
if (originData != null) { if (originData != null) {
......
...@@ -21,16 +21,6 @@ public class HBaseTableMetaData { ...@@ -21,16 +21,6 @@ public class HBaseTableMetaData {
public static final String COLUMN_FAMILY_NAME = "chain_detail"; public static final String COLUMN_FAMILY_NAME = "chain_detail";
} }
/**
* 用于存放每个CID在一分钟内的汇总,汇总结果不包含关系汇总
*
* @author wusheng
*/
public final static class TABLE_CHAIN_ONE_MINUTE_SUMMARY_EXCLUDE_RELATIONSHIP {
public static final String TABLE_NAME = "sw-chain-1min-summary-ex-rela";
public static final String COLUMN_FAMILY_NAME = "chain_summary";
}
/** /**
* 用于存放每个CID在一分钟内的汇总,汇总结果不包含关系汇总 * 用于存放每个CID在一分钟内的汇总,汇总结果不包含关系汇总
...@@ -38,7 +28,7 @@ public class HBaseTableMetaData { ...@@ -38,7 +28,7 @@ public class HBaseTableMetaData {
* @author wusheng * @author wusheng
*/ */
public final static class TABLE_CHAIN_ONE_MINUTE_SUMMARY { public final static class TABLE_CHAIN_ONE_MINUTE_SUMMARY {
public static final String TABLE_NAME = "sw-chain-1min-summary-ic-rela"; public static final String TABLE_NAME = "sw-chain-1min-summary";
public static final String COLUMN_FAMILY_NAME = "chain_summary"; public static final String COLUMN_FAMILY_NAME = "chain_summary";
} }
...@@ -49,11 +39,16 @@ public class HBaseTableMetaData { ...@@ -49,11 +39,16 @@ public class HBaseTableMetaData {
* @author zhangxin * @author zhangxin
*/ */
public final static class TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING { public final static class TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING {
public static final String TABLE_NAME = "sw-topologyId-cid-mapping"; public static final String TABLE_NAME = "sw-treeId-cid-mapping";
public static final String COLUMN_FAMILY_NAME = "sw-topologyId-cid-mapping"; public static final String COLUMN_FAMILY_NAME = "sw-treeId-cid-mapping";
} }
/**
* 用于存放调用树
*
* @author zhangxin
*/
public final static class TABLE_CALL_CHAIN_TREE_DETAIL { public final static class TABLE_CALL_CHAIN_TREE_DETAIL {
public static final String TABLE_NAME = "sw-call-chain-tree-detail"; public static final String TABLE_NAME = "sw-call-chain-tree-detail";
......
...@@ -11,7 +11,7 @@ mysql.password=devrdbusr21 ...@@ -11,7 +11,7 @@ mysql.password=devrdbusr21
mysql.driver_class=com.mysql.jdbc.Driver mysql.driver_class=com.mysql.jdbc.Driver
filter.filter_package_name=com.ai.cloud.skywalking.analysis.categorize2chain.filter.impl filter.filter_package_name=com.ai.cloud.skywalking.analysis.chainbuild.filter.impl
chainnodesummary.interval=1 chainnodesummary.interval=1
......
package com.ai.cloud.skywalking.analysis.mapper; package com.ai.cloud.skywalking.analysis.mapper;
import com.ai.cloud.skywalking.analysis.categorize2chain.Categorize2ChainMapper; import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildMapper;
import com.ai.cloud.skywalking.analysis.categorize2chain.Categorize2ChainReducer; import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildReducer;
import com.ai.cloud.skywalking.analysis.categorize2chain.po.ChainInfo; import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.chain2summary.Chain2SummaryReducer;
import com.ai.cloud.skywalking.analysis.chain2summary.po.ChainSpecificTimeSummary;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer; import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData; import com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData;
import com.ai.cloud.skywalking.protocol.Span; import com.ai.cloud.skywalking.protocol.Span;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
...@@ -20,11 +17,8 @@ import org.junit.Before; ...@@ -20,11 +17,8 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* Created by astraea on 2016/1/15. * Created by astraea on 2016/1/15.
...@@ -33,9 +27,8 @@ public class CallChainMapperTest { ...@@ -33,9 +27,8 @@ public class CallChainMapperTest {
private static String ZK_QUORUM = "10.1.235.197,10.1.235.198,10.1.235.199"; private static String ZK_QUORUM = "10.1.235.197,10.1.235.198,10.1.235.199";
private static String ZK_CLIENT_PORT = "29181"; private static String ZK_CLIENT_PORT = "29181";
// private static String chain_Id = "1.0a2.1453430186581.3efa259.4296.56.1"; private static String chain_Id = "1.0a2.1455875996366.eb2055d.30051.88.29";
private static String chain_Id = "1.0a2.1453429608422.2701d43.6468.56.1"; // private static String chain_Id = "1.0a2.1453429608422.2701d43.6468.56.1";
private static String[] summaryRowKeys = {"CID_FF44EB45B69FCC1BE1C09F25DB1DFCEF-5-2016/01/26 20:00:00","CID_EE53EE49E36184A9901A7E5C4993C4F2-5-2016/01/27 14:00:00","CID_53AAB989D315C98CBA7352474EEFA57F-5-2016/01/30 10:00:00"};
private static Configuration configuration = null; private static Configuration configuration = null;
private static Connection connection; private static Connection connection;
...@@ -44,12 +37,12 @@ public class CallChainMapperTest { ...@@ -44,12 +37,12 @@ public class CallChainMapperTest {
public void testMap() throws Exception { public void testMap() throws Exception {
ConfigInitializer.initialize(); ConfigInitializer.initialize();
List<Span> spanList = selectByTraceId(chain_Id); List<Span> spanList = selectByTraceId(chain_Id);
ChainInfo chainInfo = Categorize2ChainMapper.spanToChainInfo(chain_Id, spanList); ChainInfo chainInfo = ChainBuildMapper.spanToChainInfo(chain_Id, spanList);
List<ChainInfo> chainInfos = new ArrayList<ChainInfo>(); List<ChainInfo> chainInfos = new ArrayList<ChainInfo>();
chainInfos.add(chainInfo); chainInfos.add(chainInfo);
Categorize2ChainReducer.reduceAction(chainInfo.getUserId() + ":" + chainInfo.getEntranceNodeToken(), chainInfos.iterator()); ChainBuildReducer.doReduceAction(chainInfo.getCallEntrance(), chainInfos.iterator());
} }
public static List<Span> selectByTraceId(String traceId) throws IOException { public static List<Span> selectByTraceId(String traceId) throws IOException {
...@@ -74,41 +67,4 @@ public class CallChainMapperTest { ...@@ -74,41 +67,4 @@ public class CallChainMapperTest {
connection = ConnectionFactory.createConnection(configuration); connection = ConnectionFactory.createConnection(configuration);
} }
} }
@Test
public void testSummary() throws IOException, ParseException {
ConfigInitializer.initialize();
Map<String,List<ChainSpecificTimeSummary>> chainSpecificTimeSummaries = new HashMap();
for (String rowkey : summaryRowKeys) {
ChainSpecificTimeSummary chainSpecificDaySummary = selectSummary(rowkey);
List<ChainSpecificTimeSummary> chainSpecificTimeSummaries1 = chainSpecificTimeSummaries.get(chainSpecificDaySummary.buildMapperKey());
if (chainSpecificTimeSummaries1 == null){
chainSpecificTimeSummaries1 = new ArrayList<>();
}
chainSpecificTimeSummaries1.add(chainSpecificDaySummary);
chainSpecificTimeSummaries.put(chainSpecificDaySummary.buildMapperKey(), chainSpecificTimeSummaries1);
}
Chain2SummaryReducer reducer = new Chain2SummaryReducer();
for (Map.Entry<String, List<ChainSpecificTimeSummary>> entry : chainSpecificTimeSummaries.entrySet()) {
reducer.doReduceAction(entry.getKey(), entry.getValue().iterator());
}
System.out.println("1");
}
private ChainSpecificTimeSummary selectSummary(String key) throws IOException, ParseException {
ChainSpecificTimeSummary summary = new ChainSpecificTimeSummary(Bytes.toString(key.getBytes()));
Table table = connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CHAIN_ONE_MINUTE_SUMMARY_EXCLUDE_RELATIONSHIP.TABLE_NAME));
Get g = new Get(Bytes.toBytes(key));
Result value = table.get(g);
for (Cell cell : value.rawCells()) {
summary.addChainNodeSummaryResult(Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength()));
}
return summary;
}
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册