提交 419b40a3 编写于 作者: P pengys5

dag graph reached a crossroads

上级 1fd5873a
......@@ -24,7 +24,7 @@ public abstract class AnalysisMember extends AbstractLocalAsyncWorker {
}
@Override
public void onWork(Object message) throws Exception {
final public void onWork(Object message) throws Exception {
if (message instanceof EndOfBatchCommand) {
aggregation();
} else {
......
......@@ -23,7 +23,7 @@ public class CollectorBootStartUp {
CollectorSystem collectorSystem = new CollectorSystem();
collectorSystem.boot();
EsClient.boot();
// IndexCreator.INSTANCE.create();
IndexCreator.INSTANCE.create();
HttpServer.INSTANCE.boot((ClusterWorkerContext) collectorSystem.getClusterContext());
}
}
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import com.a.eye.skywalking.collector.worker.storage.MetricPersistenceData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public abstract class MetricAnalysisMember extends AnalysisMember {
private Logger logger = LogManager.getFormatterLogger(MetricAnalysisMember.class);
protected MetricPersistenceData persistenceData = new MetricPersistenceData();
private MetricPersistenceData persistenceData = new MetricPersistenceData();
public MetricAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
public void setMetric(String id, int second, Long value) throws Exception {
persistenceData.getElseCreate(id).setMetric(second, value);
final protected void setMetric(String id, String column, Long value) throws Exception {
persistenceData.getElseCreate(id).setMetric(column, value);
if (persistenceData.size() >= WorkerConfig.Persistence.Data.size) {
aggregation();
}
}
public MetricData pushOne() {
final public MetricData pushOne() {
if (persistenceData.iterator().hasNext()) {
return persistenceData.pushOne();
}
......
......@@ -26,7 +26,7 @@ public abstract class MetricPersistenceMember extends PersistenceMember {
private Logger logger = LogManager.getFormatterLogger(MetricPersistenceMember.class);
protected MetricPersistenceData persistenceData = new MetricPersistenceData();
private MetricPersistenceData persistenceData = new MetricPersistenceData();
public MetricPersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
......@@ -60,11 +60,12 @@ public abstract class MetricPersistenceMember extends PersistenceMember {
}
}
public MultiGetResponse searchFromEs() {
private MultiGetResponse searchFromEs() {
Client client = EsClient.getClient();
MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
Iterator<Map.Entry<String, MetricData>> iterator = persistenceData.iterator();
while (iterator.hasNext()) {
multiGetRequestBuilder.add(esIndex(), esType(), iterator.next().getKey());
}
......@@ -73,7 +74,7 @@ public abstract class MetricPersistenceMember extends PersistenceMember {
return multiGetResponse;
}
public boolean saveToEs() {
private boolean saveToEs() {
Client client = EsClient.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", persistenceData.size());
......
......@@ -25,14 +25,14 @@ public abstract class RecordAnalysisMember extends AnalysisMember {
super(role, clusterContext, selfContext);
}
public void setRecord(String id, JsonObject record) throws Exception {
final public void setRecord(String id, JsonObject record) throws Exception {
persistenceData.getElseCreate(id).setRecord(record);
if (persistenceData.size() >= WorkerConfig.Analysis.Data.size) {
aggregation();
}
}
public RecordData pushOne() {
final public RecordData pushOne() {
if (persistenceData.hasNext()) {
return persistenceData.pushOne();
}
......
package com.a.eye.skywalking.collector.worker.application.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MetricAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.receiver.ResponseCostReceiver;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ResponseCostAnalysis extends MetricAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(ResponseCostAnalysis.class);
public ResponseCostAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof Metric) {
Metric metric = (Metric) message;
long cost = metric.endTime - metric.startTime;
if (cost <= 1000 && !metric.isError) {
String id = metric.getMinute() + "-" + metric.code;
setMetric(id, metric.getSecond(), cost);
}
// logger.debug("response cost metric: %s", data.toString());
}
}
@Override
protected void aggregation() throws Exception {
MetricData oneMetric;
while ((oneMetric = pushOne()) != null) {
getClusterContext().lookup(ResponseCostReceiver.Role.INSTANCE).tell(oneMetric);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ResponseCostAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public ResponseCostAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new ResponseCostAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.ResponseCostAnalysis.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return ResponseCostAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
public static class Metric extends AbstractTimeSlice {
private final String code;
private final Boolean isError;
private final Long startTime;
private final Long endTime;
public Metric(long minute, long hour, long day, int second, String code, Boolean isError, Long startTime, Long endTime) {
super(minute, hour, day, second);
this.code = code;
this.isError = isError;
this.startTime = startTime;
this.endTime = endTime;
}
}
}
package com.a.eye.skywalking.collector.worker.application.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MetricAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.receiver.ResponseSummaryReceiver;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ResponseSummaryAnalysis extends MetricAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(ResponseSummaryAnalysis.class);
public ResponseSummaryAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof Metric) {
Metric metric = (Metric) message;
String id = metric.getMinute() + "-" + metric.code;
setMetric(id, metric.getSecond(), 1L);
// logger.debug("response summary metric: %s", data.toString());
}
}
@Override
protected void aggregation() throws Exception {
MetricData oneMetric;
while ((oneMetric = pushOne()) != null) {
getClusterContext().lookup(ResponseSummaryReceiver.Role.INSTANCE).tell(oneMetric);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ResponseSummaryAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public ResponseSummaryAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new ResponseSummaryAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.ResponseSummaryAnalysis.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return ResponseSummaryAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
public static class Metric extends AbstractTimeSlice {
private final String code;
private final Boolean isError;
public Metric(long minute, long hour, long day, int second, String code, Boolean isError) {
super(minute, hour, day, second);
this.code = code;
this.isError = isError;
}
}
}
package com.a.eye.skywalking.collector.worker.application.receiver;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.DAGNodePersistence;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class DAGNodeReceiver extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(DAGNodeReceiver.class);
public DAGNodeReceiver(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(DAGNodePersistence.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(DAGNodePersistence.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<DAGNodeReceiver> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public DAGNodeReceiver workerInstance(ClusterWorkerContext clusterContext) {
return new DAGNodeReceiver(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodeReceiver.Num;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return DAGNodeReceiver.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.application.receiver;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.NodeInstancePersistence;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeInstanceReceiver extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeInstanceReceiver.class);
public NodeInstanceReceiver(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeInstancePersistence.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeInstancePersistence.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<NodeInstanceReceiver> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeInstanceReceiver workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstanceReceiver(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return WorkerConfig.Worker.NodeInstanceReceiver.Num;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeInstanceReceiver.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
......@@ -6,6 +6,7 @@ import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.node.NodeIndex;
import com.a.eye.skywalking.collector.worker.tools.ClientSpanIsLeafTools;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.collector.worker.tools.SpanPeersTools;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
......@@ -48,7 +49,7 @@ abstract class AbstractNodeAnalysis extends RecordAnalysisMember {
dataJsonObj.addProperty(NodeIndex.Time_Slice, timeSlice);
if (Tags.SPAN_KIND_CLIENT.equals(kind) && ClientSpanIsLeafTools.isLeaf(span.getSpanId(), spanList)) {
code = component + "-" + Tags.PEERS.get(span);
code = component + "[" + SpanPeersTools.getPeers(span) + "]";
dataJsonObj.addProperty(NodeIndex.Code, code);
dataJsonObj.addProperty(NodeIndex.NickName, code);
......@@ -74,7 +75,7 @@ abstract class AbstractNodeAnalysis extends RecordAnalysisMember {
setRecord(id, dataJsonObj);
} else {
for (TraceSegmentRef segmentRef : segment.getRefs()) {
String nickName = component + "-" + segmentRef.getPeerHost();
String nickName = component + "[" + segmentRef.getPeerHost() + "]";
dataJsonObj.addProperty(NodeIndex.NickName, nickName);
String id = timeSlice + "-" + code;
logger.debug("refs node: %s", dataJsonObj.toString());
......
......@@ -34,7 +34,7 @@ abstract class AbstractNodeInstAnalysis extends RecordAnalysisMember {
String kind = Tags.SPAN_KIND.get(span);
String component = Tags.COMPONENT.get(span);
String url = Tags.URL.get(span);
url = UrlTools.parse(url, component);
// url = UrlTools.parse(url, component);
JsonObject dataJsonObj = new JsonObject();
dataJsonObj.addProperty(NodeInstIndex.Code, code);
......
package com.a.eye.skywalking.collector.worker.noderef;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGet;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGetProvider;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumSearchWithTimeSlice;
import com.a.eye.skywalking.collector.worker.tools.ParameterTools;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.Map;
/**
* @author pengys5
*/
public class NodeRefResSumGetWithTimeSlice extends AbstractGet {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumGetWithTimeSlice.class);
private NodeRefResSumGetWithTimeSlice(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeRefResSumSearchWithTimeSlice.WorkerRole.INSTANCE).create(this);
}
@Override
protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception {
if (!request.containsKey("timeSliceValue") || !request.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains timeSliceValue and timeSliceType");
}
logger.debug("timeSliceValue: %s, timeSliceType: %s", Arrays.toString(request.get("timeSliceValue")), Arrays.toString(request.get("timeSliceType")));
long timeSlice;
try {
timeSlice = Long.valueOf(ParameterTools.toString(request, "timeSliceValue"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter timeSliceValue must numeric with long type");
}
NodeRefResSumSearchWithTimeSlice.RequestEntity requestEntity;
requestEntity = new NodeRefResSumSearchWithTimeSlice.RequestEntity(ParameterTools.toString(request, "timeSliceType"), timeSlice);
getSelfContext().lookup(NodeRefResSumSearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response);
}
public static class Factory extends AbstractGetProvider<NodeRefResSumGetWithTimeSlice> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeRefResSumGetWithTimeSlice workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefResSumGetWithTimeSlice(role(), clusterContext, new LocalWorkerContext());
}
@Override
public String servletPath() {
return "/nodeRef/resSum/timeSlice";
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefResSumGetWithTimeSlice.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.noderef;
import com.a.eye.skywalking.collector.worker.storage.index.AbstractIndex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
......@@ -13,9 +11,13 @@ import java.io.IOException;
*/
public class NodeRefResSumIndex extends AbstractIndex {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumIndex.class);
public static final String Index = "node_ref_res_sum_idx";
public static final String OneSecondLess = "oneSecondLess";
public static final String ThreeSecondLess = "threeSecondLess";
public static final String FiveSecondLess = "fiveSecondLess";
public static final String FiveSecondGreater = "fiveSecondGreater";
public static final String Error = "error";
public static final String Summary = "summary";
@Override
public String index() {
......@@ -32,15 +34,31 @@ public class NodeRefResSumIndex extends AbstractIndex {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject("front")
.field("type", "string")
.startObject(OneSecondLess)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(ThreeSecondLess)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(FiveSecondLess)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject("behind")
.field("type", "string")
.startObject(FiveSecondGreater)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(Error)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(Summary)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject("timeSlice")
.startObject(Time_Slice)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
......
......@@ -27,7 +27,7 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
super(role, clusterContext, selfContext);
}
void analyseNodeRef(TraceSegment segment, long timeSlice) throws Exception {
final void analyseNodeRef(TraceSegment segment, long timeSlice, long minute, long hour, long day, int second) throws Exception {
List<Span> spanList = segment.getSpans();
if (CollectionTools.isNotEmpty(spanList)) {
for (Span span : spanList) {
......@@ -42,13 +42,14 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
String front = segment.getApplicationCode();
dataJsonObj.addProperty(NodeRefIndex.Front, front);
String behind = component + "-" + peers;
String behind = component + "[" + peers + "]";
dataJsonObj.addProperty(NodeRefIndex.Behind, behind);
dataJsonObj.addProperty(NodeRefIndex.BehindIsRealCode, false);
String id = timeSlice + "-" + front + "-" + behind;
logger.debug("dag node ref: %s", dataJsonObj.toString());
setRecord(id, dataJsonObj);
buildNodeRefResRecordData(id, span, minute, hour, day, second);
} else if (Tags.SPAN_KIND_SERVER.equals(Tags.SPAN_KIND.get(span))) {
if (span.getParentSpanId() == -1 && CollectionTools.isEmpty(segment.getRefs())) {
String behind = segment.getApplicationCode();
......@@ -59,10 +60,11 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
String id = timeSlice + "-" + front + "-" + behind;
setRecord(id, dataJsonObj);
buildNodeRefResRecordData(id, span, minute, hour, day, second);
} else if (span.getParentSpanId() == -1 && CollectionTools.isNotEmpty(segment.getRefs())) {
for (TraceSegmentRef segmentRef : segment.getRefs()) {
String front = segmentRef.getApplicationCode();
String behind = component + "-" + segmentRef.getPeerHost();
String behind = component + "[" + segmentRef.getPeerHost() + "]";
String id = timeSlice + "-" + front + "-" + behind;
JsonObject refDataJsonObj = new JsonObject();
......@@ -73,10 +75,22 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
refDataJsonObj.addProperty(NodeRefIndex.Time_Slice, timeSlice);
logger.debug("dag node ref: %s", refDataJsonObj.toString());
setRecord(id, refDataJsonObj);
// buildNodeRefResRecordData(id, span, minute, hour, day, second);
}
}
}
}
}
}
private void buildNodeRefResRecordData(String nodeRefId, Span span, long minute, long hour, long day, int second) throws Exception {
AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord = new AbstractNodeRefResSumAnalysis.NodeRefResRecord(minute, hour, day, second);
refResRecord.setStartTime(span.getStartTime());
refResRecord.setEndTime(span.getEndTime());
refResRecord.setNodeRefId(nodeRefId);
refResRecord.setError(Tags.ERROR.get(span));
sendToResSumAnalysis(refResRecord);
}
protected abstract void sendToResSumAnalysis(AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord) throws Exception;
}
......@@ -3,39 +3,67 @@ package com.a.eye.skywalking.collector.worker.noderef.analysis;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.MetricAnalysisMember;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.trace.TraceSegment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public abstract class AbstractNodeRefResSumAnalysis extends MetricAnalysisMember {
abstract class AbstractNodeRefResSumAnalysis extends MetricAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(AbstractNodeRefResSumAnalysis.class);
public AbstractNodeRefResSumAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
AbstractNodeRefResSumAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
public void analyseResSum(TraceSegment segment, long timeSlice) throws Exception {
// if (message instanceof NodeRefResRecord) {
// Metric metric = (Metric) message;
// String id = metric.getMinute() + "-" + metric.code;
// setMetric(id, metric.getSecond(), 1L);
// logger.debug("response summary metric: %s", data.toString());
// }
final void analyseResSum(NodeRefResRecord nodeRefRes) throws Exception {
long startTime = nodeRefRes.startTime;
long endTime = nodeRefRes.endTime;
boolean isError = nodeRefRes.isError;
long cost = endTime - startTime;
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.OneSecondLess, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.ThreeSecondLess, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FiveSecondLess, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FiveSecondGreater, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.Error, 0L);
if (cost <= 1000 && !isError) {
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.OneSecondLess, 1L);
} else if (1000 < cost && cost <= 3000 && !isError) {
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.ThreeSecondLess, 1L);
} else if (3000 < cost && cost <= 5000 && !isError) {
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FiveSecondLess, 1L);
} else if (5000 < cost && !isError) {
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FiveSecondGreater, 1L);
} else {
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.Error, 1L);
}
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.Summary, 1L);
}
public static class NodeRefResRecord extends AbstractTimeSlice {
static class NodeRefResRecord extends AbstractTimeSlice {
private String nodeRefId;
private long startTime;
private long endTime;
private Boolean isError;
public NodeRefResRecord(long minute, long hour, long day, int second) {
NodeRefResRecord(long minute, long hour, long day, int second) {
super(minute, hour, day, second);
}
void setNodeRefId(String nodeRefId) {
this.nodeRefId = nodeRefId;
}
void setStartTime(long startTime) {
this.startTime = startTime;
}
void setEndTime(long endTime) {
this.endTime = endTime;
}
void setError(Boolean error) {
isError = error;
}
}
}
......@@ -3,36 +3,49 @@ package com.a.eye.skywalking.collector.worker.noderef.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefDayAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeRefDayAnalysis extends AbstractNodeRefAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeRefDayAnalysis.class);
public NodeRefDayAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private NodeRefDayAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
getClusterContext().findProvider(NodeRefResSumDayAnalysis.Role.INSTANCE).create(this);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseNodeRef(segment, segmentWithTimeSlice.getDay());
long minute = segmentWithTimeSlice.getMinute();
long hour = segmentWithTimeSlice.getHour();
long day = segmentWithTimeSlice.getDay();
int second = segmentWithTimeSlice.getSecond();
analyseNodeRef(segment, segmentWithTimeSlice.getDay(), minute, hour, day, second);
}
}
@Override
protected void sendToResSumAnalysis(AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord) throws Exception {
getSelfContext().lookup(NodeRefResSumDayAnalysis.Role.INSTANCE).tell(refResRecord);
}
@Override
protected void aggregation() throws Exception {
RecordData oneRecord;
......
......@@ -3,36 +3,49 @@ package com.a.eye.skywalking.collector.worker.noderef.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefHourAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeRefHourAnalysis extends AbstractNodeRefAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeRefHourAnalysis.class);
public NodeRefHourAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private NodeRefHourAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
getClusterContext().findProvider(NodeRefResSumHourAnalysis.Role.INSTANCE).create(this);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseNodeRef(segment, segmentWithTimeSlice.getHour());
long minute = segmentWithTimeSlice.getMinute();
long hour = segmentWithTimeSlice.getHour();
long day = segmentWithTimeSlice.getDay();
int second = segmentWithTimeSlice.getSecond();
analyseNodeRef(segment, segmentWithTimeSlice.getHour(), minute, hour, day, second);
}
}
@Override
protected void sendToResSumAnalysis(AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord) throws Exception {
getSelfContext().lookup(NodeRefResSumHourAnalysis.Role.INSTANCE).tell(refResRecord);
}
@Override
protected void aggregation() throws Exception {
RecordData oneRecord;
......
......@@ -3,36 +3,48 @@ package com.a.eye.skywalking.collector.worker.noderef.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefMinuteAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeRefMinuteAnalysis.class);
public NodeRefMinuteAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private NodeRefMinuteAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
getClusterContext().findProvider(NodeRefResSumMinuteAnalysis.Role.INSTANCE).create(this);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseNodeRef(segment, segmentWithTimeSlice.getMinute());
long minute = segmentWithTimeSlice.getMinute();
long hour = segmentWithTimeSlice.getHour();
long day = segmentWithTimeSlice.getDay();
int second = segmentWithTimeSlice.getSecond();
analyseNodeRef(segment, segmentWithTimeSlice.getMinute(), minute, hour, day, second);
}
}
@Override
protected void sendToResSumAnalysis(AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord) throws Exception {
getSelfContext().lookup(NodeRefResSumMinuteAnalysis.Role.INSTANCE).tell(refResRecord);
}
@Override
protected void aggregation() throws Exception {
RecordData oneRecord;
......@@ -41,6 +53,7 @@ public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefMinuteAnalysis> {
public static Factory INSTANCE = new Factory();
......
package com.a.eye.skywalking.collector.worker.application.persistence;
package com.a.eye.skywalking.collector.worker.noderef.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumDayAgg;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
/**
* @author pengys5
*/
public class DAGNodePersistence extends RecordPersistenceMember {
public class NodeRefResSumDayAnalysis extends AbstractNodeRefResSumAnalysis {
private Logger logger = LogManager.getFormatterLogger(DAGNodePersistence.class);
public DAGNodePersistence(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private NodeRefResSumDayAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return "application";
public void analyse(Object message) throws Exception {
if (message instanceof NodeRefResRecord) {
NodeRefResRecord refResRecord = (NodeRefResRecord) message;
analyseResSum(refResRecord);
}
}
@Override
public String esType() {
return "dag_node";
protected void aggregation() throws Exception {
MetricData oneMetric;
while ((oneMetric = pushOne()) != null) {
getClusterContext().lookup(NodeRefResSumDayAgg.Role.INSTANCE).tell(oneMetric);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<DAGNodePersistence> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefResSumDayAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
......@@ -40,13 +43,13 @@ public class DAGNodePersistence extends RecordPersistenceMember {
}
@Override
public DAGNodePersistence workerInstance(ClusterWorkerContext clusterContext) {
return new DAGNodePersistence(role(), clusterContext, new LocalWorkerContext());
public NodeRefResSumDayAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefResSumDayAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
return WorkerConfig.Queue.ResponseSummaryAnalysis.Size;
}
}
......@@ -55,7 +58,7 @@ public class DAGNodePersistence extends RecordPersistenceMember {
@Override
public String roleName() {
return DAGNodePersistence.class.getSimpleName();
return NodeRefResSumDayAnalysis.class.getSimpleName();
}
@Override
......
package com.a.eye.skywalking.collector.worker.application.persistence;
package com.a.eye.skywalking.collector.worker.noderef.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumHourAgg;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
/**
* @author pengys5
*/
public class NodeInstancePersistence extends RecordPersistenceMember {
public class NodeRefResSumHourAnalysis extends AbstractNodeRefResSumAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeInstancePersistence.class);
public NodeInstancePersistence(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private NodeRefResSumHourAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return "application";
public void analyse(Object message) throws Exception {
if (message instanceof NodeRefResRecord) {
NodeRefResRecord refResRecord = (NodeRefResRecord) message;
analyseResSum(refResRecord);
}
}
@Override
public String esType() {
return "node_instance";
protected void aggregation() throws Exception {
MetricData oneMetric;
while ((oneMetric = pushOne()) != null) {
getClusterContext().lookup(NodeRefResSumHourAgg.Role.INSTANCE).tell(oneMetric);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeInstancePersistence> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefResSumHourAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
......@@ -40,13 +43,13 @@ public class NodeInstancePersistence extends RecordPersistenceMember {
}
@Override
public NodeInstancePersistence workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstancePersistence(role(), clusterContext, new LocalWorkerContext());
public NodeRefResSumHourAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefResSumHourAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.NodeInstancePersistence.Size;
return WorkerConfig.Queue.ResponseSummaryAnalysis.Size;
}
}
......@@ -55,7 +58,7 @@ public class NodeInstancePersistence extends RecordPersistenceMember {
@Override
public String roleName() {
return NodeInstancePersistence.class.getSimpleName();
return NodeRefResSumHourAnalysis.class.getSimpleName();
}
@Override
......
......@@ -3,34 +3,27 @@ package com.a.eye.skywalking.collector.worker.noderef.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MetricAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumMinuteAgg;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeRefResSumMinuteAnalysis extends AbstractNodeRefResSumAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumMinuteAnalysis.class);
public NodeRefResSumMinuteAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private NodeRefResSumMinuteAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
// if (message instanceof Metric) {
// Metric metric = (Metric) message;
// String id = metric.getMinute() + "-" + metric.code;
// setMetric(id, metric.getSecond(), 1L);
// logger.debug("response summary metric: %s", data.toString());
// }
if (message instanceof NodeRefResRecord) {
NodeRefResRecord refResRecord = (NodeRefResRecord) message;
analyseResSum(refResRecord);
}
}
@Override
......@@ -70,7 +63,7 @@ public class NodeRefResSumMinuteAnalysis extends AbstractNodeRefResSumAnalysis {
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.application.receiver;
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.ResponseCostPersistence;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -12,29 +11,29 @@ import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ResponseCostReceiver extends AbstractClusterWorker {
public class NodeRefResSumDayAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(ResponseCostReceiver.class);
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumDayAgg.class);
public ResponseCostReceiver(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private NodeRefResSumDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(ResponseCostPersistence.Role.INSTANCE).create(this);
getClusterContext().findProvider(NodeRefResSumDaySave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof MetricData) {
getSelfContext().lookup(ResponseCostPersistence.Role.INSTANCE).tell(message);
getSelfContext().lookup(NodeRefResSumDaySave.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<ResponseCostReceiver> {
public static class Factory extends AbstractClusterWorkerProvider<NodeRefResSumDayAgg> {
public static Factory INSTANCE = new Factory();
@Override
......@@ -43,13 +42,13 @@ public class ResponseCostReceiver extends AbstractClusterWorker {
}
@Override
public ResponseCostReceiver workerInstance(ClusterWorkerContext clusterContext) {
return new ResponseCostReceiver(role(), clusterContext, new LocalWorkerContext());
public NodeRefResSumDayAgg workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefResSumDayAgg(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return WorkerConfig.Worker.ResponseCostReceiver.Num;
return WorkerConfig.Worker.ResponseSummaryReceiver.Num;
}
}
......@@ -58,12 +57,12 @@ public class ResponseCostReceiver extends AbstractClusterWorker {
@Override
public String roleName() {
return ResponseCostReceiver.class.getSimpleName();
return NodeRefResSumDayAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MetricPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
/**
* @author pengys5
*/
public class ResponseSummaryPersistence extends MetricPersistenceMember {
public class NodeRefResSumDaySave extends MetricPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(ResponseSummaryPersistence.class);
public ResponseSummaryPersistence(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private NodeRefResSumDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return "application_metric";
return NodeRefResSumIndex.Index;
}
@Override
public String esType() {
return "response_summary";
return NodeRefResSumIndex.Type_Day;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ResponseSummaryPersistence> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefResSumDaySave> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return null;
return Role.INSTANCE;
}
@Override
public ResponseSummaryPersistence workerInstance(ClusterWorkerContext clusterContext) {
return new ResponseSummaryPersistence(role(), clusterContext, new LocalWorkerContext());
public NodeRefResSumDaySave workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefResSumDaySave(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -55,12 +52,12 @@ public class ResponseSummaryPersistence extends MetricPersistenceMember {
@Override
public String roleName() {
return ResponseSummaryPersistence.class.getSimpleName();
return NodeRefResSumDaySave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.application.receiver;
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.ResponseSummaryPersistence;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -12,39 +11,39 @@ import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ResponseSummaryReceiver extends AbstractClusterWorker {
public class NodeRefResSumHourAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(ResponseSummaryReceiver.class);
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumHourAgg.class);
public ResponseSummaryReceiver(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private NodeRefResSumHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(ResponseSummaryPersistence.Role.INSTANCE).create(this);
getClusterContext().findProvider(NodeRefResSumHourSave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof MetricData) {
getSelfContext().lookup(ResponseSummaryPersistence.Role.INSTANCE).tell(message);
getSelfContext().lookup(NodeRefResSumHourSave.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<ResponseSummaryReceiver> {
public static class Factory extends AbstractClusterWorkerProvider<NodeRefResSumHourAgg> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return null;
return Role.INSTANCE;
}
@Override
public ResponseSummaryReceiver workerInstance(ClusterWorkerContext clusterContext) {
return new ResponseSummaryReceiver(role(), clusterContext, new LocalWorkerContext());
public NodeRefResSumHourAgg workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefResSumHourAgg(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -58,12 +57,12 @@ public class ResponseSummaryReceiver extends AbstractClusterWorker {
@Override
public String roleName() {
return ResponseSummaryReceiver.class.getSimpleName();
return NodeRefResSumHourAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MetricPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
/**
* @author pengys5
*/
public class ResponseCostPersistence extends MetricPersistenceMember {
public class NodeRefResSumHourSave extends MetricPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(ResponseCostPersistence.class);
public ResponseCostPersistence(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private NodeRefResSumHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return "application_metric";
return NodeRefResSumIndex.Index;
}
@Override
public String esType() {
return "response_cost";
return NodeRefResSumIndex.Type_Hour;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ResponseCostPersistence> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefResSumHourSave> {
public static Factory INSTANCE = new Factory();
@Override
......@@ -40,13 +37,13 @@ public class ResponseCostPersistence extends MetricPersistenceMember {
}
@Override
public ResponseCostPersistence workerInstance(ClusterWorkerContext clusterContext) {
return new ResponseCostPersistence(role(), clusterContext, new LocalWorkerContext());
public NodeRefResSumHourSave workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefResSumHourSave(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.ResponseCostPersistence.Size;
return WorkerConfig.Queue.Persistence.ResponseSummaryPersistence.Size;
}
}
......@@ -55,12 +52,12 @@ public class ResponseCostPersistence extends MetricPersistenceMember {
@Override
public String roleName() {
return ResponseCostPersistence.class.getSimpleName();
return NodeRefResSumHourSave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
return new HashCodeSelector();
}
}
}
......@@ -15,7 +15,7 @@ public class NodeRefResSumMinuteAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumMinuteAgg.class);
public NodeRefResSumMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private NodeRefResSumMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -38,7 +38,7 @@ public class NodeRefResSumMinuteAgg extends AbstractClusterWorker {
@Override
public Role role() {
return null;
return Role.INSTANCE;
}
@Override
......
......@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
*/
public class NodeRefResSumMinuteSave extends MetricPersistenceMember {
public NodeRefResSumMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
private NodeRefResSumMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -33,7 +33,7 @@ public class NodeRefResSumMinuteSave extends MetricPersistenceMember {
@Override
public Role role() {
return null;
return Role.INSTANCE;
}
@Override
......
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.TimeSlice;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
/**
* @author pengys5
*/
public class NodeRefResSumSearchWithTimeSlice extends AbstractLocalSyncWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumSearchWithTimeSlice.class);
private NodeRefResSumSearchWithTimeSlice(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void onWork(Object request, Object response) throws Exception {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeRefResSumIndex.Index);
searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery(NodeRefResSumIndex.Time_Slice, search.getTimeSlice()));
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
SearchHit[] hits = searchResponse.getHits().getHits();
logger.debug("node reference list size: %s", hits.length);
JsonArray nodeRefResSumArray = new JsonArray();
for (SearchHit hit : searchResponse.getHits().getHits()) {
JsonObject nodeRefResSumObj = new JsonObject();
String id = hit.getId();
String[] ids = id.split("-");
String front = ids[1];
String behind = ids[2];
nodeRefResSumObj.addProperty("front", front);
nodeRefResSumObj.addProperty("behind", behind);
nodeRefResSumObj.addProperty(NodeRefResSumIndex.OneSecondLess, (Number) hit.getSource().get(NodeRefResSumIndex.OneSecondLess));
nodeRefResSumObj.addProperty(NodeRefResSumIndex.ThreeSecondLess, (Number) hit.getSource().get(NodeRefResSumIndex.ThreeSecondLess));
nodeRefResSumObj.addProperty(NodeRefResSumIndex.FiveSecondLess, (Number) hit.getSource().get(NodeRefResSumIndex.FiveSecondLess));
nodeRefResSumObj.addProperty(NodeRefResSumIndex.FiveSecondGreater, (Number) hit.getSource().get(NodeRefResSumIndex.FiveSecondGreater));
nodeRefResSumObj.addProperty(NodeRefResSumIndex.Error, (Number) hit.getSource().get(NodeRefResSumIndex.Error));
nodeRefResSumObj.addProperty(NodeRefResSumIndex.Summary, (Number) hit.getSource().get(NodeRefResSumIndex.Summary));
nodeRefResSumObj.addProperty(NodeRefResSumIndex.Time_Slice, (Long) hit.getSource().get(NodeRefResSumIndex.Time_Slice));
nodeRefResSumArray.add(nodeRefResSumObj);
}
JsonObject resJsonObj = (JsonObject) response;
resJsonObj.add("result", nodeRefResSumArray);
} else {
throw new IllegalArgumentException("message instance must be RequestEntity");
}
}
public static class RequestEntity extends TimeSlice {
public RequestEntity(String sliceType, long timeSlice) {
super(sliceType, timeSlice);
}
}
public static class Factory extends AbstractLocalSyncWorkerProvider<NodeRefResSumSearchWithTimeSlice> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeRefResSumSearchWithTimeSlice workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefResSumSearchWithTimeSlice(role(), clusterContext, new LocalWorkerContext());
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefResSumSearchWithTimeSlice.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
......@@ -21,12 +21,15 @@ import com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefMinuteAnaly
import com.a.eye.skywalking.collector.worker.segment.persistence.SegmentSave;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.a.eye.skywalking.trace.SegmentsMessage;
import com.a.eye.skywalking.trace.TraceSegment;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
/**
* @author pengys5
*/
......@@ -59,28 +62,37 @@ public class SegmentPost extends AbstractPost {
@Override
protected void onReceive(String reqJsonStr) throws Exception {
TraceSegment newSegment = gson.fromJson(reqJsonStr, TraceSegment.class);
validateData(newSegment);
logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", newSegment.getTraceSegmentId());
long minuteSlice = DateTools.getMinuteSlice(newSegment.getStartTime());
long hourSlice = DateTools.getHourSlice(newSegment.getStartTime());
long daySlice = DateTools.getDaySlice(newSegment.getStartTime());
int second = DateTools.getSecond(newSegment.getStartTime());
SegmentWithTimeSlice segmentWithTimeSlice = new SegmentWithTimeSlice(newSegment, minuteSlice, hourSlice, daySlice, second);
tellSegmentSave(reqJsonStr, daySlice, hourSlice, minuteSlice);
tellNodeRef(segmentWithTimeSlice);
tellNode(segmentWithTimeSlice);
tellNodeInst(segmentWithTimeSlice);
SegmentsMessage segmentsMessage = gson.fromJson(reqJsonStr, SegmentsMessage.class);
List<TraceSegment> segmentList = segmentsMessage.getSegments();
for (TraceSegment newSegment : segmentList) {
try {
validateData(newSegment);
} catch (IllegalArgumentException e) {
continue;
}
logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", newSegment.getTraceSegmentId());
long minuteSlice = DateTools.getMinuteSlice(newSegment.getStartTime());
long hourSlice = DateTools.getHourSlice(newSegment.getStartTime());
long daySlice = DateTools.getDaySlice(newSegment.getStartTime());
int second = DateTools.getSecond(newSegment.getStartTime());
SegmentWithTimeSlice segmentWithTimeSlice = new SegmentWithTimeSlice(newSegment, minuteSlice, hourSlice, daySlice, second);
String newSegmentJsonStr = gson.toJson(newSegment);
tellSegmentSave(newSegmentJsonStr, daySlice, hourSlice, minuteSlice);
tellNodeRef(segmentWithTimeSlice);
tellNode(segmentWithTimeSlice);
tellNodeInst(segmentWithTimeSlice);
}
}
private void tellSegmentSave(String reqJsonStr, long day, long hour, long minute) throws Exception {
JsonObject newSegmentJson = gson.fromJson(reqJsonStr, JsonObject.class);
private void tellSegmentSave(String newSegmentJsonStr, long day, long hour, long minute) throws Exception {
JsonObject newSegmentJson = gson.fromJson(newSegmentJsonStr, JsonObject.class);
newSegmentJson.addProperty("minute", minute);
// newSegmentJson.addProperty("hour", hour);
// newSegmentJson.addProperty("day", day);
newSegmentJson.addProperty("hour", hour);
newSegmentJson.addProperty("day", day);
getSelfContext().lookup(SegmentSave.Role.INSTANCE).tell(newSegmentJson);
}
......
......@@ -3,6 +3,8 @@ package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.worker.node.NodeIndex;
import com.a.eye.skywalking.collector.worker.nodeinst.NodeInstIndex;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
/**
* @author pengys5
......@@ -11,6 +13,9 @@ public enum IndexCreator {
INSTANCE;
public void create() {
SegmentIndex segmentIndex = new SegmentIndex();
segmentIndex.deleteIndex();
NodeIndex nodeIndex = new NodeIndex();
nodeIndex.deleteIndex();
nodeIndex.createIndex();
......@@ -22,5 +27,9 @@ public enum IndexCreator {
NodeRefIndex nodeRefIndex = new NodeRefIndex();
nodeRefIndex.deleteIndex();
nodeRefIndex.createIndex();
NodeRefResSumIndex nodeRefResSumIndex = new NodeRefResSumIndex();
nodeRefResSumIndex.deleteIndex();
nodeRefResSumIndex.createIndex();
}
}
package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.actor.selector.AbstractHashMessage;
import com.a.eye.skywalking.collector.worker.storage.index.AbstractIndex;
import java.util.HashMap;
import java.util.Map;
......@@ -10,97 +11,41 @@ import java.util.Map;
*/
public class MetricData extends AbstractHashMessage {
public MetricData(String key) {
super(key);
this.id = key;
}
private String id;
private Map<String, Long> value;
private static final String s10 = "s10";
private static final String s20 = "s20";
private static final String s30 = "s30";
private static final String s40 = "s40";
private static final String s50 = "s50";
private static final String s60 = "s60";
private Long s10Value = 0L;
private Long s20Value = 0L;
private Long s30Value = 0L;
private Long s40Value = 0L;
private Long s50Value = 0L;
private Long s60Value = 0L;
public MetricData(String id) {
super(id);
this.id = id;
value = new HashMap<>();
value.put(AbstractIndex.Time_Slice, Long.valueOf(id.split("-")[0]));
}
public void setMetric(int second, Long value) {
if (second <= 10) {
s10Value += value;
} else if (second > 10 && second <= 20) {
s20Value += value;
} else if (second > 20 && second <= 30) {
s30Value += value;
} else if (second > 30 && second <= 40) {
s40Value += value;
} else if (second > 40 && second <= 50) {
s50Value += value;
} else {
s60Value += value;
public void setMetric(String column, Long value) {
long valueAdd = value;
if (this.value.containsKey(column) && !AbstractIndex.Time_Slice.equals(column)) {
valueAdd += this.value.get(column);
}
this.value.put(column, valueAdd);
}
public void merge(MetricData metricData) {
s10Value += metricData.s10Value;
s20Value += metricData.s20Value;
s30Value += metricData.s30Value;
s40Value += metricData.s40Value;
s50Value += metricData.s50Value;
s60Value += metricData.s60Value;
for (Map.Entry<String, Long> entry : metricData.value.entrySet()) {
setMetric(entry.getKey(), entry.getValue());
}
}
public void merge(Map<String, Object> dbData) {
s10Value += Long.valueOf(dbData.get(s10).toString());
s20Value += Long.valueOf(dbData.get(s20).toString());
s30Value += Long.valueOf(dbData.get(s30).toString());
s40Value += Long.valueOf(dbData.get(s40).toString());
s50Value += Long.valueOf(dbData.get(s50).toString());
s60Value += Long.valueOf(dbData.get(s60).toString());
for (Map.Entry<String, Object> entry : dbData.entrySet()) {
setMetric(entry.getKey(), (Long) entry.getValue());
}
}
public Map<String, Long> toMap() {
Map<String, Long> map = new HashMap<>();
map.put(s10, s10Value);
map.put(s20, s20Value);
map.put(s30, s30Value);
map.put(s40, s40Value);
map.put(s50, s50Value);
map.put(s60, s60Value);
return map;
return value;
}
public String getId() {
return id;
}
protected Long getS10Value() {
return s10Value;
}
protected Long getS20Value() {
return s20Value;
}
protected Long getS30Value() {
return s30Value;
}
protected Long getS40Value() {
return s40Value;
}
protected Long getS50Value() {
return s50Value;
}
protected Long getS60Value() {
return s60Value;
}
}
......@@ -4,7 +4,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
import java.util.function.Consumer;
/**
* @author pengys5
......@@ -34,11 +33,6 @@ public class MetricPersistenceData implements Iterable {
return one;
}
@Override
public void forEach(Consumer action) {
throw new UnsupportedOperationException("forEach");
}
@Override
public Spliterator spliterator() {
throw new UnsupportedOperationException("spliterator");
......
package com.a.eye.skywalking.collector.worker.tools;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.tag.Tags;
/**
* @author pengys5
*/
public class SpanPeersTools {
public static String getPeers(Span span) {
if (StringUtil.isEmpty(Tags.PEERS.get(span))) {
String host = Tags.PEER_HOST.get(span);
int port = Tags.PEER_PORT.get(span);
return host + ":" + port;
} else {
return Tags.PEERS.get(span);
}
}
}
......@@ -2,6 +2,10 @@ com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefDayAgg$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefHourAgg$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefMinuteAgg$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumDayAgg$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumHourAgg$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumMinuteAgg$Factory
com.a.eye.skywalking.collector.worker.node.persistence.NodeDayAgg$Factory
com.a.eye.skywalking.collector.worker.node.persistence.NodeHourAgg$Factory
com.a.eye.skywalking.collector.worker.node.persistence.NodeMinuteAgg$Factory
......
......@@ -4,10 +4,19 @@ com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefDayAnalysis$Factor
com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefHourAnalysis$Factory
com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefMinuteAnalysis$Factory
com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefResSumDayAnalysis$Factory
com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefResSumHourAnalysis$Factory
com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefResSumMinuteAnalysis$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefDaySave$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefHourSave$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefMinuteSave$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefSearchWithTimeSlice$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumSearchWithTimeSlice$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumDaySave$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumHourSave$Factory
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumMinuteSave$Factory
com.a.eye.skywalking.collector.worker.node.analysis.NodeDayAnalysis$Factory
com.a.eye.skywalking.collector.worker.node.analysis.NodeHourAnalysis$Factory
......
com.a.eye.skywalking.collector.worker.node.NodeGetWithTimeSlice$Factory
com.a.eye.skywalking.collector.worker.nodeinst.NodeInstGetWithTimeSlice$Factory
com.a.eye.skywalking.collector.worker.nodeinst.NodeInstSummaryGetWithTimeSlice$Factory
com.a.eye.skywalking.collector.worker.noderef.NodeRefGetWithTimeSlice$Factory
\ No newline at end of file
com.a.eye.skywalking.collector.worker.noderef.NodeRefGetWithTimeSlice$Factory
com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumGetWithTimeSlice$Factory
\ No newline at end of file
package com.a.eye.skywalking.collector.worker.segment;
import com.a.eye.skywalking.collector.worker.tools.HttpClientTools;
import com.a.eye.skywalking.trace.SegmentsMessage;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
......@@ -21,6 +22,8 @@ public class SegmentPostTestCase {
public void testPostSegment() throws Exception {
Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();
SegmentsMessage segmentsMessage = new SegmentsMessage();
TraceSegment webSegment = new TraceSegment("WebApplication");
Span span = new Span(0, "/Web/GetUser", new Date().getTime());
......@@ -48,8 +51,9 @@ public class SegmentPostTestCase {
webSegment.finish();
Thread.sleep(300);
String webJsonStr = gson.toJson(webSegment);
HttpClientTools.INSTANCE.post("http://localhost:7001/segment", webJsonStr);
segmentsMessage.append(webSegment);
// String webJsonStr = gson.toJson(webSegment);
// HttpClientTools.INSTANCE.post("http://localhost:7001/segment", webJsonStr);
TraceSegment motanSegment = new TraceSegment("MotanServiceApplication");
......@@ -82,15 +86,16 @@ public class SegmentPostTestCase {
motanSegment.finish();
Thread.sleep(300);
String motanJsonStr = gson.toJson(motanSegment);
HttpClientTools.INSTANCE.post("http://localhost:7001/segment", motanJsonStr);
segmentsMessage.append(motanSegment);
String segmentJsonStr = gson.toJson(segmentsMessage);
HttpClientTools.INSTANCE.post("http://localhost:7001/segment", segmentJsonStr);
}
@Test
public void testPostSample1Segment() throws Exception {
HttpClientTools.INSTANCE.post("http://localhost:7001/segment", sample1);
// HttpClientTools.INSTANCE.post("http://localhost:7001/segment", sample2);
// HttpClientTools.INSTANCE.post("http://localhost:7001/segment", sample3);
HttpClientTools.INSTANCE.post("http://localhost:7001/segment", sample2);
HttpClientTools.INSTANCE.post("http://localhost:7001/segment", sample3);
}
private String sample1 = "[{\"ts\":\"Segment.1490064072962.-2099929254.16777.68.1\",\"st\":1490064072962,\"et\":1490064073000,\"ss\":[{\"si\":2,\"ps\":1,\"st\":1490064072965,\"et\":1490064072970,\"on\":\"com.a.eye.skywalking.test.cache.CacheService.findCache(java.lang.String)\",\"ta\":{\"span.layer\":\"rpc\",\"component\":\"Motan\",\"span.kind\":\"client\",\"peer.host\":\"127.0.0.1\",\"peer.port\":8002,\"url\":\"motan://127.0.0.1:8002/default_rpc/com.a.eye.skywalking.test.cache.CacheService/1.0/referer\"},\"lo\":[]},{\"si\":1,\"ps\":0,\"st\":0,\"et\":1490064072970,\"on\":\"Motan_default_rpc_com.a.eye.skywalking.test.cache.CacheService.findCache(java.lang.String)\",\"ta\":{\"requestId\":1562445425373347845},\"lo\":[]},{\"si\":4,\"ps\":3,\"st\":1490064072970,\"et\":1490064072991,\"on\":\"/persistence/query\",\"ta\":{\"span.layer\":\"http\",\"component\":\"HttpClient\",\"status_code\":200,\"span.kind\":\"client\",\"peer.host\":\"10.128.35.80\",\"peer.port\":20880,\"url\":\"http://10.128.35.80:20880/persistence/query\"},\"lo\":[]},{\"si\":3,\"ps\":0,\"st\":1490064072970,\"et\":1490064072993,\"on\":\"com.a.eye.skywalking.test.persistence.PersistenceService.query(String)\",\"ta\":{\"span.layer\":\"rpc\",\"component\":\"Dubbo\",\"span.kind\":\"client\",\"peer.host\":\"10.128.35.80\",\"peer.port\":20880,\"url\":\"rest://10.128.35.80:20880/com.a.eye.skywalking.test.persistence.PersistenceService.query(String)\"},\"lo\":[]},{\"si\":6,\"ps\":5,\"st\":1490064072994,\"et\":1490064072997,\"on\":\"com.a.eye.skywalking.test.cache.CacheService.updateCache(java.lang.String,java.lang.String)\",\"ta\":{\"span.layer\":\"rpc\",\"component\":\"Motan\",\"span.kind\":\"client\",\"peer.host\":\"127.0.0.1\",\"peer.port\":8002,\"url\":\"motan://127.0.0.1:8002/default_rpc/com.a.eye.skywalking.test.cache.CacheService/1.0/referer\"},\"lo\":[]},{\"si\":5,\"ps\":0,\"st\":0,\"et\":1490064072997,\"on\":\"Motan_default_rpc_com.a.eye.skywalking.test.cache.CacheService.updateCache(java.lang.String,java.lang.String)\",\"ta\":{\"requestId\":1562445425402707974},\"lo\":[]},{\"si\":0,\"ps\":-1,\"st\":1490064072963,\"et\":1490064073000,\"on\":\"/portal/\",\"ta\":{\"span.layer\":\"http\",\"component\":\"Tomcat\",\"status_code\":200,\"span.kind\":\"server\",\"peer.host\":\"0:0:0:0:0:0:0:1\",\"peer.port\":51735,\"url\":\"http://localhost:8080/portal/\"},\"lo\":[]}],\"ac\":\"portal-service\",\"gt\":[\"Trace.1490064072962.-2099929254.16777.68.2\"]}]";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册