未验证 提交 595498e7 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

New topology query protocol implementation (#2654)

* Make backend fits new topology query protocol and logic.

* Change topology line id.

* Change endpoint point id rule to match service relation id rule.

* Refactor util.

* Revert wrong refactor by IDEA.

* Revert some changes.

* Fix entity id bug.

* Fix endpoint topology query

* Update UI.

* UI license update.
上级 b7ef10a7
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.Const;
public class RelationDefineUtil {
public static String buildEntityId(RelationDefine define) {
return String.valueOf(define.source)
+ Const.ID_SPLIT + String.valueOf(define.dest)
+ Const.ID_SPLIT + String.valueOf(define.componentId);
}
/**
* @param entityId
* @return 1. sourceId 2. destId 3. componentId
*/
public static RelationDefine splitEntityId(String entityId) {
String[] parts = entityId.split(Const.ID_SPLIT);
if (parts.length != 3) {
throw new RuntimeException("Illegal Service/Endpoint Relation entity id");
}
return new RelationDefine(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Integer.parseInt(parts[2]));
}
@Getter
public static class RelationDefine {
private int source;
private int dest;
private int componentId;
public RelationDefine(int source, int dest, int componentId) {
this.source = source;
this.dest = dest;
this.componentId = componentId;
}
}
}
......@@ -42,8 +42,7 @@ public class EndpointCallRelationDispatcher implements SourceDispatcher<Endpoint
metrics.setSourceEndpointId(source.getEndpointId());
metrics.setDestEndpointId(source.getChildEndpointId());
metrics.setComponentId(source.getComponentId());
metrics.setEntityId(source.getEntityId());
metrics.buildEntityId();
MetricsStreamProcessor.getInstance().in(metrics);
}
}
......@@ -40,7 +40,7 @@ public class EndpointRelationServerSideMetrics extends Metrics {
@Setter @Getter @Column(columnName = SOURCE_ENDPOINT_ID) @IDColumn private int sourceEndpointId;
@Setter @Getter @Column(columnName = DEST_ENDPOINT_ID) @IDColumn private int destEndpointId;
@Setter @Getter @Column(columnName = COMPONENT_ID) @IDColumn private int componentId;
@Setter @Getter @Column(columnName = ENTITY_ID) @IDColumn private String entityId;
@Setter(AccessLevel.PRIVATE) @Getter @Column(columnName = ENTITY_ID) @IDColumn private String entityId;
@Override public String id() {
String splitJointId = String.valueOf(getTimeBucket());
......@@ -50,6 +50,13 @@ public class EndpointRelationServerSideMetrics extends Metrics {
return splitJointId;
}
public void buildEntityId() {
String splitJointId = String.valueOf(sourceEndpointId);
splitJointId += Const.ID_SPLIT + String.valueOf(destEndpointId);
splitJointId += Const.ID_SPLIT + String.valueOf(componentId);
entityId = splitJointId;
}
@Override public void combine(Metrics metrics) {
}
......
......@@ -45,8 +45,7 @@ public class ServiceCallRelationDispatcher implements SourceDispatcher<ServiceRe
metrics.setSourceServiceId(source.getSourceServiceId());
metrics.setDestServiceId(source.getDestServiceId());
metrics.setComponentId(source.getComponentId());
metrics.setEntityId(source.getEntityId());
metrics.buildEntityId();
MetricsStreamProcessor.getInstance().in(metrics);
}
......@@ -56,8 +55,7 @@ public class ServiceCallRelationDispatcher implements SourceDispatcher<ServiceRe
metrics.setSourceServiceId(source.getSourceServiceId());
metrics.setDestServiceId(source.getDestServiceId());
metrics.setComponentId(source.getComponentId());
metrics.setEntityId(source.getEntityId());
metrics.buildEntityId();
MetricsStreamProcessor.getInstance().in(metrics);
}
}
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.manual.RelationDefineUtil;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
......@@ -40,16 +41,22 @@ public class ServiceRelationClientSideMetrics extends Metrics {
@Setter @Getter @Column(columnName = SOURCE_SERVICE_ID) @IDColumn private int sourceServiceId;
@Setter @Getter @Column(columnName = DEST_SERVICE_ID) @IDColumn private int destServiceId;
@Setter @Getter @Column(columnName = COMPONENT_ID) @IDColumn private int componentId;
@Setter @Getter @Column(columnName = ENTITY_ID) @IDColumn private String entityId;
@Setter(AccessLevel.PRIVATE) @Getter @Column(columnName = ENTITY_ID) @IDColumn private String entityId;
@Override public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + sourceServiceId;
splitJointId += Const.ID_SPLIT + destServiceId;
splitJointId += Const.ID_SPLIT + componentId;
splitJointId += Const.ID_SPLIT + RelationDefineUtil.buildEntityId(
new RelationDefineUtil.RelationDefine(sourceServiceId, destServiceId, componentId));
return splitJointId;
}
public void buildEntityId() {
String splitJointId = String.valueOf(sourceServiceId);
splitJointId += Const.ID_SPLIT + String.valueOf(destServiceId);
splitJointId += Const.ID_SPLIT + String.valueOf(componentId);
entityId = splitJointId;
}
@Override public void combine(Metrics metrics) {
}
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.manual.RelationDefineUtil;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
......@@ -40,16 +41,22 @@ public class ServiceRelationServerSideMetrics extends Metrics {
@Setter @Getter @Column(columnName = SOURCE_SERVICE_ID) @IDColumn private int sourceServiceId;
@Setter @Getter @Column(columnName = DEST_SERVICE_ID) @IDColumn private int destServiceId;
@Setter @Getter @Column(columnName = COMPONENT_ID) @IDColumn private int componentId;
@Setter @Getter @Column(columnName = ENTITY_ID) @IDColumn private String entityId;
@Setter(AccessLevel.PRIVATE) @Getter @Column(columnName = ENTITY_ID) @IDColumn private String entityId;
@Override public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + sourceServiceId;
splitJointId += Const.ID_SPLIT + destServiceId;
splitJointId += Const.ID_SPLIT + componentId;
splitJointId += Const.ID_SPLIT + RelationDefineUtil.buildEntityId(
new RelationDefineUtil.RelationDefine(sourceServiceId, destServiceId, componentId));
return splitJointId;
}
public void buildEntityId() {
String splitJointId = String.valueOf(sourceServiceId);
splitJointId += Const.ID_SPLIT + String.valueOf(destServiceId);
splitJointId += Const.ID_SPLIT + String.valueOf(componentId);
entityId = splitJointId;
}
@Override public void combine(Metrics metrics) {
}
......
......@@ -98,7 +98,7 @@ public class ComponentLibraryCatalogService implements IComponentLibraryCatalogS
throw new InitialComponentCatalogException("Component name [" + name + "] in Component-Server-Mappings doesn't exist in component define. ");
}
if (!componentName2Id.containsKey(serverName)) {
throw new InitialComponentCatalogException("Server component name [" + serverName + "] in Component-Server-Mappings doesn't exist in component define. ");
throw new InitialComponentCatalogException("Server componentId name [" + serverName + "] in Component-Server-Mappings doesn't exist in component define. ");
}
componentId2ServerId.put(componentName2Id.get(name), componentName2Id.get(serverName));
......
......@@ -44,15 +44,15 @@ class TopologyBuilder {
this.componentLibraryCatalogService = moduleManager.find(CoreModule.NAME).provider().getService(IComponentLibraryCatalogService.class);
}
Topology build(List<Call> serviceRelationClientCalls, List<Call> serviceRelationServerCalls) {
Topology build(List<Call.CallDetail> serviceRelationClientCalls, List<Call.CallDetail> serviceRelationServerCalls) {
filterZeroSourceOrTargetReference(serviceRelationClientCalls);
filterZeroSourceOrTargetReference(serviceRelationServerCalls);
Map<Integer, Node> nodes = new HashMap<>();
List<Call> calls = new LinkedList<>();
Set<String> callIds = new HashSet<>();
HashMap<String, Call> callMap = new HashMap<>();
for (Call clientCall : serviceRelationClientCalls) {
for (Call.CallDetail clientCall : serviceRelationClientCalls) {
ServiceInventory source = serviceInventoryCache.get(clientCall.getSource());
ServiceInventory target = serviceInventoryCache.get(clientCall.getTarget());
......@@ -72,20 +72,25 @@ class TopologyBuilder {
}
String callId = source.getSequence() + Const.ID_SPLIT + target.getSequence();
if (!callIds.contains(callId)) {
callIds.add(callId);
if (!callMap.containsKey(callId)) {
Call call = new Call();
callMap.put(callId, call);
call.setSource(clientCall.getSource());
call.setTarget(clientCall.getTarget());
call.setId(clientCall.getId());
call.setDetectPoint(DetectPoint.CLIENT);
call.setCallType(componentLibraryCatalogService.getComponentName(clientCall.getComponentId()));
call.addDetectPoint(DetectPoint.CLIENT);
call.addSourceComponent(componentLibraryCatalogService.getComponentName(clientCall.getComponentId()));
calls.add(call);
} else {
Call call = callMap.get(callId);
call.addDetectPoint(DetectPoint.CLIENT);
call.addSourceComponent(componentLibraryCatalogService.getComponentName(clientCall.getComponentId()));
}
}
for (Call serverCall : serviceRelationServerCalls) {
for (Call.CallDetail serverCall : serviceRelationServerCalls) {
ServiceInventory source = serviceInventoryCache.get(serverCall.getSource());
ServiceInventory target = serviceInventoryCache.get(serverCall.getTarget());
......@@ -112,21 +117,22 @@ class TopologyBuilder {
}
String callId = source.getSequence() + Const.ID_SPLIT + target.getSequence();
if (!callIds.contains(callId)) {
callIds.add(callId);
if (!callMap.containsKey(callId)) {
Call call = new Call();
callMap.put(callId, call);
call.setSource(serverCall.getSource());
call.setTarget(serverCall.getTarget());
call.setId(serverCall.getId());
call.setDetectPoint(DetectPoint.SERVER);
call.addDetectPoint(DetectPoint.SERVER);
call.addTargetComponent(componentLibraryCatalogService.getComponentName(serverCall.getComponentId()));
calls.add(call);
} else {
Call call = callMap.get(callId);
if (source.getSequence() == Const.USER_SERVICE_ID) {
call.setCallType(Const.EMPTY_STRING);
} else {
call.setCallType(componentLibraryCatalogService.getComponentName(serverCall.getComponentId()));
}
call.addDetectPoint(DetectPoint.SERVER);
call.addTargetComponent(componentLibraryCatalogService.getComponentName(serverCall.getComponentId()));
}
if (!nodes.containsKey(source.getSequence())) {
......@@ -160,9 +166,9 @@ class TopologyBuilder {
return serviceNode;
}
private void filterZeroSourceOrTargetReference(List<Call> serviceRelationClientCalls) {
private void filterZeroSourceOrTargetReference(List<Call.CallDetail> serviceRelationClientCalls) {
for (int i = serviceRelationClientCalls.size() - 1; i >= 0; i--) {
Call call = serviceRelationClientCalls.get(i);
Call.CallDetail call = serviceRelationClientCalls.get(i);
if (call.getSource() == 0 || call.getTarget() == 0) {
serviceRelationClientCalls.remove(i);
}
......
......@@ -81,8 +81,8 @@ public class TopologyQueryService implements Service {
public Topology getGlobalTopology(final Step step, final long startTB, final long endTB, final long startTimestamp,
final long endTimestamp) throws IOException {
logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB);
List<Call> serviceRelationServerCalls = getTopologyQueryDAO().loadServerSideServiceRelations(step, startTB, endTB);
List<Call> serviceRelationClientCalls = getTopologyQueryDAO().loadClientSideServiceRelations(step, startTB, endTB);
List<Call.CallDetail> serviceRelationServerCalls = getTopologyQueryDAO().loadServerSideServiceRelations(step, startTB, endTB);
List<Call.CallDetail> serviceRelationClientCalls = getTopologyQueryDAO().loadClientSideServiceRelations(step, startTB, endTB);
TopologyBuilder builder = new TopologyBuilder(moduleManager);
Topology topology = builder.build(serviceRelationClientCalls, serviceRelationServerCalls);
......@@ -95,8 +95,8 @@ public class TopologyQueryService implements Service {
List<Integer> serviceIds = new ArrayList<>();
serviceIds.add(serviceId);
List<Call> serviceRelationClientCalls = getTopologyQueryDAO().loadSpecifiedClientSideServiceRelations(step, startTB, endTB, serviceIds);
List<Call> serviceRelationServerCalls = getTopologyQueryDAO().loadSpecifiedServerSideServiceRelations(step, startTB, endTB, serviceIds);
List<Call.CallDetail> serviceRelationClientCalls = getTopologyQueryDAO().loadSpecifiedClientSideServiceRelations(step, startTB, endTB, serviceIds);
List<Call.CallDetail> serviceRelationServerCalls = getTopologyQueryDAO().loadSpecifiedServerSideServiceRelations(step, startTB, endTB, serviceIds);
TopologyBuilder builder = new TopologyBuilder(moduleManager);
Topology topology = builder.build(serviceRelationClientCalls, serviceRelationServerCalls);
......@@ -104,10 +104,10 @@ public class TopologyQueryService implements Service {
List<Integer> sourceServiceIds = new ArrayList<>();
serviceRelationClientCalls.forEach(call -> sourceServiceIds.add(call.getSource()));
if (CollectionUtils.isNotEmpty(sourceServiceIds)) {
List<Call> sourceCalls = getTopologyQueryDAO().loadSpecifiedServerSideServiceRelations(step, startTB, endTB, sourceServiceIds);
List<Call.CallDetail> sourceCalls = getTopologyQueryDAO().loadSpecifiedServerSideServiceRelations(step, startTB, endTB, sourceServiceIds);
topology.getNodes().forEach(node -> {
if (Strings.isNullOrEmpty(node.getType())) {
for (Call call : sourceCalls) {
for (Call.CallDetail call : sourceCalls) {
if (node.getId() == call.getTarget()) {
node.setType(getComponentLibraryCatalogService().getComponentName(call.getComponentId()));
break;
......@@ -122,13 +122,17 @@ public class TopologyQueryService implements Service {
public Topology getEndpointTopology(final Step step, final long startTB, final long endTB,
final int endpointId) throws IOException {
List<Call> serverSideCalls = getTopologyQueryDAO().loadSpecifiedDestOfServerSideEndpointRelations(step, startTB, endTB, endpointId);
serverSideCalls.forEach(call -> call.setDetectPoint(DetectPoint.SERVER));
serverSideCalls.forEach(call -> call.setCallType(Const.EMPTY_STRING));
List<Call.CallDetail> serverSideCalls = getTopologyQueryDAO().loadSpecifiedDestOfServerSideEndpointRelations(step, startTB, endTB, endpointId);
Topology topology = new Topology();
topology.getCalls().addAll(serverSideCalls);
serverSideCalls.forEach(callDetail -> {
Call call = new Call();
call.setId(callDetail.getId());
call.setSource(callDetail.getSource());
call.setTarget(callDetail.getTarget());
call.addDetectPoint(DetectPoint.SERVER);
topology.getCalls().add(call);
});
Set<Integer> nodeIds = new HashSet<>();
serverSideCalls.forEach(call -> {
......
......@@ -18,7 +18,9 @@
package org.apache.skywalking.oap.server.core.query.entity;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
/**
......@@ -27,10 +29,68 @@ import org.apache.skywalking.oap.server.core.source.DetectPoint;
@Getter
@Setter
public class Call {
private int source;
private int target;
private int componentId;
private String callType;
private Integer source;
private Integer target;
private List<String> sourceComponents;
private List<String> targetComponents;
private String id;
private DetectPoint detectPoint;
private List<DetectPoint> detectPoints;
private List<Integer> sourceComponentIDs;
private List<Integer> targetComponentIDs;
public Call() {
sourceComponents = new ArrayList<>();
targetComponents = new ArrayList<>();
detectPoints = new ArrayList<>();
}
public void setSource(int source) {
this.source = source;
}
public void setTarget(int target) {
this.target = target;
}
public void addSourceComponentId(int componentId) {
sourceComponentIDs.add(componentId);
}
public void addTargetComponentId(int componentId) {
targetComponentIDs.add(componentId);
}
public void addSourceComponent(String component) {
if (!sourceComponents.contains(component)) {
sourceComponents.add(component);
}
}
public void addTargetComponent(String component) {
if (!targetComponents.contains(component)) {
targetComponents.add(component);
}
}
public void addDetectPoint(DetectPoint point) {
if (!detectPoints.contains(point)) {
detectPoints.add(point);
}
}
@Setter
@Getter
public static class CallDetail {
@Setter(AccessLevel.PRIVATE)
private String id;
private Integer source;
private Integer target;
private DetectPoint detectPoint;
private Integer componentId;
public void generateID() {
id = source + Const.ID_SPLIT + target;
}
}
}
......@@ -34,11 +34,7 @@ public class EndpointRelation extends Source {
}
@Override public String getEntityId() {
return String.valueOf(endpointId) + Const.ID_SPLIT + String.valueOf(childEndpointId) + Const.ID_SPLIT + String.valueOf(componentId);
}
public static String buildEntityId(int endpointId, int childEndpointId, int componentId) {
return String.valueOf(endpointId) + Const.ID_SPLIT + String.valueOf(childEndpointId) + Const.ID_SPLIT + String.valueOf(componentId);
return String.valueOf(endpointId) + Const.ID_SPLIT + String.valueOf(childEndpointId);
}
@Getter @Setter private int endpointId;
......
......@@ -34,27 +34,7 @@ public class ServiceRelation extends Source {
}
@Override public String getEntityId() {
return String.valueOf(sourceServiceId) + Const.ID_SPLIT + String.valueOf(destServiceId) + Const.ID_SPLIT + String.valueOf(componentId);
}
public static String buildEntityId(int sourceServiceId, int destServiceId, int componentId) {
return String.valueOf(sourceServiceId) + Const.ID_SPLIT + String.valueOf(destServiceId) + Const.ID_SPLIT + String.valueOf(componentId);
}
/**
* @param entityId
* @return 1. sourceServiceId 2. destServiceId 3. componentId
*/
public static Integer[] splitEntityId(String entityId) {
String[] parts = entityId.split(Const.ID_SPLIT);
if (parts.length != 3) {
throw new RuntimeException("Illegal ServiceRelation eneity id");
}
Integer[] ids = new Integer[3];
ids[0] = Integer.parseInt(parts[0]);
ids[1] = Integer.parseInt(parts[1]);
ids[2] = Integer.parseInt(parts[2]);
return ids;
return String.valueOf(sourceServiceId) + Const.ID_SPLIT + String.valueOf(destServiceId);
}
@Getter @Setter private int sourceServiceId;
......
......@@ -28,16 +28,16 @@ import org.apache.skywalking.oap.server.library.module.Service;
*/
public interface ITopologyQueryDAO extends Service {
List<Call> loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB,
List<Call.CallDetail> loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB,
List<Integer> serviceIds) throws IOException;
List<Call> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
List<Call.CallDetail> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
List<Integer> serviceIds) throws IOException;
List<Call> loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException;
List<Call.CallDetail> loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException;
List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException;
List<Call.CallDetail> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException;
List<Call> loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB,
List<Call.CallDetail> loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB,
int destEndpointId) throws IOException;
}
Subproject commit 02ddbfa8d84865e1a85a25f49933307970d0ab71
Subproject commit 6fc96650acc7f539fcdf6d51648525ce93e5fa1a
......@@ -19,25 +19,21 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.RelationDefineUtil;
import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.query.entity.Call;
import org.apache.skywalking.oap.server.core.query.entity.Step;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.source.ServiceRelation;
import org.apache.skywalking.oap.server.core.storage.DownSamplingModelNameBuilder;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.builder.SearchSourceBuilder;
......@@ -52,7 +48,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
}
@Override
public List<Call> loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB,
public List<Call.CallDetail> loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB,
List<Integer> serviceIds) throws IOException {
if (CollectionUtils.isEmpty(serviceIds)) {
throw new UnexpectedException("Service id is empty");
......@@ -67,7 +63,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
}
@Override
public List<Call> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
public List<Call.CallDetail> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
List<Integer> serviceIds) throws IOException {
if (CollectionUtils.isEmpty(serviceIds)) {
throw new UnexpectedException("Service id is empty");
......@@ -99,7 +95,8 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
sourceBuilder.query(boolQuery);
}
@Override public List<Call> loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
@Override public List<Call.CallDetail> loadServerSideServiceRelations(Step step, long startTB,
long endTB) throws IOException {
String indexName = DownSamplingModelNameBuilder.build(step, ServiceRelationServerSideMetrics.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideMetrics.TIME_BUCKET).gte(startTB).lte(endTB));
......@@ -108,7 +105,8 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
return load(sourceBuilder, indexName, DetectPoint.SERVER);
}
@Override public List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
@Override public List<Call.CallDetail> loadClientSideServiceRelations(Step step, long startTB,
long endTB) throws IOException {
String indexName = DownSamplingModelNameBuilder.build(step, ServiceRelationClientSideMetrics.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideMetrics.TIME_BUCKET).gte(startTB).lte(endTB));
......@@ -118,7 +116,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
}
@Override
public List<Call> loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB,
public List<Call.CallDetail> loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB,
int destEndpointId) throws IOException {
String indexName = DownSamplingModelNameBuilder.build(step, EndpointRelationServerSideMetrics.INDEX_NAME);
......@@ -138,24 +136,24 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
return load(sourceBuilder, indexName, DetectPoint.SERVER);
}
private List<Call> load(SearchSourceBuilder sourceBuilder, String indexName,
private List<Call.CallDetail> load(SearchSourceBuilder sourceBuilder, String indexName,
DetectPoint detectPoint) throws IOException {
sourceBuilder.aggregation(AggregationBuilders.terms(Metrics.ENTITY_ID).field(Metrics.ENTITY_ID).size(1000));
SearchResponse response = getClient().search(indexName, sourceBuilder);
List<Call> calls = new ArrayList<>();
List<Call.CallDetail> calls = new ArrayList<>();
Terms entityTerms = response.getAggregations().get(Metrics.ENTITY_ID);
for (Terms.Bucket entityBucket : entityTerms.getBuckets()) {
String entityId = entityBucket.getKeyAsString();
Integer[] entityIds = ServiceRelation.splitEntityId(entityId);
Call call = new Call();
call.setId(entityId);
call.setSource(entityIds[0]);
call.setTarget(entityIds[1]);
call.setComponentId(entityIds[2]);
RelationDefineUtil.RelationDefine relationDefine = RelationDefineUtil.splitEntityId(entityId);
Call.CallDetail call = new Call.CallDetail();
call.setSource(relationDefine.getSource());
call.setTarget(relationDefine.getDest());
call.setComponentId(relationDefine.getComponentId());
call.setDetectPoint(detectPoint);
call.generateID();
calls.add(call);
}
return calls;
......
......@@ -19,19 +19,14 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.sql.*;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.manual.RelationDefineUtil;
import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.query.entity.Call;
import org.apache.skywalking.oap.server.core.query.entity.Step;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.source.ServiceRelation;
import org.apache.skywalking.oap.server.core.storage.DownSamplingModelNameBuilder;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
......@@ -46,38 +41,41 @@ public class H2TopologyQueryDAO implements ITopologyQueryDAO {
this.h2Client = h2Client;
}
@Override public List<Call> loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB,
@Override public List<Call.CallDetail> loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB,
List<Integer> serviceIds) throws IOException {
String tableName = DownSamplingModelNameBuilder.build(step, ServiceRelationServerSideMetrics.INDEX_NAME);
return loadServiceCalls(tableName, startTB, endTB, ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID, ServiceRelationServerSideMetrics.DEST_SERVICE_ID, serviceIds, true);
}
@Override public List<Call> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
@Override public List<Call.CallDetail> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
List<Integer> serviceIds) throws IOException {
String tableName = DownSamplingModelNameBuilder.build(step, ServiceRelationClientSideMetrics.INDEX_NAME);
return loadServiceCalls(tableName, startTB, endTB, ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID, ServiceRelationServerSideMetrics.DEST_SERVICE_ID, serviceIds, false);
}
@Override public List<Call> loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
@Override public List<Call.CallDetail> loadServerSideServiceRelations(Step step, long startTB,
long endTB) throws IOException {
String tableName = DownSamplingModelNameBuilder.build(step, ServiceRelationServerSideMetrics.INDEX_NAME);
return loadServiceCalls(tableName, startTB, endTB, ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID, ServiceRelationServerSideMetrics.DEST_SERVICE_ID, new ArrayList<>(0), false);
}
@Override public List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
@Override public List<Call.CallDetail> loadClientSideServiceRelations(Step step, long startTB,
long endTB) throws IOException {
String tableName = DownSamplingModelNameBuilder.build(step, ServiceRelationClientSideMetrics.INDEX_NAME);
return loadServiceCalls(tableName, startTB, endTB, ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID, ServiceRelationServerSideMetrics.DEST_SERVICE_ID, new ArrayList<>(0), true);
}
@Override public List<Call> loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB,
@Override
public List<Call.CallDetail> loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB,
int destEndpointId) throws IOException {
String tableName = DownSamplingModelNameBuilder.build(step, EndpointRelationServerSideMetrics.INDEX_NAME);
List<Call> calls = loadEndpointFromSide(tableName, startTB, endTB, EndpointRelationServerSideMetrics.SOURCE_ENDPOINT_ID, EndpointRelationServerSideMetrics.DEST_ENDPOINT_ID, destEndpointId, false);
List<Call.CallDetail> calls = loadEndpointFromSide(tableName, startTB, endTB, EndpointRelationServerSideMetrics.SOURCE_ENDPOINT_ID, EndpointRelationServerSideMetrics.DEST_ENDPOINT_ID, destEndpointId, false);
calls.addAll(loadEndpointFromSide(tableName, startTB, endTB, EndpointRelationServerSideMetrics.SOURCE_ENDPOINT_ID, EndpointRelationServerSideMetrics.DEST_ENDPOINT_ID, destEndpointId, true));
return calls;
}
private List<Call> loadServiceCalls(String tableName, long startTB, long endTB, String sourceCName,
private List<Call.CallDetail> loadServiceCalls(String tableName, long startTB, long endTB, String sourceCName,
String destCName, List<Integer> serviceIds, boolean isClientSide) throws IOException {
Object[] conditions = new Object[serviceIds.size() * 2 + 2];
conditions[0] = startTB;
......@@ -95,7 +93,7 @@ public class H2TopologyQueryDAO implements ITopologyQueryDAO {
}
serviceIdMatchSql.append(")");
}
List<Call> calls = new ArrayList<>();
List<Call.CallDetail> calls = new ArrayList<>();
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, "select "
+ Metrics.ENTITY_ID
......@@ -112,13 +110,13 @@ public class H2TopologyQueryDAO implements ITopologyQueryDAO {
return calls;
}
private List<Call> loadEndpointFromSide(String tableName, long startTB, long endTB, String sourceCName,
private List<Call.CallDetail> loadEndpointFromSide(String tableName, long startTB, long endTB, String sourceCName,
String destCName, int id, boolean isSourceId) throws IOException {
Object[] conditions = new Object[3];
conditions[0] = startTB;
conditions[1] = endTB;
conditions[2] = id;
List<Call> calls = new ArrayList<>();
List<Call.CallDetail> calls = new ArrayList<>();
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, "select "
+ Metrics.ENTITY_ID
......@@ -135,17 +133,22 @@ public class H2TopologyQueryDAO implements ITopologyQueryDAO {
return calls;
}
private void buildCalls(ResultSet resultSet, List<Call> calls, boolean isClientSide) throws SQLException {
private void buildCalls(ResultSet resultSet, List<Call.CallDetail> calls,
boolean isClientSide) throws SQLException {
while (resultSet.next()) {
Call call = new Call();
Call.CallDetail call = new Call.CallDetail();
String entityId = resultSet.getString(Metrics.ENTITY_ID);
Integer[] entityIds = ServiceRelation.splitEntityId(entityId);
RelationDefineUtil.RelationDefine relationDefine = RelationDefineUtil.splitEntityId(entityId);
call.setSource(entityIds[0]);
call.setTarget(entityIds[1]);
call.setComponentId(entityIds[2]);
call.setDetectPoint(isClientSide ? DetectPoint.CLIENT : DetectPoint.SERVER);
call.setId(entityId);
call.setSource(relationDefine.getSource());
call.setTarget(relationDefine.getDest());
call.setComponentId(relationDefine.getComponentId());
if (isClientSide) {
call.setDetectPoint(DetectPoint.CLIENT);
} else {
call.setDetectPoint(DetectPoint.SERVER);
}
call.generateID();
calls.add(call);
}
}
......
Subproject commit 973a272ddf62a3ec646f33e749eb34a61fe74582
Subproject commit f08fbcfa05915dc19d48814ad0bfa572c0309dc1
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册