diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java index 7a44a31548f5a3a8b253dc0a336d97dcf563ce0d..a6765000470d736c43ce066c8e48079f1ec36723 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java @@ -30,10 +30,10 @@ import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancej import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmmemorypool.ServiceInstanceJVMMemoryPoolDispatcher; import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancerelation.ServiceInstanceRelationDispatcher; import org.apache.skywalking.oap.server.core.analysis.generated.servicerelation.ServiceRelationDispatcher; +import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointCallRelationDispatcher; import org.apache.skywalking.oap.server.core.analysis.manual.service.*; import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceCallRelationDispatcher; -import org.apache.skywalking.oap.server.core.source.Scope; -import org.apache.skywalking.oap.server.core.source.Source; +import org.apache.skywalking.oap.server.core.source.*; import org.slf4j.*; /** @@ -59,7 +59,7 @@ public class DispatcherManager { this.dispatcherMap.put(Scope.ServiceRelation, new SourceDispatcher[] {new ServiceRelationDispatcher(), new ServiceCallRelationDispatcher()}); this.dispatcherMap.put(Scope.ServiceInstanceRelation, new SourceDispatcher[] {new ServiceInstanceRelationDispatcher()}); - this.dispatcherMap.put(Scope.EndpointRelation, new SourceDispatcher[] {new EndpointRelationDispatcher()}); + this.dispatcherMap.put(Scope.EndpointRelation, new SourceDispatcher[] {new EndpointRelationDispatcher(), new EndpointCallRelationDispatcher()}); this.dispatcherMap.put(Scope.ServiceInstanceJVMCPU, new SourceDispatcher[] {new ServiceInstanceJVMCPUDispatcher()}); this.dispatcherMap.put(Scope.ServiceInstanceJVMGC, new SourceDispatcher[] {new ServiceInstanceJVMGCDispatcher()}); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointCallRelationDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointCallRelationDispatcher.java new file mode 100644 index 0000000000000000000000000000000000000000..8b011694c4297874f99d2149be936d7ca08ac3fd --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointCallRelationDispatcher.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation; + +import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher; +import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess; +import org.apache.skywalking.oap.server.core.source.EndpointRelation; + +/** + * @author wusheng + */ +public class EndpointCallRelationDispatcher implements SourceDispatcher { + @Override + public void dispatch(EndpointRelation source) { + switch (source.getDetectPoint()) { + case CLIENT: + clientSide(source); + break; + case SERVER: + serverSide(source); + break; + } + } + + private void serverSide(EndpointRelation source) { + EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator(); + indicator.setTimeBucket(source.getTimeBucket()); + indicator.setSourceEndpointId(source.getEndpointId()); + indicator.setDestEndpointId(source.getChildEndpointId()); + IndicatorProcess.INSTANCE.in(indicator); + } + + private void clientSide(EndpointRelation source) { + EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator(); + indicator.setTimeBucket(source.getTimeBucket()); + indicator.setSourceEndpointId(source.getEndpointId()); + indicator.setDestEndpointId(source.getChildEndpointId()); + IndicatorProcess.INSTANCE.in(indicator); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationClientSideIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationClientSideIndicator.java new file mode 100644 index 0000000000000000000000000000000000000000..9bea74bff94bffb23b7078d2ee32dc8a7134486a --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationClientSideIndicator.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation; + +import java.util.*; +import lombok.*; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType; +import org.apache.skywalking.oap.server.core.remote.annotation.StreamData; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.core.storage.annotation.*; + +@IndicatorType +@StreamData +@StorageEntity(name = EndpointRelationClientSideIndicator.INDEX_NAME, builder = EndpointRelationClientSideIndicator.Builder.class) +public class EndpointRelationClientSideIndicator extends Indicator { + + public static final String INDEX_NAME = "endpoint_relation_client_side"; + public static final String SOURCE_ENDPOINT_ID = "source_endpoint_id"; + public static final String DEST_ENDPOINT_ID = "dest_endpoint_id"; + + @Setter @Getter @Column(columnName = SOURCE_ENDPOINT_ID) @IDColumn private int sourceEndpointId; + @Setter @Getter @Column(columnName = DEST_ENDPOINT_ID) @IDColumn private int destEndpointId; + + @Override public String id() { + String splitJointId = String.valueOf(getTimeBucket()); + splitJointId += Const.ID_SPLIT + String.valueOf(sourceEndpointId); + splitJointId += Const.ID_SPLIT + String.valueOf(destEndpointId); + return splitJointId; + } + + @Override public void combine(Indicator indicator) { + + } + + @Override public void calculate() { + + } + + @Override public Indicator toHour() { + EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator(); + indicator.setTimeBucket(toTimeBucketInHour()); + indicator.setSourceEndpointId(getSourceEndpointId()); + indicator.setDestEndpointId(getDestEndpointId()); + return indicator; + } + + @Override public Indicator toDay() { + EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator(); + indicator.setTimeBucket(toTimeBucketInDay()); + indicator.setSourceEndpointId(getSourceEndpointId()); + indicator.setDestEndpointId(getDestEndpointId()); + return indicator; + } + + @Override public Indicator toMonth() { + EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator(); + indicator.setTimeBucket(toTimeBucketInMonth()); + indicator.setSourceEndpointId(getSourceEndpointId()); + indicator.setDestEndpointId(getDestEndpointId()); + return indicator; + } + + @Override public int remoteHashCode() { + int result = 17; + result = 31 * result + sourceEndpointId; + result = 31 * result + destEndpointId; + return result; + } + + @Override public void deserialize(RemoteData remoteData) { + setSourceEndpointId(remoteData.getDataIntegers(0)); + setDestEndpointId(remoteData.getDataIntegers(1)); + setTimeBucket(remoteData.getDataLongs(0)); + } + + @Override public RemoteData.Builder serialize() { + RemoteData.Builder remoteBuilder = RemoteData.newBuilder(); + + remoteBuilder.setDataIntegers(1, getDestEndpointId()); + remoteBuilder.setDataIntegers(0, getSourceEndpointId()); + remoteBuilder.setDataLongs(0, getTimeBucket()); + + return remoteBuilder; + } + + @Override public int hashCode() { + int result = 17; + result = 31 * result + sourceEndpointId; + result = 31 * result + destEndpointId; + result = 31 * result + (int)getTimeBucket(); + return result; + } + + @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + + EndpointRelationClientSideIndicator indicator = (EndpointRelationClientSideIndicator)obj; + if (sourceEndpointId != indicator.sourceEndpointId) + return false; + if (destEndpointId != indicator.destEndpointId) + return false; + + if (getTimeBucket() != indicator.getTimeBucket()) + return false; + + return true; + } + + public static class Builder implements StorageBuilder { + + @Override public EndpointRelationClientSideIndicator map2Data(Map dbMap) { + EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator(); + indicator.setSourceEndpointId(((Number)dbMap.get(SOURCE_ENDPOINT_ID)).intValue()); + indicator.setDestEndpointId(((Number)dbMap.get(DEST_ENDPOINT_ID)).intValue()); + indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue()); + return indicator; + } + + @Override public Map data2Map(EndpointRelationClientSideIndicator storageData) { + Map map = new HashMap<>(); + map.put(TIME_BUCKET, storageData.getTimeBucket()); + map.put(SOURCE_ENDPOINT_ID, storageData.getSourceEndpointId()); + map.put(DEST_ENDPOINT_ID, storageData.getDestEndpointId()); + return map; + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationServerSideIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationServerSideIndicator.java new file mode 100644 index 0000000000000000000000000000000000000000..52c7ee518356b872304e62a043ec035d78ad48a7 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationServerSideIndicator.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation; + +import java.util.*; +import lombok.*; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType; +import org.apache.skywalking.oap.server.core.remote.annotation.StreamData; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.core.storage.annotation.*; + +@IndicatorType +@StreamData +@StorageEntity(name = EndpointRelationServerSideIndicator.INDEX_NAME, builder = EndpointRelationServerSideIndicator.Builder.class) +public class EndpointRelationServerSideIndicator extends Indicator { + + public static final String INDEX_NAME = "endpoint_relation_server_side"; + public static final String SOURCE_ENDPOINT_ID = "source_endpoint_id"; + public static final String DEST_ENDPOINT_ID = "dest_endpoint_id"; + + @Setter @Getter @Column(columnName = SOURCE_ENDPOINT_ID) @IDColumn private int sourceEndpointId; + @Setter @Getter @Column(columnName = DEST_ENDPOINT_ID) @IDColumn private int destEndpointId; + + @Override public String id() { + String splitJointId = String.valueOf(getTimeBucket()); + splitJointId += Const.ID_SPLIT + String.valueOf(sourceEndpointId); + splitJointId += Const.ID_SPLIT + String.valueOf(destEndpointId); + return splitJointId; + } + + @Override public void combine(Indicator indicator) { + + } + + @Override public void calculate() { + + } + + @Override public Indicator toHour() { + EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator(); + indicator.setTimeBucket(toTimeBucketInHour()); + indicator.setSourceEndpointId(getSourceEndpointId()); + indicator.setDestEndpointId(getDestEndpointId()); + return indicator; + } + + @Override public Indicator toDay() { + EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator(); + indicator.setTimeBucket(toTimeBucketInDay()); + indicator.setSourceEndpointId(getSourceEndpointId()); + indicator.setDestEndpointId(getDestEndpointId()); + return indicator; + } + + @Override public Indicator toMonth() { + EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator(); + indicator.setTimeBucket(toTimeBucketInMonth()); + indicator.setSourceEndpointId(getSourceEndpointId()); + indicator.setDestEndpointId(getDestEndpointId()); + return indicator; + } + + @Override public int remoteHashCode() { + int result = 17; + result = 31 * result + sourceEndpointId; + result = 31 * result + destEndpointId; + return result; + } + + @Override public void deserialize(RemoteData remoteData) { + setSourceEndpointId(remoteData.getDataIntegers(0)); + setDestEndpointId(remoteData.getDataIntegers(1)); + setTimeBucket(remoteData.getDataLongs(0)); + } + + @Override public RemoteData.Builder serialize() { + RemoteData.Builder remoteBuilder = RemoteData.newBuilder(); + + remoteBuilder.setDataIntegers(0, getSourceEndpointId()); + remoteBuilder.setDataIntegers(1, getDestEndpointId()); + remoteBuilder.setDataLongs(0, getTimeBucket()); + + return remoteBuilder; + } + + @Override public int hashCode() { + int result = 17; + result = 31 * result + sourceEndpointId; + result = 31 * result + destEndpointId; + result = 31 * result + (int)getTimeBucket(); + return result; + } + + @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + + EndpointRelationServerSideIndicator indicator = (EndpointRelationServerSideIndicator)obj; + if (sourceEndpointId != indicator.sourceEndpointId) + return false; + if (destEndpointId != indicator.destEndpointId) + return false; + + if (getTimeBucket() != indicator.getTimeBucket()) + return false; + + return true; + } + + public static class Builder implements StorageBuilder { + + @Override public EndpointRelationServerSideIndicator map2Data(Map dbMap) { + EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator(); + indicator.setSourceEndpointId(((Number)dbMap.get(SOURCE_ENDPOINT_ID)).intValue()); + indicator.setDestEndpointId(((Number)dbMap.get(DEST_ENDPOINT_ID)).intValue()); + indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue()); + return indicator; + } + + @Override public Map data2Map(EndpointRelationServerSideIndicator storageData) { + Map map = new HashMap<>(); + map.put(SOURCE_ENDPOINT_ID, storageData.getSourceEndpointId()); + map.put(DEST_ENDPOINT_ID, storageData.getDestEndpointId()); + map.put(TIME_BUCKET, storageData.getTimeBucket()); + return map; + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java index abdae7c71e4c91dab2c1c1463fb00cbc0544aa16..7be38bd0414ce4a2569ba83649476df7b05d2016 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyBuilder.java @@ -37,12 +37,10 @@ class TopologyBuilder { private static final Logger logger = LoggerFactory.getLogger(TopologyBuilder.class); private final ServiceInventoryCache serviceInventoryCache; - // private final DateBetweenService dateBetweenService; private final IComponentLibraryCatalogService componentLibraryCatalogService; TopologyBuilder(ModuleManager moduleManager) { this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInventoryCache.class); -// this.dateBetweenService = new DateBetweenService(moduleManager); this.componentLibraryCatalogService = moduleManager.find(CoreModule.NAME).getService(IComponentLibraryCatalogService.class); } @@ -56,8 +54,6 @@ class TopologyBuilder { serviceRelationServerCalls = serverCallsFilter(serviceRelationServerCalls); List nodes = new LinkedList<>(); - Map applicationMinuteBetweenMap = new HashMap<>(); - List calls = new LinkedList<>(); Set nodeIds = new HashSet<>(); serviceRelationClientCalls.forEach(clientCall -> { @@ -91,17 +87,13 @@ class TopologyBuilder { int actualTargetId = mappings.getOrDefault(target.getSequence(), target.getSequence()); call.setTarget(actualTargetId); call.setCallType(nodeCompMap.get(clientCall.getTarget())); -// try { -// call.setCpm(clientCall.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, source.getSequence(), startSecondTimeBucket, endSecondTimeBucket)); -// } catch (ParseException e) { -// logger.error(e.getMessage(), e); -// } + call.setId(clientCall.getId()); calls.add(call); }); - serviceRelationServerCalls.forEach(referenceMetric -> { - ServiceInventory source = serviceInventoryCache.get(referenceMetric.getSource()); - ServiceInventory target = serviceInventoryCache.get(referenceMetric.getTarget()); + serviceRelationServerCalls.forEach(serverCall -> { + ServiceInventory source = serviceInventoryCache.get(serverCall.getSource()); + ServiceInventory target = serviceInventoryCache.get(serverCall.getTarget()); if (source.getSequence() == Const.NONE_SERVICE_ID) { if (!nodeIds.contains(source.getSequence())) { @@ -128,17 +120,13 @@ class TopologyBuilder { Call call = new Call(); call.setSource(source.getSequence()); call.setTarget(target.getSequence()); + call.setId(serverCall.getId()); if (source.getSequence() == Const.NONE_SERVICE_ID) { call.setCallType(Const.EMPTY_STRING); } else { - call.setCallType(nodeCompMap.get(referenceMetric.getTarget())); + call.setCallType(nodeCompMap.get(serverCall.getTarget())); } -// try { -// call.setCpm(referenceMetric.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, target.getSequence(), startSecondTimeBucket, endSecondTimeBucket)); -// } catch (ParseException e) { -// logger.error(e.getMessage(), e); -// } calls.add(call); }); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java index 930e2fae875154db3e348e2d192226d0676cb5e6..3ba322f01aeb51d4af4aa131fc624c4657b6a60c 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopologyQueryService.java @@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.core.query; import java.io.IOException; import java.util.*; +import org.apache.skywalking.oap.server.core.*; +import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache; +import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService; import org.apache.skywalking.oap.server.core.query.entity.*; import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.core.storage.StorageModule; @@ -37,6 +40,8 @@ public class TopologyQueryService implements Service { private final ModuleManager moduleManager; private ITopologyQueryDAO topologyQueryDAO; + private EndpointInventoryCache endpointInventoryCache; + private IComponentLibraryCatalogService componentLibraryCatalogService; public TopologyQueryService(ModuleManager moduleManager) { this.moduleManager = moduleManager; @@ -49,6 +54,20 @@ public class TopologyQueryService implements Service { return topologyQueryDAO; } + private IComponentLibraryCatalogService getComponentLibraryCatalogService() { + if (componentLibraryCatalogService == null) { + componentLibraryCatalogService = moduleManager.find(CoreModule.NAME).getService(IComponentLibraryCatalogService.class); + } + return componentLibraryCatalogService; + } + + private EndpointInventoryCache getEndpointInventoryCache() { + if (endpointInventoryCache == null) { + endpointInventoryCache = moduleManager.find(CoreModule.NAME).getService(EndpointInventoryCache.class); + } + return endpointInventoryCache; + } + public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException { logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB); List serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB); @@ -95,4 +114,23 @@ public class TopologyQueryService implements Service { return topology; } + + public Topology getEndpointTopology(final Step step, final long startTB, final long endTB, + final int endpointId) throws IOException { + List serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB); + + Map components = new HashMap<>(); + serviceComponents.forEach(component -> components.put(component.getServiceId(), getComponentLibraryCatalogService().getComponentName(component.getComponentId()))); + + List calls = getTopologyQueryDAO().loadSpecifiedDestOfServerSideEndpointRelations(step, startTB, endTB, endpointId); + calls.addAll(getTopologyQueryDAO().loadSpecifiedSourceOfClientSideEndpointRelations(step, startTB, endTB, endpointId)); + + calls.forEach(call -> { + call.setCallType(components.getOrDefault(getEndpointInventoryCache().get(call.getTarget()).getServiceId(), Const.UNKNOWN)); + }); + + Topology topology = new Topology(); + topology.getCalls().addAll(calls); + return topology; + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java index 7a2f807ed6727770a0dcc9d84397e649abb63e47..92f78526780adef2508745ac110f8c1813c087b4 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/Call.java @@ -26,6 +26,5 @@ public class Call { private int source; private int target; private String callType; - private long calls; - private long cpm; + private String id; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/EndpointRelation.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/EndpointRelation.java index 76cb76fd3c35f8a4a06dd9569ab94eeec2bb5d67..d1a025e765c5d91ad65a17f93a3784208fecd905 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/EndpointRelation.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/EndpointRelation.java @@ -34,6 +34,10 @@ public class EndpointRelation extends Source { return String.valueOf(endpointId) + Const.ID_SPLIT + String.valueOf(childEndpointId); } + public static String buildEntityId(int endpointId, int childEndpointId) { + return String.valueOf(endpointId) + Const.ID_SPLIT + String.valueOf(childEndpointId); + } + @Getter @Setter private int endpointId; @Getter @Setter private String endpoint; @Getter @Setter private int serviceId; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java index b65b150a3d959347a70cdadaf4c9b875456a4462..bed3929eb20f55ca07577ed07bd236ba60083d76 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceRelation.java @@ -34,6 +34,10 @@ public class ServiceRelation extends Source { return String.valueOf(sourceServiceId) + Const.ID_SPLIT + String.valueOf(destServiceId); } + public static String buildEntityId(int sourceServiceId, int destServiceId) { + return String.valueOf(sourceServiceId) + Const.ID_SPLIT + String.valueOf(destServiceId); + } + @Getter @Setter private int sourceServiceId; @Getter @Setter private String sourceServiceName; @Getter @Setter private String sourceServiceInstanceName; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java index 74d31b942ba52f9e6dadcf57a713d2d0244ba55c..514fb75b86a2b06eaed23354bd2ed0a746ff43c6 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopologyQueryDAO.java @@ -42,4 +42,10 @@ public interface ITopologyQueryDAO extends Service { List loadServiceMappings(Step step, long startTB, long endTB) throws IOException; List loadServiceComponents(Step step, long startTB, long endTB) throws IOException; + + List loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB, + int destEndpointId) throws IOException; + + List loadSpecifiedSourceOfClientSideEndpointRelations(Step step, long startTB, long endTB, + int sourceEndpointId) throws IOException; } diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java index c90ffb71f2b61e0301dd42909f19562b30a86807..185e858049f14032c691b471fad48bd73a1dad1b 100644 --- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java +++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopologyQuery.java @@ -56,4 +56,11 @@ public class TopologyQuery implements GraphQLQueryResolver { return getQueryService().getServiceTopology(duration.getStep(), startTimeBucket, endTimeBucket, serviceId); } + + public Topology getEndpointTopology(final int endpointId, final Duration duration) throws IOException { + long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart()); + long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd()); + + return getQueryService().getEndpointTopology(duration.getStep(), startTimeBucket, endTimeBucket, endpointId); + } } diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol index c6ee5d15f38024c3be61be579eac4035ffbfd4bf..b93baa58852595164aae50e3e7be37683c220175 160000 --- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol +++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol @@ -1 +1 @@ -Subproject commit c6ee5d15f38024c3be61be579eac4035ffbfd4bf +Subproject commit b93baa58852595164aae50e3e7be37683c220175 diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java index 5e7be1b6efa6e00977780063fd8d72a4fa98dd76..00de99eac6daa09c6abc49aacfd2455cc391391d 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java @@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query; import java.io.IOException; import java.util.*; import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.*; import org.apache.skywalking.oap.server.core.analysis.manual.service.*; import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.*; import org.apache.skywalking.oap.server.core.query.entity.*; @@ -57,18 +58,22 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO { setQueryCondition(sourceBuilder, startTB, endTB, serviceIds); String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME); - return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID); + return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID, Source.Service); } @Override public List loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB, List serviceIds) throws IOException { + if (CollectionUtils.isEmpty(serviceIds)) { + throw new UnexpectedException("Service id is null"); + } + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); sourceBuilder.size(0); setQueryCondition(sourceBuilder, startTB, endTB, serviceIds); String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME); - return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID); + return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID, Source.Service); } private void setQueryCondition(SearchSourceBuilder sourceBuilder, long startTB, long endTB, @@ -95,7 +100,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO { sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB)); sourceBuilder.size(0); - return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID); + return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID, Source.Service); } @Override public List loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException { @@ -104,29 +109,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO { sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB)); sourceBuilder.size(0); - return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID); - } - - private List load(SearchSourceBuilder sourceBuilder, String indexName, String sourceCName, - String destCName) throws IOException { - TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(sourceCName).field(sourceCName).size(1000); - sourceAggregation.subAggregation(AggregationBuilders.terms(destCName).field(destCName).size(1000)); - sourceBuilder.aggregation(sourceAggregation); - - SearchResponse response = getClient().search(indexName, sourceBuilder); - - List calls = new ArrayList<>(); - Terms sourceTerms = response.getAggregations().get(sourceCName); - for (Terms.Bucket sourceBucket : sourceTerms.getBuckets()) { - Terms destTerms = sourceBucket.getAggregations().get(destCName); - for (Terms.Bucket destBucket : destTerms.getBuckets()) { - Call value = new Call(); - value.setSource(sourceBucket.getKeyAsNumber().intValue()); - value.setTarget(destBucket.getKeyAsNumber().intValue()); - calls.add(value); - } - } - return calls; + return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID, Source.Service); } @Override public List loadServiceMappings(Step step, long startTB, long endTB) throws IOException { @@ -181,4 +164,70 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO { } return serviceComponents; } + + @Override + public List loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB, + int destEndpointId) throws IOException { + String indexName = TimePyramidTableNameBuilder.build(step, EndpointRelationServerSideIndicator.INDEX_NAME); + + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); + sourceBuilder.size(0); + + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); + boolQuery.must().add(QueryBuilders.rangeQuery(EndpointRelationServerSideIndicator.TIME_BUCKET).gte(startTB).lte(endTB)); + boolQuery.must().add(QueryBuilders.termQuery(EndpointRelationServerSideIndicator.DEST_ENDPOINT_ID, destEndpointId)); + sourceBuilder.query(boolQuery); + + return load(sourceBuilder, indexName, EndpointRelationServerSideIndicator.SOURCE_ENDPOINT_ID, EndpointRelationServerSideIndicator.DEST_ENDPOINT_ID, Source.Endpoint); + } + + @Override + public List loadSpecifiedSourceOfClientSideEndpointRelations(Step step, long startTB, long endTB, + int sourceEndpointId) throws IOException { + String indexName = TimePyramidTableNameBuilder.build(step, EndpointRelationClientSideIndicator.INDEX_NAME); + + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); + sourceBuilder.size(0); + + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); + boolQuery.must().add(QueryBuilders.rangeQuery(EndpointRelationClientSideIndicator.TIME_BUCKET).gte(startTB).lte(endTB)); + boolQuery.must().add(QueryBuilders.termQuery(EndpointRelationClientSideIndicator.SOURCE_ENDPOINT_ID, sourceEndpointId)); + sourceBuilder.query(boolQuery); + + return load(sourceBuilder, indexName, EndpointRelationClientSideIndicator.SOURCE_ENDPOINT_ID, EndpointRelationClientSideIndicator.DEST_ENDPOINT_ID, Source.Endpoint); + } + + private List load(SearchSourceBuilder sourceBuilder, String indexName, String sourceCName, + String destCName, Source source) throws IOException { + TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(sourceCName).field(sourceCName).size(1000); + sourceAggregation.subAggregation(AggregationBuilders.terms(destCName).field(destCName).size(1000)); + sourceBuilder.aggregation(sourceAggregation); + + SearchResponse response = getClient().search(indexName, sourceBuilder); + + List calls = new ArrayList<>(); + Terms sourceTerms = response.getAggregations().get(sourceCName); + for (Terms.Bucket sourceBucket : sourceTerms.getBuckets()) { + Terms destTerms = sourceBucket.getAggregations().get(destCName); + for (Terms.Bucket destBucket : destTerms.getBuckets()) { + Call value = new Call(); + value.setSource(sourceBucket.getKeyAsNumber().intValue()); + value.setTarget(destBucket.getKeyAsNumber().intValue()); + switch (source) { + case Service: + value.setId(ServiceRelation.buildEntityId(value.getSource(), value.getTarget())); + break; + case Endpoint: + value.setId(EndpointRelation.buildEntityId(value.getSource(), value.getTarget())); + break; + } + calls.add(value); + } + } + return calls; + } + + enum Source { + Service, Endpoint + } }