提交 40633979 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Trigger service heartbeat when received service instance heartbeat. (#1785)

Fixed some meta query bugs.
上级 064595eb
......@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogServ
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.Service;
import org.slf4j.*;
......@@ -40,6 +40,7 @@ public class TopologyQueryService implements Service {
private final ModuleManager moduleManager;
private ITopologyQueryDAO topologyQueryDAO;
private IMetadataQueryDAO metadataQueryDAO;
private EndpointInventoryCache endpointInventoryCache;
private IComponentLibraryCatalogService componentLibraryCatalogService;
......@@ -47,6 +48,13 @@ public class TopologyQueryService implements Service {
this.moduleManager = moduleManager;
}
private IMetadataQueryDAO getMetadataQueryDAO() {
if (metadataQueryDAO == null) {
metadataQueryDAO = moduleManager.find(StorageModule.NAME).getService(IMetadataQueryDAO.class);
}
return metadataQueryDAO;
}
private ITopologyQueryDAO getTopologyQueryDAO() {
if (topologyQueryDAO == null) {
topologyQueryDAO = moduleManager.find(StorageModule.NAME).getService(ITopologyQueryDAO.class);
......@@ -68,7 +76,8 @@ public class TopologyQueryService implements Service {
return endpointInventoryCache;
}
public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException {
public Topology getGlobalTopology(final Step step, final long startTB, final long endTB, final long startTimestamp,
final long endTimestamp) throws IOException {
logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB);
List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
List<ServiceMapping> serviceMappings = getTopologyQueryDAO().loadServiceMappings(step, startTB, endTB);
......@@ -76,8 +85,31 @@ public class TopologyQueryService implements Service {
List<Call> serviceRelationClientCalls = getTopologyQueryDAO().loadClientSideServiceRelations(step, startTB, endTB);
List<Call> serviceRelationServerCalls = getTopologyQueryDAO().loadServerSideServiceRelations(step, startTB, endTB);
List<org.apache.skywalking.oap.server.core.query.entity.Service> serviceList = getMetadataQueryDAO().searchServices(startTimestamp, endTimestamp, null);
TopologyBuilder builder = new TopologyBuilder(moduleManager);
return builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
Topology topology = builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
serviceList.forEach(service -> {
boolean contains = false;
for (Node node : topology.getNodes()) {
if (service.getId() == node.getId()) {
contains = true;
break;
}
}
if (!contains) {
Node newNode = new Node();
newNode.setId(service.getId());
newNode.setName(service.getName());
newNode.setReal(true);
newNode.setType(Const.UNKNOWN);
topology.getNodes().add(newNode);
}
});
return topology;
}
public Topology getServiceTopology(final Step step, final long startTB, final long endTB,
......
......@@ -26,6 +26,6 @@ import lombok.*;
@Getter
@Setter
public class Service {
private String id;
private int id;
private String name;
}
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.query.graphql.resolver;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import java.io.IOException;
import java.text.ParseException;
import org.apache.skywalking.oap.query.graphql.type.Duration;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.query.*;
......@@ -45,11 +46,14 @@ public class TopologyQuery implements GraphQLQueryResolver {
return queryService;
}
public Topology getGlobalTopology(final Duration duration) throws IOException {
public Topology getGlobalTopology(final Duration duration) throws IOException, ParseException {
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket);
long startTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getStart());
long endTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getEnd());
return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket, startTimestamp, endTimestamp);
}
public Topology getServiceTopology(final int serviceId, final Duration duration) throws IOException {
......
......@@ -19,10 +19,12 @@
package org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.grpc;
import io.grpc.stub.StreamObserver;
import java.util.Objects;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.slf4j.*;
......@@ -34,9 +36,13 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
private static final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServiceHandler.class);
private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
private final IServiceInventoryRegister serviceInventoryRegister;
private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
public InstanceDiscoveryServiceHandler(ModuleManager moduleManager) {
this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class);
this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInventoryRegister.class);
this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class);
}
......@@ -62,6 +68,14 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
int serviceInstanceId = request.getApplicationInstanceId();
long heartBeatTime = request.getHeartbeatTime();
serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime);
ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId);
if (Objects.nonNull(serviceInstanceInventory)) {
serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
} else {
logger.warn("Can't found service instance by service instance id from cache, service instance id is: {}", serviceInstanceId);
}
responseObserver.onNext(Downstream.getDefaultInstance());
responseObserver.onCompleted();
}
......
......@@ -111,9 +111,12 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must().add(timeRangeQueryBuild(startTimestamp, endTimestamp));
boolQueryBuilder.must().add(QueryBuilders.termQuery(ServiceInventory.IS_ADDRESS, BooleanUtils.FALSE));
String matchCName = MatchCNameBuilder.INSTANCE.build(ServiceInventory.NAME);
boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword));
if (StringUtils.isNotEmpty(keyword)) {
String matchCName = MatchCNameBuilder.INSTANCE.build(ServiceInventory.NAME);
boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword));
}
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(100);
......@@ -127,7 +130,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
GetResponse response = getClient().get(ServiceInventory.MODEL_NAME, ServiceInventory.buildId(serviceCode));
if (response.isExists()) {
Service service = new Service();
service.setId(String.valueOf(response.getSource().get(ServiceInventory.SEQUENCE)));
service.setId(((Number)response.getSource().get(ServiceInventory.SEQUENCE)).intValue());
service.setName((String)response.getSource().get(ServiceInventory.NAME));
return service;
} else {
......@@ -142,8 +145,8 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must().add(QueryBuilders.termQuery(EndpointInventory.SERVICE_ID, serviceId));
String matchCName = MatchCNameBuilder.INSTANCE.build(EndpointInventory.NAME);
if (StringUtils.isNotEmpty(keyword)) {
String matchCName = MatchCNameBuilder.INSTANCE.build(EndpointInventory.NAME);
boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword));
}
......@@ -208,7 +211,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();
Service service = new Service();
service.setId(String.valueOf(sourceAsMap.get(ServiceInventory.SEQUENCE)));
service.setId(((Number)sourceAsMap.get(ServiceInventory.SEQUENCE)).intValue());
service.setName((String)sourceAsMap.get(ServiceInventory.NAME));
services.add(service);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册