提交 17ba2edb 编写于 作者: P pengys5

dag graph finish

上级 c0650800
...@@ -20,7 +20,7 @@ public abstract class AnalysisMember extends AbstractLocalAsyncWorker { ...@@ -20,7 +20,7 @@ public abstract class AnalysisMember extends AbstractLocalAsyncWorker {
@Override @Override
public void preStart() throws ProviderNotFoundException { public void preStart() throws ProviderNotFoundException {
} }
@Override @Override
...@@ -28,7 +28,11 @@ public abstract class AnalysisMember extends AbstractLocalAsyncWorker { ...@@ -28,7 +28,11 @@ public abstract class AnalysisMember extends AbstractLocalAsyncWorker {
if (message instanceof EndOfBatchCommand) { if (message instanceof EndOfBatchCommand) {
aggregation(); aggregation();
} else { } else {
analyse(message); try {
analyse(message);
} catch (Exception e) {
e.printStackTrace();
}
} }
} }
......
package com.a.eye.skywalking.collector.worker;
/**
* @author pengys5
*/
public class Const {
public static final String ID_SPLIT = "..-..";
public static final String IDS_SPLIT = "\\.\\.-\\.\\.";
public static final String PEERS_FRONT_SPLIT = "[";
public static final String PEERS_BEHIND_SPLIT = "]";
public static final String USER_CODE = "User";
}
...@@ -4,19 +4,25 @@ package com.a.eye.skywalking.collector.worker; ...@@ -4,19 +4,25 @@ package com.a.eye.skywalking.collector.worker;
* @author pengys5 * @author pengys5
*/ */
public abstract class TimeSlice { public abstract class TimeSlice {
private long timeSlice;
private String sliceType; private String sliceType;
private long startTime;
private long endTime;
public TimeSlice(String sliceType, long timeSlice) { public TimeSlice(String sliceType,long startTime, long endTime) {
this.timeSlice = timeSlice; this.startTime = startTime;
this.endTime = endTime;
this.sliceType = sliceType; this.sliceType = sliceType;
} }
public long getTimeSlice() {
return timeSlice;
}
public String getSliceType() { public String getSliceType() {
return sliceType; return sliceType;
} }
public long getStartTime() {
return startTime;
}
public long getEndTime() {
return endTime;
}
} }
...@@ -35,20 +35,28 @@ public class NodeGetWithTimeSlice extends AbstractGet { ...@@ -35,20 +35,28 @@ public class NodeGetWithTimeSlice extends AbstractGet {
@Override @Override
protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception { protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception {
if (!request.containsKey("timeSliceValue") || !request.containsKey("timeSliceType")) { if (!request.containsKey("startTime") || !request.containsKey("endTime") || !request.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains timeSliceValue and timeSliceType"); throw new IllegalArgumentException("the request parameter must contains startTime,endTime,timeSliceType");
} }
logger.debug("timeSliceValue: %s, timeSliceType: %s", Arrays.toString(request.get("timeSliceValue")), Arrays.toString(request.get("timeSliceType"))); logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(request.get("startTime")),
Arrays.toString(request.get("endTime")), Arrays.toString(request.get("timeSliceType")));
long timeSlice; long startTime;
try { try {
timeSlice = Long.valueOf(ParameterTools.toString(request, "timeSliceValue")); startTime = Long.valueOf(ParameterTools.toString(request, "startTime"));
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter timeSliceValue must numeric with long type"); throw new IllegalArgumentException("the request parameter startTime must numeric with long type");
}
long endTime;
try {
endTime = Long.valueOf(ParameterTools.toString(request, "endTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter endTime must numeric with long type");
} }
NodeSearchWithTimeSlice.RequestEntity requestEntity; NodeSearchWithTimeSlice.RequestEntity requestEntity;
requestEntity = new NodeSearchWithTimeSlice.RequestEntity(ParameterTools.toString(request, "timeSliceType"), timeSlice); requestEntity = new NodeSearchWithTimeSlice.RequestEntity(ParameterTools.toString(request, "timeSliceType"), startTime, endTime);
getSelfContext().lookup(NodeSearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response); getSelfContext().lookup(NodeSearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response);
} }
......
...@@ -54,6 +54,10 @@ public class NodeIndex extends AbstractIndex { ...@@ -54,6 +54,10 @@ public class NodeIndex extends AbstractIndex {
.field("type", "string") .field("type", "string")
.field("index", "not_analyzed") .field("index", "not_analyzed")
.endObject() .endObject()
.startObject(AGG_COLUMN)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(Time_Slice) .startObject(Time_Slice)
.field("type", "long") .field("type", "long")
.field("index", "not_analyzed") .field("index", "not_analyzed")
......
...@@ -2,6 +2,7 @@ package com.a.eye.skywalking.collector.worker.node.analysis; ...@@ -2,6 +2,7 @@ package com.a.eye.skywalking.collector.worker.node.analysis;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext; import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext; import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember; import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.node.NodeIndex; import com.a.eye.skywalking.collector.worker.node.NodeIndex;
import com.a.eye.skywalking.collector.worker.tools.ClientSpanIsLeafTools; import com.a.eye.skywalking.collector.worker.tools.ClientSpanIsLeafTools;
...@@ -30,8 +31,10 @@ abstract class AbstractNodeAnalysis extends RecordAnalysisMember { ...@@ -30,8 +31,10 @@ abstract class AbstractNodeAnalysis extends RecordAnalysisMember {
void analyseSpans(TraceSegment segment, long timeSlice) throws Exception { void analyseSpans(TraceSegment segment, long timeSlice) throws Exception {
List<Span> spanList = segment.getSpans(); List<Span> spanList = segment.getSpans();
logger.debug("node analysis span isNotEmpty %s", CollectionTools.isNotEmpty(spanList));
if (CollectionTools.isNotEmpty(spanList)) { if (CollectionTools.isNotEmpty(spanList)) {
logger.debug("node analysis span list size: %s", spanList.size());
for (Span span : spanList) { for (Span span : spanList) {
JsonObject dataJsonObj = new JsonObject(); JsonObject dataJsonObj = new JsonObject();
String kind = Tags.SPAN_KIND.get(span); String kind = Tags.SPAN_KIND.get(span);
...@@ -47,44 +50,46 @@ abstract class AbstractNodeAnalysis extends RecordAnalysisMember { ...@@ -47,44 +50,46 @@ abstract class AbstractNodeAnalysis extends RecordAnalysisMember {
dataJsonObj.addProperty(NodeIndex.Code, code); dataJsonObj.addProperty(NodeIndex.Code, code);
dataJsonObj.addProperty(NodeIndex.Time_Slice, timeSlice); dataJsonObj.addProperty(NodeIndex.Time_Slice, timeSlice);
logger.debug("span id=%s, kind=%s, layer=%s, component=%s, code=%s", span.getSpanId(), kind, layer, component, code);
if (Tags.SPAN_KIND_CLIENT.equals(kind) && ClientSpanIsLeafTools.isLeaf(span.getSpanId(), spanList)) { if (Tags.SPAN_KIND_CLIENT.equals(kind) && ClientSpanIsLeafTools.isLeaf(span.getSpanId(), spanList)) {
code = component + "[" + SpanPeersTools.getPeers(span) + "]"; logger.debug("The span id %s which kind is client and is a leaf span", span.getSpanId());
code = SpanPeersTools.getPeers(span);
dataJsonObj.addProperty(NodeIndex.Code, code); dataJsonObj.addProperty(NodeIndex.Code, code);
dataJsonObj.addProperty(NodeIndex.NickName, code); dataJsonObj.addProperty(NodeIndex.NickName, code);
String id = timeSlice + "-" + code; String id = timeSlice + Const.ID_SPLIT + code;
logger.debug("leaf client node: %s", dataJsonObj.toString()); logger.debug("leaf client node: %s", dataJsonObj.toString());
setRecord(id, dataJsonObj); setRecord(id, dataJsonObj);
} else if (Tags.SPAN_KIND_SERVER.equals(kind) && span.getParentSpanId() == -1) { } else if (Tags.SPAN_KIND_SERVER.equals(kind) && span.getParentSpanId() == -1) {
logger.debug("The span id %s which kind is server and is top span", span.getSpanId());
if (CollectionTools.isEmpty(segment.getRefs())) { if (CollectionTools.isEmpty(segment.getRefs())) {
JsonObject userDataJsonObj = new JsonObject(); JsonObject userDataJsonObj = new JsonObject();
userDataJsonObj.addProperty(NodeIndex.Code, "User"); userDataJsonObj.addProperty(NodeIndex.Code, Const.USER_CODE);
userDataJsonObj.addProperty(NodeIndex.Layer, "User"); userDataJsonObj.addProperty(NodeIndex.Layer, Const.USER_CODE);
userDataJsonObj.addProperty(NodeIndex.Kind, Tags.SPAN_KIND_CLIENT); userDataJsonObj.addProperty(NodeIndex.Kind, Tags.SPAN_KIND_CLIENT);
userDataJsonObj.addProperty(NodeIndex.Component, "User"); userDataJsonObj.addProperty(NodeIndex.Component, Const.USER_CODE);
userDataJsonObj.addProperty(NodeIndex.NickName, "User"); userDataJsonObj.addProperty(NodeIndex.NickName, Const.USER_CODE);
userDataJsonObj.addProperty(NodeIndex.Time_Slice, timeSlice); userDataJsonObj.addProperty(NodeIndex.Time_Slice, timeSlice);
String userId = timeSlice + "-" + "User"; String userId = timeSlice + Const.ID_SPLIT + Const.USER_CODE;
logger.debug("user node: %s", userDataJsonObj.toString()); logger.debug("user node: %s", userDataJsonObj.toString());
setRecord(userId, userDataJsonObj); setRecord(userId, userDataJsonObj);
String id = timeSlice + "-" + code; String id = timeSlice + Const.ID_SPLIT + code;
dataJsonObj.addProperty(NodeIndex.NickName, code); dataJsonObj.addProperty(NodeIndex.NickName, code);
logger.debug("refs node: %s", dataJsonObj.toString()); logger.debug("refs node: %s", dataJsonObj.toString());
setRecord(id, dataJsonObj); setRecord(id, dataJsonObj);
} else { } else {
for (TraceSegmentRef segmentRef : segment.getRefs()) { for (TraceSegmentRef segmentRef : segment.getRefs()) {
String nickName = component + "[" + segmentRef.getPeerHost() + "]"; String nickName = Const.PEERS_FRONT_SPLIT + segmentRef.getPeerHost() + Const.PEERS_BEHIND_SPLIT;
dataJsonObj.addProperty(NodeIndex.NickName, nickName); dataJsonObj.addProperty(NodeIndex.NickName, nickName);
String id = timeSlice + "-" + code; String id = timeSlice + Const.ID_SPLIT + code;
logger.debug("refs node: %s", dataJsonObj.toString()); logger.debug("refs node: %s", dataJsonObj.toString());
setRecord(id, dataJsonObj); setRecord(id, dataJsonObj);
} }
} }
} else { } else {
logger.error("The span kind value is incorrect which segment record id is %s, the value must client or server", segment.getTraceSegmentId()); logger.error("The span kind value is incorrect which segment record id is %s, the value must client or server", segment.getTraceSegmentId());
return;
} }
} }
} }
......
...@@ -15,6 +15,10 @@ import org.elasticsearch.action.search.SearchResponse; ...@@ -15,6 +15,10 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHits;
/** /**
* @author pengys5 * @author pengys5
...@@ -35,22 +39,31 @@ public class NodeSearchWithTimeSlice extends AbstractLocalSyncWorker { ...@@ -35,22 +39,31 @@ public class NodeSearchWithTimeSlice extends AbstractLocalSyncWorker {
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeIndex.Index); SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeIndex.Index);
searchRequestBuilder.setTypes(search.getSliceType()); searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery(NodeIndex.Time_Slice, search.getTimeSlice())); searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeIndex.Time_Slice).gte(search.getStartTime()).lte(search.getEndTime()));
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); searchRequestBuilder.setSize(0);
AggregationBuilder aggregation = AggregationBuilders.terms(NodeIndex.AGG_COLUMN).field(NodeIndex.AGG_COLUMN);
aggregation.subAggregation(AggregationBuilders.topHits(NodeIndex.Top_One).size(1));
searchRequestBuilder.addAggregation(aggregation);
SearchHit[] hits = searchResponse.getHits().getHits(); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
logger.debug("server node list size: %s", hits.length);
JsonArray nodeArray = new JsonArray(); JsonArray nodeArray = new JsonArray();
for (SearchHit hit : searchResponse.getHits().getHits()) { Terms agg = searchResponse.getAggregations().get(NodeIndex.AGG_COLUMN);
JsonObject nodeObj = new JsonObject(); for (Terms.Bucket entry : agg.getBuckets()) {
nodeObj.addProperty(NodeIndex.Code, (String) hit.getSource().get(NodeIndex.Code)); TopHits topHits = entry.getAggregations().get(NodeIndex.Top_One);
nodeObj.addProperty(NodeIndex.Component, (String) hit.getSource().get(NodeIndex.Component)); for (SearchHit hit : topHits.getHits().getHits()) {
nodeObj.addProperty(NodeIndex.Layer, (String) hit.getSource().get(NodeIndex.Layer)); logger.debug(" -> id [{%s}], _source [{%s}]", hit.getId(), hit.getSourceAsString());
nodeObj.addProperty(NodeIndex.Kind, (String) hit.getSource().get(NodeIndex.Kind));
nodeObj.addProperty(NodeIndex.NickName, (String) hit.getSource().get(NodeIndex.NickName)); JsonObject nodeObj = new JsonObject();
nodeObj.addProperty(NodeIndex.Time_Slice, (Long) hit.getSource().get(NodeIndex.Time_Slice)); nodeObj.addProperty(NodeIndex.Code, (String) hit.getSource().get(NodeIndex.Code));
nodeArray.add(nodeObj); nodeObj.addProperty(NodeIndex.Component, (String) hit.getSource().get(NodeIndex.Component));
nodeObj.addProperty(NodeIndex.Layer, (String) hit.getSource().get(NodeIndex.Layer));
nodeObj.addProperty(NodeIndex.Kind, (String) hit.getSource().get(NodeIndex.Kind));
nodeObj.addProperty(NodeIndex.NickName, (String) hit.getSource().get(NodeIndex.NickName));
nodeObj.addProperty(NodeIndex.Time_Slice, (Long) hit.getSource().get(NodeIndex.Time_Slice));
nodeArray.add(nodeObj);
}
} }
JsonObject resJsonObj = (JsonObject) response; JsonObject resJsonObj = (JsonObject) response;
...@@ -61,8 +74,8 @@ public class NodeSearchWithTimeSlice extends AbstractLocalSyncWorker { ...@@ -61,8 +74,8 @@ public class NodeSearchWithTimeSlice extends AbstractLocalSyncWorker {
} }
public static class RequestEntity extends TimeSlice { public static class RequestEntity extends TimeSlice {
public RequestEntity(String sliceType, long timeSlice) { public RequestEntity(String sliceType, long startTime, long endTime) {
super(sliceType, timeSlice); super(sliceType, startTime, endTime);
} }
} }
......
...@@ -35,20 +35,28 @@ public class NodeInstGetWithTimeSlice extends AbstractGet { ...@@ -35,20 +35,28 @@ public class NodeInstGetWithTimeSlice extends AbstractGet {
@Override @Override
protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception { protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception {
if (!request.containsKey("timeSliceValue") || !request.containsKey("timeSliceType")) { if (!request.containsKey("startTime") || !request.containsKey("endTime") || !request.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains timeSliceValue and timeSliceType"); throw new IllegalArgumentException("the request parameter must contains startTime,endTime,timeSliceType");
} }
logger.debug("timeSliceValue: %s, timeSliceType: %s", Arrays.toString(request.get("timeSliceValue")), Arrays.toString(request.get("timeSliceType"))); logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(request.get("startTime")),
Arrays.toString(request.get("endTime")), Arrays.toString(request.get("timeSliceType")));
long timeSlice; long startTime;
try { try {
timeSlice = Long.valueOf(ParameterTools.toString(request, "timeSliceValue")); startTime = Long.valueOf(ParameterTools.toString(request, "startTime"));
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter timeSliceValue must numeric with long type"); throw new IllegalArgumentException("the request parameter startTime must numeric with long type");
}
long endTime;
try {
endTime = Long.valueOf(ParameterTools.toString(request, "endTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter endTime must numeric with long type");
} }
NodeInstSearchWithTimeSlice.RequestEntity requestEntity; NodeInstSearchWithTimeSlice.RequestEntity requestEntity;
requestEntity = new NodeInstSearchWithTimeSlice.RequestEntity(ParameterTools.toString(request, "timeSliceType"), timeSlice); requestEntity = new NodeInstSearchWithTimeSlice.RequestEntity(ParameterTools.toString(request, "timeSliceType"), startTime, endTime);
getSelfContext().lookup(NodeInstSearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response); getSelfContext().lookup(NodeInstSearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response);
} }
......
...@@ -49,6 +49,10 @@ public class NodeInstIndex extends AbstractIndex { ...@@ -49,6 +49,10 @@ public class NodeInstIndex extends AbstractIndex {
.field("type", "string") .field("type", "string")
.field("index", "not_analyzed") .field("index", "not_analyzed")
.endObject() .endObject()
.startObject(AGG_COLUMN)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(Time_Slice) .startObject(Time_Slice)
.field("type", "long") .field("type", "long")
.field("index", "not_analyzed") .field("index", "not_analyzed")
......
...@@ -35,20 +35,28 @@ public class NodeInstSummaryGetWithTimeSlice extends AbstractGet { ...@@ -35,20 +35,28 @@ public class NodeInstSummaryGetWithTimeSlice extends AbstractGet {
@Override @Override
protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception { protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception {
if (!request.containsKey("timeSliceValue") || !request.containsKey("timeSliceType")) { if (!request.containsKey("startTime") || !request.containsKey("endTime") || !request.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains timeSliceValue and timeSliceType"); throw new IllegalArgumentException("the request parameter must contains startTime,endTime,timeSliceType");
} }
logger.debug("timeSliceValue: %s, timeSliceType: %s", Arrays.toString(request.get("timeSliceValue")), Arrays.toString(request.get("timeSliceType"))); logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(request.get("startTime")),
Arrays.toString(request.get("endTime")), Arrays.toString(request.get("timeSliceType")));
long timeSlice; long startTime;
try { try {
timeSlice = Long.valueOf(ParameterTools.toString(request, "timeSliceValue")); startTime = Long.valueOf(ParameterTools.toString(request, "startTime"));
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter timeSliceValue must numeric with long type"); throw new IllegalArgumentException("the request parameter startTime must numeric with long type");
}
long endTime;
try {
endTime = Long.valueOf(ParameterTools.toString(request, "endTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter endTime must numeric with long type");
} }
NodeInstSummarySearchWithTimeSlice.RequestEntity requestEntity; NodeInstSummarySearchWithTimeSlice.RequestEntity requestEntity;
requestEntity = new NodeInstSummarySearchWithTimeSlice.RequestEntity(ParameterTools.toString(request, "timeSliceType"), timeSlice); requestEntity = new NodeInstSummarySearchWithTimeSlice.RequestEntity(ParameterTools.toString(request, "timeSliceType"), startTime, endTime);
getSelfContext().lookup(NodeInstSummarySearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response); getSelfContext().lookup(NodeInstSummarySearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response);
} }
......
...@@ -2,9 +2,9 @@ package com.a.eye.skywalking.collector.worker.nodeinst.analysis; ...@@ -2,9 +2,9 @@ package com.a.eye.skywalking.collector.worker.nodeinst.analysis;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext; import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext; import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember; import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.nodeinst.NodeInstIndex; import com.a.eye.skywalking.collector.worker.nodeinst.NodeInstIndex;
import com.a.eye.skywalking.collector.worker.tools.UrlTools;
import com.a.eye.skywalking.trace.Span; import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment; import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.tag.Tags; import com.a.eye.skywalking.trace.tag.Tags;
...@@ -43,7 +43,7 @@ abstract class AbstractNodeInstAnalysis extends RecordAnalysisMember { ...@@ -43,7 +43,7 @@ abstract class AbstractNodeInstAnalysis extends RecordAnalysisMember {
dataJsonObj.addProperty(NodeInstIndex.Address, url); dataJsonObj.addProperty(NodeInstIndex.Address, url);
dataJsonObj.addProperty(NodeInstIndex.Time_Slice, timeSlice); dataJsonObj.addProperty(NodeInstIndex.Time_Slice, timeSlice);
String id = timeSlice + "-" + url; String id = timeSlice + Const.ID_SPLIT + url;
setRecord(id, dataJsonObj); setRecord(id, dataJsonObj);
logger.debug("node instance: %s", dataJsonObj.toString()); logger.debug("node instance: %s", dataJsonObj.toString());
......
...@@ -34,7 +34,7 @@ public class NodeInstSearchWithTimeSlice extends AbstractLocalSyncWorker { ...@@ -34,7 +34,7 @@ public class NodeInstSearchWithTimeSlice extends AbstractLocalSyncWorker {
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeInstIndex.Index); SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeInstIndex.Index);
searchRequestBuilder.setTypes(search.getSliceType()); searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery("timeSlice", search.getTimeSlice())); searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeInstIndex.Time_Slice).gte(search.getStartTime()).lte(search.getEndTime()));
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
SearchHit[] hits = searchResponse.getHits().getHits(); SearchHit[] hits = searchResponse.getHits().getHits();
...@@ -60,8 +60,8 @@ public class NodeInstSearchWithTimeSlice extends AbstractLocalSyncWorker { ...@@ -60,8 +60,8 @@ public class NodeInstSearchWithTimeSlice extends AbstractLocalSyncWorker {
} }
public static class RequestEntity extends TimeSlice { public static class RequestEntity extends TimeSlice {
public RequestEntity(String sliceType, long timeSlice) { public RequestEntity(String sliceType, long startTime, long endTime) {
super(sliceType, timeSlice); super(sliceType, startTime, endTime);
} }
} }
......
...@@ -36,7 +36,7 @@ public class NodeInstSummarySearchWithTimeSlice extends AbstractLocalSyncWorker ...@@ -36,7 +36,7 @@ public class NodeInstSummarySearchWithTimeSlice extends AbstractLocalSyncWorker
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeInstIndex.Index); SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeInstIndex.Index);
searchRequestBuilder.setTypes(search.getSliceType()); searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery("timeSlice", search.getTimeSlice())); searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeInstIndex.Time_Slice).gte(search.getStartTime()).lte(search.getEndTime()));
searchRequestBuilder.addAggregation(AggregationBuilders.terms("codes").field("code")); searchRequestBuilder.addAggregation(AggregationBuilders.terms("codes").field("code"));
searchRequestBuilder.setSize(0); searchRequestBuilder.setSize(0);
...@@ -63,8 +63,8 @@ public class NodeInstSummarySearchWithTimeSlice extends AbstractLocalSyncWorker ...@@ -63,8 +63,8 @@ public class NodeInstSummarySearchWithTimeSlice extends AbstractLocalSyncWorker
} }
public static class RequestEntity extends TimeSlice { public static class RequestEntity extends TimeSlice {
public RequestEntity(String sliceType, long timeSlice) { public RequestEntity(String sliceType, long startTime, long endTime) {
super(sliceType, timeSlice); super(sliceType, startTime, endTime);
} }
} }
......
...@@ -35,20 +35,28 @@ public class NodeRefGetWithTimeSlice extends AbstractGet { ...@@ -35,20 +35,28 @@ public class NodeRefGetWithTimeSlice extends AbstractGet {
@Override @Override
protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception { protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception {
if (!request.containsKey("timeSliceValue") || !request.containsKey("timeSliceType")) { if (!request.containsKey("startTime") || !request.containsKey("endTime") || !request.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains timeSliceValue and timeSliceType"); throw new IllegalArgumentException("the request parameter must contains startTime,endTime,timeSliceType");
} }
logger.debug("timeSliceValue: %s, timeSliceType: %s", Arrays.toString(request.get("timeSliceValue")), Arrays.toString(request.get("timeSliceType"))); logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(request.get("startTime")),
Arrays.toString(request.get("endTime")), Arrays.toString(request.get("timeSliceType")));
long timeSlice; long startTime;
try { try {
timeSlice = Long.valueOf(ParameterTools.toString(request, "timeSliceValue")); startTime = Long.valueOf(ParameterTools.toString(request, "startTime"));
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter timeSliceValue must numeric with long type"); throw new IllegalArgumentException("the request parameter startTime must numeric with long type");
}
long endTime;
try {
endTime = Long.valueOf(ParameterTools.toString(request, "endTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter endTime must numeric with long type");
} }
NodeRefSearchWithTimeSlice.RequestEntity requestEntity; NodeRefSearchWithTimeSlice.RequestEntity requestEntity;
requestEntity = new NodeRefSearchWithTimeSlice.RequestEntity(ParameterTools.toString(request, "timeSliceType"), timeSlice); requestEntity = new NodeRefSearchWithTimeSlice.RequestEntity(ParameterTools.toString(request, "timeSliceType"), startTime, endTime);
getSelfContext().lookup(NodeRefSearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response); getSelfContext().lookup(NodeRefSearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response);
} }
......
...@@ -51,6 +51,10 @@ public class NodeRefIndex extends AbstractIndex { ...@@ -51,6 +51,10 @@ public class NodeRefIndex extends AbstractIndex {
.field("type", "boolean") .field("type", "boolean")
.field("index", "not_analyzed") .field("index", "not_analyzed")
.endObject() .endObject()
.startObject(AGG_COLUMN)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(Time_Slice) .startObject(Time_Slice)
.field("type", "long") .field("type", "long")
.field("index", "not_analyzed") .field("index", "not_analyzed")
......
...@@ -35,20 +35,28 @@ public class NodeRefResSumGetWithTimeSlice extends AbstractGet { ...@@ -35,20 +35,28 @@ public class NodeRefResSumGetWithTimeSlice extends AbstractGet {
@Override @Override
protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception { protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception {
if (!request.containsKey("timeSliceValue") || !request.containsKey("timeSliceType")) { if (!request.containsKey("startTime") || !request.containsKey("endTime") || !request.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains timeSliceValue and timeSliceType"); throw new IllegalArgumentException("the request parameter must contains startTime,endTime,timeSliceType");
} }
logger.debug("timeSliceValue: %s, timeSliceType: %s", Arrays.toString(request.get("timeSliceValue")), Arrays.toString(request.get("timeSliceType"))); logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(request.get("startTime")),
Arrays.toString(request.get("endTime")), Arrays.toString(request.get("timeSliceType")));
long timeSlice; long startTime;
try { try {
timeSlice = Long.valueOf(ParameterTools.toString(request, "timeSliceValue")); startTime = Long.valueOf(ParameterTools.toString(request, "startTime"));
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter timeSliceValue must numeric with long type"); throw new IllegalArgumentException("the request parameter startTime must numeric with long type");
}
long endTime;
try {
endTime = Long.valueOf(ParameterTools.toString(request, "endTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter endTime must numeric with long type");
} }
NodeRefResSumSearchWithTimeSlice.RequestEntity requestEntity; NodeRefResSumSearchWithTimeSlice.RequestEntity requestEntity;
requestEntity = new NodeRefResSumSearchWithTimeSlice.RequestEntity(ParameterTools.toString(request, "timeSliceType"), timeSlice); requestEntity = new NodeRefResSumSearchWithTimeSlice.RequestEntity(ParameterTools.toString(request, "timeSliceType"), startTime, endTime);
getSelfContext().lookup(NodeRefResSumSearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response); getSelfContext().lookup(NodeRefResSumSearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response);
} }
......
...@@ -58,6 +58,10 @@ public class NodeRefResSumIndex extends AbstractIndex { ...@@ -58,6 +58,10 @@ public class NodeRefResSumIndex extends AbstractIndex {
.field("type", "long") .field("type", "long")
.field("index", "not_analyzed") .field("index", "not_analyzed")
.endObject() .endObject()
.startObject(AGG_COLUMN)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(Time_Slice) .startObject(Time_Slice)
.field("type", "long") .field("type", "long")
.field("index", "not_analyzed") .field("index", "not_analyzed")
......
...@@ -2,10 +2,12 @@ package com.a.eye.skywalking.collector.worker.noderef.analysis; ...@@ -2,10 +2,12 @@ package com.a.eye.skywalking.collector.worker.noderef.analysis;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext; import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext; import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember; import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex; import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
import com.a.eye.skywalking.collector.worker.tools.ClientSpanIsLeafTools; import com.a.eye.skywalking.collector.worker.tools.ClientSpanIsLeafTools;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools; import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.collector.worker.tools.SpanPeersTools;
import com.a.eye.skywalking.trace.Span; import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment; import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef; import com.a.eye.skywalking.trace.TraceSegmentRef;
...@@ -33,7 +35,6 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember { ...@@ -33,7 +35,6 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
for (Span span : spanList) { for (Span span : spanList) {
JsonObject dataJsonObj = new JsonObject(); JsonObject dataJsonObj = new JsonObject();
String component = Tags.COMPONENT.get(span); String component = Tags.COMPONENT.get(span);
String peers = Tags.PEERS.get(span);
dataJsonObj.addProperty(NodeRefIndex.Time_Slice, timeSlice); dataJsonObj.addProperty(NodeRefIndex.Time_Slice, timeSlice);
dataJsonObj.addProperty(NodeRefIndex.FrontIsRealCode, true); dataJsonObj.addProperty(NodeRefIndex.FrontIsRealCode, true);
dataJsonObj.addProperty(NodeRefIndex.BehindIsRealCode, true); dataJsonObj.addProperty(NodeRefIndex.BehindIsRealCode, true);
...@@ -42,11 +43,11 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember { ...@@ -42,11 +43,11 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
String front = segment.getApplicationCode(); String front = segment.getApplicationCode();
dataJsonObj.addProperty(NodeRefIndex.Front, front); dataJsonObj.addProperty(NodeRefIndex.Front, front);
String behind = component + "[" + peers + "]"; String behind = SpanPeersTools.getPeers(span);
dataJsonObj.addProperty(NodeRefIndex.Behind, behind); dataJsonObj.addProperty(NodeRefIndex.Behind, behind);
dataJsonObj.addProperty(NodeRefIndex.BehindIsRealCode, false); dataJsonObj.addProperty(NodeRefIndex.BehindIsRealCode, false);
String id = timeSlice + "-" + front + "-" + behind; String id = timeSlice + Const.ID_SPLIT + front + Const.ID_SPLIT + behind;
logger.debug("dag node ref: %s", dataJsonObj.toString()); logger.debug("dag node ref: %s", dataJsonObj.toString());
setRecord(id, dataJsonObj); setRecord(id, dataJsonObj);
buildNodeRefResRecordData(id, span, minute, hour, day, second); buildNodeRefResRecordData(id, span, minute, hour, day, second);
...@@ -55,17 +56,17 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember { ...@@ -55,17 +56,17 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
String behind = segment.getApplicationCode(); String behind = segment.getApplicationCode();
dataJsonObj.addProperty(NodeRefIndex.Behind, behind); dataJsonObj.addProperty(NodeRefIndex.Behind, behind);
String front = "User"; String front = Const.USER_CODE;
dataJsonObj.addProperty(NodeRefIndex.Front, front); dataJsonObj.addProperty(NodeRefIndex.Front, front);
String id = timeSlice + "-" + front + "-" + behind; String id = timeSlice + Const.ID_SPLIT + front + Const.ID_SPLIT + behind;
setRecord(id, dataJsonObj); setRecord(id, dataJsonObj);
buildNodeRefResRecordData(id, span, minute, hour, day, second); buildNodeRefResRecordData(id, span, minute, hour, day, second);
} else if (span.getParentSpanId() == -1 && CollectionTools.isNotEmpty(segment.getRefs())) { } else if (span.getParentSpanId() == -1 && CollectionTools.isNotEmpty(segment.getRefs())) {
for (TraceSegmentRef segmentRef : segment.getRefs()) { for (TraceSegmentRef segmentRef : segment.getRefs()) {
String front = segmentRef.getApplicationCode(); String front = segmentRef.getApplicationCode();
String behind = component + "[" + segmentRef.getPeerHost() + "]"; String behind = Const.PEERS_FRONT_SPLIT + segmentRef.getPeerHost() + Const.PEERS_BEHIND_SPLIT;
String id = timeSlice + "-" + front + "-" + behind; String id = timeSlice + Const.ID_SPLIT + front + Const.ID_SPLIT + behind;
JsonObject refDataJsonObj = new JsonObject(); JsonObject refDataJsonObj = new JsonObject();
refDataJsonObj.addProperty(NodeRefIndex.Front, front); refDataJsonObj.addProperty(NodeRefIndex.Front, front);
......
...@@ -3,6 +3,7 @@ package com.a.eye.skywalking.collector.worker.noderef.persistence; ...@@ -3,6 +3,7 @@ package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.*; import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector; import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector; import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.TimeSlice; import com.a.eye.skywalking.collector.worker.TimeSlice;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex; import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
import com.a.eye.skywalking.collector.worker.storage.EsClient; import com.a.eye.skywalking.collector.worker.storage.EsClient;
...@@ -14,7 +15,10 @@ import org.elasticsearch.action.search.SearchRequestBuilder; ...@@ -14,7 +15,10 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
/** /**
* @author pengys5 * @author pengys5
...@@ -35,30 +39,48 @@ public class NodeRefResSumSearchWithTimeSlice extends AbstractLocalSyncWorker { ...@@ -35,30 +39,48 @@ public class NodeRefResSumSearchWithTimeSlice extends AbstractLocalSyncWorker {
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeRefResSumIndex.Index); SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeRefResSumIndex.Index);
searchRequestBuilder.setTypes(search.getSliceType()); searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery(NodeRefResSumIndex.Time_Slice, search.getTimeSlice())); searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeRefResSumIndex.Time_Slice).gte(search.getStartTime()).lte(search.getEndTime()));
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); searchRequestBuilder.setSize(0);
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(NodeRefResSumIndex.AGG_COLUMN).field(NodeRefResSumIndex.AGG_COLUMN);
aggregationBuilder.subAggregation(AggregationBuilders.sum(NodeRefResSumIndex.OneSecondLess).field(NodeRefResSumIndex.OneSecondLess));
aggregationBuilder.subAggregation(AggregationBuilders.sum(NodeRefResSumIndex.ThreeSecondLess).field(NodeRefResSumIndex.ThreeSecondLess));
aggregationBuilder.subAggregation(AggregationBuilders.sum(NodeRefResSumIndex.FiveSecondLess).field(NodeRefResSumIndex.FiveSecondLess));
aggregationBuilder.subAggregation(AggregationBuilders.sum(NodeRefResSumIndex.FiveSecondGreater).field(NodeRefResSumIndex.FiveSecondGreater));
aggregationBuilder.subAggregation(AggregationBuilders.sum(NodeRefResSumIndex.Error).field(NodeRefResSumIndex.Error));
aggregationBuilder.subAggregation(AggregationBuilders.sum(NodeRefResSumIndex.Summary).field(NodeRefResSumIndex.Summary));
SearchHit[] hits = searchResponse.getHits().getHits(); searchRequestBuilder.addAggregation(aggregationBuilder);
logger.debug("node reference list size: %s", hits.length);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
JsonArray nodeRefResSumArray = new JsonArray(); JsonArray nodeRefResSumArray = new JsonArray();
for (SearchHit hit : searchResponse.getHits().getHits()) { Terms aggTerms = searchResponse.getAggregations().get(NodeRefResSumIndex.AGG_COLUMN);
for (Terms.Bucket bucket : aggTerms.getBuckets()) {
String aggId = String.valueOf(bucket.getKey());
Sum oneSecondLess = bucket.getAggregations().get(NodeRefResSumIndex.OneSecondLess);
Sum threeSecondLess = bucket.getAggregations().get(NodeRefResSumIndex.ThreeSecondLess);
Sum fiveSecondLess = bucket.getAggregations().get(NodeRefResSumIndex.FiveSecondLess);
Sum fiveSecondGreater = bucket.getAggregations().get(NodeRefResSumIndex.FiveSecondGreater);
Sum error = bucket.getAggregations().get(NodeRefResSumIndex.Error);
Sum summary = bucket.getAggregations().get(NodeRefResSumIndex.Summary);
logger.debug("aggId: %s, oneSecondLess: %s, threeSecondLess: %s, fiveSecondLess: %s, fiveSecondGreater: %s, error: %s, summary: %s", aggId,
oneSecondLess.getValue(), threeSecondLess.getValue(), fiveSecondLess.getValue(), fiveSecondGreater.getValue(), error.getValue(), summary.getValue());
JsonObject nodeRefResSumObj = new JsonObject(); JsonObject nodeRefResSumObj = new JsonObject();
String id = hit.getId(); String[] ids = aggId.split(Const.IDS_SPLIT);
String[] ids = id.split("-"); String front = ids[0];
String front = ids[1]; String behind = ids[1];
String behind = ids[2];
nodeRefResSumObj.addProperty("front", front); nodeRefResSumObj.addProperty("front", front);
nodeRefResSumObj.addProperty("behind", behind); nodeRefResSumObj.addProperty("behind", behind);
nodeRefResSumObj.addProperty(NodeRefResSumIndex.OneSecondLess, (Number) hit.getSource().get(NodeRefResSumIndex.OneSecondLess)); nodeRefResSumObj.addProperty(NodeRefResSumIndex.OneSecondLess, oneSecondLess.getValue());
nodeRefResSumObj.addProperty(NodeRefResSumIndex.ThreeSecondLess, (Number) hit.getSource().get(NodeRefResSumIndex.ThreeSecondLess)); nodeRefResSumObj.addProperty(NodeRefResSumIndex.ThreeSecondLess, threeSecondLess.getValue());
nodeRefResSumObj.addProperty(NodeRefResSumIndex.FiveSecondLess, (Number) hit.getSource().get(NodeRefResSumIndex.FiveSecondLess)); nodeRefResSumObj.addProperty(NodeRefResSumIndex.FiveSecondLess, fiveSecondLess.getValue());
nodeRefResSumObj.addProperty(NodeRefResSumIndex.FiveSecondGreater, (Number) hit.getSource().get(NodeRefResSumIndex.FiveSecondGreater)); nodeRefResSumObj.addProperty(NodeRefResSumIndex.FiveSecondGreater, fiveSecondGreater.getValue());
nodeRefResSumObj.addProperty(NodeRefResSumIndex.Error, (Number) hit.getSource().get(NodeRefResSumIndex.Error)); nodeRefResSumObj.addProperty(NodeRefResSumIndex.Error, error.getValue());
nodeRefResSumObj.addProperty(NodeRefResSumIndex.Summary, (Number) hit.getSource().get(NodeRefResSumIndex.Summary)); nodeRefResSumObj.addProperty(NodeRefResSumIndex.Summary, summary.getValue());
nodeRefResSumObj.addProperty(NodeRefResSumIndex.Time_Slice, (Long) hit.getSource().get(NodeRefResSumIndex.Time_Slice));
nodeRefResSumArray.add(nodeRefResSumObj); nodeRefResSumArray.add(nodeRefResSumObj);
} }
...@@ -70,8 +92,8 @@ public class NodeRefResSumSearchWithTimeSlice extends AbstractLocalSyncWorker { ...@@ -70,8 +92,8 @@ public class NodeRefResSumSearchWithTimeSlice extends AbstractLocalSyncWorker {
} }
public static class RequestEntity extends TimeSlice { public static class RequestEntity extends TimeSlice {
public RequestEntity(String sliceType, long timeSlice) { public RequestEntity(String sliceType, long startTime, long endTime) {
super(sliceType, timeSlice); super(sliceType, startTime, endTime);
} }
} }
......
...@@ -15,6 +15,10 @@ import org.elasticsearch.action.search.SearchResponse; ...@@ -15,6 +15,10 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHits;
/** /**
* @author pengys5 * @author pengys5
...@@ -35,21 +39,30 @@ public class NodeRefSearchWithTimeSlice extends AbstractLocalSyncWorker { ...@@ -35,21 +39,30 @@ public class NodeRefSearchWithTimeSlice extends AbstractLocalSyncWorker {
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeRefIndex.Index); SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeRefIndex.Index);
searchRequestBuilder.setTypes(search.getSliceType()); searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery(NodeRefIndex.Time_Slice, search.getTimeSlice())); searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeRefIndex.Time_Slice).gte(search.getStartTime()).lte(search.getEndTime()));
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); searchRequestBuilder.setSize(0);
AggregationBuilder aggregation = AggregationBuilders.terms(NodeRefIndex.AGG_COLUMN).field(NodeRefIndex.AGG_COLUMN);
aggregation.subAggregation(AggregationBuilders.topHits(NodeRefIndex.Top_One).size(1));
searchRequestBuilder.addAggregation(aggregation);
SearchHit[] hits = searchResponse.getHits().getHits(); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
logger.debug("node reference list size: %s", hits.length);
JsonArray nodeRefArray = new JsonArray(); JsonArray nodeRefArray = new JsonArray();
for (SearchHit hit : searchResponse.getHits().getHits()) { Terms agg = searchResponse.getAggregations().get(NodeRefIndex.AGG_COLUMN);
JsonObject nodeRefObj = new JsonObject(); for (Terms.Bucket entry : agg.getBuckets()) {
nodeRefObj.addProperty(NodeRefIndex.Front, (String) hit.getSource().get(NodeRefIndex.Front)); TopHits topHits = entry.getAggregations().get(NodeRefIndex.Top_One);
nodeRefObj.addProperty(NodeRefIndex.FrontIsRealCode, (Boolean) hit.getSource().get(NodeRefIndex.FrontIsRealCode)); for (SearchHit hit : topHits.getHits().getHits()) {
nodeRefObj.addProperty(NodeRefIndex.Behind, (String) hit.getSource().get(NodeRefIndex.Behind)); logger.debug(" -> id [{%s}], _source [{%s}]", hit.getId(), hit.getSourceAsString());
nodeRefObj.addProperty(NodeRefIndex.BehindIsRealCode, (Boolean) hit.getSource().get(NodeRefIndex.BehindIsRealCode));
nodeRefObj.addProperty(NodeRefIndex.Time_Slice, (Long) hit.getSource().get(NodeRefIndex.Time_Slice)); JsonObject nodeRefObj = new JsonObject();
nodeRefArray.add(nodeRefObj); nodeRefObj.addProperty(NodeRefIndex.Front, (String) hit.getSource().get(NodeRefIndex.Front));
nodeRefObj.addProperty(NodeRefIndex.FrontIsRealCode, (Boolean) hit.getSource().get(NodeRefIndex.FrontIsRealCode));
nodeRefObj.addProperty(NodeRefIndex.Behind, (String) hit.getSource().get(NodeRefIndex.Behind));
nodeRefObj.addProperty(NodeRefIndex.BehindIsRealCode, (Boolean) hit.getSource().get(NodeRefIndex.BehindIsRealCode));
nodeRefObj.addProperty(NodeRefIndex.Time_Slice, (Long) hit.getSource().get(NodeRefIndex.Time_Slice));
nodeRefArray.add(nodeRefObj);
}
} }
JsonObject resJsonObj = (JsonObject) response; JsonObject resJsonObj = (JsonObject) response;
...@@ -60,8 +73,8 @@ public class NodeRefSearchWithTimeSlice extends AbstractLocalSyncWorker { ...@@ -60,8 +73,8 @@ public class NodeRefSearchWithTimeSlice extends AbstractLocalSyncWorker {
} }
public static class RequestEntity extends TimeSlice { public static class RequestEntity extends TimeSlice {
public RequestEntity(String sliceType, long timeSlice) { public RequestEntity(String sliceType, long startTime, long endTime) {
super(sliceType, timeSlice); super(sliceType, startTime, endTime);
} }
} }
......
...@@ -84,7 +84,7 @@ public class SegmentPost extends AbstractPost { ...@@ -84,7 +84,7 @@ public class SegmentPost extends AbstractPost {
tellNodeRef(segmentWithTimeSlice); tellNodeRef(segmentWithTimeSlice);
tellNode(segmentWithTimeSlice); tellNode(segmentWithTimeSlice);
tellNodeInst(segmentWithTimeSlice); // tellNodeInst(segmentWithTimeSlice);
} }
} }
...@@ -98,20 +98,20 @@ public class SegmentPost extends AbstractPost { ...@@ -98,20 +98,20 @@ public class SegmentPost extends AbstractPost {
private void tellNodeRef(SegmentWithTimeSlice segmentWithTimeSlice) throws Exception { private void tellNodeRef(SegmentWithTimeSlice segmentWithTimeSlice) throws Exception {
getSelfContext().lookup(NodeRefMinuteAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice); getSelfContext().lookup(NodeRefMinuteAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
// getSelfContext().lookup(NodeRefHourAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice); getSelfContext().lookup(NodeRefHourAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
// getSelfContext().lookup(NodeRefDayAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice); getSelfContext().lookup(NodeRefDayAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
} }
private void tellNode(SegmentWithTimeSlice segmentWithTimeSlice) throws Exception { private void tellNode(SegmentWithTimeSlice segmentWithTimeSlice) throws Exception {
getSelfContext().lookup(NodeMinuteAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice); getSelfContext().lookup(NodeMinuteAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
// getSelfContext().lookup(NodeHourAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice); getSelfContext().lookup(NodeHourAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
// getSelfContext().lookup(NodeDayAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice); getSelfContext().lookup(NodeDayAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
} }
private void tellNodeInst(SegmentWithTimeSlice segmentWithTimeSlice) throws Exception { private void tellNodeInst(SegmentWithTimeSlice segmentWithTimeSlice) throws Exception {
getSelfContext().lookup(NodeInstMinuteAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice); getSelfContext().lookup(NodeInstMinuteAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
// getSelfContext().lookup(NodeInstHourAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice); getSelfContext().lookup(NodeInstHourAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
// getSelfContext().lookup(NodeInstDayAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice); getSelfContext().lookup(NodeInstDayAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
} }
private void validateData(TraceSegment newSegment) { private void validateData(TraceSegment newSegment) {
......
package com.a.eye.skywalking.collector.worker.storage; package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.actor.selector.AbstractHashMessage; import com.a.eye.skywalking.collector.actor.selector.AbstractHashMessage;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.storage.index.AbstractIndex; import com.a.eye.skywalking.collector.worker.storage.index.AbstractIndex;
import java.util.HashMap; import java.util.HashMap;
...@@ -12,36 +13,57 @@ import java.util.Map; ...@@ -12,36 +13,57 @@ import java.util.Map;
public class MetricData extends AbstractHashMessage { public class MetricData extends AbstractHashMessage {
private String id; private String id;
private Map<String, Long> value; private Map<String, Object> value;
public MetricData(String id) { public MetricData(String id) {
super(id); super(id);
this.id = id; this.id = id;
value = new HashMap<>(); value = new HashMap<>();
value.put(AbstractIndex.Time_Slice, Long.valueOf(id.split("-")[0]));
String[] ids = id.split(Const.ID_SPLIT);
String slice = ids[0];
String aggId = "";
for (int i = 1; i < ids.length; i++) {
if (i == 1) {
aggId = ids[i];
} else {
aggId = aggId + Const.ID_SPLIT + ids[i];
}
}
value.put(AbstractIndex.Time_Slice, Long.valueOf(slice));
value.put(AbstractIndex.AGG_COLUMN, aggId);
} }
public void setMetric(String column, Long value) { public void setMetric(String column, Long value) {
long valueAdd = value; long valueAdd = value;
if (this.value.containsKey(column) && !AbstractIndex.Time_Slice.equals(column)) { if (this.value.containsKey(column) && !AbstractIndex.Time_Slice.equals(column)
valueAdd += this.value.get(column); && !AbstractIndex.AGG_COLUMN.equals(column)) {
valueAdd += (Long) this.value.get(column);
} }
this.value.put(column, valueAdd); this.value.put(column, valueAdd);
} }
public void merge(MetricData metricData) { public void merge(MetricData metricData) {
for (Map.Entry<String, Long> entry : metricData.value.entrySet()) { for (Map.Entry<String, Object> entry : metricData.value.entrySet()) {
setMetric(entry.getKey(), entry.getValue()); if (!AbstractIndex.Time_Slice.equals(entry.getKey())
&& !AbstractIndex.AGG_COLUMN.equals(entry.getKey())) {
setMetric(entry.getKey(), (Long) entry.getValue());
}
} }
} }
public void merge(Map<String, Object> dbData) { public void merge(Map<String, Object> dbData) {
for (Map.Entry<String, Object> entry : dbData.entrySet()) { for (Map.Entry<String, Object> entry : dbData.entrySet()) {
setMetric(entry.getKey(), (Long) entry.getValue()); if (!AbstractIndex.Time_Slice.equals(entry.getKey())
&& !AbstractIndex.AGG_COLUMN.equals(entry.getKey())) {
long dbValue = ((Number) entry.getValue()).longValue();
setMetric(entry.getKey(), dbValue);
}
} }
} }
public Map<String, Long> toMap() { public Map<String, Object> toMap() {
return value; return value;
} }
......
package com.a.eye.skywalking.collector.worker.storage; package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.actor.selector.AbstractHashMessage; import com.a.eye.skywalking.collector.actor.selector.AbstractHashMessage;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.storage.index.AbstractIndex;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
/** /**
...@@ -9,11 +11,20 @@ import com.google.gson.JsonObject; ...@@ -9,11 +11,20 @@ import com.google.gson.JsonObject;
public class RecordData extends AbstractHashMessage { public class RecordData extends AbstractHashMessage {
private String id; private String id;
private String aggId;
private JsonObject record; private JsonObject record;
public RecordData(String key) { public RecordData(String key) {
super(key); super(key);
this.id = key; this.id = key;
String[] ids = id.split(Const.ID_SPLIT);
for (int i = 1; i < ids.length; i++) {
if (i == 1) {
this.aggId = ids[i];
} else {
this.aggId = this.aggId + Const.ID_SPLIT + ids[i];
}
}
} }
public String getId() { public String getId() {
...@@ -21,6 +32,7 @@ public class RecordData extends AbstractHashMessage { ...@@ -21,6 +32,7 @@ public class RecordData extends AbstractHashMessage {
} }
public JsonObject getRecord() { public JsonObject getRecord() {
record.addProperty(AbstractIndex.AGG_COLUMN, this.aggId);
return record; return record;
} }
......
...@@ -27,7 +27,9 @@ public abstract class AbstractIndex { ...@@ -27,7 +27,9 @@ public abstract class AbstractIndex {
public static final String Type_Record = "record"; public static final String Type_Record = "record";
public static final String AGG_COLUMN = "aggId";
public static final String Time_Slice = "timeSlice"; public static final String Time_Slice = "timeSlice";
public static final String Top_One = "topOne";
final public XContentBuilder createSettingBuilder() throws IOException { final public XContentBuilder createSettingBuilder() throws IOException {
XContentBuilder settingsBuilder = XContentFactory.jsonBuilder() XContentBuilder settingsBuilder = XContentFactory.jsonBuilder()
......
package com.a.eye.skywalking.collector.worker.tools; package com.a.eye.skywalking.collector.worker.tools;
import com.a.eye.skywalking.api.util.StringUtil; import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.trace.Span; import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.tag.Tags; import com.a.eye.skywalking.trace.tag.Tags;
...@@ -12,9 +13,9 @@ public class SpanPeersTools { ...@@ -12,9 +13,9 @@ public class SpanPeersTools {
if (StringUtil.isEmpty(Tags.PEERS.get(span))) { if (StringUtil.isEmpty(Tags.PEERS.get(span))) {
String host = Tags.PEER_HOST.get(span); String host = Tags.PEER_HOST.get(span);
int port = Tags.PEER_PORT.get(span); int port = Tags.PEER_PORT.get(span);
return host + ":" + port; return Const.PEERS_FRONT_SPLIT + host + ":" + port + Const.PEERS_BEHIND_SPLIT;
} else { } else {
return Tags.PEERS.get(span); return Const.PEERS_FRONT_SPLIT + Tags.PEERS.get(span) + Const.PEERS_BEHIND_SPLIT;
} }
} }
} }
...@@ -20,7 +20,7 @@ import java.util.Map; ...@@ -20,7 +20,7 @@ import java.util.Map;
*/ */
public class ClientNodeTestCase { public class ClientNodeTestCase {
@Before // @Before
public void initIndex() throws UnknownHostException { public void initIndex() throws UnknownHostException {
EsClient.boot(); EsClient.boot();
ClientNodeIndex index = new ClientNodeIndex(); ClientNodeIndex index = new ClientNodeIndex();
...@@ -28,7 +28,7 @@ public class ClientNodeTestCase { ...@@ -28,7 +28,7 @@ public class ClientNodeTestCase {
index.createIndex(); index.createIndex();
} }
@Test // @Test
public void testLoadClientNode() throws Exception { public void testLoadClientNode() throws Exception {
loadNode(201703101201l, ClientNodeIndex.Type_Minute); loadNode(201703101201l, ClientNodeIndex.Type_Minute);
loadNode(201703101200l, ClientNodeIndex.Type_Hour); loadNode(201703101200l, ClientNodeIndex.Type_Hour);
......
...@@ -20,7 +20,7 @@ import java.util.Map; ...@@ -20,7 +20,7 @@ import java.util.Map;
*/ */
public class NodeRefTestCase { public class NodeRefTestCase {
@Before // @Before
public void initIndex() throws UnknownHostException { public void initIndex() throws UnknownHostException {
EsClient.boot(); EsClient.boot();
NodeRefIndex index = new NodeRefIndex(); NodeRefIndex index = new NodeRefIndex();
...@@ -28,7 +28,7 @@ public class NodeRefTestCase { ...@@ -28,7 +28,7 @@ public class NodeRefTestCase {
index.createIndex(); index.createIndex();
} }
@Test // @Test
public void testLoadNodeRef() throws Exception { public void testLoadNodeRef() throws Exception {
loadNodeRef(201703101201l, NodeRefIndex.Type_Minute); loadNodeRef(201703101201l, NodeRefIndex.Type_Minute);
loadNodeRef(201703101200l, NodeRefIndex.Type_Hour); loadNodeRef(201703101200l, NodeRefIndex.Type_Hour);
......
...@@ -20,7 +20,7 @@ import java.util.Map; ...@@ -20,7 +20,7 @@ import java.util.Map;
*/ */
public class ServerNodeTestCase { public class ServerNodeTestCase {
@Before // @Before
public void initIndex() throws UnknownHostException { public void initIndex() throws UnknownHostException {
EsClient.boot(); EsClient.boot();
NodeIndex index = new NodeIndex(); NodeIndex index = new NodeIndex();
...@@ -28,7 +28,7 @@ public class ServerNodeTestCase { ...@@ -28,7 +28,7 @@ public class ServerNodeTestCase {
index.createIndex(); index.createIndex();
} }
@Test // @Test
public void testLoadServerNode() throws Exception { public void testLoadServerNode() throws Exception {
loadNode(201703101201l, NodeIndex.Type_Minute); loadNode(201703101201l, NodeIndex.Type_Minute);
loadNode(201703101200l, NodeIndex.Type_Hour); loadNode(201703101200l, NodeIndex.Type_Hour);
......
...@@ -20,7 +20,7 @@ import java.util.Map; ...@@ -20,7 +20,7 @@ import java.util.Map;
*/ */
public class NodeInstSummaryTestCase { public class NodeInstSummaryTestCase {
@Before // @Before
public void initIndex() throws UnknownHostException { public void initIndex() throws UnknownHostException {
EsClient.boot(); EsClient.boot();
// NodeInstIndex index = new NodeInstIndex(); // NodeInstIndex index = new NodeInstIndex();
...@@ -28,20 +28,20 @@ public class NodeInstSummaryTestCase { ...@@ -28,20 +28,20 @@ public class NodeInstSummaryTestCase {
// index.createIndex(); // index.createIndex();
} }
@Test // @Test
public void testLoadNodeInstSummary() throws Exception { public void testLoadNodeInstSummary() throws Exception {
loadNodeInstSummary(201703202208l, NodeInstIndex.Type_Minute); loadNodeInstSummary(201703202208l, 201703202209l, NodeInstIndex.Type_Minute);
// loadNodeInstance(201703202200l, NodeInstIndex.Type_Hour); // loadNodeInstance(201703202200l, NodeInstIndex.Type_Hour);
// loadNodeInstance(201703200000l, NodeInstIndex.Type_Day); // loadNodeInstance(201703200000l, NodeInstIndex.Type_Day);
} }
public void loadNodeInstSummary(long timeSlice, String type) throws Exception { public void loadNodeInstSummary(long startTime, long endTime, String type) throws Exception {
LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef) NodeInstSummarySearchWithTimeSlice.Factory.INSTANCE.create(AbstractWorker.noOwner()); LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef) NodeInstSummarySearchWithTimeSlice.Factory.INSTANCE.create(AbstractWorker.noOwner());
// insertData(timeSlice, type); // insertData(timeSlice, type);
// EsClient.indexRefresh(NodeInstIndex.Index); // EsClient.indexRefresh(NodeInstIndex.Index);
NodeInstSummarySearchWithTimeSlice.RequestEntity requestEntity = new NodeInstSummarySearchWithTimeSlice.RequestEntity(type, timeSlice); NodeInstSummarySearchWithTimeSlice.RequestEntity requestEntity = new NodeInstSummarySearchWithTimeSlice.RequestEntity(type, startTime, endTime);
JsonObject resJsonObj = new JsonObject(); JsonObject resJsonObj = new JsonObject();
workerRef.ask(requestEntity, resJsonObj); workerRef.ask(requestEntity, resJsonObj);
JsonArray nodeArray = resJsonObj.get("result").getAsJsonArray(); JsonArray nodeArray = resJsonObj.get("result").getAsJsonArray();
......
package com.a.eye.skywalking.collector.worker.tools;
import com.a.eye.skywalking.collector.worker.Const;
import org.junit.Test;
/**
* @author pengys5
*/
public class IdSplitTestCase {
@Test
public void testIdSplit() {
String id = "201703221502..-..portal-service..-..[127.0.0.1:8002]";
String[] ids = id.split(Const.IDS_SPLIT);
for (String splitId : ids) {
System.out.println(splitId);
}
}
}
...@@ -8,7 +8,7 @@ import org.junit.Test; ...@@ -8,7 +8,7 @@ import org.junit.Test;
*/ */
public class UrlToolsTestCase { public class UrlToolsTestCase {
@Test // @Test
public void testParseTomcat() { public void testParseTomcat() {
String peers = String peers =
UrlTools.parse("http://172.0.0.1:8080/Web/GetUser", "Tomcat"); UrlTools.parse("http://172.0.0.1:8080/Web/GetUser", "Tomcat");
...@@ -24,7 +24,7 @@ public class UrlToolsTestCase { ...@@ -24,7 +24,7 @@ public class UrlToolsTestCase {
Assert.assertEquals(peers, "http172.0.0.18080/Web/GetUser"); Assert.assertEquals(peers, "http172.0.0.18080/Web/GetUser");
} }
@Test // @Test
public void testParseMotan() { public void testParseMotan() {
String peers = String peers =
UrlTools.parse("motan://10.20.3.15:3000/com.a.eye.skywalking.demo.services.GetUserService.findUser(String, String)", "Motan"); UrlTools.parse("motan://10.20.3.15:3000/com.a.eye.skywalking.demo.services.GetUserService.findUser(String, String)", "Motan");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册