提交 8c275427 编写于 作者: P pengys5

trace stack es save and ui search success

上级 c387cfb9
......@@ -9,4 +9,5 @@ public class Const {
public static final String PEERS_FRONT_SPLIT = "[";
public static final String PEERS_BEHIND_SPLIT = "]";
public static final String USER_CODE = "User";
public static final String SEGMENT_SPAN_SPLIT = "S";
}
package org.skywalking.apm.collector.agentstream.worker.global;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.global.dao.IGlobalTraceDAO;
import org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class GlobalTracePersistenceWorker extends PersistenceWorker {
public GlobalTracePersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
IGlobalTraceDAO dao = (IGlobalTraceDAO)DAOContainer.INSTANCE.get(IGlobalTraceDAO.class.getName());
return dao.prepareBatch(dataMap);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<GlobalTracePersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public GlobalTracePersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new GlobalTracePersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return GlobalTracePersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
@Override public DataDefine dataDefine() {
return new GlobalTraceDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.global;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.GlobalTraceIdsListener;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.UniqueId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class GlobalTraceSpanListener implements FirstSpanListener, GlobalTraceIdsListener {
private final Logger logger = LoggerFactory.getLogger(GlobalTraceSpanListener.class);
private List<String> globalTraceIds = new ArrayList<>();
private String segmentId;
private long timeBucket;
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
this.timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
this.segmentId = segmentId;
}
@Override public void parseGlobalTraceId(UniqueId uniqueId) {
StringBuilder globalTraceIdBuilder = new StringBuilder();
uniqueId.getIdPartsList().forEach(globalTraceIdBuilder::append);
globalTraceIds.add(globalTraceIdBuilder.toString());
}
@Override public void build() {
logger.debug("global trace listener build");
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
for (String globalTraceId : globalTraceIds) {
GlobalTraceDataDefine.GlobalTrace globalTrace = new GlobalTraceDataDefine.GlobalTrace();
globalTrace.setGlobalTraceId(globalTraceId);
globalTrace.setId(segmentId + globalTraceId);
globalTrace.setSegmentId(segmentId);
globalTrace.setTimeBucket(timeBucket);
try {
logger.debug("send to global trace persistence worker, id: {}", globalTrace.getId());
context.getClusterWorkerContext().lookup(GlobalTracePersistenceWorker.WorkerRole.INSTANCE).tell(globalTrace.transform());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
}
\ No newline at end of file
package org.skywalking.apm.collector.agentstream.worker.global.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class GlobalTraceEsDAO extends EsDAO implements IGlobalTraceDAO {
private final Logger logger = LoggerFactory.getLogger(GlobalTraceEsDAO.class);
@Override public List<?> prepareBatch(Map<String, Data> dataMap) {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
logger.debug("global trace prepareBatch, id: {}", id);
Map<String, Object> source = new HashMap();
source.put(GlobalTraceTable.COLUMN_SEGMENT_ID, data.getDataString(1));
source.put(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, data.getDataString(2));
source.put(GlobalTraceTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
logger.debug("global trace source: {}", source.toString());
IndexRequestBuilder builder = getClient().prepareIndex(GlobalTraceTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
}
}
package org.skywalking.apm.collector.agentstream.worker.global.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class GlobalTraceH2DAO extends H2DAO implements IGlobalTraceDAO {
@Override public List<?> prepareBatch(Map map) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.global.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface IGlobalTraceDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.global.define;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class GlobalTraceDataDefine extends DataDefine {
@Override public int defineId() {
return 403;
}
@Override protected int initialCapacity() {
return 4;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(GlobalTraceTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(GlobalTraceTable.COLUMN_SEGMENT_ID, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(GlobalTraceTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
String segmentId = remoteData.getDataStrings(1);
String globalTraceId = remoteData.getDataStrings(2);
Long timeBucket = remoteData.getDataLongs(0);
return new GlobalTrace(id, segmentId, globalTraceId, timeBucket);
}
@Override public RemoteData serialize(Object object) {
GlobalTrace globalTrace = (GlobalTrace)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(globalTrace.getId());
builder.addDataStrings(globalTrace.getSegmentId());
builder.addDataStrings(globalTrace.getGlobalTraceId());
builder.addDataLongs(globalTrace.getTimeBucket());
return builder.build();
}
public static class GlobalTrace implements TransformToData {
private String id;
private String segmentId;
private String globalTraceId;
private long timeBucket;
GlobalTrace(String id, String segmentId, String globalTraceId, long timeBucket) {
this.id = id;
this.segmentId = segmentId;
this.globalTraceId = globalTraceId;
this.timeBucket = timeBucket;
}
public GlobalTrace() {
}
@Override public Data transform() {
GlobalTraceDataDefine define = new GlobalTraceDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.segmentId);
data.setDataString(2, this.globalTraceId);
data.setDataLong(0, this.timeBucket);
return data;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getSegmentId() {
return segmentId;
}
public void setSegmentId(String segmentId) {
this.segmentId = segmentId;
}
public String getGlobalTraceId() {
return globalTraceId;
}
public void setGlobalTraceId(String globalTraceId) {
this.globalTraceId = globalTraceId;
}
public long getTimeBucket() {
return timeBucket;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
}
}
package org.skywalking.apm.collector.agentstream.worker.global.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class GlobalTraceEsTableDefine extends ElasticSearchTableDefine {
public GlobalTraceEsTableDefine() {
super(GlobalTraceTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(GlobalTraceTable.COLUMN_SEGMENT_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(GlobalTraceTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.global.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class GlobalTraceH2TableDefine extends H2TableDefine {
public GlobalTraceH2TableDefine() {
super(GlobalTraceTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(GlobalTraceTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(GlobalTraceTable.COLUMN_SEGMENT_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(GlobalTraceTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.global.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class GlobalTraceTable extends CommonTable {
public static final String TABLE = "global_trace";
public static final String COLUMN_SEGMENT_ID = "segment_id";
public static final String COLUMN_GLOBAL_TRACE_ID = "global_trace_id";
}
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.segment;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.global.GlobalTraceSpanListener;
import org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentSpanListener;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingSpanListener;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefSpanListener;
......@@ -40,6 +41,7 @@ public class SegmentParse {
spanListeners.add(new NodeRefSpanListener());
spanListeners.add(new NodeRefSumSpanListener());
spanListeners.add(new SegmentCostSpanListener());
spanListeners.add(new GlobalTraceSpanListener());
}
public void parse(List<UniqueId> traceIds, TraceSegmentObject segmentObject) {
......
......@@ -5,7 +5,6 @@ import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.GlobalTraceIdsListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
......@@ -15,18 +14,16 @@ import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.UniqueId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener, FirstSpanListener, GlobalTraceIdsListener {
public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener, FirstSpanListener {
private final Logger logger = LoggerFactory.getLogger(SegmentCostSpanListener.class);
private List<String> globalTraceIds = new ArrayList<>();
private List<SegmentCostDataDefine.SegmentCost> segmentCosts = new ArrayList<>();
private boolean isError = false;
private long timeBucket;
......@@ -39,11 +36,12 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
SegmentCostDataDefine.SegmentCost segmentCost = new SegmentCostDataDefine.SegmentCost();
segmentCost.setSegmentId(segmentId);
segmentCost.setCost(spanObject.getEndTime() - spanObject.getStartTime());
segmentCost.setStartTime(spanObject.getStartTime());
segmentCost.setEndTime(spanObject.getEndTime());
segmentCost.setSegmentId(segmentId);
segmentCost.setOperationName(spanObject.getOperationName());
segmentCost.setId(segmentId);
segmentCosts.add(segmentCost);
isError = isError || spanObject.getIsError();
......@@ -59,28 +57,18 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe
isError = isError || spanObject.getIsError();
}
@Override public void parseGlobalTraceId(UniqueId uniqueId) {
StringBuilder globalTraceIdBuilder = new StringBuilder();
uniqueId.getIdPartsList().forEach(globalTraceIdBuilder::append);
globalTraceIds.add(globalTraceIdBuilder.toString());
}
@Override public void build() {
logger.debug("segment cost listener build");
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
for (SegmentCostDataDefine.SegmentCost segmentCost : segmentCosts) {
for (String globalTraceId : globalTraceIds) {
segmentCost.setGlobalTraceId(globalTraceId);
segmentCost.setId(segmentCost.getSegmentId() + globalTraceId);
segmentCost.setError(isError);
segmentCost.setTimeBucket(timeBucket);
try {
logger.debug("send to segment cost persistence worker, id: {}", segmentCost.getId());
context.getClusterWorkerContext().lookup(SegmentCostPersistenceWorker.WorkerRole.INSTANCE).tell(segmentCost.transform());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
segmentCost.setError(isError);
segmentCost.setTimeBucket(timeBucket);
try {
logger.debug("send to segment cost persistence worker, id: {}", segmentCost.getId());
context.getClusterWorkerContext().lookup(SegmentCostPersistenceWorker.WorkerRole.INSTANCE).tell(segmentCost.transform());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
......
......@@ -24,8 +24,7 @@ public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO {
logger.debug("segment cost prepareBatch, id: {}", id);
Map<String, Object> source = new HashMap();
source.put(SegmentCostTable.COLUMN_SEGMENT_ID, data.getDataString(1));
source.put(SegmentCostTable.COLUMN_GLOBAL_TRACE_ID, data.getDataString(2));
source.put(SegmentCostTable.COLUMN_OPERATION_NAME, data.getDataString(3));
source.put(SegmentCostTable.COLUMN_OPERATION_NAME, data.getDataString(2));
source.put(SegmentCostTable.COLUMN_COST, data.getDataLong(0));
source.put(SegmentCostTable.COLUMN_START_TIME, data.getDataLong(1));
source.put(SegmentCostTable.COLUMN_END_TIME, data.getDataLong(2));
......
......@@ -19,32 +19,30 @@ public class SegmentCostDataDefine extends DataDefine {
}
@Override protected int initialCapacity() {
return 9;
return 8;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(SegmentCostTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(SegmentCostTable.COLUMN_SEGMENT_ID, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(SegmentCostTable.COLUMN_GLOBAL_TRACE_ID, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(SegmentCostTable.COLUMN_OPERATION_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(4, new Attribute(SegmentCostTable.COLUMN_COST, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(SegmentCostTable.COLUMN_START_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(6, new Attribute(SegmentCostTable.COLUMN_END_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(7, new Attribute(SegmentCostTable.COLUMN_IS_ERROR, AttributeType.BOOLEAN, new CoverOperation()));
addAttribute(8, new Attribute(SegmentCostTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
addAttribute(2, new Attribute(SegmentCostTable.COLUMN_OPERATION_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(SegmentCostTable.COLUMN_COST, AttributeType.LONG, new CoverOperation()));
addAttribute(4, new Attribute(SegmentCostTable.COLUMN_START_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(SegmentCostTable.COLUMN_END_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(6, new Attribute(SegmentCostTable.COLUMN_IS_ERROR, AttributeType.BOOLEAN, new CoverOperation()));
addAttribute(7, new Attribute(SegmentCostTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new NonOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
String segmentId = remoteData.getDataStrings(1);
String globalTraceId = remoteData.getDataStrings(2);
String operationName = remoteData.getDataStrings(3);
String operationName = remoteData.getDataStrings(2);
Long cost = remoteData.getDataLongs(0);
Long startTime = remoteData.getDataLongs(1);
Long endTime = remoteData.getDataLongs(2);
Boolean isError = remoteData.getDataBooleans(0);
Long timeBucket = remoteData.getDataLongs(2);
return new SegmentCost(id, segmentId, globalTraceId, operationName, cost, startTime, endTime, isError, timeBucket);
return new SegmentCost(id, segmentId, operationName, cost, startTime, endTime, isError, timeBucket);
}
@Override public RemoteData serialize(Object object) {
......@@ -52,19 +50,18 @@ public class SegmentCostDataDefine extends DataDefine {
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(segmentCost.getId());
builder.addDataStrings(segmentCost.getSegmentId());
builder.addDataStrings(segmentCost.getGlobalTraceId());
builder.addDataStrings(segmentCost.getOperationName());
builder.addDataLongs(segmentCost.getCost());
builder.addDataLongs(segmentCost.getStartTime());
builder.addDataLongs(segmentCost.getEndTime());
builder.addDataBooleans(segmentCost.isError());
builder.addDataLongs(segmentCost.getTimeBucket());
return builder.build();
}
public static class SegmentCost implements TransformToData {
private String id;
private String segmentId;
private String globalTraceId;
private String operationName;
private Long cost;
private Long startTime;
......@@ -72,11 +69,10 @@ public class SegmentCostDataDefine extends DataDefine {
private boolean isError;
private long timeBucket;
SegmentCost(String id, String segmentId, String globalTraceId, String operationName, Long cost,
SegmentCost(String id, String segmentId, String operationName, Long cost,
Long startTime, Long endTime, boolean isError, long timeBucket) {
this.id = id;
this.segmentId = segmentId;
this.globalTraceId = globalTraceId;
this.operationName = operationName;
this.cost = cost;
this.startTime = startTime;
......@@ -93,8 +89,7 @@ public class SegmentCostDataDefine extends DataDefine {
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.segmentId);
data.setDataString(2, this.globalTraceId);
data.setDataString(3, this.operationName);
data.setDataString(2, this.operationName);
data.setDataLong(0, this.cost);
data.setDataLong(1, this.startTime);
data.setDataLong(2, this.endTime);
......@@ -119,14 +114,6 @@ public class SegmentCostDataDefine extends DataDefine {
this.segmentId = segmentId;
}
public String getGlobalTraceId() {
return globalTraceId;
}
public void setGlobalTraceId(String globalTraceId) {
this.globalTraceId = globalTraceId;
}
public String getOperationName() {
return operationName;
}
......
......@@ -26,7 +26,6 @@ public class SegmentCostEsTableDefine extends ElasticSearchTableDefine {
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_SEGMENT_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_GLOBAL_TRACE_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_OPERATION_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_COST, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_START_TIME, ElasticSearchColumnDefine.Type.Long.name()));
......
......@@ -15,7 +15,6 @@ public class SegmentCostH2TableDefine extends H2TableDefine {
@Override public void initialize() {
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_SEGMENT_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_GLOBAL_TRACE_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_OPERATION_NAME, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_COST, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_START_TIME, H2ColumnDefine.Type.Bigint.name()));
......
......@@ -10,7 +10,6 @@ public class SegmentCostTable extends CommonTable {
public static final String COLUMN_SEGMENT_ID = "segment_id";
public static final String COLUMN_START_TIME = "start_time";
public static final String COLUMN_END_TIME = "end_time";
public static final String COLUMN_GLOBAL_TRACE_ID = "global_trace_id";
public static final String COLUMN_OPERATION_NAME = "operation_name";
public static final String COLUMN_COST = "cost";
public static final String COLUMN_IS_ERROR = "is_error";
......
......@@ -6,4 +6,5 @@ org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingEsDA
org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeReferenceEsDAO
org.skywalking.apm.collector.agentstream.worker.segment.origin.dao.SegmentEsDAO
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumEsDAO
org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostEsDAO
org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceEsDAO
\ No newline at end of file
......@@ -6,4 +6,5 @@ org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingH2DA
org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeReferenceH2DAO
org.skywalking.apm.collector.agentstream.worker.segment.origin.dao.SegmentH2DAO
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumH2DAO
org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostH2DAO
org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceH2DAO
\ No newline at end of file
......@@ -12,6 +12,7 @@ org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumPersis
org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.global.GlobalTracePersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterSerialWorker$Factory
......
......@@ -23,4 +23,7 @@ org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentEsT
org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentH2TableDefine
org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostEsTableDefine
org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostH2TableDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostH2TableDefine
org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceEsTableDefine
org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceH2TableDefine
\ No newline at end of file
......@@ -5,6 +5,8 @@ import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.junit.Test;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.KeyWithStringValue;
import org.skywalking.apm.network.proto.LogMessage;
import org.skywalking.apm.network.proto.SpanLayer;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
......@@ -86,6 +88,17 @@ public class TraceSegmentServiceHandlerTestCase {
span_0.setSpanType(SpanType.Entry);
span_0.setPeerId(2);
span_0.setPeer("localhost:8082");
LogMessage.Builder log_0 = LogMessage.newBuilder();
log_0.setTime(now);
log_0.addData(KeyWithStringValue.newBuilder().setKey("log1").setValue("value1"));
log_0.addData(KeyWithStringValue.newBuilder().setKey("log2").setValue("value2"));
log_0.addData(KeyWithStringValue.newBuilder().setKey("log3").setValue("value3"));
span_0.addLogs(log_0.build());
span_0.addTags(KeyWithStringValue.newBuilder().setKey("tag1").setValue("value1"));
span_0.addTags(KeyWithStringValue.newBuilder().setKey("tag2").setValue("value2"));
span_0.addTags(KeyWithStringValue.newBuilder().setKey("tag3").setValue("value3"));
segmentBuilder.addSpans(span_0);
TraceSegmentReference.Builder ref_0 = TraceSegmentReference.newBuilder();
......
......@@ -9,6 +9,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.update.UpdateRequest;
......@@ -113,6 +114,10 @@ public class ElasticSearchClient implements Client {
return client.prepareIndex(indexName, "type", id);
}
public GetRequestBuilder prepareGet(String indexName, String id) {
return client.prepareGet(indexName, "type", id);
}
public BulkRequestBuilder prepareBulk() {
return client.prepareBulk();
}
......
package org.skywalking.apm.collector.ui.dao;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class GlobalTraceEsDAO extends EsDAO implements IGlobalTraceDAO {
private final Logger logger = LoggerFactory.getLogger(GlobalTraceEsDAO.class);
@Override public List<String> getGlobalTraceId(String segmentId) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(GlobalTraceTable.TABLE);
searchRequestBuilder.setTypes(GlobalTraceTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery(GlobalTraceTable.COLUMN_SEGMENT_ID, segmentId));
searchRequestBuilder.setSize(10);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
List<String> globalTraceIds = new ArrayList<>();
SearchHit[] searchHits = searchResponse.getHits().getHits();
for (SearchHit searchHit : searchHits) {
String globalTraceId = (String)searchHit.getSource().get(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID);
logger.debug("segmentId: {}, global trace id: {}", segmentId, globalTraceId);
globalTraceIds.add(globalTraceId);
}
return globalTraceIds;
}
@Override public List<String> getSegmentIds(String globalTraceId) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(GlobalTraceTable.TABLE);
searchRequestBuilder.setTypes(GlobalTraceTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, globalTraceId));
searchRequestBuilder.setSize(10);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
List<String> segmentIds = new ArrayList<>();
SearchHit[] searchHits = searchResponse.getHits().getHits();
for (SearchHit searchHit : searchHits) {
String segmentId = (String)searchHit.getSource().get(GlobalTraceTable.COLUMN_SEGMENT_ID);
logger.debug("segmentId: {}, global trace id: {}", segmentId, globalTraceId);
segmentIds.add(segmentId);
}
return segmentIds;
}
}
package org.skywalking.apm.collector.ui.dao;
import java.util.List;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class GlobalTraceH2DAO extends H2DAO implements IGlobalTraceDAO {
@Override public List<String> getGlobalTraceId(String segmentId) {
return null;
}
@Override public List<String> getSegmentIds(String globalTraceId) {
return null;
}
}
package org.skywalking.apm.collector.ui.dao;
import java.util.List;
/**
* @author pengys5
*/
public interface IGlobalTraceDAO {
List<String> getGlobalTraceId(String segmentId);
List<String> getSegmentIds(String globalTraceId);
}
package org.skywalking.apm.collector.ui.dao;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
*/
public interface ISegmentDAO {
TraceSegmentObject load(String segmentId);
}
......@@ -12,8 +12,11 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceTable;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostTable;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
/**
......@@ -44,9 +47,6 @@ public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO {
if (!StringUtils.isEmpty(operationName)) {
mustQueryList.add(QueryBuilders.matchQuery(SegmentCostTable.COLUMN_OPERATION_NAME, operationName));
}
if (!StringUtils.isEmpty(globalTraceId)) {
mustQueryList.add(QueryBuilders.matchQuery(SegmentCostTable.COLUMN_GLOBAL_TRACE_ID, globalTraceId));
}
searchRequestBuilder.addSort(SegmentCostTable.COLUMN_COST, SortOrder.DESC);
searchRequestBuilder.setSize(limit);
......@@ -64,15 +64,20 @@ public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO {
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
JsonObject topSegmentJson = new JsonObject();
topSegmentJson.addProperty("num", num);
String segId = (String)searchHit.getSource().get(SegmentCostTable.COLUMN_SEGMENT_ID);
topSegmentJson.addProperty(SegmentCostTable.COLUMN_SEGMENT_ID, segId);
String segmentId = (String)searchHit.getSource().get(SegmentCostTable.COLUMN_SEGMENT_ID);
topSegmentJson.addProperty(SegmentCostTable.COLUMN_SEGMENT_ID, segmentId);
topSegmentJson.addProperty(SegmentCostTable.COLUMN_START_TIME, (Number)searchHit.getSource().get(SegmentCostTable.COLUMN_START_TIME));
if (searchHit.getSource().containsKey(SegmentCostTable.COLUMN_END_TIME)) {
topSegmentJson.addProperty(SegmentCostTable.COLUMN_END_TIME, (Number)searchHit.getSource().get(SegmentCostTable.COLUMN_END_TIME));
}
IGlobalTraceDAO globalTraceDAO = (IGlobalTraceDAO)DAOContainer.INSTANCE.get(IGlobalTraceDAO.class.getName());
List<String> globalTraces = globalTraceDAO.getGlobalTraceId(segmentId);
if (CollectionUtils.isNotEmpty(globalTraces)) {
topSegmentJson.addProperty(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, globalTraces.get(0));
}
topSegmentJson.addProperty(SegmentCostTable.COLUMN_OPERATION_NAME, (String)searchHit.getSource().get(SegmentCostTable.COLUMN_OPERATION_NAME));
topSegmentJson.addProperty(SegmentCostTable.COLUMN_GLOBAL_TRACE_ID, (String)searchHit.getSource().get(SegmentCostTable.COLUMN_GLOBAL_TRACE_ID));
topSegmentJson.addProperty(SegmentCostTable.COLUMN_COST, (Number)searchHit.getSource().get(SegmentCostTable.COLUMN_COST));
topSegmentJson.addProperty(SegmentCostTable.COLUMN_IS_ERROR, (Boolean)searchHit.getSource().get(SegmentCostTable.COLUMN_IS_ERROR));
......
package org.skywalking.apm.collector.ui.dao;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Base64;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentTable;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentEsDAO extends EsDAO implements ISegmentDAO {
private final Logger logger = LoggerFactory.getLogger(SegmentEsDAO.class);
@Override public TraceSegmentObject load(String segmentId) {
GetResponse response = getClient().prepareGet(SegmentTable.TABLE, segmentId).get();
Map<String, Object> source = response.getSource();
String dataBinaryBase64 = (String)source.get(SegmentTable.COLUMN_DATA_BINARY);
if (StringUtils.isNotEmpty(dataBinaryBase64)) {
byte[] dataBinary = Base64.getDecoder().decode(dataBinaryBase64);
try {
return TraceSegmentObject.parseFrom(dataBinary);
} catch (InvalidProtocolBufferException e) {
logger.error(e.getMessage(), e);
}
}
return null;
}
}
package org.skywalking.apm.collector.ui.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
*/
public class SegmentH2DAO extends H2DAO implements ISegmentDAO {
@Override public TraceSegmentObject load(String segmentId) {
return null;
}
}
......@@ -11,7 +11,9 @@ import org.skywalking.apm.collector.server.jetty.JettyServer;
import org.skywalking.apm.collector.ui.UIModuleDefine;
import org.skywalking.apm.collector.ui.UIModuleGroupDefine;
import org.skywalking.apm.collector.ui.jetty.handler.SegmentTopGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.SpanGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.TraceDagGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.TraceStackGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.UIJettyServerHandler;
/**
......@@ -50,6 +52,8 @@ public class UIJettyModuleDefine extends UIModuleDefine {
handlers.add(new UIJettyServerHandler());
handlers.add(new TraceDagGetHandler());
handlers.add(new SegmentTopGetHandler());
handlers.add(new TraceStackGetHandler());
handlers.add(new SpanGetHandler());
return handlers;
}
}
package org.skywalking.apm.collector.ui.jetty.handler;
import com.google.gson.JsonElement;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.ui.service.SpanService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SpanGetHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(SpanGetHandler.class);
@Override public String pathSpec() {
return "/span/spanId";
}
private SpanService service = new SpanService();
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
String segmentId = req.getParameter("segmentId");
String spanIdStr = req.getParameter("spanId");
logger.debug("segmentSpanId: {}, spanIdStr: {}", segmentId, spanIdStr);
int spanId;
try {
spanId = Integer.parseInt(spanIdStr);
} catch (NumberFormatException e) {
throw new ArgumentsParseException("span id must be integer");
}
return service.load(segmentId, spanId);
}
}
package org.skywalking.apm.collector.ui.jetty.handler;
import com.google.gson.JsonElement;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.ui.service.TraceStackService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class TraceStackGetHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(TraceStackGetHandler.class);
@Override public String pathSpec() {
return "/traceStack/globalTraceId";
}
private TraceStackService service = new TraceStackService();
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
String globalTraceId = req.getParameter("globalTraceId");
logger.debug("globalTraceId: {}", globalTraceId);
return service.load(globalTraceId);
}
}
package org.skywalking.apm.collector.ui.service;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.List;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.ui.dao.ISegmentDAO;
import org.skywalking.apm.network.proto.KeyWithStringValue;
import org.skywalking.apm.network.proto.LogMessage;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
*/
public class SpanService {
public JsonObject load(String segmentId, int spanId) {
ISegmentDAO segmentDAO = (ISegmentDAO)DAOContainer.INSTANCE.get(ISegmentDAO.class.getName());
TraceSegmentObject segmentObject = segmentDAO.load(segmentId);
JsonObject spanJson = new JsonObject();
List<SpanObject> spans = segmentObject.getSpansList();
for (SpanObject spanObject : spans) {
if (spanId == spanObject.getSpanId()) {
spanJson.addProperty("operationName", spanObject.getOperationName());
spanJson.addProperty("startTime", spanObject.getStartTime());
spanJson.addProperty("endTime", spanObject.getEndTime());
JsonArray logsArray = new JsonArray();
List<LogMessage> logs = spanObject.getLogsList();
for (LogMessage logMessage : logs) {
JsonObject logJson = new JsonObject();
logJson.addProperty("time", logMessage.getTime());
JsonArray logInfoArray = new JsonArray();
for (KeyWithStringValue value : logMessage.getDataList()) {
JsonObject valueJson = new JsonObject();
valueJson.addProperty("key", value.getKey());
valueJson.addProperty("value", value.getValue());
logInfoArray.add(valueJson);
}
logJson.add("logInfo", logInfoArray);
logsArray.add(logJson);
}
spanJson.add("logMessage", logsArray);
JsonArray tagsArray = new JsonArray();
for (KeyWithStringValue tagValue : spanObject.getTagsList()) {
JsonObject tagJson = new JsonObject();
tagJson.addProperty("key", tagValue.getKey());
tagJson.addProperty("value", tagValue.getValue());
tagsArray.add(tagJson);
}
spanJson.add("tags", tagsArray);
}
}
return spanJson;
}
}
\ No newline at end of file
package org.skywalking.apm.collector.ui.service;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.ui.dao.IGlobalTraceDAO;
import org.skywalking.apm.collector.ui.dao.ISegmentDAO;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.skywalking.apm.network.proto.UniqueId;
/**
* @author pengys5
*/
public class TraceStackService {
public JsonArray load(String globalTraceId) {
IGlobalTraceDAO globalTraceDAO = (IGlobalTraceDAO)DAOContainer.INSTANCE.get(IGlobalTraceDAO.class.getName());
ISegmentDAO segmentDAO = (ISegmentDAO)DAOContainer.INSTANCE.get(ISegmentDAO.class.getName());
List<Span> spans = new ArrayList<>();
List<String> segmentIds = globalTraceDAO.getSegmentIds(globalTraceId);
if (CollectionUtils.isNotEmpty(segmentIds)) {
for (String segmentId : segmentIds) {
TraceSegmentObject segment = segmentDAO.load(segmentId);
if (ObjectUtils.isNotEmpty(segment)) {
spans.addAll(buildSpanList(segmentId, segment));
}
}
}
List<Span> sortedSpans = new ArrayList<>();
if (CollectionUtils.isNotEmpty(spans)) {
List<Span> rootSpans = findRoot(spans);
if (CollectionUtils.isNotEmpty(rootSpans)) {
rootSpans.forEach(span -> {
List<Span> childrenSpan = new ArrayList<>();
childrenSpan.add(span);
findChildren(spans, span, childrenSpan);
sortedSpans.addAll(childrenSpan);
});
}
}
minStartTime(sortedSpans);
return toJsonArray(sortedSpans);
}
private JsonArray toJsonArray(List<Span> sortedSpans) {
JsonArray traceStackArray = new JsonArray();
sortedSpans.forEach(span -> {
JsonObject spanJson = new JsonObject();
spanJson.addProperty("spanId", span.getSpanId());
spanJson.addProperty("parentSpanId", span.getParentSpanId());
spanJson.addProperty("segmentSpanId", span.getSegmentSpanId());
spanJson.addProperty("segmentParentSpanId", span.getSegmentParentSpanId());
spanJson.addProperty("startTime", span.getStartTime());
spanJson.addProperty("operationName", span.getOperationName());
spanJson.addProperty("applicationCode", span.getApplicationCode());
spanJson.addProperty("cost", span.getCost());
spanJson.addProperty("isRoot", span.isRoot());
traceStackArray.add(spanJson);
});
return traceStackArray;
}
private void minStartTime(List<Span> spans) {
long minStartTime = Long.MAX_VALUE;
for (Span span : spans) {
if (span.getStartTime() < minStartTime) {
minStartTime = span.getStartTime();
}
}
for (Span span : spans) {
span.setStartTime(span.getStartTime() - minStartTime);
}
}
private List<Span> buildSpanList(String segmentId, TraceSegmentObject segment) {
List<Span> spans = new ArrayList<>();
if (segment.getSpansCount() > 0) {
for (SpanObject spanObject : segment.getSpansList()) {
int spanId = spanObject.getSpanId();
int parentSpanId = spanObject.getParentSpanId();
String segmentSpanId = segmentId + Const.SEGMENT_SPAN_SPLIT + String.valueOf(spanId);
String segmentParentSpanId = segmentId + Const.SEGMENT_SPAN_SPLIT + String.valueOf(parentSpanId);
long startTime = spanObject.getStartTime();
String operationName = spanObject.getOperationName();
String applicationCode = "Code" + String.valueOf(segment.getApplicationId());
long cost = spanObject.getEndTime() - spanObject.getStartTime();
if (cost == 0) {
cost = 1;
}
if (parentSpanId == -1 && segment.getRefsCount() > 0) {
for (TraceSegmentReference reference : segment.getRefsList()) {
parentSpanId = reference.getParentSpanId();
UniqueId uniqueId = reference.getParentTraceSegmentId();
StringBuilder segmentIdBuilder = new StringBuilder();
uniqueId.getIdPartsList().forEach(part -> segmentIdBuilder.append(String.valueOf(part)));
String parentSegmentId = segmentIdBuilder.toString();
segmentParentSpanId = parentSegmentId + Const.SEGMENT_SPAN_SPLIT + String.valueOf(parentSpanId);
spans.add(new Span(spanId, parentSpanId, segmentSpanId, segmentParentSpanId, startTime, operationName, applicationCode, cost));
}
} else {
spans.add(new Span(spanId, parentSpanId, segmentSpanId, segmentParentSpanId, startTime, operationName, applicationCode, cost));
}
}
}
return spans;
}
private List<Span> findRoot(List<Span> spans) {
List<Span> rootSpans = new ArrayList<>();
spans.forEach(span -> {
String segmentParentSpanId = span.getSegmentParentSpanId();
boolean hasParent = false;
for (Span span1 : spans) {
if (segmentParentSpanId.equals(span1.getSegmentSpanId())) {
hasParent = true;
}
}
if (!hasParent) {
span.setRoot(true);
rootSpans.add(span);
}
});
return rootSpans;
}
private void findChildren(List<Span> spans, Span parentSpan, List<Span> childrenSpan) {
spans.forEach(span -> {
if (span.getSegmentParentSpanId().equals(parentSpan.getSegmentSpanId())) {
childrenSpan.add(span);
findChildren(spans, span, childrenSpan);
}
});
}
class Span {
private int spanId;
private int parentSpanId;
private String segmentSpanId;
private String segmentParentSpanId;
private long startTime;
private String operationName;
private String applicationCode;
private long cost;
private boolean isRoot = false;
Span(int spanId, int parentSpanId, String segmentSpanId, String segmentParentSpanId, long startTime,
String operationName, String applicationCode, long cost) {
this.spanId = spanId;
this.parentSpanId = parentSpanId;
this.segmentSpanId = segmentSpanId;
this.segmentParentSpanId = segmentParentSpanId;
this.startTime = startTime;
this.operationName = operationName;
this.applicationCode = applicationCode;
this.cost = cost;
}
int getSpanId() {
return spanId;
}
int getParentSpanId() {
return parentSpanId;
}
String getSegmentSpanId() {
return segmentSpanId;
}
String getSegmentParentSpanId() {
return segmentParentSpanId;
}
long getStartTime() {
return startTime;
}
String getOperationName() {
return operationName;
}
String getApplicationCode() {
return applicationCode;
}
long getCost() {
return cost;
}
public boolean isRoot() {
return isRoot;
}
public void setRoot(boolean root) {
isRoot = root;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
}
}
......@@ -2,4 +2,6 @@ org.skywalking.apm.collector.ui.dao.NodeComponentEsDAO
org.skywalking.apm.collector.ui.dao.NodeMappingEsDAO
org.skywalking.apm.collector.ui.dao.NodeReferenceEsDAO
org.skywalking.apm.collector.ui.dao.NodeRefSumEsDAO
org.skywalking.apm.collector.ui.dao.SegmentCostEsDAO
\ No newline at end of file
org.skywalking.apm.collector.ui.dao.SegmentCostEsDAO
org.skywalking.apm.collector.ui.dao.GlobalTraceEsDAO
org.skywalking.apm.collector.ui.dao.SegmentEsDAO
\ No newline at end of file
......@@ -2,4 +2,6 @@ org.skywalking.apm.collector.ui.dao.NodeComponentH2DAO
org.skywalking.apm.collector.ui.dao.NodeMappingH2DAO
org.skywalking.apm.collector.ui.dao.NodeReferenceH2DAO
org.skywalking.apm.collector.ui.dao.NodeRefSumH2DAO
org.skywalking.apm.collector.ui.dao.SegmentCostH2DAO
\ No newline at end of file
org.skywalking.apm.collector.ui.dao.SegmentCostH2DAO
org.skywalking.apm.collector.ui.dao.GlobalTraceH2DAO
org.skywalking.apm.collector.ui.dao.SegmentH2DAO
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册