Commit 413e8af6 authored by ascrutae

1. Modify the test classes

2. Modify the shell script

Parent d8f84f2a
@@ -25,19 +25,15 @@ fi
SW_ANALYSIS_PID=`cat ${PID_FILES}`
# check if the skywalking analysis process is running
if [ "$SW_ANALYSIS_PID" != "" ]; then
PS_OUT=`ps -ef | grep $SW_ANALYSIS_PID | grep -v 'grep' | grep -v $0`
result=$(echo $PS_OUT | grep $SW_ALALYSIS_PID)
if [ "$result" != "" ]; then
#PS_OUT=`ps -ef | grep $SW_ANALYSIS_PID | grep -v 'grep' | grep -v $0`
#PS_OUT=`ps -ax | awk '{ print $1 }' | grep -e "^${SW_ANALYSIS_PID}$"` | echo ${PS_OUT}
PS_OUT=`ps -ax | awk '{print $1}' | grep -e "^${SW_ANALYSIS_PID}$"`
if [ "$PS_OUT" != "" ]; then
echo "The skywalking analysis process is running. Will delay the analysis."
exit -1;
fi
fi
# echo the current process Id to pid file
PID=$!
echo "Current analysis process Id : ${PID}"
echo $PID > SW_ANALYSIS_PID
#skywalking analysis mode:1)accumulate(default) 2)rewrite
SW_ANALYSIS_MODE=ACCUMULATE
#skywalking rewrite execute dates. Each number represents the day of the month.
@@ -58,7 +54,7 @@ if [ "$PRE_TIME_OF_REWRITE_TIME" != "" ]; then
THE_DAY_OF_MONTH=$(date "+%d")
for THE_DAY in ${REWRITE_EXECUTIVE_DAY_ARR[@]}
do
if [ ${THE_DAY} -eq ${THE_DAY_OF_MONTH} ]; then
if [ ${THE_DAY} == ${THE_DAY_OF_MONTH} ]; then
SW_ANALYSIS_MODE=REWRITE
START_TIME=$(date --date='1 month ago' '+%Y-%m-01/00:00:00')
echo "skywalking analysis will execute rewrite mode. Start time:${START_TIME}"
@@ -83,7 +79,7 @@ if [ "${SW_ANALYSIS_MODE}" != "REWRITE" ]; then
START_TIME=`cat ${PRE_TIME_OF_ACCUMULATE_FILE}`
if [ "$START_TIME" = "" ]; then
START_TIME=`date --date='3 month ago' "+%Y-%m-%d/%H:%M:%S"`
START_TIME=`date --date='1 month ago' "+%Y-%m-%d/%H:%M:%S"`
fi
SW_ANALYSIS_MODE=ACCUMULATE
echo "skywalking analysis process will execute accumulate mode. start time: ${START_TIME}."
@@ -95,8 +91,13 @@ END_TIME=`date --date='10 minute ago' "+%Y-%m-%d/%H:%M:%S"`
## execute command
echo "Begin to analysis the buried point data between ${START_TIME} to ${END_TIME}."
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath`
nohup ${HADOOP_HOME}/bin/hadoop jar skywalking-analysis-1.0-SNAPSHOT.jar -Dskywalking.analysis.mode=${SW_ANALYSIS_MODE} ${START_TIME} ${END_TIME} > ${SW_ANALYSIS_HOME}/log/map-reduce.log 2>&1 &
CURRENT_PID=`echo $!`
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar skywalking-analysis-1.0-SNAPSHOT.jar -Dskywalking.analysis.mode=${SW_ANALYSIS_MODE} ${START_TIME} ${END_TIME}
echo "current analysis process Id is ${CURRENT_PID}"
echo ${CURRENT_PID} > ${PID_FILES}
if [ "${SW_ANALYSIS_MODE}" = "REWRITE" ]; then
echo $(date "+%d") > ${PRE_TIME_OF_REWRITE_FILE}
@@ -79,9 +79,7 @@ public class AnalysisServerDriver extends Configured implements Tool {
Date startDate = simpleDateFormat.parse(args[0]);
Date endDate = simpleDateFormat.parse(args[1]);
Scan scan = new Scan();
//scan.setMaxVersions();
scan.setBatch(2001);
scan.setMaxVersions();
scan.setTimeRange(startDate.getTime(), endDate.getTime());
return scan;
}
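For context, the driver hunk above re-enables `scan.setMaxVersions()`, so the MapReduce job reads every stored version of each span cell inside the time window instead of only the newest one. A minimal self-contained sketch of the same Scan construction follows; the `yyyy-MM-dd/HH:mm:ss` pattern is inferred from the date literals in the test hunk further down, not shown in this diff:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.hbase.client.Scan;

public class ScanBuilderSketch {
    public static Scan buildScan(String start, String end) throws Exception {
        // Time format assumed from literals like "2016-04-16/19:49:25" in the test class.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd/HH:mm:ss");
        Date startDate = format.parse(start);
        Date endDate = format.parse(end);

        Scan scan = new Scan();
        scan.setBatch(2001);   // cap cells per Result; Convert drops chains with more than 2000 spans
        scan.setMaxVersions(); // read all cell versions, not only the newest
        scan.setTimeRange(startDate.getTime(), endDate.getTime());
        return scan;
    }
}
```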
@@ -7,7 +7,7 @@ import com.ai.cloud.skywalking.analysis.chainbuild.util.SubLevelSpanCostCounter;
public class ReplaceAddressFilter extends SpanNodeProcessFilter {
//ip:port regex
//ip:PORT regex
private static String IP_PORT_REGEX = "(([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))\\.([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))\\.([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))\\.([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))|localhost):([1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5]|[0-9]{1,4})";
@Override
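The `IP_PORT_REGEX` above matches a dotted-quad IPv4 address (or `localhost`) followed by a port in 0–65535. A small demo of how such a filter might normalize addresses; the sample viewpoint string and the `#ip:port#` replacement token are illustrative assumptions, not taken from this commit:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class IpPortRegexDemo {
    // Same expression as IP_PORT_REGEX in ReplaceAddressFilter.
    private static final String IP_PORT_REGEX = "(([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))\\.([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))\\.([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))\\.([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))|localhost):([1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5]|[0-9]{1,4})";

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile(IP_PORT_REGEX);
        // Replacing the concrete peer address with a fixed token keeps
        // call-chain signatures stable across hosts.
        String viewpoint = "jdbc:mysql://10.1.55.11:6379/sw_db";
        Matcher matcher = pattern.matcher(viewpoint);
        System.out.println(matcher.replaceAll("#ip:port#"));
        // prints: jdbc:mysql://#ip:port#/sw_db
    }
}
```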
@@ -33,17 +33,4 @@ public class Config {
public static class MapReduce {
public static String JAVA_OPTS = "-Xmx200m";
}
public static class Redis {
public static String HOST = "127.0.0.1";
public static int PORT = 6379;
public static String MAPPER_COUNT_KEY = "ANALYSIS_TOTAL_SIZE";
public static String SUCCESS_MAPPER_COUNT_KEY = "ANALYSIS_SUCCESS_TOTAL_SIZE";
public static String FAILED_MAPPER_COUNT_KEY = "ANALYSIS_FAILED_TOTAL_SIZE";
}
}
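The deleted `Config.Redis` block (and the matching `redis.*` properties in the next hunk) held counter keys such as `ANALYSIS_TOTAL_SIZE`, which suggests mapper progress was once tallied in Redis. A purely hypothetical sketch of how such counters could be bumped with the Jedis client; this commit removes the settings rather than using them, so none of this code is from the repository:

```java
import redis.clients.jedis.Jedis;

public class RedisCounterSketch {
    public static void main(String[] args) {
        // Host/port mirror the removed Config.Redis defaults.
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
            jedis.incr("ANALYSIS_TOTAL_SIZE");         // one record seen
            jedis.incr("ANALYSIS_SUCCESS_TOTAL_SIZE"); // converted successfully
            System.out.println(jedis.get("ANALYSIS_TOTAL_SIZE"));
        }
    }
}
```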
@@ -19,8 +19,4 @@ reducer.reducer_number=1
mapreduce.java_opts=-Xmx768m
redis.host=10.1.55.11
redis.port=6379
redis.mapper_count_key=ANALYSIS_TOTAL_SIZE
\ No newline at end of file
@@ -6,7 +6,7 @@
<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern"
value="%d - %c -%-4r [%t](%F:%L) %-5p %x - %m%n" />
value="%d - %c -%-4r [%t] %-5p %x - %m%n" />
</layout>
</appender>
@@ -2,7 +2,7 @@
<Configuration status="debug">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t](%F:%L) %-5level %logger{36} - %msg%n"/>
<PatternLayout pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
@@ -33,9 +33,9 @@ public class CallChainMapperTest {
Table table = connection.getTable(TableName.valueOf
(HBaseTableMetaData.TABLE_CALL_CHAIN.TABLE_NAME));
Scan scan = new Scan();
//2016-04-13/16:59:24 to 2016-05-13/16:49:24
Date startDate = simpleDateFormat.parse("2016-04-13/16:59:24");
Date endDate = simpleDateFormat.parse("2016-05-13/16:49:24");
//2016-04-16/19:49:25 to 2016-05-16/19:39:25
Date startDate = simpleDateFormat.parse("2016-04-16/19:49:25");
Date endDate = simpleDateFormat.parse("2016-05-16/19:39:25");
scan.setBatch(2001);
scan.setTimeRange(startDate.getTime(), endDate.getTime());
package com.ai.cloud.skywalking.analysis.mapper;
import com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData;
import com.ai.cloud.skywalking.analysis.mapper.util.HBaseUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
/**
* Created by xin on 16-5-16.
*/
public class MappingTableCounter {

    /**
     * Scans the trace-id to CID mapping table and returns every trace id
     * that already has a mapping row, so callers can diff it against the
     * trace ids seen in the raw call-chain table.
     */
    public static Set<String> getTraceMappingCount() throws IOException {
        Connection connection = HBaseUtils.getConnection();
        Table table = connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_TRACE_ID_AND_CID_MAPPING.TABLE_NAME));
        // Full-table scan: the row key of each mapping row is a trace id.
        ResultScanner resultScanner = table.getScanner(new Scan());
        Set<String> traceIds = new HashSet<String>();
        for (Result result : resultScanner) {
            traceIds.add(Bytes.toString(result.getRow()));
        }
        return traceIds;
    }
}
@@ -2,12 +2,14 @@ package com.ai.cloud.skywalking.analysis.mapper.util;
import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildMapper;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.mapper.MappingTableCounter;
import com.ai.cloud.skywalking.protocol.Span;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.*;
/**
@@ -17,9 +19,11 @@ public class Convert {
private static final Map<String, String> traceIds = new HashMap<>();
public static List<ChainInfo> convert(ResultScanner resultScanner) {
public static List<ChainInfo> convert(ResultScanner resultScanner) throws IOException {
List<ChainInfo> chainInfos = new ArrayList<ChainInfo>();
int count = 0, failedCount = 0, successCount = 0;
for (Result result : resultScanner) {
count++;
try {
List<Span> spanList = new ArrayList<Span>();
for (Cell cell : result.rawCells()) {
@@ -27,19 +31,33 @@
}
if (spanList.size() == 0 || spanList.size() > 2000) {
continue;
throw new RuntimeException("Failed to convert it");
}
ChainInfo chainInfo = ChainBuildMapper.spanToChainInfo(Bytes.toString(result.getRow()), spanList);
chainInfos.add(chainInfo);
traceIds.put(spanList.get(0).getTraceId(), chainInfo.getCID());
successCount++;
} catch (Exception e) {
failedCount++;
continue;
}
}
System.out.println(traceIds.size());
System.out.println("count : " + count);
// System.out.println("Success count " + traceIds.size());
System.out.println("Failed count " + failedCount);
System.out.println("Success count " + successCount);
System.out.println("HBase :" + traceIds.size());
Set<String> traceMapping = MappingTableCounter.getTraceMappingCount();
for (String traceId : traceMapping){
traceIds.remove(traceId);
}
System.out.println(traceIds);
return chainInfos;
}
@@ -19,7 +19,7 @@ import java.util.List;
*/
public class HBaseUtils {
private static String ZK_QUORUM = "host-10-1-241-18,host-10-1-241-19,host-10-1-241-20";
private static String ZK_QUORUM = "host-10-1-235-197,host-10-1-235-198,host-10-1-235-199";
private static String ZK_CLIENT_PORT = "29181";
private static Configuration configuration = null;
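The hunk above only swaps the hard-coded ZooKeeper quorum to a different cluster. For orientation, here is a minimal sketch of how HBaseUtils presumably wires these constants into the client configuration; the property names are the standard HBase client settings, but the actual body of HBaseUtils is not shown in this diff:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class HBaseConnectionSketch {
    public static Connection connect() throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        // Standard HBase client settings, filled from the constants above.
        configuration.set("hbase.zookeeper.quorum", "host-10-1-235-197,host-10-1-235-198,host-10-1-235-199");
        configuration.set("hbase.zookeeper.property.clientPort", "29181");
        return ConnectionFactory.createConnection(configuration);
    }
}
```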