提交 3af21631 编写于 作者: wu-sheng

1. 修改大量类名、方法名、包名，为后续二次分析任务准备。

上级 2ef6ef46
package com.ai.cloud.skywalking.analysis;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.mapper.CallChainMapper;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Scan;
......@@ -14,10 +16,10 @@ import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import com.ai.cloud.skywalking.analysis.categorize2chain.Categorize2ChainMapper;
import com.ai.cloud.skywalking.analysis.categorize2chain.Categorize2ChainReduce;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.config.Config;
public class AnalysisServerDriver extends Configured implements Tool {
......@@ -41,10 +43,10 @@ public class AnalysisServerDriver extends Configured implements Tool {
job.setJarByClass(AnalysisServerDriver.class);
Scan scan = buildHBaseScan(args);
TableMapReduceUtil.initTableMapperJob(Config.HBase.CALL_CHAIN_TABLE_NAME, scan, CallChainMapper.class,
TableMapReduceUtil.initTableMapperJob(Config.HBase.CALL_CHAIN_TABLE_NAME, scan, Categorize2ChainMapper.class,
String.class, ChainInfo.class, job);
//TableMapReduceUtil.initTableReducerJob("sw-call-chain-model", CallChainReducer.class, job);
TableMapReduceUtil.initTableReducerJob(Config.HBase.CALL_CHAIN_TABLE_NAME, Categorize2ChainReduce.class, job);
return job.waitForCompletion(true) ? 0 : 1;
}
......
package com.ai.cloud.skywalking.analysis.mapper;
package com.ai.cloud.skywalking.analysis.categorize2chain;
import com.ai.cloud.skywalking.analysis.filter.SpanNodeProcessChain;
import com.ai.cloud.skywalking.analysis.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.categorize2chain.filter.SpanNodeProcessChain;
import com.ai.cloud.skywalking.analysis.categorize2chain.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
import com.ai.cloud.skywalking.analysis.util.HBaseUtil;
import com.ai.cloud.skywalking.protocol.Span;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
......@@ -18,8 +19,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
public class CallChainMapper extends TableMapper<Text, ChainInfo> {
private Logger logger = LoggerFactory.getLogger(CallChainMapper.class.getName());
public class Categorize2ChainMapper extends TableMapper<Text, ChainInfo> {
private Logger logger = LoggerFactory.getLogger(Categorize2ChainMapper.class.getName());
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException,
......@@ -33,12 +34,11 @@ public class CallChainMapper extends TableMapper<Text, ChainInfo> {
}
chainInfo = spanToChainInfo(key.toString(), spanList);
context.write(new Text(chainInfo.getUserId() + ":" + chainInfo.getEntranceNodeToken()), chainInfo);
} catch (Exception e) {
logger.error("Failed to mapper call chain[" + key.toString() + "]", e);
chainInfo = new ChainInfo("-1");
}
context.write(new Text(chainInfo.getUserId() + ":" + chainInfo.getEntranceNodeToken()), chainInfo);
}
public static ChainInfo spanToChainInfo(String key, List<Span> spanList) {
......
package com.ai.cloud.skywalking.analysis.reduce;
package com.ai.cloud.skywalking.analysis.categorize2chain;
import java.io.IOException;
import java.util.Iterator;
import com.ai.cloud.skywalking.analysis.config.Constants;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.util.HBaseUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.util.HBaseUtil;
public class ChainInfoReduce extends TableReducer<Text, ChainInfo, Put> {
private static Logger logger = LoggerFactory.getLogger(ChainInfoReduce.class.getName());
public class Categorize2ChainReduce extends TableReducer<Text, ChainInfo, Put> {
private static Logger logger = LoggerFactory.getLogger(Categorize2ChainReduce.class.getName());
@Override
protected void reduce(Text key, Iterable<ChainInfo> values, Context context) throws IOException, InterruptedException {
......@@ -21,11 +21,6 @@ public class ChainInfoReduce extends TableReducer<Text, ChainInfo, Put> {
}
public static void reduceAction(String key, Iterator<ChainInfo> chainInfoIterator) throws IOException, InterruptedException {
if (Constants.EXCEPTIONMAPPER.equals(key)) {
logger.info("Skip Exception Mapper.....");
return;
}
try {
ChainRelate chainRelate = HBaseUtil.selectCallChainRelationship(key.toString());
Summary summary = new Summary();
......
package com.ai.cloud.skywalking.analysis.reduce;
package com.ai.cloud.skywalking.analysis.categorize2chain;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
......
package com.ai.cloud.skywalking.analysis.reduce;
package com.ai.cloud.skywalking.analysis.categorize2chain;
import com.ai.cloud.skywalking.analysis.categorize2chain.dao.CallChainInfoDao;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.dao.CallChainInfoDao;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.google.gson.Gson;
import org.apache.hadoop.hbase.client.Put;
import java.sql.SQLException;
......
package com.ai.cloud.skywalking.analysis.reduce;
package com.ai.cloud.skywalking.analysis.categorize2chain;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
......@@ -14,7 +14,7 @@ public class ChainNodeSpecificTimeWindowSummary {
private String traceLevelId;
private Map<String, SummaryResult> summerResultMap;
private Map<String, ChainNodeSpecificTimeWindowSummaryValue> summerResultMap;
public static ChainNodeSpecificTimeWindowSummary newInstance(String traceLevelId) {
ChainNodeSpecificTimeWindowSummary cns = new ChainNodeSpecificTimeWindowSummary();
......@@ -23,14 +23,14 @@ public class ChainNodeSpecificTimeWindowSummary {
}
private ChainNodeSpecificTimeWindowSummary() {
summerResultMap = new HashMap<String, SummaryResult>();
summerResultMap = new HashMap<String, ChainNodeSpecificTimeWindowSummaryValue>();
}
public ChainNodeSpecificTimeWindowSummary(String value) {
JsonObject jsonObject = (JsonObject) new JsonParser().parse(value);
traceLevelId = jsonObject.get("traceLevelId").getAsString();
summerResultMap = new Gson().fromJson(jsonObject.get("summerResultMap").toString(),
new TypeToken<Map<String, SummaryResult>>() {
new TypeToken<Map<String, ChainNodeSpecificTimeWindowSummaryValue>>() {
}.getType());
}
......@@ -40,9 +40,9 @@ public class ChainNodeSpecificTimeWindowSummary {
public void summary(ChainNode node) {
String key = generateKey(node.getStartDate());
SummaryResult summaryResult = summerResultMap.get(key);
ChainNodeSpecificTimeWindowSummaryValue summaryResult = summerResultMap.get(key);
if (summaryResult == null) {
summaryResult = new SummaryResult();
summaryResult = new ChainNodeSpecificTimeWindowSummaryValue();
summerResultMap.put(key, summaryResult);
}
summaryResult.summary(node);
......
package com.ai.cloud.skywalking.analysis.reduce;
package com.ai.cloud.skywalking.analysis.categorize2chain;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
public class SummaryResult {
public class ChainNodeSpecificTimeWindowSummaryValue {
private long totalCall;
private long totalCostTime;
private long correctNumber;
public SummaryResult() {
public ChainNodeSpecificTimeWindowSummaryValue() {
totalCall = 0;
totalCostTime = 0;
correctNumber = 0;
......
package com.ai.cloud.skywalking.analysis.reduce;
package com.ai.cloud.skywalking.analysis.categorize2chain;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.config.Constants;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.util.HBaseUtil;
import com.google.gson.Gson;
import org.apache.hadoop.hbase.client.Put;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
package com.ai.cloud.skywalking.analysis.reduce;
package com.ai.cloud.skywalking.analysis.categorize2chain;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.util.HBaseUtil;
import org.apache.hadoop.hbase.client.Put;
import java.io.IOException;
......@@ -11,6 +12,9 @@ import java.util.Map;
public class ChainSpecificTimeWindowSummary {
/**
* key : cid + 时间窗口
*/
private Map<String, ChainNodeSpecificTimeWindowSummary> chainNodeSummaryResultMap;
public ChainSpecificTimeWindowSummary() {
......
package com.ai.cloud.skywalking.analysis.mapper;
package com.ai.cloud.skywalking.analysis.categorize2chain;
import java.util.HashMap;
import java.util.Map;
......
package com.ai.cloud.skywalking.analysis.mapper;
package com.ai.cloud.skywalking.analysis.categorize2chain;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
import com.ai.cloud.skywalking.protocol.CallType;
import com.ai.cloud.skywalking.protocol.Span;
......
package com.ai.cloud.skywalking.analysis.reduce;
package com.ai.cloud.skywalking.analysis.categorize2chain;
import com.ai.cloud.skywalking.analysis.dao.CallChainInfoDao;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.categorize2chain.dao.CallChainInfoDao;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
import com.ai.cloud.skywalking.analysis.util.HBaseUtil;
import org.apache.hadoop.hbase.client.Put;
import java.io.IOException;
......
package com.ai.cloud.skywalking.analysis.reduce;
package com.ai.cloud.skywalking.analysis.categorize2chain;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.Expose;
......
package com.ai.cloud.skywalking.analysis.dao;
package com.ai.cloud.skywalking.analysis.categorize2chain.dao;
import com.ai.cloud.skywalking.analysis.categorize2chain.ChainDetail;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.reduce.ChainDetail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
package com.ai.cloud.skywalking.analysis.filter;
package com.ai.cloud.skywalking.analysis.categorize2chain.filter;
import java.io.IOException;
import java.util.HashMap;
......
package com.ai.cloud.skywalking.analysis.filter;
package com.ai.cloud.skywalking.analysis.categorize2chain.filter;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.mapper.CostMap;
import com.ai.cloud.skywalking.analysis.mapper.SpanEntry;
import com.ai.cloud.skywalking.analysis.categorize2chain.CostMap;
import com.ai.cloud.skywalking.analysis.categorize2chain.SpanEntry;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
public abstract class SpanNodeProcessFilter {
......
package com.ai.cloud.skywalking.analysis.filter.impl;
package com.ai.cloud.skywalking.analysis.categorize2chain.filter.impl;
import com.ai.cloud.skywalking.analysis.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.mapper.CostMap;
import com.ai.cloud.skywalking.analysis.mapper.SpanEntry;
import com.ai.cloud.skywalking.analysis.categorize2chain.CostMap;
import com.ai.cloud.skywalking.analysis.categorize2chain.SpanEntry;
import com.ai.cloud.skywalking.analysis.categorize2chain.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
public class AppendBusinessKeyFilter extends SpanNodeProcessFilter {
......
package com.ai.cloud.skywalking.analysis.filter.impl;
package com.ai.cloud.skywalking.analysis.categorize2chain.filter.impl;
import com.ai.cloud.skywalking.analysis.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.mapper.CostMap;
import com.ai.cloud.skywalking.analysis.mapper.SpanEntry;
import com.ai.cloud.skywalking.analysis.categorize2chain.CostMap;
import com.ai.cloud.skywalking.analysis.categorize2chain.SpanEntry;
import com.ai.cloud.skywalking.analysis.categorize2chain.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
public class CopyAttrFilter extends SpanNodeProcessFilter {
......
package com.ai.cloud.skywalking.analysis.filter.impl;
package com.ai.cloud.skywalking.analysis.categorize2chain.filter.impl;
import com.ai.cloud.skywalking.analysis.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.mapper.CostMap;
import com.ai.cloud.skywalking.analysis.mapper.SpanEntry;
import com.ai.cloud.skywalking.analysis.categorize2chain.CostMap;
import com.ai.cloud.skywalking.analysis.categorize2chain.SpanEntry;
import com.ai.cloud.skywalking.analysis.categorize2chain.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
public class ProcessCostTimeFilter extends SpanNodeProcessFilter {
@Override
......
package com.ai.cloud.skywalking.analysis.filter.impl;
package com.ai.cloud.skywalking.analysis.categorize2chain.filter.impl;
import com.ai.cloud.skywalking.analysis.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.mapper.CostMap;
import com.ai.cloud.skywalking.analysis.mapper.SpanEntry;
import com.ai.cloud.skywalking.analysis.categorize2chain.CostMap;
import com.ai.cloud.skywalking.analysis.categorize2chain.SpanEntry;
import com.ai.cloud.skywalking.analysis.categorize2chain.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
public class ReplaceAddressFilter extends SpanNodeProcessFilter {
......
package com.ai.cloud.skywalking.analysis.filter.impl;
package com.ai.cloud.skywalking.analysis.categorize2chain.filter.impl;
import com.ai.cloud.skywalking.analysis.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.model.ChainNode;
import com.ai.cloud.skywalking.analysis.mapper.CostMap;
import com.ai.cloud.skywalking.analysis.mapper.SpanEntry;
import com.ai.cloud.skywalking.analysis.categorize2chain.CostMap;
import com.ai.cloud.skywalking.analysis.categorize2chain.SpanEntry;
import com.ai.cloud.skywalking.analysis.categorize2chain.filter.SpanNodeProcessFilter;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainNode;
import com.ai.cloud.skywalking.analysis.util.TokenGenerator;
public class TokenGenerateFilter extends SpanNodeProcessFilter {
......
package com.ai.cloud.skywalking.analysis.model;
package com.ai.cloud.skywalking.analysis.categorize2chain.model;
import com.ai.cloud.skywalking.analysis.util.TokenGenerator;
import org.apache.hadoop.io.Writable;
......
package com.ai.cloud.skywalking.analysis.model;
package com.ai.cloud.skywalking.analysis.categorize2chain.model;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.Expose;
......
......@@ -11,7 +11,7 @@ public class Config {
public static String TRACE_INFO_COLUMN_FAMILY = "trace_info";
public static String CALL_CHAIN_TABLE_NAME;
public static String CALL_CHAIN_TABLE_NAME = "sw-call-chain";
public static String ZK_QUORUM;
......
package com.ai.cloud.skywalking.analysis.config;
/**
 * Shared string constants for the call-chain analysis MapReduce job.
 */
public class Constants {
    /**
     * Reduce key produced for traces whose mapping failed: the mapper falls
     * back to a ChainInfo with user id "-1", so its emitted key starts with
     * "-1:" and the reducer skips it (see Categorize2ChainReduce.reduceAction).
     */
    public static final String EXCEPTIONMAPPER = "-1:";

    /** HBase qualifier name under which uncategorized call chains are stored. */
    public static final String UNCATEGORIZED_QUALIFIER_NAME = "UNCATEGORIZED_CALL_CHAIN";

    /** Constant holder — not meant to be instantiated. */
    private Constants() {
    }
}
package com.ai.cloud.skywalking.analysis.util;
import com.ai.cloud.skywalking.analysis.categorize2chain.CategorizedChainInfo;
import com.ai.cloud.skywalking.analysis.categorize2chain.ChainNodeSpecificTimeWindowSummary;
import com.ai.cloud.skywalking.analysis.categorize2chain.ChainRelate;
import com.ai.cloud.skywalking.analysis.categorize2chain.ChainSpecificTimeWindowSummary;
import com.ai.cloud.skywalking.analysis.categorize2chain.UncategorizeChainInfo;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.config.Constants;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.reduce.*;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
......
package com.ai.cloud.skywalking.analysis.mapper;
import com.ai.cloud.skywalking.analysis.categorize2chain.Categorize2ChainMapper;
import com.ai.cloud.skywalking.analysis.categorize2chain.Categorize2ChainReduce;
import com.ai.cloud.skywalking.analysis.categorize2chain.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.ai.cloud.skywalking.analysis.model.ChainInfo;
import com.ai.cloud.skywalking.analysis.reduce.ChainInfoReduce;
import com.ai.cloud.skywalking.protocol.Span;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
......@@ -36,12 +38,12 @@ public class CallChainMapperTest {
public void testMap() throws Exception {
ConfigInitializer.initialize();
List<Span> spanList = selectByTraceId(chain_Id);
ChainInfo chainInfo = CallChainMapper.spanToChainInfo(chain_Id, spanList);
ChainInfo chainInfo = Categorize2ChainMapper.spanToChainInfo(chain_Id, spanList);
List<ChainInfo> chainInfos = new ArrayList<ChainInfo>();
chainInfos.add(chainInfo);
ChainInfoReduce.reduceAction(chainInfo.getUserId() + ":" + chainInfo.getEntranceNodeToken(), chainInfos.iterator());
Categorize2ChainReduce.reduceAction(chainInfo.getUserId() + ":" + chainInfo.getEntranceNodeToken(), chainInfos.iterator());
}
public static List<Span> selectByTraceId(String traceId) throws IOException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册