提交 366609a4 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Endpoint topology metric and query. (#1674)

上级 64d886f1
......@@ -30,10 +30,10 @@ import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancej
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmmemorypool.ServiceInstanceJVMMemoryPoolDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancerelation.ServiceInstanceRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.servicerelation.ServiceRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointCallRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceCallRelationDispatcher;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.source.Source;
import org.apache.skywalking.oap.server.core.source.*;
import org.slf4j.*;
/**
......@@ -59,7 +59,7 @@ public class DispatcherManager {
this.dispatcherMap.put(Scope.ServiceRelation, new SourceDispatcher[] {new ServiceRelationDispatcher(), new ServiceCallRelationDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstanceRelation, new SourceDispatcher[] {new ServiceInstanceRelationDispatcher()});
this.dispatcherMap.put(Scope.EndpointRelation, new SourceDispatcher[] {new EndpointRelationDispatcher()});
this.dispatcherMap.put(Scope.EndpointRelation, new SourceDispatcher[] {new EndpointRelationDispatcher(), new EndpointCallRelationDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstanceJVMCPU, new SourceDispatcher[] {new ServiceInstanceJVMCPUDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstanceJVMGC, new SourceDispatcher[] {new ServiceInstanceJVMGCDispatcher()});
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
import org.apache.skywalking.oap.server.core.source.EndpointRelation;
/**
* @author wusheng
*/
public class EndpointCallRelationDispatcher implements SourceDispatcher<EndpointRelation> {
@Override
public void dispatch(EndpointRelation source) {
switch (source.getDetectPoint()) {
case CLIENT:
clientSide(source);
break;
case SERVER:
serverSide(source);
break;
}
}
private void serverSide(EndpointRelation source) {
EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator();
indicator.setTimeBucket(source.getTimeBucket());
indicator.setSourceEndpointId(source.getEndpointId());
indicator.setDestEndpointId(source.getChildEndpointId());
IndicatorProcess.INSTANCE.in(indicator);
}
private void clientSide(EndpointRelation source) {
EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
indicator.setTimeBucket(source.getTimeBucket());
indicator.setSourceEndpointId(source.getEndpointId());
indicator.setDestEndpointId(source.getChildEndpointId());
IndicatorProcess.INSTANCE.in(indicator);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
@IndicatorType
@StreamData
@StorageEntity(name = EndpointRelationClientSideIndicator.INDEX_NAME, builder = EndpointRelationClientSideIndicator.Builder.class)
public class EndpointRelationClientSideIndicator extends Indicator {
public static final String INDEX_NAME = "endpoint_relation_client_side";
public static final String SOURCE_ENDPOINT_ID = "source_endpoint_id";
public static final String DEST_ENDPOINT_ID = "dest_endpoint_id";
@Setter @Getter @Column(columnName = SOURCE_ENDPOINT_ID) @IDColumn private int sourceEndpointId;
@Setter @Getter @Column(columnName = DEST_ENDPOINT_ID) @IDColumn private int destEndpointId;
@Override public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + String.valueOf(sourceEndpointId);
splitJointId += Const.ID_SPLIT + String.valueOf(destEndpointId);
return splitJointId;
}
@Override public void combine(Indicator indicator) {
}
@Override public void calculate() {
}
@Override public Indicator toHour() {
EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
indicator.setTimeBucket(toTimeBucketInHour());
indicator.setSourceEndpointId(getSourceEndpointId());
indicator.setDestEndpointId(getDestEndpointId());
return indicator;
}
@Override public Indicator toDay() {
EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
indicator.setTimeBucket(toTimeBucketInDay());
indicator.setSourceEndpointId(getSourceEndpointId());
indicator.setDestEndpointId(getDestEndpointId());
return indicator;
}
@Override public Indicator toMonth() {
EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
indicator.setTimeBucket(toTimeBucketInMonth());
indicator.setSourceEndpointId(getSourceEndpointId());
indicator.setDestEndpointId(getDestEndpointId());
return indicator;
}
@Override public int remoteHashCode() {
int result = 17;
result = 31 * result + sourceEndpointId;
result = 31 * result + destEndpointId;
return result;
}
@Override public void deserialize(RemoteData remoteData) {
setSourceEndpointId(remoteData.getDataIntegers(0));
setDestEndpointId(remoteData.getDataIntegers(1));
setTimeBucket(remoteData.getDataLongs(0));
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataIntegers(1, getDestEndpointId());
remoteBuilder.setDataIntegers(0, getSourceEndpointId());
remoteBuilder.setDataLongs(0, getTimeBucket());
return remoteBuilder;
}
@Override public int hashCode() {
int result = 17;
result = 31 * result + sourceEndpointId;
result = 31 * result + destEndpointId;
result = 31 * result + (int)getTimeBucket();
return result;
}
@Override public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
EndpointRelationClientSideIndicator indicator = (EndpointRelationClientSideIndicator)obj;
if (sourceEndpointId != indicator.sourceEndpointId)
return false;
if (destEndpointId != indicator.destEndpointId)
return false;
if (getTimeBucket() != indicator.getTimeBucket())
return false;
return true;
}
public static class Builder implements StorageBuilder<EndpointRelationClientSideIndicator> {
@Override public EndpointRelationClientSideIndicator map2Data(Map<String, Object> dbMap) {
EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
indicator.setSourceEndpointId(((Number)dbMap.get(SOURCE_ENDPOINT_ID)).intValue());
indicator.setDestEndpointId(((Number)dbMap.get(DEST_ENDPOINT_ID)).intValue());
indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
return indicator;
}
@Override public Map<String, Object> data2Map(EndpointRelationClientSideIndicator storageData) {
Map<String, Object> map = new HashMap<>();
map.put(TIME_BUCKET, storageData.getTimeBucket());
map.put(SOURCE_ENDPOINT_ID, storageData.getSourceEndpointId());
map.put(DEST_ENDPOINT_ID, storageData.getDestEndpointId());
return map;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
@IndicatorType
@StreamData
@StorageEntity(name = EndpointRelationServerSideIndicator.INDEX_NAME, builder = EndpointRelationServerSideIndicator.Builder.class)
public class EndpointRelationServerSideIndicator extends Indicator {
public static final String INDEX_NAME = "endpoint_relation_server_side";
public static final String SOURCE_ENDPOINT_ID = "source_endpoint_id";
public static final String DEST_ENDPOINT_ID = "dest_endpoint_id";
@Setter @Getter @Column(columnName = SOURCE_ENDPOINT_ID) @IDColumn private int sourceEndpointId;
@Setter @Getter @Column(columnName = DEST_ENDPOINT_ID) @IDColumn private int destEndpointId;
@Override public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + String.valueOf(sourceEndpointId);
splitJointId += Const.ID_SPLIT + String.valueOf(destEndpointId);
return splitJointId;
}
@Override public void combine(Indicator indicator) {
}
@Override public void calculate() {
}
@Override public Indicator toHour() {
EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator();
indicator.setTimeBucket(toTimeBucketInHour());
indicator.setSourceEndpointId(getSourceEndpointId());
indicator.setDestEndpointId(getDestEndpointId());
return indicator;
}
@Override public Indicator toDay() {
EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator();
indicator.setTimeBucket(toTimeBucketInDay());
indicator.setSourceEndpointId(getSourceEndpointId());
indicator.setDestEndpointId(getDestEndpointId());
return indicator;
}
@Override public Indicator toMonth() {
EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator();
indicator.setTimeBucket(toTimeBucketInMonth());
indicator.setSourceEndpointId(getSourceEndpointId());
indicator.setDestEndpointId(getDestEndpointId());
return indicator;
}
@Override public int remoteHashCode() {
int result = 17;
result = 31 * result + sourceEndpointId;
result = 31 * result + destEndpointId;
return result;
}
@Override public void deserialize(RemoteData remoteData) {
setSourceEndpointId(remoteData.getDataIntegers(0));
setDestEndpointId(remoteData.getDataIntegers(1));
setTimeBucket(remoteData.getDataLongs(0));
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataIntegers(0, getSourceEndpointId());
remoteBuilder.setDataIntegers(1, getDestEndpointId());
remoteBuilder.setDataLongs(0, getTimeBucket());
return remoteBuilder;
}
@Override public int hashCode() {
int result = 17;
result = 31 * result + sourceEndpointId;
result = 31 * result + destEndpointId;
result = 31 * result + (int)getTimeBucket();
return result;
}
@Override public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
EndpointRelationServerSideIndicator indicator = (EndpointRelationServerSideIndicator)obj;
if (sourceEndpointId != indicator.sourceEndpointId)
return false;
if (destEndpointId != indicator.destEndpointId)
return false;
if (getTimeBucket() != indicator.getTimeBucket())
return false;
return true;
}
public static class Builder implements StorageBuilder<EndpointRelationServerSideIndicator> {
@Override public EndpointRelationServerSideIndicator map2Data(Map<String, Object> dbMap) {
EndpointRelationServerSideIndicator indicator = new EndpointRelationServerSideIndicator();
indicator.setSourceEndpointId(((Number)dbMap.get(SOURCE_ENDPOINT_ID)).intValue());
indicator.setDestEndpointId(((Number)dbMap.get(DEST_ENDPOINT_ID)).intValue());
indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
return indicator;
}
@Override public Map<String, Object> data2Map(EndpointRelationServerSideIndicator storageData) {
Map<String, Object> map = new HashMap<>();
map.put(SOURCE_ENDPOINT_ID, storageData.getSourceEndpointId());
map.put(DEST_ENDPOINT_ID, storageData.getDestEndpointId());
map.put(TIME_BUCKET, storageData.getTimeBucket());
return map;
}
}
}
......@@ -37,12 +37,10 @@ class TopologyBuilder {
private static final Logger logger = LoggerFactory.getLogger(TopologyBuilder.class);
private final ServiceInventoryCache serviceInventoryCache;
// private final DateBetweenService dateBetweenService;
private final IComponentLibraryCatalogService componentLibraryCatalogService;
TopologyBuilder(ModuleManager moduleManager) {
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInventoryCache.class);
// this.dateBetweenService = new DateBetweenService(moduleManager);
this.componentLibraryCatalogService = moduleManager.find(CoreModule.NAME).getService(IComponentLibraryCatalogService.class);
}
......@@ -56,8 +54,6 @@ class TopologyBuilder {
serviceRelationServerCalls = serverCallsFilter(serviceRelationServerCalls);
List<Node> nodes = new LinkedList<>();
Map<Integer, Integer> applicationMinuteBetweenMap = new HashMap<>();
List<Call> calls = new LinkedList<>();
Set<Integer> nodeIds = new HashSet<>();
serviceRelationClientCalls.forEach(clientCall -> {
......@@ -91,17 +87,13 @@ class TopologyBuilder {
int actualTargetId = mappings.getOrDefault(target.getSequence(), target.getSequence());
call.setTarget(actualTargetId);
call.setCallType(nodeCompMap.get(clientCall.getTarget()));
// try {
// call.setCpm(clientCall.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, source.getSequence(), startSecondTimeBucket, endSecondTimeBucket));
// } catch (ParseException e) {
// logger.error(e.getMessage(), e);
// }
call.setId(clientCall.getId());
calls.add(call);
});
serviceRelationServerCalls.forEach(referenceMetric -> {
ServiceInventory source = serviceInventoryCache.get(referenceMetric.getSource());
ServiceInventory target = serviceInventoryCache.get(referenceMetric.getTarget());
serviceRelationServerCalls.forEach(serverCall -> {
ServiceInventory source = serviceInventoryCache.get(serverCall.getSource());
ServiceInventory target = serviceInventoryCache.get(serverCall.getTarget());
if (source.getSequence() == Const.NONE_SERVICE_ID) {
if (!nodeIds.contains(source.getSequence())) {
......@@ -128,17 +120,13 @@ class TopologyBuilder {
Call call = new Call();
call.setSource(source.getSequence());
call.setTarget(target.getSequence());
call.setId(serverCall.getId());
if (source.getSequence() == Const.NONE_SERVICE_ID) {
call.setCallType(Const.EMPTY_STRING);
} else {
call.setCallType(nodeCompMap.get(referenceMetric.getTarget()));
call.setCallType(nodeCompMap.get(serverCall.getTarget()));
}
// try {
// call.setCpm(referenceMetric.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, target.getSequence(), startSecondTimeBucket, endSecondTimeBucket));
// } catch (ParseException e) {
// logger.error(e.getMessage(), e);
// }
calls.add(call);
});
......
......@@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.core.query;
import java.io.IOException;
import java.util.*;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
......@@ -37,6 +40,8 @@ public class TopologyQueryService implements Service {
private final ModuleManager moduleManager;
private ITopologyQueryDAO topologyQueryDAO;
private EndpointInventoryCache endpointInventoryCache;
private IComponentLibraryCatalogService componentLibraryCatalogService;
public TopologyQueryService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
......@@ -49,6 +54,20 @@ public class TopologyQueryService implements Service {
return topologyQueryDAO;
}
private IComponentLibraryCatalogService getComponentLibraryCatalogService() {
if (componentLibraryCatalogService == null) {
componentLibraryCatalogService = moduleManager.find(CoreModule.NAME).getService(IComponentLibraryCatalogService.class);
}
return componentLibraryCatalogService;
}
private EndpointInventoryCache getEndpointInventoryCache() {
if (endpointInventoryCache == null) {
endpointInventoryCache = moduleManager.find(CoreModule.NAME).getService(EndpointInventoryCache.class);
}
return endpointInventoryCache;
}
public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException {
logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB);
List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
......@@ -95,4 +114,23 @@ public class TopologyQueryService implements Service {
return topology;
}
public Topology getEndpointTopology(final Step step, final long startTB, final long endTB,
final int endpointId) throws IOException {
List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
Map<Integer, String> components = new HashMap<>();
serviceComponents.forEach(component -> components.put(component.getServiceId(), getComponentLibraryCatalogService().getComponentName(component.getComponentId())));
List<Call> calls = getTopologyQueryDAO().loadSpecifiedDestOfServerSideEndpointRelations(step, startTB, endTB, endpointId);
calls.addAll(getTopologyQueryDAO().loadSpecifiedSourceOfClientSideEndpointRelations(step, startTB, endTB, endpointId));
calls.forEach(call -> {
call.setCallType(components.getOrDefault(getEndpointInventoryCache().get(call.getTarget()).getServiceId(), Const.UNKNOWN));
});
Topology topology = new Topology();
topology.getCalls().addAll(calls);
return topology;
}
}
......@@ -26,6 +26,5 @@ public class Call {
private int source;
private int target;
private String callType;
private long calls;
private long cpm;
private String id;
}
......@@ -34,6 +34,10 @@ public class EndpointRelation extends Source {
return String.valueOf(endpointId) + Const.ID_SPLIT + String.valueOf(childEndpointId);
}
public static String buildEntityId(int endpointId, int childEndpointId) {
return String.valueOf(endpointId) + Const.ID_SPLIT + String.valueOf(childEndpointId);
}
@Getter @Setter private int endpointId;
@Getter @Setter private String endpoint;
@Getter @Setter private int serviceId;
......
......@@ -34,6 +34,10 @@ public class ServiceRelation extends Source {
return String.valueOf(sourceServiceId) + Const.ID_SPLIT + String.valueOf(destServiceId);
}
public static String buildEntityId(int sourceServiceId, int destServiceId) {
return String.valueOf(sourceServiceId) + Const.ID_SPLIT + String.valueOf(destServiceId);
}
@Getter @Setter private int sourceServiceId;
@Getter @Setter private String sourceServiceName;
@Getter @Setter private String sourceServiceInstanceName;
......
......@@ -42,4 +42,10 @@ public interface ITopologyQueryDAO extends Service {
List<ServiceMapping> loadServiceMappings(Step step, long startTB, long endTB) throws IOException;
List<ServiceComponent> loadServiceComponents(Step step, long startTB, long endTB) throws IOException;
List<Call> loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB,
int destEndpointId) throws IOException;
List<Call> loadSpecifiedSourceOfClientSideEndpointRelations(Step step, long startTB, long endTB,
int sourceEndpointId) throws IOException;
}
......@@ -56,4 +56,11 @@ public class TopologyQuery implements GraphQLQueryResolver {
return getQueryService().getServiceTopology(duration.getStep(), startTimeBucket, endTimeBucket, serviceId);
}
public Topology getEndpointTopology(final int endpointId, final Duration duration) throws IOException {
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getQueryService().getEndpointTopology(duration.getStep(), startTimeBucket, endTimeBucket, endpointId);
}
}
Subproject commit c6ee5d15f38024c3be61be579eac4035ffbfd4bf
Subproject commit b93baa58852595164aae50e3e7be37683c220175
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.*;
import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
......@@ -57,18 +58,22 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID);
return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID, Source.Service);
}
@Override
public List<Call> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
List<Integer> serviceIds) throws IOException {
if (CollectionUtils.isEmpty(serviceIds)) {
throw new UnexpectedException("Service id is null");
}
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.size(0);
setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID);
return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID, Source.Service);
}
private void setQueryCondition(SearchSourceBuilder sourceBuilder, long startTB, long endTB,
......@@ -95,7 +100,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
sourceBuilder.size(0);
return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID);
return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID, Source.Service);
}
@Override public List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
......@@ -104,29 +109,7 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
sourceBuilder.size(0);
return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID);
}
private List<Call> load(SearchSourceBuilder sourceBuilder, String indexName, String sourceCName,
String destCName) throws IOException {
TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(sourceCName).field(sourceCName).size(1000);
sourceAggregation.subAggregation(AggregationBuilders.terms(destCName).field(destCName).size(1000));
sourceBuilder.aggregation(sourceAggregation);
SearchResponse response = getClient().search(indexName, sourceBuilder);
List<Call> calls = new ArrayList<>();
Terms sourceTerms = response.getAggregations().get(sourceCName);
for (Terms.Bucket sourceBucket : sourceTerms.getBuckets()) {
Terms destTerms = sourceBucket.getAggregations().get(destCName);
for (Terms.Bucket destBucket : destTerms.getBuckets()) {
Call value = new Call();
value.setSource(sourceBucket.getKeyAsNumber().intValue());
value.setTarget(destBucket.getKeyAsNumber().intValue());
calls.add(value);
}
}
return calls;
return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID, Source.Service);
}
@Override public List<ServiceMapping> loadServiceMappings(Step step, long startTB, long endTB) throws IOException {
......@@ -181,4 +164,70 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
}
return serviceComponents;
}
@Override
public List<Call> loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB,
int destEndpointId) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, EndpointRelationServerSideIndicator.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.size(0);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.rangeQuery(EndpointRelationServerSideIndicator.TIME_BUCKET).gte(startTB).lte(endTB));
boolQuery.must().add(QueryBuilders.termQuery(EndpointRelationServerSideIndicator.DEST_ENDPOINT_ID, destEndpointId));
sourceBuilder.query(boolQuery);
return load(sourceBuilder, indexName, EndpointRelationServerSideIndicator.SOURCE_ENDPOINT_ID, EndpointRelationServerSideIndicator.DEST_ENDPOINT_ID, Source.Endpoint);
}
@Override
public List<Call> loadSpecifiedSourceOfClientSideEndpointRelations(Step step, long startTB, long endTB,
int sourceEndpointId) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, EndpointRelationClientSideIndicator.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.size(0);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.rangeQuery(EndpointRelationClientSideIndicator.TIME_BUCKET).gte(startTB).lte(endTB));
boolQuery.must().add(QueryBuilders.termQuery(EndpointRelationClientSideIndicator.SOURCE_ENDPOINT_ID, sourceEndpointId));
sourceBuilder.query(boolQuery);
return load(sourceBuilder, indexName, EndpointRelationClientSideIndicator.SOURCE_ENDPOINT_ID, EndpointRelationClientSideIndicator.DEST_ENDPOINT_ID, Source.Endpoint);
}
private List<Call> load(SearchSourceBuilder sourceBuilder, String indexName, String sourceCName,
String destCName, Source source) throws IOException {
TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(sourceCName).field(sourceCName).size(1000);
sourceAggregation.subAggregation(AggregationBuilders.terms(destCName).field(destCName).size(1000));
sourceBuilder.aggregation(sourceAggregation);
SearchResponse response = getClient().search(indexName, sourceBuilder);
List<Call> calls = new ArrayList<>();
Terms sourceTerms = response.getAggregations().get(sourceCName);
for (Terms.Bucket sourceBucket : sourceTerms.getBuckets()) {
Terms destTerms = sourceBucket.getAggregations().get(destCName);
for (Terms.Bucket destBucket : destTerms.getBuckets()) {
Call value = new Call();
value.setSource(sourceBucket.getKeyAsNumber().intValue());
value.setTarget(destBucket.getKeyAsNumber().intValue());
switch (source) {
case Service:
value.setId(ServiceRelation.buildEntityId(value.getSource(), value.getTarget()));
break;
case Endpoint:
value.setId(EndpointRelation.buildEntityId(value.getSource(), value.getTarget()));
break;
}
calls.add(value);
}
}
return calls;
}
enum Source {
Service, Endpoint
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册