diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java index aad015f5902b5898ed54b8e05cdd0e1c5622d7c8..c1e3cb42670fa8e37116e6bd2b646c35083251fa 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationDispatcher.java @@ -28,11 +28,26 @@ import org.apache.skywalking.oap.server.core.source.ServiceRelation; public class ServiceCallRelationDispatcher implements SourceDispatcher { @Override public void dispatch(ServiceRelation source) { - doDispatch(source); + switch (source.getDetectPoint()) { + case SERVER: + serverSide(source); + break; + case CLIENT: + clientSide(source); + break; + } } - public void doDispatch(ServiceRelation source) { - ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator(); + private void serverSide(ServiceRelation source) { + ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator(); + indicator.setTimeBucket(source.getTimeBucket()); + indicator.setSourceServiceId(source.getSourceServiceId()); + indicator.setDestServiceId(source.getDestServiceId()); + IndicatorProcess.INSTANCE.in(indicator); + } + + private void clientSide(ServiceRelation source) { + ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator(); indicator.setTimeBucket(source.getTimeBucket()); indicator.setSourceServiceId(source.getSourceServiceId()); indicator.setDestServiceId(source.getDestServiceId()); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideIndicator.java similarity index 81% rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideIndicator.java index 0663607f529231dd1f28283de096ed7433d92add..cd38b00b0bc294097f4d284fc67433735024745d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceCallRelationIndicator.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideIndicator.java @@ -18,26 +18,22 @@ package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation; -import java.util.HashMap; -import java.util.Map; -import lombok.Getter; -import lombok.Setter; +import java.util.*; +import lombok.*; import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType; import org.apache.skywalking.oap.server.core.remote.annotation.StreamData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; -import org.apache.skywalking.oap.server.core.storage.annotation.Column; -import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn; -import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntity; +import org.apache.skywalking.oap.server.core.storage.annotation.*; @IndicatorType @StreamData -@StorageEntity(name = ServiceCallRelationIndicator.INDEX_NAME, builder = ServiceCallRelationIndicator.Builder.class) -public class ServiceCallRelationIndicator extends Indicator { +@StorageEntity(name = ServiceRelationClientSideIndicator.INDEX_NAME, builder = ServiceRelationClientSideIndicator.Builder.class) +public class ServiceRelationClientSideIndicator extends Indicator { - public static final String INDEX_NAME = "service_call_relation"; + public static final String INDEX_NAME = "service_relation_client_side"; public static final String SOURCE_SERVICE_ID = "source_service_id"; public static final String DEST_SERVICE_ID = "dest_service_id"; @@ -60,7 +56,7 @@ public class ServiceCallRelationIndicator extends Indicator { } @Override public Indicator toHour() { - ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator(); + ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator(); indicator.setTimeBucket(toTimeBucketInHour()); indicator.setSourceServiceId(getSourceServiceId()); indicator.setDestServiceId(getDestServiceId()); @@ -68,7 +64,7 @@ public class ServiceCallRelationIndicator extends Indicator { } @Override public Indicator toDay() { - ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator(); + ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator(); indicator.setTimeBucket(toTimeBucketInDay()); indicator.setSourceServiceId(getSourceServiceId()); indicator.setDestServiceId(getDestServiceId()); @@ -76,7 +72,7 @@ public class ServiceCallRelationIndicator extends Indicator { } @Override public Indicator toMonth() { - ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator(); + ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator(); indicator.setTimeBucket(toTimeBucketInMonth()); indicator.setSourceServiceId(getSourceServiceId()); indicator.setDestServiceId(getDestServiceId()); @@ -99,8 +95,8 @@ public class ServiceCallRelationIndicator extends Indicator { @Override public RemoteData.Builder serialize() { RemoteData.Builder remoteBuilder = RemoteData.newBuilder(); - remoteBuilder.setDataIntegers(0, getSourceServiceId()); remoteBuilder.setDataIntegers(1, getDestServiceId()); + remoteBuilder.setDataIntegers(0, getSourceServiceId()); remoteBuilder.setDataLongs(0, getTimeBucket()); return remoteBuilder; @@ -122,7 +118,7 @@ public class ServiceCallRelationIndicator extends Indicator { if (getClass() != obj.getClass()) return false; - ServiceCallRelationIndicator indicator = (ServiceCallRelationIndicator)obj; + ServiceRelationClientSideIndicator indicator = (ServiceRelationClientSideIndicator)obj; if (sourceServiceId != indicator.sourceServiceId) return false; if (destServiceId != indicator.destServiceId) @@ -134,21 +130,21 @@ public class ServiceCallRelationIndicator extends Indicator { return true; } - public static class Builder implements StorageBuilder { + public static class Builder implements StorageBuilder { - @Override public ServiceCallRelationIndicator map2Data(Map dbMap) { - ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator(); + @Override public ServiceRelationClientSideIndicator map2Data(Map dbMap) { + ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator(); indicator.setSourceServiceId(((Number)dbMap.get(SOURCE_SERVICE_ID)).intValue()); indicator.setDestServiceId(((Number)dbMap.get(DEST_SERVICE_ID)).intValue()); indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue()); return indicator; } - @Override public Map data2Map(ServiceCallRelationIndicator storageData) { + @Override public Map data2Map(ServiceRelationClientSideIndicator storageData) { Map map = new HashMap<>(); + map.put(TIME_BUCKET, storageData.getTimeBucket()); map.put(SOURCE_SERVICE_ID, storageData.getSourceServiceId()); map.put(DEST_SERVICE_ID, storageData.getDestServiceId()); - map.put(TIME_BUCKET, storageData.getTimeBucket()); return map; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideIndicator.java new file mode 100644 index 0000000000000000000000000000000000000000..c438c68523a956f3a41e410ea43d40fd55f9975d --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideIndicator.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation; + +import java.util.*; +import lombok.*; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType; +import org.apache.skywalking.oap.server.core.remote.annotation.StreamData; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.core.storage.annotation.*; + +@IndicatorType +@StreamData +@StorageEntity(name = ServiceRelationServerSideIndicator.INDEX_NAME, builder = ServiceRelationServerSideIndicator.Builder.class) +public class ServiceRelationServerSideIndicator extends Indicator { + + public static final String INDEX_NAME = "service_relation_server_side"; + public static final String SOURCE_SERVICE_ID = "source_service_id"; + public static final String DEST_SERVICE_ID = "dest_service_id"; + + @Setter @Getter @Column(columnName = SOURCE_SERVICE_ID) @IDColumn private int sourceServiceId; + @Setter @Getter @Column(columnName = DEST_SERVICE_ID) @IDColumn private int destServiceId; + + @Override public String id() { + String splitJointId = String.valueOf(getTimeBucket()); + splitJointId += Const.ID_SPLIT + String.valueOf(sourceServiceId); + splitJointId += Const.ID_SPLIT + String.valueOf(destServiceId); + return splitJointId; + } + + @Override public void combine(Indicator indicator) { + + } + + @Override public void calculate() { + + } + + @Override public Indicator toHour() { + ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator(); + indicator.setTimeBucket(toTimeBucketInHour()); + indicator.setSourceServiceId(getSourceServiceId()); + indicator.setDestServiceId(getDestServiceId()); + return indicator; + } + + @Override public Indicator toDay() { + ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator(); + indicator.setTimeBucket(toTimeBucketInDay()); + indicator.setSourceServiceId(getSourceServiceId()); + indicator.setDestServiceId(getDestServiceId()); + return indicator; + } + + @Override public Indicator toMonth() { + ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator(); + indicator.setTimeBucket(toTimeBucketInMonth()); + indicator.setSourceServiceId(getSourceServiceId()); + indicator.setDestServiceId(getDestServiceId()); + return indicator; + } + + @Override public int remoteHashCode() { + int result = 17; + result = 31 * result + sourceServiceId; + result = 31 * result + destServiceId; + return result; + } + + @Override public void deserialize(RemoteData remoteData) { + setSourceServiceId(remoteData.getDataIntegers(0)); + setDestServiceId(remoteData.getDataIntegers(1)); + setTimeBucket(remoteData.getDataLongs(0)); + } + + @Override public RemoteData.Builder serialize() { + RemoteData.Builder remoteBuilder = RemoteData.newBuilder(); + + remoteBuilder.setDataIntegers(0, getSourceServiceId()); + remoteBuilder.setDataIntegers(1, getDestServiceId()); + remoteBuilder.setDataLongs(0, getTimeBucket()); + + return remoteBuilder; + } + + @Override public int hashCode() { + int result = 17; + result = 31 * result + sourceServiceId; + result = 31 * result + destServiceId; + result = 31 * result + (int)getTimeBucket(); + return result; + } + + @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + + ServiceRelationServerSideIndicator indicator = (ServiceRelationServerSideIndicator)obj; + if (sourceServiceId != indicator.sourceServiceId) + return false; + if (destServiceId != indicator.destServiceId) + return false; + + if (getTimeBucket() != indicator.getTimeBucket()) + return false; + + return true; + } + + public static class Builder implements StorageBuilder { + + @Override public ServiceRelationServerSideIndicator map2Data(Map dbMap) { + ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator(); + indicator.setSourceServiceId(((Number)dbMap.get(SOURCE_SERVICE_ID)).intValue()); + indicator.setDestServiceId(((Number)dbMap.get(DEST_SERVICE_ID)).intValue()); + indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue()); + return indicator; + } + + @Override public Map data2Map(ServiceRelationServerSideIndicator storageData) { + Map map = new HashMap<>(); + map.put(SOURCE_SERVICE_ID, storageData.getSourceServiceId()); + map.put(DEST_SERVICE_ID, storageData.getDestServiceId()); + map.put(TIME_BUCKET, storageData.getTimeBucket()); + return map; + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java index 57993dd3ed141feb1bb08d01dfaedfff908d99d3..930e2fae875154db3e348e2d192226d0676cb5e6 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java @@ -20,12 +20,10 @@ package org.apache.skywalking.oap.server.core.query; import java.io.IOException; import java.util.*; -import org.apache.skywalking.oap.server.core.analysis.manual.service.*; import org.apache.skywalking.oap.server.core.query.entity.*; -import org.apache.skywalking.oap.server.core.query.sql.*; import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.core.storage.StorageModule; -import org.apache.skywalking.oap.server.core.storage.query.*; +import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO; import org.apache.skywalking.oap.server.library.module.*; import org.apache.skywalking.oap.server.library.module.Service; import org.slf4j.*; @@ -38,90 +36,63 @@ public class TopologyQueryService implements Service { private static final Logger logger = LoggerFactory.getLogger(TopologyQueryService.class); private final ModuleManager moduleManager; - private IMetricQueryDAO metricQueryDAO; - private IUniqueQueryDAO uniqueQueryDAO; + private ITopologyQueryDAO topologyQueryDAO; public TopologyQueryService(ModuleManager moduleManager) { this.moduleManager = moduleManager; } - private IMetricQueryDAO getMetricQueryDAO() { - if (metricQueryDAO == null) { - metricQueryDAO = moduleManager.find(StorageModule.NAME).getService(IMetricQueryDAO.class); + private ITopologyQueryDAO getTopologyQueryDAO() { + if (topologyQueryDAO == null) { + topologyQueryDAO = moduleManager.find(StorageModule.NAME).getService(ITopologyQueryDAO.class); } - return metricQueryDAO; - } - - private IUniqueQueryDAO getUniqueQueryDAO() { - if (uniqueQueryDAO == null) { - uniqueQueryDAO = moduleManager.find(StorageModule.NAME).getService(IUniqueQueryDAO.class); - } - return uniqueQueryDAO; + return topologyQueryDAO; } public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException { logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB); - List serviceComponents = loadServiceComponent(step, startTB, endTB); - List serviceMappings = loadServiceMapping(step, startTB, endTB); + List serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB); + List serviceMappings = getTopologyQueryDAO().loadServiceMappings(step, startTB, endTB); - List serviceRelationClientCalls = loadServiceRelationCalls(step, startTB, endTB, "service_relation_client_calls_sum"); - List serviceRelationServerCalls = loadServiceRelationCalls(step, startTB, endTB, "service_relation_server_calls_sum"); + List serviceRelationClientCalls = getTopologyQueryDAO().loadClientSideServiceRelations(step, startTB, endTB); + List serviceRelationServerCalls = getTopologyQueryDAO().loadServerSideServiceRelations(step, startTB, endTB); TopologyBuilder builder = new TopologyBuilder(moduleManager); return builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls); } - public Topology getServiceTopology(final Step step, final long startTimeBucket, final long endTimeBucket, - final String serviceId) { - return new Topology(); - } - - private List loadServiceComponent(final Step step, final long startTB, - final long endTB) throws IOException { - List twoIdGroups = getUniqueQueryDAO().aggregation(ServiceComponentIndicator.INDEX_NAME, step, startTB, endTB, - new Where(), ServiceComponentIndicator.SERVICE_ID, ServiceComponentIndicator.COMPONENT_ID); - - List serviceComponents = new ArrayList<>(); - twoIdGroups.forEach(twoIdGroup -> { - ServiceComponent serviceComponent = new ServiceComponent(); - serviceComponent.setServiceId(twoIdGroup.getId1()); - serviceComponent.setComponentId(twoIdGroup.getId2()); - serviceComponents.add(serviceComponent); - }); - - return serviceComponents; - } - - private List loadServiceMapping(final Step step, final long startTB, - final long endTB) throws IOException { - List twoIdGroups = getUniqueQueryDAO().aggregation(ServiceMappingIndicator.INDEX_NAME, step, startTB, endTB, - new Where(), ServiceMappingIndicator.SERVICE_ID, ServiceMappingIndicator.MAPPING_SERVICE_ID); - - List serviceMappings = new ArrayList<>(); - twoIdGroups.forEach(twoIdGroup -> { - ServiceMapping serviceMapping = new ServiceMapping(); - serviceMapping.setServiceId(twoIdGroup.getId1()); - serviceMapping.setMappingServiceId(twoIdGroup.getId2()); - serviceMappings.add(serviceMapping); + public Topology getServiceTopology(final Step step, final long startTB, final long endTB, + final int serviceId) throws IOException { + List serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB); + List serviceMappings = getTopologyQueryDAO().loadServiceMappings(step, startTB, endTB); + + Set serviceIds = new HashSet<>(); + serviceIds.add(serviceId); + serviceMappings.forEach(mapping -> { + if (mapping.getServiceId() == serviceId) { + serviceIds.add(mapping.getMappingServiceId()); + } }); + List serviceIdList = new ArrayList<>(serviceIds); - return serviceMappings; - } - - private List loadServiceRelationCalls(final Step step, final long startTB, final long endTB, - String indName) throws IOException { - List twoIdGroupValues = getMetricQueryDAO().aggregation(indName, step, startTB, endTB, new Where(), "source_service_id", "dest_service_id", "value", Function.Sum); + List serviceRelationClientCalls = getTopologyQueryDAO().loadSpecifiedClientSideServiceRelations(step, startTB, endTB, serviceIdList); + List serviceRelationServerCalls = getTopologyQueryDAO().loadSpecifiedServerSideServiceRelations(step, startTB, endTB, serviceIdList); - List clientCalls = new ArrayList<>(); + TopologyBuilder builder = new TopologyBuilder(moduleManager); + Topology topology = builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls); - twoIdGroupValues.forEach(twoIdGroupValue -> { - Call call = new Call(); - call.setSource(twoIdGroupValue.getId1()); - call.setTarget(twoIdGroupValue.getId2()); - call.setCalls(twoIdGroupValue.getValue().longValue()); - clientCalls.add(call); + Set nodeIds = new HashSet<>(); + topology.getCalls().forEach(call -> { + nodeIds.add(call.getSource()); + nodeIds.add(call.getTarget()); }); - return clientCalls; + for (int i = topology.getNodes().size() - 1; i >= 0; i--) { + if (!nodeIds.contains(topology.getNodes().get(i).getId())) { + topology.getNodes().remove(i); + } + } + + return topology; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java index 6bf519deb0bf1239f3ee181747dbc5a7bce64f6f..c1bf7634dbc98a21bdb53c99642f9d47e2c6f91f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.core.storage; import org.apache.skywalking.oap.server.core.storage.cache.*; +import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO; import org.apache.skywalking.oap.server.library.module.ModuleDefine; /** @@ -36,6 +37,7 @@ public class StorageModule extends ModuleDefine { return new Class[] { IBatchDAO.class, StorageDAO.class, IRegisterLockDAO.class, IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class, - IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class}; + IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class, + ITopologyQueryDAO.class}; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java new file mode 100644 index 0000000000000000000000000000000000000000..74d31b942ba52f9e6dadcf57a713d2d0244ba55c --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.storage.query; + +import java.io.IOException; +import java.util.List; +import org.apache.skywalking.oap.server.core.query.entity.*; +import org.apache.skywalking.oap.server.core.source.*; +import org.apache.skywalking.oap.server.library.module.Service; + +/** + * @author peng-yongsheng + */ +public interface ITopologyQueryDAO extends Service { + + List loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB, + List serviceIds) throws IOException; + + List loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB, + List serviceIds) throws IOException; + + List loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException; + + List loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException; + + List loadServiceMappings(Step step, long startTB, long endTB) throws IOException; + + List loadServiceComponents(Step step, long startTB, long endTB) throws IOException; +} diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java index 501513f670be114b3ef8b7b61c91b458ec6cdcec..c90ffb71f2b61e0301dd42909f19562b30a86807 100644 --- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java +++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java @@ -50,7 +50,7 @@ public class TopologyQuery implements GraphQLQueryResolver { return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket); } - public Topology getServiceTopology(final String serviceId, final Duration duration) { + public Topology getServiceTopology(final int serviceId, final Duration duration) throws IOException { long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart()); long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd()); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java index 8acc0936dc5a6192a768c845920fb59a2a6657f5..696e86e743d1e11a015f5b2fb65ca8800c2b3337 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java @@ -20,12 +20,14 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch; import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.cache.*; +import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO; import org.apache.skywalking.oap.server.library.client.NameSpace; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.module.*; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.*; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO; import org.slf4j.*; /** @@ -72,6 +74,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient)); this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient)); this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient)); + + this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient)); } @Override diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java index fcc4a1eadaae24676d16d289ea15ab5d44e707b5..48bb44c1eb6bbc2d6cbe9f7b7dcc17c1ee88a231 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java @@ -34,7 +34,7 @@ public abstract class EsDAO extends AbstractDAO { super(client); } - public void queryBuild(SearchSourceBuilder sourceBuilder, Where where, long startTB, long endTB) { + public final void queryBuild(SearchSourceBuilder sourceBuilder, Where where, long startTB, long endTB) { RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(Indicator.TIME_BUCKET).gte(startTB).lte(endTB); if (where.getKeyValues().isEmpty()) { sourceBuilder.query(rangeQueryBuilder); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java new file mode 100644 index 0000000000000000000000000000000000000000..5e7be1b6efa6e00977780063fd8d72a4fa98dd76 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query; + +import java.io.IOException; +import java.util.*; +import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.analysis.manual.service.*; +import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.*; +import org.apache.skywalking.oap.server.core.query.entity.*; +import org.apache.skywalking.oap.server.core.source.*; +import org.apache.skywalking.oap.server.core.storage.TimePyramidTableNameBuilder; +import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO; +import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.*; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.terms.*; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +/** + * @author peng-yongsheng + */ +public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO { + + public TopologyQueryEsDAO(ElasticSearchClient client) { + super(client); + } + + @Override + public List loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB, + List serviceIds) throws IOException { + if (CollectionUtils.isEmpty(serviceIds)) { + throw new UnexpectedException("Service id is null"); + } + + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); + sourceBuilder.size(0); + setQueryCondition(sourceBuilder, startTB, endTB, serviceIds); + + String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME); + return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID); + } + + @Override + public List loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB, + List serviceIds) throws IOException { + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); + sourceBuilder.size(0); + setQueryCondition(sourceBuilder, startTB, endTB, serviceIds); + + String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME); + return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID); + } + + private void setQueryCondition(SearchSourceBuilder sourceBuilder, long startTB, long endTB, + List serviceIds) { + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); + boolQuery.must().add(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB)); + + BoolQueryBuilder serviceIdBoolQuery = QueryBuilders.boolQuery(); + boolQuery.must().add(serviceIdBoolQuery); + + if (serviceIds.size() == 1) { + boolQuery.should().add(QueryBuilders.termQuery(ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, serviceIds.get(0))); + boolQuery.should().add(QueryBuilders.termQuery(ServiceRelationServerSideIndicator.DEST_SERVICE_ID, serviceIds.get(0))); + } else { + boolQuery.should().add(QueryBuilders.termsQuery(ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, serviceIds)); + boolQuery.should().add(QueryBuilders.termsQuery(ServiceRelationServerSideIndicator.DEST_SERVICE_ID, serviceIds)); + } + sourceBuilder.query(boolQuery); + } + + @Override public List loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException { + String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME); + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); + sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB)); + sourceBuilder.size(0); + + return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID); + } + + @Override public List loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException { + String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME); + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); + sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB)); + sourceBuilder.size(0); + + return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID); + } + + private List load(SearchSourceBuilder sourceBuilder, String indexName, String sourceCName, + String destCName) throws IOException { + TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(sourceCName).field(sourceCName).size(1000); + sourceAggregation.subAggregation(AggregationBuilders.terms(destCName).field(destCName).size(1000)); + sourceBuilder.aggregation(sourceAggregation); + + SearchResponse response = getClient().search(indexName, sourceBuilder); + + List calls = new ArrayList<>(); + Terms sourceTerms = response.getAggregations().get(sourceCName); + for (Terms.Bucket sourceBucket : sourceTerms.getBuckets()) { + Terms destTerms = sourceBucket.getAggregations().get(destCName); + for (Terms.Bucket destBucket : destTerms.getBuckets()) { + Call value = new Call(); + value.setSource(sourceBucket.getKeyAsNumber().intValue()); + value.setTarget(destBucket.getKeyAsNumber().intValue()); + calls.add(value); + } + } + return calls; + } + + @Override public List loadServiceMappings(Step step, long startTB, long endTB) throws IOException { + String indexName = TimePyramidTableNameBuilder.build(step, ServiceMappingIndicator.INDEX_NAME); + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); + sourceBuilder.query(QueryBuilders.rangeQuery(ServiceMappingIndicator.TIME_BUCKET).lte(startTB).gte(endTB)); + sourceBuilder.size(0); + + TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(ServiceMappingIndicator.SERVICE_ID).field(ServiceMappingIndicator.SERVICE_ID).size(1000); + sourceAggregation.subAggregation(AggregationBuilders.terms(ServiceMappingIndicator.MAPPING_SERVICE_ID).field(ServiceMappingIndicator.MAPPING_SERVICE_ID).size(1000)); + sourceBuilder.aggregation(sourceAggregation); + + SearchResponse response = getClient().search(indexName, sourceBuilder); + + List serviceMappings = new ArrayList<>(); + Terms serviceIdTerms = response.getAggregations().get(ServiceMappingIndicator.SERVICE_ID); + for (Terms.Bucket serviceIdBucket : serviceIdTerms.getBuckets()) { + Terms mappingServiceIdTerms = serviceIdBucket.getAggregations().get(ServiceMappingIndicator.MAPPING_SERVICE_ID); + for (Terms.Bucket mappingServiceIdBucket : mappingServiceIdTerms.getBuckets()) { + ServiceMapping serviceMapping = new ServiceMapping(); + serviceMapping.setServiceId(serviceIdBucket.getKeyAsNumber().intValue()); + serviceMapping.setMappingServiceId(mappingServiceIdBucket.getKeyAsNumber().intValue()); + serviceMappings.add(serviceMapping); + } + } + return serviceMappings; + } + + @Override + public List loadServiceComponents(Step step, long startTB, long endTB) throws IOException { + String indexName = TimePyramidTableNameBuilder.build(step, ServiceComponentIndicator.INDEX_NAME); + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); + sourceBuilder.query(QueryBuilders.rangeQuery(ServiceComponentIndicator.TIME_BUCKET).lte(startTB).gte(endTB)); + sourceBuilder.size(0); + + TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(ServiceComponentIndicator.SERVICE_ID).field(ServiceComponentIndicator.SERVICE_ID).size(1000); + sourceAggregation.subAggregation(AggregationBuilders.terms(ServiceComponentIndicator.COMPONENT_ID).field(ServiceComponentIndicator.COMPONENT_ID).size(1000)); + sourceBuilder.aggregation(sourceAggregation); + + SearchResponse response = getClient().search(indexName, sourceBuilder); + + List serviceComponents = new ArrayList<>(); + Terms serviceIdTerms = response.getAggregations().get(ServiceComponentIndicator.SERVICE_ID); + for (Terms.Bucket serviceIdBucket : serviceIdTerms.getBuckets()) { + Terms componentIdTerms = serviceIdBucket.getAggregations().get(ServiceComponentIndicator.COMPONENT_ID); + for (Terms.Bucket componentIdBucket : componentIdTerms.getBuckets()) { + ServiceComponent serviceComponent = new ServiceComponent(); + serviceComponent.setServiceId(serviceIdBucket.getKeyAsNumber().intValue()); + serviceComponent.setComponentId(componentIdBucket.getKeyAsNumber().intValue()); + serviceComponents.add(serviceComponent); + } + } + return serviceComponents; + } +}