提交 18b7c5f3 编写于 作者: wu-sheng's avatar wu-sheng

1.调整部分代码样式

2.增加traceid的描述注释
3.在第一次MR代码入口,加入对于tid的版本号识别,确保无效的、不兼容的低版本的traceid不会进入最新的分析系统,以避免造成不可预知的结果。
上级 07fa03d6
......@@ -23,6 +23,7 @@ import com.ai.cloud.skywalking.analysis.categorize2chain.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.categorize2chain.po.ChainNode;
import com.ai.cloud.skywalking.analysis.categorize2chain.util.HBaseUtil;
import com.ai.cloud.skywalking.analysis.categorize2chain.util.SubLevelSpanCostCounter;
import com.ai.cloud.skywalking.analysis.categorize2chain.util.VersionIdentifier;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.ai.cloud.skywalking.protocol.Span;
......@@ -39,6 +40,10 @@ public class Categorize2ChainMapper extends TableMapper<Text, ChainInfo> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
if(!VersionIdentifier.enableAnaylsis(Bytes.toString(key.get()))){
return;
}
List<Span> spanList = new ArrayList<Span>();
ChainInfo chainInfo = null;
try {
......
package com.ai.cloud.skywalking.analysis.categorize2chain.util;
/**
* 版本识别器
*
* @author wusheng
*
*/
public class VersionIdentifier {
/**
* 根据tid识别数据是否可分析<br/>
* 目前允许分析所有1.x的版本号
*
* @param tid
* @return
*/
public static boolean enableAnaylsis(String tid){
if(tid != null){
String[] tidSections = tid.split("\\.");
if(tidSections.length == 7){
String version = tidSections[0];
String subVersion = tidSections[1];
if("1".equals(version) && subVersion.length() > 0){
return true;
}
}
}
return false;
}
}
......@@ -17,7 +17,7 @@ import java.io.IOException;
public class Chain2SummaryMapper extends TableMapper<Text, ChainSpecificTimeSummary> {
private Logger logger = LoggerFactory
.getLogger(Chain2SummaryMapper.class.getName());
.getLogger(Chain2SummaryMapper.class);
@Override
......@@ -36,7 +36,6 @@ public class Chain2SummaryMapper extends TableMapper<Text, ChainSpecificTimeSumm
}
context.write(new Text(summary.buildMapperKey()), summary);
} catch (Exception e) {
e.printStackTrace();
logger.error("Failed to mapper call chain[" + key.toString() + "]",
e);
}
......
......@@ -15,7 +15,7 @@ import java.util.Iterator;
public class Chain2SummaryReducer extends Reducer<Text, ChainSpecificTimeSummary, Text, IntWritable> {
private Logger logger = LoggerFactory
.getLogger(Chain2SummaryReducer.class.getName());
.getLogger(Chain2SummaryReducer.class);
@Override
protected void setup(Context context) throws IOException, InterruptedException {
......@@ -36,7 +36,6 @@ public class Chain2SummaryReducer extends Reducer<Text, ChainSpecificTimeSummary
ChainSpecificTimeSummary timeSummary = summaryIterator.next();
summary.summary(timeSummary, chainRelationship);
} catch (Exception e) {
e.printStackTrace();
logger.error("Failed to reduce", e);
}
}
......@@ -44,7 +43,7 @@ public class Chain2SummaryReducer extends Reducer<Text, ChainSpecificTimeSummary
summary.saveToHBase();
} catch (Exception e) {
e.printStackTrace();
logger.error("Failed to reduce", e);
logger.error("Failed to reduce key=" + Bytes.toString(key.getBytes()), e);
}
}
}
package com.ai.cloud.skywalking.analysis.chain2summary;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.ai.cloud.skywalking.analysis.chain2summary.entity.ChainSummaryWithRelationship;
import com.ai.cloud.skywalking.analysis.chain2summary.po.ChainSpecificTimeSummary;
......@@ -11,7 +11,7 @@ public class Summary {
private Map<String, ChainSummaryWithRelationship> summaryWithRelationshipMap;
public Summary(){
summaryWithRelationshipMap = new HashMap<String, ChainSummaryWithRelationship>();
summaryWithRelationshipMap = new ConcurrentHashMap<String, ChainSummaryWithRelationship>();
}
public void summary(ChainSpecificTimeSummary timeSummary, ChainRelationship4Search chainRelationship) throws IOException {
......
package com.ai.cloud.skywalking.conf;
public class Constants {
/**
* 务必严格保持两位的version
*/
public static String SDK_VERSION = "1.0a2";
public static final String HEALTH_DATA_SPILT_PATTERN = "^~";
......
......@@ -5,30 +5,40 @@ import java.util.UUID;
import com.ai.cloud.skywalking.conf.Constants;
public final class TraceIdGenerator {
private static final ThreadLocal<Integer> ThreadTraceIdSequence = new ThreadLocal<Integer>();
private static final String PROCESS_UUID;
static {
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
PROCESS_UUID = uuid.substring(uuid.length() - 7);
}
private TraceIdGenerator() {
}
public static String generate() {
Integer seq = ThreadTraceIdSequence.get();
if (seq == null || seq == 10000 || seq > 10000) {
seq = 0;
}
seq++;
ThreadTraceIdSequence.set(seq);
return Constants.SDK_VERSION + "." + System.currentTimeMillis()
+ "."+ PROCESS_UUID
+ "."+ BuriedPointMachineUtil.getProcessNo()
+ "."+ Thread.currentThread().getId()
+ "."+ seq;
}
private static final ThreadLocal<Integer> ThreadTraceIdSequence = new ThreadLocal<Integer>();
private static final String PROCESS_UUID;
static {
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
PROCESS_UUID = uuid.substring(uuid.length() - 7);
}
private TraceIdGenerator() {
}
/**
* TraceId由以下规则组成<br/>
* 2位version号 + 1位时间戳(毫秒数) + 1位进程随机号(UUID后7位) + 1位进程数号 + 1位线程号 + 1位线程内序号
*
* 注意:这里的位,是指“.”作为分隔符所占的位数,非字符串长度的位数。
* TraceId为不定长字符串,但保证在分布式集群条件下的唯一性
*
* @return
*/
public static String generate() {
Integer seq = ThreadTraceIdSequence.get();
if (seq == null || seq == 10000 || seq > 10000) {
seq = 0;
}
seq++;
ThreadTraceIdSequence.set(seq);
return Constants.SDK_VERSION
+ "." + System.currentTimeMillis()
+ "." + PROCESS_UUID
+ "." + BuriedPointMachineUtil.getProcessNo()
+ "." + Thread.currentThread().getId()
+ "." + seq;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册