提交 6c41373b 编写于 作者: P pengys5

fixed #164

1. add global trace id column into segment cost index.
2. just query from segment cost index when query top segment record with condition, e.g. operation name, global trace id, cost
package org.skywalking.apm.collector.worker.segment;
import java.io.IOException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.skywalking.apm.collector.worker.config.EsConfig;
import org.skywalking.apm.collector.worker.storage.AbstractIndex;
import java.io.IOException;
/**
* @author pengys5
*/
......@@ -15,7 +14,8 @@ public class SegmentCostIndex extends AbstractIndex {
public static final String INDEX = "segment_cost_idx";
public static final String SEG_ID = "segId";
public static final String START_TIME = "startTime";
public static final String END_TIME = "END_TIME";
public static final String END_TIME = "endTime";
public static final String GLOBAL_TRACE_ID = "globalTraceId";
public static final String OPERATION_NAME = "operationName";
public static final String COST = "cost";
......@@ -50,9 +50,12 @@ public class SegmentCostIndex extends AbstractIndex {
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(OPERATION_NAME)
.startObject(GLOBAL_TRACE_ID)
.field("type", "keyword")
.endObject()
.startObject(OPERATION_NAME)
.field("type", "text")
.endObject()
.startObject(COST)
.field("type", "long")
.field("index", "not_analyzed")
......
......@@ -16,23 +16,23 @@ import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.httpserver.AbstractGet;
import org.skywalking.apm.collector.worker.httpserver.AbstractGetProvider;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentTopSearchWithTimeSlice;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentTopSearch;
import org.skywalking.apm.collector.worker.tools.ParameterTools;
/**
* @author pengys5
*/
public class SegmentTopGetWithTimeSlice extends AbstractGet {
public class SegmentTopGet extends AbstractGet {
private Logger logger = LogManager.getFormatterLogger(SegmentTopGetWithTimeSlice.class);
private Logger logger = LogManager.getFormatterLogger(SegmentTopGet.class);
SegmentTopGetWithTimeSlice(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
SegmentTopGet(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(SegmentTopSearchWithTimeSlice.WorkerRole.INSTANCE).create(this);
getClusterContext().findProvider(SegmentTopSearch.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter,
......@@ -83,27 +83,37 @@ public class SegmentTopGetWithTimeSlice extends AbstractGet {
maxCost = Integer.valueOf(ParameterTools.INSTANCE.toString(parameter, "maxCost"));
}
SegmentTopSearchWithTimeSlice.RequestEntity requestEntity;
requestEntity = new SegmentTopSearchWithTimeSlice.RequestEntity(from, limit, startTime, endTime);
String globalTraceId = null;
if (parameter.containsKey("globalTraceId")) {
globalTraceId = ParameterTools.INSTANCE.toString(parameter, "globalTraceId");
}
String operationName = null;
if (parameter.containsKey("operationName")) {
operationName = ParameterTools.INSTANCE.toString(parameter, "operationName");
}
SegmentTopSearch.RequestEntity requestEntity;
requestEntity = new SegmentTopSearch.RequestEntity(from, limit, startTime, endTime, globalTraceId, operationName);
requestEntity.setMinCost(minCost);
requestEntity.setMaxCost(maxCost);
getSelfContext().lookup(SegmentTopSearchWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response);
getSelfContext().lookup(SegmentTopSearch.WorkerRole.INSTANCE).ask(requestEntity, response);
}
public static class Factory extends AbstractGetProvider<SegmentTopGetWithTimeSlice> {
public static class Factory extends AbstractGetProvider<SegmentTopGet> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public SegmentTopGetWithTimeSlice workerInstance(ClusterWorkerContext clusterContext) {
return new SegmentTopGetWithTimeSlice(role(), clusterContext, new LocalWorkerContext());
public SegmentTopGet workerInstance(ClusterWorkerContext clusterContext) {
return new SegmentTopGet(role(), clusterContext, new LocalWorkerContext());
}
@Override
public String servletPath() {
return "/segments/top/timeSlice";
return "/segments/top";
}
}
......@@ -112,7 +122,7 @@ public class SegmentTopGetWithTimeSlice extends AbstractGet {
@Override
public String roleName() {
return SegmentTopGetWithTimeSlice.class.getSimpleName();
return SegmentTopGet.class.getSimpleName();
}
@Override
......
package org.skywalking.apm.collector.worker.segment;
import com.google.gson.JsonObject;
import java.util.Arrays;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.httpserver.AbstractGet;
import org.skywalking.apm.collector.worker.httpserver.AbstractGetProvider;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentTopSearchWithGlobalTraceId;
import org.skywalking.apm.collector.worker.tools.ParameterTools;
/**
* @author pengys5
*/
public class SegmentTopGetWithGlobalTraceId extends AbstractGet {
private Logger logger = LogManager.getFormatterLogger(SegmentTopGetWithGlobalTraceId.class);
SegmentTopGetWithGlobalTraceId(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(SegmentTopSearchWithGlobalTraceId.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("globalTraceId") || !parameter.containsKey("from") || !parameter.containsKey("limit")) {
throw new ArgumentsParseException("the request parameter must contains globalTraceId, from, limit");
}
if (logger.isDebugEnabled()) {
logger.debug("globalTraceId: %s, from: %s, limit: %s", Arrays.toString(parameter.get("globalTraceId")),
Arrays.toString(parameter.get("from")), Arrays.toString(parameter.get("limit")));
}
int from;
try {
from = Integer.valueOf(ParameterTools.INSTANCE.toString(parameter, "from"));
} catch (NumberFormatException e) {
throw new ArgumentsParseException("the request parameter from must be an integer");
}
int limit;
try {
limit = Integer.valueOf(ParameterTools.INSTANCE.toString(parameter, "limit"));
} catch (NumberFormatException e) {
throw new ArgumentsParseException("the request parameter limit must be an integer");
}
String globalTraceId = ParameterTools.INSTANCE.toString(parameter, "globalTraceId");
SegmentTopSearchWithGlobalTraceId.RequestEntity requestEntity = new SegmentTopSearchWithGlobalTraceId.RequestEntity(globalTraceId, from, limit);
getSelfContext().lookup(SegmentTopSearchWithGlobalTraceId.WorkerRole.INSTANCE).ask(requestEntity, response);
}
public static class Factory extends AbstractGetProvider<SegmentTopGetWithGlobalTraceId> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public SegmentTopGetWithGlobalTraceId workerInstance(ClusterWorkerContext clusterContext) {
return new SegmentTopGetWithGlobalTraceId(role(), clusterContext, new LocalWorkerContext());
}
@Override
public String servletPath() {
return "/segments/top/globalTraceId";
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return SegmentTopGetWithGlobalTraceId.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
......@@ -49,6 +49,9 @@ public class SegmentCostAnalysis extends RecordAnalysisMember {
dataJsonObj.addProperty(SegmentCostIndex.SEG_ID, segment.getTraceSegmentId());
dataJsonObj.addProperty(SegmentCostIndex.START_TIME, span.getStartTime());
dataJsonObj.addProperty(SegmentCostIndex.END_TIME, span.getEndTime());
if (segment.getRelatedGlobalTraces().get() != null && segment.getRelatedGlobalTraces().get().size() > 0) {
dataJsonObj.addProperty(SegmentCostIndex.GLOBAL_TRACE_ID, segment.getRelatedGlobalTraces().get().get(0));
}
dataJsonObj.addProperty(SegmentCostIndex.OPERATION_NAME, span.getOperationName());
dataJsonObj.addProperty(SegmentCostIndex.TIME_SLICE, segmentWithTimeSlice.getMinute());
......
......@@ -8,6 +8,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
......@@ -28,13 +29,14 @@ import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.SegmentDeserialize;
import org.skywalking.apm.collector.worker.storage.EsClient;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import org.skywalking.apm.util.StringUtil;
/**
* @author pengys5
*/
public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
public class SegmentTopSearch extends AbstractLocalSyncWorker {
private SegmentTopSearchWithTimeSlice(Role role, ClusterWorkerContext clusterContext,
private SegmentTopSearch(Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -54,7 +56,9 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
searchRequestBuilder.setQuery(boolQueryBuilder);
boolQueryBuilder.must().add(QueryBuilders.rangeQuery(SegmentCostIndex.TIME_SLICE).gte(search.startTime).lte(search.endTime));
List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
mustQueryList.add(QueryBuilders.rangeQuery(SegmentCostIndex.TIME_SLICE).gte(search.startTime).lte(search.endTime));
if (search.minCost != -1 || search.maxCost != -1) {
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(SegmentCostIndex.COST);
if (search.minCost != -1) {
......@@ -65,6 +69,12 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
}
boolQueryBuilder.must().add(rangeQueryBuilder);
}
if (!StringUtil.isEmpty(search.operationName)) {
mustQueryList.add(QueryBuilders.matchQuery(SegmentCostIndex.OPERATION_NAME, search.operationName));
}
if (!StringUtil.isEmpty(search.globalTraceId)) {
mustQueryList.add(QueryBuilders.matchQuery(SegmentCostIndex.GLOBAL_TRACE_ID, search.globalTraceId));
}
searchRequestBuilder.addSort(SegmentCostIndex.COST, SortOrder.DESC);
searchRequestBuilder.setSize(search.limit);
......@@ -137,12 +147,17 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
private long endTime;
private int minCost;
private int maxCost;
private String globalTraceId;
private String operationName;
public RequestEntity(int from, int limit, long startTime, long endTime) {
public RequestEntity(int from, int limit, long startTime, long endTime, String globalTraceId,
String operationName) {
this.from = from;
this.limit = limit;
this.startTime = startTime;
this.endTime = endTime;
this.globalTraceId = globalTraceId;
this.operationName = operationName;
}
public void setMinCost(int minCost) {
......@@ -178,15 +193,15 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
}
}
public static class Factory extends AbstractLocalSyncWorkerProvider<SegmentTopSearchWithTimeSlice> {
public static class Factory extends AbstractLocalSyncWorkerProvider<SegmentTopSearch> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public SegmentTopSearchWithTimeSlice workerInstance(ClusterWorkerContext clusterContext) {
return new SegmentTopSearchWithTimeSlice(role(), clusterContext, new LocalWorkerContext());
public SegmentTopSearch workerInstance(ClusterWorkerContext clusterContext) {
return new SegmentTopSearch(role(), clusterContext, new LocalWorkerContext());
}
}
......@@ -195,7 +210,7 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
@Override
public String roleName() {
return SegmentTopSearchWithTimeSlice.class.getSimpleName();
return SegmentTopSearch.class.getSimpleName();
}
@Override
......
package org.skywalking.apm.collector.worker.segment.persistence;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.List;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.globaltrace.GlobalTraceIndex;
import org.skywalking.apm.collector.worker.segment.SegmentCostIndex;
import org.skywalking.apm.collector.worker.segment.SegmentExceptionIndex;
import org.skywalking.apm.collector.worker.segment.SegmentIndex;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.SegmentDeserialize;
import org.skywalking.apm.collector.worker.storage.EsClient;
import org.skywalking.apm.collector.worker.storage.JoinAndSplitData;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
/**
* @author pengys5
*/
public class SegmentTopSearchWithGlobalTraceId extends AbstractLocalSyncWorker {
private Gson gson = new Gson();
private SegmentTopSearchWithGlobalTraceId(Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(SegmentExceptionWithSegId.WorkerRole.INSTANCE).create(this);
}
@Override
protected void onWork(Object request, Object response) throws WorkerException {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity)request;
Client client = EsClient.INSTANCE.getClient();
String globalTraceData = client.prepareGet(GlobalTraceIndex.INDEX, GlobalTraceIndex.TYPE_RECORD, search.globalTraceId).get().getSourceAsString();
JsonObject globalTraceObj = gson.fromJson(globalTraceData, JsonObject.class);
JsonObject topSegPaging = new JsonObject();
topSegPaging.addProperty("recordsTotal", 0);
JsonArray topSegArray = new JsonArray();
topSegPaging.add("data", topSegArray);
if (globalTraceObj != null && globalTraceObj.has(GlobalTraceIndex.SUB_SEG_IDS)) {
String subSegIdsStr = globalTraceObj.get(GlobalTraceIndex.SUB_SEG_IDS).getAsString();
String[] subSegIds = subSegIdsStr.split(JoinAndSplitData.SPLIT);
topSegPaging.addProperty("recordsTotal", subSegIds.length);
int num = search.from;
int limit = search.limit;
if (search.limit >= subSegIds.length) {
limit = subSegIds.length;
}
for (int i = num; i < limit; i++) {
GetResponse getResponse = client.prepareGet(SegmentCostIndex.INDEX, SegmentCostIndex.TYPE_RECORD, subSegIds[num]).get();
JsonObject topSegmentJson = new JsonObject();
topSegmentJson.addProperty("num", num);
String segId = (String)getResponse.getSource().get(SegmentCostIndex.SEG_ID);
topSegmentJson.addProperty(SegmentCostIndex.SEG_ID, segId);
topSegmentJson.addProperty(SegmentCostIndex.START_TIME, (Number)getResponse.getSource().get(SegmentCostIndex.START_TIME));
if (getResponse.getSource().containsKey(SegmentCostIndex.END_TIME)) {
topSegmentJson.addProperty(SegmentCostIndex.END_TIME, (Number)getResponse.getSource().get(SegmentCostIndex.END_TIME));
}
topSegmentJson.addProperty(SegmentCostIndex.OPERATION_NAME, (String)getResponse.getSource().get(SegmentCostIndex.OPERATION_NAME));
topSegmentJson.addProperty(SegmentCostIndex.COST, (Number)getResponse.getSource().get(SegmentCostIndex.COST));
String segmentSource = client.prepareGet(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, segId).get().getSourceAsString();
Segment segment = null;
try {
segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
} catch (IOException e) {
throw new WorkerException(e.getMessage(), e);
}
List<String> distributedTraceIdList = segment.getRelatedGlobalTraces().get();
JsonArray distributedTraceIdArray = new JsonArray();
if (CollectionTools.isNotEmpty(distributedTraceIdList)) {
for (String distributedTraceId : distributedTraceIdList) {
distributedTraceIdArray.add(distributedTraceId);
}
}
topSegmentJson.add("traceIds", distributedTraceIdArray);
boolean isError = false;
JsonObject resJsonObj = new JsonObject();
getSelfContext().lookup(SegmentExceptionWithSegId.WorkerRole.INSTANCE).ask(new SegmentExceptionWithSegId.RequestEntity(segId), resJsonObj);
if (resJsonObj.has("result")) {
JsonObject segExJson = resJsonObj.get("result").getAsJsonObject();
if (segExJson.has(SegmentExceptionIndex.IS_ERROR)) {
isError = segExJson.get(SegmentExceptionIndex.IS_ERROR).getAsBoolean();
}
}
topSegmentJson.addProperty(SegmentExceptionIndex.IS_ERROR, isError);
num++;
topSegArray.add(topSegmentJson);
}
}
JsonObject resJsonObj = (JsonObject)response;
resJsonObj.add("result", topSegPaging);
}
}
public static class RequestEntity {
private int from;
private int limit;
private String globalTraceId;
public RequestEntity(String globalTraceId, int from, int limit) {
this.from = from;
this.limit = limit;
this.globalTraceId = globalTraceId;
}
public int getFrom() {
return from;
}
public int getLimit() {
return limit;
}
public String getGlobalTraceId() {
return globalTraceId;
}
}
public static class Factory extends AbstractLocalSyncWorkerProvider<SegmentTopSearchWithGlobalTraceId> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public SegmentTopSearchWithGlobalTraceId workerInstance(ClusterWorkerContext clusterContext) {
return new SegmentTopSearchWithGlobalTraceId(role(), clusterContext, new LocalWorkerContext());
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return SegmentTopSearchWithGlobalTraceId.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
......@@ -6,8 +6,7 @@ org.skywalking.apm.collector.worker.segment.persistence.SegmentSave$Factory
org.skywalking.apm.collector.worker.segment.persistence.SegmentCostSave$Factory
org.skywalking.apm.collector.worker.segment.persistence.SegmentExceptionSave$Factory
org.skywalking.apm.collector.worker.segment.persistence.SegmentExceptionWithSegId$Factory
org.skywalking.apm.collector.worker.segment.persistence.SegmentTopSearchWithTimeSlice$Factory
org.skywalking.apm.collector.worker.segment.persistence.SegmentTopSearchWithGlobalTraceId$Factory
org.skywalking.apm.collector.worker.segment.persistence.SegmentTopSearch$Factory
org.skywalking.apm.collector.worker.noderef.analysis.NodeRefDayAnalysis$Factory
org.skywalking.apm.collector.worker.noderef.analysis.NodeRefHourAnalysis$Factory
......
org.skywalking.apm.collector.worker.noderef.NodeRefResSumGetGroupWithTimeSlice$Factory
org.skywalking.apm.collector.worker.segment.SegmentTopGetWithTimeSlice$Factory
org.skywalking.apm.collector.worker.segment.SegmentTopGet$Factory
org.skywalking.apm.collector.worker.globaltrace.GlobalTraceGetWithGlobalId$Factory
org.skywalking.apm.collector.worker.segment.SegmentTopGetWithGlobalTraceId$Factory
org.skywalking.apm.collector.worker.span.SpanGetWithId$Factory
org.skywalking.apm.collector.worker.tracedag.TraceDagGetWithTimeSlice$Factory
\ No newline at end of file
......@@ -20,7 +20,7 @@ import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentExceptionWithSegId;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentTopSearchWithTimeSlice;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentTopSearch;
import java.util.HashMap;
import java.util.Map;
......@@ -34,9 +34,9 @@ import static org.mockito.Mockito.*;
@RunWith(PowerMockRunner.class)
@PrepareForTest( {ClusterWorkerContext.class})
@PowerMockIgnore( {"javax.management.*"})
public class SegmentTopGetWithTimeSliceTestCase {
public class SegmentTopGetTestCase {
private SegmentTopGetWithTimeSlice getObj;
private SegmentTopGet getObj;
private SegmentTopGetAnswerGet answer;
private ClusterWorkerContext clusterWorkerContext;
......@@ -51,23 +51,23 @@ public class SegmentTopGetWithTimeSliceTestCase {
WorkerRefs workerRefs = mock(WorkerRefs.class);
answer = new SegmentTopGetAnswerGet();
doAnswer(answer).when(workerRefs).ask(Mockito.any(SegmentTopSearchWithTimeSlice.RequestEntity.class), Mockito.any(JsonObject.class));
doAnswer(answer).when(workerRefs).ask(Mockito.any(SegmentTopSearch.RequestEntity.class), Mockito.any(JsonObject.class));
when(localWorkerContext.lookup(SegmentTopSearchWithTimeSlice.WorkerRole.INSTANCE)).thenReturn(workerRefs);
getObj = new SegmentTopGetWithTimeSlice(SegmentTopGetWithTimeSlice.WorkerRole.INSTANCE, clusterWorkerContext, localWorkerContext);
when(localWorkerContext.lookup(SegmentTopSearch.WorkerRole.INSTANCE)).thenReturn(workerRefs);
getObj = new SegmentTopGet(SegmentTopGet.WorkerRole.INSTANCE, clusterWorkerContext, localWorkerContext);
}
@Test
public void testRole() {
Assert.assertEquals(SegmentTopGetWithTimeSlice.class.getSimpleName(), SegmentTopGetWithTimeSlice.WorkerRole.INSTANCE.roleName());
Assert.assertEquals(RollingSelector.class.getSimpleName(), SegmentTopGetWithTimeSlice.WorkerRole.INSTANCE.workerSelector().getClass().getSimpleName());
Assert.assertEquals(SegmentTopGet.class.getSimpleName(), SegmentTopGet.WorkerRole.INSTANCE.roleName());
Assert.assertEquals(RollingSelector.class.getSimpleName(), SegmentTopGet.WorkerRole.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
SegmentTopGetWithTimeSlice.Factory factory = new SegmentTopGetWithTimeSlice.Factory();
Assert.assertEquals(SegmentTopGetWithTimeSlice.class.getSimpleName(), factory.role().roleName());
Assert.assertEquals(SegmentTopGetWithTimeSlice.class.getSimpleName(), factory.workerInstance(null).getClass().getSimpleName());
SegmentTopGet.Factory factory = new SegmentTopGet.Factory();
Assert.assertEquals(SegmentTopGet.class.getSimpleName(), factory.role().roleName());
Assert.assertEquals(SegmentTopGet.class.getSimpleName(), factory.workerInstance(null).getClass().getSimpleName());
Assert.assertEquals("/segments/top", factory.servletPath());
}
......@@ -77,12 +77,12 @@ public class SegmentTopGetWithTimeSliceTestCase {
SegmentExceptionWithSegId.Factory factory = new SegmentExceptionWithSegId.Factory();
when(exceptionContext.findProvider(SegmentExceptionWithSegId.WorkerRole.INSTANCE)).thenReturn(factory);
SegmentTopSearchWithTimeSlice.Factory factory1 = new SegmentTopSearchWithTimeSlice.Factory();
SegmentTopSearch.Factory factory1 = new SegmentTopSearch.Factory();
factory1.setClusterContext(exceptionContext);
when(clusterWorkerContext.findProvider(SegmentTopSearchWithTimeSlice.WorkerRole.INSTANCE)).thenReturn(factory1);
when(clusterWorkerContext.findProvider(SegmentTopSearch.WorkerRole.INSTANCE)).thenReturn(factory1);
ArgumentCaptor<SegmentTopSearchWithTimeSlice.WorkerRole> argumentCaptor = ArgumentCaptor.forClass(SegmentTopSearchWithTimeSlice.WorkerRole.class);
ArgumentCaptor<SegmentTopSearch.WorkerRole> argumentCaptor = ArgumentCaptor.forClass(SegmentTopSearch.WorkerRole.class);
getObj.preStart();
verify(clusterWorkerContext).findProvider(argumentCaptor.capture());
}
......@@ -155,7 +155,7 @@ public class SegmentTopGetWithTimeSliceTestCase {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
SegmentTopSearchWithTimeSlice.RequestEntity requestEntity = (SegmentTopSearchWithTimeSlice.RequestEntity) invocation.getArguments()[0];
SegmentTopSearch.RequestEntity requestEntity = (SegmentTopSearch.RequestEntity) invocation.getArguments()[0];
Assert.assertEquals(10, requestEntity.getStartTime());
Assert.assertEquals(20, requestEntity.getEndTime());
Assert.assertEquals(30, requestEntity.getFrom());
......
package org.skywalking.apm.collector.worker.segment;
import com.google.gson.JsonObject;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentExceptionWithSegId;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentTopSearchWithGlobalTraceId;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest( {ClusterWorkerContext.class})
@PowerMockIgnore( {"javax.management.*"})
public class SegmentTopGetWithGlobalTraceIdTestCase {
private SegmentTopGetWithGlobalTraceId getObj;
private SegmentTopGetAnswerGet answer;
private ClusterWorkerContext clusterWorkerContext;
@Before
public void init() throws Exception {
System.setProperty("user.timezone", "UTC");
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
LocalWorkerContext localWorkerContext = PowerMockito.mock(LocalWorkerContext.class);
WorkerRefs workerRefs = mock(WorkerRefs.class);
answer = new SegmentTopGetAnswerGet();
doAnswer(answer).when(workerRefs).ask(Mockito.any(SegmentTopSearchWithGlobalTraceId.RequestEntity.class), Mockito.any(JsonObject.class));
when(localWorkerContext.lookup(SegmentTopSearchWithGlobalTraceId.WorkerRole.INSTANCE)).thenReturn(workerRefs);
getObj = new SegmentTopGetWithGlobalTraceId(SegmentTopGetWithGlobalTraceId.WorkerRole.INSTANCE, clusterWorkerContext, localWorkerContext);
}
@Test
public void testRole() {
Assert.assertEquals(SegmentTopGetWithGlobalTraceId.class.getSimpleName(), SegmentTopGetWithGlobalTraceId.WorkerRole.INSTANCE.roleName());
Assert.assertEquals(RollingSelector.class.getSimpleName(), SegmentTopGetWithGlobalTraceId.WorkerRole.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
SegmentTopGetWithGlobalTraceId.Factory factory = new SegmentTopGetWithGlobalTraceId.Factory();
Assert.assertEquals(SegmentTopGetWithGlobalTraceId.class.getSimpleName(), factory.role().roleName());
Assert.assertEquals(SegmentTopGetWithGlobalTraceId.class.getSimpleName(), factory.workerInstance(null).getClass().getSimpleName());
Assert.assertEquals("/segments/top/globalTraceId", factory.servletPath());
}
@Test
public void testPreStart() throws ProviderNotFoundException {
ClusterWorkerContext exceptionContext = PowerMockito.mock(ClusterWorkerContext.class);
SegmentExceptionWithSegId.Factory factory = new SegmentExceptionWithSegId.Factory();
when(exceptionContext.findProvider(SegmentExceptionWithSegId.WorkerRole.INSTANCE)).thenReturn(factory);
SegmentTopSearchWithGlobalTraceId.Factory factory1 = new SegmentTopSearchWithGlobalTraceId.Factory();
factory1.setClusterContext(exceptionContext);
when(clusterWorkerContext.findProvider(SegmentTopSearchWithGlobalTraceId.WorkerRole.INSTANCE)).thenReturn(factory1);
ArgumentCaptor<SegmentTopSearchWithGlobalTraceId.WorkerRole> argumentCaptor = ArgumentCaptor.forClass(SegmentTopSearchWithGlobalTraceId.WorkerRole.class);
getObj.preStart();
verify(clusterWorkerContext).findProvider(argumentCaptor.capture());
}
@Test
public void testOnSearch() throws Exception {
Map<String, String[]> request = new HashMap<>();
String[] globalTraceId = {"TestId"};
request.put("globalTraceId", globalTraceId);
String[] from = {"20"};
request.put("from", from);
String[] limit = {"50"};
request.put("limit", limit);
JsonObject response = new JsonObject();
getObj.onReceive(request, response);
}
@Test(expected = ArgumentsParseException.class)
public void testOnSearchError() throws Exception {
Map<String, String[]> request = new HashMap<>();
JsonObject response = new JsonObject();
getObj.onReceive(request, response);
}
@Test(expected = ArgumentsParseException.class)
public void testOnSearchErrorFrom() throws Exception {
Map<String, String[]> request = new HashMap<>();
String[] globalTraceId = {"TestId"};
request.put("globalTraceId", globalTraceId);
String[] from = {"x"};
request.put("from", from);
String[] limit = {"50"};
request.put("limit", limit);
JsonObject response = new JsonObject();
getObj.onReceive(request, response);
}
@Test(expected = ArgumentsParseException.class)
public void testOnSearchErrorLimit() throws Exception {
Map<String, String[]> request = new HashMap<>();
String[] globalTraceId = {"TestId"};
request.put("globalTraceId", globalTraceId);
String[] from = {"20"};
request.put("from", from);
String[] limit = {"x"};
request.put("limit", limit);
JsonObject response = new JsonObject();
getObj.onReceive(request, response);
}
class SegmentTopGetAnswerGet implements Answer {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
SegmentTopSearchWithGlobalTraceId.RequestEntity requestEntity = (SegmentTopSearchWithGlobalTraceId.RequestEntity) invocation.getArguments()[0];
Assert.assertEquals("TestId", requestEntity.getGlobalTraceId());
Assert.assertEquals(20, requestEntity.getFrom());
Assert.assertEquals(50, requestEntity.getLimit());
return null;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册