diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractLocalAsyncWorker.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractLocalAsyncWorker.java
index 3136fd6c96be781c320857e8120e627fd987e44e..eb919e57e8703f0a966276d0d7dc8480dc19dd9c 100644
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractLocalAsyncWorker.java
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractLocalAsyncWorker.java
@@ -44,7 +44,7 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
asyncWorker.allocateJob(new EndOfBatchCommand());
}
} catch (Exception e) {
- e.printStackTrace();
+ asyncWorker.saveException(e);
}
}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java
index 9b180c093fd4b56dc5064a2d1a2184ebe608ea0a..e5a5f8a03e759d9d9ff18bf70586313bc7754da7 100644
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java
@@ -33,4 +33,8 @@ public abstract class AbstractWorker {
final public static AbstractWorker noOwner() {
return null;
}
+
+ final protected void saveException(Exception e) {
+// e.printStackTrace();
+ }
}
diff --git a/skywalking-collector/skywalking-collector-worker/pom.xml b/skywalking-collector/skywalking-collector-worker/pom.xml
index 65c0e4d5b72d49554504339578a0251d5a8af33f..b3968863471e23acd2ebe2975cd67a0bc7938d1c 100644
--- a/skywalking-collector/skywalking-collector-worker/pom.xml
+++ b/skywalking-collector/skywalking-collector-worker/pom.xml
@@ -48,5 +48,10 @@
${project.version}
test
+
+ org.jetbrains
+ annotations
+ RELEASE
+
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/AnalysisMember.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/AnalysisMember.java
index ec3155b18cb967e055e77c61bc4e79f5fcd4ff34..ddddbc25bdac178ea8d88731ebfe1a30250f2505 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/AnalysisMember.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/AnalysisMember.java
@@ -2,17 +2,13 @@ package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public abstract class AnalysisMember extends AbstractLocalAsyncWorker {
- private Logger logger = LogManager.getFormatterLogger(AnalysisMember.class);
-
- public AnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ AnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -20,7 +16,7 @@ public abstract class AnalysisMember extends AbstractLocalAsyncWorker {
@Override
public void preStart() throws ProviderNotFoundException {
-
+ super.preStart();
}
@Override
@@ -31,7 +27,7 @@ public abstract class AnalysisMember extends AbstractLocalAsyncWorker {
try {
analyse(message);
} catch (Exception e) {
- e.printStackTrace();
+ saveException(e);
}
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java
index c7958e256c6c9300abdb15254b5e7b7a2f70a266..373603a3386455d9609c719c1e7b992d541a8c46 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java
@@ -22,8 +22,8 @@ public class CollectorBootStartUp {
CollectorSystem collectorSystem = new CollectorSystem();
collectorSystem.boot();
- EsClient.boot();
-// IndexCreator.INSTANCE.create();
+ EsClient.INSTANCE.boot();
+ IndexCreator.INSTANCE.create();
HttpServer.INSTANCE.boot((ClusterWorkerContext) collectorSystem.getClusterContext());
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MergePersistenceMember.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MergePersistenceMember.java
index 2eacb7e5e6aad6508e2a52d4b804a76b9a8ec9ec..c77178470415718fca483bbb7f328b35f3fabb34 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MergePersistenceMember.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MergePersistenceMember.java
@@ -26,18 +26,23 @@ public abstract class MergePersistenceMember extends PersistenceMember {
private Logger logger = LogManager.getFormatterLogger(MergePersistenceMember.class);
- private MergePersistenceData persistenceData = new MergePersistenceData();
+ private MergePersistenceData persistenceData;
protected MergePersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
+ persistenceData = new MergePersistenceData();
+ }
+
+ private MergePersistenceData getPersistenceData() {
+ return persistenceData;
}
@Override
final public void analyse(Object message) throws Exception {
if (message instanceof MergeData) {
MergeData mergeData = (MergeData) message;
- persistenceData.getElseCreate(mergeData.getId()).merge(mergeData);
- if (persistenceData.size() >= WorkerConfig.Persistence.Data.size) {
+ getPersistenceData().getElseCreate(mergeData.getId()).merge(mergeData);
+ if (getPersistenceData().size() >= WorkerConfig.Persistence.Data.size) {
persistence();
}
} else {
@@ -50,21 +55,21 @@ public abstract class MergePersistenceMember extends PersistenceMember {
for (MultiGetItemResponse itemResponse : multiGetResponse) {
GetResponse response = itemResponse.getResponse();
if (response != null && response.isExists()) {
- persistenceData.getElseCreate(response.getId()).merge(response.getSource());
+ getPersistenceData().getElseCreate(response.getId()).merge(response.getSource());
}
}
boolean success = saveToEs();
if (success) {
- persistenceData.clear();
+ getPersistenceData().clear();
}
}
private MultiGetResponse searchFromEs() {
- Client client = EsClient.getClient();
+ Client client = EsClient.INSTANCE.getClient();
MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
- Iterator> iterator = persistenceData.iterator();
+ Iterator> iterator = getPersistenceData().iterator();
while (iterator.hasNext()) {
multiGetRequestBuilder.add(esIndex(), esType(), iterator.next().getKey());
@@ -74,11 +79,11 @@ public abstract class MergePersistenceMember extends PersistenceMember {
}
private boolean saveToEs() {
- Client client = EsClient.getClient();
+ Client client = EsClient.INSTANCE.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
- logger.debug("persistenceData size: %s", persistenceData.size());
+ logger.debug("persistenceData size: %s", getPersistenceData().size());
- Iterator> iterator = persistenceData.iterator();
+ Iterator> iterator = getPersistenceData().iterator();
while (iterator.hasNext()) {
MergeData mergeData = iterator.next().getValue();
bulkRequest.add(client.prepareIndex(esIndex(), esType(), mergeData.getId()).setSource(mergeData.toMap()));
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MetricPersistenceMember.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MetricPersistenceMember.java
index 33a1d0d5f7ef34e651792c13f0caf7dea3926d68..08e4b1130be91eef96a206dd4610ca9ab8400cf8 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MetricPersistenceMember.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MetricPersistenceMember.java
@@ -61,7 +61,7 @@ public abstract class MetricPersistenceMember extends PersistenceMember {
}
private MultiGetResponse searchFromEs() {
- Client client = EsClient.getClient();
+ Client client = EsClient.INSTANCE.getClient();
MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
Iterator> iterator = persistenceData.iterator();
@@ -75,7 +75,7 @@ public abstract class MetricPersistenceMember extends PersistenceMember {
}
private boolean saveToEs() {
- Client client = EsClient.getClient();
+ Client client = EsClient.INSTANCE.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", persistenceData.size());
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/RecordPersistenceMember.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/RecordPersistenceMember.java
index 1f9828c8b98738f5762ad512d0ce1c152f55450a..fdd75b6b3e6a80c984206788c84a8a9708ad8615 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/RecordPersistenceMember.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/RecordPersistenceMember.java
@@ -55,7 +55,7 @@ public abstract class RecordPersistenceMember extends PersistenceMember {
}
private boolean saveToEs() {
- Client client = EsClient.getClient();
+ Client client = EsClient.INSTANCE.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", getPersistenceData().size());
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java
index fcb5e902b3c57cbb507bb1b6aadb894c43eb5575..1e9e69e6e3cc573b775eb59c7f5ceb4fb9524579 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java
@@ -19,85 +19,85 @@ public class WorkerConfig extends ClusterConfig {
}
}
- public static class Worker {
- public static class TraceSegmentReceiver {
- public static int Num = 10;
- }
-
- public static class DAGNodeReceiver {
- public static int Num = 10;
- }
+ public static class WorkerNum {
+ public static class Node {
+ public static class NodeCompAgg {
+ public static int Value = 10;
+ }
- public static class NodeInstanceReceiver {
- public static int Num = 10;
- }
+ public static class NodeMappingDayAgg {
+ public static int Value = 10;
+ }
- public static class ResponseCostReceiver {
- public static int Num = 10;
- }
+ public static class NodeMappingHourAgg {
+ public static int Value = 10;
+ }
- public static class ResponseSummaryReceiver {
- public static int Num = 10;
+ public static class NodeMappingMinuteAgg {
+ public static int Value = 10;
+ }
}
- public static class DAGNodeRefReceiver {
- public static int Num = 10;
- }
- }
+ public static class NodeRef {
+ public static class NodeRefDayAgg {
+ public static int Value = 10;
+ }
- public static class WorkerNum {
- public static class Node {
- public static class NodeDayAgg {
+ public static class NodeRefHourAgg {
public static int Value = 10;
}
- public static class NodeHourAgg {
+ public static class NodeRefMinuteAgg {
public static int Value = 10;
}
- public static class NodeMinuteAgg {
+ public static class NodeRefResSumDayAgg {
public static int Value = 10;
}
- public static class NodeMappingDayAgg {
+ public static class NodeRefResSumHourAgg {
public static int Value = 10;
}
- public static class NodeMappingHourAgg {
+ public static class NodeRefResSumMinuteAgg {
public static int Value = 10;
}
+ }
- public static class NodeMappingMinuteAgg {
+ public static class GlobalTrace {
+ public static class GlobalTraceAgg {
public static int Value = 10;
}
}
}
public static class Queue {
- public static class Segment {
- public static class SegmentCostSave {
+ public static class GlobalTrace {
+ public static class GlobalTraceSave {
public static int Size = 1024;
}
+ }
- public static class SegmentSave {
+ public static class Segment {
+ public static class SegmentPost {
public static int Size = 1024;
}
- public static class SegmentExceptionSave {
+ public static class SegmentCostSave {
public static int Size = 1024;
}
- }
- public static class Node {
- public static class NodeDayAnalysis {
+ public static class SegmentSave {
public static int Size = 1024;
}
- public static class NodeHourAnalysis {
+ public static class SegmentExceptionSave {
public static int Size = 1024;
}
+ }
- public static class NodeMinuteAnalysis {
+ public static class Node {
+ public static class NodeCompAnalysis {
public static int Size = 1024;
}
@@ -124,54 +124,52 @@ public class WorkerConfig extends ClusterConfig {
public static class NodeMappingMinuteAnalysis {
public static int Size = 1024;
}
- }
-
- public static class Persistence {
- public static class DAGNodePersistence {
+ public static class NodeCompSave {
public static int Size = 1024;
}
- public static class NodeInstancePersistence {
+ public static class NodeMappingDaySave {
public static int Size = 1024;
}
- public static class ResponseCostPersistence {
+ public static class NodeMappingHourSave {
public static int Size = 1024;
}
- public static class ResponseSummaryPersistence {
+ public static class NodeMappingMinuteSave {
public static int Size = 1024;
}
+ }
- public static class DAGNodeRefPersistence {
+ public static class NodeRef {
+ public static class NodeRefDaySave {
public static int Size = 1024;
}
- }
+ public static class NodeRefHourSave {
+ public static int Size = 1024;
+ }
- public static class TraceSegmentRecordAnalysis {
- public static int Size = 1024;
- }
+ public static class NodeRefMinuteSave {
+ public static int Size = 1024;
+ }
- public static class NodeInstanceAnalysis {
- public static int Size = 1024;
- }
+ public static class NodeRefResSumDaySave {
+ public static int Size = 1024;
+ }
- public static class DAGNodeAnalysis {
- public static int Size = 1024;
- }
+ public static class NodeRefResSumHourSave {
+ public static int Size = 1024;
+ }
- public static class ResponseCostAnalysis {
- public static int Size = 1024;
+ public static class NodeRefResSumMinuteSave {
+ public static int Size = 1024;
+ }
}
public static class ResponseSummaryAnalysis {
public static int Size = 1024;
}
-
- public static class DAGNodeRefAnalysis {
- public static int Size = 1024;
- }
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/analysis/GlobalTraceAnalysis.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/analysis/GlobalTraceAnalysis.java
index 89bb239dc70362e0c95fcbe632e036d1fe8d1101..0c54c4b08da440c86bcc76fcc0831e5b55c0d747 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/analysis/GlobalTraceAnalysis.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/analysis/GlobalTraceAnalysis.java
@@ -21,7 +21,7 @@ import java.util.List;
*/
public class GlobalTraceAnalysis extends MergeAnalysisMember {
- private GlobalTraceAnalysis(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ GlobalTraceAnalysis(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/entity/TreeNode.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/entity/TreeNode.java
deleted file mode 100644
index 7f7f22cbc2035fb0633215547bfade4375338df2..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/entity/TreeNode.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.a.eye.skywalking.collector.worker.globaltrace.entity;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author pengys5
- */
-public class TreeNode {
-
- private String spanId;
-
- private List childNodes;
-
- public TreeNode(String spanId) {
- this.spanId = spanId;
- childNodes = new ArrayList<>();
- }
-
- public void addChild(TreeNode childNode) {
- childNodes.add(childNode);
- }
-}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceAgg.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceAgg.java
index 0a6cf9079fbf0619d64de4291ca06e3e85e4f56d..5e246aa7ad507f85a4aa89afe560e9edd6518f6d 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceAgg.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceAgg.java
@@ -15,7 +15,7 @@ public class GlobalTraceAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(GlobalTraceAgg.class);
- private GlobalTraceAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ GlobalTraceAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -48,7 +48,7 @@ public class GlobalTraceAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
- return WorkerConfig.Worker.DAGNodeReceiver.Num;
+ return WorkerConfig.WorkerNum.GlobalTrace.GlobalTraceAgg.Value;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSave.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSave.java
index 6954e9b4dc7ba7d7713589446e331bcf4713d5c0..3c1f121993f70918b0f5dd2b7281830f59a2faec 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSave.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSave.java
@@ -4,7 +4,7 @@ package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
-import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
+import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MergePersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
@@ -15,7 +15,7 @@ import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
*/
public class GlobalTraceSave extends MergePersistenceMember {
- private GlobalTraceSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ GlobalTraceSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -39,7 +39,7 @@ public class GlobalTraceSave extends MergePersistenceMember {
@Override
public int queueSize() {
- return WorkerConfig.Queue.TraceSegmentRecordAnalysis.Size;
+ return WorkerConfig.Queue.GlobalTrace.GlobalTraceSave.Size;
}
@Override
@@ -58,7 +58,7 @@ public class GlobalTraceSave extends MergePersistenceMember {
@Override
public WorkerSelector workerSelector() {
- return new RollingSelector();
+ return new HashCodeSelector();
}
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSearchWithGlobalId.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSearchWithGlobalId.java
index 525482393a50e71596b4b21879678c3029db6dc8..4e5915162432baab974c6a33f52a0815f6cfc79c 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSearchWithGlobalId.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSearchWithGlobalId.java
@@ -9,7 +9,7 @@ import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.segment.logic.Segment;
import com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize;
import com.a.eye.skywalking.collector.worker.segment.logic.SpanView;
-import com.a.eye.skywalking.collector.worker.storage.EsClient;
+import com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.trace.Span;
@@ -18,7 +18,6 @@ import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.elasticsearch.client.Client;
import java.util.ArrayList;
import java.util.Collections;
@@ -33,17 +32,17 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
private Gson gson = new Gson();
- public GlobalTraceSearchWithGlobalId(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ GlobalTraceSearchWithGlobalId(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
protected void onWork(Object request, Object response) throws Exception {
if (request instanceof String) {
- Client client = EsClient.getClient();
String globalId = (String) request;
- String globalTraceData = client.prepareGet(GlobalTraceIndex.Index, GlobalTraceIndex.Type_Record, globalId).get().getSourceAsString();
+ String globalTraceData = GetResponseFromEs.INSTANCE.get(GlobalTraceIndex.Index, GlobalTraceIndex.Type_Record, globalId).getSourceAsString();
JsonObject globalTraceObj = gson.fromJson(globalTraceData, JsonObject.class);
+ logger.debug("globalTraceObj: %s", globalTraceObj);
String subSegIdsStr = globalTraceObj.get(GlobalTraceIndex.SubSegIds).getAsString();
String[] subSegIds = subSegIdsStr.split(MergeData.Split);
@@ -51,7 +50,8 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
List spanViewList = new ArrayList<>();
for (String subSegId : subSegIds) {
logger.debug("subSegId: %s", subSegId);
- String segmentSource = client.prepareGet(SegmentIndex.Index, SegmentIndex.Type_Record, subSegId).get().getSourceAsString();
+ String segmentSource = GetResponseFromEs.INSTANCE.get(SegmentIndex.Index, SegmentIndex.Type_Record, subSegId).getSourceAsString();
+ logger.debug("segmentSource: %s", segmentSource);
Segment segment = SegmentDeserialize.INSTANCE.deserializeFromES(segmentSource);
String segmentId = segment.getTraceSegmentId();
List refsList = segment.getRefs();
@@ -62,19 +62,23 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
}
}
- SpanView rootSpan = findRoot(spanViewList);
- findChild(rootSpan, spanViewList, rootSpan.getStartTime());
-
- List viewList = new ArrayList<>();
- viewList.add(rootSpan);
-
- Gson gson = new Gson();
- String globalTraceStr = gson.toJson(viewList);
JsonObject responseObj = (JsonObject) response;
- responseObj.addProperty("result", globalTraceStr);
+ responseObj.addProperty("result", buildTree(spanViewList));
}
}
+ private String buildTree(List spanViewList) {
+ SpanView rootSpan = findRoot(spanViewList);
+ assert rootSpan != null;
+ findChild(rootSpan, spanViewList, rootSpan.getStartTime());
+
+ List viewList = new ArrayList<>();
+ viewList.add(rootSpan);
+
+ Gson gson = new Gson();
+ return gson.toJson(viewList);
+ }
+
private SpanView findRoot(List spanViewList) {
for (SpanView spanView : spanViewList) {
if (StringUtil.isEmpty(spanView.getParentSpanSegId())) {
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/httpserver/HttpServer.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/httpserver/HttpServer.java
index 481ac94ee1f69a9291eac3ed99654cd43e9e3d06..144a9ab69767ff67e8871d8b7e3ffb1f192abb4c 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/httpserver/HttpServer.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/httpserver/HttpServer.java
@@ -25,11 +25,6 @@ public enum HttpServer {
ServletsCreator.INSTANCE.boot(servletContextHandler, clusterContext);
-// ServerConnector serverConnector = new ServerConnector(server);
-// serverConnector.setHost("127.0.0.1");
-// serverConnector.setPort(7001);
-// serverConnector.setIdleTimeout(5000);
-
server.setHandler(servletContextHandler);
server.start();
server.join();
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/AbstractNodeCompAnalysis.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/AbstractNodeCompAnalysis.java
index ea783c1ebe6a83a88cb7c22f9ccd021e2b9f1079..c85d2245f36a88e916c5f0e6d3849fc353ac3791 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/AbstractNodeCompAnalysis.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/AbstractNodeCompAnalysis.java
@@ -36,7 +36,7 @@ abstract class AbstractNodeCompAnalysis extends RecordAnalysisMember {
for (Span span : spanList) {
String kind = Tags.SPAN_KIND.get(span);
if (Tags.SPAN_KIND_CLIENT.equals(kind) && ClientSpanIsLeafTools.isLeaf(span.getSpanId(), spanList)) {
- String peers = SpanPeersTools.getPeers(span);
+ String peers = SpanPeersTools.INSTANCE.getPeers(span);
JsonObject compJsonObj = new JsonObject();
compJsonObj.addProperty(NodeCompIndex.Peers, peers);
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeCompAnalysis.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeCompAnalysis.java
index e7827f69d92239e243b8c43fbc9a3ad010764f25..2cc87013857bced83fb7ccb53fa436be26687285 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeCompAnalysis.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeCompAnalysis.java
@@ -50,7 +50,7 @@ public class NodeCompAnalysis extends AbstractNodeCompAnalysis {
@Override
public int queueSize() {
- return WorkerConfig.Queue.Node.NodeDayAnalysis.Size;
+ return WorkerConfig.Queue.Node.NodeCompAnalysis.Size;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeCompAgg.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeCompAgg.java
index fd2fd46a051e3ac41d32355ed86da03e5984fcd5..2149a293121dd8c40e6e8746bfafa4242d644a90 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeCompAgg.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeCompAgg.java
@@ -11,7 +11,7 @@ import com.a.eye.skywalking.collector.worker.storage.RecordData;
*/
public class NodeCompAgg extends AbstractClusterWorker {
- public NodeCompAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeCompAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -24,9 +24,7 @@ public class NodeCompAgg extends AbstractClusterWorker {
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeCompSave.Role.INSTANCE).tell(message);
- } else {
- throw new IllegalArgumentException("message instance must RecordData");
- }
+ } else throw new IllegalArgumentException("message instance must RecordData");
}
public static class Factory extends AbstractClusterWorkerProvider {
@@ -44,7 +42,7 @@ public class NodeCompAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
- return WorkerConfig.WorkerNum.Node.NodeDayAgg.Value;
+ return WorkerConfig.WorkerNum.Node.NodeCompAgg.Value;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeCompLoad.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeCompLoad.java
index 098cfe6b50bde4e6e2ec8d492d7f4070068cfd81..d4e9c3d29d10c809ada883fdb876180e2c3067df 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeCompLoad.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeCompLoad.java
@@ -27,7 +27,7 @@ public class NodeCompLoad extends AbstractLocalSyncWorker {
@Override
public void onWork(Object request, Object response) throws Exception {
- SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeCompIndex.Index);
+ SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeCompIndex.Index);
searchRequestBuilder.setTypes(NodeCompIndex.Type_Record);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setSize(100);
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeCompSave.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeCompSave.java
index c89f6eb3b12aa7824b05cf8f5427aad4e6d59905..bcae0e72925f2607e3451b20324ad0daa6f98d83 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeCompSave.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeCompSave.java
@@ -43,7 +43,7 @@ public class NodeCompSave extends RecordPersistenceMember {
@Override
public int queueSize() {
- return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
+ return WorkerConfig.Queue.Node.NodeCompSave.Size;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingDayAgg.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingDayAgg.java
index 12e36e053c32cc8b9da52ba42b95c401a9d0dc1b..25d6869b3906ccbbed93ea439bd8d8e56b692a38 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingDayAgg.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingDayAgg.java
@@ -11,7 +11,7 @@ import com.a.eye.skywalking.collector.worker.storage.RecordData;
*/
public class NodeMappingDayAgg extends AbstractClusterWorker {
- public NodeMappingDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeMappingDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingDaySave.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingDaySave.java
index c54393183d93f1e1c54029157fee2cb64409cc84..b883a7f06039c4ae6b500e2a196adbe875e7e019 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingDaySave.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingDaySave.java
@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
*/
public class NodeMappingDaySave extends RecordPersistenceMember {
- public NodeMappingDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeMappingDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -43,7 +43,7 @@ public class NodeMappingDaySave extends RecordPersistenceMember {
@Override
public int queueSize() {
- return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
+ return WorkerConfig.Queue.Node.NodeMappingDaySave.Size;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingHourSave.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingHourSave.java
index 2ca6a656381c3d53cddf0d96a5274e1aedce2c5c..58d776ffbfd0963d0981fa36524a3a47c21e0315 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingHourSave.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingHourSave.java
@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
*/
public class NodeMappingHourSave extends RecordPersistenceMember {
- public NodeMappingHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeMappingHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -43,7 +43,7 @@ public class NodeMappingHourSave extends RecordPersistenceMember {
@Override
public int queueSize() {
- return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
+ return WorkerConfig.Queue.Node.NodeMappingHourSave.Size;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingMinuteSave.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingMinuteSave.java
index e8bde58246d1cd302af4d00403baf96394c6efe6..29b398b80575326f7d5524f610732613eb1d665e 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingMinuteSave.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingMinuteSave.java
@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
*/
public class NodeMappingMinuteSave extends RecordPersistenceMember {
- public NodeMappingMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeMappingMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -43,7 +43,7 @@ public class NodeMappingMinuteSave extends RecordPersistenceMember {
@Override
public int queueSize() {
- return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
+ return WorkerConfig.Queue.Node.NodeMappingMinuteSave.Size;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingSearchWithTimeSlice.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingSearchWithTimeSlice.java
index 292254153683d761ce6cc8453a44a374f371d79b..6779924b5600cec90250169e3e70a05986cc95c2 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingSearchWithTimeSlice.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/persistence/NodeMappingSearchWithTimeSlice.java
@@ -34,7 +34,7 @@ public class NodeMappingSearchWithTimeSlice extends AbstractLocalSyncWorker {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
- SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeMappingIndex.Index);
+ SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeMappingIndex.Index);
searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeMappingIndex.Time_Slice).gte(search.getStartTime()).lte(search.getEndTime()));
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/NodeRefGetWithTimeSlice.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/NodeRefGetWithTimeSlice.java
deleted file mode 100644
index d02326b7a453b4d206939441df99bc94947d5da7..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/NodeRefGetWithTimeSlice.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package com.a.eye.skywalking.collector.worker.noderef;
-
-import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
-import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
-import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
-import com.a.eye.skywalking.collector.actor.Role;
-import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
-import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
-import com.a.eye.skywalking.collector.worker.httpserver.AbstractGet;
-import com.a.eye.skywalking.collector.worker.httpserver.AbstractGetProvider;
-import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefSearchWithTimeSlice;
-import com.a.eye.skywalking.collector.worker.tools.ParameterTools;
-import com.google.gson.JsonObject;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Arrays;
-import java.util.Map;
-
-/**
- * @author pengys5
- */
-public class NodeRefGetWithTimeSlice extends AbstractGet {
-
- private Logger logger = LogManager.getFormatterLogger(NodeRefGetWithTimeSlice.class);
-
- private NodeRefGetWithTimeSlice(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
- super(role, clusterContext, selfContext);
- }
-
- @Override
- public void preStart() throws ProviderNotFoundException {
- getClusterContext().findProvider(NodeRefSearchWithTimeSlice.WorkerRole.INSTANCE).create(this);
- }
-
- @Override
- protected void onSearch(Map request, JsonObject response) throws Exception {
- if (!request.containsKey("startTime") || !request.containsKey("endTime") || !request.containsKey("timeSliceType")) {
- throw new IllegalArgumentException("the request parameter must contains startTime,endTime,timeSliceType");
- }
- logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(request.get("startTime")),
- Arrays.toString(request.get("endTime")), Arrays.toString(request.get("timeSliceType")));
-
- long startTime;
- try {
- startTime = Long.valueOf(ParameterTools.INSTANCE.toString(request, "startTime"));
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("the request parameter startTime must numeric with long type");
- }
-
- long endTime;
- try {
- endTime = Long.valueOf(ParameterTools.INSTANCE.toString(request, "endTime"));
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("the request parameter endTime must numeric with long type");
- }
-
- NodeRefSearchWithTimeSlice.RequestEntity requestEntity;
- requestEntity = new NodeRefSearchWithTimeSlice.RequestEntity(ParameterTools.INSTANCE.toString(request, "timeSliceType"), startTime, endTime);
- getSelfContext().lookup(NodeRefSearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response);
- }
-
- public static class Factory extends AbstractGetProvider {
-
- @Override
- public Role role() {
- return WorkerRole.INSTANCE;
- }
-
- @Override
- public NodeRefGetWithTimeSlice workerInstance(ClusterWorkerContext clusterContext) {
- return new NodeRefGetWithTimeSlice(role(), clusterContext, new LocalWorkerContext());
- }
-
- @Override
- public String servletPath() {
- return "/nodeRef/timeSlice";
- }
- }
-
- public enum WorkerRole implements Role {
- INSTANCE;
-
- @Override
- public String roleName() {
- return NodeRefGetWithTimeSlice.class.getSimpleName();
- }
-
- @Override
- public WorkerSelector workerSelector() {
- return new RollingSelector();
- }
- }
-}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/NodeRefResSumGetWithTimeSlice.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/NodeRefResSumGetWithTimeSlice.java
deleted file mode 100644
index 758ddda5191767d371d0d0b4a625d695a50ce1d7..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/NodeRefResSumGetWithTimeSlice.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package com.a.eye.skywalking.collector.worker.noderef;
-
-import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
-import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
-import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
-import com.a.eye.skywalking.collector.actor.Role;
-import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
-import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
-import com.a.eye.skywalking.collector.worker.httpserver.AbstractGet;
-import com.a.eye.skywalking.collector.worker.httpserver.AbstractGetProvider;
-import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumSearchWithTimeSlice;
-import com.a.eye.skywalking.collector.worker.tools.ParameterTools;
-import com.google.gson.JsonObject;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Arrays;
-import java.util.Map;
-
-/**
- * @author pengys5
- */
-public class NodeRefResSumGetWithTimeSlice extends AbstractGet {
-
- private Logger logger = LogManager.getFormatterLogger(NodeRefResSumGetWithTimeSlice.class);
-
- private NodeRefResSumGetWithTimeSlice(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
- super(role, clusterContext, selfContext);
- }
-
- @Override
- public void preStart() throws ProviderNotFoundException {
- getClusterContext().findProvider(NodeRefResSumSearchWithTimeSlice.WorkerRole.INSTANCE).create(this);
- }
-
- @Override
- protected void onSearch(Map request, JsonObject response) throws Exception {
- if (!request.containsKey("startTime") || !request.containsKey("endTime") || !request.containsKey("timeSliceType")) {
- throw new IllegalArgumentException("the request parameter must contains startTime,endTime,timeSliceType");
- }
- logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(request.get("startTime")),
- Arrays.toString(request.get("endTime")), Arrays.toString(request.get("timeSliceType")));
-
- long startTime;
- try {
- startTime = Long.valueOf(ParameterTools.INSTANCE.toString(request, "startTime"));
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("the request parameter startTime must numeric with long type");
- }
-
- long endTime;
- try {
- endTime = Long.valueOf(ParameterTools.INSTANCE.toString(request, "endTime"));
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("the request parameter endTime must numeric with long type");
- }
-
- NodeRefResSumSearchWithTimeSlice.RequestEntity requestEntity;
- requestEntity = new NodeRefResSumSearchWithTimeSlice.RequestEntity(ParameterTools.INSTANCE.toString(request, "timeSliceType"), startTime, endTime);
- getSelfContext().lookup(NodeRefResSumSearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response);
- }
-
- public static class Factory extends AbstractGetProvider {
-
- @Override
- public Role role() {
- return WorkerRole.INSTANCE;
- }
-
- @Override
- public NodeRefResSumGetWithTimeSlice workerInstance(ClusterWorkerContext clusterContext) {
- return new NodeRefResSumGetWithTimeSlice(role(), clusterContext, new LocalWorkerContext());
- }
-
- @Override
- public String servletPath() {
- return "/nodeRef/resSum/timeSlice";
- }
- }
-
- public enum WorkerRole implements Role {
- INSTANCE;
-
- @Override
- public String roleName() {
- return NodeRefResSumGetWithTimeSlice.class.getSimpleName();
- }
-
- @Override
- public WorkerSelector workerSelector() {
- return new RollingSelector();
- }
- }
-}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/AbstractNodeRefAnalysis.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/AbstractNodeRefAnalysis.java
index 3046c500633a42b514e869b25f3e02d74e745ff6..717070bf19f77eba8b1db07d541fefadcfd6366c 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/AbstractNodeRefAnalysis.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/AbstractNodeRefAnalysis.java
@@ -41,7 +41,7 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
String front = segment.getApplicationCode();
dataJsonObj.addProperty(NodeRefIndex.Front, front);
- String behind = SpanPeersTools.getPeers(span);
+ String behind = SpanPeersTools.INSTANCE.getPeers(span);
dataJsonObj.addProperty(NodeRefIndex.Behind, behind);
dataJsonObj.addProperty(NodeRefIndex.BehindIsRealCode, false);
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefDayAgg.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefDayAgg.java
index ce44bfcec2cf2bb03146340b6f0c8278d6b95416..e045565ace522a0a312c533a913d8ce0fd89a941 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefDayAgg.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefDayAgg.java
@@ -15,7 +15,7 @@ public class NodeRefDayAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefDayAgg.class);
- public NodeRefDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeRefDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -48,7 +48,7 @@ public class NodeRefDayAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
- return WorkerConfig.Worker.DAGNodeRefReceiver.Num;
+ return WorkerConfig.WorkerNum.NodeRef.NodeRefDayAgg.Value;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefDaySave.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefDaySave.java
index 16a595d9df29f8827df63af367bcaf5b99b6f788..946eda3a1e284a4f000e17b19f866332c86bf071 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefDaySave.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefDaySave.java
@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
*/
public class NodeRefDaySave extends RecordPersistenceMember {
- public NodeRefDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeRefDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -44,7 +44,7 @@ public class NodeRefDaySave extends RecordPersistenceMember {
@Override
public int queueSize() {
- return WorkerConfig.Queue.Persistence.DAGNodeRefPersistence.Size;
+ return WorkerConfig.Queue.NodeRef.NodeRefDaySave.Size;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefHourAgg.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefHourAgg.java
index 14c6890f7f1270a43b8b6845030de69dcc6e8593..78974e82b98b12401d1e2aa2cb211ed9ef789419 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefHourAgg.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefHourAgg.java
@@ -15,7 +15,7 @@ public class NodeRefHourAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefHourAgg.class);
- public NodeRefHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeRefHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -48,7 +48,7 @@ public class NodeRefHourAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
- return WorkerConfig.Worker.DAGNodeRefReceiver.Num;
+ return WorkerConfig.WorkerNum.NodeRef.NodeRefHourAgg.Value;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefHourSave.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefHourSave.java
index 814ef87f42cb438d5eedca99f9ca8de260d79442..8ec5b14760d5c3c7fc481c95ffddb8ddf3ca4c09 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefHourSave.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefHourSave.java
@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
*/
public class NodeRefHourSave extends RecordPersistenceMember {
- public NodeRefHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeRefHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -44,7 +44,7 @@ public class NodeRefHourSave extends RecordPersistenceMember {
@Override
public int queueSize() {
- return WorkerConfig.Queue.Persistence.DAGNodeRefPersistence.Size;
+ return WorkerConfig.Queue.NodeRef.NodeRefHourSave.Size;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefMinuteAgg.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefMinuteAgg.java
index 5d1b4f124b08d9d4cb2e43f90c45c5771ff98782..b961e2ce7e85c7acb3a18a50f0998053b148419b 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefMinuteAgg.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefMinuteAgg.java
@@ -15,7 +15,7 @@ public class NodeRefMinuteAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefMinuteAgg.class);
- public NodeRefMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeRefMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -48,7 +48,7 @@ public class NodeRefMinuteAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
- return WorkerConfig.Worker.DAGNodeRefReceiver.Num;
+ return WorkerConfig.WorkerNum.NodeRef.NodeRefMinuteAgg.Value;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefMinuteSave.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefMinuteSave.java
index b872da5ea17f37bf2c0c32f23a0eddac32d68690..d77cd3c37e441c781c11994d5d28df6c294b04cd 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefMinuteSave.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefMinuteSave.java
@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
*/
public class NodeRefMinuteSave extends RecordPersistenceMember {
- public NodeRefMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeRefMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -44,7 +44,7 @@ public class NodeRefMinuteSave extends RecordPersistenceMember {
@Override
public int queueSize() {
- return WorkerConfig.Queue.Persistence.DAGNodeRefPersistence.Size;
+ return WorkerConfig.Queue.NodeRef.NodeRefMinuteSave.Size;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumDayAgg.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumDayAgg.java
index a03ea1caf4ae25f0f627cab89a351acf80aa53b0..61fa1ad5c66a8f6e1b75bc9b12bb76b7f58a5169 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumDayAgg.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumDayAgg.java
@@ -15,7 +15,7 @@ public class NodeRefResSumDayAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumDayAgg.class);
- private NodeRefResSumDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeRefResSumDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -48,7 +48,7 @@ public class NodeRefResSumDayAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
- return WorkerConfig.Worker.ResponseSummaryReceiver.Num;
+ return WorkerConfig.WorkerNum.NodeRef.NodeRefResSumDayAgg.Value;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumDaySave.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumDaySave.java
index 566c51e336a554e63c6ee80138945080cf3f7931..b2afec7e7fc6e418426fc326853d8dc6ea05d713 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumDaySave.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumDaySave.java
@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
*/
public class NodeRefResSumDaySave extends MetricPersistenceMember {
- private NodeRefResSumDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeRefResSumDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -43,7 +43,7 @@ public class NodeRefResSumDaySave extends MetricPersistenceMember {
@Override
public int queueSize() {
- return WorkerConfig.Queue.Persistence.ResponseSummaryPersistence.Size;
+ return WorkerConfig.Queue.NodeRef.NodeRefResSumDaySave.Size;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumGroupWithTimeSlice.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumGroupWithTimeSlice.java
index 0a14de90afa6fa515d8ec08ddb12439c90844f92..2e3a61ee9572cf812141e7d63f08c09a7fcab97e 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumGroupWithTimeSlice.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumGroupWithTimeSlice.java
@@ -35,7 +35,7 @@ public class NodeRefResSumGroupWithTimeSlice extends AbstractLocalSyncWorker {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
- SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeRefResSumIndex.Index);
+ SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeRefResSumIndex.Index);
searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeRefResSumIndex.Time_Slice).gte(search.getStartTime()).lte(search.getEndTime()));
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumHourAgg.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumHourAgg.java
index 5e4ef0c3e9c01fdc2775011bd6b1563f75cbd8d6..39435bbd5ecf4f032b5eec062d52e452ecc10e12 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumHourAgg.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumHourAgg.java
@@ -15,7 +15,7 @@ public class NodeRefResSumHourAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumHourAgg.class);
- private NodeRefResSumHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeRefResSumHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -48,7 +48,7 @@ public class NodeRefResSumHourAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
- return WorkerConfig.Worker.ResponseSummaryReceiver.Num;
+ return WorkerConfig.WorkerNum.NodeRef.NodeRefResSumHourAgg.Value;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumHourSave.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumHourSave.java
index 0009ce518dcf127ca29c8dec2f9e6f99d2f80fe2..f4da5b5076ab3ca89243787eb0e4898d799de5c4 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumHourSave.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumHourSave.java
@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
*/
public class NodeRefResSumHourSave extends MetricPersistenceMember {
- private NodeRefResSumHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeRefResSumHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -43,7 +43,7 @@ public class NodeRefResSumHourSave extends MetricPersistenceMember {
@Override
public int queueSize() {
- return WorkerConfig.Queue.Persistence.ResponseSummaryPersistence.Size;
+ return WorkerConfig.Queue.NodeRef.NodeRefResSumHourSave.Size;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumMinuteAgg.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumMinuteAgg.java
index 1981f643ae65c3ebd33d059d79e87c1557230873..7afb59d8b6595e9a074c7da46f12c40cc621fdaf 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumMinuteAgg.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumMinuteAgg.java
@@ -15,7 +15,7 @@ public class NodeRefResSumMinuteAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumMinuteAgg.class);
- private NodeRefResSumMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeRefResSumMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -48,7 +48,7 @@ public class NodeRefResSumMinuteAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
- return WorkerConfig.Worker.ResponseSummaryReceiver.Num;
+ return WorkerConfig.WorkerNum.NodeRef.NodeRefResSumMinuteAgg.Value;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumMinuteSave.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumMinuteSave.java
index 3ffb4a0ffc133f05d90ca6c82fa22a3d1be689b6..d2ae9fae79a0d302bdf453b2e814bb335504a8cb 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumMinuteSave.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumMinuteSave.java
@@ -14,7 +14,7 @@ import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
*/
public class NodeRefResSumMinuteSave extends MetricPersistenceMember {
- private NodeRefResSumMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ NodeRefResSumMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -43,7 +43,7 @@ public class NodeRefResSumMinuteSave extends MetricPersistenceMember {
@Override
public int queueSize() {
- return WorkerConfig.Queue.Persistence.ResponseSummaryPersistence.Size;
+ return WorkerConfig.Queue.NodeRef.NodeRefResSumMinuteSave.Size;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumSearchWithTimeSlice.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumSearchWithTimeSlice.java
index a3a41f234fb20cf1640f4c5ba95174af00327dc9..8580abb441b18f8f7cc75ed913b3d6b699a88580 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumSearchWithTimeSlice.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefResSumSearchWithTimeSlice.java
@@ -36,7 +36,7 @@ public class NodeRefResSumSearchWithTimeSlice extends AbstractLocalSyncWorker {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
- SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeRefResSumIndex.Index);
+ SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeRefResSumIndex.Index);
searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeRefResSumIndex.Time_Slice).gte(search.getStartTime()).lte(search.getEndTime()));
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefSearchWithTimeSlice.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefSearchWithTimeSlice.java
index eeebdd49b41a74c47092d29fac5bdf5790c88014..5f24dd0fe99eb6630dff1f6956c5ca4ba808eb47 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefSearchWithTimeSlice.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/persistence/NodeRefSearchWithTimeSlice.java
@@ -34,7 +34,7 @@ public class NodeRefSearchWithTimeSlice extends AbstractLocalSyncWorker {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
- SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeRefIndex.Index);
+ SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeRefIndex.Index);
searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeRefIndex.Time_Slice).gte(search.getStartTime()).lte(search.getEndTime()));
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentCostIndex.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentCostIndex.java
index dd2a994d8838456b9830c03318382645b0acb3f5..8da59104f001027d6815d8f1f5560adeb2aac5fc 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentCostIndex.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentCostIndex.java
@@ -31,7 +31,7 @@ public class SegmentCostIndex extends AbstractIndex {
@Override
public XContentBuilder createMappingBuilder() throws IOException {
- XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
+ return XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject(SegId)
@@ -50,12 +50,11 @@ public class SegmentCostIndex extends AbstractIndex {
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
- .startObject(Cost)
+ .startObject(Cost)
.field("type", "long")
- .field("index", "not_analyzed")
+ .field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
- return mappingBuilder;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentIndex.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentIndex.java
index 741ac3e0d03054edbd04e2a6e6225c2ee58be63b..32def9cd84329f4bcac69f89f8ded8ab22f81472 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentIndex.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentIndex.java
@@ -1,8 +1,6 @@
package com.a.eye.skywalking.collector.worker.segment;
import com.a.eye.skywalking.collector.worker.storage.AbstractIndex;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -13,8 +11,6 @@ import java.io.IOException;
*/
public class SegmentIndex extends AbstractIndex {
- private Logger logger = LogManager.getFormatterLogger(SegmentIndex.class);
-
public static final String Index = "segment_idx";
@Override
@@ -29,7 +25,7 @@ public class SegmentIndex extends AbstractIndex {
@Override
public XContentBuilder createMappingBuilder() throws IOException {
- XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
+ return XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject("traceSegmentId")
@@ -60,54 +56,7 @@ public class SegmentIndex extends AbstractIndex {
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
- .startArray("refs")
- .startObject("traceSegmentId")
- .field("type", "String")
- .field("index", "not_analyzed")
- .endObject()
- .startObject("spanId")
- .field("type", "integer")
- .field("index", "not_analyzed")
- .endObject()
- .startObject("applicationCode")
- .field("type", "String")
- .field("index", "not_analyzed")
- .endObject()
- .startObject("peerHost")
- .field("type", "String")
- .field("index", "not_analyzed")
- .endObject()
- .endArray()
- .startArray("refs")
- .startObject("spanId")
- .field("type", "integer")
- .field("index", "not_analyzed")
- .endObject()
- .startObject("parentSpanId")
- .field("type", "integer")
- .field("index", "not_analyzed")
- .endObject()
- .startObject("startTime")
- .field("type", "date")
- .field("index", "not_analyzed")
- .endObject()
- .startObject("endTime")
- .field("type", "date")
- .field("index", "not_analyzed")
- .endObject()
- .startObject("operationName")
- .field("type", "String")
- .field("index", "not_analyzed")
- .endObject()
- .endArray()
- .startArray("relatedGlobalTraces")
- .startObject("id")
- .field("type", "String")
- .field("index", "not_analyzed")
- .endObject()
- .endArray()
.endObject()
.endObject();
- return mappingBuilder;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentPost.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentPost.java
index da1317929e86a29bbff1df24192da88cea93d293..af96de4b2f4c3bae97c4c37c43a9eb0048141c35 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentPost.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentPost.java
@@ -7,6 +7,7 @@ import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
+import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.globaltrace.analysis.GlobalTraceAnalysis;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractPost;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractPostProvider;
@@ -134,7 +135,7 @@ public class SegmentPost extends AbstractPost {
@Override
public int queueSize() {
- return 128;
+ return WorkerConfig.Queue.Segment.SegmentPost.Size;
}
@Override
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentExceptionWithSegId.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentExceptionWithSegId.java
index 4102a6698f653f61d9fc38c4b0945a438e036259..71443e4d919004e9cb227eeeca08ddc6da5f1a2f 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentExceptionWithSegId.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentExceptionWithSegId.java
@@ -22,7 +22,7 @@ public class SegmentExceptionWithSegId extends AbstractLocalSyncWorker {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
- GetResponse getResponse = EsClient.getClient().prepareGet(SegmentExceptionIndex.Index, SegmentExceptionIndex.Type_Record, search.segId).get();
+ GetResponse getResponse = EsClient.INSTANCE.getClient().prepareGet(SegmentExceptionIndex.Index, SegmentExceptionIndex.Type_Record, search.segId).get();
JsonObject dataJson = new JsonObject();
dataJson.addProperty(SegmentExceptionIndex.SegId, (String) getResponse.getSource().get(SegmentExceptionIndex.SegId));
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithGlobalTraceId.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithGlobalTraceId.java
index 9ecf8c6bdda7d7e000573fe003795f7bc764ae0a..52e15d215ccb759c4016e2d6511f49bd89bfdf17 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithGlobalTraceId.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithGlobalTraceId.java
@@ -41,7 +41,7 @@ public class SegmentTopSearchWithGlobalTraceId extends AbstractLocalSyncWorker {
protected void onWork(Object request, Object response) throws Exception {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
- Client client = EsClient.getClient();
+ Client client = EsClient.INSTANCE.getClient();
String globalTraceData = client.prepareGet(GlobalTraceIndex.Index, GlobalTraceIndex.Type_Record, search.globalTraceId).get().getSourceAsString();
JsonObject globalTraceObj = gson.fromJson(globalTraceData, JsonObject.class);
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithTimeSlice.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithTimeSlice.java
index 0b3095af83db657a4ab86c0a72c63f9a1d125fdc..fbedfd46b32198a214e8d1db214365f0154e1392 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithTimeSlice.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithTimeSlice.java
@@ -43,7 +43,7 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
- SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(SegmentCostIndex.Index);
+ SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(SegmentCostIndex.Index);
searchRequestBuilder.setTypes(SegmentCostIndex.Type_Record);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
@@ -86,7 +86,7 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
topSegmentJson.addProperty(SegmentCostIndex.OperationName, (String) searchHit.getSource().get(SegmentCostIndex.OperationName));
topSegmentJson.addProperty(SegmentCostIndex.Cost, (Number) searchHit.getSource().get(SegmentCostIndex.Cost));
- String segmentSource = EsClient.getClient().prepareGet(SegmentIndex.Index, SegmentIndex.Type_Record, segId).get().getSourceAsString();
+ String segmentSource = EsClient.INSTANCE.getClient().prepareGet(SegmentIndex.Index, SegmentIndex.Type_Record, segId).get().getSourceAsString();
Segment segment = SegmentDeserialize.INSTANCE.deserializeFromES(segmentSource);
List distributedTraceIdList = segment.getRelatedGlobalTraces();
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/span/SpanGetWithId.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/span/SpanGetWithId.java
index e5ff6cf992e13125a6ce4e3c15fbb25034945568..944ccdb563bc248427c45961a91a891238612058 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/span/SpanGetWithId.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/span/SpanGetWithId.java
@@ -40,11 +40,6 @@ public class SpanGetWithId extends AbstractGet {
}
logger.debug("segId: %s, spanId: %s", Arrays.toString(request.get("segId")), Arrays.toString(request.get("spanId")));
- int maxCost = -1;
- if (request.containsKey("maxCost")) {
- maxCost = Integer.valueOf(ParameterTools.INSTANCE.toString(request, "maxCost"));
- }
-
String segId = ParameterTools.INSTANCE.toString(request, "segId");
String spanId = ParameterTools.INSTANCE.toString(request, "spanId");
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/span/persistence/SpanSearchWithId.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/span/persistence/SpanSearchWithId.java
index 5129b4f2c87f273a87c2b14fbad16b50ce8299b5..d3b013f5c142602bfd5be91669de035eeb4bfcdd 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/span/persistence/SpanSearchWithId.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/span/persistence/SpanSearchWithId.java
@@ -3,10 +3,11 @@ package com.a.eye.skywalking.collector.worker.span.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
+import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.segment.logic.Segment;
import com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize;
-import com.a.eye.skywalking.collector.worker.storage.EsClient;
+import com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs;
import com.a.eye.skywalking.trace.Span;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
@@ -21,7 +22,7 @@ public class SpanSearchWithId extends AbstractLocalSyncWorker {
private Gson gson = new Gson();
- private SpanSearchWithId(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ SpanSearchWithId(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@@ -29,7 +30,7 @@ public class SpanSearchWithId extends AbstractLocalSyncWorker {
protected void onWork(Object request, Object response) throws Exception {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
- GetResponse getResponse = EsClient.getClient().prepareGet(SegmentIndex.Index, SegmentIndex.Type_Record, search.segId).get();
+ GetResponse getResponse = GetResponseFromEs.INSTANCE.get(SegmentIndex.Index, SegmentIndex.Type_Record, search.segId);
Segment segment = SegmentDeserialize.INSTANCE.deserializeFromES(getResponse.getSourceAsString());
List spanList = segment.getSpans();
@@ -44,7 +45,7 @@ public class SpanSearchWithId extends AbstractLocalSyncWorker {
}
JsonObject resJsonObj = (JsonObject) response;
- resJsonObj.add("result", dataJson);
+ resJsonObj.add(Const.RESULT, dataJson);
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/AbstractIndex.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/AbstractIndex.java
index 8f4ade7daad9fb7bccca6fedae93b98c3c4866a7..bb27a34e9350d46858d0fe40503207db238c5775 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/AbstractIndex.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/AbstractIndex.java
@@ -29,20 +29,19 @@ public abstract class AbstractIndex {
public static final String AGG_COLUMN = "aggId";
public static final String Time_Slice = "timeSlice";
- final public XContentBuilder createSettingBuilder() throws IOException {
- XContentBuilder settingsBuilder = XContentFactory.jsonBuilder()
+ final XContentBuilder createSettingBuilder() throws IOException {
+ return XContentFactory.jsonBuilder()
.startObject()
.field("index.number_of_shards", 2)
.field("index.number_of_replicas", 0)
.endObject();
- return settingsBuilder;
}
public abstract boolean isRecord();
public abstract XContentBuilder createMappingBuilder() throws IOException;
- final public void createIndex() {
+ final void createIndex() {
// settings
String settingSource = "";
@@ -59,7 +58,7 @@ public abstract class AbstractIndex {
logger.error("create %s index mapping builder error", index());
}
Settings settings = Settings.builder().loadFromSource(settingSource).build();
- IndicesAdminClient client = EsClient.getClient().admin().indices();
+ IndicesAdminClient client = EsClient.INSTANCE.getClient().admin().indices();
if (isRecord()) {
CreateIndexResponse response = client.prepareCreate(index()).setSettings(settings).addMapping(Type_Record, mappingBuilder).get();
@@ -74,8 +73,8 @@ public abstract class AbstractIndex {
}
}
- final public boolean deleteIndex() {
- IndicesAdminClient client = EsClient.getClient().admin().indices();
+ final boolean deleteIndex() {
+ IndicesAdminClient client = EsClient.INSTANCE.getClient().admin().indices();
try {
DeleteIndexResponse response = client.prepareDelete(index()).get();
logger.info("delete %s index finished, isAcknowledged: %s", index(), response.isAcknowledged());
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/EsClient.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/EsClient.java
index 27a0a8e8000f198da0e1e0c15fd98d51d8f08c87..01355af82233600d77198ea59c73cc552cb3e9dd 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/EsClient.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/EsClient.java
@@ -15,11 +15,12 @@ import java.net.UnknownHostException;
/**
* @author pengys5
*/
-public class EsClient {
+public enum EsClient {
+ INSTANCE;
- private static Client client;
+ private Client client;
- public static void boot() throws UnknownHostException {
+ public void boot() throws UnknownHostException {
Settings settings = Settings.builder()
.put("cluster.name", "CollectorCluster")
.put("client.transport.sniff", true)
@@ -29,11 +30,11 @@ public class EsClient {
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
}
- public static Client getClient() {
+ public Client getClient() {
return client;
}
- public static void indexRefresh(String... indexName) {
+ public void indexRefresh(String... indexName) {
Logger logger = LogManager.getFormatterLogger(EsClient.class);
RefreshResponse response = client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
if (response.getShardFailures().length == response.getTotalShards()) {
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/GetResponseFromEs.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/GetResponseFromEs.java
new file mode 100644
index 0000000000000000000000000000000000000000..dae1d509460ddf67c0d71b097d5a7ff628c9e756
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/GetResponseFromEs.java
@@ -0,0 +1,14 @@
+package com.a.eye.skywalking.collector.worker.storage;
+
+import org.elasticsearch.action.get.GetResponse;
+
+/**
+ * @author pengys5
+ */
+public enum GetResponseFromEs {
+ INSTANCE;
+
+ public GetResponse get(String index, String type, String id) {
+ return EsClient.INSTANCE.getClient().prepareGet(index, type, id).get();
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/IndexCreator.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/IndexCreator.java
index b8312881fa41b5d6a79a0a09dad622d79acf8768..1c2073cdd3505d6d7c7d8ac807ac2c135422224b 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/IndexCreator.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/IndexCreator.java
@@ -1,13 +1,11 @@
package com.a.eye.skywalking.collector.worker.storage;
-import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
-import com.a.eye.skywalking.collector.worker.node.NodeCompIndex;
-import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
-import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
-import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
-import com.a.eye.skywalking.collector.worker.segment.SegmentCostIndex;
-import com.a.eye.skywalking.collector.worker.segment.SegmentExceptionIndex;
-import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashSet;
+import java.util.ServiceLoader;
+import java.util.Set;
/**
* @author pengys5
@@ -15,36 +13,23 @@ import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
public enum IndexCreator {
INSTANCE;
- public void create() {
- SegmentIndex segmentIndex = new SegmentIndex();
- segmentIndex.deleteIndex();
-
- GlobalTraceIndex globalTraceIndex = new GlobalTraceIndex();
- globalTraceIndex.deleteIndex();
- globalTraceIndex.createIndex();
-
- SegmentCostIndex segmentCostIndex = new SegmentCostIndex();
- segmentCostIndex.deleteIndex();
- segmentCostIndex.createIndex();
-
- SegmentExceptionIndex segmentExceptionIndex = new SegmentExceptionIndex();
- segmentExceptionIndex.deleteIndex();
- segmentExceptionIndex.createIndex();
+ private Logger logger = LogManager.getFormatterLogger(IndexCreator.class);
- NodeCompIndex nodeCompIndex = new NodeCompIndex();
- nodeCompIndex.deleteIndex();
- nodeCompIndex.createIndex();
-
- NodeMappingIndex nodeMappingIndex = new NodeMappingIndex();
- nodeMappingIndex.deleteIndex();
- nodeMappingIndex.createIndex();
-
- NodeRefIndex nodeRefIndex = new NodeRefIndex();
- nodeRefIndex.deleteIndex();
- nodeRefIndex.createIndex();
+ public void create() {
+ Set indexSet = loadIndex();
+ for (AbstractIndex index : indexSet) {
+ index.deleteIndex();
+ index.createIndex();
+ }
+ }
- NodeRefResSumIndex nodeRefResSumIndex = new NodeRefResSumIndex();
- nodeRefResSumIndex.deleteIndex();
- nodeRefResSumIndex.createIndex();
+ private Set loadIndex() {
+ Set indexSet = new HashSet<>();
+ ServiceLoader indexServiceLoader = ServiceLoader.load(AbstractIndex.class);
+ for (AbstractIndex index : indexServiceLoader) {
+ logger.info("index name: %s", index.index());
+ indexSet.add(index);
+ }
+ return indexSet;
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MergePersistenceData.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MergePersistenceData.java
index 9d9aa9ec7704e2f12fedd3d728bd4cf6f146e292..4a5ccec673bbb8c1bd5d66a689574383a9fb7832 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MergePersistenceData.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MergePersistenceData.java
@@ -11,7 +11,7 @@ import java.util.function.Consumer;
*/
public class MergePersistenceData implements Iterable {
- private Map persistenceData = new HashMap();
+ private Map persistenceData = new HashMap<>();
public MergeData getElseCreate(String id) {
if (!persistenceData.containsKey(id)) {
@@ -28,10 +28,6 @@ public class MergePersistenceData implements Iterable {
persistenceData.clear();
}
- public boolean hasNext() {
- return persistenceData.entrySet().iterator().hasNext();
- }
-
public MergeData pushOne() {
MergeData one = persistenceData.entrySet().iterator().next().getValue();
persistenceData.remove(one.getId());
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MetricPersistenceData.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MetricPersistenceData.java
index 3f9cdedd529444ef8eb367f3386424fbc668f031..23aa8304421990e4aac93acc7b271d718491520f 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MetricPersistenceData.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MetricPersistenceData.java
@@ -10,7 +10,7 @@ import java.util.Spliterator;
*/
public class MetricPersistenceData implements Iterable {
- private Map persistenceData = new HashMap();
+ private Map persistenceData = new HashMap<>();
public MetricData getElseCreate(String id) {
if (!persistenceData.containsKey(id)) {
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/RecordData.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/RecordData.java
index 0135977b5ef04aa87ad1d807e2e1653231a26da7..f4b8f22614c7b18593bf71280f20b71db222f83d 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/RecordData.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/RecordData.java
@@ -24,6 +24,7 @@ public class RecordData extends AbstractHashMessage {
this.aggId = this.aggId + Const.ID_SPLIT + ids[i];
}
}
+ record = new JsonObject();
}
public String getId() {
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/tools/SpanPeersTools.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/tools/SpanPeersTools.java
index 55afd7219e4dcd92f8b28358f550397d87a43ac2..439684ad30500f3a4d099469f97ae1ccfbdc1ee6 100644
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/tools/SpanPeersTools.java
+++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/tools/SpanPeersTools.java
@@ -8,8 +8,10 @@ import com.a.eye.skywalking.trace.tag.Tags;
/**
* @author pengys5
*/
-public class SpanPeersTools {
- public static String getPeers(Span span) {
+public enum SpanPeersTools {
+ INSTANCE;
+
+ public String getPeers(Span span) {
if (StringUtil.isEmpty(Tags.PEERS.get(span))) {
String host = Tags.PEER_HOST.get(span);
int port = Tags.PEER_PORT.get(span);
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/tools/UrlTools.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/tools/UrlTools.java
deleted file mode 100644
index 42d6592690ba4918c6b17d09d8e16d77264507ab..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/tools/UrlTools.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.a.eye.skywalking.collector.worker.tools;
-
-/**
- * @author pengys5
- */
-public class UrlTools {
-
- private static final String HttpUrlHead = "http://";
- private static final String HttpsUrlHead = "https://";
- private static final String MotanUrlHead = "motan://";
-
- public static String parse(String url, String component) {
- if ("Tomcat".equals(component)) {
- return parseTomcat(url);
- } else if ("Motan".equals(component)) {
- return parseMotan(url);
- }
- return null;
- }
-
- private static String parseTomcat(String url) {
- if (url.startsWith(HttpUrlHead)) {
- String suffix = url.substring(7, url.length());
- String[] urlSplit = suffix.split("/");
- return HttpUrlHead + urlSplit[0];
- } else if (url.startsWith(HttpsUrlHead)) {
- String suffix = url.substring(8, url.length());
- String[] urlSplit = suffix.split("/");
- return HttpsUrlHead + urlSplit[0];
- } else if (url.contains(":")) {
- return url.split("/")[0];
- } else {
- return url;
- }
- }
-
- private static String parseMotan(String url) {
- if (url.startsWith(MotanUrlHead)) {
- String suffix = url.substring(8, url.length());
- String[] urlSplit = suffix.split("/");
- return MotanUrlHead + urlSplit[0];
- } else {
- return url;
- }
- }
-}
diff --git a/skywalking-collector/skywalking-collector-worker/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.worker.storage.AbstractIndex b/skywalking-collector/skywalking-collector-worker/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.worker.storage.AbstractIndex
new file mode 100644
index 0000000000000000000000000000000000000000..9ceb1d2f123cb604a8715b4431e7c83321de974f
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-worker/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.worker.storage.AbstractIndex
@@ -0,0 +1,11 @@
+com.a.eye.skywalking.collector.worker.segment.SegmentIndex
+com.a.eye.skywalking.collector.worker.segment.SegmentCostIndex
+com.a.eye.skywalking.collector.worker.segment.SegmentExceptionIndex
+
+com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex
+
+com.a.eye.skywalking.collector.worker.node.NodeCompIndex
+com.a.eye.skywalking.collector.worker.node.NodeMappingIndex
+
+com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex
+com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex
\ No newline at end of file
diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/AnalysisMemberTestCase.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/AnalysisMemberTestCase.java
index 3e3583accad8ab5b1bea5eea69cc396b0a0d4180..3d13a4e104c951840e88cdb14197e27e37ae3c3e 100644
--- a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/AnalysisMemberTestCase.java
+++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/AnalysisMemberTestCase.java
@@ -1,18 +1,35 @@
package com.a.eye.skywalking.collector.worker;
+import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
+import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
+import org.junit.After;
+import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TestAnalysisMember.class)
+@PowerMockIgnore({"javax.management.*"})
public class AnalysisMemberTestCase {
@Test
public void testCommandOnWork() throws Exception {
- AnalysisMember member = mock(AnalysisMember.class);
+ ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
+ LocalWorkerContext localWorkerContext = new LocalWorkerContext();
+ TestAnalysisMember member = PowerMockito.spy(new TestAnalysisMember(TestAnalysisMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));
EndOfBatchCommand command = new EndOfBatchCommand();
member.onWork(command);
@@ -22,11 +39,58 @@ public class AnalysisMemberTestCase {
@Test
public void testAnalyse() throws Exception {
- AnalysisMember member = mock(AnalysisMember.class);
+ ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
+ LocalWorkerContext localWorkerContext = new LocalWorkerContext();
+ TestAnalysisMember member = PowerMockito.spy(new TestAnalysisMember(TestAnalysisMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));
Object message = new Object();
member.onWork(message);
verify(member, never()).aggregation();
verify(member, times(1)).analyse(anyObject());
}
+
+ @Test
+ public void testPreStart() throws Exception {
+ ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
+ LocalWorkerContext localWorkerContext = new LocalWorkerContext();
+ TestAnalysisMember member = PowerMockito.spy(new TestAnalysisMember(TestAnalysisMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));
+ member.preStart();
+ }
+
+ @Test
+ public void testOnWorkException() throws Exception {
+ ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
+ LocalWorkerContext localWorkerContext = new LocalWorkerContext();
+ TestAnalysisMember member = PowerMockito.spy(new TestAnalysisMember(TestAnalysisMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));
+
+ doThrow(new TestException()).when(member).analyse(anyObject());
+
+ ExceptionAnswer answer = new ExceptionAnswer();
+ PowerMockito.when(member, "saveException", any(TestException.class)).thenAnswer(answer);
+
+ member.onWork(new Object());
+
+ Assert.assertEquals(true, answer.isTestException);
+ }
+
+ class TestException extends Exception {
+
+ }
+
+
+ class ExceptionAnswer implements Answer {
+
+ boolean isTestException = false;
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Object obj = invocation.getArguments()[0];
+ if (obj instanceof TestException) {
+ isTestException = true;
+ } else {
+ isTestException = false;
+ }
+ return null;
+ }
+ }
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/MergeAnalysisMemberTestCase.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/MergeAnalysisMemberTestCase.java
index bf667754fdf592097ff4ecec4b276008a2193b15..29a89eb2a3125027376ddf426be47396c1784bef 100644
--- a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/MergeAnalysisMemberTestCase.java
+++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/MergeAnalysisMemberTestCase.java
@@ -1,5 +1,7 @@
package com.a.eye.skywalking.collector.worker;
+import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
+import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.MergePersistenceData;
import org.junit.Assert;
@@ -7,6 +9,8 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -16,38 +20,42 @@ import static org.powermock.api.mockito.PowerMockito.*;
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest(MergeAnalysisMember.class)
+@PrepareForTest(TestMergeAnalysisMember.class)
+@PowerMockIgnore({"javax.management.*"})
public class MergeAnalysisMemberTestCase {
- private MergeAnalysisMember member;
+ private TestMergeAnalysisMember mergeAnalysisMember;
private MergePersistenceData persistenceData;
@Before
public void init() throws Exception {
- member = mock(MergeAnalysisMember.class);
+ ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
+ LocalWorkerContext localWorkerContext = new LocalWorkerContext();
+ mergeAnalysisMember = PowerMockito.spy(new TestMergeAnalysisMember(TestMergeAnalysisMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));
+
persistenceData = mock(MergePersistenceData.class);
MergeData mergeData = mock(MergeData.class);
- when(member, "getPersistenceData").thenReturn(persistenceData);
+ when(mergeAnalysisMember, "getPersistenceData").thenReturn(persistenceData);
when(persistenceData.getElseCreate(Mockito.anyString())).thenReturn(mergeData);
- doCallRealMethod().when(member).setMergeData(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
+ doCallRealMethod().when(mergeAnalysisMember).setMergeData(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
}
@Test
public void testSetMergeDataNotFull() throws Exception {
when(persistenceData.size()).thenReturn(WorkerConfig.Persistence.Data.size - 1);
- member.setMergeData("segment_1", "column", "value");
- Mockito.verify(member, Mockito.never()).aggregation();
+ mergeAnalysisMember.setMergeData("segment_1", "column", "value");
+ Mockito.verify(mergeAnalysisMember, Mockito.never()).aggregation();
}
@Test
public void testSetMergeDataFull() throws Exception {
when(persistenceData.size()).thenReturn(WorkerConfig.Persistence.Data.size);
- member.setMergeData("segment_1", "column", "value");
- Mockito.verify(member, Mockito.times(1)).aggregation();
+ mergeAnalysisMember.setMergeData("segment_1", "column", "value");
+ Mockito.verify(mergeAnalysisMember, Mockito.times(1)).aggregation();
}
@Test
@@ -55,9 +63,10 @@ public class MergeAnalysisMemberTestCase {
MergePersistenceData persistenceData = new MergePersistenceData();
persistenceData.getElseCreate("segment_1").setMergeData("column", "value");
- when(member, "getPersistenceData").thenReturn(persistenceData);
- doCallRealMethod().when(member).pushOne();
+ when(mergeAnalysisMember, "getPersistenceData").thenReturn(persistenceData);
+ doCallRealMethod().when(mergeAnalysisMember).pushOne();
- Assert.assertEquals("segment_1", member.pushOne().getId());
+ Assert.assertEquals("segment_1", mergeAnalysisMember.pushOne().getId());
+ Assert.assertEquals(null, mergeAnalysisMember.pushOne());
}
}
diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/MergePersistenceMemberTestCase.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/MergePersistenceMemberTestCase.java
new file mode 100644
index 0000000000000000000000000000000000000000..1795100e5357ddbbdf97e0eb308f4158c3c24dc2
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/MergePersistenceMemberTestCase.java
@@ -0,0 +1,59 @@
+package com.a.eye.skywalking.collector.worker;
+
+import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
+import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
+import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
+import com.a.eye.skywalking.collector.worker.mock.MockEsBulkClient;
+import com.a.eye.skywalking.collector.worker.storage.EsClient;
+import com.a.eye.skywalking.collector.worker.storage.MergeData;
+import com.a.eye.skywalking.collector.worker.storage.MergePersistenceData;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.powermock.api.mockito.PowerMockito.*;
+
+/**
+ * @author pengys5
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({TestMergePersistenceMember.class, EsClient.class})
+@PowerMockIgnore({"javax.management.*"})
+public class MergePersistenceMemberTestCase {
+
+ private TestMergePersistenceMember mergePersistenceMember;
+ private MergePersistenceData persistenceData;
+
+ @Before
+ public void init() throws Exception {
+ MockEsBulkClient mockEsBulkClient = new MockEsBulkClient();
+ mockEsBulkClient.createMock();
+
+ ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
+ LocalWorkerContext localWorkerContext = new LocalWorkerContext();
+ mergePersistenceMember = PowerMockito.spy(new TestMergePersistenceMember(TestMergePersistenceMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));
+
+ persistenceData = mock(MergePersistenceData.class);
+ MergeData mergeData = mock(MergeData.class);
+
+ when(mergePersistenceMember, "getPersistenceData").thenReturn(persistenceData);
+ when(persistenceData.getElseCreate(Mockito.anyString())).thenReturn(mergeData);
+
+ doCallRealMethod().when(mergePersistenceMember).analyse(Mockito.any(MergeData.class));
+ }
+
+ @Test
+ public void testAnalyse() throws Exception {
+ String id = "2016" + Const.ID_SPLIT + "A" + Const.ID_SPLIT + "B";
+ MergeData mergeData = new MergeData(id);
+ mergeData.setMergeData("Column", "Value");
+
+// mergePersistenceMember.analyse(mergeData);
+// mergePersistenceMember.onWork(new EndOfBatchCommand());
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/StartUpTestCase.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/StartUpTestCase.java
index fabd68d2c8b1a5444158aacfd0986affccb4531b..0d66c9cd9944d3f2f4b1382c172087b972061cf8 100644
--- a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/StartUpTestCase.java
+++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/StartUpTestCase.java
@@ -29,7 +29,7 @@ public class StartUpTestCase {
withFallback(ConfigFactory.load("application.conf"));
ActorSystem system = ActorSystem.create("CollectorSystem", config);
- EsClient.boot();
+ EsClient.INSTANCE.boot();
TraceSegment dubboClientData = TraceSegmentBuilderFactory.INSTANCE.traceOf_Tomcat_DubboClient();
diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TestAnalysisMember.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TestAnalysisMember.java
new file mode 100644
index 0000000000000000000000000000000000000000..55ac94a6865662980d093625091739b0ec59bb37
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TestAnalysisMember.java
@@ -0,0 +1,44 @@
+package com.a.eye.skywalking.collector.worker;
+
+import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
+import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
+import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
+import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
+
+/**
+ * @author pengys5
+ */
+public class TestAnalysisMember extends AnalysisMember {
+ TestAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ super(role, clusterContext, selfContext);
+ }
+
+ @Override
+ public void analyse(Object message) throws Exception {
+
+ }
+
+ @Override
+ public void preStart() throws ProviderNotFoundException {
+ super.preStart();
+ }
+
+ @Override
+ protected void aggregation() throws Exception {
+
+ }
+
+ public enum Role implements com.a.eye.skywalking.collector.actor.Role {
+ INSTANCE;
+
+ @Override
+ public String roleName() {
+ return null;
+ }
+
+ @Override
+ public WorkerSelector workerSelector() {
+ return null;
+ }
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TestMergeAnalysisMember.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TestMergeAnalysisMember.java
new file mode 100644
index 0000000000000000000000000000000000000000..9f9bdf94dc27cd79f1b6ee0a57e0df42c6b90022
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TestMergeAnalysisMember.java
@@ -0,0 +1,45 @@
+package com.a.eye.skywalking.collector.worker;
+
+import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
+import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
+import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
+import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
+
+/**
+ * @author pengys5
+ */
+public class TestMergeAnalysisMember extends MergeAnalysisMember {
+
+ TestMergeAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ super(role, clusterContext, selfContext);
+ }
+
+ @Override
+ public void analyse(Object message) throws Exception {
+
+ }
+
+ @Override
+ public void preStart() throws ProviderNotFoundException {
+ super.preStart();
+ }
+
+ @Override
+ protected void aggregation() throws Exception {
+
+ }
+
+ public enum Role implements com.a.eye.skywalking.collector.actor.Role {
+ INSTANCE;
+
+ @Override
+ public String roleName() {
+ return null;
+ }
+
+ @Override
+ public WorkerSelector workerSelector() {
+ return null;
+ }
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TestMergePersistenceMember.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TestMergePersistenceMember.java
new file mode 100644
index 0000000000000000000000000000000000000000..0f81a9043e1973d74d169abb8fbb5937cd0178cd
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TestMergePersistenceMember.java
@@ -0,0 +1,38 @@
+package com.a.eye.skywalking.collector.worker;
+
+import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
+import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
+import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
+
+/**
+ * @author pengys5
+ */
+public class TestMergePersistenceMember extends MergePersistenceMember {
+ TestMergePersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ super(role, clusterContext, selfContext);
+ }
+
+ @Override
+ public String esIndex() {
+ return null;
+ }
+
+ @Override
+ public String esType() {
+ return null;
+ }
+
+ public enum Role implements com.a.eye.skywalking.collector.actor.Role {
+ INSTANCE;
+
+ @Override
+ public String roleName() {
+ return null;
+ }
+
+ @Override
+ public WorkerSelector workerSelector() {
+ return null;
+ }
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TimeSliceTestCase.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TimeSliceTestCase.java
new file mode 100644
index 0000000000000000000000000000000000000000..289040b9249f6adcfd92bcdf5d0d4b070a1c7969
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TimeSliceTestCase.java
@@ -0,0 +1,24 @@
+package com.a.eye.skywalking.collector.worker;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * @author pengys5
+ */
+public class TimeSliceTestCase {
+
+ @Test
+ public void test() {
+ TestTimeSlice timeSlice = new TestTimeSlice("A", 10L, 20L);
+ Assert.assertEquals("A", timeSlice.getSliceType());
+ Assert.assertEquals(10L, timeSlice.getStartTime());
+ Assert.assertEquals(20L, timeSlice.getEndTime());
+ }
+
+ class TestTimeSlice extends TimeSlice {
+ public TestTimeSlice(String sliceType, long startTime, long endTime) {
+ super(sliceType, startTime, endTime);
+ }
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/GlobalTraceIndexTestCase.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/GlobalTraceIndexTestCase.java
new file mode 100644
index 0000000000000000000000000000000000000000..ebd960295b2dbbacebfe4fa1bb5e7f8dede11018
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/GlobalTraceIndexTestCase.java
@@ -0,0 +1,25 @@
+package com.a.eye.skywalking.collector.worker.globaltrace;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * @author pengys5
+ */
+public class GlobalTraceIndexTestCase {
+
+ @Test
+ public void test() {
+ GlobalTraceIndex index = new GlobalTraceIndex();
+ Assert.assertEquals("global_trace_idx", index.index());
+ Assert.assertEquals(true, index.isRecord());
+ }
+
+ @Test
+ public void testBuilder() throws IOException {
+ GlobalTraceIndex index = new GlobalTraceIndex();
+ Assert.assertEquals("{\"properties\":{\"subSegIds\":{\"type\":\"text\",\"index\":\"not_analyzed\"}}}", index.createMappingBuilder().string());
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/GlobalTraceSearchTestCase.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/GlobalTraceSearchTestCase.java
index 57ead136feb7a1fb6a14e7faa0133b382f5732d7..84a25f8d822dbd521e88947b7fb5cc51daffb923 100644
--- a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/GlobalTraceSearchTestCase.java
+++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/GlobalTraceSearchTestCase.java
@@ -28,7 +28,7 @@ public class GlobalTraceSearchTestCase {
public void testSearchWithGlobalId() throws Exception {
Client client = mock(Client.class);
mockStatic(EsClient.class);
- when(EsClient.getClient()).thenReturn(client);
+// when(EsClient.INSTANCE.getClient()).thenReturn(client);
String globalTraceId = "Global.1";
String segment_1 = "SEGMENT.1";
diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceAggTestCase.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceAggTestCase.java
new file mode 100644
index 0000000000000000000000000000000000000000..b8f87262f9341963b665d69aa3cfa8138b3a35ec
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceAggTestCase.java
@@ -0,0 +1,85 @@
+package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
+
+import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
+import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
+import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
+import com.a.eye.skywalking.collector.actor.WorkerRefs;
+import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
+import com.a.eye.skywalking.collector.worker.WorkerConfig;
+import com.a.eye.skywalking.collector.worker.mock.MergeDataAnswer;
+import com.a.eye.skywalking.collector.worker.storage.RecordData;
+import com.a.eye.skywalking.collector.worker.tools.MergeDataAggTools;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * @author pengys5
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LocalWorkerContext.class})
+@PowerMockIgnore({"javax.management.*"})
+public class GlobalTraceAggTestCase {
+
+ private GlobalTraceAgg agg;
+ private MergeDataAnswer mergeDataAnswer;
+ private ClusterWorkerContext clusterWorkerContext;
+
+ @Before
+ public void init() throws Exception {
+ clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
+
+ LocalWorkerContext localWorkerContext = PowerMockito.mock(LocalWorkerContext.class);
+ WorkerRefs workerRefs = mock(WorkerRefs.class);
+
+ mergeDataAnswer = new MergeDataAnswer();
+ doAnswer(mergeDataAnswer).when(workerRefs).tell(Mockito.any(RecordData.class));
+
+ when(localWorkerContext.lookup(GlobalTraceSave.Role.INSTANCE)).thenReturn(workerRefs);
+ agg = new GlobalTraceAgg(GlobalTraceAgg.Role.INSTANCE, clusterWorkerContext, localWorkerContext);
+ }
+
+ @Test
+ public void testRole() {
+ Assert.assertEquals(GlobalTraceAgg.class.getSimpleName(), GlobalTraceAgg.Role.INSTANCE.roleName());
+ Assert.assertEquals(HashCodeSelector.class.getSimpleName(), GlobalTraceAgg.Role.INSTANCE.workerSelector().getClass().getSimpleName());
+ }
+
+ @Test
+ public void testFactory() {
+ Assert.assertEquals(GlobalTraceAgg.class.getSimpleName(), GlobalTraceAgg.Factory.INSTANCE.role().roleName());
+ Assert.assertEquals(GlobalTraceAgg.class.getSimpleName(), GlobalTraceAgg.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
+
+ int testSize = 10;
+ WorkerConfig.WorkerNum.GlobalTrace.GlobalTraceAgg.Value = testSize;
+ Assert.assertEquals(testSize, GlobalTraceAgg.Factory.INSTANCE.workerNum());
+ }
+
+ @Test
+ public void testPreStart() throws ProviderNotFoundException {
+ when(clusterWorkerContext.findProvider(GlobalTraceSave.Role.INSTANCE)).thenReturn(GlobalTraceSave.Factory.INSTANCE);
+
+ ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(GlobalTraceSave.Role.class);
+ agg.preStart();
+ verify(clusterWorkerContext).findProvider(argumentCaptor.capture());
+ }
+
+ @Test
+ public void testOnWork() throws Exception {
+ MergeDataAggTools.INSTANCE.testOnWork(agg, mergeDataAnswer);
+ }
+
+ @Test
+ public void testOnWorkError() throws Exception {
+ agg.onWork(new Object());
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSaveTestCase.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSaveTestCase.java
new file mode 100644
index 0000000000000000000000000000000000000000..2a56ae1fae939f2c048f658009183b57733d037d
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSaveTestCase.java
@@ -0,0 +1,51 @@
+package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
+
+import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
+import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
+import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
+import com.a.eye.skywalking.collector.worker.WorkerConfig;
+import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author pengys5
+ */
+public class GlobalTraceSaveTestCase {
+
+ private GlobalTraceSave save;
+
+ @Before
+ public void init() {
+ ClusterWorkerContext cluster = new ClusterWorkerContext(null);
+ LocalWorkerContext local = new LocalWorkerContext();
+ save = new GlobalTraceSave(GlobalTraceSave.Role.INSTANCE, cluster, local);
+ }
+
+ @Test
+ public void testEsIndex() {
+ Assert.assertEquals(GlobalTraceIndex.Index, save.esIndex());
+ }
+
+ @Test
+ public void testEsType() {
+ Assert.assertEquals(GlobalTraceIndex.Type_Record, save.esType());
+ }
+
+ @Test
+ public void testRole() {
+ Assert.assertEquals(GlobalTraceSave.class.getSimpleName(), GlobalTraceSave.Role.INSTANCE.roleName());
+ Assert.assertEquals(HashCodeSelector.class.getSimpleName(), GlobalTraceSave.Role.INSTANCE.workerSelector().getClass().getSimpleName());
+ }
+
+ @Test
+ public void testFactory() {
+ Assert.assertEquals(GlobalTraceSave.class.getSimpleName(), GlobalTraceSave.Factory.INSTANCE.role().roleName());
+ Assert.assertEquals(GlobalTraceSave.class.getSimpleName(), GlobalTraceSave.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
+
+ int testSize = 10;
+ WorkerConfig.Queue.GlobalTrace.GlobalTraceSave.Size = testSize;
+ Assert.assertEquals(testSize, GlobalTraceSave.Factory.INSTANCE.queueSize());
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSearchWithGlobalIdTestCase.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSearchWithGlobalIdTestCase.java
new file mode 100644
index 0000000000000000000000000000000000000000..c597e3122cb052d2b4b966388ae204443bc60f4b
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSearchWithGlobalIdTestCase.java
@@ -0,0 +1,72 @@
+package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
+
+import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
+import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
+import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
+import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
+import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
+import com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs;
+import com.google.gson.JsonObject;
+import org.elasticsearch.action.get.GetResponse;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author pengys5
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({GetResponseFromEs.class})
+@PowerMockIgnore({"javax.management.*"})
+public class GlobalTraceSearchWithGlobalIdTestCase {
+
+ private GetResponseFromEs getResponseFromEs;
+
+ private String global_Str = "{\"subSegIds\":\"Segment.1491277162066.18986177.70531.27.1\"}";
+ private String seg_str = "{\"ts\":\"Segment.1491277162066.18986177.70531.27.1\",\"st\":1491277162066,\"et\":1491277165743,\"ss\":[{\"si\":0,\"ps\":-1,\"st\":1491277162141,\"et\":1491277162144,\"on\":\"Jedis/getClient\",\"ts\":{\"span.layer\":\"db\",\"component\":\"Redis\",\"db.type\":\"Redis\",\"peer.host\":\"127.0.0.1\",\"span.kind\":\"client\"},\"tb\":{},\"ti\":{\"peer.port\":6379},\"lo\":[]}],\"ac\":\"cache-service\",\"gt\":[\"Trace.1491277147443.-1562443425.70539.65.2\"],\"sampled\":true,\"minute\":201704041139,\"hour\":201704041100,\"day\":201704040000,\"aggId\":null}";
+
+ @Before
+ public void init() {
+ getResponseFromEs = PowerMockito.mock(GetResponseFromEs.class);
+ Whitebox.setInternalState(GetResponseFromEs.class, "INSTANCE", getResponseFromEs);
+ }
+
+ @Test
+ public void testRole() {
+ Assert.assertEquals(GlobalTraceSearchWithGlobalId.class.getSimpleName(), GlobalTraceSearchWithGlobalId.WorkerRole.INSTANCE.roleName());
+ Assert.assertEquals(RollingSelector.class.getSimpleName(), GlobalTraceSearchWithGlobalId.WorkerRole.INSTANCE.workerSelector().getClass().getSimpleName());
+ }
+
+ @Test
+ public void testFactory() {
+ Assert.assertEquals(GlobalTraceSearchWithGlobalId.class.getSimpleName(), GlobalTraceSearchWithGlobalId.Factory.INSTANCE.role().roleName());
+ Assert.assertEquals(GlobalTraceSearchWithGlobalId.class.getSimpleName(), GlobalTraceSearchWithGlobalId.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
+ }
+
+ @Test
+ public void testOnWork() throws Exception {
+ ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
+ LocalWorkerContext localWorkerContext = new LocalWorkerContext();
+ GlobalTraceSearchWithGlobalId globalTraceSearchWithGlobalId = new GlobalTraceSearchWithGlobalId(GlobalTraceSearchWithGlobalId.WorkerRole.INSTANCE, clusterWorkerContext, localWorkerContext);
+
+ GetResponse getResponse = mock(GetResponse.class);
+ when(getResponseFromEs.get(GlobalTraceIndex.Index, GlobalTraceIndex.Type_Record, "Trace.1491277147443.-1562443425.70539.65.2")).thenReturn(getResponse);
+ when(getResponse.getSourceAsString()).thenReturn(global_Str);
+
+ GetResponse segResponse = mock(GetResponse.class);
+ when(getResponseFromEs.get(SegmentIndex.Index, SegmentIndex.Type_Record, "Segment.1491277162066.18986177.70531.27.1")).thenReturn(segResponse);
+ when(segResponse.getSourceAsString()).thenReturn(seg_str);
+
+ JsonObject response = new JsonObject();
+ globalTraceSearchWithGlobalId.onWork("Trace.1491277147443.-1562443425.70539.65.2", response);
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSearchWithGlobalIdUseDB.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSearchWithGlobalIdUseDB.java
new file mode 100644
index 0000000000000000000000000000000000000000..8db811aceaeeb64b552078064f6e34445346244d
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSearchWithGlobalIdUseDB.java
@@ -0,0 +1,35 @@
+package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
+
+import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
+import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
+import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
+import com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingSearchWithTimeSlice;
+import com.a.eye.skywalking.collector.worker.storage.EsClient;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+
+/**
+ * @author pengys5
+ */
+public class GlobalTraceSearchWithGlobalIdUseDB {
+
+ public static void main(String[] args) throws Exception {
+ EsClient.INSTANCE.boot();
+
+ ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
+ LocalWorkerContext localWorkerContext = new LocalWorkerContext();
+ GlobalTraceSearchWithGlobalId globalTraceSearchWithGlobalId =
+ new GlobalTraceSearchWithGlobalId(GlobalTraceSearchWithGlobalId.WorkerRole.INSTANCE, clusterWorkerContext, localWorkerContext);
+
+ JsonObject response = new JsonObject();
+ globalTraceSearchWithGlobalId.onWork("Trace.1491277147443.-1562443425.70539.65.2", response);
+
+ JsonArray nodeArray = response.get("result").getAsJsonArray();
+ System.out.println(nodeArray.size());
+ System.out.println(nodeArray.toString());
+ for (int i = 0; i < nodeArray.size(); i++) {
+ JsonObject nodeJsonObj = nodeArray.get(i).getAsJsonObject();
+ System.out.println(nodeJsonObj);
+ }
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/mock/MergeDataAnswer.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/mock/MergeDataAnswer.java
new file mode 100644
index 0000000000000000000000000000000000000000..20f72e6430483a4d971ba14463b04bab15884c08
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/mock/MergeDataAnswer.java
@@ -0,0 +1,27 @@
+package com.a.eye.skywalking.collector.worker.mock;
+
+import com.a.eye.skywalking.collector.worker.storage.MergeData;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author pengys5
+ */
+public class MergeDataAnswer implements Answer