Commit 5efa059a authored by: ascrutae

1. Fix compilation issues

Parent 54c8dbf7
package com.ai.cloud.skywalking.analysis.chainbuild;
import com.ai.cloud.skywalking.analysis.chainbuild.exception.Tid2CidECovertException;
import com.ai.cloud.skywalking.analysis.chainbuild.filter.SpanNodeProcessChain;
import com.ai.cloud.skywalking.analysis.chainbuild.filter.SpanNodeProcessFilter;
......@@ -29,21 +11,35 @@ import com.ai.cloud.skywalking.analysis.chainbuild.util.SubLevelSpanCostCounter;
import com.ai.cloud.skywalking.analysis.chainbuild.util.TokenGenerator;
import com.ai.cloud.skywalking.analysis.chainbuild.util.VersionIdentifier;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.ai.cloud.skywalking.protocol.AckSpan;
import com.ai.cloud.skywalking.protocol.FullSpan;
import com.ai.cloud.skywalking.protocol.RequestSpan;
import com.ai.cloud.skywalking.protocol.util.SpanLevelIdComparators;
import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
public class ChainBuildMapper extends TableMapper<Text, Text> {
private Logger logger = LogManager.getLogger(ChainBuildMapper.class);
private SimpleDateFormat hourSimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH");
private SimpleDateFormat daySimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
private SimpleDateFormat monthSimpleDateFormat = new SimpleDateFormat("yyyy-MM");
private SimpleDateFormat yearSimpleDateFormat = new SimpleDateFormat("yyyy");
@Override
protected void setup(Context context) throws IOException, InterruptedException {
ConfigInitializer.initialize();
}
......@@ -54,87 +50,79 @@ public class ChainBuildMapper extends TableMapper<Text, Text> {
return;
}
ChainInfo chainInfo = null;
try {
List<FullSpan> spanList = HBaseUtil.fetchTraceSpansFromHBase(value);
if (spanList.size() == 0) {
throw new Tid2CidECovertException("tid["
+ Bytes.toString(key.get()) + "] has no span data.");
throw new Tid2CidECovertException("tid[" + Bytes.toString(key.get()) + "] has no span data.");
}
if (spanList.size() > 2000) {
throw new Tid2CidECovertException("tid["
+ Bytes.toString(key.get()) + "] node size has over 2000.");
throw new Tid2CidECovertException("tid[" + Bytes.toString(key.get()) + "] node size has over 2000.");
}
chainInfo = spanToChainInfo(Bytes.toString(key.get()), spanList);
logger.debug("convert tid[" + Bytes.toString(key.get())
+ "] to chain with cid[" + chainInfo.getCID() + "].");
logger.debug(
"convert tid[" + Bytes.toString(key.get()) + "] to chain with cid[" + chainInfo.getCID() + "].");
HBaseUtil.saveTraceIdAndTreeIdMapping(Bytes.toString(key.get()), chainInfo.getCID());
if (chainInfo.getCallEntrance() != null && chainInfo.getCallEntrance().length() > 0) {
for (ChainNode chainNode : chainInfo.getNodes()) {
/**
 * Reasons for this change:
 * 1. Split the reduce tasks at a finer granularity to improve performance.
 * 2. Reduce the amount of data transferred and the processing complexity.
 * 3. Avoid Gson serialization here to improve processing performance.
 *
 * hour/day/month/year,
 * change the key to: type + time string + callEntrance + levelId + viewpoint,
 * and the value to a simple string composed of the exact values needed by ChainNodeSpecificTimeWindowSummaryValue.
 * The value contains:
 * 1. whether the call succeeded, taken from NodeStatus, one of N/A/I
 * 2. the call cost, taken from cost
 *
 */
context.write(new Text(SummaryType.HOUR.getValue() + "-" + hourSimpleDateFormat
.format(new Date(chainNode.getStartDate())) + ":" + chainInfo.getCallEntrance()),
new Text(new Gson().toJson(chainNode)));
context.write(new Text(SummaryType.DAY.getValue() + "-" + daySimpleDateFormat
.format(new Date(chainNode.getStartDate())) + ":" + chainInfo.getCallEntrance()),
new Text(new Gson().toJson(chainNode)));
context.write(new Text(SummaryType.MONTH.getValue() + "-" + monthSimpleDateFormat
.format(new Date(chainNode.getStartDate())) + ":" + chainInfo.getCallEntrance()),
new Text(new Gson().toJson(chainNode)));
context.write(new Text(SummaryType.YEAR.getValue() + "-" + yearSimpleDateFormat
.format(new Date(chainNode.getStartDate())) + ":" + chainInfo.getCallEntrance()),
new Text(new Gson().toJson(chainNode)));
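// A minimal sketch (an assumption, not part of this commit) of the composite key and plain value
// that the comment above proposes; the ChainNode getters used below (getTraceLevelId, getViewPoint,
// getStatus, getCost) are hypothetical names for illustration only:
// String proposedKey = SummaryType.HOUR.getValue() + "-"
//         + hourSimpleDateFormat.format(new Date(chainNode.getStartDate()))
//         + ":" + chainInfo.getCallEntrance()
//         + ":" + chainNode.getTraceLevelId() + ":" + chainNode.getViewPoint();
// String proposedValue = chainNode.getStatus() + "," + chainNode.getCost();   // e.g. "N,35"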
}
/**
 * TODO: cache the call chains locally so that each call chain seen by a map task is emitted
 * only once per JVM, greatly reducing the amount of data sent to the reduce tasks.
 *
 * 1. Use a static variable to cache the call chains seen in this MAP; each typical call-chain ID
 *    is emitted only once.
 * 2. Note that the cache must be bounded; the initial plan is to cache 10,000 typical call-chain
 *    KEYs (extensible via configuration). Cache only the typical call-chain IDs, not the chain details.
 * 3. For chains with more than 2,000 nodes, skip relationship processing for now.
 *
 * Note: CallChainRelationshipAction is left unchanged for now; this change will massively reduce
 * the amount of data the reduce tasks process and improve overall throughput.
 */
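// A minimal sketch (an assumption, not part of this commit) of the bounded cache the TODO above
// describes: a static map capped at 10,000 typical call-chain IDs, so each CID is emitted once per JVM.
// private static final int CHAIN_CACHE_SIZE = 10000;
// private static final Map<String, Boolean> SENT_CHAIN_IDS = Collections.synchronizedMap(
//         new LinkedHashMap<String, Boolean>(CHAIN_CACHE_SIZE, 0.75f, true) {
//             protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
//                 return size() > CHAIN_CACHE_SIZE;
//             }
//         });
// if (SENT_CHAIN_IDS.put(chainInfo.getCID(), Boolean.TRUE) != null) {
//     return; // this typical chain was already forwarded from this JVM
// }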
// Reduce key : R-CallEntrance
context.write(new Text(
SummaryType.RELATIONSHIP + "-" + TokenGenerator.generateTreeToken(chainInfo.getCallEntrance())
+ ":" + chainInfo.getCallEntrance()), new Text(new Gson().toJson(chainInfo)));
}
} catch (Exception e) {
logger.error("Failed to mapper call chain[" + key.toString() + "]",
e);
logger.error("Failed to mapper call chain[" + key.toString() + "]", e);
}
}
public static ChainInfo spanToChainInfo(String tid, List<FullSpan> spanList) {
SubLevelSpanCostCounter costMap = new SubLevelSpanCostCounter();
ChainInfo chainInfo = new ChainInfo(tid);
Collections.sort(spanList, new SpanLevelIdComparators.SpanASCComparator());
......@@ -142,8 +130,8 @@ public class ChainBuildMapper extends TableMapper<Text, Text> {
Map<String, SpanEntry> spanEntryMap = mergeSpanDataSet(spanList);
for (Map.Entry<String, SpanEntry> entry : spanEntryMap.entrySet()) {
ChainNode chainNode = new ChainNode();
SpanNodeProcessFilter filter =
SpanNodeProcessChain.getProcessChainByCallType(entry.getValue().getSpanType());
filter.doFilter(entry.getValue(), chainNode, costMap);
chainInfo.addNodes(chainNode);
}
......@@ -151,17 +139,14 @@ public class ChainBuildMapper extends TableMapper<Text, Text> {
return chainInfo;
}
private static Map<String, SpanEntry> mergeSpanDataSet(List<FullSpan> spanList) {
Map<String, SpanEntry> spanEntryMap = new LinkedHashMap<String, SpanEntry>();
for (int i = spanList.size() - 1; i >= 0; i--) {
FullSpan span = spanList.get(i);
SpanEntry spanEntry = spanEntryMap.get(span.getParentLevel() + "." + span.getLevelId());
if (spanEntry == null) {
spanEntry = new SpanEntry();
spanEntryMap.put(span.getParentLevel() + "." + span.getLevelId(), spanEntry);
}
spanEntry.setSpan(span);
}
......
package com.ai.cloud.skywalking.analysis.chainbuild;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainNode;
import com.ai.cloud.skywalking.protocol.FullSpan;
import com.ai.cloud.skywalking.protocol.common.CallType;
public class SpanEntry {
private FullSpan clientSpan;
private FullSpan serverSpan;
public SpanEntry() {
......@@ -142,15 +143,15 @@ public class SpanEntry {
return serverSpan.getApplicationId();
}
public FullSpan getClientSpan() {
return clientSpan;
}
public FullSpan getServerSpan() {
return serverSpan;
}
public void setSpan(FullSpan span) {
this.clientSpan = span;
}
......
......@@ -6,8 +6,12 @@ import com.ai.cloud.skywalking.analysis.chainbuild.entity.ChainNodeSpecificMinSu
import com.ai.cloud.skywalking.analysis.chainbuild.entity.ChainNodeSpecificMonthSummary;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData;
import com.ai.cloud.skywalking.protocol.AckSpan;
import com.ai.cloud.skywalking.protocol.FullSpan;
import com.ai.cloud.skywalking.protocol.RequestSpan;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
......@@ -17,7 +21,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class HBaseUtil {
private static Logger logger = LoggerFactory.getLogger(HBaseUtil.class.getName());
......@@ -242,4 +248,26 @@ public class HBaseUtil {
Table table = connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_TRACE_ID_AND_CID_MAPPING.TABLE_NAME));
table.put(put);
}
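/**
 * Rebuilds the FullSpan list for one trace row: every RequestSpan cell becomes a FullSpan,
 * while "<traceLevelId>-ACK" cells are collected separately and then merged onto the FullSpan
 * with the matching trace level id (supplying cost, status code and exception stack).
 */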
public static List<FullSpan> fetchTraceSpansFromHBase(Result value) throws InvalidProtocolBufferException {
List<FullSpan> spanList = new ArrayList<FullSpan>();
Map<String, AckSpan> ackSpans = new HashMap<String, AckSpan>();
for (Cell cell : value.rawCells()) {
String traceLevelId =
Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
String[] traceLevelIdArray = traceLevelId.split("-");
if (traceLevelIdArray.length == 2 && "ACK".equals(traceLevelIdArray[1])) {
ackSpans.put(traceLevelIdArray[0], new AckSpan(cell.getValueArray()));
} else {
spanList.add(new FullSpan(new RequestSpan(cell.getValueArray())));
}
}
for (FullSpan fullSpan : spanList) {
fullSpan.addAckSpan(ackSpans.get(fullSpan.getTraceLevelId()));
}
return spanList;
}
}
......@@ -2,9 +2,9 @@ package com.ai.cloud.skywalking.analysis.mapper.util;
import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildMapper;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.chainbuild.util.HBaseUtil;
import com.ai.cloud.skywalking.analysis.mapper.MappingTableCounter;
import com.ai.cloud.skywalking.protocol.FullSpan;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
......@@ -25,10 +25,7 @@ public class Convert {
for (Result result : resultScanner) {
count++;
try {
List<FullSpan> spanList = HBaseUtil.fetchTraceSpansFromHBase(result);
if (spanList.size() == 0 || spanList.size() > 2000) {
throw new RuntimeException("Failed to convert it");
......@@ -46,14 +43,14 @@ public class Convert {
}
System.out.println("count : " + count);
// System.out.println("Success count " + traceIds.size());
// System.out.println("Success count " + traceIds.size());
System.out.println("Failed count " + failedCount);
System.out.println("Success count " + successCount);
System.out.println("HBase :" + traceIds.size());
Set<String> traceMapping = MappingTableCounter.getTraceMappingCount();
for (String traceId : traceMapping) {
traceIds.remove(traceId);
}
......
......@@ -45,16 +45,4 @@ public class HBaseUtils {
public static Connection getConnection() {
return connection;
}
public static List<Span> selectByTraceId(String traceId) throws IOException {
List<Span> entries = new ArrayList<Span>();
Table table = connection.getTable(TableName.valueOf(HBaseTableMetaData.TABLE_CALL_CHAIN.TABLE_NAME));
Get g = new Get(Bytes.toBytes(traceId));
Result r = table.get(g);
for (Cell cell : r.rawCells()) {
if (cell.getValueArray().length > 0)
entries.add(new Span(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())));
}
return entries;
}
}
......@@ -17,6 +17,7 @@ public final class ContextGenerator {
public static Span generateSpanFromThreadLocal(Identification id) {
Span spanData = getSpanFromThreadLocal();
spanData.setStartDate(System.currentTimeMillis());
spanData.setViewPointId(id.getViewPoint());
spanData.appendParameters(id.getParameters());
return spanData;
}
......@@ -40,7 +41,7 @@ public final class ContextGenerator {
}
spanData.setStartDate(System.currentTimeMillis());
spanData.setViewPointId(id.getViewPoint());
return spanData;
}
......
......@@ -13,7 +13,7 @@ public class SerializeTest {
spandata.setStartDate(System.currentTimeMillis() - 1000 * 60);
AckSpan requestSpan = new AckSpan(spandata);
ContextBuffer.save(requestSpan);
Thread.sleep(500);
}
}
......
......@@ -48,6 +48,13 @@ public class AckSpan extends AbstractDataSerializable {
*/
private Map<String, String> paramters = new HashMap<String, String>();
private String viewPointId;
private String userId;
private String applicationId;
public AckSpan(Span spanData) {
this.traceId = spanData.getTraceId();
this.parentLevel = spanData.getParentLevel();
......@@ -55,12 +62,25 @@ public class AckSpan extends AbstractDataSerializable {
this.cost = System.currentTimeMillis() - spanData.getStartDate();
this.statusCode = spanData.getStatusCode();
this.exceptionStack = spanData.getExceptionStack();
this.userId = spanData.getUserId();
this.applicationId = spanData.getApplicationId();
}
public AckSpan() {
}
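/**
 * Deserializes an AckSpan from its protobuf representation (TraceProtocol.AckSpan),
 * restoring the trace id, level ids, cost, exception stack and status code.
 */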
public AckSpan(byte[] originData) throws InvalidProtocolBufferException {
TraceProtocol.AckSpan ackSpanProtocol = TraceProtocol.AckSpan.parseFrom(originData);
this.setTraceId(ackSpanProtocol.getTraceId());
this.setParentLevel(ackSpanProtocol.getParentLevel());
this.setLevelId(ackSpanProtocol.getLevelId());
this.setCost(ackSpanProtocol.getCost());
this.setExceptionStack(ackSpanProtocol.getExceptionStack());
this.setStatusCode((byte) ackSpanProtocol.getStatusCode());
}
public String getTraceId() {
return traceId;
}
......@@ -151,4 +171,16 @@ public class AckSpan extends AbstractDataSerializable {
public boolean isNull() {
return false;
}
public String getUserId() {
return userId;
}
public String getApplicationId() {
return applicationId;
}
public String getViewPointId() {
return viewPointId;
}
}
package com.ai.cloud.skywalking.protocol;
import com.ai.cloud.skywalking.protocol.common.AbstractDataSerializable;
import com.ai.cloud.skywalking.protocol.exception.ConvertFailedException;
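/**
 * Marker record appended to a buffer file to signal its end: it carries no payload and
 * reports data type -1, which readers use to stop consuming the file.
 */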
public class BufferFileEOFProtocol extends AbstractDataSerializable {
@Override
public int getDataType() {
return -1;
}
@Override
public byte[] getData() {
return new byte[0];
}
@Override
public AbstractDataSerializable convertData(byte[] data) throws ConvertFailedException {
return new BufferFileEOFProtocol();
}
@Override
public boolean isNull() {
return false;
}
}
package com.ai.cloud.skywalking.protocol;
public class FullSpan {
private String traceId;
private String parentLevel = "";
private int levelId = 0;
private String viewPointId;
private String applicationId;
private String callType;
private long cost;
private String businessKey;
private String exceptionStack;
private byte statusCode = 0;
private String spanTypeDesc;
private String userId;
private long startDate;
private String spanType;
public FullSpan() {
}
public FullSpan(RequestSpan span) {
this.traceId = span.getTraceId();
this.parentLevel = span.getParentLevel();
this.levelId = span.getLevelId();
this.applicationId = span.getApplicationId();
this.callType = span.getCallType();
this.businessKey = span.getBusinessKey();
this.spanTypeDesc = span.getSpanTypeDesc();
this.userId = span.getUserId();
this.startDate = span.getStartDate();
}
public String getTraceId() {
return traceId;
}
public String getParentLevel() {
return parentLevel;
}
public int getLevelId() {
return levelId;
}
public String getViewPointId() {
return viewPointId;
}
public String getApplicationId() {
return applicationId;
}
public String getCallType() {
return callType;
}
public long getCost() {
return cost;
}
public String getBusinessKey() {
return businessKey;
}
public void setBusinessKey(String businessKey) {
this.businessKey = businessKey;
}
public String getExceptionStack() {
return exceptionStack;
}
public byte getStatusCode() {
return statusCode;
}
public void setStatusCode(byte statusCode) {
this.statusCode = statusCode;
}
public String getSpanTypeDesc() {
return spanTypeDesc;
}
public void setSpanTypeDesc(String spanTypeDesc) {
this.spanTypeDesc = spanTypeDesc;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public long getStartDate() {
return startDate;
}
public String getSpanType() {
return spanType;
}
public void setSpanType(String spanType) {
this.spanType = spanType;
}
public String getTraceLevelId() {
if (getParentLevel() != null && getParentLevel().length() > 0) {
return getParentLevel() + "." + getLevelId();
}
return getLevelId() + "";
}
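/**
 * Merges the server-side acknowledgement into this span: the ack supplies the measured
 * cost, the exception stack and the status code recorded when the call completed.
 */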
public void addAckSpan(AckSpan ackSpan) {
if (ackSpan != null) {
this.cost = ackSpan.getCost();
this.exceptionStack = ackSpan.getExceptionStack();
this.statusCode = ackSpan.getStatusCode();
}
}
public void setParentLevel(String parentLevel) {
this.parentLevel = parentLevel;
}
public void setViewPointId(String viewPointId) {
this.viewPointId = viewPointId;
}
}
......@@ -98,6 +98,22 @@ public class RequestSpan extends AbstractDataSerializable {
}
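/**
 * Deserializes a RequestSpan from its protobuf representation (TraceProtocol.RequestSpan).
 */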
public RequestSpan(byte[] originData) throws InvalidProtocolBufferException {
TraceProtocol.RequestSpan requestSpanByte = TraceProtocol.RequestSpan.parseFrom(originData);
this.setTraceId(requestSpanByte.getTraceId());
this.setParentLevel(requestSpanByte.getParentLevel());
this.setLevelId(requestSpanByte.getLevelId());
this.setApplicationId(requestSpanByte.getApplicationId());
this.setCallType(requestSpanByte.getCallType());
this.setSpanType(SpanType.convert(requestSpanByte.getSpanType()));
this.setSpanTypeDesc(requestSpanByte.getSpanTypeDesc());
this.setStartDate(requestSpanByte.getStartDate());
this.setUserId(requestSpanByte.getUserId());
this.setViewPointId(requestSpanByte.getViewPointId());
this.setBusinessKey(requestSpanByte.getBussinessKey());
this.setAgentId(requestSpanByte.getAgentId());
}
private boolean isEntrySpan() {
return "0".equals(this.getParentLevel() + this.getLevelId());
}
......
......@@ -66,8 +66,15 @@ public class Span {
* Business field<br/>
*/
private String businessKey = "";
/**
* Application id
*/
private String applicationId;
/**
* Owning user
*/
private String userId;
private String viewPointId;
public Span(String traceId, String applicationId, String userId) {
this.traceId = traceId;
......@@ -215,4 +222,7 @@ public class Span {
}
public void setViewPointId(String viewPointId) {
this.viewPointId = viewPointId;
}
}
package com.ai.cloud.skywalking.protocol;
import com.ai.cloud.skywalking.protocol.common.AbstractDataSerializable;
import com.ai.cloud.skywalking.protocol.common.ISerializable;
import com.ai.cloud.skywalking.protocol.exception.ConvertFailedException;
import com.ai.cloud.skywalking.protocol.util.IntegerAssist;
import java.util.ArrayList;
......@@ -8,35 +10,38 @@ import java.util.Arrays;
import java.util.List;
public class TransportPackager {
public static byte[] pack(List<ISerializable> beSendingData) {
// Adjusted protocol format:
// | check sum(4 byte) | data
byte[] dataText = packSerializableObjects(beSendingData);
byte[] dataPackage = packCheckSum(dataText);
return dataPackage;
}
public static List<AbstractDataSerializable> unpackSerializableObjects(byte[] dataPackage) {
List<AbstractDataSerializable> serializeData = new ArrayList<AbstractDataSerializable>();
int currentLength = 0;
while (true) {
// read the length
int dataLength = IntegerAssist.bytesToInt(dataPackage, currentLength);
// deserialize
byte[] data = new byte[dataLength];
System.arraycopy(dataPackage, currentLength + 4, data, 0, dataLength);
try {
AbstractDataSerializable abstractDataSerializable = SerializedFactory.unSerialize(data);
serializeData.add(abstractDataSerializable);
} catch (ConvertFailedException e) {
e.printStackTrace();
}
currentLength += 4 + dataLength;
if (currentLength >= dataPackage.length) {
break;
}
}
return serializeData;
}
......@@ -63,27 +68,33 @@ public class TransportPackager {
return dataPackage;
}
private static byte[] packSerializableObjects(List<ISerializable> beSendingData) {
byte[] dataText = null;
int currentIndex = 0;
for (ISerializable sendingData : beSendingData) {
byte[] dataElementText = packSerializableObject(sendingData);
dataText = appendingDataBytes(dataText, currentIndex, dataElementText);
currentIndex += dataElementText.length;
}
return dataText;
}
private static byte[] appendingDataBytes(byte[] dataText, int currentIndex, byte[] dataElementText) {
if (dataText == null) {
dataText = new byte[dataElementText.length];
} else {
dataText = Arrays.copyOf(dataText, dataText.length + dataElementText.length);
}
System.arraycopy(dataElementText, 0, dataText, currentIndex, dataElementText.length);
return dataText;
}
public static byte[] packSerializableObject(ISerializable serializable) {
byte[] serializableBytes = serializable.convert2Bytes();
byte[] dataText = Arrays.copyOf(serializableBytes, serializableBytes.length + 4);
byte[] length = IntegerAssist.intToBytes(serializableBytes.length);
System.arraycopy(length, 0, dataText, 0, 4);
return dataText;
}
......
package com.ai.cloud.skywalking.protocol.util;
import com.ai.cloud.skywalking.protocol.FullSpan;
import com.ai.cloud.skywalking.protocol.Span;
import java.util.Comparator;
public class SpanLevelIdComparators {
public static class SpanDESCComparator implements Comparator<Span> {
public static class SpanASCComparator implements Comparator<FullSpan> {
@Override
public int compare(Span span1, Span span2) {
String span1TraceLevel = getTraceLevelId(span1);
String span2TraceLevel = getTraceLevelId(span2);
return descComparator(span1TraceLevel, span2TraceLevel);
}
}
public static class SpanASCComparator implements Comparator<Span> {
@Override
public int compare(Span span1, Span span2) {
public int compare(FullSpan span1, FullSpan span2) {
String span1TraceLevel = getTraceLevelId(span1);
String span2TraceLevel = getTraceLevelId(span2);
return ascCompare(span1TraceLevel, span2TraceLevel);
}
}
private static String getTraceLevelId(FullSpan span) {
String spanTraceLevelId = null;
if (span.getParentLevel() == null || span.getParentLevel().length() == 0) {
spanTraceLevelId = span.getLevelId() + "";
......
com.ai.cloud.skywalking.protocol.AckSpan
com.ai.cloud.skywalking.protocol.RequestSpan
com.ai.cloud.skywalking.protocol.BufferFileEOFProtocol
package com.ai.cloud.skywalking.reciever.buffer;
import com.ai.cloud.skywalking.protocol.BufferFileEOFProtocol;
import com.ai.cloud.skywalking.protocol.TransportPackager;
import com.ai.cloud.skywalking.reciever.model.BufferDataPackagerGenerator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -27,7 +29,8 @@ class AppendEOFFlagThread extends Thread {
try {
logger.info("Add EOF flags to unprocessed data file[{}]", file.getName());
fileOutputStream = new FileOutputStream(new File(file.getParent(), file.getName()), true);
fileOutputStream.write(BufferDataPackagerGenerator
.pack(TransportPackager.packSerializableObject(new BufferFileEOFProtocol())));
} catch (IOException e) {
logger.info("Add EOF flags to the unprocessed data file failed.", e);
} finally {
......
package com.ai.cloud.skywalking.reciever.buffer;
import com.ai.cloud.skywalking.protocol.BufferFileEOFProtocol;
import com.ai.cloud.skywalking.protocol.TransportPackager;
import com.ai.cloud.skywalking.protocol.util.AtomicRangeInteger;
import com.ai.cloud.skywalking.reciever.conf.Config;
import com.ai.cloud.skywalking.reciever.model.BufferDataPackagerGenerator;
......@@ -49,7 +51,6 @@ public class DataBufferThread extends Thread {
logger.error("Failed to write msg.", e);
}
if (length > Config.Buffer.BUFFER_FILE_MAX_LENGTH) {
closeCurrentBufferFile(fileOutputStream);
fileOutputStream = null;
......@@ -68,7 +69,8 @@ public class DataBufferThread extends Thread {
private void closeCurrentBufferFile(FileOutputStream fileOutputStream) {
try {
fileOutputStream.flush();
fileOutputStream.write(BufferDataPackagerGenerator
.pack(TransportPackager.packSerializableObject(new BufferFileEOFProtocol())));
} catch (IOException e) {
logger.error("Failed to write msg.", e);
} finally {
......
......@@ -58,6 +58,30 @@ public class Config {
}
public static class Alarm {
public static int ALARM_EXPIRE_SECONDS = 1000 * 60 * 90;
public static int ALARM_EXCEPTION_STACK_LENGTH = 300;
public static String REDIS_SERVER = "127.0.0.1:6379";
public static int REDIS_MAX_IDLE = 10;
public static int REDIS_MIN_IDLE = 1;
public static int REDIS_MAX_TOTAL = 20;
public static boolean ALARM_OFF_FLAG = false;
public static long ALARM_REDIS_INSPECTOR_INTERVAL = 5 * 1000L;
public static class Checker {
public static boolean TURN_ON_EXCEPTION_CHECKER = true;
public static boolean TURN_ON_EXECUTE_TIME_CHECKER = true;
}
}
public static class HBaseConfig {
......@@ -70,12 +94,6 @@ public class Config {
public static String CLIENT_PORT;
}
public static class StorageChain {
public static String STORAGE_TYPE = "hbase";
}
public static class Redis {
public static String REDIS_SERVER = "10.1.241.18:16379";
......@@ -87,26 +105,6 @@ public class Config {
public static int REDIS_MAX_TOTAL = 20;
}
public static class HealthCollector {
// default health-check report interval
public static long REPORT_INTERVAL = 5 * 60 * 1000L;
......
package com.ai.cloud.skywalking.reciever.peresistent;
import com.ai.cloud.skywalking.protocol.TransportPackager;
import com.ai.cloud.skywalking.protocol.common.AbstractDataSerializable;
import com.ai.cloud.skywalking.protocol.util.IntegerAssist;
import com.ai.cloud.skywalking.reciever.selfexamination.ServerHealthCollector;
import com.ai.cloud.skywalking.reciever.selfexamination.ServerHeathReading;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -9,6 +12,7 @@ import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
......@@ -16,140 +20,93 @@ public class BufferFileReader {
private File bufferFile;
private FileInputStream bufferInputStream;
private int currentOffset;
private boolean hasNextBufferBale = false;
private static final byte[] SPILT_BALE_ARRAY = new byte[] {127, 127, 127, 127};
private static final byte[] EOF_BALE_ARRAY = "EOF".getBytes();
private int remainderLength = 0;
private byte[] remainderByte = null;
private Logger logger = LogManager.getLogger(BufferFileReader.class);
private PluckerStatus status = PluckerStatus.INITIAL;
private List<AbstractDataSerializable> serializables;
public BufferFileReader(File bufferFile, int currentOffset) {
this.bufferFile = bufferFile;
this.currentOffset = currentOffset;
if (bufferFile.length() >= currentOffset) {
hasNextBufferBale = true;
}
try {
this.bufferInputStream = new FileInputStream(bufferFile);
bufferInputStream.skip(currentOffset);
} catch (IOException e) {
hasNextBufferBale = false;
}
}
/**
 * TODO: make sure the next List<ISerializable> is fully read and cached.
 *
 * <p>
 * Read in the following format:
 * 1. a 4-byte length header
 * 2. the body, read according to that length
 * 2.1. the body contains multiple ISerializable blocks; each block holds the length of one ISerializable followed by its bytes
 * 3. after the body, read 4 more bytes, which are the separator (non-printable characters)
 *
 * <p>
 * Helper method requirements:
 * 1. wrap reading a block of a given length in a method that returns once the read completes; if fewer bytes are available, buffer and wait.
 *
 * <p>
 * Error handling:
 * 1. if the 4 bytes after the body are not the separator, start skip-ahead recovery until the separator is found.
 *
 * @return
 */
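// A rough sketch (for illustration only, following the comment above) of one on-disk frame:
//   [4-byte body length]
//   [ body: repeated blocks of [4-byte element length][ISerializable bytes] ]
//   [4-byte separator: 127, 127, 127, 127]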
public boolean hasNext() {
if (status == PluckerStatus.SUSPEND) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
logger.error("Failed to sleep ", e);
try {
int length = unpackLength();
byte[] dataContext = readByte(length);
// convert to objects
serializables = new ArrayList<>();
serializables = TransportPackager.unpackSerializableObjects(dataContext);
byte[] skip = new byte[4];
bufferInputStream.read(skip);
if (!Arrays.equals(SPILT_BALE_ARRAY, skip)) {
skipToNextBufferBale();
}
MemoryRegister.instance().updateOffSet(bufferFile.getName(), currentOffset);
} catch (IOException e) {
logger.error("The data file I/O exception.", e);
ServerHealthCollector.getCurrentHeathReading(null).updateData(ServerHeathReading.ERROR, e.getMessage());
return false;
}
if (status == PluckerStatus.INITIAL) {
status = PluckerStatus.RUNNING;
return hasNextBufferBale;
}
return serializables.get(0).getDataType() != -1;
}
if (status == PluckerStatus.RUNNING) {
byte[] spiltArray = new byte[4];
private byte[] readByte(int length) throws IOException {
byte[] dataContext = new byte[length];
int realLength = bufferInputStream.read(dataContext);
currentOffset += realLength;
int remainderLength = length - (realLength == -1 ? 0 : realLength);
while (remainderLength > 0) {
try {
bufferInputStream.read(spiltArray);
if (!Arrays.equals(spiltArray, SPILT_BALE_ARRAY)) {
skipToNextBufferBale();
}
} catch (IOException e) {
return false;
Thread.sleep(500L);
} catch (InterruptedException e) {
logger.error("Failed to sleep.", e);
}
currentOffset += 4;
}
MemoryRegister.instance().updateOffSet(bufferFile.getName(), currentOffset);
return hasNextBufferBale;
}
public List<AbstractDataSerializable> next() throws IOException {
int packageLength = unpackBaleLength();
byte[] dataPackage = unpackDataContext(packageLength);
if (dataPackage == null || dataPackage.length == 0) {
// the file has not ended yet, but everything in it has already been processed
return null;
}
if (checkDataPackageIsEOF(dataPackage)) {
hasNextBufferBale = false;
return null;
}
if (logger.isDebugEnabled()) {
logger.debug("Pluck bale size : " + dataPackage.length);
}
return dataPackage;
}
private boolean checkDataPackageIsEOF(byte[] dataPackage) {
if (dataPackage.length == EOF_BALE_ARRAY.length) {
return Arrays.equals(dataPackage, EOF_BALE_ARRAY);
}
return false;
}
byte[] remainderByte = new byte[remainderLength];
int readLength = bufferInputStream.read(remainderByte, 0, remainderLength);
if (readLength == -1) {
continue;
}
private byte[] unpackDataContext(int length) throws IOException {
if (currentOffset >= bufferFile.length()) {
status = PluckerStatus.SUSPEND;
return null;
currentOffset += realLength;
System.arraycopy(remainderByte, 0, dataContext, length - remainderLength, readLength);
remainderLength -= readLength;
}
byte[] dataContext = new byte[length];
bufferInputStream.read(dataContext);
currentOffset += length;
return dataContext;
}
private int unpackBaleLength() {
int length;
while (true) {
try {
length = calculateCurrentPackageLength();
if (length > 0 && length < 90000) {
break;
}
skipToNextBufferBale();
} catch (IOException e) {
skipToNextBufferBale();
}
}
return length;
public List<AbstractDataSerializable> next() {
return serializables;
}
private int calculateCurrentPackageLength() throws IOException {
byte[] lengthByte = new byte[4 - remainderLength];
bufferInputStream.read(lengthByte);
private int unpackLength() throws IOException {
byte[] lengthByte = readByte(4 - remainderLength);
currentOffset += 4 - remainderLength;
lengthByte = spliceRemainderByteOfPreviousSkipIfNecessary(lengthByte);
return IntegerAssist.bytesToInt(lengthByte, 0);
......@@ -165,28 +122,18 @@ public class BufferFileReader {
return lengthByte;
}
public void skipToNextBufferBale() {
public void skipToNextBufferBale() throws IOException {
byte[] previousDataByte = new byte[4];
byte[] currentDataByte = new byte[4];
byte[] compactDataByte = new byte[8];
while (true) {
try {
currentOffset += bufferInputStream.read(currentDataByte);
} catch (IOException e) {
hasNextBufferBale = false;
}
currentDataByte = readByte(4);
if (Arrays.equals(currentDataByte, SPILT_BALE_ARRAY)) {
remainderLength = 0;
break;
}
//
if (currentOffset + 8000 >= bufferFile.length()) {
status = PluckerStatus.SUSPEND;
break;
}
System.arraycopy(previousDataByte, 0, compactDataByte, 0, 4);
System.arraycopy(currentDataByte, 0, compactDataByte, 4, 4);
......@@ -238,10 +185,4 @@ public class BufferFileReader {
bufferFile = null;
}
enum PluckerStatus {
INITIAL,
RUNNING,
SUSPEND
}
}
package com.ai.cloud.skywalking.reciever.peresistent;
import com.ai.cloud.skywalking.protocol.SerializedFactory;
import com.ai.cloud.skywalking.protocol.TransportPackager;
import com.ai.cloud.skywalking.protocol.common.AbstractDataSerializable;
import com.ai.cloud.skywalking.protocol.exception.ConvertFailedException;
import com.ai.cloud.skywalking.reciever.conf.Config;
import com.ai.cloud.skywalking.reciever.processor.IProcessor;
import com.ai.cloud.skywalking.reciever.processor.ProcessorFactory;
......@@ -15,8 +12,6 @@ import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -47,17 +42,9 @@ public class PersistenceThread extends Thread {
BufferFileReader bufferReader = new BufferFileReader(bufferFile, offset);
while (bufferReader.hasNext()) {
try {
List<AbstractDataSerializable> serializableDataList = bufferReader.next();
System.out.println(serializableDataList.size());
//handleSpans(spans);
} catch (ConvertFailedException e) {
bufferReader.skipToNextBufferBale();
} catch (IOException e) {
logger.error("The data file I/O exception.", e);
ServerHealthCollector.getCurrentHeathReading(null)
.updateData(ServerHeathReading.ERROR, e.getMessage());
}
List<AbstractDataSerializable> serializableDataList = bufferReader.next();
System.out.println(serializableDataList.size());
//handleSpans(spans);
}
try {
......
......@@ -3,6 +3,9 @@ package com.ai.cloud.skywalking.reciever.processor;
import com.ai.cloud.skywalking.protocol.AckSpan;
import com.ai.cloud.skywalking.protocol.common.AbstractDataSerializable;
import com.ai.cloud.skywalking.reciever.conf.Config;
import com.ai.cloud.skywalking.reciever.processor.ackspan.alarm.ExceptionChecker;
import com.ai.cloud.skywalking.reciever.processor.ackspan.alarm.ExecuteTimeChecker;
import com.ai.cloud.skywalking.reciever.processor.ackspan.alarm.ISpanChecker;
import com.ai.cloud.skywalking.reciever.util.HBaseUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Connection;
......@@ -12,14 +15,29 @@ import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.List;
import static com.ai.cloud.skywalking.reciever.conf.Config.Alarm.Checker.TURN_ON_EXCEPTION_CHECKER;
import static com.ai.cloud.skywalking.reciever.conf.Config.Alarm.Checker.TURN_ON_EXECUTE_TIME_CHECKER;
import static com.ai.cloud.skywalking.reciever.util.SpanUtil.getTSBySpanTraceId;
@DefaultProcessor
public class AckSpanProcessor extends AbstractSpanProcessor {
private List<ISpanChecker> checkList = new ArrayList<ISpanChecker>();
public AckSpanProcessor() {
if (TURN_ON_EXCEPTION_CHECKER)
checkList.add(new ExceptionChecker());
if (TURN_ON_EXECUTE_TIME_CHECKER)
checkList.add(new ExecuteTimeChecker());
}
@Override
public void doAlarm(List<AbstractDataSerializable> ackSpans) {
for (AbstractDataSerializable ackSpan : ackSpans) {
for (ISpanChecker checker : checkList) {
checker.check((AckSpan) ackSpan);
}
}
}
@Override
......@@ -38,7 +56,8 @@ public class AckSpanProcessor extends AbstractSpanProcessor {
// appending suffix
columnName += "-ACK";
put.addColumn(Bytes.toBytes(Config.HBaseConfig.FAMILY_COLUMN_NAME), Bytes.toBytes(columnName),
ackSpan.getData());
puts.add(put);
}
// save
......
package com.ai.cloud.skywalking.reciever.processor.ackspan.alarm;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import redis.clients.jedis.Jedis;
import static com.ai.cloud.skywalking.reciever.conf.Config.Alarm.ALARM_EXPIRE_SECONDS;
public abstract class AbstractSpanChecker implements ISpanChecker {
private static Logger logger = LogManager.getLogger(AbstractSpanChecker.class);
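/**
 * Writes one alarm entry to Redis: the alarm key maps to a hash of traceId -> message
 * (hsetnx keeps the first message per trace id), and the whole key expires after
 * ALARM_EXPIRE_SECONDS so stale alarms age out.
 */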
protected void saveAlarmMessage(String key, String traceId, String alarmMsg) {
Jedis jedis = null;
try {
jedis = AlarmRedisConnector.getJedis();
jedis.hsetnx(key, traceId, alarmMsg);
jedis.expire(key, ALARM_EXPIRE_SECONDS);
} catch (Exception e) {
AlarmRedisConnector.reportJedisFailure();
logger.error("Failed to set data.", e);
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
package com.ai.cloud.skywalking.reciever.processor.ackspan.alarm;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import com.ai.cloud.skywalking.reciever.conf.Config;
import com.ai.cloud.skywalking.reciever.selfexamination.ServerHealthCollector;
import com.ai.cloud.skywalking.reciever.selfexamination.ServerHeathReading;
/**
 * Redis connector for alarms, responsible for managing the Redis connection.
 *
 * @author wusheng
 */
public class AlarmRedisConnector {
private static JedisPool jedisPool;
static {
new RedisInspector().connect().start();
}
public static Jedis getJedis() {
if (Config.Alarm.ALARM_OFF_FLAG) {
return null;
} else {
return jedisPool.getResource();
}
}
public static void reportJedisFailure() {
RedisInspector.needConnectInit = true;
}
private static class RedisInspector extends Thread {
private static Logger logger = LogManager
.getLogger(RedisInspector.class);
private static boolean needConnectInit = true;
private String[] config;
public RedisInspector() {
super("RedisInspectorThread");
String redisServerConfig = Config.Alarm.REDIS_SERVER;
if (redisServerConfig == null || redisServerConfig.length() <= 0) {
logger.error("Redis server is not setting. Switch off alarm module. ");
Config.Alarm.ALARM_OFF_FLAG = true;
} else {
config = redisServerConfig.split(":");
if (config.length != 2) {
logger.error("Redis server address is illegal setting, need to be 'ip:port'. Switch off alarm module. ");
Config.Alarm.ALARM_OFF_FLAG = true;
}
}
}
private RedisInspector connect() {
if (jedisPool != null && !jedisPool.isClosed()) {
jedisPool.close();
}
GenericObjectPoolConfig genericObjectPoolConfig = buildGenericObjectPoolConfig();
jedisPool = new JedisPool(genericObjectPoolConfig, config[0],
Integer.valueOf(config[1]));
// Test connect redis.
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.get("ok");
needConnectInit = false;
} catch (Exception e) {
logger.error("can't connect to redis["
+ Config.Alarm.REDIS_SERVER + "]", e);
} finally {
if (jedis != null) {
jedis.close();
}
}
return this;
}
@Override
public void run() {
if (Config.Alarm.ALARM_OFF_FLAG)
return;
while (true) {
try {
if (needConnectInit) {
connect();
}
if (needConnectInit) {
ServerHealthCollector.getCurrentHeathReading(null)
.updateData(ServerHeathReading.ERROR,
"alarm redis connect failue.");
} else {
ServerHealthCollector.getCurrentHeathReading(null)
.updateData(ServerHeathReading.INFO,
"alarm redis connectted.");
}
} catch (Throwable t) {
logger.error("redis init connect failue", t);
}
try {
Thread.sleep(Config.Alarm.ALARM_REDIS_INSPECTOR_INTERVAL);
} catch (InterruptedException e) {
logger.error("Failure sleep.", e);
}
}
}
private GenericObjectPoolConfig buildGenericObjectPoolConfig() {
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
genericObjectPoolConfig.setTestOnBorrow(true);
genericObjectPoolConfig.setMaxIdle(Config.Alarm.REDIS_MAX_IDLE);
genericObjectPoolConfig.setMinIdle(Config.Alarm.REDIS_MIN_IDLE);
genericObjectPoolConfig.setMaxTotal(Config.Alarm.REDIS_MAX_TOTAL);
return genericObjectPoolConfig;
}
}
}
package com.ai.cloud.skywalking.reciever.processor.ackspan.alarm;
import com.ai.cloud.skywalking.protocol.AckSpan;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import static com.ai.cloud.skywalking.reciever.conf.Config.Alarm.ALARM_EXCEPTION_STACK_LENGTH;
public class ExceptionChecker extends AbstractSpanChecker {
private static Logger logger = LogManager.getLogger(ExceptionChecker.class);
@Override
public void check(AckSpan span) {
if (span.getStatusCode() != 1)
return;
String exceptionStack = span.getExceptionStack();
if (exceptionStack == null) {
exceptionStack = "";
} else if (exceptionStack.length() > ALARM_EXCEPTION_STACK_LENGTH) {
exceptionStack = exceptionStack.substring(0, ALARM_EXCEPTION_STACK_LENGTH);
}
saveAlarmMessage(generateAlarmKey(span), span.getTraceId(), exceptionStack);
}
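// The alarm key below groups alarms per user, application and one-minute window
// (10000 * 6 ms = 60 seconds), so repeated exceptions within the same minute share one Redis hash.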
private String generateAlarmKey(AckSpan span) {
return span.getUserId() + "-" + span.getApplicationId() + "-" + (System.currentTimeMillis() / (10000 * 6));
}
}
package com.ai.cloud.skywalking.reciever.processor.ackspan.alarm;
import com.ai.cloud.skywalking.protocol.AckSpan;
public class ExecuteTimeChecker extends AbstractSpanChecker {
@Override
public void check(AckSpan span) {
long cost = span.getCost();
if (cost > 500 && cost < 3000) {
/**
 * Issue #43 <br/>
 * Warn when a single instrumented call takes more than 500 ms.
 */
saveAlarmMessage(generateWarningAlarmKey(span), span.getTraceId(),
span.getViewPointId() + " cost " + cost + " ms.");
}
if (cost >= 3000) {
/**
 * Issue #43 <br/>
 * Raise an alarm when a single instrumented call takes more than 3 s.
 */
saveAlarmMessage(generatePossibleErrorAlarmKey(span), span.getTraceId(),
span.getViewPointId() + " cost " + cost + " ms.");
}
}
private String generateWarningAlarmKey(AckSpan span) {
return span.getUserId() + "-" + span.getApplicationId() + "-" + (System.currentTimeMillis() / (10000 * 6))
+ "-ExecuteTime-Warning";
}
private String generatePossibleErrorAlarmKey(AckSpan span) {
return span.getUserId() + "-" + span.getApplicationId() + "-" + (System.currentTimeMillis() / (10000 * 6))
+ "-ExecuteTime-PossibleError";
}
}
package com.ai.cloud.skywalking.reciever.processor.ackspan.alarm;
import com.ai.cloud.skywalking.protocol.AckSpan;
public interface ISpanChecker {
void check(AckSpan span);
}
package test.com.ai.cloud.skywalking.reciever.peresistent;
import com.ai.cloud.skywalking.protocol.util.IntegerAssist;
import com.ai.cloud.skywalking.reciever.peresistent.BufferBalePlucker;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNull;
public class BufferBalePluckerTest {
private File file;
@Before
public void initData() throws IOException {
file = new File("/tmp", "test.file");
if (!file.exists()) {
file.createNewFile();
}
FileOutputStream fileOutputStream = new FileOutputStream(file);
byte[] bytes = "HelloWorld".getBytes();
byte[] packageByte = Arrays.copyOf(IntegerAssist.intToBytes(bytes.length), bytes.length + 4 + 4);
System.arraycopy(bytes, 0, packageByte, 4, bytes.length);
System.arraycopy(new byte[] {127, 127, 127, 127}, 0, packageByte, bytes.length + 4, 4);
fileOutputStream.write(packageByte);
fileOutputStream.write(generatePackage("EOF".getBytes()));
fileOutputStream.close();
}
private static byte[] generatePackage(byte[] msg) {
byte[] dataPackage = new byte[msg.length + 8];
// first 4 bytes: the length
System.arraycopy(IntegerAssist.intToBytes(msg.length), 0, dataPackage, 0, 4);
// middle: the message body
System.arraycopy(msg, 0, dataPackage, 4, msg.length);
// last 4 bytes: the separator characters
System.arraycopy(new byte[] {127, 127, 127, 127}, 0, dataPackage, msg.length + 4, 4);
return dataPackage;
}
@Test
public void pluck() throws Exception {
BufferBalePlucker plucker = new BufferBalePlucker(file, 0);
while (plucker.hasNextBufferBale()) {
byte[] data = plucker.pluck();
if (data == null)
continue;
assertArrayEquals("HelloWorld".getBytes(), data);
}
}
@Test
public void skipToNextBufferBale() throws Exception {
BufferBalePlucker plucker = new BufferBalePlucker(file, 3);
plucker.skipToNextBufferBale();
assertNull(plucker.pluck());
}
@After
public void clearData() {
file.deleteOnExit();
}
}
package test.com.ai.skywalking.recieve.buffer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.*;
public class BufferReaderTest {
private File bufferFile;
private InputStream bufferInputStream;
@Before
public void initData() throws IOException {
bufferFile = new File("/tmp", "test.aaaa");
if (!bufferFile.exists()) {
bufferFile.createNewFile();
}
OutputStream outputStream = new FileOutputStream(bufferFile);
outputStream.write("Hello".getBytes());
outputStream.close();
bufferInputStream = new FileInputStream(bufferFile);
}
@Test
public void testReadByte() throws IOException {
readByte(2000);
}
private byte[] readByte(int length) throws IOException {
byte[] dataContext = new byte[length];
int realReadLength = bufferInputStream.read(dataContext);
int remainderLength = length - realReadLength;
while (remainderLength > 0) {
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
byte[] remainderByte = new byte[remainderLength];
int tmpRemainder = bufferInputStream.read(remainderByte, 0, remainderLength);
if (tmpRemainder == -1) {
continue;
}
System.arraycopy(remainderByte, 0, dataContext, length - remainderLength, tmpRemainder);
remainderLength -= tmpRemainder;
}
return dataContext;
}
@After
public void clearData() throws IOException {
if (bufferInputStream != null) {
bufferInputStream.close();
}
bufferFile.deleteOnExit();
}
}
package com.ai.cloud.skywalking.web.dao.impl;
import com.ai.cloud.skywalking.web.dto.TraceNodeInfo;
import com.ai.cloud.skywalking.web.dto.TraceNodesResult;
import com.ai.cloud.skywalking.web.dao.inter.ITraceNodeDao;
import com.ai.cloud.skywalking.web.util.Constants;
import com.ai.cloud.skywalking.web.util.HBaseUtils;
import com.ai.cloud.skywalking.web.util.SortUtil;
import com.ai.cloud.skywalking.web.util.StringUtil;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
* Created by xin on 16-3-30.
*/
@Repository
public class TraceNodeDao implements ITraceNodeDao {
private String CALL_CHAIN_TABLE_NAME = "sw-call-chain";
@Autowired
private HBaseUtils hBaseUtils;
@Override
public TraceNodesResult queryTraceNodesByTraceId(String traceId) throws IOException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
Table table = hBaseUtils.getConnection().getTable(TableName.valueOf(CALL_CHAIN_TABLE_NAME));
Get g = new Get(Bytes.toBytes(traceId));
g.setFilter(new ColumnCountGetFilter(Constants.MAX_SEARCH_SPAN_SIZE + 1));
Result r = table.get(g);
Map<String, TraceNodeInfo> traceLogMap = new HashMap<String, TraceNodeInfo>();
Map<String, TraceNodeInfo> rpcMap = new HashMap<String, TraceNodeInfo>();
TraceNodesResult result = new TraceNodesResult();
if (r.rawCells().length < Constants.MAX_SEARCH_SPAN_SIZE) {
for (Cell cell : r.rawCells()) {
doDealSingleSpan(traceLogMap, rpcMap, cell);
}
computeRPCInfo(rpcMap, traceLogMap);
result.setOverMaxQueryNodeNumber(false);
result.setResult(traceLogMap.values());
}else{
result.setOverMaxQueryNodeNumber(true);
}
return result;
}
@Override
public Collection<TraceNodeInfo> queryEntranceNodeByTraceId(String traceId) throws IOException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
Table table = hBaseUtils.getConnection().getTable(TableName.valueOf(CALL_CHAIN_TABLE_NAME));
Get g = new Get(Bytes.toBytes(traceId));
g.addColumn("call-chain".getBytes(), "0".getBytes());
g.addColumn("call-chain".getBytes(), "0-S".getBytes());
g.addColumn("call-chain".getBytes(), "0.0".getBytes());
Result r = table.get(g);
Map<String, TraceNodeInfo> traceLogMap = new HashMap<String, TraceNodeInfo>();
Map<String, TraceNodeInfo> rpcMap = new HashMap<String, TraceNodeInfo>();
Cell cell = r.getColumnLatestCell("call-chain".getBytes(), "0".getBytes());
if (cell == null){
cell = r.getColumnLatestCell("call-chain".getBytes(), "0-S".getBytes());
}
doDealSingleSpan(traceLogMap, rpcMap, cell);
cell = r.getColumnLatestCell("call-chain".getBytes(), "0.0".getBytes());
doDealSingleSpan(traceLogMap, rpcMap, cell);
computeRPCInfo(rpcMap, traceLogMap);
return traceLogMap.values();
}
private void doDealSingleSpan(Map<String, TraceNodeInfo> traceLogMap, Map<String, TraceNodeInfo> rpcMap, Cell cell) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
if (cell != null && cell.getValueArray().length > 0) {
String colId = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
TraceNodeInfo tmpEntry = TraceNodeInfo.convert(
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()), colId);
// special handling for the RPC server-side span
if (colId.endsWith(Constants.RPC_END_FLAG)) {
rpcMap.put(colId.substring(0, colId.lastIndexOf(Constants.RPC_END_FLAG)), tmpEntry);
} else {
SortUtil.addCurNodeTreeMapKey(traceLogMap, colId, tmpEntry);
}
}
}
private void computeRPCInfo(Map<String, TraceNodeInfo> rpcMap, Map<String, TraceNodeInfo> traceLogMap) {
// merge client and server entries
if (rpcMap.size() > 0) {
for (Map.Entry<String, TraceNodeInfo> rpcVO : rpcMap.entrySet()) {
String colId = rpcVO.getKey();
if (traceLogMap.containsKey(colId)) {
TraceNodeInfo logVO = traceLogMap.get(colId);
TraceNodeInfo serverLog = rpcVO.getValue();
if (StringUtil.isBlank(logVO.getStatusCodeStr()) || Constants.STATUS_CODE_9.equals(logVO.getStatusCodeStr())) {
serverLog.setColId(colId);
traceLogMap.put(colId, serverLog);
} else {
TraceNodeInfo clientLog = traceLogMap.get(colId);
clientLog.setApplicationIdStr(clientLog.getApplicationIdStr() + " --> " + serverLog.getApplicationIdStr());
clientLog.setViewPointId(serverLog.getViewPointId());
clientLog.setViewPointIdSub(serverLog.getViewPointIdSub());
clientLog.setAddress(serverLog.getAddress());
if (StringUtil.isBlank(clientLog.getExceptionStack())) {
clientLog.setExceptionStack(serverLog.getExceptionStack());
}else{
clientLog.setServerExceptionStr(serverLog.getServerExceptionStr());
}
System.out.println("1");
}
logVO.addTimeLine(rpcVO.getValue().getStartDate(), rpcVO.getValue().getCost());
} else {
traceLogMap.put(colId, rpcVO.getValue());
}
}
}
}
}
package com.ai.cloud.skywalking.web.dto;
import com.ai.cloud.skywalking.protocol.Span;
import com.ai.cloud.skywalking.protocol.FullSpan;
import com.ai.cloud.skywalking.protocol.RequestSpan;
import com.ai.cloud.skywalking.web.util.Constants;
import com.ai.cloud.skywalking.web.util.StringUtil;
import com.google.protobuf.InvalidProtocolBufferException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class TraceNodeInfo extends FullSpan {
private String colId;
......@@ -41,11 +43,11 @@ public class TraceNodeInfo extends Span {
private String serverExceptionStr;
private TraceNodeInfo() {
}
private TraceNodeInfo(RequestSpan requestSpan) {
super(requestSpan);
}
public String getColId() {
......@@ -80,9 +82,10 @@ public class TraceNodeInfo extends Span {
this.viewPointIdSub = viewPointIdSub;
}
private static TraceNodeInfo convert(byte[] originData)
throws IllegalAccessException, InvocationTargetException, NoSuchMethodException,
InvalidProtocolBufferException {
TraceNodeInfo result = new TraceNodeInfo(new RequestSpan(originData));
// handle the span type key-value
String spanTypeStr = String.valueOf(result.getSpanType());
......@@ -107,16 +110,16 @@ public class TraceNodeInfo extends Span {
result.statusCodeStr = statusCodeStr;
result.statusCodeName = statusCodeName;
result.applicationIdStr = result.getApplicationId();
if (!StringUtil.isBlank(result.getViewPointId()) && result.getViewPointId().length() > 60) {
result.viewPointIdSub = result.getViewPointId().substring(0, 30) + "..." + result.getViewPointId()
.substring(result.getViewPointId().length() - 30);
} else {
result.viewPointIdSub = result.getViewPointId();
}
result.addTimeLine(result.getStartDate(), result.getCost());
result.endDate = result.getStartDate() + result.getCost();
return result;
}
......@@ -130,9 +133,10 @@ public class TraceNodeInfo extends Span {
timeLineList.add(new TimeLineEntry(startDate, cost));
}
public static TraceNodeInfo convert(byte[] requestSpanBytes, String colId)
throws IllegalAccessException, InvocationTargetException, NoSuchMethodException,
InvalidProtocolBufferException {
TraceNodeInfo result = convert(requestSpanBytes);
result.colId = colId;
return result;
}
......@@ -147,10 +151,9 @@ public class TraceNodeInfo extends Span {
TraceNodeInfo result = new TraceNodeInfo();
result.colId = colId;
if (colId.indexOf(Constants.VAL_SPLIT_CHAR) > -1) {
result.setParentLevel(colId.substring(0, colId.lastIndexOf(Constants.VAL_SPLIT_CHAR)));
}
result.timeLineList.add(new TimeLineEntry());
// other default values
......@@ -162,11 +165,11 @@ public class TraceNodeInfo extends Span {
return "TraceNodeInfo [colId=" + colId + ", endDate=" + endDate + ", timeLineList=" + timeLineList
+ ", spanTypeStr=" + spanTypeStr + ", spanTypeName=" + spanTypeName + ", statusCodeStr=" + statusCodeStr
+ ", statusCodeName=" + statusCodeName + ", applicationIdStr=" + applicationIdStr + ", viewPointIdSub="
+ viewPointIdSub + ", traceId=" + traceId + ", parentLevel=" + parentLevel + ", levelId=" + levelId
+ ", viewPointId=" + viewPointId + ", startDate=" + startDate + ", cost=" + cost + ", address="
+ address + ", statusCode=" + statusCode + ", exceptionStack=" + exceptionStack + ", spanType="
+ spanType + ", businessKey=" + businessKey + ", processNo=" + processNo
+ ", applicationId=" + applicationId + ", originData=" + originData + "]";
+ viewPointIdSub + ", traceId=" + getTraceId() + ", parentLevel=" + getParentLevel() + ", levelId=" +
getLevelId() + ", viewPointId=" + getViewPointId() + ", startDate=" + getStartDate() + ", cost="
+ getCost() + ", statusCode=" + getStatusCode() + ", exceptionStack=" + getExceptionStack()
+ ", spanType=" + getSpanType() + ", businessKey=" + getBusinessKey() + ", applicationId="
+ getApplicationId() + "]";
}
public List<TimeLineEntry> getTimeLineList() {
......@@ -192,4 +195,5 @@ public class TraceNodeInfo extends Span {
public void setServerExceptionStr(String serverExceptionStr) {
this.serverExceptionStr = serverExceptionStr;
}
}