diff --git a/skywalking-analysis/bin/start-analysis.sh b/skywalking-analysis/bin/start-analysis.sh index 4562ffd13e558d7f17f9f8212412ae8c797fecc9..f7082314ee2beceba04e594a2088560bb4fe74ee 100755 --- a/skywalking-analysis/bin/start-analysis.sh +++ b/skywalking-analysis/bin/start-analysis.sh @@ -25,19 +25,15 @@ fi SW_ANALYSIS_PID=`cat ${PID_FILES}` # check if the skywalking analysis process is running if [ "$SW_ANALYSIS_PID" != "" ]; then - PS_OUT=`ps -ef | grep $SW_ANALYSIS_PID | grep -v 'grep' | grep -v $0` - result=$(echo $PS_OUT | grep $SW_ALALYSIS_PID) - if [ "$result" != "" ]; then + #PS_OUT=`ps -ef | grep $SW_ANALYSIS_PID | grep -v 'grep' | grep -v $0` + #PS_OUT=`ps -ax | awk '{ print $1 }' | grep -e "^${SW_ANALYSIS_PID}$"` | echo ${PS_OUT} + PS_OUT=`ps -ax | awk '{print $1}' | grep -e "^${SW_ANALYSIS_PID}$"` + if [ "$PS_OUT" != "" ]; then echo "The skywalking analysis process is running. Will delay the analysis." exit -1; fi fi -# echo the current process Id to pid file -PID=$! -echo "Current analysis process Id : ${PID}" -echo $PID > SW_ANALYSIS_PID - #skywalking analysis mode:1)accumulate(default) 2)rewrite SW_ANALYSIS_MODE=ACCUMULATE #skywalking rewrite execute dates. Each number represents the day of the month. @@ -58,7 +54,7 @@ if [ "$PRE_TIME_OF_REWRITE_TIME" != "" ]; then THE_DAY_OF_MONTH=$(date "+%d") for THE_DAY in ${REWRITE_EXECUTIVE_DAY_ARR[@]} do - if [ ${THE_DAY} -eq ${THE_DAY_OF_MONTH} ]; then + if [ ${THE_DAY} == ${THE_DAY_OF_MONTH} ]; then SW_ANALYSIS_MODE=REWRITE START_TIME=$(date --date='1 month ago' '+%Y-%m-01/00:00:00') echo "skywalking analysis will execute rewrite mode. Start time:${START_TIME}" @@ -83,7 +79,7 @@ if [ "${SW_ANALYSIS_MODE}" != "REWRITE" ]; then START_TIME=`cat ${PRE_TIME_OF_ACCUMULATE_FILE}` if [ "$START_TIME" = "" ]; then - START_TIME=`date --date='3 month ago' "+%Y-%m-%d/%H:%M:%S"` + START_TIME=`date --date='1 month ago' "+%Y-%m-%d/%H:%M:%S"` fi SW_ANALYSIS_MODE=ACCUMULATE echo "skywalking analysis process will execute accumulate mode. start time: ${START_TIME}." @@ -95,8 +91,13 @@ END_TIME=`date --date='10 minute ago' "+%Y-%m-%d/%H:%M:%S"` ## execute command echo "Begin to analysis the buried point data between ${START_TIME} to ${END_TIME}." +export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` +nohup ${HADOOP_HOME}/bin/hadoop jar skywalking-analysis-1.0-SNAPSHOT.jar -Dskywalking.analysis.mode=${SW_ANALYSIS_MODE} ${START_TIME} ${END_TIME} > ${SW_ANALYSIS_HOME}/log/map-reduce.log 2>&1 & + +CURRENT_PID=`echo $!` -HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar skywalking-analysis-1.0-SNAPSHOT.jar -Dskywalking.analysis.mode=${SW_ANALYSIS_MODE} ${START_TIME} ${END_TIME} +echo "current analysis process Id is ${CURRENT_PID}" +echo ${CURRENT_PID} > ${PID_FILES} if [ "${SW_ANALYSIS_MODE}" = "REWRITE" ]; then echo $(date "+%d") > ${PRE_TIME_OF_REWRITE_FILE} diff --git a/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/AnalysisServerDriver.java b/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/AnalysisServerDriver.java index f07062806771c812c22396eb7613de540e2daa4d..12deec901cadbe17448d1c01defda07e58f23127 100644 --- a/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/AnalysisServerDriver.java +++ b/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/AnalysisServerDriver.java @@ -79,9 +79,7 @@ public class AnalysisServerDriver extends Configured implements Tool { Date startDate = simpleDateFormat.parse(args[0]); Date endDate = simpleDateFormat.parse(args[1]); Scan scan = new Scan(); - //scan.setMaxVersions(); scan.setBatch(2001); - scan.setMaxVersions(); scan.setTimeRange(startDate.getTime(), endDate.getTime()); return scan; } diff --git a/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/filter/impl/ReplaceAddressFilter.java b/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/filter/impl/ReplaceAddressFilter.java index d05d6b94c0cb317776e4b804b0ba99489f40417a..947a297090a850f33576cde30fe27fddfee90aac 100644 --- a/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/filter/impl/ReplaceAddressFilter.java +++ b/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/filter/impl/ReplaceAddressFilter.java @@ -7,7 +7,7 @@ import com.ai.cloud.skywalking.analysis.chainbuild.util.SubLevelSpanCostCounter; public class ReplaceAddressFilter extends SpanNodeProcessFilter { - //ip:port regex + //ip:PORT regex private static String IP_PORT_REGEX = "(([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))\\.([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))\\.([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))\\.([0-9]|[1-9][0-9]|1([0-9][0-9])|2([0-4][0-9]|5[0-5]))|localhost):([1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5]|[0-9]{1,4})"; @Override diff --git a/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/config/Config.java b/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/config/Config.java index 8c781e4f81421045d8ef607d0c7068647861b61c..adb7faa303eb025130298d3959fef8eece4376ae 100644 --- a/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/config/Config.java +++ b/skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/config/Config.java @@ -33,17 +33,4 @@ public class Config { public static class MapReduce { public static String JAVA_OPTS = "-Xmx200m"; } - - - public static class Redis { - public static String HOST = "127.0.0.1"; - - public static int PORT = 6379; - - public static String MAPPER_COUNT_KEY = "ANALYSIS_TOTAL_SIZE"; - - public static String SUCCESS_MAPPER_COUNT_KEY = "ANALYSIS_SUCCESS_TOTAL_SIZE"; - - public static String FAILED_MAPPER_COUNT_KEY = "ANALYSIS_FAILED_TOTAL_SIZE"; - } } diff --git a/skywalking-analysis/src/main/resources/analysis.conf b/skywalking-analysis/src/main/resources/analysis.conf index c98283a37bd17f62aed4f42da80d1faf45aa7125..79202293ed4ac2d131ecb0b2b6b0b524275063b4 100644 --- a/skywalking-analysis/src/main/resources/analysis.conf +++ b/skywalking-analysis/src/main/resources/analysis.conf @@ -19,8 +19,4 @@ reducer.reducer_number=1 mapreduce.java_opts=-Xmx768m -redis.host=10.1.55.11 - -redis.port=6379 - redis.mapper_count_key=ANALYSIS_TOTAL_SIZE \ No newline at end of file diff --git a/skywalking-analysis/src/main/resources/log4j.xml b/skywalking-analysis/src/main/resources/log4j.xml index e280235476017031ac2d967684de81ccef4c4106..0da28a6d55c168b401f0a52f678adf49c17bdd0b 100644 --- a/skywalking-analysis/src/main/resources/log4j.xml +++ b/skywalking-analysis/src/main/resources/log4j.xml @@ -6,7 +6,7 @@ + value="%d - %c -%-4r [%t] %-5p %x - %m%n" /> diff --git a/skywalking-analysis/src/main/resources/log4j2.xml b/skywalking-analysis/src/main/resources/log4j2.xml index 38c27bbca44fce9b3dbe6025a97104f2c3d8f1ab..b454d4b5bf73986916853127f5a61e5f55ec5504 100644 --- a/skywalking-analysis/src/main/resources/log4j2.xml +++ b/skywalking-analysis/src/main/resources/log4j2.xml @@ -2,7 +2,7 @@ - + diff --git a/skywalking-analysis/src/test/java/com/ai/cloud/skywalking/analysis/mapper/CallChainMapperTest.java b/skywalking-analysis/src/test/java/com/ai/cloud/skywalking/analysis/mapper/CallChainMapperTest.java index 5a19b108af47e170e29a99ae9208c59c289c5d25..2571266b8c71bf7286540a2e2c93beecce5b4fc0 100644 --- a/skywalking-analysis/src/test/java/com/ai/cloud/skywalking/analysis/mapper/CallChainMapperTest.java +++ b/skywalking-analysis/src/test/java/com/ai/cloud/skywalking/analysis/mapper/CallChainMapperTest.java @@ -33,9 +33,9 @@ public class CallChainMapperTest { Table table = connection.getTable(TableName.valueOf (HBaseTableMetaData.TABLE_CALL_CHAIN.TABLE_NAME)); Scan scan = new Scan(); - //2016-04-13/16:59:24 to 2016-05-13/16:49:24 - Date startDate = simpleDateFormat.parse("2016-04-13/16:59:24"); - Date endDate = simpleDateFormat.parse("2016-05-13/16:49:24"); + //2016-04-16/19:49:25 to 2016-05-16/19:39:25 + Date startDate = simpleDateFormat.parse("2016-04-16/19:49:25"); + Date endDate = simpleDateFormat.parse("2016-05-16/19:39:25"); scan.setBatch(2001); scan.setTimeRange(startDate.getTime(), endDate.getTime()); diff --git a/skywalking-analysis/src/test/java/com/ai/cloud/skywalking/analysis/mapper/MappingTableCounter.java b/skywalking-analysis/src/test/java/com/ai/cloud/skywalking/analysis/mapper/MappingTableCounter.java new file mode 100644 index 0000000000000000000000000000000000000000..ef3da560c69936bb60078d3a458e73049009e3be --- /dev/null +++ b/skywalking-analysis/src/test/java/com/ai/cloud/skywalking/analysis/mapper/MappingTableCounter.java @@ -0,0 +1,29 @@ +package com.ai.cloud.skywalking.analysis.mapper; + +import com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData; +import com.ai.cloud.skywalking.analysis.mapper.util.HBaseUtils; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +/** + * Created by xin on 16-5-16. + */ +public class MappingTableCounter { + + public static Set getTraceMappingCount() throws IOException { + Connection connection = HBaseUtils.getConnection(); + Table table = connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_TRACE_ID_AND_CID_MAPPING.TABLE_NAME)); + ResultScanner resultScanner = table.getScanner(new Scan()); + Set traceIds = new HashSet(); + for (Result result :resultScanner){ + traceIds.add(Bytes.toString(result.getRow())); + } + + return traceIds; + } +} diff --git a/skywalking-analysis/src/test/java/com/ai/cloud/skywalking/analysis/mapper/util/Convert.java b/skywalking-analysis/src/test/java/com/ai/cloud/skywalking/analysis/mapper/util/Convert.java index 18f80fde030d8aa6cffc5e5bcf630e406a037a43..eeaee5bbeb06078cf4c9c3d77214bcfca4e58662 100644 --- a/skywalking-analysis/src/test/java/com/ai/cloud/skywalking/analysis/mapper/util/Convert.java +++ b/skywalking-analysis/src/test/java/com/ai/cloud/skywalking/analysis/mapper/util/Convert.java @@ -2,12 +2,14 @@ package com.ai.cloud.skywalking.analysis.mapper.util; import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildMapper; import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo; +import com.ai.cloud.skywalking.analysis.mapper.MappingTableCounter; import com.ai.cloud.skywalking.protocol.Span; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.util.Bytes; +import java.io.IOException; import java.util.*; /** @@ -17,9 +19,11 @@ public class Convert { private static final Map traceIds = new HashMap<>(); - public static List convert(ResultScanner resultScanner) { + public static List convert(ResultScanner resultScanner) throws IOException { List chainInfos = new ArrayList(); + int count = 0, failedCount = 0, successCount = 0; for (Result result : resultScanner) { + count++; try { List spanList = new ArrayList(); for (Cell cell : result.rawCells()) { @@ -27,19 +31,33 @@ public class Convert { } if (spanList.size() == 0 || spanList.size() > 2000) { - continue; + throw new RuntimeException("Failed to convert it"); } ChainInfo chainInfo = ChainBuildMapper.spanToChainInfo(Bytes.toString(result.getRow()), spanList); chainInfos.add(chainInfo); traceIds.put(spanList.get(0).getTraceId(), chainInfo.getCID()); + successCount++; } catch (Exception e) { + failedCount++; continue; } } - System.out.println(traceIds.size()); + System.out.println("count : " + count); +// System.out.println("Success count " + traceIds.size()); + System.out.println("Failed count " + failedCount); + System.out.println("Success count " + successCount); + System.out.println("HBase :" + traceIds.size()); + + + Set traceMapping = MappingTableCounter.getTraceMappingCount(); + for (String traceId : traceMapping){ + traceIds.remove(traceId); + } + + System.out.println(traceIds); return chainInfos; } diff --git a/skywalking-analysis/src/test/java/com/ai/cloud/skywalking/analysis/mapper/util/HBaseUtils.java b/skywalking-analysis/src/test/java/com/ai/cloud/skywalking/analysis/mapper/util/HBaseUtils.java index fef08ebe8240a43ce88fd4bc31c4082c7a5a6404..b2b396fb0b02778b9f36914b5086d85d96ff490f 100644 --- a/skywalking-analysis/src/test/java/com/ai/cloud/skywalking/analysis/mapper/util/HBaseUtils.java +++ b/skywalking-analysis/src/test/java/com/ai/cloud/skywalking/analysis/mapper/util/HBaseUtils.java @@ -19,7 +19,7 @@ import java.util.List; */ public class HBaseUtils { - private static String ZK_QUORUM = "host-10-1-241-18,host-10-1-241-19,host-10-1-241-20"; + private static String ZK_QUORUM = "host-10-1-235-197,host-10-1-235-198,host-10-1-235-199"; private static String ZK_CLIENT_PORT = "29181"; private static Configuration configuration = null;