提交 b11c8a79 编写于 作者: A ascrutae

1.修改工程结构,2. 完成部分Storage功能

上级 eb1a301a
...@@ -9,12 +9,12 @@ ...@@ -9,12 +9,12 @@
<modules> <modules>
<module>samples/skywalking-auth</module> <module>samples/skywalking-auth</module>
<module>samples/skywalking-example</module> <module>samples/skywalking-example</module>
<module>skywalking-collector</module>
<module>skywalking-server</module>
<module>skywalking-alarm</module> <module>skywalking-alarm</module>
<module>skywalking-analysis</module>
<module>skywalking-webui</module> <module>skywalking-webui</module>
<module>test/skywalking-test-api</module> <module>test/skywalking-test-api</module>
<module>skywalking-sniffer</module>
<module>skywalking-storage-center</module>
<module>skywalking-sniffer</module>
</modules> </modules>
<packaging>pom</packaging> <packaging>pom</packaging>
......
...@@ -71,6 +71,11 @@ ...@@ -71,6 +71,11 @@
<artifactId>HikariCP</artifactId> <artifactId>HikariCP</artifactId>
<version>2.4.3</version> <version>2.4.3</version>
</dependency> </dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>data-carrier</artifactId>
<version>1.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
...@@ -179,6 +184,14 @@ ...@@ -179,6 +184,14 @@
</build> </build>
<repositories> <repositories>
<repository>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>bintray-wu-sheng-DataCarrier</id>
<name>bintray</name>
<url>http://dl.bintray.com/wu-sheng/DataCarrier</url>
</repository>
<repository> <repository>
<id>mvnrepository</id> <id>mvnrepository</id>
<name>mvnrepository</name> <name>mvnrepository</name>
......
#!/bin/bash
# check environment variables
SW_ANALYSIS_HOME=${HOME}/skywalking-analysis
#check hbase home
HBASE_HOME=${HOME}/hbase-1.1.2
#check hadoop home
HADOOP_HOME=${HOME}/hadoop-2.6.0
SW_RT_LOG_DIR=${SW_ANALYSIS_HOME}/log
if [ ! -d "$SW_RT_LOG_DIR" ]; then
mkdir -p $SW_RT_LOG_DIR
fi
#check skywalking runtime config directory is exisit, if not, will create it.
SW_RT_CONF_DIR="${SW_ANALYSIS_HOME}/runtime-conf"
if [ ! -d "$SW_RT_CONF_DIR" ]; then
mkdir -p $SW_RT_CONF_DIR
fi
# get the previous process id
PID_FILES="${SW_RT_CONF_DIR}/analysis.pid"
if [ ! -f "$FILE_PREVIOUS_EXECUTE_TIME" ]; then
touch "$PID_FILES"
fi
SW_ANALYSIS_PID=`cat ${PID_FILES}`
# check if the skywalking analysis process is running
if [ "$SW_ANALYSIS_PID" != "" ]; then
#PS_OUT=`ps -ef | grep $SW_ANALYSIS_PID | grep -v 'grep' | grep -v $0`
#PS_OUT=`ps -ax | awk '{ print $1 }' | grep -e "^${SW_ANALYSIS_PID}$"` | echo ${PS_OUT}
PS_OUT=`ps -ax | awk '{print $1}' | grep -e "^${SW_ANALYSIS_PID}$"`
if [ "$PS_OUT" != "" ]; then
echo "The skywalking analysis process is running. Will delay the analysis."
exit -1;
fi
fi
#skywalking analysis mode:1)accumulate(default) 2)rewrite
SW_ANALYSIS_MODE=ACCUMULATE
#skywalking rewrite execute dates. Each number represents the day of the month.
REWRITE_EXECUTIVE_DAY_ARR=(5,10)
#Get the previous execute time of rewrite mode
PRE_TIME_OF_REWRITE_FILE="${SW_RT_CONF_DIR}/rewrite_pre_time.conf"
if [ ! -f "$PRE_TIME_OF_REWRITE_FILE" ]; then
echo "skywalking rewrite time file is not exists, create it"
touch $PRE_TIME_OF_REWRITE_FILE
fi
PRE_TIME_OF_REWRITE_TIME=`cat $PRE_TIME_OF_REWRITE_FILE`
#check if the day is in the date of rewrite mode
if [ "$PRE_TIME_OF_REWRITE_TIME" != "" ]; then
TODAY=$(date "+%d")
if [ "$PRE_TIME_OF_REWRITE_TIME" != $TODAY ]; then
THE_DAY_OF_MONTH=$(date "+%d")
for THE_DAY in ${REWRITE_EXECUTIVE_DAY_ARR[@]}
do
if [ ${THE_DAY} == ${THE_DAY_OF_MONTH} ]; then
SW_ANALYSIS_MODE=REWRITE
START_TIME=$(date --date='1 month ago' '+%Y-%m-01/00:00:00')
echo "skywalking analysis will execute rewrite mode. Start time:${START_TIME}"
break
fi
done
else
echo "${TODAY} has been execute rewrite analysis process.Will not execute rewrite mode!!"
fi
else
echo "The previous time of rewrite execute is Null, will put today to the ${PRE_TIME_OF_REWRITE_FILE} file."
echo $(date "+%d") > ${PRE_TIME_OF_REWRITE_FILE}
fi
if [ "${SW_ANALYSIS_MODE}" != "REWRITE" ]; then
#check the file of previous executive accumulate mode time
PRE_TIME_OF_ACCUMULATE_FILE="${SW_RT_CONF_DIR}/accumulate_pre_time.conf"
if [ ! -f "${PRE_TIME_OF_ACCUMULATE_FILE}" ]; then
echo "skywalking accumulate time file is not exists, create it."
touch $PRE_TIME_OF_ACCUMULATE_FILE
fi
START_TIME=`cat ${PRE_TIME_OF_ACCUMULATE_FILE}`
if [ "$START_TIME" = "" ]; then
START_TIME=`date --date='3 month ago' "+%Y-%m-%d/%H:%M:%S"`
fi
SW_ANALYSIS_MODE=ACCUMULATE
echo "skywalking analysis process will execute accumulate mode. start time: ${START_TIME}."
fi
#Get the current datetime
END_TIME=`date --date='10 minute ago' "+%Y-%m-%d/%H:%M:%S"`
#echo $END_TIME
## execute command
echo "Begin to analysis the buried point data between ${START_TIME} to ${END_TIME}."
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath`
nohup ${HADOOP_HOME}/bin/hadoop jar skywalking-analysis-2.0-2016.jar -Dskywalking.analysis.mode=${SW_ANALYSIS_MODE} ${START_TIME} ${END_TIME} > ${SW_RT_LOG_DIR}/map-reduce.log 2>&1 &
CURRENT_PID=`echo $!`
echo "current analysis process Id is ${CURRENT_PID}"
echo ${CURRENT_PID} > ${PID_FILES}
if [ "${SW_ANALYSIS_MODE}" = "REWRITE" ]; then
echo $(date "+%d") > ${PRE_TIME_OF_REWRITE_FILE}
else
echo $END_TIME > ${PRE_TIME_OF_ACCUMULATE_FILE}
fi
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-analysis</artifactId>
<version>2.0-2016</version>
<packaging>jar</packaging>
<name>skywalking-analysis</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.1.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.1.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.4.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-protocol</artifactId>
<version>2.0-2016</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
<version>3.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.4.3</version>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<phase>package</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.a.eye.skywalking.analysis.AnalysisServerDriver</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.a.eye.skywalking.analysis;
import com.a.eye.skywalking.analysis.chainbuild.ChainBuildMapper;
import com.a.eye.skywalking.analysis.config.HBaseTableMetaData;
import com.a.eye.skywalking.analysis.chainbuild.ChainBuildReducer;
import com.a.eye.skywalking.analysis.config.Config;
import com.a.eye.skywalking.analysis.config.ConfigInitializer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class AnalysisServerDriver extends Configured implements Tool {
private static Logger logger = LoggerFactory.getLogger(AnalysisServerDriver.class.getName());
public static void main(String[] args) throws Exception {
logger.info("Begin to analysis call chain.");
String analysisMode = System.getenv("skywalking.analysis.mode");
if ("rewrite".equalsIgnoreCase(analysisMode)) {
logger.info("Skywalking analysis mode will switch to [REWRITE] mode");
Config.AnalysisServer.IS_ACCUMULATE_MODE = false;
} else {
logger.info("Skywalking analysis mode will switch to [ACCUMULATE] mode");
Config.AnalysisServer.IS_ACCUMULATE_MODE = true;
}
int res = ToolRunner.run(new AnalysisServerDriver(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
ConfigInitializer.initialize();
Configuration conf = new Configuration();
conf.set("skywalking.analysis.mode", String.valueOf(Config.AnalysisServer.IS_ACCUMULATE_MODE));
conf.set("hbase.zookeeper.quorum", Config.HBase.ZK_QUORUM);
conf.set("hbase.zookeeper.property.clientPort", Config.HBase.ZK_CLIENT_PORT);
//-XX:+UseParallelGC -XX:ParallelGCThreads=4 -XX:GCTimeRatio=10 -XX:YoungGenerationSizeIncrement=20 -XX:TenuredGenerationSizeIncrement=20 -XX:AdaptiveSizeDecrementScaleFactor=2
conf.set("mapred.child.java.opts", Config.MapReduce.JAVA_OPTS);
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: AnalysisServerDriver yyyy-MM-dd/HH:mm:ss yyyy-MM-dd/HH:mm:ss");
System.exit(2);
}
Job job = Job.getInstance(conf);
job.setJarByClass(AnalysisServerDriver.class);
Scan scan = buildHBaseScan(args);
TableMapReduceUtil.initTableMapperJob(HBaseTableMetaData.TABLE_CALL_CHAIN.TABLE_NAME, scan, ChainBuildMapper.class,
Text.class, Text.class, job);
job.setReducerClass(ChainBuildReducer.class);
job.setNumReduceTasks(Config.Reducer.REDUCER_NUMBER);
job.setOutputFormatClass(NullOutputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;
}
private Scan buildHBaseScan(String[] args) throws ParseException, IOException {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd/HH:mm:ss");
Date startDate = simpleDateFormat.parse(args[0]);
Date endDate = simpleDateFormat.parse(args[1]);
Scan scan = new Scan();
scan.setBatch(2001);
scan.setTimeRange(startDate.getTime(), endDate.getTime());
return scan;
}
}
package com.a.eye.skywalking.analysis.chainbuild;
import com.a.eye.skywalking.analysis.chainbuild.exception.Tid2CidECovertException;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.a.eye.skywalking.analysis.chainbuild.po.SummaryType;
import com.a.eye.skywalking.analysis.chainbuild.util.HBaseUtil;
import com.a.eye.skywalking.analysis.chainbuild.filter.SpanNodeProcessChain;
import com.a.eye.skywalking.analysis.chainbuild.filter.SpanNodeProcessFilter;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainInfo;
import com.a.eye.skywalking.analysis.chainbuild.util.SubLevelSpanCostCounter;
import com.a.eye.skywalking.analysis.chainbuild.util.TokenGenerator;
import com.a.eye.skywalking.analysis.chainbuild.util.VersionIdentifier;
import com.a.eye.skywalking.analysis.config.ConfigInitializer;
import com.a.eye.skywalking.protocol.FullSpan;
import com.a.eye.skywalking.protocol.util.SpanLevelIdComparators;
import com.google.gson.Gson;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
public class ChainBuildMapper extends TableMapper<Text, Text> {
private Logger logger = LogManager.getLogger(ChainBuildMapper.class);
private SimpleDateFormat hourSimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH");
private SimpleDateFormat daySimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
private SimpleDateFormat monthSimpleDateFormat = new SimpleDateFormat("yyyy-MM");
private SimpleDateFormat yearSimpleDateFormat = new SimpleDateFormat("yyyy");
@Override
protected void setup(Context context) throws IOException, InterruptedException {
ConfigInitializer.initialize();
}
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
if (!VersionIdentifier.enableAnaylsis(Bytes.toString(key.get()))) {
return;
}
ChainInfo chainInfo = null;
try {
List<FullSpan> spanList = HBaseUtil.fetchTraceSpansFromHBase(value);
if (spanList.size() == 0) {
throw new Tid2CidECovertException("tid[" + Bytes.toString(key.get()) + "] has no span data.");
}
if (spanList.size() > 2000) {
throw new Tid2CidECovertException("tid[" + Bytes.toString(key.get()) + "] node size has over 2000.");
}
chainInfo = spanToChainInfo(Bytes.toString(key.get()), spanList);
logger.debug(
"convert tid[" + Bytes.toString(key.get()) + "] to chain with cid[" + chainInfo.getCID() + "].");
HBaseUtil.saveTraceIdAndTreeIdMapping(Bytes.toString(key.get()), chainInfo.getCID());
if (chainInfo.getCallEntrance() != null && chainInfo.getCallEntrance().length() > 0) {
for (ChainNode chainNode : chainInfo.getNodes()) {
/**
* 此处修改原因,
* 1.更细粒度划分reduce任务,提高性能。
* 2.减少数据传输量,以及处理复杂度。
* 3.请避免使用gson序列化,提高程序处理性能
*
* hour/day/month/year,
* key 修改为:类型+时间字符+callEntrance+levelId+viewpoint,
* value 为ChainNodeSpecificTimeWindowSummaryValue中所需的明确的值组成的简单串
* value包含:
* 1.是否正确调用,由NodeStatus获取,值为N/A/I
* 2.调用所需时间,由cost获取
*
*/
context.write(new Text(SummaryType.HOUR.getValue() + "-" + hourSimpleDateFormat
.format(new Date(chainNode.getStartDate())) + ":" + chainInfo.getCallEntrance()),
new Text(new Gson().toJson(chainNode)));
context.write(new Text(SummaryType.DAY.getValue() + "-" + daySimpleDateFormat
.format(new Date(chainNode.getStartDate())) + ":" + chainInfo.getCallEntrance()),
new Text(new Gson().toJson(chainNode)));
context.write(new Text(SummaryType.MONTH.getValue() + "-" + monthSimpleDateFormat
.format(new Date(chainNode.getStartDate())) + ":" + chainInfo.getCallEntrance()),
new Text(new Gson().toJson(chainNode)));
context.write(new Text(SummaryType.YEAR.getValue() + "-" + yearSimpleDateFormat
.format(new Date(chainNode.getStartDate())) + ":" + chainInfo.getCallEntrance()),
new Text(new Gson().toJson(chainNode)));
}
/**
*
* 1.使用静态变量,缓存MAP中的调用链。每个典型调用链ID只传递一次
* 2.注意缓存需要限制容量,初期规划,缓存1W个典型调用链KEY(可通过配置扩展)。仅缓存典型调用链ID,非链路明细。
* 3.对于节点数量大于2K条的调用量,暂不进行关系处理
*
* 注意:CallChainRelationshipAction暂时不做修改,此处的修改会大规模降低reduce的处理数据量,提高总体运行速度
*
*/
// Reduce key : R-CallEntrance
context.write(new Text(
SummaryType.RELATIONSHIP + "-" + TokenGenerator.generateTreeToken(chainInfo.getCallEntrance())
+ ":" + chainInfo.getCallEntrance()), new Text(new Gson().toJson(chainInfo)));
}
} catch (Exception e) {
logger.error("Failed to mapper call chain[" + key.toString() + "]", e);
}
}
public static ChainInfo spanToChainInfo(String tid, List<FullSpan> spanList) {
SubLevelSpanCostCounter costMap = new SubLevelSpanCostCounter();
ChainInfo chainInfo = new ChainInfo(tid);
Collections.sort(spanList, new SpanLevelIdComparators.SpanASCComparator());
Map<String, SpanEntry> spanEntryMap = mergeSpanDataSet(spanList);
for (Map.Entry<String, SpanEntry> entry : spanEntryMap.entrySet()) {
ChainNode chainNode = new ChainNode();
SpanNodeProcessFilter filter =
SpanNodeProcessChain.getProcessChainByCallType(entry.getValue().getSpanType());
filter.doFilter(entry.getValue(), chainNode, costMap);
chainInfo.addNodes(chainNode);
}
chainInfo.generateChainToken();
return chainInfo;
}
private static Map<String, SpanEntry> mergeSpanDataSet(List<FullSpan> spanList) {
Map<String, SpanEntry> spanEntryMap = new LinkedHashMap<String, SpanEntry>();
for (int i = spanList.size() - 1; i >= 0; i--) {
FullSpan span = spanList.get(i);
SpanEntry spanEntry = spanEntryMap.get(span.getParentLevel() + "." + span.getLevelId());
if (spanEntry == null) {
spanEntry = new SpanEntry();
spanEntryMap.put(span.getParentLevel() + "." + span.getLevelId(), spanEntry);
}
spanEntry.setSpan(span);
}
return spanEntryMap;
}
}
package com.a.eye.skywalking.analysis.chainbuild;
import java.io.IOException;
import java.util.Iterator;
import com.a.eye.skywalking.analysis.chainbuild.action.IStatisticsAction;
import com.a.eye.skywalking.analysis.chainbuild.po.SummaryType;
import com.a.eye.skywalking.analysis.config.Config;
import com.a.eye.skywalking.analysis.config.ConfigInitializer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class ChainBuildReducer extends Reducer<Text, Text, Text, IntWritable> {
private Logger logger = LogManager.getLogger(ChainBuildReducer.class);
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
ConfigInitializer.initialize();
Config.AnalysisServer.IS_ACCUMULATE_MODE = Boolean.parseBoolean(context
.getConfiguration().get("skywalking.analysis.mode", "false"));
logger.info("Skywalking analysis mode :[{}]",
Config.AnalysisServer.IS_ACCUMULATE_MODE ? "ACCUMULATE"
: "REWRITE");
}
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String reduceKey = key.toString();
int index = reduceKey.indexOf(":");
if (index == -1) {
return;
}
String summaryTypeAndDateStr = reduceKey.substring(0, index);
String entryKey = reduceKey.substring(index + 1);
IStatisticsAction summaryAction = SummaryType.chooseSummaryAction(
summaryTypeAndDateStr, entryKey);
doReduceAction(entryKey, summaryAction, values.iterator());
}
public void doReduceAction(String reduceKey, IStatisticsAction summaryAction,
Iterator<Text> iterator) {
long dataCounter = 0;
while (iterator.hasNext()) {
String summaryData = iterator.next().toString();
try {
summaryAction.doAction(summaryData);
} catch (Exception e) {
logger.error(
"Failed to summary call chain, maybe illegal data:"
+ summaryData, e);
} finally {
dataCounter++;
}
if (dataCounter % 1000 == 0) {
logger.debug("reduce for key: {}, count: {}", reduceKey, dataCounter);
}
}
try {
summaryAction.doSave();
} catch (Exception e) {
e.printStackTrace();
logger.error("Failed to save summaryresult/chainTree.", e);
}
}
}
package com.a.eye.skywalking.analysis.chainbuild;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.a.eye.skywalking.analysis.chainbuild.entity.CallChainDetailForMysql;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.a.eye.skywalking.analysis.config.Config;
public class DBCallChainInfoDao {
private static Logger logger = LoggerFactory
.getLogger(DBCallChainInfoDao.class.getName());
private static Connection connection;
static {
try {
Class.forName(Config.MySql.DRIVER_CLASS);
connection = DriverManager.getConnection(Config.MySql.URL,
Config.MySql.USERNAME, Config.MySql.PASSWORD);
} catch (ClassNotFoundException e) {
logger.error("Failed to searchRelationship jdbc driver class["
+ Config.MySql.DRIVER_CLASS + "]", e);
System.exit(-1);
} catch (SQLException e) {
logger.error("Failed to connection database.", e);
System.exit(-1);
}
}
public synchronized static void saveChainDetail(CallChainDetailForMysql callChainDetailForMysql)
throws SQLException {
PreparedStatement preparedStatement = null;
try {
preparedStatement = connection
.prepareStatement("INSERT INTO sw_chain_detail(treeId,uid,traceLevelId,viewpoint,create_time)"
+ " VALUES(?,?,?,?,?)");
for (ChainNode chainNode : callChainDetailForMysql.getChainNodes()) {
preparedStatement.setString(1, callChainDetailForMysql.getTreeToken());
preparedStatement.setString(2, callChainDetailForMysql.getUserId());
preparedStatement.setString(3, chainNode.getTraceLevelId());
preparedStatement.setString(4, chainNode.getViewPoint());
preparedStatement.setTimestamp(5,
new Timestamp(System.currentTimeMillis()));
preparedStatement.addBatch();
}
int[] result = preparedStatement.executeBatch();
for (int i : result) {
if (i != 1) {
logger.error("Failed to save chain detail ["
+ callChainDetailForMysql.getChainToken() + "]");
}
}
} finally {
if (preparedStatement != null)
preparedStatement.close();
}
connection.commit();
}
}
package com.a.eye.skywalking.analysis.chainbuild;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.a.eye.skywalking.protocol.FullSpan;
import com.a.eye.skywalking.protocol.common.CallType;
public class SpanEntry {
private FullSpan clientSpan;
private FullSpan serverSpan;
public SpanEntry() {
}
public int getLevelId() {
if (clientSpan != null) {
return clientSpan.getLevelId();
}
return serverSpan.getLevelId();
}
public String getParentLevelId() {
if (clientSpan != null) {
return clientSpan.getParentLevel();
}
return serverSpan.getParentLevel();
}
public String getViewPoint() {
if (clientSpan != null) {
return clientSpan.getViewPointId();
}
return serverSpan.getViewPointId();
}
public CallType getCallType() {
if (clientSpan != null) {
return CallType.convert(clientSpan.getCallType());
}
return CallType.convert(serverSpan.getCallType());
}
public long getCost() {
long resultCost = 0;
switch (getCallType()) {
case ASYNC:
resultCost = getClientCost() + getServerCost();
break;
case SYNC:
resultCost = getClientCost();
if (getClientSpan() == null) {
resultCost = getServerCost();
}
break;
case LOCAL:
resultCost = getClientCost();
break;
}
return resultCost;
}
private long getClientCost() {
if (clientSpan != null) {
return clientSpan.getCost();
}
return 0;
}
private long getServerCost() {
if (serverSpan != null) {
return serverSpan.getCost();
}
return 0;
}
public String getBusinessKey() {
if (clientSpan != null) {
return clientSpan.getBusinessKey();
}
return serverSpan.getBusinessKey();
}
public void setBusinessKey(String businessKey) {
if (clientSpan != null) {
clientSpan.setBusinessKey(businessKey);
return;
}
serverSpan.setBusinessKey(businessKey);
}
public ChainNode.NodeStatus getSpanStatus() {
if (clientSpan != null) {
if (clientSpan.getExceptionStack() != null && clientSpan.getExceptionStack().length() > 0) {
if (clientSpan.getStatusCode() == 1) {
return ChainNode.NodeStatus.ABNORMAL;
} else {
return ChainNode.NodeStatus.HUMAN_INTERRUPTION;
}
}
}
if (serverSpan != null) {
if (serverSpan.getExceptionStack() != null && serverSpan.getExceptionStack().length() > 0) {
if (clientSpan != null && clientSpan.getStatusCode() == 1) {
return ChainNode.NodeStatus.ABNORMAL;
} else {
return ChainNode.NodeStatus.HUMAN_INTERRUPTION;
}
}
}
return ChainNode.NodeStatus.NORMAL;
}
public String getExceptionStack() {
StringBuilder exceptionStack = new StringBuilder();
if (clientSpan != null) {
exceptionStack.append("Client" + clientSpan.getExceptionStack());
}
if (serverSpan != null) {
exceptionStack.append("Server cause by :" + serverSpan.getExceptionStack());
}
return exceptionStack.toString();
}
public String getApplicationId() {
if (clientSpan != null) {
return clientSpan.getApplicationId();
}
return serverSpan.getApplicationId();
}
public FullSpan getClientSpan() {
return clientSpan;
}
public FullSpan getServerSpan() {
return serverSpan;
}
public void setSpan(FullSpan span) {
this.clientSpan = span;
}
public String getSpanType() {
if (clientSpan != null) {
return clientSpan.getSpanTypeDesc();
}
return serverSpan.getSpanTypeDesc();
}
public String getUserId() {
if (clientSpan != null) {
return clientSpan.getUserId();
}
return serverSpan.getUserId();
}
public long getStartDate() {
if (clientSpan != null) {
return clientSpan.getStartDate();
}
return serverSpan.getStartDate();
}
}
package com.a.eye.skywalking.analysis.chainbuild.action;
import java.io.IOException;
import java.sql.SQLException;
public interface IStatisticsAction {
void doAction(String summaryData) throws IOException;
void doSave() throws InterruptedException, SQLException, IOException;
}
package com.a.eye.skywalking.analysis.chainbuild.action.impl;
import java.io.IOException;
import java.sql.SQLException;
import com.a.eye.skywalking.analysis.chainbuild.entity.CallChainTree;
import com.a.eye.skywalking.analysis.chainbuild.action.IStatisticsAction;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainInfo;
import com.a.eye.skywalking.analysis.chainbuild.po.SpecificTimeCallChainTreeContainer;
import com.google.gson.Gson;
public class CallChainRelationshipAction implements IStatisticsAction {
private CallChainTree chainTree;
private SpecificTimeCallChainTreeContainer container;
public CallChainRelationshipAction(String entryKey) throws IOException {
chainTree = CallChainTree.create(entryKey);
container = new SpecificTimeCallChainTreeContainer(chainTree.getTreeToken());
}
@Override
public void doAction(String summaryData) throws IOException {
ChainInfo chainInfo = new Gson().fromJson(summaryData, ChainInfo.class);
container.addChainIfNew(chainInfo);
}
@Override
public void doSave() throws InterruptedException, SQLException, IOException {
container.saveToHBase();
}
}
package com.a.eye.skywalking.analysis.chainbuild.action.impl;
import java.io.IOException;
import java.sql.SQLException;
import com.a.eye.skywalking.analysis.chainbuild.action.IStatisticsAction;
import com.a.eye.skywalking.analysis.chainbuild.entity.CallChainTree;
import com.a.eye.skywalking.analysis.chainbuild.entity.CallChainTreeNode;
import com.a.eye.skywalking.analysis.chainbuild.entity.ChainNodeSpecificTimeWindowSummaryValue;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.a.eye.skywalking.analysis.chainbuild.po.SummaryType;
import com.google.gson.Gson;
public class NumberOfCalledStatisticsAction implements IStatisticsAction {
private CallChainTree callChainTree;
private String summaryDate;
private SummaryType summaryType;
/**
* 统计任务的主要存储值
*
* TODO:配置本次修改
*/
private ChainNodeSpecificTimeWindowSummaryValue summaryValue = new ChainNodeSpecificTimeWindowSummaryValue();
public NumberOfCalledStatisticsAction(String entryKey, String summaryDate) throws IOException {
callChainTree = CallChainTree.create(entryKey);
this.summaryDate = summaryDate;
}
@Override
public void doAction(String summaryData) throws IOException {
/**
* TODO:根据MAP的优化,大幅度减少对象逻辑复杂度
*
* 每次传递来的数据,根据NodeStatus的值在内存中快速累加到ChainNodeSpecificTimeWindowSummaryValue中
*
* ChainNodeSpecificTimeWindowSummaryValue执行以下逻辑:
* 1.totalCall默认+1
* 2.totalCostTime根据cost累加
* 3.correctNumber和humanInterruptionNumber根据状态累加
*
* 注意:
* 1.减少对象创建和嵌套
* 2.此过程中不要操作任何hbase数据
*/
ChainNode chainNode = new Gson().fromJson(summaryData, ChainNode.class);
CallChainTreeNode newCallChainTreeNode = new CallChainTreeNode(chainNode);
CallChainTreeNode callChainTreeNode = callChainTree.getNodes().get(newCallChainTreeNode.getTreeNodeId());
if (callChainTreeNode == null) {
callChainTreeNode = newCallChainTreeNode;
callChainTree.getNodes().put(newCallChainTreeNode.getTreeNodeId(), callChainTreeNode);
}
callChainTreeNode.summary(callChainTree.getTreeToken(), chainNode, summaryType, summaryDate);
}
@Override
public void doSave() throws InterruptedException, SQLException, IOException {
/**
* TODO:新的写入逻辑,快速完成
*
* 根据Config.AnalysisServer.IS_ACCUMULATE_MODE决定,在需要时,精确读取指定rowkey的指定列值。
* 和ChainNodeSpecificTimeWindowSummaryValue值进行简单相加,然后汇总写入。
*
* 注意:
* 1.减少对象创建和嵌套
* 2。确保任务的告诉高效完成
*/
callChainTree.saveToHBase(summaryType);
}
public void setSummaryType(SummaryType summaryType) {
this.summaryType = summaryType;
}
}
package com.a.eye.skywalking.analysis.chainbuild.entity;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import com.a.eye.skywalking.analysis.chainbuild.DBCallChainInfoDao;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainInfo;
import com.google.gson.Gson;
public class CallChainDetailForMysql {
private String chainToken;
private String treeToken;
private Map<String, ChainNode> chainNodeMap = new HashMap<String, ChainNode>();
private String userId;
public CallChainDetailForMysql(ChainInfo chainInfo, String treeToken) {
chainToken = chainInfo.getCID();
for (ChainNode chainNode : chainInfo.getNodes()) {
chainNodeMap.put(chainNode.getTraceLevelId(), chainNode);
}
userId = chainInfo.getUserId();
this.treeToken = treeToken;
}
@Override
public String toString() {
return new Gson().toJson(this);
}
public void saveToMysql() throws SQLException {
DBCallChainInfoDao.saveChainDetail(this);
}
public Collection<ChainNode> getChainNodes() {
return chainNodeMap.values();
}
public String getUserId() {
return userId;
}
public String getChainToken() {
return chainToken;
}
public String getTreeToken() {
return treeToken;
}
}
package com.a.eye.skywalking.analysis.chainbuild.entity;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.a.eye.skywalking.analysis.chainbuild.po.SummaryType;
import com.a.eye.skywalking.analysis.chainbuild.util.TokenGenerator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class CallChainTree {
private Logger logger = LogManager.getLogger(CallChainTree.class);
private String callEntrance;
private String treeToken;
/**
* 命名规则:levelId + @ + viewPoint
* 存放各级的各个viewpoint节点的统计数值
*/
private Map<String, CallChainTreeNode> nodes;
public CallChainTree(String callEntrance) {
nodes = new HashMap<String, CallChainTreeNode>();
this.callEntrance = callEntrance;
this.treeToken = TokenGenerator.generateTreeToken(callEntrance);
logger.info("CallEntrance:[{}] == TreeToken[{}]",callEntrance, treeToken);
}
public static CallChainTree create(String callEntrance) throws IOException {
CallChainTree chain = new CallChainTree(callEntrance);
return chain;
}
public void saveToHBase(SummaryType summaryType) throws IOException, InterruptedException {
for (CallChainTreeNode entry : nodes.values()) {
entry.saveSummaryResultToHBase(summaryType);
}
}
public String getTreeToken() {
return treeToken;
}
public Map<String, CallChainTreeNode> getNodes() {
return nodes;
}
}
package com.a.eye.skywalking.analysis.chainbuild.entity;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.a.eye.skywalking.analysis.chainbuild.po.SummaryType;
import com.a.eye.skywalking.analysis.chainbuild.util.HBaseUtil;
import com.a.eye.skywalking.analysis.config.Config;
import com.a.eye.skywalking.analysis.config.HBaseTableMetaData;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.Expose;
import org.apache.hadoop.hbase.client.Put;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.*;
/**
* 调用树的每个traceLevelId + "@" + viewPointId构成一个树节点<br/>
* 虚拟化节点概念。节点存储落地时,按照节点对应的时间戳<br/>
*
* @author wusheng
*/
public class CallChainTreeNode {
private Logger logger = LogManager.getLogger(CallChainTreeNode.class);
@Expose
private String traceLevelId;
@Expose
private String viewPointId;
/**
* key: treeId + 小时 value: 当前树的当前小时范围内的,所有分钟和节点的统计数据
*/
private Map<String, ChainNodeSpecificMinSummary> chainNodeSpecificMinSummaryContainer;
/**
* key: treeId + 天 value: 当前树的当前天范围内的,所有小时和节点的统计数据
*/
private Map<String, ChainNodeSpecificHourSummary> chainNodeSpecificHourSummaryContainer;
/**
* key: treeId + 月 value: 当前树的当前月范围内的,所有天和节点的统计数据
*/
private Map<String, ChainNodeSpecificDaySummary> chainNodeSpecificDaySummaryContainer;
/**
* key: treeId + 年 value: 当前树的当前年范围内的,所有月份和节点的统计数据
*/
private Map<String, ChainNodeSpecificMonthSummary> chainNodeSpecificMonthSummaryContainer;
public CallChainTreeNode(ChainNode node) {
this.traceLevelId = node.getTraceLevelId();
chainNodeSpecificMinSummaryContainer = new HashMap<String, ChainNodeSpecificMinSummary>();
chainNodeSpecificHourSummaryContainer = new HashMap<String, ChainNodeSpecificHourSummary>();
chainNodeSpecificDaySummaryContainer = new HashMap<String, ChainNodeSpecificDaySummary>();
chainNodeSpecificMonthSummaryContainer = new HashMap<String, ChainNodeSpecificMonthSummary>();
this.viewPointId = node.getViewPoint();
}
/**
* 针对节点、汇总类型,进行数据汇总。<br/>
* 已汇总数据加载使用lazy load模式,只有此次汇总过程中触发的数据节点统计数量,才会被读取<br/>
*
* @param treeId
* @param node
* @param summaryType
* @param summaryDate
* @throws IOException
*/
public void summary(String treeId, ChainNode node, SummaryType summaryType,
String summaryDate) throws IOException {
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date(node.getStartDate()));
switch (summaryType) {
case HOUR: {
summaryMinResult(treeId, node, calendar, summaryDate);
break;
}
case DAY: {
summaryHourResult(treeId, node, calendar, summaryDate);
break;
}
case MONTH: {
summaryDayResult(treeId, node, calendar, summaryDate);
break;
}
case YEAR: {
summaryMonthResult(treeId, node, calendar, summaryDate);
break;
}
default: {
logger.error("unknown summary type :{}", summaryType);
break;
}
}
}
private void summaryMonthResult(String treeId, ChainNode node,
Calendar calendar, String summaryDate) throws IOException {
String keyOfMonthSummaryTable = generateRowKey(treeId, summaryDate);
ChainNodeSpecificMonthSummary monthSummary = chainNodeSpecificMonthSummaryContainer
.get(keyOfMonthSummaryTable);
if (monthSummary == null) {
if (Config.AnalysisServer.IS_ACCUMULATE_MODE) {
monthSummary = HBaseUtil.loadSpecificMonthSummary(
keyOfMonthSummaryTable, getTreeNodeId());
} else {
monthSummary = new ChainNodeSpecificMonthSummary();
}
chainNodeSpecificMonthSummaryContainer.put(keyOfMonthSummaryTable,
monthSummary);
}
monthSummary.summary(String.valueOf(calendar.get(Calendar.MONTH) + 1),
node);
}
private void summaryDayResult(String treeId, ChainNode node,
Calendar calendar, String summaryDate) throws IOException {
String keyOfDaySummaryTable = generateRowKey(treeId, summaryDate);
ChainNodeSpecificDaySummary daySummary = chainNodeSpecificDaySummaryContainer
.get(keyOfDaySummaryTable);
if (daySummary == null) {
if (Config.AnalysisServer.IS_ACCUMULATE_MODE) {
daySummary = HBaseUtil.loadSpecificDaySummary(
keyOfDaySummaryTable, getTreeNodeId());
} else {
daySummary = new ChainNodeSpecificDaySummary();
}
chainNodeSpecificDaySummaryContainer.put(keyOfDaySummaryTable,
daySummary);
}
daySummary.summary(String.valueOf(calendar.get(Calendar.DAY_OF_MONTH)),
node);
}
private void summaryHourResult(String treeId, ChainNode node,
Calendar calendar, String summaryDate) throws IOException {
String keyOfHourSummaryTable = generateRowKey(treeId, summaryDate);
ChainNodeSpecificHourSummary hourSummary = chainNodeSpecificHourSummaryContainer
.get(keyOfHourSummaryTable);
if (hourSummary == null) {
if (Config.AnalysisServer.IS_ACCUMULATE_MODE) {
hourSummary = HBaseUtil.loadSpecificHourSummary(
keyOfHourSummaryTable, getTreeNodeId());
} else {
hourSummary = new ChainNodeSpecificHourSummary();
}
chainNodeSpecificHourSummaryContainer.put(keyOfHourSummaryTable,
hourSummary);
}
hourSummary.summary(String.valueOf(calendar.get(Calendar.HOUR)), node);
}
/**
* 按分钟维度进行汇总<br/>
* chainNodeContainer以treeId和时间(精确到分钟)为key,value为当前时间范围内的所有分钟的汇总数据
*/
private void summaryMinResult(String treeId, ChainNode node,
Calendar calendar, String summaryDate) throws IOException {
String keyOfMinSummaryTable = generateRowKey(treeId, summaryDate);
ChainNodeSpecificMinSummary minSummary = chainNodeSpecificMinSummaryContainer
.get(keyOfMinSummaryTable);
if (minSummary == null) {
if (Config.AnalysisServer.IS_ACCUMULATE_MODE) {
minSummary = HBaseUtil.loadSpecificMinSummary(
keyOfMinSummaryTable, getTreeNodeId());
} else {
minSummary = new ChainNodeSpecificMinSummary();
}
chainNodeSpecificMinSummaryContainer.put(keyOfMinSummaryTable,
minSummary);
}
minSummary.summary(String.valueOf(calendar.get(Calendar.MINUTE)), node);
}
private String generateRowKey(String treeId, String dateKey) {
return treeId + "/" + dateKey;
}
@Override
public String toString() {
return new GsonBuilder().excludeFieldsWithoutExposeAnnotation()
.create().toJson(this);
}
/**
* 存储入库时 <br/>
* hbase的key 为 treeId + 小时 <br/>
* 列族中,列为节点id,规则为:traceLevelId + "@" + viewPointId <br/>
* 列的值,为当前节点按小时内各分钟的汇总 <br/>
*
* @throws IOException
* @throws InterruptedException
*/
public void saveSummaryResultToHBase(SummaryType summaryType)
throws IOException, InterruptedException {
switch (summaryType) {
case HOUR: {
batchSaveMinSummaryResult();
break;
}
case DAY: {
batchSaveHourSummaryResult();
break;
}
case MONTH: {
batchSaveDaySummaryResult();
break;
}
case YEAR: {
batchSaveMonthSummaryResult();
break;
}
default: {
logger.error("unknown summary type :{}", summaryType);
break;
}
}
}
private void batchSaveMonthSummaryResult() throws IOException,
InterruptedException {
List<Put> puts = new ArrayList<Put>();
for (Map.Entry<String, ChainNodeSpecificMonthSummary> entry : chainNodeSpecificMonthSummaryContainer
.entrySet()) {
Put put = new Put(entry.getKey().getBytes());
put.addColumn(
HBaseTableMetaData.TABLE_CHAIN_ONE_MONTH_SUMMARY.COLUMN_FAMILY_NAME
.getBytes(), getTreeNodeId().getBytes(), entry
.getValue().toString().getBytes());
puts.add(put);
}
HBaseUtil.batchSaveMonthSummaryResult(puts);
}
private void batchSaveDaySummaryResult() throws IOException,
InterruptedException {
List<Put> puts = new ArrayList<Put>();
for (Map.Entry<String, ChainNodeSpecificDaySummary> entry : chainNodeSpecificDaySummaryContainer
.entrySet()) {
Put put = new Put(entry.getKey().getBytes());
put.addColumn(
HBaseTableMetaData.TABLE_CHAIN_ONE_DAY_SUMMARY.COLUMN_FAMILY_NAME
.getBytes(), getTreeNodeId().getBytes(), entry
.getValue().toString().getBytes());
puts.add(put);
}
HBaseUtil.batchSaveDaySummaryResult(puts);
}
private void batchSaveHourSummaryResult() throws IOException,
InterruptedException {
List<Put> puts = new ArrayList<Put>();
for (Map.Entry<String, ChainNodeSpecificHourSummary> entry : chainNodeSpecificHourSummaryContainer
.entrySet()) {
Put put = new Put(entry.getKey().getBytes());
put.addColumn(
HBaseTableMetaData.TABLE_CHAIN_ONE_HOUR_SUMMARY.COLUMN_FAMILY_NAME
.getBytes(), getTreeNodeId().getBytes(), entry
.getValue().toString().getBytes());
puts.add(put);
}
HBaseUtil.batchSaveHourSummaryResult(puts);
}
private void batchSaveMinSummaryResult() throws IOException,
InterruptedException {
List<Put> puts = new ArrayList<Put>();
for (Map.Entry<String, ChainNodeSpecificMinSummary> entry : chainNodeSpecificMinSummaryContainer
.entrySet()) {
Put put = new Put(entry.getKey().getBytes());
put.addColumn(
HBaseTableMetaData.TABLE_CHAIN_ONE_MINUTE_SUMMARY.COLUMN_FAMILY_NAME
.getBytes(), getTreeNodeId().getBytes(), entry
.getValue().toString().getBytes());
puts.add(put);
}
HBaseUtil.batchSaveMinSummaryResult(puts);
}
public String getTreeNodeId() {
return traceLevelId + "@" + viewPointId;
}
}
package com.a.eye.skywalking.analysis.chainbuild.entity;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import java.util.HashMap;
import java.util.Map;
/**
* Created by xin on 16-3-10.
*/
public class ChainNodeSpecificDaySummary {
/**
* key : 天
*/
private Map<String, ChainNodeSpecificTimeWindowSummaryValue> summaryValueMap;
public ChainNodeSpecificDaySummary(String originData) {
JsonObject jsonObject = (JsonObject) new JsonParser().parse(originData);
summaryValueMap = new Gson().fromJson(jsonObject.get("summaryValueMap").toString(),
new TypeToken<Map<String, ChainNodeSpecificTimeWindowSummaryValue>>() {
}.getType());
}
public ChainNodeSpecificDaySummary() {
summaryValueMap = new HashMap<String, ChainNodeSpecificTimeWindowSummaryValue>();
}
public void summary(String minute, ChainNode node) {
ChainNodeSpecificTimeWindowSummaryValue summarValue = summaryValueMap.get(minute);
if (summarValue == null) {
summarValue = new ChainNodeSpecificTimeWindowSummaryValue();
summaryValueMap.put(minute, summarValue);
}
summarValue.summary(node);
}
@Override
public String toString() {
return new Gson().toJson(this);
}
}
package com.a.eye.skywalking.analysis.chainbuild.entity;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import java.util.HashMap;
import java.util.Map;
public class ChainNodeSpecificHourSummary {
/**
* key : 小时
*/
private Map<String, ChainNodeSpecificTimeWindowSummaryValue> summaryValueMap;
public ChainNodeSpecificHourSummary(String originData) {
JsonObject jsonObject = (JsonObject) new JsonParser().parse(originData);
summaryValueMap = new Gson().fromJson(jsonObject.get("summaryValueMap").toString(),
new TypeToken<Map<String, ChainNodeSpecificTimeWindowSummaryValue>>() {
}.getType());
}
public ChainNodeSpecificHourSummary() {
summaryValueMap = new HashMap<String, ChainNodeSpecificTimeWindowSummaryValue>();
}
public void summary(String minute, ChainNode node) {
ChainNodeSpecificTimeWindowSummaryValue summarValue = summaryValueMap.get(minute);
if (summarValue == null) {
summarValue = new ChainNodeSpecificTimeWindowSummaryValue();
summaryValueMap.put(minute, summarValue);
}
summarValue.summary(node);
}
@Override
public String toString() {
return new Gson().toJson(this);
}
}
package com.a.eye.skywalking.analysis.chainbuild.entity;
import java.util.HashMap;
import java.util.Map;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
public class ChainNodeSpecificMinSummary {
/**
* key : 分钟
* value: 各节点统计数据
*/
private Map<String, ChainNodeSpecificTimeWindowSummaryValue> summaryValueMap;
public ChainNodeSpecificMinSummary(String originData) {
JsonObject jsonObject = (JsonObject) new JsonParser().parse(originData);
summaryValueMap = new Gson().fromJson(jsonObject.get("summaryValueMap").toString(),
new TypeToken<Map<String, ChainNodeSpecificTimeWindowSummaryValue>>() {
}.getType());
}
public ChainNodeSpecificMinSummary() {
summaryValueMap = new HashMap<String, ChainNodeSpecificTimeWindowSummaryValue>();
}
public void summary(String minute, ChainNode node) {
ChainNodeSpecificTimeWindowSummaryValue summarValue = summaryValueMap.get(minute);
if (summarValue == null) {
summarValue = new ChainNodeSpecificTimeWindowSummaryValue();
summaryValueMap.put(minute, summarValue);
}
summarValue.summary(node);
}
@Override
public String toString() {
return new Gson().toJson(this);
}
}
package com.a.eye.skywalking.analysis.chainbuild.entity;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import java.util.HashMap;
import java.util.Map;
/**
* Created by xin on 16-3-10.
*/
public class ChainNodeSpecificMonthSummary {
/**
* key : 月
*/
private Map<String, ChainNodeSpecificTimeWindowSummaryValue> summaryValueMap;
public ChainNodeSpecificMonthSummary(String originData) {
JsonObject jsonObject = (JsonObject) new JsonParser().parse(originData);
summaryValueMap = new Gson().fromJson(jsonObject.get("summaryValueMap").toString(),
new TypeToken<Map<String, ChainNodeSpecificTimeWindowSummaryValue>>() {
}.getType());
}
public ChainNodeSpecificMonthSummary() {
summaryValueMap = new HashMap<String, ChainNodeSpecificTimeWindowSummaryValue>();
}
public void summary(String month, ChainNode node) {
ChainNodeSpecificTimeWindowSummaryValue summarValue = summaryValueMap.get(month);
if (summarValue == null) {
summarValue = new ChainNodeSpecificTimeWindowSummaryValue();
summaryValueMap.put(month, summarValue);
}
summarValue.summary(node);
}
@Override
public String toString() {
return new Gson().toJson(this);
}
}
package com.a.eye.skywalking.analysis.chainbuild.entity;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
public class ChainNodeSpecificTimeWindowSummaryValue {
private long totalCall;
private long totalCostTime;
private long correctNumber;
private long humanInterruptionNumber;
public ChainNodeSpecificTimeWindowSummaryValue() {
totalCall = 0;
totalCostTime = 0;
correctNumber = 0;
humanInterruptionNumber = 0;
}
public void summary(ChainNode node) {
totalCall++;
if (node.getStatus() == ChainNode.NodeStatus.NORMAL) {
correctNumber++;
}
if (node.getStatus() == ChainNode.NodeStatus.HUMAN_INTERRUPTION) {
humanInterruptionNumber++;
}
totalCostTime += node.getCost();
}
}
package com.a.eye.skywalking.analysis.chainbuild.exception;
public class Tid2CidECovertException extends Exception{
private static final long serialVersionUID = -4679233837335940374L;
public Tid2CidECovertException(String msg){
super(msg);
}
public Tid2CidECovertException(String msg, Exception cause){
super(msg, cause);
}
}
package com.a.eye.skywalking.analysis.chainbuild.filter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.a.eye.skywalking.analysis.config.Config;
public class SpanNodeProcessChain {
private static Logger logger = LoggerFactory.getLogger(SpanNodeProcessChain.class.getName());
private static Map<String, SpanNodeProcessFilter> filterMap;
private static Object lock = new Object();
private SpanNodeProcessChain() {
//Non
}
private static void initFilterMap(Map<String, SpanNodeProcessFilter> filterMap) {
Properties properties = new Properties();
try {
properties.load(SpanNodeProcessChain.class.getResourceAsStream("/viewpointfilter.conf"));
} catch (IOException e) {
logger.error("Failed to searchRelationship config file[viewpointfilter.conf]", e);
System.exit(-1);
}
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
String[] filters = ((String) entry.getValue()).split("->");
String[] types = ((String) entry.getKey()).split(",");
SpanNodeProcessFilter currentFilter = null;
for (int i = filters.length - 1; i >= 0; i--) {
try {
Class<?> filterClass = Class.forName(Config.Filter.FILTER_PACKAGE_NAME + "." + filters[i]);
SpanNodeProcessFilter tmpSpanNodeFilter = (SpanNodeProcessFilter) filterClass.newInstance();
tmpSpanNodeFilter.setNextProcessChain(currentFilter);
currentFilter = tmpSpanNodeFilter;
} catch (ClassNotFoundException e) {
logger.error("Filed to searchRelationship class[" + Config.Filter.FILTER_PACKAGE_NAME + "." + filters[i] + "]", e);
System.exit(-1);
} catch (InstantiationException e) {
logger.error("Can not instance class[" + Config.Filter.FILTER_PACKAGE_NAME + "." + filters[i] + "]", e);
System.exit(-1);
} catch (IllegalAccessException e) {
logger.error("Can not access class[" + Config.Filter.FILTER_PACKAGE_NAME + "." + filters[i] + "]", e);
System.exit(-1);
} catch (ClassCastException e) {
logger.error("Class [" + Config.Filter.FILTER_PACKAGE_NAME + "." + filters[i] + "] is not subclass of " +
"com.a.eye.skywalking.analysis.filter.SpanNodeProcessFilter", e);
System.exit(-1);
}
}
for (String type : types) {
filterMap.put(type, currentFilter);
}
}
}
public static SpanNodeProcessFilter getProcessChainByCallType(String callType) {
if (filterMap == null) {
synchronized (lock) {
if (filterMap == null) {
filterMap = new HashMap<String, SpanNodeProcessFilter>();
initFilterMap(filterMap);
}
}
}
if (filterMap.containsKey(callType)) {
return filterMap.get(callType);
}
return filterMap.get("default");
}
}
package com.a.eye.skywalking.analysis.chainbuild.filter;
import com.a.eye.skywalking.analysis.chainbuild.SpanEntry;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.a.eye.skywalking.analysis.chainbuild.util.SubLevelSpanCostCounter;
public abstract class SpanNodeProcessFilter {
private SpanNodeProcessFilter nextProcessChain;
public abstract void doFilter(SpanEntry spanEntry, ChainNode node, SubLevelSpanCostCounter costMap);
protected void doNext(SpanEntry spanEntry, ChainNode node, SubLevelSpanCostCounter costMap){
if(nextProcessChain != null){
nextProcessChain.doFilter(spanEntry, node, costMap);
}
}
void setNextProcessChain(SpanNodeProcessFilter nextProcessChain) {
this.nextProcessChain = nextProcessChain;
}
}
package com.a.eye.skywalking.analysis.chainbuild.filter.impl;
import com.a.eye.skywalking.analysis.chainbuild.SpanEntry;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.a.eye.skywalking.analysis.chainbuild.util.SubLevelSpanCostCounter;
import com.a.eye.skywalking.analysis.chainbuild.filter.SpanNodeProcessFilter;
public class AppendBusinessKeyFilter extends SpanNodeProcessFilter {
@Override
public void doFilter(SpanEntry spanEntry, ChainNode node, SubLevelSpanCostCounter costMap) {
node.setViewPoint(node.getViewPoint() + spanEntry.getBusinessKey());
this.doNext(spanEntry, node, costMap);
}
}
package com.a.eye.skywalking.analysis.chainbuild.filter.impl;
import com.a.eye.skywalking.analysis.chainbuild.SpanEntry;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.a.eye.skywalking.analysis.chainbuild.filter.SpanNodeProcessFilter;
import com.a.eye.skywalking.analysis.chainbuild.util.SubLevelSpanCostCounter;
public class CopyAttrFilter extends SpanNodeProcessFilter {
@Override
public void doFilter(SpanEntry spanEntry, ChainNode node, SubLevelSpanCostCounter costMap) {
node.setCallType(spanEntry.getCallType().toString());
node.setStatus(spanEntry.getSpanStatus());
node.setLevelId(spanEntry.getLevelId());
node.setParentLevelId(spanEntry.getParentLevelId());
node.setViewPoint(spanEntry.getViewPoint());
node.setUserId(spanEntry.getUserId());
node.setBusinessKey(spanEntry.getBusinessKey());
node.setStartDate(spanEntry.getStartDate());
this.doNext(spanEntry, node, costMap);
}
}
package com.a.eye.skywalking.analysis.chainbuild.filter.impl;
import com.a.eye.skywalking.analysis.chainbuild.SpanEntry;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.a.eye.skywalking.analysis.chainbuild.filter.SpanNodeProcessFilter;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.a.eye.skywalking.analysis.chainbuild.util.SubLevelSpanCostCounter;
public class JDBCBusinessKeyHandleFilter extends SpanNodeProcessFilter {
private Logger logger = LogManager.getLogger(JDBCBusinessKeyHandleFilter.class);
@Override
public void doFilter(SpanEntry spanEntry, ChainNode node, SubLevelSpanCostCounter costMap) {
String businessKey = spanEntry.getBusinessKey();
if (businessKey != null) {
int index = businessKey.lastIndexOf(":");
if (index != -1) {
businessKey = businessKey.substring(index + 1).trim();
String key = businessKey.toUpperCase();
if (key.startsWith("SELECT")) {
businessKey = subBusinessKey(businessKey, key, "WHERE");
} else if (key.startsWith("UPDATE")) {
businessKey = subBusinessKey(businessKey, key, "SET");
} else if (key.startsWith("DELETE")) {
businessKey = subBusinessKey(businessKey, key, "WHERE");
} else if (key.startsWith("INSERT")) {
businessKey = subBusinessKey(businessKey, key, "VALUES");
}
}
}
spanEntry.setBusinessKey(businessKey);
this.doNext(spanEntry, node, costMap);
}
private String subBusinessKey(String businessKey, String key, String keyWord) {
int whereIndex = key.indexOf(keyWord);
if (whereIndex != -1) {
businessKey = businessKey.substring(0, whereIndex - 1).trim();
}
return businessKey;
}
}
package com.a.eye.skywalking.analysis.chainbuild.filter.impl;
import com.a.eye.skywalking.analysis.chainbuild.SpanEntry;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.a.eye.skywalking.analysis.chainbuild.filter.SpanNodeProcessFilter;
import com.a.eye.skywalking.analysis.chainbuild.util.SubLevelSpanCostCounter;
public class ProcessCostTimeFilter extends SpanNodeProcessFilter {
@Override
public void doFilter(SpanEntry spanEntry, ChainNode node, SubLevelSpanCostCounter costMap) {
node.setCost(spanEntry.getCost());
this.saveCostAsSubNodeCost(spanEntry, node, costMap);
this.computeChainNodeCost(costMap, node);
this.doNext(spanEntry, node, costMap);
}
private void saveCostAsSubNodeCost(SpanEntry spanEntry, ChainNode node,
SubLevelSpanCostCounter costMap) {
long subNodeCost = spanEntry.getCost();
if (costMap.exists(spanEntry.getParentLevelId())) {
subNodeCost += costMap.get(spanEntry.getParentLevelId());
}
costMap.put(spanEntry.getParentLevelId(), subNodeCost);
}
private void computeChainNodeCost(SubLevelSpanCostCounter costMap, ChainNode node) {
String levelId = node.getParentLevelId();
if (levelId != null && levelId.length() > 0) {
levelId += ".";
}
levelId += node.getLevelId() + "";
if (costMap.exists(levelId)) {
node.setCost(node.getCost() - costMap.get(levelId));
}
}
}
package com.a.eye.skywalking.analysis.chainbuild.filter.impl;
import com.a.eye.skywalking.analysis.chainbuild.SpanEntry;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.a.eye.skywalking.analysis.chainbuild.util.SubLevelSpanCostCounter;
import com.a.eye.skywalking.analysis.chainbuild.filter.SpanNodeProcessFilter;
public class ReplaceAddressFilter extends SpanNodeProcessFilter {
//ip:PORT regex
private static String IP_PORT_REGEX = "(([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))\\.([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))\\.([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))\\.([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))|localhost):([1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5]|[0-9]{1,4})";
@Override
public void doFilter(SpanEntry spanEntry, ChainNode node, SubLevelSpanCostCounter costMap) {
String viewPoint = spanEntry.getViewPoint().replaceAll(IP_PORT_REGEX, spanEntry.getApplicationId());
node.setViewPoint(viewPoint);
this.doNext(spanEntry, node, costMap);
}
}
package com.a.eye.skywalking.analysis.chainbuild.filter.impl;
import com.a.eye.skywalking.analysis.chainbuild.SpanEntry;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.a.eye.skywalking.analysis.chainbuild.util.SubLevelSpanCostCounter;
import com.a.eye.skywalking.analysis.chainbuild.util.TokenGenerator;
import com.a.eye.skywalking.analysis.chainbuild.filter.SpanNodeProcessFilter;
public class TokenGenerateFilter extends SpanNodeProcessFilter {
@Override
public void doFilter(SpanEntry spanEntry, ChainNode node, SubLevelSpanCostCounter costMap) {
String nodeToken = TokenGenerator.generateNodeToken(node.getParentLevelId() + "." + node.getLevelId() +
"-" + node.getViewPoint());
node.setNodeToken(nodeToken);
this.doNext(spanEntry, node, costMap);
}
}
package com.a.eye.skywalking.analysis.chainbuild.po;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import com.a.eye.skywalking.analysis.chainbuild.exception.Tid2CidECovertException;
import com.a.eye.skywalking.analysis.chainbuild.util.TokenGenerator;
import com.a.eye.skywalking.analysis.config.HBaseTableMetaData;
import org.apache.hadoop.hbase.client.Put;
public class ChainInfo implements Serializable {
private static final long serialVersionUID = -7194044877533469817L;
/**
* 0节点的viewpoint,用于明文标识入口
*/
private String callEntrance;
private String cid;
private ChainStatus chainStatus = ChainStatus.NORMAL;
private List<ChainNode> nodes = new ArrayList<ChainNode>();
private String userId = null;
private ChainNode firstChainNode;
private long startDate;
private String tid;
public ChainInfo(String tid) {
super();
this.tid = tid;
}
public String getCID() {
return cid;
}
public String getEntranceNodeToken() throws Tid2CidECovertException {
if (firstChainNode == null) {
throw new Tid2CidECovertException("tid[" + tid + "] can't find span node with level=0.");
} else {
return this.getUserId() + ":"
+ firstChainNode.getNodeToken();
}
}
public void generateChainToken() {
StringBuilder chainTokenDesc = new StringBuilder();
for (ChainNode node : nodes) {
chainTokenDesc.append(node.getParentLevelId() + "."
+ node.getLevelId() + "-" + node.getNodeToken() + ";");
}
this.cid = TokenGenerator.generateCID(chainTokenDesc.toString());
}
public ChainStatus getChainStatus() {
return chainStatus;
}
public void setChainStatus(ChainStatus chainStatus) {
this.chainStatus = chainStatus;
}
public void addNodes(ChainNode chainNode) {
this.nodes.add(0, chainNode);
if (chainNode.getStatus() == ChainNode.NodeStatus.ABNORMAL
|| chainNode.getStatus() == ChainNode.NodeStatus.HUMAN_INTERRUPTION) {
chainStatus = ChainStatus.ABNORMAL;
}
if (userId == null) {
userId = chainNode.getUserId();
}
if ((chainNode.getParentLevelId() == null || chainNode
.getParentLevelId().length() == 0)
&& chainNode.getLevelId() == 0) {
firstChainNode = chainNode;
startDate = chainNode.getStartDate();
callEntrance = firstChainNode.getViewPoint();
}
}
public List<ChainNode> getNodes() {
return nodes;
}
public String getUserId() {
return userId;
}
public void saveToHBase(Put put) {
for (ChainNode node : nodes){
put.addColumn(HBaseTableMetaData.TABLE_CHAIN_DETAIL.COLUMN_FAMILY_NAME.getBytes(),
node.getTraceLevelId().getBytes(), node.toString().getBytes());
}
}
public enum ChainStatus {
NORMAL('N'), ABNORMAL('A');
private char value;
ChainStatus(char value) {
this.value = value;
}
public static ChainStatus convert(char value) {
switch (value) {
case 'N':
return NORMAL;
case 'A':
return ABNORMAL;
default:
throw new IllegalStateException("Failed to convert[" + value
+ "]");
}
}
@Override
public String toString() {
return value + "";
}
}
public void setCID(String cid) {
this.cid = cid;
}
public long getStartDate() {
return startDate;
}
public String getCallEntrance() {
return callEntrance;
}
}
package com.a.eye.skywalking.analysis.chainbuild.po;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.Expose;
public class ChainNode {
@Expose
private String nodeToken;
@Expose
private String viewPoint;
private String businessKey;
private long cost;
private NodeStatus status;
@Expose
private String parentLevelId;
@Expose
private int levelId;
private String callType;
private long startDate;
@Expose
private String userId;
public String getNodeToken() {
return nodeToken;
}
public void setNodeToken(String nodeToken) {
this.nodeToken = nodeToken;
}
public String getViewPoint() {
return viewPoint;
}
public void setViewPoint(String viewPoint) {
this.viewPoint = viewPoint;
}
public long getCost() {
return cost;
}
public void setCost(long cost) {
this.cost = cost;
}
public NodeStatus getStatus() {
return status;
}
public void setStatus(NodeStatus status) {
this.status = status;
}
public String getParentLevelId() {
return parentLevelId;
}
public void setParentLevelId(String parentLevelId) {
this.parentLevelId = parentLevelId;
}
public int getLevelId() {
return levelId;
}
public void setLevelId(int levelId) {
this.levelId = levelId;
}
public void setCallType(String callType) {
this.callType = callType;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getUserId() {
return userId;
}
public void setBusinessKey(String businessKey) {
this.businessKey = businessKey;
}
public String getBusinessKey() {
return businessKey;
}
public String getTraceLevelId() {
StringBuilder stringBuilder = new StringBuilder();
if (getParentLevelId() != null && getParentLevelId().length() > 0) {
stringBuilder.append(getParentLevelId() + ".");
}
return stringBuilder.append(getLevelId()).toString();
}
public void setStartDate(long startDate) {
this.startDate = startDate;
}
public long getStartDate() {
return startDate;
}
public enum NodeStatus {
NORMAL('N'), ABNORMAL('A'), HUMAN_INTERRUPTION('I');
private char value;
NodeStatus(char value) {
this.value = value;
}
public char getValue() {
return value;
}
public static NodeStatus convert(char value) {
switch (value) {
case 'N':
return NORMAL;
case 'A':
return ABNORMAL;
case 'I':
return HUMAN_INTERRUPTION;
default:
throw new IllegalStateException("Failed to convert[" + value + "]");
}
}
}
@Override
public String toString() {
return new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create().toJson(this);
}
}
package com.a.eye.skywalking.analysis.chainbuild.po;
import com.a.eye.skywalking.analysis.chainbuild.util.HBaseUtil;
import com.a.eye.skywalking.analysis.config.HBaseTableMetaData;
import com.a.eye.skywalking.analysis.chainbuild.entity.CallChainDetailForMysql;
import com.google.gson.Gson;
import org.apache.hadoop.hbase.client.Put;
import java.io.IOException;
import java.sql.SQLException;
import java.util.*;
public class SpecificTimeCallChainTreeContainer {
private String treeToken;
private Map<String, List<String>> hasBeenMergedChainIds;
private Map<String, CallChainDetailForMysql> newChain4DB;
// 本次Reduce合并过的调用链
private Map<String, ChainInfo> newChains;
public SpecificTimeCallChainTreeContainer(String treeToken) {
this.treeToken = treeToken;
hasBeenMergedChainIds = new HashMap<String, List<String>>();
newChains = new HashMap<String, ChainInfo>();
newChain4DB = new HashMap<String, CallChainDetailForMysql>();
}
public void addChainIfNew(ChainInfo chainInfo) throws IOException {
String key = generateKey(chainInfo.getStartDate());
List<String> cIds = hasBeenMergedChainIds.get(key);
if (cIds == null) {
cIds = HBaseUtil.loadHasBeenMergeChainIds(key);
hasBeenMergedChainIds.put(key, cIds);
}
if (!cIds.contains(chainInfo.getCID())) {
cIds.add(chainInfo.getCID());
newChains.put(chainInfo.getCID(), chainInfo);
if (chainInfo.getChainStatus() == ChainInfo.ChainStatus.NORMAL) {
newChain4DB.put(chainInfo.getCID(), new CallChainDetailForMysql(chainInfo, treeToken));
}
}
}
private String generateKey(long date) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date(date));
return treeToken + "@" + calendar.get(Calendar.YEAR) + "-" + calendar.get(Calendar.MONTH);
}
public void saveToHBase() throws IOException, InterruptedException, SQLException {
batchSaveNewChainsInfo();
batchSaveMergedChainId();
batchSaveToMysql();
}
private void batchSaveToMysql() throws SQLException {
for (Map.Entry<String, CallChainDetailForMysql> entry : newChain4DB.entrySet()) {
entry.getValue().saveToMysql();
}
}
/**
* 保存被合并的cid信息列表
*
* @throws IOException
* @throws InterruptedException
*/
private void batchSaveMergedChainId() throws IOException, InterruptedException {
List<Put> chainIdPuts = new ArrayList<Put>();
for (Map.Entry<String, List<String>> entry : hasBeenMergedChainIds.entrySet()) {
Put chainIdPut = new Put(entry.getKey().getBytes());
chainIdPut.addColumn(HBaseTableMetaData.TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING.COLUMN_FAMILY_NAME.getBytes()
, HBaseTableMetaData.TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING.COLUMN_NAME.getBytes(), new Gson().toJson(entry.getValue()).getBytes());
chainIdPuts.add(chainIdPut);
}
HBaseUtil.batchSaveHasBeenMergedCID(chainIdPuts);
}
/**
* 保存已经合并的调用链信息,包含调用链明细
*
* @throws IOException
* @throws InterruptedException
*/
private void batchSaveNewChainsInfo() throws IOException, InterruptedException {
List<Put> chainInfoPuts = new ArrayList<Put>();
for (Map.Entry<String, ChainInfo> entry : newChains.entrySet()) {
Put put = new Put(entry.getKey().getBytes());
entry.getValue().saveToHBase(put);
chainInfoPuts.add(put);
}
HBaseUtil.batchSaveChainInfo(chainInfoPuts);
}
}
package com.a.eye.skywalking.analysis.chainbuild.po;
import com.a.eye.skywalking.analysis.chainbuild.action.IStatisticsAction;
import com.a.eye.skywalking.analysis.chainbuild.action.impl.CallChainRelationshipAction;
import com.a.eye.skywalking.analysis.chainbuild.action.impl.NumberOfCalledStatisticsAction;
import java.io.IOException;
public enum SummaryType {
HOUR('H'), DAY('D'), MONTH('M'), YEAR('Y'), RELATIONSHIP('R');
private char value;
SummaryType(char value) {
this.value = value;
}
public char getValue() {
return value;
}
public static IStatisticsAction chooseSummaryAction(String summaryTypeAndDateStr, String entryKey) throws IOException {
char valueChar = summaryTypeAndDateStr.charAt(0);
// HOUR : 2016-05-02/12
// DAY : 2016-05-02
// MONTH : 2016-05
// YEAR : 2016
// RELATIONSHIP : treeId
String summaryDateStr = summaryTypeAndDateStr.substring(summaryTypeAndDateStr.indexOf("-") + 1);
SummaryType type = null;
switch (valueChar) {
case 'H':
type = HOUR;
break;
case 'D':
type = DAY;
break;
case 'M':
type = MONTH;
break;
case 'Y':
type = YEAR;
break;
case 'R':
return new CallChainRelationshipAction(entryKey);
default:
throw new RuntimeException("Can not find the summary type[" + valueChar + "]");
}
NumberOfCalledStatisticsAction summaryAction = new NumberOfCalledStatisticsAction(entryKey, summaryDateStr);
summaryAction.setSummaryType(type);
return summaryAction;
}
}
package com.a.eye.skywalking.analysis.chainbuild.util;
import com.a.eye.skywalking.protocol.AckSpan;
import com.a.eye.skywalking.protocol.FullSpan;
import com.a.eye.skywalking.protocol.RequestSpan;
import com.a.eye.skywalking.protocol.exception.ConvertFailedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.HashMap;
import java.util.Map;
public class CellToSpanHandler {
private Map<String, RequestSpan> levelIdToRequestSpans;
private Map<String, AckSpan> levelIdToAckSpans;
public CellToSpanHandler() {
levelIdToRequestSpans = new HashMap<String, RequestSpan>();
levelIdToAckSpans = new HashMap<String, AckSpan>();
}
public void addSpan(Cell cell) throws ConvertFailedException {
if (cell != null && cell.getValueArray().length > 0) {
String colId =
Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
if (colId.endsWith("-ACK")) {
levelIdToAckSpans.put(colId.substring(0, colId.length() - 4), convertACKSpan(CellUtil.cloneValue(cell)));
} else {
levelIdToRequestSpans.put(colId, convertRequestSpan(CellUtil.cloneValue(cell)));
}
}
}
private RequestSpan convertRequestSpan(byte[] originData) throws ConvertFailedException {
return RequestSpan.convert(originData);
}
private AckSpan convertACKSpan(byte[] originData) throws ConvertFailedException {
return AckSpan.convert(originData);
}
public Map<String, FullSpan> handle() {
Map<String,FullSpan> fullSpans = new HashMap<String,FullSpan>();
for (Map.Entry<String, RequestSpan> entry : levelIdToRequestSpans.entrySet()){
FullSpan traceNodeInfo = new FullSpan(entry.getValue(), levelIdToAckSpans.get(entry.getKey()));
fullSpans.put(entry.getKey(),traceNodeInfo);
}
return fullSpans;
}
public Map<String, RequestSpan> getRequestSpans() {
return levelIdToRequestSpans;
}
public Map<String, AckSpan> getAckSpans() {
return levelIdToAckSpans;
}
}
package com.a.eye.skywalking.analysis.chainbuild.util;
import com.a.eye.skywalking.analysis.chainbuild.entity.ChainNodeSpecificMinSummary;
import com.a.eye.skywalking.analysis.config.HBaseTableMetaData;
import com.a.eye.skywalking.analysis.chainbuild.entity.ChainNodeSpecificDaySummary;
import com.a.eye.skywalking.analysis.chainbuild.entity.ChainNodeSpecificHourSummary;
import com.a.eye.skywalking.analysis.chainbuild.entity.ChainNodeSpecificMonthSummary;
import com.a.eye.skywalking.analysis.config.Config;
import com.a.eye.skywalking.protocol.FullSpan;
import com.a.eye.skywalking.protocol.exception.ConvertFailedException;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class HBaseUtil {
private static Logger logger = LoggerFactory.getLogger(HBaseUtil.class.getName());
private static Configuration configuration = null;
private static Connection connection;
static {
try {
initHBaseClient();
createTableIfNeed(HBaseTableMetaData.TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING.TABLE_NAME,
HBaseTableMetaData.TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING.COLUMN_FAMILY_NAME);
createTableIfNeed(HBaseTableMetaData.TABLE_TRACE_ID_AND_CID_MAPPING.TABLE_NAME,
HBaseTableMetaData.TABLE_TRACE_ID_AND_CID_MAPPING.COLUMN_FAMILY_NAME);
createTableIfNeed(HBaseTableMetaData.TABLE_CHAIN_ONE_MINUTE_SUMMARY.TABLE_NAME,
HBaseTableMetaData.TABLE_CHAIN_ONE_MINUTE_SUMMARY.COLUMN_FAMILY_NAME);
createTableIfNeed(HBaseTableMetaData.TABLE_CHAIN_DETAIL.TABLE_NAME,
HBaseTableMetaData.TABLE_CHAIN_DETAIL.COLUMN_FAMILY_NAME);
// 1 hour summary table
createTableIfNeed(HBaseTableMetaData.TABLE_CHAIN_ONE_HOUR_SUMMARY.TABLE_NAME,
HBaseTableMetaData.TABLE_CHAIN_ONE_HOUR_SUMMARY.COLUMN_FAMILY_NAME);
// 1 day summary table
createTableIfNeed(HBaseTableMetaData.TABLE_CHAIN_ONE_DAY_SUMMARY.TABLE_NAME,
HBaseTableMetaData.TABLE_CHAIN_ONE_DAY_SUMMARY.COLUMN_FAMILY_NAME);
// 1 month summary table
createTableIfNeed(HBaseTableMetaData.TABLE_CHAIN_ONE_MONTH_SUMMARY.TABLE_NAME,
HBaseTableMetaData.TABLE_CHAIN_ONE_MONTH_SUMMARY.COLUMN_FAMILY_NAME);
} catch (IOException e) {
logger.error("Create tables failed", e);
}
}
private static void createTableIfNeed(String tableName, String familyName) throws IOException {
Admin admin = connection.getAdmin();
if (!admin.isTableAvailable(TableName.valueOf(tableName))) {
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
tableDesc.addFamily(new HColumnDescriptor(familyName.getBytes()));
admin.createTable(tableDesc);
logger.info("Create table [{}] ok!", tableName);
}
}
private static void initHBaseClient() throws IOException {
if (configuration == null) {
configuration = HBaseConfiguration.create();
if (Config.HBase.ZK_QUORUM == null || "".equals(Config.HBase.ZK_QUORUM)) {
logger.error("Miss HBase ZK quorum Configuration",
new IllegalArgumentException("Miss HBase ZK quorum Configuration"));
System.exit(-1);
}
configuration.set("hbase.zookeeper.quorum", Config.HBase.ZK_QUORUM);
configuration.set("hbase.zookeeper.property.clientPort", Config.HBase.ZK_CLIENT_PORT);
connection = ConnectionFactory.createConnection(configuration);
}
}
public static ChainNodeSpecificMinSummary loadSpecificMinSummary(String key, String qualifier) throws IOException {
ChainNodeSpecificMinSummary result = null;
Table table =
connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CHAIN_ONE_MINUTE_SUMMARY.TABLE_NAME));
Get g = new Get(Bytes.toBytes(key));
Result r = table.get(g);
if (r.rawCells().length == 0) {
return new ChainNodeSpecificMinSummary();
}
Cell cell =
r.getColumnLatestCell(HBaseTableMetaData.TABLE_CHAIN_ONE_MINUTE_SUMMARY.COLUMN_FAMILY_NAME.getBytes(),
qualifier.getBytes());
if (cell != null && cell.getValueArray().length > 0) {
result = new ChainNodeSpecificMinSummary(
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
} else {
result = new ChainNodeSpecificMinSummary();
}
return result;
}
public static ChainNodeSpecificHourSummary loadSpecificHourSummary(String keyOfHourSummaryTable, String treeNodeId)
throws IOException {
ChainNodeSpecificHourSummary result = null;
Table table =
connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CHAIN_ONE_HOUR_SUMMARY.TABLE_NAME));
Get g = new Get(Bytes.toBytes(keyOfHourSummaryTable));
Result r = table.get(g);
if (r.rawCells().length == 0) {
return new ChainNodeSpecificHourSummary();
}
Cell cell = r.getColumnLatestCell(HBaseTableMetaData.TABLE_CHAIN_ONE_HOUR_SUMMARY.COLUMN_FAMILY_NAME.getBytes(),
treeNodeId.getBytes());
if (cell != null && cell.getValueArray().length > 0) {
result = new ChainNodeSpecificHourSummary(
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
} else {
result = new ChainNodeSpecificHourSummary();
}
return result;
}
public static ChainNodeSpecificDaySummary loadSpecificDaySummary(String keyOfDaySummaryTable, String treeNodeId)
throws IOException {
ChainNodeSpecificDaySummary result = null;
Table table = connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CHAIN_ONE_DAY_SUMMARY.TABLE_NAME));
Get g = new Get(Bytes.toBytes(keyOfDaySummaryTable));
Result r = table.get(g);
if (r.rawCells().length == 0) {
return new ChainNodeSpecificDaySummary();
}
Cell cell = r.getColumnLatestCell(HBaseTableMetaData.TABLE_CHAIN_ONE_DAY_SUMMARY.COLUMN_FAMILY_NAME.getBytes(),
treeNodeId.getBytes());
if (cell != null && cell.getValueArray().length > 0) {
result = new ChainNodeSpecificDaySummary(
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
} else {
result = new ChainNodeSpecificDaySummary();
}
return result;
}
public static ChainNodeSpecificMonthSummary loadSpecificMonthSummary(String keyOfMonthSummaryTable,
String treeNodeId) throws IOException {
ChainNodeSpecificMonthSummary result = null;
Table table =
connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CHAIN_ONE_MONTH_SUMMARY.TABLE_NAME));
Get g = new Get(Bytes.toBytes(keyOfMonthSummaryTable));
Result r = table.get(g);
if (r.rawCells().length == 0) {
return new ChainNodeSpecificMonthSummary();
}
Cell cell =
r.getColumnLatestCell(HBaseTableMetaData.TABLE_CHAIN_ONE_MONTH_SUMMARY.COLUMN_FAMILY_NAME.getBytes(),
treeNodeId.getBytes());
if (cell != null && cell.getValueArray().length > 0) {
result = new ChainNodeSpecificMonthSummary(
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
} else {
result = new ChainNodeSpecificMonthSummary();
}
return result;
}
public static List<String> loadHasBeenMergeChainIds(String treeId) throws IOException {
List<String> result = new ArrayList<String>();
Table table = connection
.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING.TABLE_NAME));
Get g = new Get(Bytes.toBytes(treeId));
Result r = table.get(g);
if (r.rawCells().length == 0) {
return new ArrayList<>();
}
for (Cell cell : r.rawCells()) {
if (cell.getValueArray().length > 0) {
List<String> hasBeenMergedCIds = new Gson()
.fromJson(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()),
new TypeToken<List<String>>() {
}.getType());
result.addAll(hasBeenMergedCIds);
}
}
return result;
}
public static void batchSaveMinSummaryResult(List<Put> puts) throws IOException, InterruptedException {
Table table =
connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CHAIN_ONE_MINUTE_SUMMARY.TABLE_NAME));
batchSaveData(puts, table);
}
public static void batchSaveMonthSummaryResult(List<Put> puts) throws IOException, InterruptedException {
Table table =
connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CHAIN_ONE_MONTH_SUMMARY.TABLE_NAME));
batchSaveData(puts, table);
}
public static void batchSaveDaySummaryResult(List<Put> puts) throws IOException, InterruptedException {
Table table = connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CHAIN_ONE_DAY_SUMMARY.TABLE_NAME));
batchSaveData(puts, table);
}
public static void batchSaveHourSummaryResult(List<Put> puts) throws IOException, InterruptedException {
Table table =
connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CHAIN_ONE_HOUR_SUMMARY.TABLE_NAME));
batchSaveData(puts, table);
}
private static void batchSaveData(List<Put> puts, Table table) throws IOException, InterruptedException {
Object[] resultArray = new Object[puts.size()];
table.batch(puts, resultArray);
int index = 0;
for (Object result : resultArray) {
if (result == null) {
logger.error("Failed to insert the put the Value[" + puts.get(index).getId() + "]");
}
index++;
}
}
public static void batchSaveChainInfo(List<Put> chainInfoPuts) throws IOException, InterruptedException {
Table table = connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CHAIN_DETAIL.TABLE_NAME));
batchSaveData(chainInfoPuts, table);
}
public static void batchSaveHasBeenMergedCID(List<Put> chainIdPuts) throws IOException, InterruptedException {
Table table = connection
.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING.TABLE_NAME));
batchSaveData(chainIdPuts, table);
}
public static void saveTraceIdAndTreeIdMapping(String traceId, String cid) throws IOException {
Put put = new Put(traceId.getBytes());
put.addColumn(HBaseTableMetaData.TABLE_TRACE_ID_AND_CID_MAPPING.COLUMN_FAMILY_NAME.getBytes(),
HBaseTableMetaData.TABLE_TRACE_ID_AND_CID_MAPPING.COLUMN_NAME.getBytes(), cid.getBytes());
Table table =
connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_TRACE_ID_AND_CID_MAPPING.TABLE_NAME));
table.put(put);
}
public static List<FullSpan> fetchTraceSpansFromHBase(Result value) throws ConvertFailedException {
CellToSpanHandler cellToSpanHandler = new CellToSpanHandler();
for (Cell cell : value.rawCells()) {
cellToSpanHandler.addSpan(cell);
}
return new ArrayList<>(cellToSpanHandler.handle().values());
}
}
package com.a.eye.skywalking.analysis.chainbuild.util;
import java.util.HashMap;
import java.util.Map;
public class SubLevelSpanCostCounter {
private Map<String, Long> costs = new HashMap<String, Long>();
public void put(String parentLevel, Long cost) {
costs.put(parentLevel, cost);
}
public boolean exists(String parentLevel) {
return costs.containsKey(parentLevel);
}
public Long get(String parentLevel) {
return costs.get(parentLevel);
}
}
package com.a.eye.skywalking.analysis.chainbuild.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
public class TokenGenerator {
private static Logger logger = LoggerFactory.getLogger(TokenGenerator.class.getName());
private TokenGenerator() {
//Non
}
public static String generateCID(String originData) {
return "CID_" + generate(originData);
}
public static String generateNodeToken(String originData) {
return "C_NID_" + generate(originData);
}
public static String generateTreeToken(String originData) {
return "TREE_ID_" + generate(originData);
}
private static String generate(String originData) {
StringBuilder result = new StringBuilder();
if (originData != null) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
byte bytes[] = md.digest(originData.getBytes());
for (int i = 0; i < bytes.length; i++) {
String str = Integer.toHexString(bytes[i] & 0xFF);
if (str.length() == 1) {
str += "F";
}
result.append(str);
}
} catch (NoSuchAlgorithmException e) {
logger.error("Cannot found algorithm.", e);
System.exit(-1);
}
}
return result.toString().toUpperCase();
}
}
package com.a.eye.skywalking.analysis.chainbuild.util;
public class VersionIdentifier {
/**
* 根据tid识别数据是否可分析<br/>
* 目前允许分析所有1.x的版本号
*
* @param tid
* @return
*/
public static boolean enableAnaylsis(String tid) {
if (tid != null) {
String[] tidSections = tid.split("\\.");
if (tidSections.length == 7) {
String version = tidSections[0];
String subVersion = tidSections[1];
if ("1".equals(version) && subVersion.length() > 0) {
return true;
}
}
}
return false;
}
}
package com.a.eye.skywalking.analysis.config;
public class Config {
public static class Reducer {
public static int REDUCER_NUMBER = 1;
}
public static class HBase {
public static String ZK_QUORUM;
public static String ZK_CLIENT_PORT;
}
public static class MySql {
public static String URL;
public static String USERNAME;
public static String PASSWORD;
public static String DRIVER_CLASS = "com.mysql.jdbc.Driver";
}
public static class Filter {
public static String FILTER_PACKAGE_NAME;
}
public static class AnalysisServer {
public static boolean IS_ACCUMULATE_MODE = true;
}
public static class MapReduce {
public static String JAVA_OPTS = "-Xmx200m";
}
}
package com.a.eye.skywalking.analysis.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedList;
import java.util.Properties;
public class ConfigInitializer {
private static Logger logger = LoggerFactory.getLogger(ConfigInitializer.class.getName());
public static void initialize() {
InputStream inputStream = ConfigInitializer.class.getResourceAsStream("/analysis.conf");
if (inputStream == null) {
logger.error("No provider sky-walking certification documents, sky-walking api auto shutdown.");
} else {
try {
Properties properties = new Properties();
properties.load(inputStream);
initNextLevel(properties, Config.class, new ConfigDesc());
} catch (IllegalAccessException e) {
logger.error("Parsing certification file failed, sky-walking api auto shutdown.");
} catch (IOException e) {
logger.error("Failed to read the certification file, sky-walking api auto shutdown.");
}
}
}
private static void initNextLevel(Properties properties, Class<?> recentConfigType, ConfigDesc parentDesc) throws NumberFormatException, IllegalArgumentException, IllegalAccessException {
for (Field field : recentConfigType.getFields()) {
if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers())) {
String configKey = (parentDesc + "." +
field.getName()).toLowerCase();
String value = properties.getProperty(configKey);
if (value != null) {
if (field.getType().equals(int.class))
field.set(null, Integer.valueOf(value));
if (field.getType().equals(String.class))
field.set(null, value);
if (field.getType().equals(long.class))
field.set(null, Long.valueOf(value));
if (field.getType().equals(boolean.class))
field.set(null, Boolean.valueOf(value));
}
}
}
for (Class<?> innerConfiguration : recentConfigType.getClasses()) {
parentDesc.append(innerConfiguration.getSimpleName());
initNextLevel(properties, innerConfiguration, parentDesc);
parentDesc.removeLastDesc();
}
}
}
class ConfigDesc {
private LinkedList<String> descs = new LinkedList<String>();
void append(String currentDesc) {
descs.addLast(currentDesc);
}
void removeLastDesc() {
descs.removeLast();
}
@Override
public String toString() {
if (descs.size() == 0) {
return "";
}
StringBuilder ret = new StringBuilder(descs.getFirst());
boolean first = true;
for (String desc : descs) {
if (first) {
first = false;
continue;
}
ret.append(".").append(desc);
}
return ret.toString();
}
}
package com.a.eye.skywalking.analysis.config;
public class HBaseTableMetaData {
/**
* 调用链明细表,前端收集程序入库数据
*
* @author wusheng
*/
public final static class TABLE_CALL_CHAIN {
public static final String TABLE_NAME = "trace-data";
public static final String FAMILY_NAME = "call-chain";
}
/**
* CID明细信息表
*
* @author wusheng
*/
public final static class TABLE_CHAIN_DETAIL {
public static final String TABLE_NAME = "sw-chain-detail";
public static final String COLUMN_FAMILY_NAME = "chain_detail";
}
/**
* 用于存放每个CID在一分钟内的汇总,汇总结果不包含关系汇总
*
* @author wusheng
*/
public final static class TABLE_CHAIN_ONE_MINUTE_SUMMARY {
public static final String TABLE_NAME = "sw-chain-1min-summary";
public static final String COLUMN_FAMILY_NAME = "chain_summary";
}
/**
* 用于存放每个CID在一小时内的汇总,汇总结果不包含关系汇总
*
* @author zhangxin
*/
public final static class TABLE_CHAIN_ONE_HOUR_SUMMARY {
public static final String TABLE_NAME = "sw-chain-1hour-summary";
public static final String COLUMN_FAMILY_NAME = "chain_summary";
}
/**
* 用于存放每个CID在一小时内的汇总,汇总结果不包含关系汇总
*
* @author zhangxin
*/
public final static class TABLE_CHAIN_ONE_DAY_SUMMARY {
public static final String TABLE_NAME = "sw-chain-1day-summary";
public static final String COLUMN_FAMILY_NAME = "chain_summary";
}
/**
* 用于存放每个CID在一小时内的汇总,汇总结果不包含关系汇总
*
* @author zhangxin
*/
public final static class TABLE_CHAIN_ONE_MONTH_SUMMARY {
public static final String TABLE_NAME = "sw-chain-1month-summary";
public static final String COLUMN_FAMILY_NAME = "chain_summary";
}
/**
* 用于存放调用树和CID的映射关系
*
* @author zhangxin
*/
public final static class TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING {
public static final String TABLE_NAME = "sw-treeId-cid-mapping";
public static final String COLUMN_FAMILY_NAME = "cids";
public static final String COLUMN_NAME = "been_merged_cid";
}
/**
* 用于存放TraceID和CID的映射关系
*
* @author zhangxin
*/
public final static class TABLE_TRACE_ID_AND_CID_MAPPING {
public static final String TABLE_NAME = "sw-traceId-cid-mapping";
public static final String COLUMN_FAMILY_NAME = "cid";
public static final String COLUMN_NAME = "cid";
}
}
hbase.zk_quorum=10.1.235.197,10.1.235.198,10.1.235.199
hbase.zk_client_port=29181
mysql.url=jdbc:mysql://10.1.228.202:31316/test
mysql.username=devrdbusr21
mysql.password=devrdbusr21
mysql.driver_class=com.mysql.jdbc.Driver
filter.filter_package_name=com.a.eye.skywalking.analysis.chainbuild.filter.impl
chainnodesummary.interval=1
reducer.reducer_number=1
mapreduce.java_opts=-Xmx768m
redis.mapper_count_key=ANALYSIS_TOTAL_SIZE
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration>
<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern"
value="%d - %c -%-4r [%t] %-5p %x - %m%n" />
</layout>
</appender>
<root>
<priority value="debug" />
<appender-ref ref="CONSOLE" />
</root>
</log4j:configuration>
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="debug">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
\ No newline at end of file
#
default=CopyAttrFilter->ReplaceAddressFilter->ProcessCostTimeFilter->TokenGenerateFilter
J=CopyAttrFilter->JDBCBusinessKeyHandleFilter->AppendBusinessKeyFilter->ProcessCostTimeFilter->TokenGenerateFilter
```sql
truncate "sw-chain-1day-summary"
truncate "sw-chain-1hour-summary"
truncate "sw-chain-1min-summary"
truncate "sw-chain-1month-summary"
truncate "sw-chain-detail"
truncate "sw-treeId-cid-mapping"
truncate "sw-traceId-cid-mapping"
```
\ No newline at end of file
* 查询某一张表中的某一列等于某一个值(比如sw-treeId-cid-mapping,sw-traceId-cid-mapping)
```java
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter. Filter;
scan 'sw-chain-info', { FILTER
=>SingleColumnValueFilter.new(Bytes.toBytes('trace_info'),Bytes.toBytes('cid'),
CompareFilter::CompareOp.valueOf('EQUAL'),
BinaryComparator.new(Bytes.toBytes('CID_B4CAEE3953B3C0DBD7A43656B2EEE345')))}
```
* 限制查询某张表的条数
**以下只返回10条记录**
```java
scan "sw-treeId-cid-mapping",{LIMIT =>10}
```
* 根据TreeId查询某个月份合并过的CID
**以下是查询2016年4月份TREE_ID_BF32F7D57F6E69912C8AF2487F1FFCE1合并过的CID**
```java
get "sw-treeId-cid-mapping","TREE_ID_354AF3F3F58CE4AECFCC7782A188396B@2016-3"
```
* 根据CID查询所有归属TraceId
```java
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter. Filter;
scan 'sw-traceId-cid-mapping', { FILTER =>SingleColumnValueFilter.new(Bytes.toBytes('cid'),Bytes.toBytes('cid'), CompareFilter::CompareOp.valueOf('EQUAL'),BinaryComparator.new(Bytes.toBytes('CID_A4AD2F204E3DA016BC455AB95FAEFFF9')))}
```
\ No newline at end of file
package com.a.eye.skywalking.analysis.mapper;
import com.a.eye.skywalking.analysis.config.HBaseTableMetaData;
import com.a.eye.skywalking.analysis.mapper.util.Convert;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainInfo;
import com.a.eye.skywalking.analysis.mapper.util.HBaseUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import java.text.SimpleDateFormat;
import java.util.*;
public class CallChainMapperTest {
private static Connection connection = HBaseUtils.getConnection();
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd/HH:mm:ss");
private static final int EXCEPT_CHAIN_INFO_SIZE = 124985;
public static void main(String[] args) throws Exception {
Connection connection = HBaseUtils.getConnection();
Table table = connection.getTable(TableName.valueOf
(HBaseTableMetaData.TABLE_CALL_CHAIN.TABLE_NAME));
Scan scan = new Scan();
//2016-04-16/19:49:25 to 2016-05-16/19:39:25
Date startDate = simpleDateFormat.parse("2016-04-16/19:49:25");
Date endDate = simpleDateFormat.parse("2016-05-16/19:39:25");
scan.setBatch(2001);
scan.setTimeRange(startDate.getTime(), endDate.getTime());
List<ChainInfo> chainInfos = Convert.convert(table.getScanner(scan));
if (EXCEPT_CHAIN_INFO_SIZE != chainInfos.size()) {
System.out.println("except size :" + EXCEPT_CHAIN_INFO_SIZE + " accutal size:" + chainInfos.size());
System.exit(-1);
}
}
}
package com.a.eye.skywalking.analysis.mapper;
import com.a.eye.skywalking.analysis.config.HBaseTableMetaData;
import com.a.eye.skywalking.analysis.mapper.util.HBaseUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
/**
* Created by xin on 16-5-16.
*/
public class MappingTableCounter {
public static Set<String> getTraceMappingCount() throws IOException {
Connection connection = HBaseUtils.getConnection();
Table table = connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_TRACE_ID_AND_CID_MAPPING.TABLE_NAME));
ResultScanner resultScanner = table.getScanner(new Scan());
Set<String> traceIds = new HashSet<String>();
for (Result result :resultScanner){
traceIds.add(Bytes.toString(result.getRow()));
}
return traceIds;
}
}
package com.a.eye.skywalking.analysis.mapper;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainInfo;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainNode;
import com.a.eye.skywalking.analysis.config.HBaseTableMetaData;
import com.a.eye.skywalking.analysis.mapper.util.Convert;
import com.a.eye.skywalking.analysis.mapper.util.HBaseUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ValidateSpecialTimeSummaryResult {
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd/HH:mm:ss");
public static void main(String[] args) throws IOException, ParseException {
Connection connection = HBaseUtils.getConnection();
Table table = connection.getTable(TableName.valueOf
(HBaseTableMetaData.TABLE_CALL_CHAIN.TABLE_NAME));
Scan scan = new Scan();
Date startDate = simpleDateFormat.parse("2016-04-13/16:59:24");
Date endDate = simpleDateFormat.parse("2016-05-13/16:49:24");
scan.setMaxVersions();
scan.setTimeRange(startDate.getTime(), endDate.getTime());
Filter filter = new SingleColumnValueFilter(HBaseTableMetaData.TABLE_CALL_CHAIN.FAMILY_NAME.getBytes(),
"0-S".getBytes(), CompareFilter.CompareOp.EQUAL,
"http://hire.asiainfo.com/Aisse-Mobile-Web/aisseWorkUser/queryProsonLoding".getBytes());
scan.setFilter(filter);
List<ChainInfo> chainInfos = Convert.convert(table.getScanner(scan));
Map<String, Integer> totalCallSize = new HashMap<String, Integer>();
Map<String, Float> totalCallTimes = new HashMap<>();
for (ChainInfo chainInfo : chainInfos) {
for (ChainNode chainNode : chainInfo.getNodes()) {
Integer totalCall = totalCallSize.get(chainNode.getViewPoint());
if (totalCallSize == null) {
totalCall = 0;
}
totalCallSize.put(chainNode.getTraceLevelId(), ++totalCall);
Float totalCallTime = totalCallTimes.get(chainNode.getViewPoint());
if (totalCallTime == null) {
totalCallTime = 0F;
}
totalCallTimes.put(chainNode.getTraceLevelId(), (totalCallTime + chainNode.getCost()));
}
}
}
}
package com.a.eye.skywalking.analysis.mapper.util;
import com.a.eye.skywalking.analysis.chainbuild.ChainBuildMapper;
import com.a.eye.skywalking.analysis.chainbuild.util.HBaseUtil;
import com.a.eye.skywalking.analysis.chainbuild.po.ChainInfo;
import com.a.eye.skywalking.analysis.mapper.MappingTableCounter;
import com.a.eye.skywalking.protocol.FullSpan;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.*;
/**
* Created by xin on 16-5-13.
*/
public class Convert {
private static final Map<String, String> traceIds = new HashMap<>();
public static List<ChainInfo> convert(ResultScanner resultScanner) throws IOException {
List<ChainInfo> chainInfos = new ArrayList<ChainInfo>();
int count = 0, failedCount = 0, successCount = 0;
for (Result result : resultScanner) {
count++;
try {
List<FullSpan> spanList = HBaseUtil.fetchTraceSpansFromHBase(result);
if (spanList.size() == 0 || spanList.size() > 2000) {
throw new RuntimeException("Failed to convert it");
}
ChainInfo chainInfo = ChainBuildMapper.spanToChainInfo(Bytes.toString(result.getRow()), spanList);
chainInfos.add(chainInfo);
traceIds.put(spanList.get(0).getTraceId(), chainInfo.getCID());
successCount++;
} catch (Exception e) {
failedCount++;
continue;
}
}
System.out.println("count : " + count);
// System.out.println("Success count " + traceIds.size());
System.out.println("Failed count " + failedCount);
System.out.println("Success count " + successCount);
System.out.println("HBase :" + traceIds.size());
Set<String> traceMapping = MappingTableCounter.getTraceMappingCount();
for (String traceId : traceMapping) {
traceIds.remove(traceId);
}
System.out.println(traceIds);
return chainInfos;
}
}
package com.a.eye.skywalking.analysis.mapper.util;
import com.a.eye.skywalking.analysis.config.ConfigInitializer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import java.io.IOException;
/**
* Created by xin on 16-5-4.
*/
public class HBaseUtils {
private static String ZK_QUORUM = "host-10-1-235-197,host-10-1-235-198,host-10-1-235-199";
private static String ZK_CLIENT_PORT = "29181";
private static Configuration configuration = null;
private static Connection connection;
static {
if (configuration == null) {
configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", ZK_QUORUM);
configuration.set("hbase.zookeeper.property.clientPort", ZK_CLIENT_PORT);
configuration.set("hbase.rpc.timeout", "600000");
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
e.printStackTrace();
System.exit(-1);
}
}
ConfigInitializer.initialize();
}
public static Connection getConnection() {
return connection;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.a.eye</groupId>
<artifactId>skywalking</artifactId>
<version>2.0-2016</version>
</parent>
<artifactId>skywalking-collector</artifactId>
<modules>
<module>skywalking-api</module>
<module>skywalking-sdk-plugin</module>
<module>skywalking-agent</module>
</modules>
<packaging>pom</packaging>
<name>skywalking-collector</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
package com.a.eye.skywalking.api;
import com.a.eye.skywalking.protocol.common.CallType;
public interface IBuriedPointType {
String getTypeName();
CallType getCallType();
}
package com.a.eye.skywalking.network;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.internal.DnsNameResolverProvider;
public class Client {
private static Client client;
private Channel channel;
private static final String DEFAULT_ADDRESS = "localhost";
private static final int DEFAULT_PORT = 34000;
private String host = DEFAULT_ADDRESS;
private int port = DEFAULT_PORT;
public static Client forAddress(String host, int port) {
if (client == null) {
client = new Client(host, port);
}
return client;
}
public static Client INSTANCE() {
return client;
}
private Client(String host, int port) {
this.host = host;
this.port = port;
}
public void start() {
ManagedChannelBuilder<?> channelBuilder =
ManagedChannelBuilder.forAddress(host, port).nameResolverFactory(new DnsNameResolverProvider())
.usePlaintext(true);
channel = channelBuilder.build();
}
}
package com.a.eye.skywalking.network;
import com.a.eye.skywalking.network.grpc.SendResult;
import com.a.eye.skywalking.network.grpc.SendingSpans;
import com.a.eye.skywalking.network.grpc.SpanSenderGrpc;
import io.grpc.stub.StreamObserver;
/**
* Created by xin on 2016/10/25.
*/
public class GrpcSpanSender extends SpanSenderGrpc.SpanSenderImplBase {
@Override
public StreamObserver<SendingSpans> send(StreamObserver<SendResult> responseObserver) {
//TODO
return super.send(responseObserver);
}
}
...@@ -5,21 +5,29 @@ option java_package = "com.a.eye.skywalking.network.grpc"; ...@@ -5,21 +5,29 @@ option java_package = "com.a.eye.skywalking.network.grpc";
import "Spans.proto"; import "Spans.proto";
service SpanSender { service AckSpanSender {
rpc send (stream SendingSpans) returns (SendResult) { rpc send (stream AckSpan) returns (SendResult) {
}; };
} }
service RequestSpanSender {
rpc send (stream RequestSpan) returns (SendResult) {
};
}
message SendResult {
// 0 成功 1 失败 service InputParametersSpanSender {
int32 result = 1; rpc send (stream InputParametersSpan) returns (SendResult) {
};
} }
message SendingSpans {
repeated AckSpan ackSpans = 1; service OutputParametersSpanSender {
repeated RequestSpan requestSpans = 2; rpc send (stream OutputParametersSpan) returns (SendResult) {
repeated InputParametersSpan inputParamSpans = 3; };
repeated OutputParametersSpan outputParamSpans = 4;
} }
message SendResult {
// 0 成功 1 失败
int32 result = 1;
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.a.eye</groupId>
<version>2.0-2016</version>
<name>skywalking-protocol</name>
<url>http://maven.apache.org</url>
<artifactId>skywalking-protocol</artifactId>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<shade.com.google.protobuf.source>com.google.protobuf</shade.com.google.protobuf.source>
<shade.com.google.protobuf.target>com.a.eye.skywalking.protocol.dependencies.com.google.protobuf</shade.com.google.protobuf.target>
</properties>
<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<createDependencyReducedPom>true</createDependencyReducedPom>
<createSourcesJar>true</createSourcesJar>
<shadeSourcesContent>true</shadeSourcesContent>
<relocations>
<relocation>
<pattern>${shade.com.google.protobuf.source}</pattern>
<shadedPattern>${shade.com.google.protobuf.target}</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.9.1</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.basedir}/src/main/gen-java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.4.3</version>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<!-- 源码插件 -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<!-- 发布时自动将源码同时发布的配置 -->
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
<version>2.4</version>
</plugin>
<!--<plugin>-->
<!--<artifactId>maven-antrun-plugin</artifactId>-->
<!--<executions>-->
<!--<execution>-->
<!--<id>compile-protoc</id>-->
<!--<phase>generate-sources</phase>-->
<!--<configuration>-->
<!--<tasks>-->
<!--<mkdir dir="src/main/gen-java" />-->
<!--<path id="proto.path">-->
<!--<fileset dir="src/main/proto">-->
<!--<include name="*.proto" />-->
<!--</fileset>-->
<!--</path>-->
<!--<pathconvert pathsep=" " property="proto.files"-->
<!--refid="proto.path" />-->
<!--<exec executable="protoc">-->
<!--<arg value="&#45;&#45;java_out=src/main/gen-java" />-->
<!--<arg value="-I${project.basedir}/src/main/proto" />-->
<!--<arg line="${proto.files}" />-->
<!--</exec>-->
<!--</tasks>-->
<!--<sourceRoot>src/main/gen-java</sourceRoot>-->
<!--</configuration>-->
<!--<goals>-->
<!--<goal>run</goal>-->
<!--</goals>-->
<!--</execution>-->
<!--</executions>-->
<!--</plugin>-->
</plugins>
</build>
</project>
package com.a.eye.skywalking.protocol;
import com.a.eye.skywalking.protocol.exception.ConvertFailedException;
import com.a.eye.skywalking.protocol.common.AbstractDataSerializable;
import com.a.eye.skywalking.protocol.proto.TraceProtocol;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.HashMap;
import java.util.Map;
/**
* Created by wusheng on 16/7/4.
*/
public class AckSpan extends AbstractDataSerializable {
/**
* tid,调用链的全局唯一标识
*/
private String traceId;
/**
* 当前调用链的上级描述<br/>
* 如当前序号为:0.1.0时,parentLevel=0.1
*/
private String parentLevel;
/**
* 当前调用链的本机描述<br/>
* 如当前序号为:0.1.0时,levelId=0
*/
private int levelId = 0;
/**
* 节点调用花费时间
*/
private long cost = 0L;
/**
* 节点调用的状态<br/>
* 0:成功<br/>
* 1:异常<br/>
* 异常判断原则:代码产生exception,并且此exception不在忽略列表中
*/
private byte statusCode = 0;
/**
* 节点调用的错误堆栈<br/>
* 堆栈以JAVA的exception为主要判断依据
*/
private String exceptionStack = "";
private String viewPointId;
private String userId;
private String applicationId;
private static final AckSpan INSTANCE = new AckSpan();
public AckSpan(Span spanData) {
this.traceId = spanData.getTraceId();
this.parentLevel = spanData.getParentLevel();
this.levelId = spanData.getLevelId();
this.cost = System.currentTimeMillis() - spanData.getStartDate();
this.statusCode = spanData.getStatusCode();
this.exceptionStack = spanData.getExceptionStack();
this.userId = spanData.getUserId();
this.applicationId = spanData.getApplicationId();
this.viewPointId = spanData.getViewPointId();
}
public AckSpan() {
}
public String getTraceId() {
return traceId;
}
public void setTraceId(String traceId) {
this.traceId = traceId;
}
public String getParentLevel() {
return parentLevel;
}
public void setParentLevel(String parentLevel) {
this.parentLevel = parentLevel;
}
public int getLevelId() {
return levelId;
}
public void setLevelId(int levelId) {
this.levelId = levelId;
}
public long getCost() {
return cost;
}
public void setCost(long cost) {
this.cost = cost;
}
public byte getStatusCode() {
return statusCode;
}
public void setStatusCode(byte statusCode) {
this.statusCode = statusCode;
}
public String getExceptionStack() {
return exceptionStack;
}
public void setExceptionStack(String exceptionStack) {
this.exceptionStack = exceptionStack;
}
@Override
public int getDataType() {
return 2;
}
@Override
public byte[] getData() {
TraceProtocol.AckSpan.Builder
builder = TraceProtocol.AckSpan.newBuilder().setTraceId(traceId).setParentLevel(parentLevel).
setLevelId(levelId).setCost(cost).setViewpointId(viewPointId).setStatusCode(statusCode)
.setExceptionStack(exceptionStack);
return builder.build().toByteArray();
}
@Override
public AbstractDataSerializable convertData(byte[] data) throws ConvertFailedException {
AckSpan ackSpan = new AckSpan();
try {
TraceProtocol.AckSpan ackSpanProtocol = TraceProtocol.AckSpan.parseFrom(data);
ackSpan.setTraceId(ackSpanProtocol.getTraceId());
ackSpan.setParentLevel(ackSpanProtocol.getParentLevel());
ackSpan.setLevelId(ackSpanProtocol.getLevelId());
ackSpan.setCost(ackSpanProtocol.getCost());
ackSpan.setExceptionStack(ackSpanProtocol.getExceptionStack());
ackSpan.setStatusCode((byte) ackSpanProtocol.getStatusCode());
ackSpan.viewPointId = ackSpanProtocol.getViewpointId();
} catch (InvalidProtocolBufferException e) {
throw new ConvertFailedException(e.getMessage(),e);
}
return ackSpan;
}
public static AckSpan convert(byte[] data) throws ConvertFailedException {
return (AckSpan) INSTANCE.convertData(data);
}
public boolean isNull() {
return false;
}
public String getUserId() {
return userId;
}
public String getApplicationId() {
return applicationId;
}
public String getViewPointId() {
return viewPointId;
}
}
package com.a.eye.skywalking.protocol;
import com.a.eye.skywalking.protocol.common.AbstractDataSerializable;
import com.a.eye.skywalking.protocol.exception.ConvertFailedException;
public class BufferFileEOFProtocol extends AbstractDataSerializable {
@Override
public int getDataType() {
return -1;
}
@Override
public byte[] getData() {
return new byte[0];
}
@Override
public AbstractDataSerializable convertData(byte[] data) throws ConvertFailedException {
return new BufferFileEOFProtocol();
}
public boolean isNull() {
return false;
}
}
package com.a.eye.skywalking.protocol;
public class FullSpan {
protected String traceId;
protected String parentLevel = "";
protected int levelId = 0;
protected String viewPointId;
protected String applicationId;
protected String callType;
protected long cost;
protected String businessKey;
protected String exceptionStack;
protected byte statusCode = 0;
protected String spanTypeDesc;
protected String userId;
protected long startDate;
protected String spanType;
protected String address = "";
protected String processNo = "";
public FullSpan() {
}
public FullSpan(RequestSpan span, AckSpan ackSpan) {
this.traceId = span.getTraceId();
this.parentLevel = span.getParentLevel();
this.levelId = span.getLevelId();
this.applicationId = span.getApplicationId();
this.callType = span.getCallType();
this.businessKey = span.getBusinessKey();
this.spanTypeDesc = span.getSpanTypeDesc();
this.userId = span.getUserId();
this.startDate = span.getStartDate();
this.viewPointId = span.getViewPointId();
this.spanType = span.getSpanType() + "";
this.address = span.getAddress();
this.processNo = span.getProcessNo();
if (ackSpan != null) {
this.cost = ackSpan.getCost();
this.exceptionStack = ackSpan.getExceptionStack();
this.statusCode = ackSpan.getStatusCode();
}
}
public String getTraceId() {
return traceId;
}
public String getParentLevel() {
return parentLevel;
}
public int getLevelId() {
return levelId;
}
public String getViewPointId() {
return viewPointId;
}
public String getApplicationId() {
return applicationId;
}
public String getCallType() {
return callType;
}
public long getCost() {
return cost;
}
public String getBusinessKey() {
return businessKey;
}
public void setBusinessKey(String businessKey) {
this.businessKey = businessKey;
}
public String getExceptionStack() {
return exceptionStack;
}
public byte getStatusCode() {
return statusCode;
}
public void setStatusCode(byte statusCode) {
this.statusCode = statusCode;
}
public String getSpanTypeDesc() {
return spanTypeDesc;
}
public void setSpanTypeDesc(String spanTypeDesc) {
this.spanTypeDesc = spanTypeDesc;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public long getStartDate() {
return startDate;
}
public String getSpanType() {
return spanType;
}
public void setSpanType(String spanType) {
this.spanType = spanType;
}
public String getTraceLevelId() {
if (getParentLevel() != null && getParentLevel().length() > 0) {
return getParentLevel() + "." + getLevelId();
}
return getLevelId() + "";
}
public void setParentLevel(String parentLevel) {
this.parentLevel = parentLevel;
}
public void setViewPointId(String viewPointId) {
this.viewPointId = viewPointId;
}
public String getAddress() {
return address;
}
public String getProcessNo() {
return processNo;
}
}
package com.a.eye.skywalking.protocol;
import com.a.eye.skywalking.protocol.common.AbstractDataSerializable;
import com.a.eye.skywalking.protocol.exception.ConvertFailedException;
import com.a.eye.skywalking.protocol.proto.TraceProtocol;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.HashMap;
import java.util.Map;
public class InputParametersSpan extends AbstractDataSerializable {
private static final InputParametersSpan INSTANCE = new InputParametersSpan();
private static int parameterIndex = 0;
/**
* tid,调用链的全局唯一标识
*/
private String traceId;
/**
* 当前调用链的描述<br/>
*/
private String traceLevelId;
/**
* 埋点入参列表,补充时触发
*/
private Map<String, String> parameters = new HashMap<String, String>();
public InputParametersSpan() {
}
public InputParametersSpan(String traceId, String traceLevelId) {
this.traceLevelId = traceLevelId;
this.traceId = traceId;
}
public int getDataType() {
return 3;
}
public byte[] getData() {
TraceProtocol.InputParametersSpan.Builder builder =
TraceProtocol.InputParametersSpan.newBuilder().setTraceId(traceId).setTraceLevelId(traceLevelId);
if (parameters != null && parameters.size() > 0) {
builder.putAllParameters(parameters);
}
return builder.build().toByteArray();
}
public AbstractDataSerializable convertData(byte[] data) throws ConvertFailedException {
InputParametersSpan result = new InputParametersSpan();
try {
TraceProtocol.InputParametersSpan parametersSpan = TraceProtocol.InputParametersSpan.parseFrom(data);
result.traceId = parametersSpan.getTraceId();
result.traceLevelId = parametersSpan.getTraceLevelId();
result.parameters = parametersSpan.getParametersMap();
} catch (InvalidProtocolBufferException e) {
throw new ConvertFailedException("Failed to convert to parametersSpan", e);
}
return result;
}
public static InputParametersSpan convert(byte[] data) throws ConvertFailedException {
return (InputParametersSpan) INSTANCE.convertData(data);
}
public boolean isNull() {
return false;
}
public String getTraceId() {
return traceId;
}
public void setTraceId(String traceId) {
this.traceId = traceId;
}
public String getTraceLevelId() {
return traceLevelId;
}
public void setTraceLevelId(String traceLevelId) {
this.traceLevelId = traceLevelId;
}
public void addParameter(String parameter) {
parameters.put("_" + parameterIndex, parameter);
parameterIndex++;
}
public Map<String, String> getParameters() {
return parameters;
}
public void setParameters(Map<String, String> parameters) {
this.parameters = parameters;
}
}
package com.a.eye.skywalking.protocol;
import com.a.eye.skywalking.protocol.common.NullableClass;
/**
* Created by wusheng on 16/7/4.
*/
public class NullClass implements NullableClass {
@Override
public boolean isNull() {
return true;
}
}
package com.a.eye.skywalking.protocol;
import com.a.eye.skywalking.protocol.common.AbstractDataSerializable;
import com.a.eye.skywalking.protocol.exception.ConvertFailedException;
import com.a.eye.skywalking.protocol.proto.TraceProtocol;
/**
* Created by xin on 16/8/16.
*/
public class OutputParameterSpan extends AbstractDataSerializable {
private static final OutputParameterSpan INSTANCE = new OutputParameterSpan();
/**
* tid,调用链的全局唯一标识
*/
private String traceId;
/**
* 当前调用链的描述<br/>
*/
private String traceLevelId;
private String outputParameter;
public OutputParameterSpan() {
}
public OutputParameterSpan(String traceId, String traceLevelId) {
this.traceId = traceId;
this.traceLevelId = traceLevelId;
}
public int getDataType() {
return 4;
}
public byte[] getData() {
return TraceProtocol.OutputParametersSpan.newBuilder().setOutputParameter(getOutputParameter())
.setTraceId(getTraceId()).setTraceLevelId(getTraceLevelId()).build().toByteArray();
}
public AbstractDataSerializable convertData(byte[] data) throws ConvertFailedException {
try {
OutputParameterSpan outputParameterSpan = new OutputParameterSpan();
TraceProtocol.OutputParametersSpan _protobufOutputSpan = TraceProtocol.OutputParametersSpan.parseFrom(data);
outputParameterSpan.setOutputParameter(_protobufOutputSpan.getOutputParameter());
outputParameterSpan.setTraceId(_protobufOutputSpan.getTraceId());
outputParameterSpan.setTraceLevelId(_protobufOutputSpan.getTraceLevelId());
return outputParameterSpan;
} catch (Exception e) {
throw new ConvertFailedException("Failed to convert output parameter span.", e);
}
}
public static OutputParameterSpan convert(byte[] data) throws ConvertFailedException {
return (OutputParameterSpan) INSTANCE.convertData(data);
}
public boolean isNull() {
return false;
}
public String getTraceId() {
return traceId;
}
public void setTraceId(String traceId) {
this.traceId = traceId;
}
public String getTraceLevelId() {
return traceLevelId;
}
public void setTraceLevelId(String traceLevelId) {
this.traceLevelId = traceLevelId;
}
public String getOutputParameter() {
if (outputParameter == null) {
return "";
}
return outputParameter;
}
public void setOutputParameter(String outputParameter) {
this.outputParameter = outputParameter;
}
}
package com.a.eye.skywalking.protocol;
import com.a.eye.skywalking.protocol.common.CallType;
import com.a.eye.skywalking.protocol.common.SpanType;
import com.a.eye.skywalking.protocol.exception.ConvertFailedException;
import com.a.eye.skywalking.protocol.common.AbstractDataSerializable;
import com.a.eye.skywalking.protocol.proto.TraceProtocol;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.HashMap;
import java.util.Map;
/**
* Created by wusheng on 16/7/4.
*/
public class RequestSpan extends AbstractDataSerializable {
/**
* tid,调用链的全局唯一标识
*/
private String traceId;
/**
* 当前调用链的上级描述<br/>
* 如当前序号为:0.1.0时,parentLevel=0.1
*/
private String parentLevel;
/**
* 当前调用链的本机描述<br/>
* 如当前序号为:0.1.0时,levelId=0
*/
private int levelId = 0;
/**
* 调用链中单个节点的入口描述<br/>
* 如:java方法名,调用的RPC地址等等
*/
private String viewPointId = "";
/**
* 节点调用开始时间
*/
private long startDate = System.currentTimeMillis();
/**
* 节点类型描述<br/>
* 已字符串的形式描述<br/>
* 如:java,dubbo等
*/
private String spanTypeDesc = "";
/**
* 节点调用类型描述<br/>
*
* @see CallType
*/
private String callType = "";
/**
* 节点分布式类型<br/>
* 本地调用 / RPC服务端 / RPC客户端
*/
private SpanType spanType = SpanType.LOCAL;
/**
* 节点调用所在的系统逻辑名称<br/>
* 由授权文件指定
*/
private String applicationId = "";
/**
* 用户id<br/>
* 由授权文件指定
*/
private String userId = "";
/**
* 业务字段
*/
private String businessKey = "";
/**
* 实例ID
*/
private String agentId = "";
/**
* 节点调用的所在进程号
*/
protected String processNo = "";
/**
* 节点调用的发生机器描述<br/>
* 包含机器名 + IP地址
*/
protected String address = "";
private static final RequestSpan INSTANCE = new RequestSpan();
public RequestSpan(Span spanData) {
this.traceId = spanData.getTraceId();
this.parentLevel = spanData.getParentLevel();
this.levelId = spanData.getLevelId();
this.spanType = spanData.getSpanType();
this.applicationId = spanData.getApplicationId();
this.userId = spanData.getUserId();
}
public RequestSpan() {
}
private boolean isEntrySpan() {
return "0".equals(this.getParentLevel() + this.getLevelId());
}
public String getTraceId() {
return traceId;
}
public void setTraceId(String traceId) {
this.traceId = traceId;
}
public String getParentLevel() {
return parentLevel;
}
public void setParentLevel(String parentLevel) {
this.parentLevel = parentLevel;
}
public int getLevelId() {
return levelId;
}
public void setLevelId(int levelId) {
this.levelId = levelId;
}
public String getViewPointId() {
return viewPointId;
}
public void setViewPointId(String viewPointId) {
this.viewPointId = viewPointId;
}
public long getStartDate() {
return startDate;
}
public void setStartDate(long startDate) {
this.startDate = startDate;
}
public String getSpanTypeDesc() {
return spanTypeDesc;
}
public void setSpanTypeDesc(String spanTypeDesc) {
this.spanTypeDesc = spanTypeDesc;
}
public String getCallType() {
return callType;
}
public void setCallType(String callType) {
this.callType = callType;
}
public SpanType getSpanType() {
return spanType;
}
public void setSpanType(SpanType spanType) {
this.spanType = spanType;
}
public String getApplicationId() {
return applicationId;
}
public void setApplicationId(String applicationId) {
this.applicationId = applicationId;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
@Override
public int getDataType() {
return 1;
}
@Override
public byte[] getData() {
TraceProtocol.RequestSpan.Builder builder =
TraceProtocol.RequestSpan.newBuilder().setTraceId(traceId).setParentLevel(parentLevel)
.setLevelId(levelId).setViewPointId(viewPointId).setStartDate(startDate)
.setSpanType(spanType.getValue()).setSpanTypeDesc(spanTypeDesc).setAddress(address)
.setProcessNo(processNo);
if (businessKey != null && businessKey.length() > 0) {
builder.setBussinessKey(businessKey);
}
return builder.setCallType(callType).setApplicationId(applicationId).setUserId(userId).setAgentId(agentId)
.build().toByteArray();
}
@Override
public AbstractDataSerializable convertData(byte[] data) throws ConvertFailedException {
RequestSpan requestSpan = new RequestSpan();
try {
TraceProtocol.RequestSpan requestSpanByte = TraceProtocol.RequestSpan.parseFrom(data);
requestSpan.setTraceId(requestSpanByte.getTraceId());
requestSpan.setParentLevel(requestSpanByte.getParentLevel());
requestSpan.setLevelId(requestSpanByte.getLevelId());
requestSpan.setApplicationId(requestSpanByte.getApplicationId());
requestSpan.setCallType(requestSpanByte.getCallType());
requestSpan.setSpanType(SpanType.convert(requestSpanByte.getSpanType()));
requestSpan.setSpanTypeDesc(requestSpanByte.getSpanTypeDesc());
requestSpan.setStartDate(requestSpanByte.getStartDate());
requestSpan.setUserId(requestSpanByte.getUserId());
requestSpan.setViewPointId(requestSpanByte.getViewPointId());
requestSpan.setBusinessKey(requestSpanByte.getBussinessKey());
requestSpan.setAgentId(requestSpanByte.getAgentId());
requestSpan.setProcessNo(requestSpanByte.getProcessNo());
requestSpan.setAddress(requestSpanByte.getAddress());
} catch (InvalidProtocolBufferException e) {
throw new ConvertFailedException(e.getMessage(), e);
}
return requestSpan;
}
public static RequestSpan convert(byte[] data) throws ConvertFailedException {
return (RequestSpan) INSTANCE.convertData(data);
}
public void setBusinessKey(String businessKey) {
this.businessKey = businessKey;
}
public String getAgentId() {
return agentId;
}
public void setAgentId(String agentId) {
this.agentId = agentId;
}
public boolean isNull() {
return false;
}
public static class RequestSpanBuilder {
private RequestSpan requestSpan;
private RequestSpanBuilder(Span span) {
requestSpan = new RequestSpan(span);
}
public static RequestSpanBuilder newBuilder(Span span) {
return new RequestSpanBuilder(span);
}
public RequestSpanBuilder applicationId(String applicationId) {
requestSpan.applicationId = applicationId;
return this;
}
public RequestSpanBuilder callType(String callType) {
requestSpan.callType = callType;
return this;
}
public RequestSpanBuilder spanTypeDesc(String spanTypeDesc) {
requestSpan.spanTypeDesc = spanTypeDesc;
return this;
}
public RequestSpanBuilder userId(String userId) {
requestSpan.userId = userId;
return this;
}
public RequestSpanBuilder bussinessKey(String bussinessKey) {
requestSpan.businessKey = bussinessKey;
return this;
}
public RequestSpan build() {
return requestSpan;
}
public RequestSpanBuilder viewPoint(String viewPoint) {
requestSpan.viewPointId = viewPoint;
return this;
}
public RequestSpanBuilder processNo(String processNo) {
requestSpan.processNo = processNo;
return this;
}
public RequestSpanBuilder address(String address) {
requestSpan.address = address;
return this;
}
}
public String getBusinessKey() {
return businessKey;
}
public String getProcessNo() {
return processNo;
}
public String getAddress() {
return address;
}
public void setProcessNo(String processNo) {
this.processNo = processNo;
}
public void setAddress(String address) {
this.address = address;
}
}
package com.a.eye.skywalking.protocol;
import com.a.eye.skywalking.protocol.common.NullableClass;
import com.a.eye.skywalking.protocol.util.IntegerAssist;
import com.a.eye.skywalking.protocol.common.AbstractDataSerializable;
import com.a.eye.skywalking.protocol.exception.ConvertFailedException;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
public class SerializedFactory {
public static Map<Integer, AbstractDataSerializable> serializableMap =
new HashMap<Integer, AbstractDataSerializable>();
static {
ServiceLoader<AbstractDataSerializable> loaders = ServiceLoader.load(AbstractDataSerializable.class);
for (AbstractDataSerializable serializable : loaders) {
serializableMap.put(serializable.getDataType(), serializable);
}
}
public static AbstractDataSerializable deserialize(byte[] bytes) throws ConvertFailedException {
try {
AbstractDataSerializable abstractDataSerializable = serializableMap.get(IntegerAssist.bytesToInt(bytes, 0));
if (abstractDataSerializable != null) {
NullableClass nullableClass = abstractDataSerializable.convert2Object(bytes);
if (!nullableClass.isNull()) {
return (AbstractDataSerializable) nullableClass;
}
}
} catch (Exception e) {
throw new ConvertFailedException(e.getMessage(), e);
}
return null;
}
public static byte[] serialize(AbstractDataSerializable dataSerializable) {
return dataSerializable.convert2Bytes();
}
public static boolean isCanSerialized(int dataType) {
return serializableMap.get(dataType) != null ? true : false;
}
}
package com.a.eye.skywalking.protocol;
import com.a.eye.skywalking.protocol.util.ByteDataUtil;
import com.a.eye.skywalking.protocol.util.IntegerAssist;
import com.a.eye.skywalking.protocol.common.ISerializable;
import java.util.Arrays;
import java.util.List;
public class TransportPackager {
public static byte[] pack(List<ISerializable> beSendingData) {
// 对协议格式进行修改
// | check sum(4 byte) | data
byte[] data = serializeObjects(beSendingData);
byte[] dataPackage = appendCheckSum(data);
return dataPackage;
}
private static byte[] appendCheckSum(byte[] dataText) {
byte[] dataPackage = new byte[dataText.length + 4];
byte[] checkSum = ByteDataUtil.generateChecksum(dataText, 0);
System.arraycopy(checkSum, 0, dataPackage, 0, 4);
System.arraycopy(dataText, 0, dataPackage, 4, dataText.length);
return dataPackage;
}
private static byte[] serializeObjects(List<ISerializable> beSendingData) {
byte[] data = null;
int currentIndex = 0;
for (ISerializable sendingData : beSendingData) {
byte[] elementData = serialize(sendingData);
data = appendData(data, currentIndex, elementData);
currentIndex += elementData.length;
}
return data;
}
private static byte[] appendData(byte[] dataText, int currentIndex, byte[] dataElementText) {
if (dataText == null) {
dataText = new byte[dataElementText.length];
} else {
dataText = Arrays.copyOf(dataText, dataText.length + dataElementText.length);
}
System.arraycopy(dataElementText, 0, dataText, currentIndex, dataElementText.length);
return dataText;
}
public static byte[] serialize(ISerializable serializable) {
byte[] serializableBytes = serializable.convert2Bytes();
byte[] dataText = new byte[serializableBytes.length + 4];
System.arraycopy(serializableBytes, 0, dataText, 4, serializableBytes.length);
byte[] length = IntegerAssist.intToBytes(serializableBytes.length);
System.arraycopy(length, 0, dataText, 0, 4);
return dataText;
}
}
package com.a.eye.skywalking.protocol.common;
import com.a.eye.skywalking.protocol.NullClass;
import com.a.eye.skywalking.protocol.SerializedFactory;
import com.a.eye.skywalking.protocol.exception.ConvertFailedException;
import com.a.eye.skywalking.protocol.util.IntegerAssist;
import java.util.Arrays;
/**
* Created by wusheng on 16/7/4.
*/
public abstract class AbstractDataSerializable implements ISerializable, NullableClass {
public abstract int getDataType();
public abstract byte[] getData();
public abstract AbstractDataSerializable convertData(byte[] data) throws ConvertFailedException;
/**
* 消息包结构:
* 4位消息体类型
* n位数据正文
*
* @return
*/
@Override
public byte[] convert2Bytes() {
byte[] messageByteData = getData();
byte[] messagePackage = new byte[4 + messageByteData.length];
packData(messageByteData, messagePackage);
setProtocolType(messageByteData, messagePackage);
return messagePackage;
}
private void setProtocolType(byte[] messageByteData, byte[] messagePackage) {
System.arraycopy(IntegerAssist.intToBytes(getDataType()), 0, messagePackage, 0, 4);
}
private void packData(byte[] messageByteData, byte[] messagePackage) {
System.arraycopy(messageByteData, 0, messagePackage, 4, messageByteData.length);
}
@Override
public NullableClass convert2Object(byte[] data) throws ConvertFailedException {
int dataType = IntegerAssist.bytesToInt(data, 0);
if (!SerializedFactory.isCanSerialized(dataType)) {
return new NullClass();
}
return this.convertData(Arrays.copyOfRange(data, 4, data.length));
}
}
package com.a.eye.skywalking.protocol.common;
public enum CallType {
LOCAL('L'), SYNC('S'), ASYNC('A');
private char value;
CallType(char value) {
this.value = value;
}
public static CallType convert(String id) {
char v = id.charAt(0);
switch (v) {
case 'L':
return LOCAL;
case 'S':
return SYNC;
case 'A':
return ASYNC;
default:
throw new IllegalStateException("Failed to convert callType[" + id + "]");
}
}
@Override
public String toString() {
return String.valueOf(value);
}
}
package com.a.eye.skywalking.protocol.common;
import com.a.eye.skywalking.protocol.exception.ConvertFailedException;
/**
* Created by wusheng on 16/7/4.
*/
public interface ISerializable {
byte[] convert2Bytes();
NullableClass convert2Object(byte[] data) throws ConvertFailedException;
}
package com.a.eye.skywalking.protocol.common;
/**
* Created by wusheng on 16/7/4.
*/
public interface NullableClass {
boolean isNull();
}
package com.a.eye.skywalking.protocol.common;
import com.a.eye.skywalking.protocol.exception.SpanTypeCannotConvertException;
/**
* Created by xin on 16-7-2.
*/
public enum SpanType {
LOCAL(1),
RPC_CLIENT(2),
RPC_SERVER(4);
private int value;
SpanType(int value) {
this.value = value;
}
public static SpanType convert(int spanTypeValue) {
switch (spanTypeValue) {
case 1:
return LOCAL;
case 2:
return RPC_CLIENT;
case 4:
return RPC_SERVER;
default:
throw new SpanTypeCannotConvertException(spanTypeValue + "");
}
}
public int getValue() {
return value;
}
}
package com.a.eye.skywalking.protocol.exception;
import com.google.protobuf.InvalidProtocolBufferException;
public class ConvertFailedException extends Exception {
public ConvertFailedException(String message, Exception e) {
super(message, e);
}
}
package com.a.eye.skywalking.protocol.exception;
/**
* Created by wusheng on 16/7/4.
*/
public class SerializableDataTypeRegisterException extends RuntimeException {
public SerializableDataTypeRegisterException(String message) {
super(message);
}
}
package com.a.eye.skywalking.protocol.exception;
public class SpanTypeCannotConvertException extends RuntimeException {
public SpanTypeCannotConvertException(String spanTypeValue) {
super("Can not convert SpanTypeValue[" + spanTypeValue + "]");
}
}
package com.a.eye.skywalking.protocol.util;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 原子性,带范围性自增的整数
*
* @author wusheng
*
*/
public class AtomicRangeInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = -4099792402691141643L;
private AtomicInteger value;
private int startValue;
private int endValue;
/**
* Creates a new AtomicInteger with the given initial value and max value
*
* @param startValue
* the initial value
* @param maxValue
*
* AtomicRangeInteger在startValue和maxValue循环取值( startValue <= value < maxValue)
*/
public AtomicRangeInteger(int startValue, int maxValue) {
value = new AtomicInteger(startValue);
this.startValue = startValue;
this.endValue = maxValue - 1;
}
/**
* Atomically increments by one the current value.
*
* @return the previous value
*/
public final int getAndIncrement() {
for (;;) {
int current = value.get();
int next = current >= this.endValue ? this.startValue : current + 1;
if (value.compareAndSet(current, next))
return current;
}
}
public final int get() {
return value.get();
}
public int intValue() {
return value.intValue();
}
public long longValue() {
return value.longValue();
}
public float floatValue() {
return value.floatValue();
}
public double doubleValue() {
return value.doubleValue();
}
}
package com.a.eye.skywalking.protocol.util;
import java.util.Arrays;
public class ByteDataUtil {
public static byte[] unpackCheckSum(byte[] msg) {
return Arrays.copyOfRange(msg, 4, msg.length);
}
public static boolean validateCheckSum(byte[] dataPackage) {
byte[] checkSum = generateChecksum(dataPackage, 4);
byte[] originCheckSum = new byte[4];
System.arraycopy(dataPackage, 0, originCheckSum, 0, 4);
return Arrays.equals(checkSum, originCheckSum);
}
public static byte[] generateChecksum(byte[] data, int offset) {
int result = data[offset];
for (int i = offset + 1; i < data.length; i++) {
result ^= data[i];
}
return IntegerAssist.intToBytes(result);
}
}
package com.a.eye.skywalking.protocol.util;
/**
* Created by wusheng on 16/7/4.
*/
public class IntegerAssist {
public static byte[] intToBytes(int value) {
byte[] src = new byte[4];
src[0] = (byte) ((value >> 24) & 0xFF);
src[1] = (byte) ((value >> 16) & 0xFF);
src[2] = (byte) ((value >> 8) & 0xFF);
src[3] = (byte) (value & 0xFF);
return src;
}
public static int bytesToInt(byte[] src, int offset) {
int value;
value = (((src[offset] & 0xFF) << 24) | ((src[offset + 1] & 0xFF) << 16) | ((src[offset + 2] & 0xFF) << 8) | (src[offset + 3] & 0xFF));
return value;
}
}
package com.a.eye.skywalking.protocol.util;
import com.a.eye.skywalking.protocol.FullSpan;
import java.util.Comparator;
public class SpanLevelIdComparators {
public static class SpanASCComparator implements Comparator<FullSpan> {
@Override
public int compare(FullSpan span1, FullSpan span2) {
String span1TraceLevel = getTraceLevelId(span1);
String span2TraceLevel = getTraceLevelId(span2);
return ascCompare(span1TraceLevel, span2TraceLevel);
}
}
private static String getTraceLevelId(FullSpan span) {
String spanTraceLevelId = null;
if (span.getParentLevel() == null || span.getParentLevel().length() == 0) {
spanTraceLevelId = span.getLevelId() + "";
} else {
spanTraceLevelId = span.getParentLevel() + "." + span.getLevelId();
}
return spanTraceLevelId;
}
public static int descComparator(String levelId0, String levelId1) {
String[] levelId0Array = levelId0.split("\\.");
String[] levelId1Array = levelId1.split("\\.");
int result = -1;
int index = 0;
while (true) {
if (index >= levelId0Array.length) {
result = 1;
break;
}
if (index >= levelId1Array.length) {
result = -1;
break;
}
result = -1 * new Integer(levelId0Array[index]).compareTo(new Integer(levelId1Array[index]));
if (result != 0)
break;
index++;
}
return result;
}
public static int ascCompare(String levelId0, String levelId1) {
String[] levelId0Array = levelId0.split("\\.");
String[] levelId1Array = levelId1.split("\\.");
int result = -1;
int index = 0;
while (true) {
if (index >= levelId0Array.length) {
result = -1;
break;
}
if (index >= levelId1Array.length) {
result = 1;
break;
}
result = new Integer(levelId0Array[index]).compareTo(new Integer(levelId1Array[index]));
if (result != 0)
break;
index++;
}
return result;
}
}
syntax = "proto2";
option java_package = "com.a.eye.skywalking.protocol.proto";
message AckSpan {
required string traceId = 1;
optional string parentLevel = 2;
required int32 levelId = 3;
required int64 cost = 4;
required int32 statusCode = 5;
optional string exceptionStack = 6;
required string viewpointId = 7;
}
message RequestSpan {
required string traceId = 1;
optional string parentLevel = 2;
required int32 levelId = 3;
required string viewPointId = 4;
required int64 startDate = 5;
required string spanTypeDesc = 6;
required string callType = 7;
required uint32 spanType = 8;
required string applicationId = 9;
required string userId = 10;
optional string bussinessKey = 11;
required string agentId = 12;
required string processNo = 13;
required string address = 14;
}
message InputParametersSpan{
required string traceId = 1;
optional string traceLevelId = 2;
map<string,string> parameters = 3;
}
message OutputParametersSpan{
required string traceId = 1;
optional string traceLevelId = 2;
required string outputParameter = 3;
}
com.a.eye.skywalking.protocol.AckSpan
com.a.eye.skywalking.protocol.RequestSpan
com.a.eye.skywalking.protocol.InputParametersSpan
com.a.eye.skywalking.protocol.OutputParameterSpan
com.a.eye.skywalking.protocol.BufferFileEOFProtocol
*.class
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
*.war
*.ear
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
.idea
*.iml
package com.a.eye.skywalking.reciever.buffer;
import com.a.eye.skywalking.protocol.SerializedFactory;
import com.a.eye.skywalking.protocol.common.AbstractDataSerializable;
import com.a.eye.skywalking.protocol.exception.ConvertFailedException;
import com.a.eye.skywalking.protocol.util.IntegerAssist;
import java.util.ArrayList;
import java.util.List;
public class BufferDataAssist {
private static byte[] SPILT = new byte[] {127, 127, 127, 127};
private static byte[] EOF = null;
private BufferDataAssist() {
//DO Nothing
}
public static byte[] appendLengthAndSplit(byte[] msg) {
byte[] dataPackage = new byte[msg.length + 8];
// 前四位长度
System.arraycopy(IntegerAssist.intToBytes(msg.length), 0, dataPackage, 0, 4);
// 中间正文
System.arraycopy(msg, 0, dataPackage, 4, msg.length);
// 后四位特殊字符
System.arraycopy(SPILT, 0, dataPackage, msg.length + 4, 4);
return dataPackage;
}
}
package com.a.eye.skywalking.reciever.conf;
public class Constants {
public static final String DATA_SPILT = "#&";
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册