提交 0f1c27e6 编写于 作者: A ascrutae

1. 修复二次MapReduce部分bug

2. 添加测试类
上级 2e842511
......@@ -12,7 +12,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
public class Chain2SummaryMapper extends TableMapper<Text, Object> {
public class Chain2SummaryMapper extends TableMapper<Text, ChainSpecificTimeSummary> {
private Logger logger = LoggerFactory
.getLogger(Chain2SummaryMapper.class.getName());
......@@ -34,6 +34,7 @@ public class Chain2SummaryMapper extends TableMapper<Text, Object> {
}
context.write(new Text(summary.buildMapperKey()), summary);
} catch (Exception e) {
e.printStackTrace();
logger.error("Failed to mapper call chain[" + key.toString() + "]",
e);
}
......
......@@ -5,11 +5,15 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
public class Chain2SummaryReducer extends Reducer<Text, ChainSpecificTimeSummary, Text, IntWritable> {
private Logger logger = LoggerFactory
.getLogger(Chain2SummaryReducer.class.getName());
@Override
protected void setup(Context context) throws IOException, InterruptedException {
......@@ -18,14 +22,27 @@ public class Chain2SummaryReducer extends Reducer<Text, ChainSpecificTimeSummary
@Override
protected void reduce(Text key, Iterable<ChainSpecificTimeSummary> values, Context context) throws IOException, InterruptedException {
ChainRelationship4Search chainRelationship = ChainRelationship4Search.load(Bytes.toString(key.getBytes()));
Iterator<ChainSpecificTimeSummary> summaryIterator = values.iterator();
Summary summary = new Summary();
while (summaryIterator.hasNext()) {
ChainSpecificTimeSummary timeSummary = summaryIterator.next();
summary.summary(timeSummary, chainRelationship);
}
doReduceAction(Bytes.toString(key.getBytes()), values.iterator());
}
summary.saveToHBase();
public void doReduceAction(String key, Iterator<ChainSpecificTimeSummary> summaryIterator) {
try {
ChainRelationship4Search chainRelationship = ChainRelationship4Search.load(Bytes.toString(key.getBytes()));
Summary summary = new Summary();
while (summaryIterator.hasNext()) {
try {
ChainSpecificTimeSummary timeSummary = summaryIterator.next();
summary.summary(timeSummary, chainRelationship);
} catch (Exception e) {
e.printStackTrace();
logger.error("Failed to reduce", e);
}
}
summary.saveToHBase();
} catch (Exception e) {
e.printStackTrace();
logger.error("Failed to reduce", e);
}
}
}
......@@ -25,6 +25,9 @@ public class ChainSpecificTimeSummary implements Writable {
private Map<String, ChainNodeSpecificTimeWindowSummary> summaryMap;
private long summaryTimestamp;
public ChainSpecificTimeSummary() {
}
public ChainSpecificTimeSummary(String rowKey) throws ParseException {
String[] splitValue = rowKey.split("-");
this.cId = splitValue[0];
......@@ -43,6 +46,9 @@ public class ChainSpecificTimeSummary implements Writable {
JsonObject jsonObject = (JsonObject) new JsonParser().parse(in.readLine());
cId = jsonObject.get("cId").getAsString();
userId = jsonObject.get("userId").getAsString();
if (jsonObject.get("entranceNodeToken") == null) {
throw new IOException("No entryNode Token CId[" + cId + "]");
}
entranceNodeToken = jsonObject.get("entranceNodeToken").getAsString();
summaryMap = new Gson().fromJson(jsonObject.get("summaryMap").toString(),
new TypeToken<Map<String, ChainNodeSpecificTimeWindowSummary>>() {
......
......@@ -4,7 +4,8 @@ package com.ai.cloud.skywalking.analysis.mapper;
import com.ai.cloud.skywalking.analysis.categorize2chain.Categorize2ChainMapper;
import com.ai.cloud.skywalking.analysis.categorize2chain.Categorize2ChainReducer;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.chain2summary.Chain2SummaryReducer;
import com.ai.cloud.skywalking.analysis.chain2summary.ChainSpecificTimeSummary;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData;
import com.ai.cloud.skywalking.protocol.Span;
......@@ -19,8 +20,11 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Created by astraea on 2016/1/15.
......@@ -31,6 +35,7 @@ public class CallChainMapperTest {
private static String ZK_CLIENT_PORT = "29181";
// private static String chain_Id = "1.0a2.1453430186581.3efa259.4296.56.1";
private static String chain_Id = "1.0a2.1453429608422.2701d43.6468.56.1";
private static String[] summaryRowKeys = {"CID_FF44EB45B69FCC1BE1C09F25DB1DFCEF-5-2016/01/26 20:00:00","CID_EE53EE49E36184A9901A7E5C4993C4F2-5-2016/01/27 14:00:00","CID_53AAB989D315C98CBA7352474EEFA57F-5-2016/01/30 10:00:00"};
private static Configuration configuration = null;
private static Connection connection;
......@@ -69,4 +74,41 @@ public class CallChainMapperTest {
connection = ConnectionFactory.createConnection(configuration);
}
}
@Test
public void testSummary() throws IOException, ParseException {
ConfigInitializer.initialize();
Map<String,List<ChainSpecificTimeSummary>> chainSpecificTimeSummaries = new HashMap();
for (String rowkey : summaryRowKeys) {
ChainSpecificTimeSummary chainSpecificDaySummary = selectSummary(rowkey);
List<ChainSpecificTimeSummary> chainSpecificTimeSummaries1 = chainSpecificTimeSummaries.get(chainSpecificDaySummary.buildMapperKey());
if (chainSpecificTimeSummaries1 == null){
chainSpecificTimeSummaries1 = new ArrayList<>();
}
chainSpecificTimeSummaries1.add(chainSpecificDaySummary);
chainSpecificTimeSummaries.put(chainSpecificDaySummary.buildMapperKey(), chainSpecificTimeSummaries1);
}
Chain2SummaryReducer reducer = new Chain2SummaryReducer();
for (Map.Entry<String, List<ChainSpecificTimeSummary>> entry : chainSpecificTimeSummaries.entrySet()) {
reducer.doReduceAction(entry.getKey(), entry.getValue().iterator());
}
System.out.println("1");
}
private ChainSpecificTimeSummary selectSummary(String key) throws IOException, ParseException {
ChainSpecificTimeSummary summary = new ChainSpecificTimeSummary(Bytes.toString(key.getBytes()));
Table table = connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CHAIN_ONE_MINUTE_SUMMARY_EXCLUDE_RELATIONSHIP.TABLE_NAME));
Get g = new Get(Bytes.toBytes(key));
Result value = table.get(g);
for (Cell cell : value.rawCells()) {
summary.addChainNodeSummaryResult(Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength()));
}
return summary;
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册