提交 921eea23 编写于 作者: A ascrutae

完成出入参协议改造

上级 a514850b
......@@ -10,7 +10,7 @@
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd">
<context:component-scan base-package="com.ai.cloud.skywalking.sample.web.controller"></context:component-scan>
<context:component-scan base-package="com.a.eye.skywalking.sample.web"></context:component-scan>
<mvc:annotation-driven></mvc:annotation-driven>
......@@ -22,4 +22,4 @@
</bean>
<import resource="classpath*:consumer/dubbo-consumer.xml"/>
</beans>
\ No newline at end of file
</beans>
......@@ -23,16 +23,30 @@ public class Tracing {
return spanData.getTraceId();
}
public static String generateNextContextData(){
if (!AuthDesc.isAuth())
return null;
Span spanData = CurrentThreadSpanStack.peek();
if (spanData == null) {
return null;
}
ContextData contextData = new ContextData(spanData);
return contextData.toString();
}
public static String getTracelevelId() {
if (!AuthDesc.isAuth())
return "";
Span spanData = CurrentThreadSpanStack.peek();
if (spanData == null) {
return "";
}
return (spanData.getParentLevel() == null || spanData.getParentLevel().length() == 0) ?
Integer.toString(spanData.getLevelId()) :
spanData.getParentLevel() + "." + spanData.getLevelId();
}
public static String generateNextContextData() {
if (!AuthDesc.isAuth())
return null;
Span spanData = CurrentThreadSpanStack.peek();
if (spanData == null) {
return null;
}
ContextData contextData = new ContextData(spanData);
return contextData.toString();
}
}
......@@ -27,7 +27,7 @@ public abstract class BaseInvokeMonitor {
protected ContextData beforeInvoke(Span spanData, Identification id) {
if (Config.BuriedPoint.PRINTF) {
logger.debug("TraceId:" + spanData.getTraceId() + "\tParentLevelId:" + spanData.getParentLevel()
+ "\tLevelId:" + spanData.getLevelId() + "\tbusinessKey:" + spanData.getParameters());
+ "\tLevelId:" + spanData.getLevelId() + "\tbusinessKey:" + spanData.getBusinessKey());
}
// 将新创建的Context存放到ThreadLocal栈中。
......@@ -37,28 +37,23 @@ public abstract class BaseInvokeMonitor {
ContextBuffer.save(RequestSpan.RequestSpanBuilder.
newBuilder(CurrentThreadSpanStack.peek()).callType(id.getCallType()).viewPoint(id.getViewPoint())
.spanTypeDesc(id.getSpanTypeDesc()).processNo(BuriedPointMachineUtil.getProcessNo())
.address(BuriedPointMachineUtil.getHostDesc()).parameters(id.getParameters()).build());
.address(BuriedPointMachineUtil.getHostDesc()).build());
// 并将当前的Context返回回去
return new ContextData(spanData);
}
protected void afterInvoke() {
afterInvoke(null);
}
protected void afterInvoke(String invokeResult) {
try {
if (!AuthDesc.isAuth())
return;
// 弹出上下文的栈顶中的元素
Span spanData = CurrentThreadSpanStack.pop();
spanData.setInvokeResult(invokeResult);
if (Config.BuriedPoint.PRINTF) {
logger.debug("TraceId-ACK:" + spanData.getTraceId() + "\tParentLevelId:" + spanData.getParentLevel()
+ "\tLevelId:" + spanData.getLevelId() + "\tbusinessKey:" + spanData.getParameters());
+ "\tLevelId:" + spanData.getLevelId() + "\tbusinessKey:" + spanData.getBusinessKey());
}
// 生成并保存到缓存
ContextBuffer.save(new AckSpan(spanData));
......
......@@ -32,10 +32,6 @@ public class LocalMethodInvokeMonitor extends BaseInvokeMonitor {
super.afterInvoke();
}
public void afterInvoke(String invokeResult){
super.afterInvoke(invokeResult);
}
public void occurException(Throwable th){
super.occurException(th);
......
......@@ -31,7 +31,7 @@ public class RPCClientInvokeMonitor extends BaseInvokeMonitor {
if (Config.BuriedPoint.PRINTF) {
logger.debug("TraceId:" + spanData.getTraceId() + "\tParentLevelId:" + spanData.getParentLevel()
+ "\tLevelId:" + spanData.getLevelId() + "\tbusinessKey:" + spanData.getParameters());
+ "\tLevelId:" + spanData.getLevelId() + "\tbusinessKey:" + spanData.getBusinessKey());
}
CurrentThreadSpanStack.push(spanData);
......@@ -41,8 +41,7 @@ public class RPCClientInvokeMonitor extends BaseInvokeMonitor {
.spanTypeDesc(id.getSpanTypeDesc())
.bussinessKey(id.getBusinessKey())
.callType(id.getCallType()).processNo(BuriedPointMachineUtil.getProcessNo())
.address(BuriedPointMachineUtil.getHostDesc())
.parameters(id.getParameters()).build();
.address(BuriedPointMachineUtil.getHostDesc()).build();
ContextBuffer.save(requestSpan);
......
......@@ -8,24 +8,18 @@ import java.util.Map;
public class Identification {
private String viewPoint;
private Map<String, String> parameters;
private String businessKey;
private String spanTypeDesc;
private String callType;
public Identification() {
//Non
parameters = new HashMap<String, String>();
}
public String getViewPoint() {
return viewPoint;
}
public Map<String, String> getParameters() {
return parameters;
}
public String getSpanTypeDesc() {
return spanTypeDesc;
}
......@@ -45,7 +39,6 @@ public class Identification {
public static class IdentificationBuilder {
private Identification sendData;
private int parameterIdx = 0;
IdentificationBuilder() {
sendData = new Identification();
......@@ -60,13 +53,6 @@ public class Identification {
return this;
}
public IdentificationBuilder addParameter(String value){
parameterIdx++;
sendData.parameters.put("_" + parameterIdx, value);
return this;
}
public IdentificationBuilder businessKey(String businessKey) {
sendData.businessKey = businessKey;
return this;
......
......@@ -17,7 +17,6 @@ public final class ContextGenerator {
Span spanData = getSpanFromThreadLocal();
spanData.setStartDate(System.currentTimeMillis());
spanData.setViewPointId(id.getViewPoint());
spanData.appendParameters(id.getParameters());
return spanData;
}
......
package com.a.eye.skywalking.plugin.custom.localmethod;
import com.a.eye.skywalking.api.Tracing;
import com.a.eye.skywalking.buffer.ContextBuffer;
import com.a.eye.skywalking.conf.Config;
import com.a.eye.skywalking.invoke.monitor.LocalMethodInvokeMonitor;
import com.a.eye.skywalking.model.Identification;
import com.a.eye.skywalking.plugin.interceptor.EnhancedClassInstanceContext;
import com.a.eye.skywalking.plugin.interceptor.enhance.*;
import com.a.eye.skywalking.protocol.InputParametersSpan;
import com.a.eye.skywalking.protocol.OutputParameterSpan;
import com.google.gson.Gson;
public class CustomLocalMethodInterceptor implements InstanceMethodsAroundInterceptor, StaticMethodsAroundInterceptor {
......@@ -16,34 +20,38 @@ public class CustomLocalMethodInterceptor implements InstanceMethodsAroundInterc
@Override
public void beforeMethod(EnhancedClassInstanceContext context, InstanceMethodInvokeContext interceptorContext,
MethodInterceptResult result) {
Identification.IdentificationBuilder identificationBuilder = buildIdentificationBuilder(interceptorContext);
Identification.IdentificationBuilder identificationBuilder = Identification.newBuilder();
identificationBuilder.spanType(new CustomLocalSpanType()).viewPoint(
fullMethodName(interceptorContext.inst().getClass(), interceptorContext.methodName(),
interceptorContext.argumentTypes()));
new LocalMethodInvokeMonitor().beforeInvoke(identificationBuilder.build());
recordParametersAndSave2BufferIfNecessary(interceptorContext.allArguments());
}
private Identification.IdentificationBuilder buildIdentificationBuilder(MethodInvokeContext interceptorContext) {
Identification.IdentificationBuilder identificationBuilder = Identification.newBuilder();
private void recordParametersAndSave2BufferIfNecessary(Object[] arguments) {
if (Config.Plugin.CustomLocalMethodInterceptorPlugin.RECORD_PARAM_ENABLE) {
for (Object param : interceptorContext.allArguments()) {
InputParametersSpan inputParametersSpan = new InputParametersSpan(Tracing.getTraceId(), Tracing.getTracelevelId());
for (Object param : arguments) {
String paramStr;
try {
paramStr = new Gson().toJson(param);
} catch (Throwable e) {
paramStr = "N/A";
}
identificationBuilder.addParameter(paramStr);
inputParametersSpan.addParameter(paramStr);
}
}
return identificationBuilder;
ContextBuffer.save(inputParametersSpan);
}
}
@Override
public Object afterMethod(EnhancedClassInstanceContext context, InstanceMethodInvokeContext interceptorContext,
Object ret) {
recordResultIfNecessary(ret);
recordResultAndSave2BufferIfNecessary(ret);
new LocalMethodInvokeMonitor().afterInvoke();
return ret;
}
......@@ -56,30 +64,34 @@ public class CustomLocalMethodInterceptor implements InstanceMethodsAroundInterc
@Override
public void beforeMethod(StaticMethodInvokeContext interceptorContext, MethodInterceptResult result) {
Identification.IdentificationBuilder identificationBuilder = buildIdentificationBuilder(interceptorContext);
Identification.IdentificationBuilder identificationBuilder = Identification.newBuilder();
identificationBuilder.spanType(new CustomLocalSpanType()).viewPoint(
fullMethodName(interceptorContext.claszz(), interceptorContext.methodName(),
interceptorContext.argumentTypes()));
new LocalMethodInvokeMonitor().beforeInvoke(identificationBuilder.build());
recordParametersAndSave2BufferIfNecessary(interceptorContext.allArguments());
}
@Override
public Object afterMethod(StaticMethodInvokeContext interceptorContext, Object ret) {
recordResultIfNecessary(ret);
recordResultAndSave2BufferIfNecessary(ret);
new LocalMethodInvokeMonitor().afterInvoke();
return ret;
}
private void recordResultIfNecessary(Object ret) {
private void recordResultAndSave2BufferIfNecessary(Object ret) {
if (Config.Plugin.CustomLocalMethodInterceptorPlugin.RECORD_PARAM_ENABLE){
OutputParameterSpan outputParameterSpan = new OutputParameterSpan(Tracing.getTraceId(), Tracing.getTracelevelId());
String retStr;
try{
retStr = new Gson().toJson(ret);
}catch (Throwable e){
retStr = "N/A";
}
new LocalMethodInvokeMonitor().afterInvoke(retStr);
}else {
new LocalMethodInvokeMonitor().afterInvoke();
outputParameterSpan.setOutputParameter(retStr);
ContextBuffer.save(outputParameterSpan);
}
}
......
......@@ -43,10 +43,6 @@ public class AckSpan extends AbstractDataSerializable {
*/
private String exceptionStack = "";
/**
* 埋点入参列表,补充时触发
*/
private Map<String, String> paramters = new HashMap<String, String>();
private String viewPointId;
......@@ -66,7 +62,6 @@ public class AckSpan extends AbstractDataSerializable {
this.exceptionStack = spanData.getExceptionStack();
this.userId = spanData.getUserId();
this.applicationId = spanData.getApplicationId();
this.paramters.putAll(spanData.getParameters());
this.viewPointId = spanData.getViewPointId();
}
......@@ -133,11 +128,6 @@ public class AckSpan extends AbstractDataSerializable {
builder = TraceProtocol.AckSpan.newBuilder().setTraceId(traceId).setParentLevel(parentLevel).
setLevelId(levelId).setCost(cost).setViewpointId(viewPointId).setStatusCode(statusCode)
.setExceptionStack(exceptionStack);
if (paramters != null && paramters.size() > 0){
builder.putAllParameters(paramters);
}
return builder.build().toByteArray();
}
......@@ -153,7 +143,6 @@ public class AckSpan extends AbstractDataSerializable {
ackSpan.setExceptionStack(ackSpanProtocol.getExceptionStack());
ackSpan.setStatusCode((byte) ackSpanProtocol.getStatusCode());
ackSpan.viewPointId = ackSpanProtocol.getViewpointId();
ackSpan.paramters = ackSpanProtocol.getParametersMap();
} catch (InvalidProtocolBufferException e) {
throw new ConvertFailedException(e.getMessage(),e);
}
......
package com.a.eye.skywalking.protocol;
import com.a.eye.skywalking.protocol.common.AbstractDataSerializable;
import com.a.eye.skywalking.protocol.exception.ConvertFailedException;
import com.a.eye.skywalking.protocol.proto.TraceProtocol;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.HashMap;
import java.util.Map;
public class InputParametersSpan extends AbstractDataSerializable {
private static final InputParametersSpan INSTANCE = new InputParametersSpan();
private static int parameterIndex = 0;
/**
* tid,调用链的全局唯一标识
*/
private String traceId;
/**
* 当前调用链的描述<br/>
*/
private String traceLevelId;
/**
* 埋点入参列表,补充时触发
*/
private Map<String, String> parameters = new HashMap<String, String>();
public InputParametersSpan() {
}
public InputParametersSpan(String traceId, String traceLevelId) {
this.traceLevelId = traceLevelId;
this.traceId = traceId;
}
public int getDataType() {
return 3;
}
public byte[] getData() {
TraceProtocol.InputParametersSpan.Builder builder =
TraceProtocol.InputParametersSpan.newBuilder().setTraceId(traceId).setTraceLevelId(traceLevelId);
if (parameters != null && parameters.size() > 0) {
builder.putAllParameters(parameters);
}
return builder.build().toByteArray();
}
public AbstractDataSerializable convertData(byte[] data) throws ConvertFailedException {
InputParametersSpan result = new InputParametersSpan();
try {
TraceProtocol.InputParametersSpan parametersSpan = TraceProtocol.InputParametersSpan.parseFrom(data);
result.traceId = parametersSpan.getTraceId();
result.traceLevelId = parametersSpan.getTraceLevelId();
result.parameters = parametersSpan.getParametersMap();
} catch (InvalidProtocolBufferException e) {
throw new ConvertFailedException("Failed to convert to parametersSpan", e);
}
return result;
}
public static InputParametersSpan convert(byte[] data) throws ConvertFailedException {
return (InputParametersSpan) INSTANCE.convertData(data);
}
public boolean isNull() {
return false;
}
public String getTraceId() {
return traceId;
}
public void setTraceId(String traceId) {
this.traceId = traceId;
}
public String getTraceLevelId() {
return traceLevelId;
}
public void setTraceLevelId(String traceLevelId) {
this.traceLevelId = traceLevelId;
}
public void addParameter(String parameter) {
parameters.put("_" + parameterIndex, parameter);
parameterIndex++;
}
public Map<String, String> getParameters() {
return parameters;
}
public void setParameters(Map<String, String> parameters) {
this.parameters = parameters;
}
}
package com.a.eye.skywalking.protocol;
import com.a.eye.skywalking.protocol.common.AbstractDataSerializable;
import com.a.eye.skywalking.protocol.exception.ConvertFailedException;
import com.a.eye.skywalking.protocol.proto.TraceProtocol;
/**
* Created by xin on 16/8/16.
*/
public class OutputParameterSpan extends AbstractDataSerializable {
private static final OutputParameterSpan INSTANCE = new OutputParameterSpan();
/**
* tid,调用链的全局唯一标识
*/
private String traceId;
/**
* 当前调用链的描述<br/>
*/
private String traceLevelId;
private String outputParameter;
public OutputParameterSpan() {
}
public OutputParameterSpan(String traceId, String traceLevelId) {
this.traceId = traceId;
this.traceLevelId = traceLevelId;
}
public int getDataType() {
return 4;
}
public byte[] getData() {
return TraceProtocol.OutputParametersSpan.newBuilder().setOutputParameter(getOutputParameter())
.setTraceId(getTraceId()).setTraceLevelId(getTraceLevelId()).build().toByteArray();
}
public AbstractDataSerializable convertData(byte[] data) throws ConvertFailedException {
try {
OutputParameterSpan outputParameterSpan = new OutputParameterSpan();
TraceProtocol.OutputParametersSpan _protobufOutputSpan = TraceProtocol.OutputParametersSpan.parseFrom(data);
outputParameterSpan.setOutputParameter(_protobufOutputSpan.getOutputParameter());
outputParameterSpan.setTraceId(_protobufOutputSpan.getTraceId());
outputParameterSpan.setTraceLevelId(_protobufOutputSpan.getTraceLevelId());
return outputParameterSpan;
} catch (Exception e) {
throw new ConvertFailedException("Failed to convert output parameter span.", e);
}
}
public static OutputParameterSpan convert(byte[] data) throws ConvertFailedException {
return (OutputParameterSpan) INSTANCE.convertData(data);
}
public boolean isNull() {
return false;
}
public String getTraceId() {
return traceId;
}
public void setTraceId(String traceId) {
this.traceId = traceId;
}
public String getTraceLevelId() {
return traceLevelId;
}
public void setTraceLevelId(String traceLevelId) {
this.traceLevelId = traceLevelId;
}
public String getOutputParameter() {
if (outputParameter == null) {
return "";
}
return outputParameter;
}
public void setOutputParameter(String outputParameter) {
this.outputParameter = outputParameter;
}
}
......@@ -70,11 +70,6 @@ public class RequestSpan extends AbstractDataSerializable {
*/
private String userId = "";
/**
* 埋点入参列表
*/
private Map<String, String> parameters = new HashMap<String, String>();
/**
* 业务字段
*/
......@@ -196,14 +191,6 @@ public class RequestSpan extends AbstractDataSerializable {
this.userId = userId;
}
public Map<String, String> getParameters() {
return parameters;
}
public void setParameters(Map<String, String> parameters) {
this.parameters = parameters;
}
@Override
public int getDataType() {
return 1;
......@@ -220,10 +207,6 @@ public class RequestSpan extends AbstractDataSerializable {
builder.setBussinessKey(businessKey);
}
if (parameters != null && parameters.size() > 0) {
builder.putAllParameters(parameters);
}
return builder.setCallType(callType).setApplicationId(applicationId).setUserId(userId).setAgentId(agentId)
.build().toByteArray();
}
......@@ -247,7 +230,6 @@ public class RequestSpan extends AbstractDataSerializable {
requestSpan.setAgentId(requestSpanByte.getAgentId());
requestSpan.setProcessNo(requestSpanByte.getProcessNo());
requestSpan.setAddress(requestSpanByte.getAddress());
requestSpan.setParameters(requestSpanByte.getParametersMap());
} catch (InvalidProtocolBufferException e) {
throw new ConvertFailedException(e.getMessage(), e);
}
......@@ -311,11 +293,6 @@ public class RequestSpan extends AbstractDataSerializable {
return this;
}
public RequestSpanBuilder parameters(Map<String, String> parameters) {
requestSpan.parameters = parameters;
return this;
}
public RequestSpan build() {
return requestSpan;
}
......
......@@ -47,11 +47,6 @@ public class Span {
*/
protected String exceptionStack = "";
/**
* 节点调用过程中的业务字段<br/>
* 如:业务系统设置的订单号,SQL语句等
*/
protected Map<String, String> parameters = new HashMap<String, String>();
/**
* 节点类型<br/>
* 如:RPC Client,RPC Server,Local
......@@ -127,14 +122,6 @@ public class Span {
this.exceptionStack = exceptionStack;
}
public Map<String, String> getParameters() {
return parameters;
}
public void setParameters(Map<String, String> parameters) {
this.parameters = parameters;
}
public void setSpanType(SpanType spanType) {
this.spanType = spanType;
}
......@@ -183,10 +170,6 @@ public class Span {
this.businessKey = businessKey;
}
public void appendParameters(Map<String, String> parameters) {
this.parameters.putAll(parameters);
}
public String getApplicationId() {
return applicationId;
}
......@@ -208,12 +191,7 @@ public class Span {
this.viewPointId = viewPointId;
}
public void setInvokeResult(String result){
if (result == null){
result = "null";
}
this.parameters.put(INVOKE_RESULT_PARAMETER_KEY, result);
}
public String getViewPointId() {
return viewPointId;
......
......@@ -10,7 +10,6 @@ message AckSpan {
required int32 statusCode = 5;
optional string exceptionStack = 6;
required string viewpointId = 7;
map<string,string> parameters = 8;
}
message RequestSpan {
......@@ -26,7 +25,18 @@ message RequestSpan {
required string userId = 10;
optional string bussinessKey = 11;
required string agentId = 12;
map<string,string> parameters = 13;
required string processNo = 14;
required string address = 15;
required string processNo = 13;
required string address = 14;
}
message InputParametersSpan{
required string traceId = 1;
optional string traceLevelId = 2;
map<string,string> parameters = 3;
}
message OutputParametersSpan{
required string traceId = 1;
optional string traceLevelId = 2;
required string outputParameter = 3;
}
com.a.eye.skywalking.protocol.AckSpan
com.a.eye.skywalking.protocol.RequestSpan
com.a.eye.skywalking.protocol.InputParametersSpan
com.a.eye.skywalking.protocol.OutputParameterSpan
com.a.eye.skywalking.protocol.BufferFileEOFProtocol
......@@ -89,9 +89,20 @@ public class Config {
public static class HBaseConfig {
public static String TABLE_NAME = "sw-call-chain";
public static class TraceDataTable {
public static String FAMILY_COLUMN_NAME = "call-chain";
public static String TABLE_NAME = "trace-data";
public static String FAMILY_COLUMN_NAME = "call-chain";
}
public static class TraceParamTable {
public static String TABLE_NAME = "trace-param";
public static String FAMILY_COLUMN_NAME = "param-data";
}
public static String ZK_HOSTNAME;
......
......@@ -4,6 +4,7 @@ import com.a.eye.skywalking.protocol.SerializedFactory;
import com.a.eye.skywalking.protocol.common.AbstractDataSerializable;
import com.a.eye.skywalking.protocol.exception.ConvertFailedException;
import com.a.eye.skywalking.protocol.util.IntegerAssist;
import com.a.eye.skywalking.reciever.conf.Config;
import com.a.eye.skywalking.reciever.selfexamination.ServerHealthCollector;
import com.a.eye.skywalking.reciever.selfexamination.ServerHeathReading;
import org.apache.commons.io.FileUtils;
......@@ -24,7 +25,7 @@ public class BufferFileReader {
private static final byte[] DATA_SPILT = new byte[] {127, 127, 127, 127};
private int remainderLength = 0;
private byte[] remainderByte = null;
private Logger logger = LogManager.getLogger(BufferFileReader.class);
private static Logger logger = LogManager.getLogger(BufferFileReader.class);
private List<AbstractDataSerializable> serializables;
public BufferFileReader(File bufferFile, int currentOffset) {
......@@ -200,8 +201,7 @@ public class BufferFileReader {
AbstractDataSerializable abstractDataSerializable = SerializedFactory.deserialize(data);
serializeData.add(abstractDataSerializable);
} catch (ConvertFailedException e) {
// FIXME: 16/8/4 logger日志输出
e.printStackTrace();
logger.error("Failed to convert span.", e);
}
currentLength += 4 + dataLength;
......
package com.a.eye.skywalking.reciever.processor;
import com.a.eye.skywalking.reciever.processor.exception.HBaseInitFailedException;
import com.a.eye.skywalking.protocol.common.AbstractDataSerializable;
import com.a.eye.skywalking.reciever.conf.Config;
import com.a.eye.skywalking.reciever.selfexamination.ServerHealthCollector;
import com.a.eye.skywalking.reciever.selfexamination.ServerHeathReading;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import com.a.eye.skywalking.reciever.util.HBaseUtil;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.List;
import static org.apache.hadoop.hbase.util.Bytes.toBytes;
public abstract class AbstractSpanProcessor implements IProcessor {
private static Logger logger = LogManager.getLogger(AbstractSpanProcessor.class);
private static Configuration configuration = null;
private static Connection connection;
static {
if (configuration == null) {
configuration = HBaseConfiguration.create();
if (Config.HBaseConfig.ZK_HOSTNAME == null || "".equals(Config.HBaseConfig.ZK_HOSTNAME)) {
logger.error("Miss HBase ZK quorum Configuration", new IllegalArgumentException("Miss HBase ZK quorum Configuration"));
System.exit(-1);
}
configuration.set("hbase.zookeeper.quorum", Config.HBaseConfig.ZK_HOSTNAME);
configuration.set("hbase.zookeeper.property.clientPort", Config.HBaseConfig.CLIENT_PORT);
}
try {
connection = ConnectionFactory.createConnection(configuration);
Admin admin = connection.getAdmin();
if (!admin.tableExists(TableName.valueOf(Config.HBaseConfig.TABLE_NAME))){
HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(Config.HBaseConfig.TABLE_NAME));
HColumnDescriptor family = new HColumnDescriptor(toBytes(Config.HBaseConfig.FAMILY_COLUMN_NAME));
descriptor.addFamily(family);
admin.createTable(descriptor);
}
} catch (IOException e) {
ServerHealthCollector.getCurrentHeathReading("hbase").updateData(ServerHeathReading.ERROR, "connect to hbase failure.");
throw new HBaseInitFailedException("initHBaseClient failure", e);
}
}
public abstract class AbstractSpanProcessor implements IProcessor{
@Override
public void process(List<AbstractDataSerializable> serializedObjects) {
doAlarm(serializedObjects);
doSaveHBase(connection, serializedObjects);
doSaveHBase(HBaseUtil.getConnection(), serializedObjects);
}
public abstract void doAlarm(List<AbstractDataSerializable> serializedObjects);
......
......@@ -54,12 +54,12 @@ public class AckSpanProcessor extends AbstractSpanProcessor {
// appending suffix
columnName += "-ACK";
put.addColumn(Bytes.toBytes(Config.HBaseConfig.FAMILY_COLUMN_NAME), Bytes.toBytes(columnName),
put.addColumn(Bytes.toBytes(Config.HBaseConfig.TraceDataTable.FAMILY_COLUMN_NAME), Bytes.toBytes(columnName),
ackSpan.getData());
puts.add(put);
}
// save
HBaseUtil.batchSavePuts(connection, Config.HBaseConfig.TABLE_NAME, puts);
HBaseUtil.batchSavePuts(connection, Config.HBaseConfig.TraceDataTable.TABLE_NAME, puts);
}
@Override
......
package com.a.eye.skywalking.reciever.processor;
import com.a.eye.skywalking.protocol.InputParametersSpan;
import com.a.eye.skywalking.protocol.common.AbstractDataSerializable;
import com.a.eye.skywalking.reciever.conf.Config;
import com.a.eye.skywalking.reciever.util.HBaseUtil;
import com.a.eye.skywalking.reciever.util.SpanUtil;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.List;
@DefaultProcessor
public class InputParameterSpanProcessor extends ParameterSpanProcessor {
@Override
public void doSaveHBase(Connection connection, List<AbstractDataSerializable> serializedObjects) {
List<Put> puts = new ArrayList<Put>();
// convert to put
for (AbstractDataSerializable serializedObject : serializedObjects) {
InputParametersSpan inputParametersSpan = (InputParametersSpan) serializedObject;
Put put = new Put(Bytes.toBytes(inputParametersSpan.getTraceId()),
SpanUtil.getTSBySpanTraceId(inputParametersSpan.getTraceId()));
put.addColumn(Bytes.toBytes(Config.HBaseConfig.TraceParamTable.FAMILY_COLUMN_NAME),
Bytes.toBytes(inputParametersSpan.getTraceLevelId()), inputParametersSpan.getData());
puts.add(put);
}
// save
HBaseUtil.batchSavePuts(connection, Config.HBaseConfig.TraceParamTable.TABLE_NAME, puts);
}
@Override
public int getProtocolType() {
return 3;
}
}
package com.a.eye.skywalking.reciever.processor;
import com.a.eye.skywalking.protocol.OutputParameterSpan;
import com.a.eye.skywalking.protocol.common.AbstractDataSerializable;
import com.a.eye.skywalking.reciever.conf.Config;
import com.a.eye.skywalking.reciever.util.HBaseUtil;
import com.a.eye.skywalking.reciever.util.SpanUtil;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.List;
@DefaultProcessor
public class OutputParameterSpanProcessor extends ParameterSpanProcessor {
@Override
public void doSaveHBase(Connection connection, List<AbstractDataSerializable> serializedObjects) {
List<Put> puts = new ArrayList<Put>();
// convert to put
for (AbstractDataSerializable serializedObject : serializedObjects) {
OutputParameterSpan inputParametersSpan = (OutputParameterSpan) serializedObject;
String columnName = inputParametersSpan.getTraceLevelId() + "-RET";
Put put = new Put(Bytes.toBytes(inputParametersSpan.getTraceId()),
SpanUtil.getTSBySpanTraceId(inputParametersSpan.getTraceId()));
put.addColumn(Bytes.toBytes(Config.HBaseConfig.TraceParamTable.FAMILY_COLUMN_NAME),
Bytes.toBytes(columnName), inputParametersSpan.getData());
puts.add(put);
}
// save
HBaseUtil.batchSavePuts(connection, Config.HBaseConfig.TraceParamTable.TABLE_NAME, puts);
}
@Override
public int getProtocolType() {
return 4;
}
}
package com.a.eye.skywalking.reciever.processor;
import com.a.eye.skywalking.protocol.common.AbstractDataSerializable;
import com.a.eye.skywalking.reciever.util.HBaseUtil;
import java.util.List;
public abstract class ParameterSpanProcessor extends AbstractSpanProcessor {
@Override
public void process(List<AbstractDataSerializable> serializedObjects) {
doSaveHBase(HBaseUtil.getConnection(), serializedObjects);
}
@Override
public void doAlarm(List<AbstractDataSerializable> serializedObjects) {
// do Nothing
}
}
......@@ -35,12 +35,12 @@ public class RequestSpanProcessor extends AbstractSpanProcessor {
} else {
columnName = requestSpan.getParentLevel() + "." + requestSpan.getLevelId();
}
put.addColumn(Bytes.toBytes(Config.HBaseConfig.FAMILY_COLUMN_NAME), Bytes.toBytes(columnName),
put.addColumn(Bytes.toBytes(Config.HBaseConfig.TraceDataTable.FAMILY_COLUMN_NAME), Bytes.toBytes(columnName),
requestSpan.getData());
puts.add(put);
}
// save
HBaseUtil.batchSavePuts(connection, Config.HBaseConfig.TABLE_NAME, puts);
HBaseUtil.batchSavePuts(connection, Config.HBaseConfig.TraceDataTable.TABLE_NAME, puts);
}
@Override
......
package com.a.eye.skywalking.reciever.util;
import com.a.eye.skywalking.reciever.conf.Config;
import com.a.eye.skywalking.reciever.processor.exception.HBaseInitFailedException;
import com.a.eye.skywalking.reciever.selfexamination.ServerHealthCollector;
import com.a.eye.skywalking.reciever.selfexamination.ServerHeathReading;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.List;
import static org.apache.hadoop.hbase.util.Bytes.toBytes;
public class HBaseUtil {
private static Logger logger = LogManager.getLogger(HBaseUtil.class);
......@@ -24,4 +32,49 @@ public class HBaseUtil {
logger.error("batchSavePuts failure.", e);
}
}
private static Connection connection;
public static Connection initConnection() {
Configuration configuration = HBaseConfiguration.create();
if (Config.HBaseConfig.ZK_HOSTNAME == null || "".equals(Config.HBaseConfig.ZK_HOSTNAME)) {
logger.error("Miss HBase ZK quorum Configuration",
new IllegalArgumentException("Miss HBase ZK quorum Configuration"));
System.exit(-1);
}
configuration.set("hbase.zookeeper.quorum", Config.HBaseConfig.ZK_HOSTNAME);
configuration.set("hbase.zookeeper.property.clientPort", Config.HBaseConfig.CLIENT_PORT);
try {
Connection connection = ConnectionFactory.createConnection(configuration);
Admin admin = connection.getAdmin();
if (!admin.tableExists(TableName.valueOf(Config.HBaseConfig.TraceDataTable.TABLE_NAME))) {
HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(Config.HBaseConfig.TraceDataTable.TABLE_NAME));
HColumnDescriptor family = new HColumnDescriptor(toBytes(Config.HBaseConfig.TraceDataTable.FAMILY_COLUMN_NAME));
descriptor.addFamily(family);
admin.createTable(descriptor);
}
if (!admin.tableExists(TableName.valueOf(Config.HBaseConfig.TraceParamTable.TABLE_NAME))) {
HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(Config.HBaseConfig.TraceParamTable.TABLE_NAME));
HColumnDescriptor family = new HColumnDescriptor(toBytes(Config.HBaseConfig.TraceParamTable.FAMILY_COLUMN_NAME));
descriptor.addFamily(family);
admin.createTable(descriptor);
}
return connection;
} catch (IOException e) {
ServerHealthCollector.getCurrentHeathReading("hbase")
.updateData(ServerHeathReading.ERROR, "connect to hbase failure.");
throw new HBaseInitFailedException("initHBaseClient failure", e);
}
}
public static Connection getConnection() {
if (connection == null) {
connection = initConnection();
}
return connection;
}
}
com.a.eye.skywalking.reciever.processor.RequestSpanProcessor
com.a.eye.skywalking.reciever.processor.AckSpanProcessor
com.a.eye.skywalking.reciever.processor.InputParameterSpanProcessor
com.a.eye.skywalking.reciever.processor.OutputParameterSpanProcessor
......@@ -23,7 +23,7 @@ persistence.switch_file_wait_time=5000
#追加EOF标志位的线程数量
persistence.max_append_eof_flags_thread_number=1
#持久化线程个数
persistence.max_deal_data_thread_number=0
persistence.max_deal_data_thread_number=1
#偏移量注册文件的目录
registerpersistence.register_file_parent_directory=/tmp/skywalking/data/offset
......@@ -35,14 +35,20 @@ registerpersistence.register_bak_file_name=offset.txt.bak
registerpersistence.offset_written_file_wait_cycle=5000
#hbase表名
hbaseconfig.table_name=trace-data
#hbase列簇名字
hbaseconfig.family_column_name=call-chain
#trace data hbase表名
hbaseconfig.tracedatatable.table_name=trace-data
#trace data hbase列簇名字
hbaseconfig.tracedatatable.family_column_name=call-chain
#trace data hbase表名
hbaseconfig.traceparamtable.table_name=trace-param
#trace data hbase列簇名字
hbaseconfig.traceparamtable.family_column_name=param-data
#hbase zk quorum
hbaseconfig.zk_hostname=10.1.235.197,10.1.235.198,10.1.235.199
hbaseconfig.zk_hostname=swhbaseenv
#hbase zk port
hbaseconfig.client_port=29181
hbaseconfig.client_port=2181
#告警失效时间
alarm.alarm_expire_seconds=5400
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册