提交 a062fdae 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Fixed the bug from topology. (#1736)

* Fixed the bug from topology.

* no message
上级 3f975d2b
......@@ -37,15 +37,21 @@ public class EndpointInventoryCache implements Service {
private static final Logger logger = LoggerFactory.getLogger(EndpointInventoryCache.class);
private final ModuleManager moduleManager;
private final EndpointInventory userEndpoint;
private final Cache<String, Integer> endpointNameCache = CacheBuilder.newBuilder().initialCapacity(5000).maximumSize(100000).build();
private final Cache<Integer, EndpointInventory> endpointIdCache = CacheBuilder.newBuilder().initialCapacity(5000).maximumSize(100000).build();
private IEndpointInventoryCacheDAO cacheDAO;
public EndpointInventoryCache(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private final Cache<String, Integer> endpointNameCache = CacheBuilder.newBuilder().initialCapacity(5000).maximumSize(100000).build();
private final Cache<Integer, EndpointInventory> endpointIdCache = CacheBuilder.newBuilder().initialCapacity(5000).maximumSize(100000).build();
this.userEndpoint = new EndpointInventory();
this.userEndpoint.setSequence(Const.USER_ENDPOINT_ID);
this.userEndpoint.setName(Const.USER_CODE);
this.userEndpoint.setServiceId(Const.USER_SERVICE_ID);
}
private IEndpointInventoryCacheDAO getCacheDAO() {
if (isNull(cacheDAO)) {
......@@ -69,6 +75,10 @@ public class EndpointInventoryCache implements Service {
}
public EndpointInventory get(int endpointId) {
if (Const.USER_ENDPOINT_ID == endpointId) {
return userEndpoint;
}
EndpointInventory endpointInventory = endpointIdCache.getIfPresent(endpointId);
if (isNull(endpointInventory)) {
......
......@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.slf4j.*;
import static java.util.Objects.isNull;
......@@ -36,6 +37,7 @@ public class ServiceInstanceInventoryCache implements Service {
private static final Logger logger = LoggerFactory.getLogger(ServiceInstanceInventoryCache.class);
private final ServiceInstanceInventory userServiceInstance;
private final Cache<Integer, ServiceInstanceInventory> serviceInstanceIdCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(5000).build();
private final Cache<String, Integer> serviceInstanceNameCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(5000).build();
......@@ -47,6 +49,12 @@ public class ServiceInstanceInventoryCache implements Service {
public ServiceInstanceInventoryCache(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.userServiceInstance = new ServiceInstanceInventory();
this.userServiceInstance.setSequence(Const.USER_INSTANCE_ID);
this.userServiceInstance.setName(Const.USER_CODE);
this.userServiceInstance.setServiceId(Const.USER_SERVICE_ID);
this.userServiceInstance.setIsAddress(BooleanUtils.FALSE);
}
private IServiceInstanceInventoryCacheDAO getCacheDAO() {
......@@ -57,6 +65,10 @@ public class ServiceInstanceInventoryCache implements Service {
}
public ServiceInstanceInventory get(int serviceInstanceId) {
if (Const.USER_INSTANCE_ID == serviceInstanceId) {
return userServiceInstance;
}
ServiceInstanceInventory serviceInstanceInventory = serviceInstanceIdCache.getIfPresent(serviceInstanceId);
if (Objects.isNull(serviceInstanceInventory)) {
......
......@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.slf4j.*;
import static java.util.Objects.*;
......@@ -36,6 +37,7 @@ public class ServiceInventoryCache implements Service {
private static final Logger logger = LoggerFactory.getLogger(ServiceInventoryCache.class);
private final ServiceInventory userService;
private final Cache<String, Integer> serviceNameCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
private final Cache<String, Integer> addressIdCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
private final Cache<Integer, ServiceInventory> serviceIdCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
......@@ -45,6 +47,11 @@ public class ServiceInventoryCache implements Service {
public ServiceInventoryCache(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.userService = new ServiceInventory();
this.userService.setSequence(Const.USER_SERVICE_ID);
this.userService.setName(Const.USER_CODE);
this.userService.setIsAddress(BooleanUtils.FALSE);
}
private IServiceInventoryCacheDAO getCacheDAO() {
......@@ -79,6 +86,14 @@ public class ServiceInventoryCache implements Service {
}
public ServiceInventory get(int serviceId) {
if (logger.isDebugEnabled()) {
logger.debug("Get service by id {} from cache", serviceId);
}
if (Const.USER_SERVICE_ID == serviceId) {
return userService;
}
ServiceInventory serviceInventory = serviceIdCache.getIfPresent(serviceId);
if (isNull(serviceInventory)) {
......
......@@ -78,6 +78,11 @@ class TopologyBuilder {
serviceNode.setId(source.getSequence());
serviceNode.setName(source.getName());
serviceNode.setType(nodeCompMap.getOrDefault(source.getSequence(), Const.UNKNOWN));
if (BooleanUtils.valueToBoolean(source.getIsAddress())) {
serviceNode.setReal(false);
} else {
serviceNode.setReal(true);
}
nodes.add(serviceNode);
}
......@@ -88,6 +93,7 @@ class TopologyBuilder {
call.setTarget(actualTargetId);
call.setCallType(nodeCompMap.get(clientCall.getTarget()));
call.setId(clientCall.getId());
call.setDetectPoint(DetectPoint.CLIENT);
calls.add(call);
});
......@@ -101,6 +107,7 @@ class TopologyBuilder {
visualUserNode.setId(source.getSequence());
visualUserNode.setName(Const.USER_CODE);
visualUserNode.setType(Const.USER_CODE.toUpperCase());
visualUserNode.setReal(false);
nodes.add(visualUserNode);
nodeIds.add(source.getSequence());
}
......@@ -112,6 +119,7 @@ class TopologyBuilder {
conjecturalNode.setId(source.getSequence());
conjecturalNode.setName(source.getName());
conjecturalNode.setType(conjecturalNodeCompMap.getOrDefault(target.getSequence(), Const.UNKNOWN));
conjecturalNode.setReal(true);
nodeIds.add(source.getSequence());
nodes.add(conjecturalNode);
}
......@@ -121,6 +129,7 @@ class TopologyBuilder {
call.setSource(source.getSequence());
call.setTarget(target.getSequence());
call.setId(serverCall.getId());
call.setDetectPoint(DetectPoint.SERVER);
if (source.getSequence() == Const.USER_SERVICE_ID) {
call.setCallType(Const.EMPTY_STRING);
......
......@@ -98,7 +98,7 @@ public class TopologyQueryService implements Service {
List<Call> serviceRelationServerCalls = getTopologyQueryDAO().loadSpecifiedServerSideServiceRelations(step, startTB, endTB, serviceIdList);
TopologyBuilder builder = new TopologyBuilder(moduleManager);
Topology topology = builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
Topology topology = builder.build(serviceComponents, new ArrayList<>(), serviceRelationClientCalls, serviceRelationServerCalls);
Set<Integer> nodeIds = new HashSet<>();
topology.getCalls().forEach(call -> {
......
......@@ -38,7 +38,7 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
private final Map<RegisterSource, RegisterSource> sources;
private int messageNum;
public RegisterDistinctWorker(int workerId, AbstractWorker<RegisterSource> nextWorker) {
RegisterDistinctWorker(int workerId, AbstractWorker<RegisterSource> nextWorker) {
super(workerId);
this.nextWorker = nextWorker;
this.sources = new HashMap<>();
......
......@@ -39,7 +39,7 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
private final IRegisterLockDAO registerLockDAO;
private final IRegisterDAO registerDAO;
public RegisterPersistentWorker(int workerId, String modelName, ModuleManager moduleManager,
RegisterPersistentWorker(int workerId, String modelName, ModuleManager moduleManager,
IRegisterDAO registerDAO, Scope scope) {
super(workerId);
this.modelName = modelName;
......
......@@ -40,8 +40,8 @@ class ConsumerMock {
private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId, boolean isPrepare) {
TraceSegmentObject.Builder segment = TraceSegmentObject.newBuilder();
segment.setTraceSegmentId(segmentId);
segment.setApplicationId(1);
segment.setApplicationInstanceId(1);
segment.setApplicationId(2);
segment.setApplicationInstanceId(2);
segment.addSpans(createEntrySpan(startTimestamp, isPrepare));
segment.addSpans(createLocalSpan(startTimestamp, isPrepare));
segment.addSpans(createMqEntrySpan(startTimestamp, isPrepare));
......@@ -64,7 +64,7 @@ class ConsumerMock {
if (isPrepare) {
span.setOperationName("/dubbox-case/case/dubbox-rest");
} else {
span.setOperationNameId(1);
span.setOperationNameId(2);
}
span.setIsError(false);
return span;
......@@ -80,7 +80,7 @@ class ConsumerMock {
if (isPrepare) {
span.setOperationName("org.apache.skywalking.Local.do");
} else {
span.setOperationNameId(2);
span.setOperationNameId(3);
}
span.setIsError(false);
return span;
......@@ -98,7 +98,7 @@ class ConsumerMock {
if (isPrepare) {
span.setOperationName("org.apache.skywalking.RocketMQ");
} else {
span.setOperationNameId(3);
span.setOperationNameId(4);
}
span.setIsError(false);
return span;
......@@ -117,8 +117,8 @@ class ConsumerMock {
span.setPeer("172.25.0.4:20880");
span.setOperationName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
} else {
span.setOperationNameId(4);
span.setPeerId(2);
span.setOperationNameId(5);
span.setPeerId(3);
}
span.setIsError(false);
return span;
......@@ -136,7 +136,7 @@ class ConsumerMock {
if (isPrepare) {
span.setOperationName("org.apache.skywalking.RocketMQ");
} else {
span.setOperationNameId(3);
span.setOperationNameId(4);
}
span.setIsError(false);
return span;
......@@ -155,8 +155,8 @@ class ConsumerMock {
span.setPeer("172.25.0.4:20880");
span.setOperationName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
} else {
span.setOperationNameId(4);
span.setPeerId(2);
span.setOperationNameId(5);
span.setPeerId(3);
}
span.setIsError(false);
return span;
......
......@@ -41,8 +41,8 @@ class ProviderMock {
UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
TraceSegmentObject.Builder segment = TraceSegmentObject.newBuilder();
segment.setTraceSegmentId(segmentId);
segment.setApplicationId(2);
segment.setApplicationInstanceId(2);
segment.setApplicationId(3);
segment.setApplicationInstanceId(3);
segment.addSpans(createExitSpan(startTimestamp, isPrepare));
segment.addSpans(createEntrySpan(startTimestamp, parentTraceSegmentId, isPrepare));
......@@ -52,9 +52,9 @@ class ProviderMock {
private TraceSegmentReference.Builder createReference(UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
TraceSegmentReference.Builder reference = TraceSegmentReference.newBuilder();
reference.setParentTraceSegmentId(parentTraceSegmentId);
reference.setParentApplicationInstanceId(1);
reference.setParentApplicationInstanceId(2);
reference.setParentSpanId(1);
reference.setEntryApplicationInstanceId(1);
reference.setEntryApplicationInstanceId(2);
reference.setRefType(RefType.CrossProcess);
if (isPrepare) {
......@@ -62,9 +62,9 @@ class ProviderMock {
reference.setNetworkAddress("172.25.0.4:20880");
reference.setEntryServiceName("/dubbox-case/case/dubbox-rest");
} else {
reference.setParentServiceId(1);
reference.setNetworkAddressId(2);
reference.setEntryServiceId(1);
reference.setParentServiceId(2);
reference.setNetworkAddressId(3);
reference.setEntryServiceId(2);
}
return reference;
}
......@@ -84,8 +84,8 @@ class ProviderMock {
span.setOperationName("mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]");
span.setPeer("localhost:27017");
} else {
span.setOperationNameId(5);
span.setPeerId(1);
span.setOperationNameId(6);
span.setPeerId(2);
}
return span;
}
......@@ -105,7 +105,7 @@ class ProviderMock {
if (isPrepare) {
span.setOperationName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
} else {
span.setOperationNameId(6);
span.setOperationNameId(7);
}
return span;
}
......
......@@ -19,7 +19,6 @@
package org.apache.skywalking.oap.server.receiver.trace.mock;
import io.grpc.ManagedChannel;
import java.util.UUID;
import java.util.concurrent.*;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
......@@ -59,7 +58,7 @@ class RegisterMock {
ApplicationInstance.Builder instance = ApplicationInstance.newBuilder();
instance.setApplicationId(applicationMapping.getApplication().getValue());
instance.setAgentUUID(UUID.randomUUID().toString());
instance.setAgentUUID("pengys");
instance.setRegisterTime(new DateTime("2017-01-01T00:01:01.001").getMillis());
OSInfo.Builder osInfo = OSInfo.newBuilder();
......@@ -104,7 +103,7 @@ class RegisterMock {
ApplicationInstance.Builder instance = ApplicationInstance.newBuilder();
instance.setApplicationId(applicationMapping.getApplication().getValue());
instance.setAgentUUID(UUID.randomUUID().toString());
instance.setAgentUUID("peng-yongsheng");
instance.setRegisterTime(new DateTime("2017-01-01T00:01:01.001").getMillis());
OSInfo.Builder osInfo = OSInfo.newBuilder();
......
......@@ -20,7 +20,7 @@
<Configuration status="INFO">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
<PatternLayout charset="UTF-8" pattern="%d - %c - %L [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
......
......@@ -91,7 +91,7 @@ public class RegisterEsDAO extends EsDAO implements IRegisterDAO {
int id = (int)agg.getValue();
if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
return 0;
return 1;
} else {
return id;
}
......
......@@ -79,17 +79,17 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
private void setQueryCondition(SearchSourceBuilder sourceBuilder, long startTB, long endTB,
List<Integer> serviceIds) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).gte(startTB).lte(endTB));
BoolQueryBuilder serviceIdBoolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(serviceIdBoolQuery);
if (serviceIds.size() == 1) {
boolQuery.should().add(QueryBuilders.termQuery(ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, serviceIds.get(0)));
boolQuery.should().add(QueryBuilders.termQuery(ServiceRelationServerSideIndicator.DEST_SERVICE_ID, serviceIds.get(0)));
serviceIdBoolQuery.should().add(QueryBuilders.termQuery(ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, serviceIds.get(0)));
serviceIdBoolQuery.should().add(QueryBuilders.termQuery(ServiceRelationServerSideIndicator.DEST_SERVICE_ID, serviceIds.get(0)));
} else {
boolQuery.should().add(QueryBuilders.termsQuery(ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, serviceIds));
boolQuery.should().add(QueryBuilders.termsQuery(ServiceRelationServerSideIndicator.DEST_SERVICE_ID, serviceIds));
serviceIdBoolQuery.should().add(QueryBuilders.termsQuery(ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, serviceIds));
serviceIdBoolQuery.should().add(QueryBuilders.termsQuery(ServiceRelationServerSideIndicator.DEST_SERVICE_ID, serviceIds));
}
sourceBuilder.query(boolQuery);
}
......@@ -97,7 +97,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
@Override public List<Call> loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).gte(startTB).lte(endTB));
sourceBuilder.size(0);
return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID, Source.Service);
......@@ -106,7 +106,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
@Override public List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).gte(startTB).lte(endTB));
sourceBuilder.size(0);
return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID, Source.Service);
......@@ -115,7 +115,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
@Override public List<ServiceMapping> loadServiceMappings(Step step, long startTB, long endTB) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, ServiceMappingIndicator.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceMappingIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceMappingIndicator.TIME_BUCKET).gte(startTB).lte(endTB));
sourceBuilder.size(0);
TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(ServiceMappingIndicator.SERVICE_ID).field(ServiceMappingIndicator.SERVICE_ID).size(1000);
......@@ -142,7 +142,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
public List<ServiceComponent> loadServiceComponents(Step step, long startTB, long endTB) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, ServiceComponentIndicator.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceComponentIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceComponentIndicator.TIME_BUCKET).gte(startTB).lte(endTB));
sourceBuilder.size(0);
TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(ServiceComponentIndicator.SERVICE_ID).field(ServiceComponentIndicator.SERVICE_ID).size(1000);
......
......@@ -90,7 +90,7 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
sourceBuilder.sort(SegmentRecord.START_TIME, SortOrder.DESC);
break;
case BY_DURATION:
sourceBuilder.sort(SegmentRecord.START_TIME, SortOrder.DESC);
sourceBuilder.sort(SegmentRecord.LATENCY, SortOrder.DESC);
break;
}
sourceBuilder.size(limit);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册