提交 58b80da7 编写于 作者: wu-sheng's avatar wu-sheng

Merge branch 'feature/collector'

* feature/collector:
  agg  save mock finish
  mock persistence
  worker storage mock finish
......@@ -44,7 +44,7 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
asyncWorker.allocateJob(new EndOfBatchCommand());
}
} catch (Exception e) {
e.printStackTrace();
asyncWorker.saveException(e);
}
}
......
......@@ -33,4 +33,8 @@ public abstract class AbstractWorker {
final public static AbstractWorker noOwner() {
return null;
}
final protected void saveException(Exception e) {
// e.printStackTrace();
}
}
......@@ -48,5 +48,10 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>
</project>
......@@ -2,17 +2,13 @@ package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public abstract class AnalysisMember extends AbstractLocalAsyncWorker {
private Logger logger = LogManager.getFormatterLogger(AnalysisMember.class);
public AnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
AnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -20,7 +16,7 @@ public abstract class AnalysisMember extends AbstractLocalAsyncWorker {
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override
......@@ -31,7 +27,7 @@ public abstract class AnalysisMember extends AbstractLocalAsyncWorker {
try {
analyse(message);
} catch (Exception e) {
e.printStackTrace();
saveException(e);
}
}
}
......
......@@ -22,8 +22,8 @@ public class CollectorBootStartUp {
CollectorSystem collectorSystem = new CollectorSystem();
collectorSystem.boot();
EsClient.boot();
// IndexCreator.INSTANCE.create();
EsClient.INSTANCE.boot();
IndexCreator.INSTANCE.create();
HttpServer.INSTANCE.boot((ClusterWorkerContext) collectorSystem.getClusterContext());
}
}
......@@ -26,18 +26,23 @@ public abstract class MergePersistenceMember extends PersistenceMember {
private Logger logger = LogManager.getFormatterLogger(MergePersistenceMember.class);
private MergePersistenceData persistenceData = new MergePersistenceData();
private MergePersistenceData persistenceData;
protected MergePersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
persistenceData = new MergePersistenceData();
}
private MergePersistenceData getPersistenceData() {
return persistenceData;
}
@Override
final public void analyse(Object message) throws Exception {
if (message instanceof MergeData) {
MergeData mergeData = (MergeData) message;
persistenceData.getElseCreate(mergeData.getId()).merge(mergeData);
if (persistenceData.size() >= WorkerConfig.Persistence.Data.size) {
getPersistenceData().getElseCreate(mergeData.getId()).merge(mergeData);
if (getPersistenceData().size() >= WorkerConfig.Persistence.Data.size) {
persistence();
}
} else {
......@@ -50,21 +55,21 @@ public abstract class MergePersistenceMember extends PersistenceMember {
for (MultiGetItemResponse itemResponse : multiGetResponse) {
GetResponse response = itemResponse.getResponse();
if (response != null && response.isExists()) {
persistenceData.getElseCreate(response.getId()).merge(response.getSource());
getPersistenceData().getElseCreate(response.getId()).merge(response.getSource());
}
}
boolean success = saveToEs();
if (success) {
persistenceData.clear();
getPersistenceData().clear();
}
}
private MultiGetResponse searchFromEs() {
Client client = EsClient.getClient();
Client client = EsClient.INSTANCE.getClient();
MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
Iterator<Map.Entry<String, MergeData>> iterator = persistenceData.iterator();
Iterator<Map.Entry<String, MergeData>> iterator = getPersistenceData().iterator();
while (iterator.hasNext()) {
multiGetRequestBuilder.add(esIndex(), esType(), iterator.next().getKey());
......@@ -74,11 +79,11 @@ public abstract class MergePersistenceMember extends PersistenceMember {
}
private boolean saveToEs() {
Client client = EsClient.getClient();
Client client = EsClient.INSTANCE.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", persistenceData.size());
logger.debug("persistenceData size: %s", getPersistenceData().size());
Iterator<Map.Entry<String, MergeData>> iterator = persistenceData.iterator();
Iterator<Map.Entry<String, MergeData>> iterator = getPersistenceData().iterator();
while (iterator.hasNext()) {
MergeData mergeData = iterator.next().getValue();
bulkRequest.add(client.prepareIndex(esIndex(), esType(), mergeData.getId()).setSource(mergeData.toMap()));
......
......@@ -61,7 +61,7 @@ public abstract class MetricPersistenceMember extends PersistenceMember {
}
private MultiGetResponse searchFromEs() {
Client client = EsClient.getClient();
Client client = EsClient.INSTANCE.getClient();
MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
Iterator<Map.Entry<String, MetricData>> iterator = persistenceData.iterator();
......@@ -75,7 +75,7 @@ public abstract class MetricPersistenceMember extends PersistenceMember {
}
private boolean saveToEs() {
Client client = EsClient.getClient();
Client client = EsClient.INSTANCE.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", persistenceData.size());
......
......@@ -55,7 +55,7 @@ public abstract class RecordPersistenceMember extends PersistenceMember {
}
private boolean saveToEs() {
Client client = EsClient.getClient();
Client client = EsClient.INSTANCE.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", getPersistenceData().size());
......
......@@ -19,85 +19,85 @@ public class WorkerConfig extends ClusterConfig {
}
}
public static class Worker {
public static class TraceSegmentReceiver {
public static int Num = 10;
}
public static class DAGNodeReceiver {
public static int Num = 10;
}
public static class WorkerNum {
public static class Node {
public static class NodeCompAgg {
public static int Value = 10;
}
public static class NodeInstanceReceiver {
public static int Num = 10;
}
public static class NodeMappingDayAgg {
public static int Value = 10;
}
public static class ResponseCostReceiver {
public static int Num = 10;
}
public static class NodeMappingHourAgg {
public static int Value = 10;
}
public static class ResponseSummaryReceiver {
public static int Num = 10;
public static class NodeMappingMinuteAgg {
public static int Value = 10;
}
}
public static class DAGNodeRefReceiver {
public static int Num = 10;
}
}
public static class NodeRef {
public static class NodeRefDayAgg {
public static int Value = 10;
}
public static class WorkerNum {
public static class Node {
public static class NodeDayAgg {
public static class NodeRefHourAgg {
public static int Value = 10;
}
public static class NodeHourAgg {
public static class NodeRefMinuteAgg {
public static int Value = 10;
}
public static class NodeMinuteAgg {
public static class NodeRefResSumDayAgg {
public static int Value = 10;
}
public static class NodeMappingDayAgg {
public static class NodeRefResSumHourAgg {
public static int Value = 10;
}
public static class NodeMappingHourAgg {
public static class NodeRefResSumMinuteAgg {
public static int Value = 10;
}
}
public static class NodeMappingMinuteAgg {
public static class GlobalTrace {
public static class GlobalTraceAgg {
public static int Value = 10;
}
}
}
public static class Queue {
public static class Segment {
public static class SegmentCostSave {
public static class GlobalTrace {
public static class GlobalTraceSave {
public static int Size = 1024;
}
}
public static class SegmentSave {
public static class Segment {
public static class SegmentPost {
public static int Size = 1024;
}
public static class SegmentExceptionSave {
public static class SegmentCostSave {
public static int Size = 1024;
}
}
public static class Node {
public static class NodeDayAnalysis {
public static class SegmentSave {
public static int Size = 1024;
}
public static class NodeHourAnalysis {
public static class SegmentExceptionSave {
public static int Size = 1024;
}
}
public static class NodeMinuteAnalysis {
public static class Node {
public static class NodeCompAnalysis {
public static int Size = 1024;
}
......@@ -124,54 +124,52 @@ public class WorkerConfig extends ClusterConfig {
public static class NodeMappingMinuteAnalysis {
public static int Size = 1024;
}
}
public static class Persistence {
public static class DAGNodePersistence {
public static class NodeCompSave {
public static int Size = 1024;
}
public static class NodeInstancePersistence {
public static class NodeMappingDaySave {
public static int Size = 1024;
}
public static class ResponseCostPersistence {
public static class NodeMappingHourSave {
public static int Size = 1024;
}
public static class ResponseSummaryPersistence {
public static class NodeMappingMinuteSave {
public static int Size = 1024;
}
}
public static class DAGNodeRefPersistence {
public static class NodeRef {
public static class NodeRefDaySave {
public static int Size = 1024;
}
}
public static class NodeRefHourSave {
public static int Size = 1024;
}
public static class TraceSegmentRecordAnalysis {
public static int Size = 1024;
}
public static class NodeRefMinuteSave {
public static int Size = 1024;
}
public static class NodeInstanceAnalysis {
public static int Size = 1024;
}
public static class NodeRefResSumDaySave {
public static int Size = 1024;
}
public static class DAGNodeAnalysis {
public static int Size = 1024;
}
public static class NodeRefResSumHourSave {
public static int Size = 1024;
}
public static class ResponseCostAnalysis {
public static int Size = 1024;
public static class NodeRefResSumMinuteSave {
public static int Size = 1024;
}
}
public static class ResponseSummaryAnalysis {
public static int Size = 1024;
}
public static class DAGNodeRefAnalysis {
public static int Size = 1024;
}
}
}
......@@ -21,7 +21,7 @@ import java.util.List;
*/
public class GlobalTraceAnalysis extends MergeAnalysisMember {
private GlobalTraceAnalysis(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
GlobalTraceAnalysis(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......
package com.a.eye.skywalking.collector.worker.globaltrace.entity;
import java.util.ArrayList;
import java.util.List;
/**
* @author pengys5
*/
public class TreeNode {
private String spanId;
private List<TreeNode> childNodes;
public TreeNode(String spanId) {
this.spanId = spanId;
childNodes = new ArrayList<>();
}
public void addChild(TreeNode childNode) {
childNodes.add(childNode);
}
}
......@@ -15,7 +15,7 @@ public class GlobalTraceAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(GlobalTraceAgg.class);
private GlobalTraceAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
GlobalTraceAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -48,7 +48,7 @@ public class GlobalTraceAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodeReceiver.Num;
return WorkerConfig.WorkerNum.GlobalTrace.GlobalTraceAgg.Value;
}
}
......
......@@ -4,7 +4,7 @@ package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MergePersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
......@@ -15,7 +15,7 @@ import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
*/
public class GlobalTraceSave extends MergePersistenceMember {
private GlobalTraceSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
GlobalTraceSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -39,7 +39,7 @@ public class GlobalTraceSave extends MergePersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.TraceSegmentRecordAnalysis.Size;
return WorkerConfig.Queue.GlobalTrace.GlobalTraceSave.Size;
}
@Override
......@@ -58,7 +58,7 @@ public class GlobalTraceSave extends MergePersistenceMember {
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
return new HashCodeSelector();
}
}
}
......@@ -9,7 +9,7 @@ import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.segment.logic.Segment;
import com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize;
import com.a.eye.skywalking.collector.worker.segment.logic.SpanView;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.trace.Span;
......@@ -18,7 +18,6 @@ import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import java.util.ArrayList;
import java.util.Collections;
......@@ -33,17 +32,17 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
private Gson gson = new Gson();
public GlobalTraceSearchWithGlobalId(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
GlobalTraceSearchWithGlobalId(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
protected void onWork(Object request, Object response) throws Exception {
if (request instanceof String) {
Client client = EsClient.getClient();
String globalId = (String) request;
String globalTraceData = client.prepareGet(GlobalTraceIndex.Index, GlobalTraceIndex.Type_Record, globalId).get().getSourceAsString();
String globalTraceData = GetResponseFromEs.INSTANCE.get(GlobalTraceIndex.Index, GlobalTraceIndex.Type_Record, globalId).getSourceAsString();
JsonObject globalTraceObj = gson.fromJson(globalTraceData, JsonObject.class);
logger.debug("globalTraceObj: %s", globalTraceObj);
String subSegIdsStr = globalTraceObj.get(GlobalTraceIndex.SubSegIds).getAsString();
String[] subSegIds = subSegIdsStr.split(MergeData.Split);
......@@ -51,7 +50,8 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
List<SpanView> spanViewList = new ArrayList<>();
for (String subSegId : subSegIds) {
logger.debug("subSegId: %s", subSegId);
String segmentSource = client.prepareGet(SegmentIndex.Index, SegmentIndex.Type_Record, subSegId).get().getSourceAsString();
String segmentSource = GetResponseFromEs.INSTANCE.get(SegmentIndex.Index, SegmentIndex.Type_Record, subSegId).getSourceAsString();
logger.debug("segmentSource: %s", segmentSource);
Segment segment = SegmentDeserialize.INSTANCE.deserializeFromES(segmentSource);
String segmentId = segment.getTraceSegmentId();
List<TraceSegmentRef> refsList = segment.getRefs();
......@@ -62,19 +62,23 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
}
}
SpanView rootSpan = findRoot(spanViewList);
findChild(rootSpan, spanViewList, rootSpan.getStartTime());
List<SpanView> viewList = new ArrayList<>();
viewList.add(rootSpan);
Gson gson = new Gson();
String globalTraceStr = gson.toJson(viewList);
JsonObject responseObj = (JsonObject) response;
responseObj.addProperty("result", globalTraceStr);
responseObj.addProperty("result", buildTree(spanViewList));
}
}
private String buildTree(List<SpanView> spanViewList) {
SpanView rootSpan = findRoot(spanViewList);
assert rootSpan != null;
findChild(rootSpan, spanViewList, rootSpan.getStartTime());
List<SpanView> viewList = new ArrayList<>();
viewList.add(rootSpan);
Gson gson = new Gson();
return gson.toJson(viewList);
}
private SpanView findRoot(List<SpanView> spanViewList) {
for (SpanView spanView : spanViewList) {
if (StringUtil.isEmpty(spanView.getParentSpanSegId())) {
......
......@@ -25,11 +25,6 @@ public enum HttpServer {
ServletsCreator.INSTANCE.boot(servletContextHandler, clusterContext);
// ServerConnector serverConnector = new ServerConnector(server);
// serverConnector.setHost("127.0.0.1");
// serverConnector.setPort(7001);
// serverConnector.setIdleTimeout(5000);
server.setHandler(servletContextHandler);
server.start();
server.join();
......
......@@ -36,7 +36,7 @@ abstract class AbstractNodeCompAnalysis extends RecordAnalysisMember {
for (Span span : spanList) {
String kind = Tags.SPAN_KIND.get(span);
if (Tags.SPAN_KIND_CLIENT.equals(kind) && ClientSpanIsLeafTools.isLeaf(span.getSpanId(), spanList)) {
String peers = SpanPeersTools.getPeers(span);
String peers = SpanPeersTools.INSTANCE.getPeers(span);
JsonObject compJsonObj = new JsonObject();
compJsonObj.addProperty(NodeCompIndex.Peers, peers);
......
......@@ -50,7 +50,7 @@ public class NodeCompAnalysis extends AbstractNodeCompAnalysis {
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeDayAnalysis.Size;
return WorkerConfig.Queue.Node.NodeCompAnalysis.Size;
}
}
......
......@@ -11,7 +11,7 @@ import com.a.eye.skywalking.collector.worker.storage.RecordData;
*/
public class NodeCompAgg extends AbstractClusterWorker {
public NodeCompAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeCompAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -24,9 +24,7 @@ public class NodeCompAgg extends AbstractClusterWorker {
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeCompSave.Role.INSTANCE).tell(message);
} else {
throw new IllegalArgumentException("message instance must RecordData");
}
} else throw new IllegalArgumentException("message instance must RecordData");
}
public static class Factory extends AbstractClusterWorkerProvider<NodeCompAgg> {
......@@ -44,7 +42,7 @@ public class NodeCompAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.Node.NodeDayAgg.Value;
return WorkerConfig.WorkerNum.Node.NodeCompAgg.Value;
}
}
......
......@@ -27,7 +27,7 @@ public class NodeCompLoad extends AbstractLocalSyncWorker {
@Override
public void onWork(Object request, Object response) throws Exception {
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeCompIndex.Index);
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeCompIndex.Index);
searchRequestBuilder.setTypes(NodeCompIndex.Type_Record);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setSize(100);
......
......@@ -43,7 +43,7 @@ public class NodeCompSave extends RecordPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
return WorkerConfig.Queue.Node.NodeCompSave.Size;
}
}
......
......@@ -11,7 +11,7 @@ import com.a.eye.skywalking.collector.worker.storage.RecordData;
*/
public class NodeMappingDayAgg extends AbstractClusterWorker {
public NodeMappingDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeMappingDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......
......@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
*/
public class NodeMappingDaySave extends RecordPersistenceMember {
public NodeMappingDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeMappingDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -43,7 +43,7 @@ public class NodeMappingDaySave extends RecordPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
return WorkerConfig.Queue.Node.NodeMappingDaySave.Size;
}
}
......
......@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
*/
public class NodeMappingHourSave extends RecordPersistenceMember {
public NodeMappingHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeMappingHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -43,7 +43,7 @@ public class NodeMappingHourSave extends RecordPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
return WorkerConfig.Queue.Node.NodeMappingHourSave.Size;
}
}
......
......@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
*/
public class NodeMappingMinuteSave extends RecordPersistenceMember {
public NodeMappingMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeMappingMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -43,7 +43,7 @@ public class NodeMappingMinuteSave extends RecordPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
return WorkerConfig.Queue.Node.NodeMappingMinuteSave.Size;
}
}
......
......@@ -34,7 +34,7 @@ public class NodeMappingSearchWithTimeSlice extends AbstractLocalSyncWorker {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeMappingIndex.Index);
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeMappingIndex.Index);
searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeMappingIndex.Time_Slice).gte(search.getStartTime()).lte(search.getEndTime()));
......
package com.a.eye.skywalking.collector.worker.noderef;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGet;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGetProvider;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefSearchWithTimeSlice;
import com.a.eye.skywalking.collector.worker.tools.ParameterTools;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.Map;
/**
* @author pengys5
*/
public class NodeRefGetWithTimeSlice extends AbstractGet {
private Logger logger = LogManager.getFormatterLogger(NodeRefGetWithTimeSlice.class);
private NodeRefGetWithTimeSlice(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeRefSearchWithTimeSlice.WorkerRole.INSTANCE).create(this);
}
@Override
protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception {
if (!request.containsKey("startTime") || !request.containsKey("endTime") || !request.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains startTime,endTime,timeSliceType");
}
logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(request.get("startTime")),
Arrays.toString(request.get("endTime")), Arrays.toString(request.get("timeSliceType")));
long startTime;
try {
startTime = Long.valueOf(ParameterTools.INSTANCE.toString(request, "startTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter startTime must numeric with long type");
}
long endTime;
try {
endTime = Long.valueOf(ParameterTools.INSTANCE.toString(request, "endTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter endTime must numeric with long type");
}
NodeRefSearchWithTimeSlice.RequestEntity requestEntity;
requestEntity = new NodeRefSearchWithTimeSlice.RequestEntity(ParameterTools.INSTANCE.toString(request, "timeSliceType"), startTime, endTime);
getSelfContext().lookup(NodeRefSearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response);
}
public static class Factory extends AbstractGetProvider<NodeRefGetWithTimeSlice> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeRefGetWithTimeSlice workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefGetWithTimeSlice(role(), clusterContext, new LocalWorkerContext());
}
@Override
public String servletPath() {
return "/nodeRef/timeSlice";
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefGetWithTimeSlice.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.noderef;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGet;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGetProvider;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumSearchWithTimeSlice;
import com.a.eye.skywalking.collector.worker.tools.ParameterTools;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.Map;
/**
* @author pengys5
*/
public class NodeRefResSumGetWithTimeSlice extends AbstractGet {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumGetWithTimeSlice.class);
private NodeRefResSumGetWithTimeSlice(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeRefResSumSearchWithTimeSlice.WorkerRole.INSTANCE).create(this);
}
@Override
protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception {
if (!request.containsKey("startTime") || !request.containsKey("endTime") || !request.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains startTime,endTime,timeSliceType");
}
logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(request.get("startTime")),
Arrays.toString(request.get("endTime")), Arrays.toString(request.get("timeSliceType")));
long startTime;
try {
startTime = Long.valueOf(ParameterTools.INSTANCE.toString(request, "startTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter startTime must numeric with long type");
}
long endTime;
try {
endTime = Long.valueOf(ParameterTools.INSTANCE.toString(request, "endTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter endTime must numeric with long type");
}
NodeRefResSumSearchWithTimeSlice.RequestEntity requestEntity;
requestEntity = new NodeRefResSumSearchWithTimeSlice.RequestEntity(ParameterTools.INSTANCE.toString(request, "timeSliceType"), startTime, endTime);
getSelfContext().lookup(NodeRefResSumSearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response);
}
public static class Factory extends AbstractGetProvider<NodeRefResSumGetWithTimeSlice> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeRefResSumGetWithTimeSlice workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefResSumGetWithTimeSlice(role(), clusterContext, new LocalWorkerContext());
}
@Override
public String servletPath() {
return "/nodeRef/resSum/timeSlice";
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefResSumGetWithTimeSlice.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
......@@ -41,7 +41,7 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
String front = segment.getApplicationCode();
dataJsonObj.addProperty(NodeRefIndex.Front, front);
String behind = SpanPeersTools.getPeers(span);
String behind = SpanPeersTools.INSTANCE.getPeers(span);
dataJsonObj.addProperty(NodeRefIndex.Behind, behind);
dataJsonObj.addProperty(NodeRefIndex.BehindIsRealCode, false);
......
......@@ -15,7 +15,7 @@ public class NodeRefDayAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefDayAgg.class);
public NodeRefDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -48,7 +48,7 @@ public class NodeRefDayAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodeRefReceiver.Num;
return WorkerConfig.WorkerNum.NodeRef.NodeRefDayAgg.Value;
}
}
......
......@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
*/
public class NodeRefDaySave extends RecordPersistenceMember {
public NodeRefDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -44,7 +44,7 @@ public class NodeRefDaySave extends RecordPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodeRefPersistence.Size;
return WorkerConfig.Queue.NodeRef.NodeRefDaySave.Size;
}
}
......
......@@ -15,7 +15,7 @@ public class NodeRefHourAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefHourAgg.class);
public NodeRefHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -48,7 +48,7 @@ public class NodeRefHourAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodeRefReceiver.Num;
return WorkerConfig.WorkerNum.NodeRef.NodeRefHourAgg.Value;
}
}
......
......@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
*/
public class NodeRefHourSave extends RecordPersistenceMember {
public NodeRefHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -44,7 +44,7 @@ public class NodeRefHourSave extends RecordPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodeRefPersistence.Size;
return WorkerConfig.Queue.NodeRef.NodeRefHourSave.Size;
}
}
......
......@@ -15,7 +15,7 @@ public class NodeRefMinuteAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefMinuteAgg.class);
public NodeRefMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -48,7 +48,7 @@ public class NodeRefMinuteAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodeRefReceiver.Num;
return WorkerConfig.WorkerNum.NodeRef.NodeRefMinuteAgg.Value;
}
}
......
......@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
*/
public class NodeRefMinuteSave extends RecordPersistenceMember {
public NodeRefMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -44,7 +44,7 @@ public class NodeRefMinuteSave extends RecordPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodeRefPersistence.Size;
return WorkerConfig.Queue.NodeRef.NodeRefMinuteSave.Size;
}
}
......
......@@ -15,7 +15,7 @@ public class NodeRefResSumDayAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumDayAgg.class);
private NodeRefResSumDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefResSumDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -48,7 +48,7 @@ public class NodeRefResSumDayAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
return WorkerConfig.Worker.ResponseSummaryReceiver.Num;
return WorkerConfig.WorkerNum.NodeRef.NodeRefResSumDayAgg.Value;
}
}
......
......@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
*/
public class NodeRefResSumDaySave extends MetricPersistenceMember {
private NodeRefResSumDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefResSumDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -43,7 +43,7 @@ public class NodeRefResSumDaySave extends MetricPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.ResponseSummaryPersistence.Size;
return WorkerConfig.Queue.NodeRef.NodeRefResSumDaySave.Size;
}
}
......
......@@ -35,7 +35,7 @@ public class NodeRefResSumGroupWithTimeSlice extends AbstractLocalSyncWorker {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeRefResSumIndex.Index);
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeRefResSumIndex.Index);
searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeRefResSumIndex.Time_Slice).gte(search.getStartTime()).lte(search.getEndTime()));
......
......@@ -15,7 +15,7 @@ public class NodeRefResSumHourAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumHourAgg.class);
private NodeRefResSumHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefResSumHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -48,7 +48,7 @@ public class NodeRefResSumHourAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
return WorkerConfig.Worker.ResponseSummaryReceiver.Num;
return WorkerConfig.WorkerNum.NodeRef.NodeRefResSumHourAgg.Value;
}
}
......
......@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
*/
public class NodeRefResSumHourSave extends MetricPersistenceMember {
private NodeRefResSumHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefResSumHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -43,7 +43,7 @@ public class NodeRefResSumHourSave extends MetricPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.ResponseSummaryPersistence.Size;
return WorkerConfig.Queue.NodeRef.NodeRefResSumHourSave.Size;
}
}
......
......@@ -15,7 +15,7 @@ public class NodeRefResSumMinuteAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumMinuteAgg.class);
private NodeRefResSumMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefResSumMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -48,7 +48,7 @@ public class NodeRefResSumMinuteAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
return WorkerConfig.Worker.ResponseSummaryReceiver.Num;
return WorkerConfig.WorkerNum.NodeRef.NodeRefResSumMinuteAgg.Value;
}
}
......
......@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
*/
public class NodeRefResSumMinuteSave extends MetricPersistenceMember {
private NodeRefResSumMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefResSumMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -43,7 +43,7 @@ public class NodeRefResSumMinuteSave extends MetricPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.ResponseSummaryPersistence.Size;
return WorkerConfig.Queue.NodeRef.NodeRefResSumMinuteSave.Size;
}
}
......
......@@ -36,7 +36,7 @@ public class NodeRefResSumSearchWithTimeSlice extends AbstractLocalSyncWorker {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeRefResSumIndex.Index);
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeRefResSumIndex.Index);
searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeRefResSumIndex.Time_Slice).gte(search.getStartTime()).lte(search.getEndTime()));
......
......@@ -34,7 +34,7 @@ public class NodeRefSearchWithTimeSlice extends AbstractLocalSyncWorker {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeRefIndex.Index);
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeRefIndex.Index);
searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeRefIndex.Time_Slice).gte(search.getStartTime()).lte(search.getEndTime()));
......
......@@ -31,7 +31,7 @@ public class SegmentCostIndex extends AbstractIndex {
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
return XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject(SegId)
......@@ -50,12 +50,11 @@ public class SegmentCostIndex extends AbstractIndex {
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(Cost)
.startObject(Cost)
.field("type", "long")
.field("index", "not_analyzed")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
return mappingBuilder;
}
}
package com.a.eye.skywalking.collector.worker.segment;
import com.a.eye.skywalking.collector.worker.storage.AbstractIndex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
......@@ -13,8 +11,6 @@ import java.io.IOException;
*/
public class SegmentIndex extends AbstractIndex {
private Logger logger = LogManager.getFormatterLogger(SegmentIndex.class);
public static final String Index = "segment_idx";
@Override
......@@ -29,7 +25,7 @@ public class SegmentIndex extends AbstractIndex {
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
return XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject("traceSegmentId")
......@@ -60,54 +56,7 @@ public class SegmentIndex extends AbstractIndex {
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startArray("refs")
.startObject("traceSegmentId")
.field("type", "String")
.field("index", "not_analyzed")
.endObject()
.startObject("spanId")
.field("type", "integer")
.field("index", "not_analyzed")
.endObject()
.startObject("applicationCode")
.field("type", "String")
.field("index", "not_analyzed")
.endObject()
.startObject("peerHost")
.field("type", "String")
.field("index", "not_analyzed")
.endObject()
.endArray()
.startArray("refs")
.startObject("spanId")
.field("type", "integer")
.field("index", "not_analyzed")
.endObject()
.startObject("parentSpanId")
.field("type", "integer")
.field("index", "not_analyzed")
.endObject()
.startObject("startTime")
.field("type", "date")
.field("index", "not_analyzed")
.endObject()
.startObject("endTime")
.field("type", "date")
.field("index", "not_analyzed")
.endObject()
.startObject("operationName")
.field("type", "String")
.field("index", "not_analyzed")
.endObject()
.endArray()
.startArray("relatedGlobalTraces")
.startObject("id")
.field("type", "String")
.field("index", "not_analyzed")
.endObject()
.endArray()
.endObject()
.endObject();
return mappingBuilder;
}
}
......@@ -7,6 +7,7 @@ import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.globaltrace.analysis.GlobalTraceAnalysis;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractPost;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractPostProvider;
......@@ -134,7 +135,7 @@ public class SegmentPost extends AbstractPost {
@Override
public int queueSize() {
return 128;
return WorkerConfig.Queue.Segment.SegmentPost.Size;
}
@Override
......
......@@ -22,7 +22,7 @@ public class SegmentExceptionWithSegId extends AbstractLocalSyncWorker {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
GetResponse getResponse = EsClient.getClient().prepareGet(SegmentExceptionIndex.Index, SegmentExceptionIndex.Type_Record, search.segId).get();
GetResponse getResponse = EsClient.INSTANCE.getClient().prepareGet(SegmentExceptionIndex.Index, SegmentExceptionIndex.Type_Record, search.segId).get();
JsonObject dataJson = new JsonObject();
dataJson.addProperty(SegmentExceptionIndex.SegId, (String) getResponse.getSource().get(SegmentExceptionIndex.SegId));
......
......@@ -41,7 +41,7 @@ public class SegmentTopSearchWithGlobalTraceId extends AbstractLocalSyncWorker {
protected void onWork(Object request, Object response) throws Exception {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
Client client = EsClient.getClient();
Client client = EsClient.INSTANCE.getClient();
String globalTraceData = client.prepareGet(GlobalTraceIndex.Index, GlobalTraceIndex.Type_Record, search.globalTraceId).get().getSourceAsString();
JsonObject globalTraceObj = gson.fromJson(globalTraceData, JsonObject.class);
......
......@@ -43,7 +43,7 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(SegmentCostIndex.Index);
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(SegmentCostIndex.Index);
searchRequestBuilder.setTypes(SegmentCostIndex.Type_Record);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
......@@ -86,7 +86,7 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
topSegmentJson.addProperty(SegmentCostIndex.OperationName, (String) searchHit.getSource().get(SegmentCostIndex.OperationName));
topSegmentJson.addProperty(SegmentCostIndex.Cost, (Number) searchHit.getSource().get(SegmentCostIndex.Cost));
String segmentSource = EsClient.getClient().prepareGet(SegmentIndex.Index, SegmentIndex.Type_Record, segId).get().getSourceAsString();
String segmentSource = EsClient.INSTANCE.getClient().prepareGet(SegmentIndex.Index, SegmentIndex.Type_Record, segId).get().getSourceAsString();
Segment segment = SegmentDeserialize.INSTANCE.deserializeFromES(segmentSource);
List<DistributedTraceId> distributedTraceIdList = segment.getRelatedGlobalTraces();
......
......@@ -40,11 +40,6 @@ public class SpanGetWithId extends AbstractGet {
}
logger.debug("segId: %s, spanId: %s", Arrays.toString(request.get("segId")), Arrays.toString(request.get("spanId")));
int maxCost = -1;
if (request.containsKey("maxCost")) {
maxCost = Integer.valueOf(ParameterTools.INSTANCE.toString(request, "maxCost"));
}
String segId = ParameterTools.INSTANCE.toString(request, "segId");
String spanId = ParameterTools.INSTANCE.toString(request, "spanId");
......
......@@ -3,10 +3,11 @@ package com.a.eye.skywalking.collector.worker.span.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.segment.logic.Segment;
import com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs;
import com.a.eye.skywalking.trace.Span;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
......@@ -21,7 +22,7 @@ public class SpanSearchWithId extends AbstractLocalSyncWorker {
private Gson gson = new Gson();
private SpanSearchWithId(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
SpanSearchWithId(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -29,7 +30,7 @@ public class SpanSearchWithId extends AbstractLocalSyncWorker {
protected void onWork(Object request, Object response) throws Exception {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
GetResponse getResponse = EsClient.getClient().prepareGet(SegmentIndex.Index, SegmentIndex.Type_Record, search.segId).get();
GetResponse getResponse = GetResponseFromEs.INSTANCE.get(SegmentIndex.Index, SegmentIndex.Type_Record, search.segId);
Segment segment = SegmentDeserialize.INSTANCE.deserializeFromES(getResponse.getSourceAsString());
List<Span> spanList = segment.getSpans();
......@@ -44,7 +45,7 @@ public class SpanSearchWithId extends AbstractLocalSyncWorker {
}
JsonObject resJsonObj = (JsonObject) response;
resJsonObj.add("result", dataJson);
resJsonObj.add(Const.RESULT, dataJson);
}
}
......
......@@ -29,20 +29,19 @@ public abstract class AbstractIndex {
public static final String AGG_COLUMN = "aggId";
public static final String Time_Slice = "timeSlice";
final public XContentBuilder createSettingBuilder() throws IOException {
XContentBuilder settingsBuilder = XContentFactory.jsonBuilder()
final XContentBuilder createSettingBuilder() throws IOException {
return XContentFactory.jsonBuilder()
.startObject()
.field("index.number_of_shards", 2)
.field("index.number_of_replicas", 0)
.endObject();
return settingsBuilder;
}
public abstract boolean isRecord();
public abstract XContentBuilder createMappingBuilder() throws IOException;
final public void createIndex() {
final void createIndex() {
// settings
String settingSource = "";
......@@ -59,7 +58,7 @@ public abstract class AbstractIndex {
logger.error("create %s index mapping builder error", index());
}
Settings settings = Settings.builder().loadFromSource(settingSource).build();
IndicesAdminClient client = EsClient.getClient().admin().indices();
IndicesAdminClient client = EsClient.INSTANCE.getClient().admin().indices();
if (isRecord()) {
CreateIndexResponse response = client.prepareCreate(index()).setSettings(settings).addMapping(Type_Record, mappingBuilder).get();
......@@ -74,8 +73,8 @@ public abstract class AbstractIndex {
}
}
final public boolean deleteIndex() {
IndicesAdminClient client = EsClient.getClient().admin().indices();
final boolean deleteIndex() {
IndicesAdminClient client = EsClient.INSTANCE.getClient().admin().indices();
try {
DeleteIndexResponse response = client.prepareDelete(index()).get();
logger.info("delete %s index finished, isAcknowledged: %s", index(), response.isAcknowledged());
......
......@@ -15,11 +15,12 @@ import java.net.UnknownHostException;
/**
* @author pengys5
*/
public class EsClient {
public enum EsClient {
INSTANCE;
private static Client client;
private Client client;
public static void boot() throws UnknownHostException {
public void boot() throws UnknownHostException {
Settings settings = Settings.builder()
.put("cluster.name", "CollectorCluster")
.put("client.transport.sniff", true)
......@@ -29,11 +30,11 @@ public class EsClient {
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
}
public static Client getClient() {
public Client getClient() {
return client;
}
public static void indexRefresh(String... indexName) {
public void indexRefresh(String... indexName) {
Logger logger = LogManager.getFormatterLogger(EsClient.class);
RefreshResponse response = client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
if (response.getShardFailures().length == response.getTotalShards()) {
......
package com.a.eye.skywalking.collector.worker.storage;
import org.elasticsearch.action.get.GetResponse;
/**
* @author pengys5
*/
public enum GetResponseFromEs {
INSTANCE;
public GetResponse get(String index, String type, String id) {
return EsClient.INSTANCE.getClient().prepareGet(index, type, id).get();
}
}
package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import com.a.eye.skywalking.collector.worker.node.NodeCompIndex;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentCostIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentExceptionIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.HashSet;
import java.util.ServiceLoader;
import java.util.Set;
/**
* @author pengys5
......@@ -15,36 +13,23 @@ import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
public enum IndexCreator {
INSTANCE;
public void create() {
SegmentIndex segmentIndex = new SegmentIndex();
segmentIndex.deleteIndex();
GlobalTraceIndex globalTraceIndex = new GlobalTraceIndex();
globalTraceIndex.deleteIndex();
globalTraceIndex.createIndex();
SegmentCostIndex segmentCostIndex = new SegmentCostIndex();
segmentCostIndex.deleteIndex();
segmentCostIndex.createIndex();
SegmentExceptionIndex segmentExceptionIndex = new SegmentExceptionIndex();
segmentExceptionIndex.deleteIndex();
segmentExceptionIndex.createIndex();
private Logger logger = LogManager.getFormatterLogger(IndexCreator.class);
NodeCompIndex nodeCompIndex = new NodeCompIndex();
nodeCompIndex.deleteIndex();
nodeCompIndex.createIndex();
NodeMappingIndex nodeMappingIndex = new NodeMappingIndex();
nodeMappingIndex.deleteIndex();
nodeMappingIndex.createIndex();
NodeRefIndex nodeRefIndex = new NodeRefIndex();
nodeRefIndex.deleteIndex();
nodeRefIndex.createIndex();
public void create() {
Set<AbstractIndex> indexSet = loadIndex();
for (AbstractIndex index : indexSet) {
index.deleteIndex();
index.createIndex();
}
}
NodeRefResSumIndex nodeRefResSumIndex = new NodeRefResSumIndex();
nodeRefResSumIndex.deleteIndex();
nodeRefResSumIndex.createIndex();
private Set<AbstractIndex> loadIndex() {
Set<AbstractIndex> indexSet = new HashSet<>();
ServiceLoader<AbstractIndex> indexServiceLoader = ServiceLoader.load(AbstractIndex.class);
for (AbstractIndex index : indexServiceLoader) {
logger.info("index name: %s", index.index());
indexSet.add(index);
}
return indexSet;
}
}
......@@ -11,7 +11,7 @@ import java.util.function.Consumer;
*/
public class MergePersistenceData implements Iterable {
private Map<String, MergeData> persistenceData = new HashMap();
private Map<String, MergeData> persistenceData = new HashMap<>();
public MergeData getElseCreate(String id) {
if (!persistenceData.containsKey(id)) {
......@@ -28,10 +28,6 @@ public class MergePersistenceData implements Iterable {
persistenceData.clear();
}
public boolean hasNext() {
return persistenceData.entrySet().iterator().hasNext();
}
public MergeData pushOne() {
MergeData one = persistenceData.entrySet().iterator().next().getValue();
persistenceData.remove(one.getId());
......
......@@ -10,7 +10,7 @@ import java.util.Spliterator;
*/
public class MetricPersistenceData implements Iterable {
private Map<String, MetricData> persistenceData = new HashMap();
private Map<String, MetricData> persistenceData = new HashMap<>();
public MetricData getElseCreate(String id) {
if (!persistenceData.containsKey(id)) {
......
......@@ -24,6 +24,7 @@ public class RecordData extends AbstractHashMessage {
this.aggId = this.aggId + Const.ID_SPLIT + ids[i];
}
}
record = new JsonObject();
}
public String getId() {
......
......@@ -8,8 +8,10 @@ import com.a.eye.skywalking.trace.tag.Tags;
/**
* @author pengys5
*/
public class SpanPeersTools {
public static String getPeers(Span span) {
public enum SpanPeersTools {
INSTANCE;
public String getPeers(Span span) {
if (StringUtil.isEmpty(Tags.PEERS.get(span))) {
String host = Tags.PEER_HOST.get(span);
int port = Tags.PEER_PORT.get(span);
......
package com.a.eye.skywalking.collector.worker.tools;
/**
* @author pengys5
*/
public class UrlTools {
private static final String HttpUrlHead = "http://";
private static final String HttpsUrlHead = "https://";
private static final String MotanUrlHead = "motan://";
public static String parse(String url, String component) {
if ("Tomcat".equals(component)) {
return parseTomcat(url);
} else if ("Motan".equals(component)) {
return parseMotan(url);
}
return null;
}
private static String parseTomcat(String url) {
if (url.startsWith(HttpUrlHead)) {
String suffix = url.substring(7, url.length());
String[] urlSplit = suffix.split("/");
return HttpUrlHead + urlSplit[0];
} else if (url.startsWith(HttpsUrlHead)) {
String suffix = url.substring(8, url.length());
String[] urlSplit = suffix.split("/");
return HttpsUrlHead + urlSplit[0];
} else if (url.contains(":")) {
return url.split("/")[0];
} else {
return url;
}
}
private static String parseMotan(String url) {
if (url.startsWith(MotanUrlHead)) {
String suffix = url.substring(8, url.length());
String[] urlSplit = suffix.split("/");
return MotanUrlHead + urlSplit[0];
} else {
return url;
}
}
}
com.a.eye.skywalking.collector.worker.segment.SegmentIndex
com.a.eye.skywalking.collector.worker.segment.SegmentCostIndex
com.a.eye.skywalking.collector.worker.segment.SegmentExceptionIndex
com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex
com.a.eye.skywalking.collector.worker.node.NodeCompIndex
com.a.eye.skywalking.collector.worker.node.NodeMappingIndex
com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex
com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex
\ No newline at end of file
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(TestAnalysisMember.class)
@PowerMockIgnore({"javax.management.*"})
public class AnalysisMemberTestCase {
@Test
public void testCommandOnWork() throws Exception {
AnalysisMember member = mock(AnalysisMember.class);
ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
LocalWorkerContext localWorkerContext = new LocalWorkerContext();
TestAnalysisMember member = PowerMockito.spy(new TestAnalysisMember(TestAnalysisMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));
EndOfBatchCommand command = new EndOfBatchCommand();
member.onWork(command);
......@@ -22,11 +39,58 @@ public class AnalysisMemberTestCase {
@Test
public void testAnalyse() throws Exception {
AnalysisMember member = mock(AnalysisMember.class);
ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
LocalWorkerContext localWorkerContext = new LocalWorkerContext();
TestAnalysisMember member = PowerMockito.spy(new TestAnalysisMember(TestAnalysisMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));
Object message = new Object();
member.onWork(message);
verify(member, never()).aggregation();
verify(member, times(1)).analyse(anyObject());
}
@Test
public void testPreStart() throws Exception {
ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
LocalWorkerContext localWorkerContext = new LocalWorkerContext();
TestAnalysisMember member = PowerMockito.spy(new TestAnalysisMember(TestAnalysisMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));
member.preStart();
}
@Test
public void testOnWorkException() throws Exception {
ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
LocalWorkerContext localWorkerContext = new LocalWorkerContext();
TestAnalysisMember member = PowerMockito.spy(new TestAnalysisMember(TestAnalysisMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));
doThrow(new TestException()).when(member).analyse(anyObject());
ExceptionAnswer answer = new ExceptionAnswer();
PowerMockito.when(member, "saveException", any(TestException.class)).thenAnswer(answer);
member.onWork(new Object());
Assert.assertEquals(true, answer.isTestException);
}
class TestException extends Exception {
}
class ExceptionAnswer implements Answer {
boolean isTestException = false;
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Object obj = invocation.getArguments()[0];
if (obj instanceof TestException) {
isTestException = true;
} else {
isTestException = false;
}
return null;
}
}
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.MergePersistenceData;
import org.junit.Assert;
......@@ -7,6 +9,8 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
......@@ -16,38 +20,42 @@ import static org.powermock.api.mockito.PowerMockito.*;
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(MergeAnalysisMember.class)
@PrepareForTest(TestMergeAnalysisMember.class)
@PowerMockIgnore({"javax.management.*"})
public class MergeAnalysisMemberTestCase {
private MergeAnalysisMember member;
private TestMergeAnalysisMember mergeAnalysisMember;
private MergePersistenceData persistenceData;
@Before
public void init() throws Exception {
member = mock(MergeAnalysisMember.class);
ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
LocalWorkerContext localWorkerContext = new LocalWorkerContext();
mergeAnalysisMember = PowerMockito.spy(new TestMergeAnalysisMember(TestMergeAnalysisMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));
persistenceData = mock(MergePersistenceData.class);
MergeData mergeData = mock(MergeData.class);
when(member, "getPersistenceData").thenReturn(persistenceData);
when(mergeAnalysisMember, "getPersistenceData").thenReturn(persistenceData);
when(persistenceData.getElseCreate(Mockito.anyString())).thenReturn(mergeData);
doCallRealMethod().when(member).setMergeData(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
doCallRealMethod().when(mergeAnalysisMember).setMergeData(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
}
@Test
public void testSetMergeDataNotFull() throws Exception {
when(persistenceData.size()).thenReturn(WorkerConfig.Persistence.Data.size - 1);
member.setMergeData("segment_1", "column", "value");
Mockito.verify(member, Mockito.never()).aggregation();
mergeAnalysisMember.setMergeData("segment_1", "column", "value");
Mockito.verify(mergeAnalysisMember, Mockito.never()).aggregation();
}
@Test
public void testSetMergeDataFull() throws Exception {
when(persistenceData.size()).thenReturn(WorkerConfig.Persistence.Data.size);
member.setMergeData("segment_1", "column", "value");
Mockito.verify(member, Mockito.times(1)).aggregation();
mergeAnalysisMember.setMergeData("segment_1", "column", "value");
Mockito.verify(mergeAnalysisMember, Mockito.times(1)).aggregation();
}
@Test
......@@ -55,9 +63,10 @@ public class MergeAnalysisMemberTestCase {
MergePersistenceData persistenceData = new MergePersistenceData();
persistenceData.getElseCreate("segment_1").setMergeData("column", "value");
when(member, "getPersistenceData").thenReturn(persistenceData);
doCallRealMethod().when(member).pushOne();
when(mergeAnalysisMember, "getPersistenceData").thenReturn(persistenceData);
doCallRealMethod().when(mergeAnalysisMember).pushOne();
Assert.assertEquals("segment_1", member.pushOne().getId());
Assert.assertEquals("segment_1", mergeAnalysisMember.pushOne().getId());
Assert.assertEquals(null, mergeAnalysisMember.pushOne());
}
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.worker.mock.MockEsBulkClient;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.MergePersistenceData;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.powermock.api.mockito.PowerMockito.*;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({TestMergePersistenceMember.class, EsClient.class})
@PowerMockIgnore({"javax.management.*"})
public class MergePersistenceMemberTestCase {
private TestMergePersistenceMember mergePersistenceMember;
private MergePersistenceData persistenceData;
@Before
public void init() throws Exception {
MockEsBulkClient mockEsBulkClient = new MockEsBulkClient();
mockEsBulkClient.createMock();
ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
LocalWorkerContext localWorkerContext = new LocalWorkerContext();
mergePersistenceMember = PowerMockito.spy(new TestMergePersistenceMember(TestMergePersistenceMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));
persistenceData = mock(MergePersistenceData.class);
MergeData mergeData = mock(MergeData.class);
when(mergePersistenceMember, "getPersistenceData").thenReturn(persistenceData);
when(persistenceData.getElseCreate(Mockito.anyString())).thenReturn(mergeData);
doCallRealMethod().when(mergePersistenceMember).analyse(Mockito.any(MergeData.class));
}
@Test
public void testAnalyse() throws Exception {
String id = "2016" + Const.ID_SPLIT + "A" + Const.ID_SPLIT + "B";
MergeData mergeData = new MergeData(id);
mergeData.setMergeData("Column", "Value");
// mergePersistenceMember.analyse(mergeData);
// mergePersistenceMember.onWork(new EndOfBatchCommand());
}
}
......@@ -29,7 +29,7 @@ public class StartUpTestCase {
withFallback(ConfigFactory.load("application.conf"));
ActorSystem system = ActorSystem.create("CollectorSystem", config);
EsClient.boot();
EsClient.INSTANCE.boot();
TraceSegment dubboClientData = TraceSegmentBuilderFactory.INSTANCE.traceOf_Tomcat_DubboClient();
......
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public class TestAnalysisMember extends AnalysisMember {
TestAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override
protected void aggregation() throws Exception {
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return null;
}
@Override
public WorkerSelector workerSelector() {
return null;
}
}
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public class TestMergeAnalysisMember extends MergeAnalysisMember {
TestMergeAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override
protected void aggregation() throws Exception {
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return null;
}
@Override
public WorkerSelector workerSelector() {
return null;
}
}
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public class TestMergePersistenceMember extends MergePersistenceMember {
TestMergePersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return null;
}
@Override
public String esType() {
return null;
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return null;
}
@Override
public WorkerSelector workerSelector() {
return null;
}
}
}
package com.a.eye.skywalking.collector.worker;
import org.junit.Assert;
import org.junit.Test;
/**
* @author pengys5
*/
public class TimeSliceTestCase {
@Test
public void test() {
TestTimeSlice timeSlice = new TestTimeSlice("A", 10L, 20L);
Assert.assertEquals("A", timeSlice.getSliceType());
Assert.assertEquals(10L, timeSlice.getStartTime());
Assert.assertEquals(20L, timeSlice.getEndTime());
}
class TestTimeSlice extends TimeSlice {
public TestTimeSlice(String sliceType, long startTime, long endTime) {
super(sliceType, startTime, endTime);
}
}
}
package com.a.eye.skywalking.collector.worker.globaltrace;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
/**
* @author pengys5
*/
public class GlobalTraceIndexTestCase {
@Test
public void test() {
GlobalTraceIndex index = new GlobalTraceIndex();
Assert.assertEquals("global_trace_idx", index.index());
Assert.assertEquals(true, index.isRecord());
}
@Test
public void testBuilder() throws IOException {
GlobalTraceIndex index = new GlobalTraceIndex();
Assert.assertEquals("{\"properties\":{\"subSegIds\":{\"type\":\"text\",\"index\":\"not_analyzed\"}}}", index.createMappingBuilder().string());
}
}
......@@ -28,7 +28,7 @@ public class GlobalTraceSearchTestCase {
public void testSearchWithGlobalId() throws Exception {
Client client = mock(Client.class);
mockStatic(EsClient.class);
when(EsClient.getClient()).thenReturn(client);
// when(EsClient.INSTANCE.getClient()).thenReturn(client);
String globalTraceId = "Global.1";
String segment_1 = "SEGMENT.1";
......
package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.MergeDataAnswer;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.MergeDataAggTools;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({LocalWorkerContext.class})
@PowerMockIgnore({"javax.management.*"})
public class GlobalTraceAggTestCase {
private GlobalTraceAgg agg;
private MergeDataAnswer mergeDataAnswer;
private ClusterWorkerContext clusterWorkerContext;
@Before
public void init() throws Exception {
clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
LocalWorkerContext localWorkerContext = PowerMockito.mock(LocalWorkerContext.class);
WorkerRefs workerRefs = mock(WorkerRefs.class);
mergeDataAnswer = new MergeDataAnswer();
doAnswer(mergeDataAnswer).when(workerRefs).tell(Mockito.any(RecordData.class));
when(localWorkerContext.lookup(GlobalTraceSave.Role.INSTANCE)).thenReturn(workerRefs);
agg = new GlobalTraceAgg(GlobalTraceAgg.Role.INSTANCE, clusterWorkerContext, localWorkerContext);
}
@Test
public void testRole() {
Assert.assertEquals(GlobalTraceAgg.class.getSimpleName(), GlobalTraceAgg.Role.INSTANCE.roleName());
Assert.assertEquals(HashCodeSelector.class.getSimpleName(), GlobalTraceAgg.Role.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
Assert.assertEquals(GlobalTraceAgg.class.getSimpleName(), GlobalTraceAgg.Factory.INSTANCE.role().roleName());
Assert.assertEquals(GlobalTraceAgg.class.getSimpleName(), GlobalTraceAgg.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
int testSize = 10;
WorkerConfig.WorkerNum.GlobalTrace.GlobalTraceAgg.Value = testSize;
Assert.assertEquals(testSize, GlobalTraceAgg.Factory.INSTANCE.workerNum());
}
@Test
public void testPreStart() throws ProviderNotFoundException {
when(clusterWorkerContext.findProvider(GlobalTraceSave.Role.INSTANCE)).thenReturn(GlobalTraceSave.Factory.INSTANCE);
ArgumentCaptor<GlobalTraceSave.Role> argumentCaptor = ArgumentCaptor.forClass(GlobalTraceSave.Role.class);
agg.preStart();
verify(clusterWorkerContext).findProvider(argumentCaptor.capture());
}
@Test
public void testOnWork() throws Exception {
MergeDataAggTools.INSTANCE.testOnWork(agg, mergeDataAnswer);
}
@Test
public void testOnWorkError() throws Exception {
agg.onWork(new Object());
}
}
package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class GlobalTraceSaveTestCase {
private GlobalTraceSave save;
@Before
public void init() {
ClusterWorkerContext cluster = new ClusterWorkerContext(null);
LocalWorkerContext local = new LocalWorkerContext();
save = new GlobalTraceSave(GlobalTraceSave.Role.INSTANCE, cluster, local);
}
@Test
public void testEsIndex() {
Assert.assertEquals(GlobalTraceIndex.Index, save.esIndex());
}
@Test
public void testEsType() {
Assert.assertEquals(GlobalTraceIndex.Type_Record, save.esType());
}
@Test
public void testRole() {
Assert.assertEquals(GlobalTraceSave.class.getSimpleName(), GlobalTraceSave.Role.INSTANCE.roleName());
Assert.assertEquals(HashCodeSelector.class.getSimpleName(), GlobalTraceSave.Role.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
Assert.assertEquals(GlobalTraceSave.class.getSimpleName(), GlobalTraceSave.Factory.INSTANCE.role().roleName());
Assert.assertEquals(GlobalTraceSave.class.getSimpleName(), GlobalTraceSave.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
int testSize = 10;
WorkerConfig.Queue.GlobalTrace.GlobalTraceSave.Size = testSize;
Assert.assertEquals(testSize, GlobalTraceSave.Factory.INSTANCE.queueSize());
}
}
package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs;
import com.google.gson.JsonObject;
import org.elasticsearch.action.get.GetResponse;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({GetResponseFromEs.class})
@PowerMockIgnore({"javax.management.*"})
public class GlobalTraceSearchWithGlobalIdTestCase {
private GetResponseFromEs getResponseFromEs;
private String global_Str = "{\"subSegIds\":\"Segment.1491277162066.18986177.70531.27.1\"}";
private String seg_str = "{\"ts\":\"Segment.1491277162066.18986177.70531.27.1\",\"st\":1491277162066,\"et\":1491277165743,\"ss\":[{\"si\":0,\"ps\":-1,\"st\":1491277162141,\"et\":1491277162144,\"on\":\"Jedis/getClient\",\"ts\":{\"span.layer\":\"db\",\"component\":\"Redis\",\"db.type\":\"Redis\",\"peer.host\":\"127.0.0.1\",\"span.kind\":\"client\"},\"tb\":{},\"ti\":{\"peer.port\":6379},\"lo\":[]}],\"ac\":\"cache-service\",\"gt\":[\"Trace.1491277147443.-1562443425.70539.65.2\"],\"sampled\":true,\"minute\":201704041139,\"hour\":201704041100,\"day\":201704040000,\"aggId\":null}";
@Before
public void init() {
getResponseFromEs = PowerMockito.mock(GetResponseFromEs.class);
Whitebox.setInternalState(GetResponseFromEs.class, "INSTANCE", getResponseFromEs);
}
@Test
public void testRole() {
Assert.assertEquals(GlobalTraceSearchWithGlobalId.class.getSimpleName(), GlobalTraceSearchWithGlobalId.WorkerRole.INSTANCE.roleName());
Assert.assertEquals(RollingSelector.class.getSimpleName(), GlobalTraceSearchWithGlobalId.WorkerRole.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
Assert.assertEquals(GlobalTraceSearchWithGlobalId.class.getSimpleName(), GlobalTraceSearchWithGlobalId.Factory.INSTANCE.role().roleName());
Assert.assertEquals(GlobalTraceSearchWithGlobalId.class.getSimpleName(), GlobalTraceSearchWithGlobalId.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
}
@Test
public void testOnWork() throws Exception {
ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
LocalWorkerContext localWorkerContext = new LocalWorkerContext();
GlobalTraceSearchWithGlobalId globalTraceSearchWithGlobalId = new GlobalTraceSearchWithGlobalId(GlobalTraceSearchWithGlobalId.WorkerRole.INSTANCE, clusterWorkerContext, localWorkerContext);
GetResponse getResponse = mock(GetResponse.class);
when(getResponseFromEs.get(GlobalTraceIndex.Index, GlobalTraceIndex.Type_Record, "Trace.1491277147443.-1562443425.70539.65.2")).thenReturn(getResponse);
when(getResponse.getSourceAsString()).thenReturn(global_Str);
GetResponse segResponse = mock(GetResponse.class);
when(getResponseFromEs.get(SegmentIndex.Index, SegmentIndex.Type_Record, "Segment.1491277162066.18986177.70531.27.1")).thenReturn(segResponse);
when(segResponse.getSourceAsString()).thenReturn(seg_str);
JsonObject response = new JsonObject();
globalTraceSearchWithGlobalId.onWork("Trace.1491277147443.-1562443425.70539.65.2", response);
}
}
package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingSearchWithTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
/**
* @author pengys5
*/
public class GlobalTraceSearchWithGlobalIdUseDB {
public static void main(String[] args) throws Exception {
EsClient.INSTANCE.boot();
ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
LocalWorkerContext localWorkerContext = new LocalWorkerContext();
GlobalTraceSearchWithGlobalId globalTraceSearchWithGlobalId =
new GlobalTraceSearchWithGlobalId(GlobalTraceSearchWithGlobalId.WorkerRole.INSTANCE, clusterWorkerContext, localWorkerContext);
JsonObject response = new JsonObject();
globalTraceSearchWithGlobalId.onWork("Trace.1491277147443.-1562443425.70539.65.2", response);
JsonArray nodeArray = response.get("result").getAsJsonArray();
System.out.println(nodeArray.size());
System.out.println(nodeArray.toString());
for (int i = 0; i < nodeArray.size(); i++) {
JsonObject nodeJsonObj = nodeArray.get(i).getAsJsonObject();
System.out.println(nodeJsonObj);
}
}
}
package com.a.eye.skywalking.collector.worker.mock;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class MergeDataAnswer implements Answer<Object> {
public Map<String, String> mergeObj = new HashMap<>();
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
MergeData mergeData = (MergeData) invocation.getArguments()[0];
for (Map.Entry<String, String> entry : mergeData.toMap().entrySet()) {
System.out.printf("key: %s, value: %s \n", entry.getKey(), entry.getValue());
mergeObj.put(entry.getKey(), entry.getValue());
}
return null;
}
}
package com.a.eye.skywalking.collector.worker.mock;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class MetricDataAnswer implements Answer<Object> {
public Map<String, Object> metricObj = new HashMap<>();
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
MetricData metricData = (MetricData) invocation.getArguments()[0];
for (Map.Entry<String, Object> entry : metricData.toMap().entrySet()) {
metricObj.put(entry.getKey(), entry.getValue());
}
return null;
}
}
......@@ -7,6 +7,8 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.reflect.Whitebox;
import static org.powermock.api.mockito.PowerMockito.*;
......@@ -18,9 +20,10 @@ public class MockEsBulkClient {
public IndexRequestBuilder indexRequestBuilder;
public void createMock() {
Client client = mock(Client.class);
mockStatic(EsClient.class);
when(EsClient.getClient()).thenReturn(client);
Client client = PowerMockito.mock(Client.class);
EsClient esClient = PowerMockito.mock(EsClient.class);
Whitebox.setInternalState(EsClient.class, "INSTANCE", esClient);
when(esClient.getClient()).thenReturn(client);
BulkRequestBuilder bulkRequestBuilder = mock(BulkRequestBuilder.class);
when(client.prepareBulk()).thenReturn(bulkRequestBuilder);
......
package com.a.eye.skywalking.collector.worker.mock;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import org.elasticsearch.client.Client;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.reflect.Whitebox;
import static org.powermock.api.mockito.PowerMockito.when;
/**
* @author pengys5
*/
public class MockEsClient {
public Client mock() {
Client client = PowerMockito.mock(Client.class);
EsClient esClient = PowerMockito.mock(EsClient.class);
Whitebox.setInternalState(EsClient.class, "INSTANCE", esClient);
when(esClient.getClient()).thenReturn(client);
return client;
}
}
package com.a.eye.skywalking.collector.worker.mock;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
public class MockGetResponse {
public GetResponse mockito() {
MockEsClient mockEsClient = new MockEsClient();
Client client = mockEsClient.mock();
GetRequestBuilder builder = mock(GetRequestBuilder.class);
GetResponse getResponse = mock(GetResponse.class);
when(builder.get()).thenReturn(getResponse);
when(client.prepareGet(anyString(), anyString(), anyString())).thenReturn(builder);
return getResponse;
}
}
package com.a.eye.skywalking.collector.worker.node;
import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
/**
* @author pengys5
*/
public class NodeCompIndexTestCase {
@Test
public void test() {
NodeCompIndex index = new NodeCompIndex();
Assert.assertEquals("node_comp_idx", index.index());
Assert.assertEquals(false, index.isRecord());
}
@Test
public void testBuilder() throws IOException {
NodeCompIndex index = new NodeCompIndex();
Assert.assertEquals("{\"properties\":{\"name\":{\"type\":\"string\",\"index\":\"not_analyzed\"},\"peers\":{\"type\":\"string\",\"index\":\"not_analyzed\"},\"aggId\":{\"type\":\"string\",\"index\":\"not_analyzed\"}}}", index.createMappingBuilder().string());
}
}
package com.a.eye.skywalking.collector.worker.node;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
/**
* @author pengys5
*/
public class NodeMappingIndexTestCase {
@Test
public void test() {
NodeMappingIndex index = new NodeMappingIndex();
Assert.assertEquals("node_mapping_idx", index.index());
Assert.assertEquals(false, index.isRecord());
}
@Test
public void testBuilder() throws IOException {
NodeMappingIndex index = new NodeMappingIndex();
Assert.assertEquals("{\"properties\":{\"code\":{\"type\":\"string\",\"index\":\"not_analyzed\"},\"peers\":{\"type\":\"string\",\"index\":\"not_analyzed\"},\"aggId\":{\"type\":\"string\",\"index\":\"not_analyzed\"},\"timeSlice\":{\"type\":\"long\",\"index\":\"not_analyzed\"}}}", index.createMappingBuilder().string());
}
}
......@@ -2,12 +2,14 @@ package com.a.eye.skywalking.collector.worker.node.analysis;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingDayAgg;
import com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefResSumDayAnalysis;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.segment.mock.SegmentMock;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......@@ -25,6 +27,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.util.List;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
......@@ -39,10 +42,11 @@ public class NodeMappingDayAnalysisTestCase {
private NodeMappingDayAnalysis nodeMappingDayAnalysis;
private SegmentMock segmentMock = new SegmentMock();
private RecordDataAnswer recordDataAnswer;
private ClusterWorkerContext clusterWorkerContext;
@Before
public void init() throws Exception {
ClusterWorkerContext clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
WorkerRefs workerRefs = mock(WorkerRefs.class);
recordDataAnswer = new RecordDataAnswer();
doAnswer(recordDataAnswer).when(workerRefs).tell(Mockito.any(RecordData.class));
......@@ -69,6 +73,12 @@ public class NodeMappingDayAnalysisTestCase {
Assert.assertEquals(testSize, NodeMappingDayAnalysis.Factory.INSTANCE.queueSize());
}
@Test(expected = Exception.class)
public void testPreStart() throws ProviderNotFoundException {
when(clusterWorkerContext.findProvider(NodeRefResSumDayAnalysis.Role.INSTANCE)).thenThrow(new Exception());
nodeMappingDayAnalysis.preStart();
}
@Test
public void testAnalyse() throws Exception {
List<SegmentPost.SegmentWithTimeSlice> cacheServiceSegment = segmentMock.mockCacheServiceSegmentSegmentTimeSlice();
......
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.RecordDataAggTools;
import com.a.eye.skywalking.collector.worker.tools.RecordDataTool;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({LocalWorkerContext.class})
@PowerMockIgnore({"javax.management.*"})
public class NodeCompAggTestCase {
private NodeCompAgg agg;
private RecordDataAnswer recordDataAnswer;
private ClusterWorkerContext clusterWorkerContext;
private LocalWorkerContext localWorkerContext;
@Before
public void init() throws Exception {
clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
localWorkerContext = PowerMockito.mock(LocalWorkerContext.class);
WorkerRefs workerRefs = mock(WorkerRefs.class);
recordDataAnswer = new RecordDataAnswer();
doAnswer(recordDataAnswer).when(workerRefs).tell(Mockito.any(RecordData.class));
when(localWorkerContext.lookup(NodeCompSave.Role.INSTANCE)).thenReturn(workerRefs);
agg = new NodeCompAgg(NodeCompAgg.Role.INSTANCE, clusterWorkerContext, localWorkerContext);
}
@Test
public void testRole() {
Assert.assertEquals(NodeCompAgg.class.getSimpleName(), NodeCompAgg.Role.INSTANCE.roleName());
Assert.assertEquals(HashCodeSelector.class.getSimpleName(), NodeCompAgg.Role.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
Assert.assertEquals(NodeCompAgg.class.getSimpleName(), NodeCompAgg.Factory.INSTANCE.role().roleName());
Assert.assertEquals(NodeCompAgg.class.getSimpleName(), NodeCompAgg.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
int testSize = 10;
WorkerConfig.WorkerNum.Node.NodeCompAgg.Value = testSize;
Assert.assertEquals(testSize, NodeCompAgg.Factory.INSTANCE.workerNum());
}
@Test
public void testPreStart() throws ProviderNotFoundException {
when(clusterWorkerContext.findProvider(NodeCompSave.Role.INSTANCE)).thenReturn(NodeCompSave.Factory.INSTANCE);
ArgumentCaptor<NodeCompSave.Role> argumentCaptor = ArgumentCaptor.forClass(NodeCompSave.Role.class);
agg.preStart();
verify(clusterWorkerContext).findProvider(argumentCaptor.capture());
}
@Test(expected = IllegalArgumentException.class)
public void testOnWorkError() throws Exception {
agg.onWork(new Object());
}
@Test
public void testOnWork() throws Exception {
RecordDataAggTools.INSTANCE.testOnWork(agg, recordDataAnswer);
}
}
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeCompIndex;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class NodeCompSaveTestCase {
private NodeCompSave save;
@Before
public void init() {
ClusterWorkerContext cluster = new ClusterWorkerContext(null);
LocalWorkerContext local = new LocalWorkerContext();
save = new NodeCompSave(NodeCompSave.Role.INSTANCE, cluster, local);
}
@Test
public void testEsIndex() {
Assert.assertEquals(NodeCompIndex.Index, save.esIndex());
}
@Test
public void testEsType() {
Assert.assertEquals(NodeCompIndex.Type_Record, save.esType());
}
@Test
public void testRole() {
Assert.assertEquals(NodeCompSave.class.getSimpleName(), NodeCompSave.Role.INSTANCE.roleName());
Assert.assertEquals(HashCodeSelector.class.getSimpleName(), NodeCompSave.Role.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
Assert.assertEquals(NodeCompSave.class.getSimpleName(), NodeCompSave.Factory.INSTANCE.role().roleName());
Assert.assertEquals(NodeCompSave.class.getSimpleName(), NodeCompSave.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
int testSize = 10;
WorkerConfig.Queue.Node.NodeCompSave.Size = testSize;
Assert.assertEquals(testSize, NodeCompSave.Factory.INSTANCE.queueSize());
}
}
......@@ -12,7 +12,7 @@ import com.google.gson.JsonObject;
public class NodeCompWithTimeSliceUseDB {
public static void main(String[] args) throws Exception {
EsClient.boot();
EsClient.INSTANCE.boot();
ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
LocalWorkerContext localWorkerContext = new LocalWorkerContext();
......
......@@ -2,26 +2,24 @@ package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.RecordDataTool;
import com.google.gson.JsonObject;
import com.a.eye.skywalking.collector.worker.tools.RecordDataAggTools;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.List;
import static org.mockito.Mockito.*;
/**
......@@ -32,12 +30,13 @@ import static org.mockito.Mockito.*;
@PowerMockIgnore({"javax.management.*"})
public class NodeMappingDayAggTestCase {
private NodeMappingDayAgg nodeMappingDayAgg;
private NodeMappingDayAgg agg;
private RecordDataAnswer recordDataAnswer;
private ClusterWorkerContext clusterWorkerContext;
@Before
public void init() throws Exception {
ClusterWorkerContext clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
LocalWorkerContext localWorkerContext = PowerMockito.mock(LocalWorkerContext.class);
WorkerRefs workerRefs = mock(WorkerRefs.class);
......@@ -46,7 +45,7 @@ public class NodeMappingDayAggTestCase {
doAnswer(recordDataAnswer).when(workerRefs).tell(Mockito.any(RecordData.class));
when(localWorkerContext.lookup(NodeMappingDaySave.Role.INSTANCE)).thenReturn(workerRefs);
nodeMappingDayAgg = new NodeMappingDayAgg(NodeMappingDayAgg.Role.INSTANCE, clusterWorkerContext, localWorkerContext);
agg = new NodeMappingDayAgg(NodeMappingDayAgg.Role.INSTANCE, clusterWorkerContext, localWorkerContext);
}
@Test
......@@ -66,23 +65,21 @@ public class NodeMappingDayAggTestCase {
}
@Test
public void testOnWork() throws Exception {
String id = "2017" + Const.ID_SPLIT + "TestNodeMappingDayAgg";
JsonObject record = new JsonObject();
record.addProperty("Column", "TestData");
public void testPreStart() throws ProviderNotFoundException {
when(clusterWorkerContext.findProvider(NodeMappingDaySave.Role.INSTANCE)).thenReturn(NodeMappingDaySave.Factory.INSTANCE);
RecordData recordData = new RecordData(id);
recordData.setRecord(record);
nodeMappingDayAgg.onWork(recordData);
ArgumentCaptor<NodeMappingDaySave.Role> argumentCaptor = ArgumentCaptor.forClass(NodeMappingDaySave.Role.class);
agg.preStart();
verify(clusterWorkerContext).findProvider(argumentCaptor.capture());
}
List<RecordData> recordDataList = recordDataAnswer.recordObj.getRecordData();
RecordData data = RecordDataTool.INSTANCE.getRecord(recordDataList, id);
Assert.assertEquals("TestNodeMappingDayAgg", data.getRecord().get("aggId").getAsString());
Assert.assertEquals("TestData", data.getRecord().get("Column").getAsString());
@Test
public void testOnWork() throws Exception {
RecordDataAggTools.INSTANCE.testOnWork(agg, recordDataAnswer);
}
@Test(expected = IllegalArgumentException.class)
public void testOnWorkError() throws Exception {
nodeMappingDayAgg.onWork(new Object());
agg.onWork(new Object());
}
}
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class NodeMappingDaySaveTestCase {
private NodeMappingDaySave save;
@Before
public void init() {
ClusterWorkerContext cluster = new ClusterWorkerContext(null);
LocalWorkerContext local = new LocalWorkerContext();
save = new NodeMappingDaySave(NodeMappingDaySave.Role.INSTANCE, cluster, local);
}
@Test
public void testEsIndex() {
Assert.assertEquals(NodeMappingIndex.Index, save.esIndex());
}
@Test
public void testEsType() {
Assert.assertEquals(NodeMappingIndex.Type_Day, save.esType());
}
@Test
public void testRole() {
Assert.assertEquals(NodeMappingDaySave.class.getSimpleName(), NodeMappingDaySave.Role.INSTANCE.roleName());
Assert.assertEquals(HashCodeSelector.class.getSimpleName(), NodeMappingDaySave.Role.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
Assert.assertEquals(NodeMappingDaySave.class.getSimpleName(), NodeMappingDaySave.Factory.INSTANCE.role().roleName());
Assert.assertEquals(NodeMappingDaySave.class.getSimpleName(), NodeMappingDaySave.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
int testSize = 10;
WorkerConfig.Queue.Node.NodeMappingDaySave.Size = testSize;
Assert.assertEquals(testSize, NodeMappingDaySave.Factory.INSTANCE.queueSize());
}
}
......@@ -2,26 +2,24 @@ package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.RecordDataTool;
import com.google.gson.JsonObject;
import com.a.eye.skywalking.collector.worker.tools.RecordDataAggTools;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.List;
import static org.mockito.Mockito.*;
/**
......@@ -32,12 +30,13 @@ import static org.mockito.Mockito.*;
@PowerMockIgnore({"javax.management.*"})
public class NodeMappingHourAggTestCase {
private NodeMappingHourAgg nodeMappingHourAgg;
private NodeMappingHourAgg agg;
private RecordDataAnswer recordDataAnswer;
private ClusterWorkerContext clusterWorkerContext;
@Before
public void init() throws Exception {
ClusterWorkerContext clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
LocalWorkerContext localWorkerContext = PowerMockito.mock(LocalWorkerContext.class);
WorkerRefs workerRefs = mock(WorkerRefs.class);
......@@ -46,7 +45,7 @@ public class NodeMappingHourAggTestCase {
doAnswer(recordDataAnswer).when(workerRefs).tell(Mockito.any(RecordData.class));
when(localWorkerContext.lookup(NodeMappingHourSave.Role.INSTANCE)).thenReturn(workerRefs);
nodeMappingHourAgg = new NodeMappingHourAgg(NodeMappingHourAgg.Role.INSTANCE, clusterWorkerContext, localWorkerContext);
agg = new NodeMappingHourAgg(NodeMappingHourAgg.Role.INSTANCE, clusterWorkerContext, localWorkerContext);
}
@Test
......@@ -66,23 +65,21 @@ public class NodeMappingHourAggTestCase {
}
@Test
public void testOnWork() throws Exception {
String id = "2017" + Const.ID_SPLIT + "TestNodeMappingHourAgg";
JsonObject record = new JsonObject();
record.addProperty("Column", "TestData");
public void testPreStart() throws ProviderNotFoundException {
when(clusterWorkerContext.findProvider(NodeMappingHourSave.Role.INSTANCE)).thenReturn(NodeMappingHourSave.Factory.INSTANCE);
RecordData recordData = new RecordData(id);
recordData.setRecord(record);
nodeMappingHourAgg.onWork(recordData);
ArgumentCaptor<NodeMappingHourSave.Role> argumentCaptor = ArgumentCaptor.forClass(NodeMappingHourSave.Role.class);
agg.preStart();
verify(clusterWorkerContext).findProvider(argumentCaptor.capture());
}
List<RecordData> recordDataList = recordDataAnswer.recordObj.getRecordData();
RecordData data = RecordDataTool.INSTANCE.getRecord(recordDataList, id);
Assert.assertEquals("TestNodeMappingHourAgg", data.getRecord().get("aggId").getAsString());
Assert.assertEquals("TestData", data.getRecord().get("Column").getAsString());
@Test
public void testOnWork() throws Exception {
RecordDataAggTools.INSTANCE.testOnWork(agg, recordDataAnswer);
}
@Test(expected = IllegalArgumentException.class)
public void testOnWorkError() throws Exception {
nodeMappingHourAgg.onWork(new Object());
agg.onWork(new Object());
}
}
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class NodeMappingHourSaveTestCase {
private NodeMappingHourSave save;
@Before
public void init() {
ClusterWorkerContext cluster = new ClusterWorkerContext(null);
LocalWorkerContext local = new LocalWorkerContext();
save = new NodeMappingHourSave(NodeMappingHourSave.Role.INSTANCE, cluster, local);
}
@Test
public void testEsIndex() {
Assert.assertEquals(NodeMappingIndex.Index, save.esIndex());
}
@Test
public void testEsType() {
Assert.assertEquals(NodeMappingIndex.Type_Hour, save.esType());
}
@Test
public void testRole() {
Assert.assertEquals(NodeMappingHourSave.class.getSimpleName(), NodeMappingHourSave.Role.INSTANCE.roleName());
Assert.assertEquals(HashCodeSelector.class.getSimpleName(), NodeMappingHourSave.Role.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
Assert.assertEquals(NodeMappingHourSave.class.getSimpleName(), NodeMappingHourSave.Factory.INSTANCE.role().roleName());
Assert.assertEquals(NodeMappingHourSave.class.getSimpleName(), NodeMappingHourSave.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
int testSize = 10;
WorkerConfig.Queue.Node.NodeMappingHourSave.Size = testSize;
Assert.assertEquals(testSize, NodeMappingHourSave.Factory.INSTANCE.queueSize());
}
}
......@@ -2,26 +2,24 @@ package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.RecordDataTool;
import com.google.gson.JsonObject;
import com.a.eye.skywalking.collector.worker.tools.RecordDataAggTools;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.List;
import static org.mockito.Mockito.*;
/**
......@@ -32,12 +30,13 @@ import static org.mockito.Mockito.*;
@PowerMockIgnore({"javax.management.*"})
public class NodeMappingMinuteAggTestCase {
private NodeMappingMinuteAgg nodeMappingMinuteAgg;
private NodeMappingMinuteAgg agg;
private RecordDataAnswer recordDataAnswer;
private ClusterWorkerContext clusterWorkerContext;
@Before
public void init() throws Exception {
ClusterWorkerContext clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
LocalWorkerContext localWorkerContext = PowerMockito.mock(LocalWorkerContext.class);
WorkerRefs workerRefs = mock(WorkerRefs.class);
......@@ -46,7 +45,7 @@ public class NodeMappingMinuteAggTestCase {
doAnswer(recordDataAnswer).when(workerRefs).tell(Mockito.any(RecordData.class));
when(localWorkerContext.lookup(NodeMappingMinuteSave.Role.INSTANCE)).thenReturn(workerRefs);
nodeMappingMinuteAgg = new NodeMappingMinuteAgg(NodeMappingMinuteAgg.Role.INSTANCE, clusterWorkerContext, localWorkerContext);
agg = new NodeMappingMinuteAgg(NodeMappingMinuteAgg.Role.INSTANCE, clusterWorkerContext, localWorkerContext);
}
@Test
......@@ -66,23 +65,21 @@ public class NodeMappingMinuteAggTestCase {
}
@Test
public void testOnWork() throws Exception {
String id = "2017" + Const.ID_SPLIT + "TestNodeMappingMinuteAgg";
JsonObject record = new JsonObject();
record.addProperty("Column", "TestData");
public void testPreStart() throws ProviderNotFoundException {
when(clusterWorkerContext.findProvider(NodeMappingMinuteSave.Role.INSTANCE)).thenReturn(NodeMappingMinuteSave.Factory.INSTANCE);
RecordData recordData = new RecordData(id);
recordData.setRecord(record);
nodeMappingMinuteAgg.onWork(recordData);
ArgumentCaptor<NodeMappingMinuteSave.Role> argumentCaptor = ArgumentCaptor.forClass(NodeMappingMinuteSave.Role.class);
agg.preStart();
verify(clusterWorkerContext).findProvider(argumentCaptor.capture());
}
List<RecordData> recordDataList = recordDataAnswer.recordObj.getRecordData();
RecordData data = RecordDataTool.INSTANCE.getRecord(recordDataList, id);
Assert.assertEquals("TestNodeMappingMinuteAgg", data.getRecord().get("aggId").getAsString());
Assert.assertEquals("TestData", data.getRecord().get("Column").getAsString());
@Test
public void testOnWork() throws Exception {
RecordDataAggTools.INSTANCE.testOnWork(agg, recordDataAnswer);
}
@Test(expected = IllegalArgumentException.class)
public void testOnWorkError() throws Exception {
nodeMappingMinuteAgg.onWork(new Object());
agg.onWork(new Object());
}
}
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class NodeMappingMinuteSaveTestCase {
private NodeMappingMinuteSave save;
@Before
public void init() {
ClusterWorkerContext cluster = new ClusterWorkerContext(null);
LocalWorkerContext local = new LocalWorkerContext();
save = new NodeMappingMinuteSave(NodeMappingMinuteSave.Role.INSTANCE, cluster, local);
}
@Test
public void testEsIndex() {
Assert.assertEquals(NodeMappingIndex.Index, save.esIndex());
}
@Test
public void testEsType() {
Assert.assertEquals(NodeMappingIndex.Type_Minute, save.esType());
}
@Test
public void testRole() {
Assert.assertEquals(NodeMappingMinuteSave.class.getSimpleName(), NodeMappingMinuteSave.Role.INSTANCE.roleName());
Assert.assertEquals(HashCodeSelector.class.getSimpleName(), NodeMappingMinuteSave.Role.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
Assert.assertEquals(NodeMappingMinuteSave.class.getSimpleName(), NodeMappingMinuteSave.Factory.INSTANCE.role().roleName());
Assert.assertEquals(NodeMappingMinuteSave.class.getSimpleName(), NodeMappingMinuteSave.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
int testSize = 10;
WorkerConfig.Queue.Node.NodeMappingMinuteSave.Size = testSize;
Assert.assertEquals(testSize, NodeMappingMinuteSave.Factory.INSTANCE.queueSize());
}
}
......@@ -13,7 +13,7 @@ import com.google.gson.JsonObject;
public class NodeMappingSearchWithTimeSliceUseDB {
public static void main(String[] args) throws Exception {
EsClient.boot();
EsClient.INSTANCE.boot();
ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
LocalWorkerContext localWorkerContext = new LocalWorkerContext();
......
package com.a.eye.skywalking.collector.worker.noderef;
import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
/**
* @author pengys5
*/
public class NodeRefIndexTestCase {
@Test
public void test() {
NodeRefIndex index = new NodeRefIndex();
Assert.assertEquals("node_ref_idx", index.index());
Assert.assertEquals(false, index.isRecord());
}
@Test
public void testBuilder() throws IOException {
NodeRefIndex index = new NodeRefIndex();
Assert.assertEquals("{\"properties\":{\"front\":{\"type\":\"string\",\"index\":\"not_analyzed\"},\"frontIsRealCode\":{\"type\":\"boolean\",\"index\":\"not_analyzed\"},\"behind\":{\"type\":\"string\",\"index\":\"not_analyzed\"},\"behindIsRealCode\":{\"type\":\"boolean\",\"index\":\"not_analyzed\"},\"aggId\":{\"type\":\"string\",\"index\":\"not_analyzed\"},\"timeSlice\":{\"type\":\"long\",\"index\":\"not_analyzed\"}}}", index.createMappingBuilder().string());
}
}
package com.a.eye.skywalking.collector.worker.noderef;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
/**
* @author pengys5
*/
public class NodeRefResSumIndexTestCase {
@Test
public void test() {
NodeRefResSumIndex index = new NodeRefResSumIndex();
Assert.assertEquals("node_ref_res_sum_idx", index.index());
Assert.assertEquals(false, index.isRecord());
}
@Test
public void testBuilder() throws IOException {
NodeRefResSumIndex index = new NodeRefResSumIndex();
Assert.assertEquals("{\"properties\":{\"oneSecondLess\":{\"type\":\"long\",\"index\":\"not_analyzed\"},\"threeSecondLess\":{\"type\":\"long\",\"index\":\"not_analyzed\"},\"fiveSecondLess\":{\"type\":\"long\",\"index\":\"not_analyzed\"},\"fiveSecondGreater\":{\"type\":\"long\",\"index\":\"not_analyzed\"},\"error\":{\"type\":\"long\",\"index\":\"not_analyzed\"},\"summary\":{\"type\":\"long\",\"index\":\"not_analyzed\"},\"aggId\":{\"type\":\"string\",\"index\":\"not_analyzed\"},\"timeSlice\":{\"type\":\"long\",\"index\":\"not_analyzed\"}}}", index.createMappingBuilder().string());
}
}
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.RecordDataAggTools;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({LocalWorkerContext.class})
@PowerMockIgnore({"javax.management.*"})
public class NodeRefDayAggTestCase {
private NodeRefDayAgg agg;
private RecordDataAnswer recordDataAnswer;
private ClusterWorkerContext clusterWorkerContext;
private LocalWorkerContext localWorkerContext;
@Before
public void init() throws Exception {
clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
localWorkerContext = PowerMockito.mock(LocalWorkerContext.class);
WorkerRefs workerRefs = mock(WorkerRefs.class);
recordDataAnswer = new RecordDataAnswer();
doAnswer(recordDataAnswer).when(workerRefs).tell(Mockito.any(RecordData.class));
when(localWorkerContext.lookup(NodeRefDaySave.Role.INSTANCE)).thenReturn(workerRefs);
agg = new NodeRefDayAgg(NodeRefDayAgg.Role.INSTANCE, clusterWorkerContext, localWorkerContext);
}
@Test
public void testRole() {
Assert.assertEquals(NodeRefDayAgg.class.getSimpleName(), NodeRefDayAgg.Role.INSTANCE.roleName());
Assert.assertEquals(HashCodeSelector.class.getSimpleName(), NodeRefDayAgg.Role.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
Assert.assertEquals(NodeRefDayAgg.class.getSimpleName(), NodeRefDayAgg.Factory.INSTANCE.role().roleName());
Assert.assertEquals(NodeRefDayAgg.class.getSimpleName(), NodeRefDayAgg.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
int testSize = 10;
WorkerConfig.WorkerNum.NodeRef.NodeRefDayAgg.Value = testSize;
Assert.assertEquals(testSize, NodeRefDayAgg.Factory.INSTANCE.workerNum());
}
@Test
public void testPreStart() throws ProviderNotFoundException {
when(clusterWorkerContext.findProvider(NodeRefDaySave.Role.INSTANCE)).thenReturn(NodeRefDaySave.Factory.INSTANCE);
ArgumentCaptor<NodeRefDaySave.Role> argumentCaptor = ArgumentCaptor.forClass(NodeRefDaySave.Role.class);
agg.preStart();
verify(clusterWorkerContext).findProvider(argumentCaptor.capture());
}
@Test
public void testOnWorkError() throws Exception {
agg.onWork(new Object());
}
@Test
public void testOnWork() throws Exception {
RecordDataAggTools.INSTANCE.testOnWork(agg, recordDataAnswer);
}
}
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class NodeRefDaySaveTestCase {
private NodeRefDaySave save;
@Before
public void init() {
ClusterWorkerContext cluster = new ClusterWorkerContext(null);
LocalWorkerContext local = new LocalWorkerContext();
save = new NodeRefDaySave(NodeRefDaySave.Role.INSTANCE, cluster, local);
}
@Test
public void testEsIndex() {
Assert.assertEquals(NodeRefIndex.Index, save.esIndex());
}
@Test
public void testEsType() {
Assert.assertEquals(NodeRefIndex.Type_Day, save.esType());
}
@Test
public void testRole() {
Assert.assertEquals(NodeRefDaySave.class.getSimpleName(), NodeRefDaySave.Role.INSTANCE.roleName());
Assert.assertEquals(HashCodeSelector.class.getSimpleName(), NodeRefDaySave.Role.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
Assert.assertEquals(NodeRefDaySave.class.getSimpleName(), NodeRefDaySave.Factory.INSTANCE.role().roleName());
Assert.assertEquals(NodeRefDaySave.class.getSimpleName(), NodeRefDaySave.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
int testSize = 10;
WorkerConfig.Queue.NodeRef.NodeRefDaySave.Size = testSize;
Assert.assertEquals(testSize, NodeRefDaySave.Factory.INSTANCE.queueSize());
}
}
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.RecordDataAggTools;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({LocalWorkerContext.class})
@PowerMockIgnore({"javax.management.*"})
public class NodeRefHourAggTestCase {
private NodeRefHourAgg agg;
private RecordDataAnswer recordDataAnswer;
private ClusterWorkerContext clusterWorkerContext;
private LocalWorkerContext localWorkerContext;
@Before
public void init() throws Exception {
clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
localWorkerContext = PowerMockito.mock(LocalWorkerContext.class);
WorkerRefs workerRefs = mock(WorkerRefs.class);
recordDataAnswer = new RecordDataAnswer();
doAnswer(recordDataAnswer).when(workerRefs).tell(Mockito.any(RecordData.class));
when(localWorkerContext.lookup(NodeRefHourSave.Role.INSTANCE)).thenReturn(workerRefs);
agg = new NodeRefHourAgg(NodeRefHourAgg.Role.INSTANCE, clusterWorkerContext, localWorkerContext);
}
@Test
public void testRole() {
Assert.assertEquals(NodeRefHourAgg.class.getSimpleName(), NodeRefHourAgg.Role.INSTANCE.roleName());
Assert.assertEquals(HashCodeSelector.class.getSimpleName(), NodeRefHourAgg.Role.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
Assert.assertEquals(NodeRefHourAgg.class.getSimpleName(), NodeRefHourAgg.Factory.INSTANCE.role().roleName());
Assert.assertEquals(NodeRefHourAgg.class.getSimpleName(), NodeRefHourAgg.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
int testSize = 10;
WorkerConfig.WorkerNum.NodeRef.NodeRefHourAgg.Value = testSize;
Assert.assertEquals(testSize, NodeRefHourAgg.Factory.INSTANCE.workerNum());
}
@Test
public void testPreStart() throws ProviderNotFoundException {
when(clusterWorkerContext.findProvider(NodeRefHourSave.Role.INSTANCE)).thenReturn(NodeRefHourSave.Factory.INSTANCE);
ArgumentCaptor<NodeRefHourSave.Role> argumentCaptor = ArgumentCaptor.forClass(NodeRefHourSave.Role.class);
agg.preStart();
verify(clusterWorkerContext).findProvider(argumentCaptor.capture());
}
@Test
public void testOnWorkError() throws Exception {
agg.onWork(new Object());
}
@Test
public void testOnWork() throws Exception {
RecordDataAggTools.INSTANCE.testOnWork(agg, recordDataAnswer);
}
}
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class NodeRefHourSaveTestCase {
private NodeRefHourSave save;
@Before
public void init() {
ClusterWorkerContext cluster = new ClusterWorkerContext(null);
LocalWorkerContext local = new LocalWorkerContext();
save = new NodeRefHourSave(NodeRefHourSave.Role.INSTANCE, cluster, local);
}
@Test
public void testEsIndex() {
Assert.assertEquals(NodeRefIndex.Index, save.esIndex());
}
@Test
public void testEsType() {
Assert.assertEquals(NodeRefIndex.Type_Hour, save.esType());
}
@Test
public void testRole() {
Assert.assertEquals(NodeRefHourSave.class.getSimpleName(), NodeRefHourSave.Role.INSTANCE.roleName());
Assert.assertEquals(HashCodeSelector.class.getSimpleName(), NodeRefHourSave.Role.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
Assert.assertEquals(NodeRefHourSave.class.getSimpleName(), NodeRefHourSave.Factory.INSTANCE.role().roleName());
Assert.assertEquals(NodeRefHourSave.class.getSimpleName(), NodeRefHourSave.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
int testSize = 10;
WorkerConfig.Queue.NodeRef.NodeRefHourSave.Size = testSize;
Assert.assertEquals(testSize, NodeRefHourSave.Factory.INSTANCE.queueSize());
}
}
package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.RecordDataAggTools;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({LocalWorkerContext.class})
@PowerMockIgnore({"javax.management.*"})
public class NodeRefMinuteAggTestCase {
private NodeRefMinuteAgg agg;
private RecordDataAnswer recordDataAnswer;
private ClusterWorkerContext clusterWorkerContext;
private LocalWorkerContext localWorkerContext;
@Before
public void init() throws Exception {
clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
localWorkerContext = PowerMockito.mock(LocalWorkerContext.class);
WorkerRefs workerRefs = mock(WorkerRefs.class);
recordDataAnswer = new RecordDataAnswer();
doAnswer(recordDataAnswer).when(workerRefs).tell(Mockito.any(RecordData.class));
when(localWorkerContext.lookup(NodeRefMinuteSave.Role.INSTANCE)).thenReturn(workerRefs);
agg = new NodeRefMinuteAgg(NodeRefMinuteAgg.Role.INSTANCE, clusterWorkerContext, localWorkerContext);
}
@Test
public void testRole() {
Assert.assertEquals(NodeRefMinuteAgg.class.getSimpleName(), NodeRefMinuteAgg.Role.INSTANCE.roleName());
Assert.assertEquals(HashCodeSelector.class.getSimpleName(), NodeRefMinuteAgg.Role.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
Assert.assertEquals(NodeRefMinuteAgg.class.getSimpleName(), NodeRefMinuteAgg.Factory.INSTANCE.role().roleName());
Assert.assertEquals(NodeRefMinuteAgg.class.getSimpleName(), NodeRefMinuteAgg.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
int testSize = 10;
WorkerConfig.WorkerNum.NodeRef.NodeRefMinuteAgg.Value = testSize;
Assert.assertEquals(testSize, NodeRefMinuteAgg.Factory.INSTANCE.workerNum());
}
@Test
public void testPreStart() throws ProviderNotFoundException {
when(clusterWorkerContext.findProvider(NodeRefMinuteSave.Role.INSTANCE)).thenReturn(NodeRefMinuteSave.Factory.INSTANCE);
ArgumentCaptor<NodeRefMinuteSave.Role> argumentCaptor = ArgumentCaptor.forClass(NodeRefMinuteSave.Role.class);
agg.preStart();
verify(clusterWorkerContext).findProvider(argumentCaptor.capture());
}
@Test
public void testOnWorkError() throws Exception {
agg.onWork(new Object());
}
@Test
public void testOnWork() throws Exception {
RecordDataAggTools.INSTANCE.testOnWork(agg, recordDataAnswer);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册