diff --git a/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/categorize2chain/Categorize2ChainMapper.java b/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/categorize2chain/Categorize2ChainMapper.java index 346509816bb1c29912dbbeb0407150510be42f73..9f1432ecc48850bfc41b5721c053620dd7c2c32b 100644 --- a/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/categorize2chain/Categorize2ChainMapper.java +++ b/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/categorize2chain/Categorize2ChainMapper.java @@ -1,12 +1,12 @@ package com.ai.cloud.skywalking.analysis.categorize2chain; -import com.ai.cloud.skywalking.analysis.categorize2chain.filter.SpanNodeProcessChain; -import com.ai.cloud.skywalking.analysis.categorize2chain.filter.SpanNodeProcessFilter; -import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainInfo; -import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode; -import com.ai.cloud.skywalking.analysis.categorize2chain.util.HBaseUtil; -import com.ai.cloud.skywalking.analysis.config.ConfigInitializer; -import com.ai.cloud.skywalking.protocol.Span; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Result; @@ -17,68 +17,90 @@ import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.*; +import com.ai.cloud.skywalking.analysis.categorize2chain.filter.SpanNodeProcessChain; +import com.ai.cloud.skywalking.analysis.categorize2chain.filter.SpanNodeProcessFilter; +import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainInfo; +import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode; +import com.ai.cloud.skywalking.analysis.categorize2chain.util.HBaseUtil; +import com.ai.cloud.skywalking.analysis.config.ConfigInitializer; +import com.ai.cloud.skywalking.protocol.Span; public class Categorize2ChainMapper extends TableMapper { - private Logger logger = LoggerFactory.getLogger(Categorize2ChainMapper.class.getName()); + private Logger logger = LoggerFactory + .getLogger(Categorize2ChainMapper.class.getName()); + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + ConfigInitializer.initialize(); + } - @Override - protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, - InterruptedException { - ConfigInitializer.initialize(); - List spanList = new ArrayList(); - ChainInfo chainInfo = null; - try { - for (Cell cell : value.rawCells()) { - Span span = new Span(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); - spanList.add(span); - } + @Override + protected void map(ImmutableBytesWritable key, Result value, Context context) + throws IOException, InterruptedException { + List spanList = new ArrayList(); + ChainInfo chainInfo = null; + try { + for (Cell cell : value.rawCells()) { + Span span = new Span(Bytes.toString(cell.getValueArray(), + cell.getValueOffset(), cell.getValueLength())); + spanList.add(span); + } - chainInfo = spanToChainInfo(Bytes.toString(key.get()), spanList); - logger.info("Success convert span to chain info...." + chainInfo.getCID()); - context.write(new Text(chainInfo.getUserId() + ":" + chainInfo.getEntranceNodeToken()), chainInfo); - } catch (Exception e) { - logger.error("Failed to mapper call chain[" + key.toString() + "]", e); - } - } + chainInfo = spanToChainInfo(Bytes.toString(key.get()), spanList); + logger.info("Success convert span to chain info...." + + chainInfo.getCID()); + context.write( + new Text(chainInfo.getUserId() + ":" + + chainInfo.getEntranceNodeToken()), chainInfo); + } catch (Exception e) { + logger.error("Failed to mapper call chain[" + key.toString() + "]", + e); + } + } - public static ChainInfo spanToChainInfo(String key, List spanList) { - SubLevelSpanCostCounter costMap = new SubLevelSpanCostCounter(); - ChainInfo chainInfo = new ChainInfo(); - Collections.sort(spanList, new Comparator() { - @Override - public int compare(Span span1, Span span2) { - String span1TraceLevel = span1.getParentLevel() + "." + span1.getLevelId(); - String span2TraceLevel = span2.getParentLevel() + "." + span2.getLevelId(); - return span1TraceLevel.compareTo(span2TraceLevel); - } - }); + public static ChainInfo spanToChainInfo(String key, List spanList) { + SubLevelSpanCostCounter costMap = new SubLevelSpanCostCounter(); + ChainInfo chainInfo = new ChainInfo(); + Collections.sort(spanList, new Comparator() { + @Override + public int compare(Span span1, Span span2) { + String span1TraceLevel = span1.getParentLevel() + "." + + span1.getLevelId(); + String span2TraceLevel = span2.getParentLevel() + "." + + span2.getLevelId(); + return span1TraceLevel.compareTo(span2TraceLevel); + } + }); - Map spanEntryMap = mergeSpanDataSet(spanList); - for (Map.Entry entry : spanEntryMap.entrySet()) { - ChainNode chainNode = new ChainNode(); - SpanNodeProcessFilter filter = SpanNodeProcessChain.getProcessChainByCallType(entry.getValue().getSpanType()); - filter.doFilter(entry.getValue(), chainNode, costMap); - chainInfo.addNodes(chainNode); - } + Map spanEntryMap = mergeSpanDataSet(spanList); + for (Map.Entry entry : spanEntryMap.entrySet()) { + ChainNode chainNode = new ChainNode(); + SpanNodeProcessFilter filter = SpanNodeProcessChain + .getProcessChainByCallType(entry.getValue().getSpanType()); + filter.doFilter(entry.getValue(), chainNode, costMap); + chainInfo.addNodes(chainNode); + } - chainInfo.generateChainToken(); - HBaseUtil.saveCidTidMapping(key, chainInfo); - return chainInfo; - } + chainInfo.generateChainToken(); + HBaseUtil.saveCidTidMapping(key, chainInfo); + return chainInfo; + } - private static Map mergeSpanDataSet(List spanList) { - Map spanEntryMap = new LinkedHashMap(); - for (int i = spanList.size() - 1; i >= 0; i--) { - Span span = spanList.get(i); - SpanEntry spanEntry = spanEntryMap.get(span.getParentLevel() + "." + span.getLevelId()); - if (spanEntry == null) { - spanEntry = new SpanEntry(); - spanEntryMap.put(span.getParentLevel() + "." + span.getLevelId(), spanEntry); - } - spanEntry.setSpan(span); - } - return spanEntryMap; - } + private static Map mergeSpanDataSet(List spanList) { + Map spanEntryMap = new LinkedHashMap(); + for (int i = spanList.size() - 1; i >= 0; i--) { + Span span = spanList.get(i); + SpanEntry spanEntry = spanEntryMap.get(span.getParentLevel() + "." + + span.getLevelId()); + if (spanEntry == null) { + spanEntry = new SpanEntry(); + spanEntryMap.put( + span.getParentLevel() + "." + span.getLevelId(), + spanEntry); + } + spanEntry.setSpan(span); + } + return spanEntryMap; + } } diff --git a/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/categorize2chain/Categorize2ChainReducer.java b/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/categorize2chain/Categorize2ChainReducer.java index cd9f89eaa2e1c5973a9d6fc86f6672beb2c12c9a..378648cde2c30ab98e841313283e64a87cba9058 100644 --- a/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/categorize2chain/Categorize2ChainReducer.java +++ b/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/categorize2chain/Categorize2ChainReducer.java @@ -3,8 +3,6 @@ package com.ai.cloud.skywalking.analysis.categorize2chain; import java.io.IOException; import java.util.Iterator; -import com.ai.cloud.skywalking.analysis.config.ConfigInitializer; - import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; @@ -13,13 +11,19 @@ import org.slf4j.LoggerFactory; import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainInfo; import com.ai.cloud.skywalking.analysis.categorize2chain.util.HBaseUtil; +import com.ai.cloud.skywalking.analysis.config.ConfigInitializer; public class Categorize2ChainReducer extends Reducer { private static Logger logger = LoggerFactory.getLogger(Categorize2ChainReducer.class.getName()); + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + ConfigInitializer.initialize(); + } + @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { - ConfigInitializer.initialize(); int totalCount = reduceAction(key.toString(), values.iterator()); context.write(new Text(key.toString()), new IntWritable(totalCount)); }