From 406339798849e96b593b8262d8544b194d78c1df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com> Date: Wed, 17 Oct 2018 22:44:21 +0800 Subject: [PATCH] Trigger service heartbeat when received service instance heartbeat. (#1785) Fixed some meta query bugs. --- .../core/query/TopologyQueryService.java | 38 +++++++++++++++++-- .../oap/server/core/query/entity/Service.java | 2 +- .../query/graphql/resolver/TopologyQuery.java | 8 +++- .../grpc/InstanceDiscoveryServiceHandler.java | 16 +++++++- .../query/MetadataQueryEsDAO.java | 13 ++++--- 5 files changed, 65 insertions(+), 12 deletions(-) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java index 766fa7d798..077ba8b681 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java @@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogServ import org.apache.skywalking.oap.server.core.query.entity.*; import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.core.storage.StorageModule; -import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO; +import org.apache.skywalking.oap.server.core.storage.query.*; import org.apache.skywalking.oap.server.library.module.*; import org.apache.skywalking.oap.server.library.module.Service; import org.slf4j.*; @@ -40,6 +40,7 @@ public class TopologyQueryService implements Service { private final ModuleManager moduleManager; private ITopologyQueryDAO topologyQueryDAO; + private IMetadataQueryDAO metadataQueryDAO; private EndpointInventoryCache endpointInventoryCache; private IComponentLibraryCatalogService componentLibraryCatalogService; @@ -47,6 +48,13 @@ public class TopologyQueryService implements Service { this.moduleManager = moduleManager; } + private IMetadataQueryDAO getMetadataQueryDAO() { + if (metadataQueryDAO == null) { + metadataQueryDAO = moduleManager.find(StorageModule.NAME).getService(IMetadataQueryDAO.class); + } + return metadataQueryDAO; + } + private ITopologyQueryDAO getTopologyQueryDAO() { if (topologyQueryDAO == null) { topologyQueryDAO = moduleManager.find(StorageModule.NAME).getService(ITopologyQueryDAO.class); @@ -68,7 +76,8 @@ public class TopologyQueryService implements Service { return endpointInventoryCache; } - public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException { + public Topology getGlobalTopology(final Step step, final long startTB, final long endTB, final long startTimestamp, + final long endTimestamp) throws IOException { logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB); List serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB); List serviceMappings = getTopologyQueryDAO().loadServiceMappings(step, startTB, endTB); @@ -76,8 +85,31 @@ public class TopologyQueryService implements Service { List serviceRelationClientCalls = getTopologyQueryDAO().loadClientSideServiceRelations(step, startTB, endTB); List serviceRelationServerCalls = getTopologyQueryDAO().loadServerSideServiceRelations(step, startTB, endTB); + List serviceList = getMetadataQueryDAO().searchServices(startTimestamp, endTimestamp, null); + TopologyBuilder builder = new TopologyBuilder(moduleManager); - return builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls); + Topology topology = builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls); + + serviceList.forEach(service -> { + boolean contains = false; + for (Node node : topology.getNodes()) { + if (service.getId() == node.getId()) { + contains = true; + break; + } + } + + if (!contains) { + Node newNode = new Node(); + newNode.setId(service.getId()); + newNode.setName(service.getName()); + newNode.setReal(true); + newNode.setType(Const.UNKNOWN); + topology.getNodes().add(newNode); + } + }); + + return topology; } public Topology getServiceTopology(final Step step, final long startTB, final long endTB, diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java index 3f7a97c5e6..76ed34e802 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Service.java @@ -26,6 +26,6 @@ import lombok.*; @Getter @Setter public class Service { - private String id; + private int id; private String name; } diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java index ef5a7bc7b9..2073b7719f 100644 --- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java +++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java @@ -20,6 +20,7 @@ package org.apache.skywalking.oap.query.graphql.resolver; import com.coxautodev.graphql.tools.GraphQLQueryResolver; import java.io.IOException; +import java.text.ParseException; import org.apache.skywalking.oap.query.graphql.type.Duration; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.query.*; @@ -45,11 +46,14 @@ public class TopologyQuery implements GraphQLQueryResolver { return queryService; } - public Topology getGlobalTopology(final Duration duration) throws IOException { + public Topology getGlobalTopology(final Duration duration) throws IOException, ParseException { long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart()); long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd()); - return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket); + long startTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getStart()); + long endTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getEnd()); + + return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket, startTimestamp, endTimestamp); } public Topology getServiceTopology(final int serviceId, final Duration duration) throws IOException { diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java index 4b12f8567e..7e5364d35c 100644 --- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java +++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java @@ -19,10 +19,12 @@ package org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.grpc; import io.grpc.stub.StreamObserver; +import java.util.Objects; import org.apache.skywalking.apm.network.language.agent.*; import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache; import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory; -import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler; import org.slf4j.*; @@ -34,9 +36,13 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp private static final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServiceHandler.class); + private final ServiceInstanceInventoryCache serviceInstanceInventoryCache; + private final IServiceInventoryRegister serviceInventoryRegister; private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister; public InstanceDiscoveryServiceHandler(ModuleManager moduleManager) { + this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class); + this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInventoryRegister.class); this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class); } @@ -62,6 +68,14 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp int serviceInstanceId = request.getApplicationInstanceId(); long heartBeatTime = request.getHeartbeatTime(); serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime); + + ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId); + if (Objects.nonNull(serviceInstanceInventory)) { + serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime); + } else { + logger.warn("Can't found service instance by service instance id from cache, service instance id is: {}", serviceInstanceId); + } + responseObserver.onNext(Downstream.getDefaultInstance()); responseObserver.onCompleted(); } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java index 76e7b31665..d0058e2dcc 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java @@ -111,9 +111,12 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO { BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); boolQueryBuilder.must().add(timeRangeQueryBuild(startTimestamp, endTimestamp)); + boolQueryBuilder.must().add(QueryBuilders.termQuery(ServiceInventory.IS_ADDRESS, BooleanUtils.FALSE)); - String matchCName = MatchCNameBuilder.INSTANCE.build(ServiceInventory.NAME); - boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword)); + if (StringUtils.isNotEmpty(keyword)) { + String matchCName = MatchCNameBuilder.INSTANCE.build(ServiceInventory.NAME); + boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword)); + } sourceBuilder.query(boolQueryBuilder); sourceBuilder.size(100); @@ -127,7 +130,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO { GetResponse response = getClient().get(ServiceInventory.MODEL_NAME, ServiceInventory.buildId(serviceCode)); if (response.isExists()) { Service service = new Service(); - service.setId(String.valueOf(response.getSource().get(ServiceInventory.SEQUENCE))); + service.setId(((Number)response.getSource().get(ServiceInventory.SEQUENCE)).intValue()); service.setName((String)response.getSource().get(ServiceInventory.NAME)); return service; } else { @@ -142,8 +145,8 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO { BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); boolQueryBuilder.must().add(QueryBuilders.termQuery(EndpointInventory.SERVICE_ID, serviceId)); - String matchCName = MatchCNameBuilder.INSTANCE.build(EndpointInventory.NAME); if (StringUtils.isNotEmpty(keyword)) { + String matchCName = MatchCNameBuilder.INSTANCE.build(EndpointInventory.NAME); boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword)); } @@ -208,7 +211,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO { Map sourceAsMap = searchHit.getSourceAsMap(); Service service = new Service(); - service.setId(String.valueOf(sourceAsMap.get(ServiceInventory.SEQUENCE))); + service.setId(((Number)sourceAsMap.get(ServiceInventory.SEQUENCE)).intValue()); service.setName((String)sourceAsMap.get(ServiceInventory.NAME)); services.add(service); } -- GitLab