提交 99222785 编写于 作者: P pengys5

Segment cost save to es success

上级 430fe745
......@@ -27,7 +27,8 @@ public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanLis
private List<String> nodeComponents = new ArrayList<>();
private long timeBucket;
@Override public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId) {
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String peers = spanObject.getPeer();
if (spanObject.getPeerId() == 0) {
peers = String.valueOf(spanObject.getPeerId());
......@@ -36,13 +37,15 @@ public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanLis
nodeComponents.add(agg);
}
@Override public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId) {
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String peers = String.valueOf(applicationId);
String agg = spanObject.getComponentId() + Const.ID_SPLIT + peers;
nodeComponents.add(agg);
}
@Override public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId) {
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
}
......
......@@ -27,7 +27,8 @@ public class NodeMappingSpanListener implements RefsListener, FirstSpanListener
private List<String> nodeMappings = new ArrayList<>();
private long timeBucket;
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId) {
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
logger.debug("node mapping listener parse reference");
String peers = Const.PEERS_FRONT_SPLIT + reference.getNetworkAddressId() + Const.PEERS_BEHIND_SPLIT;
if (reference.getNetworkAddressId() == 0) {
......@@ -38,7 +39,8 @@ public class NodeMappingSpanListener implements RefsListener, FirstSpanListener
nodeMappings.add(agg);
}
@Override public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId) {
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
}
......
......@@ -31,7 +31,8 @@ public class NodeRefSpanListener implements EntrySpanListener, ExitSpanListener,
private long timeBucket;
private boolean hasReference = false;
@Override public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId) {
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String front = String.valueOf(applicationId);
String behind = String.valueOf(spanObject.getPeerId());
if (spanObject.getPeerId() == 0) {
......@@ -42,7 +43,8 @@ public class NodeRefSpanListener implements EntrySpanListener, ExitSpanListener,
nodeExitReferences.add(agg);
}
@Override public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId) {
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String behind = String.valueOf(applicationId);
String front = Const.USER_CODE;
......@@ -50,11 +52,13 @@ public class NodeRefSpanListener implements EntrySpanListener, ExitSpanListener,
nodeEntryReferences.add(agg);
}
@Override public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId) {
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
}
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId) {
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
hasReference = true;
}
......
......@@ -31,7 +31,8 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
private long timeBucket;
private boolean hasReference = false;
@Override public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId) {
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String front = String.valueOf(applicationId);
String behind = String.valueOf(spanObject.getPeerId());
if (spanObject.getPeerId() == 0) {
......@@ -42,7 +43,8 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
nodeExitReferences.add(buildNodeRefSum(spanObject.getStartTime(), spanObject.getEndTime(), agg, spanObject.getIsError()));
}
@Override public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId) {
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String behind = String.valueOf(applicationId);
String front = Const.USER_CODE;
......@@ -71,11 +73,13 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
return referenceSum;
}
@Override public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId) {
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
}
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId) {
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
hasReference = true;
}
......
......@@ -6,5 +6,5 @@ import org.skywalking.apm.network.proto.SpanObject;
* @author pengys5
*/
public interface EntrySpanListener extends SpanListener {
void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId);
void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId);
}
......@@ -6,5 +6,5 @@ import org.skywalking.apm.network.proto.SpanObject;
* @author pengys5
*/
public interface ExitSpanListener extends SpanListener {
void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId);
void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId);
}
......@@ -6,5 +6,5 @@ import org.skywalking.apm.network.proto.SpanObject;
* @author pengys5
*/
public interface FirstSpanListener extends SpanListener {
void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId);
void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId);
}
package org.skywalking.apm.collector.agentstream.worker.segment;
import org.skywalking.apm.network.proto.UniqueId;
/**
* @author pengys5
*/
public interface GlobalTraceIdsListener extends SpanListener {
void parseGlobalTraceId(UniqueId uniqueId);
}
......@@ -6,5 +6,5 @@ import org.skywalking.apm.network.proto.SpanObject;
* @author pengys5
*/
public interface LocalSpanListener extends SpanListener {
void parseLocal(SpanObject spanObject, int applicationId, int applicationInstanceId);
void parseLocal(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId);
}
......@@ -6,5 +6,5 @@ import org.skywalking.apm.network.proto.TraceSegmentReference;
* @author pengys5
*/
public interface RefsListener extends SpanListener {
void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId);
void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId, String segmentId);
}
......@@ -6,7 +6,9 @@ import org.skywalking.apm.collector.agentstream.worker.node.component.NodeCompon
import org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingSpanListener;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefSpanListener;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.define.SegmentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenceWorker;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentDataDefine;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.stream.StreamModuleContext;
......@@ -29,7 +31,6 @@ public class SegmentParse {
private final Logger logger = LoggerFactory.getLogger(SegmentParse.class);
private List<SpanListener> spanListeners;
private List<SpanListener> refsListeners;
public SegmentParse() {
spanListeners = new ArrayList<>();
......@@ -38,38 +39,40 @@ public class SegmentParse {
spanListeners.add(new NodeMappingSpanListener());
spanListeners.add(new NodeRefSpanListener());
spanListeners.add(new NodeRefSumSpanListener());
refsListeners = new ArrayList<>();
refsListeners.add(new NodeMappingSpanListener());
refsListeners.add(new NodeRefSpanListener());
refsListeners.add(new NodeRefSumSpanListener());
spanListeners.add(new SegmentCostSpanListener());
}
public void parse(List<UniqueId> traceIds, TraceSegmentObject segmentObject) {
StringBuilder segmentIdBuilder = new StringBuilder();
segmentObject.getTraceSegmentId().getIdPartsList().forEach(part -> {
segmentIdBuilder.append(part);
});
String segmentId = segmentIdBuilder.toString();
for (UniqueId uniqueId : traceIds) {
uniqueId.getIdPartsList();
notifyGlobalsListener(uniqueId);
}
int applicationId = segmentObject.getApplicationId();
int applicationInstanceId = segmentObject.getApplicationInstanceId();
for (TraceSegmentReference reference : segmentObject.getRefsList()) {
notifyRefsListener(reference, applicationId, applicationInstanceId);
notifyRefsListener(reference, applicationId, applicationInstanceId, segmentId);
}
List<SpanObject> spans = segmentObject.getSpansList();
if (CollectionUtils.isNotEmpty(spans)) {
for (SpanObject spanObject : spans) {
if (spanObject.getSpanId() == 0) {
notifyFirstListener(spanObject, applicationId, applicationInstanceId);
notifyFirstListener(spanObject, applicationId, applicationInstanceId, segmentId);
}
if (SpanType.Exit.equals(spanObject.getSpanType())) {
notifyExitListener(spanObject, applicationId, applicationInstanceId);
notifyExitListener(spanObject, applicationId, applicationInstanceId, segmentId);
} else if (SpanType.Entry.equals(spanObject.getSpanType())) {
notifyEntryListener(spanObject, applicationId, applicationInstanceId);
notifyEntryListener(spanObject, applicationId, applicationInstanceId, segmentId);
} else if (SpanType.Local.equals(spanObject.getSpanType())) {
notifyLocalListener(spanObject, applicationId, applicationInstanceId);
notifyLocalListener(spanObject, applicationId, applicationInstanceId, segmentId);
} else {
logger.error("span type error, span type: {}", spanObject.getSpanType().name());
}
......@@ -77,12 +80,7 @@ public class SegmentParse {
}
notifyListenerToBuild();
StringBuilder segmentId = new StringBuilder();
segmentObject.getTraceSegmentId().getIdPartsList().forEach(part -> {
segmentId.append(part);
});
buildSegment(segmentId.toString(), segmentObject.toByteArray());
buildSegment(segmentId, segmentObject.toByteArray());
}
public void buildSegment(String id, byte[] dataBinary) {
......@@ -101,45 +99,57 @@ public class SegmentParse {
private void notifyListenerToBuild() {
spanListeners.forEach(listener -> listener.build());
refsListeners.forEach(listener -> listener.build());
}
private void notifyExitListener(SpanObject spanObject, int applicationId, int applicationInstanceId) {
private void notifyExitListener(SpanObject spanObject, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof ExitSpanListener) {
((ExitSpanListener)listener).parseExit(spanObject, applicationId, applicationInstanceId);
((ExitSpanListener)listener).parseExit(spanObject, applicationId, applicationInstanceId, segmentId);
}
}
}
private void notifyEntryListener(SpanObject spanObject, int applicationId, int applicationInstanceId) {
private void notifyEntryListener(SpanObject spanObject, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof EntrySpanListener) {
((EntrySpanListener)listener).parseEntry(spanObject, applicationId, applicationInstanceId);
((EntrySpanListener)listener).parseEntry(spanObject, applicationId, applicationInstanceId, segmentId);
}
}
}
private void notifyLocalListener(SpanObject spanObject, int applicationId, int applicationInstanceId) {
private void notifyLocalListener(SpanObject spanObject, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof LocalSpanListener) {
((LocalSpanListener)listener).parseLocal(spanObject, applicationId, applicationInstanceId);
((LocalSpanListener)listener).parseLocal(spanObject, applicationId, applicationInstanceId, segmentId);
}
}
}
private void notifyFirstListener(SpanObject spanObject, int applicationId, int applicationInstanceId) {
private void notifyFirstListener(SpanObject spanObject, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof FirstSpanListener) {
((FirstSpanListener)listener).parseFirst(spanObject, applicationId, applicationInstanceId);
((FirstSpanListener)listener).parseFirst(spanObject, applicationId, applicationInstanceId, segmentId);
}
}
}
private void notifyRefsListener(TraceSegmentReference reference, int applicationId, int applicationInstanceId) {
for (SpanListener listener : refsListeners) {
private void notifyRefsListener(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof RefsListener) {
((RefsListener)listener).parseRef(reference, applicationId, applicationInstanceId);
((RefsListener)listener).parseRef(reference, applicationId, applicationInstanceId, segmentId);
}
}
}
private void notifyGlobalsListener(UniqueId uniqueId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof GlobalTraceIdsListener) {
((GlobalTraceIdsListener)listener).parseGlobalTraceId(uniqueId);
}
}
}
......
package org.skywalking.apm.collector.agentstream.worker.segment.cost;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.ISegmentCostDAO;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class SegmentCostPersistenceWorker extends PersistenceWorker {
public SegmentCostPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
ISegmentCostDAO dao = (ISegmentCostDAO)DAOContainer.INSTANCE.get(ISegmentCostDAO.class.getName());
return dao.prepareBatch(dataMap);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<SegmentCostPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public SegmentCostPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new SegmentCostPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return SegmentCostPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
@Override public DataDefine dataDefine() {
return new SegmentCostDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.segment.cost;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.GlobalTraceIdsListener;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostDataDefine;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.UniqueId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentCostSpanListener implements EntrySpanListener, GlobalTraceIdsListener {
private final Logger logger = LoggerFactory.getLogger(SegmentCostSpanListener.class);
private List<String> globalTraceIds = new ArrayList<>();
private List<SegmentCostDataDefine.SegmentCost> segmentCosts = new ArrayList<>();
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
SegmentCostDataDefine.SegmentCost segmentCost = new SegmentCostDataDefine.SegmentCost();
segmentCost.setCost(spanObject.getEndTime() - spanObject.getStartTime());
segmentCost.setStartTime(spanObject.getStartTime());
segmentCost.setEndTime(spanObject.getEndTime());
segmentCost.setSegmentId(segmentId);
segmentCost.setOperationName(spanObject.getOperationName());
segmentCosts.add(segmentCost);
}
@Override public void parseGlobalTraceId(UniqueId uniqueId) {
StringBuilder globalTraceIdBuilder = new StringBuilder();
uniqueId.getIdPartsList().forEach(globalTraceIdBuilder::append);
globalTraceIds.add(globalTraceIdBuilder.toString());
}
@Override public void build() {
logger.debug("segment cost listener build");
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
for (SegmentCostDataDefine.SegmentCost segmentCost : segmentCosts) {
for (String globalTraceId : globalTraceIds) {
segmentCost.setGlobalTraceId(globalTraceId);
segmentCost.setId(segmentCost.getSegmentId() + globalTraceId);
try {
logger.debug("send to segment cost persistence worker, id: {}", segmentCost.getId());
context.getClusterWorkerContext().lookup(SegmentCostPersistenceWorker.WorkerRole.INSTANCE).tell(segmentCost.transform());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
}
}
\ No newline at end of file
package org.skywalking.apm.collector.agentstream.worker.segment.cost.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface ISegmentCostDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.segment.cost.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO {
private final Logger logger = LoggerFactory.getLogger(SegmentCostEsDAO.class);
@Override public List<?> prepareBatch(Map<String, Data> dataMap) {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
logger.debug("segment cost prepareBatch, id: {}", id);
Map<String, Object> source = new HashMap();
source.put(SegmentCostTable.COLUMN_SEGMENT_ID, data.getDataString(1));
source.put(SegmentCostTable.COLUMN_GLOBAL_TRACE_ID, data.getDataString(2));
source.put(SegmentCostTable.COLUMN_OPERATION_NAME, data.getDataString(3));
source.put(SegmentCostTable.COLUMN_COST, data.getDataLong(0));
source.put(SegmentCostTable.COLUMN_START_TIME, data.getDataLong(1));
source.put(SegmentCostTable.COLUMN_END_TIME, data.getDataLong(2));
logger.debug("segment cost source: {}", source.toString());
IndexRequestBuilder builder = getClient().prepareIndex(SegmentCostTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
}
}
package org.skywalking.apm.collector.agentstream.worker.segment.cost.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class SegmentCostH2DAO extends H2DAO implements ISegmentCostDAO {
@Override public List<?> prepareBatch(Map map) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.segment.cost.define;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class SegmentCostDataDefine extends DataDefine {
public static final int DEFINE_ID = 402;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 7;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(SegmentCostTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(SegmentCostTable.COLUMN_SEGMENT_ID, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(SegmentCostTable.COLUMN_GLOBAL_TRACE_ID, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(SegmentCostTable.COLUMN_OPERATION_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(4, new Attribute(SegmentCostTable.COLUMN_COST, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(SegmentCostTable.COLUMN_START_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(6, new Attribute(SegmentCostTable.COLUMN_END_TIME, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
String segmentId = remoteData.getDataStrings(1);
String globalTraceId = remoteData.getDataStrings(2);
String operationName = remoteData.getDataStrings(3);
Long cost = remoteData.getDataLongs(0);
Long startTime = remoteData.getDataLongs(1);
Long endTime = remoteData.getDataLongs(2);
return new SegmentCost(id, segmentId, globalTraceId, operationName, cost, startTime, endTime);
}
@Override public RemoteData serialize(Object object) {
SegmentCost segmentCost = (SegmentCost)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(segmentCost.getId());
builder.addDataStrings(segmentCost.getSegmentId());
builder.addDataStrings(segmentCost.getGlobalTraceId());
builder.addDataStrings(segmentCost.getOperationName());
builder.addDataLongs(segmentCost.getCost());
builder.addDataLongs(segmentCost.getStartTime());
builder.addDataLongs(segmentCost.getEndTime());
return builder.build();
}
public static class SegmentCost implements TransformToData {
private String id;
private String segmentId;
private String globalTraceId;
private String operationName;
private Long cost;
private Long startTime;
private Long endTime;
public SegmentCost(String id, String segmentId, String globalTraceId, String operationName, Long cost,
Long startTime, Long endTime) {
this.id = id;
this.segmentId = segmentId;
this.globalTraceId = globalTraceId;
this.operationName = operationName;
this.cost = cost;
this.startTime = startTime;
this.endTime = endTime;
}
public SegmentCost() {
}
@Override public Data transform() {
SegmentCostDataDefine define = new SegmentCostDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.segmentId);
data.setDataString(2, this.globalTraceId);
data.setDataString(3, this.operationName);
data.setDataLong(0, this.cost);
data.setDataLong(1, this.startTime);
data.setDataLong(2, this.endTime);
return data;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getSegmentId() {
return segmentId;
}
public void setSegmentId(String segmentId) {
this.segmentId = segmentId;
}
public String getGlobalTraceId() {
return globalTraceId;
}
public void setGlobalTraceId(String globalTraceId) {
this.globalTraceId = globalTraceId;
}
public String getOperationName() {
return operationName;
}
public void setOperationName(String operationName) {
this.operationName = operationName;
}
public Long getCost() {
return cost;
}
public void setCost(Long cost) {
this.cost = cost;
}
public Long getStartTime() {
return startTime;
}
public void setStartTime(Long startTime) {
this.startTime = startTime;
}
public Long getEndTime() {
return endTime;
}
public void setEndTime(Long endTime) {
this.endTime = endTime;
}
}
}
package org.skywalking.apm.collector.agentstream.worker.segment.cost.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class SegmentCostEsTableDefine extends ElasticSearchTableDefine {
public SegmentCostEsTableDefine() {
super(SegmentCostTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_SEGMENT_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_GLOBAL_TRACE_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_OPERATION_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_COST, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_START_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_END_TIME, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.segment.cost.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class SegmentCostH2TableDefine extends H2TableDefine {
public SegmentCostH2TableDefine() {
super(SegmentCostTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_SEGMENT_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_GLOBAL_TRACE_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_OPERATION_NAME, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_COST, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_START_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_END_TIME, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.segment.cost.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class SegmentCostTable extends CommonTable {
public static final String TABLE = "segment_cost";
public static final String COLUMN_SEGMENT_ID = "segment_id";
public static final String COLUMN_START_TIME = "start_time";
public static final String COLUMN_END_TIME = "end_time";
public static final String COLUMN_GLOBAL_TRACE_ID = "global_trace_id";
public static final String COLUMN_OPERATION_NAME = "operation_name";
public static final String COLUMN_COST = "cost";
}
package org.skywalking.apm.collector.agentstream.worker.segment;
package org.skywalking.apm.collector.agentstream.worker.segment.origin;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.segment.dao.ISegmentDAO;
import org.skywalking.apm.collector.agentstream.worker.segment.define.SegmentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.dao.ISegmentDAO;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......
package org.skywalking.apm.collector.agentstream.worker.segment.dao;
package org.skywalking.apm.collector.agentstream.worker.segment.origin.dao;
import java.util.ArrayList;
import java.util.Base64;
......@@ -6,7 +6,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.segment.define.SegmentTable;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.slf4j.Logger;
......
package org.skywalking.apm.collector.agentstream.worker.segment.define;
package org.skywalking.apm.collector.agentstream.worker.segment.origin.define;
import com.google.protobuf.ByteString;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
......
package org.skywalking.apm.collector.agentstream.worker.segment.define;
package org.skywalking.apm.collector.agentstream.worker.segment.origin.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
......
package org.skywalking.apm.collector.agentstream.worker.segment.define;
package org.skywalking.apm.collector.agentstream.worker.segment.origin.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
......
package org.skywalking.apm.collector.agentstream.worker.segment.define;
package org.skywalking.apm.collector.agentstream.worker.segment.origin.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
......
......@@ -26,11 +26,13 @@ public class ServiceRefSpanListener implements FirstSpanListener, EntrySpanListe
private List<ServiceTemp> fronts = new ArrayList<>();
private long timeBucket;
@Override public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId) {
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
}
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId) {
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
String entryService = String.valueOf(reference.getEntryServiceId());
if (reference.getEntryServiceId() == 0) {
entryService = reference.getEntryServiceName();
......@@ -42,14 +44,16 @@ public class ServiceRefSpanListener implements FirstSpanListener, EntrySpanListe
fronts.add(new ServiceTemp(entryService, parentService));
}
@Override public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId) {
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
front = String.valueOf(spanObject.getOperationNameId());
if (spanObject.getOperationNameId() == 0) {
front = spanObject.getOperationName();
}
}
@Override public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId) {
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String behind = String.valueOf(spanObject.getOperationNameId());
if (spanObject.getOperationNameId() == 0) {
behind = spanObject.getOperationName();
......
package org.skywalking.apm.collector.agentstream.worker.storage;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Starter;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......@@ -24,7 +25,7 @@ public class PersistenceTimer implements Starter {
// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
final long timeInterval = 3 * 1000;
Runnable runnable = () -> {
Thread persistenceThread = new Thread(() -> {
while (true) {
try {
extractDataAndSave();
......@@ -33,23 +34,25 @@ public class PersistenceTimer implements Starter {
logger.error(e.getMessage(), e);
}
}
};
Thread persistenceThread = new Thread(runnable);
});
persistenceThread.setName("timerPersistence");
persistenceThread.start();
}
private void extractDataAndSave() {
List<PersistenceWorker> workers = PersistenceWorkerContainer.INSTANCE.getPersistenceWorkers();
workers.forEach(worker -> {
List batchAllCollection = new ArrayList<>();
workers.forEach((PersistenceWorker worker) -> {
try {
worker.allocateJob(new FlushAndSwitch());
List<?> batchCollection = worker.buildBatchCollection();
IBatchDAO dao = (IBatchDAO)DAOContainer.INSTANCE.get(IBatchDAO.class.getName());
dao.batchPersistence(batchCollection);
batchAllCollection.addAll(batchCollection);
} catch (WorkerException e) {
logger.error(e.getMessage(), e);
}
});
IBatchDAO dao = (IBatchDAO)DAOContainer.INSTANCE.get(IBatchDAO.class.getName());
dao.batchPersistence(batchAllCollection);
}
}
......@@ -4,5 +4,6 @@ org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.Service
org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentEsDAO
org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingEsDAO
org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeReferenceEsDAO
org.skywalking.apm.collector.agentstream.worker.segment.dao.SegmentEsDAO
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.segment.origin.dao.SegmentEsDAO
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumEsDAO
org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostEsDAO
\ No newline at end of file
......@@ -4,5 +4,6 @@ org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.Service
org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentH2DAO
org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingH2DAO
org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeReferenceH2DAO
org.skywalking.apm.collector.agentstream.worker.segment.dao.SegmentH2DAO
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.segment.origin.dao.SegmentH2DAO
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumH2DAO
org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostH2DAO
\ No newline at end of file
......@@ -10,7 +10,8 @@ org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefPersist
org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.segment.SegmentPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterSerialWorker$Factory
......
......@@ -19,5 +19,8 @@ org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceH2Tabl
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameEsTableDefine
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameH2TableDefine
org.skywalking.apm.collector.agentstream.worker.segment.define.SegmentEsTableDefine
org.skywalking.apm.collector.agentstream.worker.segment.define.SegmentH2TableDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentEsTableDefine
org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentH2TableDefine
org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostEsTableDefine
org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostH2TableDefine
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册