提交 babc6d29 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Global topology and service topology write by manual. (#1671)

上级 fd83aa55
......@@ -28,11 +28,26 @@ import org.apache.skywalking.oap.server.core.source.ServiceRelation;
public class ServiceCallRelationDispatcher implements SourceDispatcher<ServiceRelation> {
@Override
public void dispatch(ServiceRelation source) {
doDispatch(source);
switch (source.getDetectPoint()) {
case SERVER:
serverSide(source);
break;
case CLIENT:
clientSide(source);
break;
}
}
public void doDispatch(ServiceRelation source) {
ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
private void serverSide(ServiceRelation source) {
ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
indicator.setTimeBucket(source.getTimeBucket());
indicator.setSourceServiceId(source.getSourceServiceId());
indicator.setDestServiceId(source.getDestServiceId());
IndicatorProcess.INSTANCE.in(indicator);
}
private void clientSide(ServiceRelation source) {
ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
indicator.setTimeBucket(source.getTimeBucket());
indicator.setSourceServiceId(source.getSourceServiceId());
indicator.setDestServiceId(source.getDestServiceId());
......
......@@ -18,26 +18,22 @@
package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntity;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
@IndicatorType
@StreamData
@StorageEntity(name = ServiceCallRelationIndicator.INDEX_NAME, builder = ServiceCallRelationIndicator.Builder.class)
public class ServiceCallRelationIndicator extends Indicator {
@StorageEntity(name = ServiceRelationClientSideIndicator.INDEX_NAME, builder = ServiceRelationClientSideIndicator.Builder.class)
public class ServiceRelationClientSideIndicator extends Indicator {
public static final String INDEX_NAME = "service_call_relation";
public static final String INDEX_NAME = "service_relation_client_side";
public static final String SOURCE_SERVICE_ID = "source_service_id";
public static final String DEST_SERVICE_ID = "dest_service_id";
......@@ -60,7 +56,7 @@ public class ServiceCallRelationIndicator extends Indicator {
}
@Override public Indicator toHour() {
ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
indicator.setTimeBucket(toTimeBucketInHour());
indicator.setSourceServiceId(getSourceServiceId());
indicator.setDestServiceId(getDestServiceId());
......@@ -68,7 +64,7 @@ public class ServiceCallRelationIndicator extends Indicator {
}
@Override public Indicator toDay() {
ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
indicator.setTimeBucket(toTimeBucketInDay());
indicator.setSourceServiceId(getSourceServiceId());
indicator.setDestServiceId(getDestServiceId());
......@@ -76,7 +72,7 @@ public class ServiceCallRelationIndicator extends Indicator {
}
@Override public Indicator toMonth() {
ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
indicator.setTimeBucket(toTimeBucketInMonth());
indicator.setSourceServiceId(getSourceServiceId());
indicator.setDestServiceId(getDestServiceId());
......@@ -99,8 +95,8 @@ public class ServiceCallRelationIndicator extends Indicator {
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataIntegers(0, getSourceServiceId());
remoteBuilder.setDataIntegers(1, getDestServiceId());
remoteBuilder.setDataIntegers(0, getSourceServiceId());
remoteBuilder.setDataLongs(0, getTimeBucket());
return remoteBuilder;
......@@ -122,7 +118,7 @@ public class ServiceCallRelationIndicator extends Indicator {
if (getClass() != obj.getClass())
return false;
ServiceCallRelationIndicator indicator = (ServiceCallRelationIndicator)obj;
ServiceRelationClientSideIndicator indicator = (ServiceRelationClientSideIndicator)obj;
if (sourceServiceId != indicator.sourceServiceId)
return false;
if (destServiceId != indicator.destServiceId)
......@@ -134,21 +130,21 @@ public class ServiceCallRelationIndicator extends Indicator {
return true;
}
public static class Builder implements StorageBuilder<ServiceCallRelationIndicator> {
public static class Builder implements StorageBuilder<ServiceRelationClientSideIndicator> {
@Override public ServiceCallRelationIndicator map2Data(Map<String, Object> dbMap) {
ServiceCallRelationIndicator indicator = new ServiceCallRelationIndicator();
@Override public ServiceRelationClientSideIndicator map2Data(Map<String, Object> dbMap) {
ServiceRelationClientSideIndicator indicator = new ServiceRelationClientSideIndicator();
indicator.setSourceServiceId(((Number)dbMap.get(SOURCE_SERVICE_ID)).intValue());
indicator.setDestServiceId(((Number)dbMap.get(DEST_SERVICE_ID)).intValue());
indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
return indicator;
}
@Override public Map<String, Object> data2Map(ServiceCallRelationIndicator storageData) {
@Override public Map<String, Object> data2Map(ServiceRelationClientSideIndicator storageData) {
Map<String, Object> map = new HashMap<>();
map.put(TIME_BUCKET, storageData.getTimeBucket());
map.put(SOURCE_SERVICE_ID, storageData.getSourceServiceId());
map.put(DEST_SERVICE_ID, storageData.getDestServiceId());
map.put(TIME_BUCKET, storageData.getTimeBucket());
return map;
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
@IndicatorType
@StreamData
@StorageEntity(name = ServiceRelationServerSideIndicator.INDEX_NAME, builder = ServiceRelationServerSideIndicator.Builder.class)
public class ServiceRelationServerSideIndicator extends Indicator {
public static final String INDEX_NAME = "service_relation_server_side";
public static final String SOURCE_SERVICE_ID = "source_service_id";
public static final String DEST_SERVICE_ID = "dest_service_id";
@Setter @Getter @Column(columnName = SOURCE_SERVICE_ID) @IDColumn private int sourceServiceId;
@Setter @Getter @Column(columnName = DEST_SERVICE_ID) @IDColumn private int destServiceId;
@Override public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + String.valueOf(sourceServiceId);
splitJointId += Const.ID_SPLIT + String.valueOf(destServiceId);
return splitJointId;
}
@Override public void combine(Indicator indicator) {
}
@Override public void calculate() {
}
@Override public Indicator toHour() {
ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
indicator.setTimeBucket(toTimeBucketInHour());
indicator.setSourceServiceId(getSourceServiceId());
indicator.setDestServiceId(getDestServiceId());
return indicator;
}
@Override public Indicator toDay() {
ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
indicator.setTimeBucket(toTimeBucketInDay());
indicator.setSourceServiceId(getSourceServiceId());
indicator.setDestServiceId(getDestServiceId());
return indicator;
}
@Override public Indicator toMonth() {
ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
indicator.setTimeBucket(toTimeBucketInMonth());
indicator.setSourceServiceId(getSourceServiceId());
indicator.setDestServiceId(getDestServiceId());
return indicator;
}
@Override public int remoteHashCode() {
int result = 17;
result = 31 * result + sourceServiceId;
result = 31 * result + destServiceId;
return result;
}
@Override public void deserialize(RemoteData remoteData) {
setSourceServiceId(remoteData.getDataIntegers(0));
setDestServiceId(remoteData.getDataIntegers(1));
setTimeBucket(remoteData.getDataLongs(0));
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataIntegers(0, getSourceServiceId());
remoteBuilder.setDataIntegers(1, getDestServiceId());
remoteBuilder.setDataLongs(0, getTimeBucket());
return remoteBuilder;
}
@Override public int hashCode() {
int result = 17;
result = 31 * result + sourceServiceId;
result = 31 * result + destServiceId;
result = 31 * result + (int)getTimeBucket();
return result;
}
@Override public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ServiceRelationServerSideIndicator indicator = (ServiceRelationServerSideIndicator)obj;
if (sourceServiceId != indicator.sourceServiceId)
return false;
if (destServiceId != indicator.destServiceId)
return false;
if (getTimeBucket() != indicator.getTimeBucket())
return false;
return true;
}
public static class Builder implements StorageBuilder<ServiceRelationServerSideIndicator> {
@Override public ServiceRelationServerSideIndicator map2Data(Map<String, Object> dbMap) {
ServiceRelationServerSideIndicator indicator = new ServiceRelationServerSideIndicator();
indicator.setSourceServiceId(((Number)dbMap.get(SOURCE_SERVICE_ID)).intValue());
indicator.setDestServiceId(((Number)dbMap.get(DEST_SERVICE_ID)).intValue());
indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
return indicator;
}
@Override public Map<String, Object> data2Map(ServiceRelationServerSideIndicator storageData) {
Map<String, Object> map = new HashMap<>();
map.put(SOURCE_SERVICE_ID, storageData.getSourceServiceId());
map.put(DEST_SERVICE_ID, storageData.getDestServiceId());
map.put(TIME_BUCKET, storageData.getTimeBucket());
return map;
}
}
}
......@@ -20,12 +20,10 @@ package org.apache.skywalking.oap.server.core.query;
import java.io.IOException;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.query.sql.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.Service;
import org.slf4j.*;
......@@ -38,90 +36,63 @@ public class TopologyQueryService implements Service {
private static final Logger logger = LoggerFactory.getLogger(TopologyQueryService.class);
private final ModuleManager moduleManager;
private IMetricQueryDAO metricQueryDAO;
private IUniqueQueryDAO uniqueQueryDAO;
private ITopologyQueryDAO topologyQueryDAO;
public TopologyQueryService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private IMetricQueryDAO getMetricQueryDAO() {
if (metricQueryDAO == null) {
metricQueryDAO = moduleManager.find(StorageModule.NAME).getService(IMetricQueryDAO.class);
private ITopologyQueryDAO getTopologyQueryDAO() {
if (topologyQueryDAO == null) {
topologyQueryDAO = moduleManager.find(StorageModule.NAME).getService(ITopologyQueryDAO.class);
}
return metricQueryDAO;
}
private IUniqueQueryDAO getUniqueQueryDAO() {
if (uniqueQueryDAO == null) {
uniqueQueryDAO = moduleManager.find(StorageModule.NAME).getService(IUniqueQueryDAO.class);
}
return uniqueQueryDAO;
return topologyQueryDAO;
}
public Topology getGlobalTopology(final Step step, final long startTB, final long endTB) throws IOException {
logger.debug("step: {}, startTimeBucket: {}, endTimeBucket: {}", step, startTB, endTB);
List<ServiceComponent> serviceComponents = loadServiceComponent(step, startTB, endTB);
List<ServiceMapping> serviceMappings = loadServiceMapping(step, startTB, endTB);
List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
List<ServiceMapping> serviceMappings = getTopologyQueryDAO().loadServiceMappings(step, startTB, endTB);
List<Call> serviceRelationClientCalls = loadServiceRelationCalls(step, startTB, endTB, "service_relation_client_calls_sum");
List<Call> serviceRelationServerCalls = loadServiceRelationCalls(step, startTB, endTB, "service_relation_server_calls_sum");
List<Call> serviceRelationClientCalls = getTopologyQueryDAO().loadClientSideServiceRelations(step, startTB, endTB);
List<Call> serviceRelationServerCalls = getTopologyQueryDAO().loadServerSideServiceRelations(step, startTB, endTB);
TopologyBuilder builder = new TopologyBuilder(moduleManager);
return builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
}
public Topology getServiceTopology(final Step step, final long startTimeBucket, final long endTimeBucket,
final String serviceId) {
return new Topology();
}
private List<ServiceComponent> loadServiceComponent(final Step step, final long startTB,
final long endTB) throws IOException {
List<TwoIdGroup> twoIdGroups = getUniqueQueryDAO().aggregation(ServiceComponentIndicator.INDEX_NAME, step, startTB, endTB,
new Where(), ServiceComponentIndicator.SERVICE_ID, ServiceComponentIndicator.COMPONENT_ID);
List<ServiceComponent> serviceComponents = new ArrayList<>();
twoIdGroups.forEach(twoIdGroup -> {
ServiceComponent serviceComponent = new ServiceComponent();
serviceComponent.setServiceId(twoIdGroup.getId1());
serviceComponent.setComponentId(twoIdGroup.getId2());
serviceComponents.add(serviceComponent);
});
return serviceComponents;
}
private List<ServiceMapping> loadServiceMapping(final Step step, final long startTB,
final long endTB) throws IOException {
List<TwoIdGroup> twoIdGroups = getUniqueQueryDAO().aggregation(ServiceMappingIndicator.INDEX_NAME, step, startTB, endTB,
new Where(), ServiceMappingIndicator.SERVICE_ID, ServiceMappingIndicator.MAPPING_SERVICE_ID);
List<ServiceMapping> serviceMappings = new ArrayList<>();
twoIdGroups.forEach(twoIdGroup -> {
ServiceMapping serviceMapping = new ServiceMapping();
serviceMapping.setServiceId(twoIdGroup.getId1());
serviceMapping.setMappingServiceId(twoIdGroup.getId2());
serviceMappings.add(serviceMapping);
public Topology getServiceTopology(final Step step, final long startTB, final long endTB,
final int serviceId) throws IOException {
List<ServiceComponent> serviceComponents = getTopologyQueryDAO().loadServiceComponents(step, startTB, endTB);
List<ServiceMapping> serviceMappings = getTopologyQueryDAO().loadServiceMappings(step, startTB, endTB);
Set<Integer> serviceIds = new HashSet<>();
serviceIds.add(serviceId);
serviceMappings.forEach(mapping -> {
if (mapping.getServiceId() == serviceId) {
serviceIds.add(mapping.getMappingServiceId());
}
});
List<Integer> serviceIdList = new ArrayList<>(serviceIds);
return serviceMappings;
}
private List<Call> loadServiceRelationCalls(final Step step, final long startTB, final long endTB,
String indName) throws IOException {
List<TwoIdGroupValue> twoIdGroupValues = getMetricQueryDAO().aggregation(indName, step, startTB, endTB, new Where(), "source_service_id", "dest_service_id", "value", Function.Sum);
List<Call> serviceRelationClientCalls = getTopologyQueryDAO().loadSpecifiedClientSideServiceRelations(step, startTB, endTB, serviceIdList);
List<Call> serviceRelationServerCalls = getTopologyQueryDAO().loadSpecifiedServerSideServiceRelations(step, startTB, endTB, serviceIdList);
List<Call> clientCalls = new ArrayList<>();
TopologyBuilder builder = new TopologyBuilder(moduleManager);
Topology topology = builder.build(serviceComponents, serviceMappings, serviceRelationClientCalls, serviceRelationServerCalls);
twoIdGroupValues.forEach(twoIdGroupValue -> {
Call call = new Call();
call.setSource(twoIdGroupValue.getId1());
call.setTarget(twoIdGroupValue.getId2());
call.setCalls(twoIdGroupValue.getValue().longValue());
clientCalls.add(call);
Set<Integer> nodeIds = new HashSet<>();
topology.getCalls().forEach(call -> {
nodeIds.add(call.getSource());
nodeIds.add(call.getTarget());
});
return clientCalls;
for (int i = topology.getNodes().size() - 1; i >= 0; i--) {
if (!nodeIds.contains(topology.getNodes().get(i).getId())) {
topology.getNodes().remove(i);
}
}
return topology;
}
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
......@@ -36,6 +37,7 @@ public class StorageModule extends ModuleDefine {
return new Class[] {
IBatchDAO.class, StorageDAO.class, IRegisterLockDAO.class,
IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class,
IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class};
IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class,
ITopologyQueryDAO.class};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.query;
import java.io.IOException;
import java.util.List;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
public interface ITopologyQueryDAO extends Service {
List<Call> loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB,
List<Integer> serviceIds) throws IOException;
List<Call> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
List<Integer> serviceIds) throws IOException;
List<Call> loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException;
List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException;
List<ServiceMapping> loadServiceMappings(Step step, long startTB, long endTB) throws IOException;
List<ServiceComponent> loadServiceComponents(Step step, long startTB, long endTB) throws IOException;
}
......@@ -50,7 +50,7 @@ public class TopologyQuery implements GraphQLQueryResolver {
return getQueryService().getGlobalTopology(duration.getStep(), startTimeBucket, endTimeBucket);
}
public Topology getServiceTopology(final String serviceId, final Duration duration) {
public Topology getServiceTopology(final int serviceId, final Duration duration) throws IOException {
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
......
......@@ -20,12 +20,14 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.library.client.NameSpace;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
import org.slf4j.*;
/**
......@@ -72,6 +74,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient));
this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient));
this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient));
this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
}
@Override
......
......@@ -34,7 +34,7 @@ public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
super(client);
}
public void queryBuild(SearchSourceBuilder sourceBuilder, Where where, long startTB, long endTB) {
public final void queryBuild(SearchSourceBuilder sourceBuilder, Where where, long startTB, long endTB) {
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(Indicator.TIME_BUCKET).gte(startTB).lte(endTB);
if (where.getKeyValues().isEmpty()) {
sourceBuilder.query(rangeQueryBuilder);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.TimePyramidTableNameBuilder;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.*;
import org.elasticsearch.search.builder.SearchSourceBuilder;
/**
* @author peng-yongsheng
*/
public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
public TopologyQueryEsDAO(ElasticSearchClient client) {
super(client);
}
@Override
public List<Call> loadSpecifiedServerSideServiceRelations(Step step, long startTB, long endTB,
List<Integer> serviceIds) throws IOException {
if (CollectionUtils.isEmpty(serviceIds)) {
throw new UnexpectedException("Service id is null");
}
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.size(0);
setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID);
}
@Override
public List<Call> loadSpecifiedClientSideServiceRelations(Step step, long startTB, long endTB,
List<Integer> serviceIds) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.size(0);
setQueryCondition(sourceBuilder, startTB, endTB, serviceIds);
String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID);
}
private void setQueryCondition(SearchSourceBuilder sourceBuilder, long startTB, long endTB,
List<Integer> serviceIds) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
BoolQueryBuilder serviceIdBoolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(serviceIdBoolQuery);
if (serviceIds.size() == 1) {
boolQuery.should().add(QueryBuilders.termQuery(ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, serviceIds.get(0)));
boolQuery.should().add(QueryBuilders.termQuery(ServiceRelationServerSideIndicator.DEST_SERVICE_ID, serviceIds.get(0)));
} else {
boolQuery.should().add(QueryBuilders.termsQuery(ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, serviceIds));
boolQuery.should().add(QueryBuilders.termsQuery(ServiceRelationServerSideIndicator.DEST_SERVICE_ID, serviceIds));
}
sourceBuilder.query(boolQuery);
}
@Override public List<Call> loadServerSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationServerSideIndicator.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
sourceBuilder.size(0);
return load(sourceBuilder, indexName, ServiceRelationServerSideIndicator.SOURCE_SERVICE_ID, ServiceRelationServerSideIndicator.DEST_SERVICE_ID);
}
@Override public List<Call> loadClientSideServiceRelations(Step step, long startTB, long endTB) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, ServiceRelationClientSideIndicator.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceRelationServerSideIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
sourceBuilder.size(0);
return load(sourceBuilder, indexName, ServiceRelationClientSideIndicator.SOURCE_SERVICE_ID, ServiceRelationClientSideIndicator.DEST_SERVICE_ID);
}
private List<Call> load(SearchSourceBuilder sourceBuilder, String indexName, String sourceCName,
String destCName) throws IOException {
TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(sourceCName).field(sourceCName).size(1000);
sourceAggregation.subAggregation(AggregationBuilders.terms(destCName).field(destCName).size(1000));
sourceBuilder.aggregation(sourceAggregation);
SearchResponse response = getClient().search(indexName, sourceBuilder);
List<Call> calls = new ArrayList<>();
Terms sourceTerms = response.getAggregations().get(sourceCName);
for (Terms.Bucket sourceBucket : sourceTerms.getBuckets()) {
Terms destTerms = sourceBucket.getAggregations().get(destCName);
for (Terms.Bucket destBucket : destTerms.getBuckets()) {
Call value = new Call();
value.setSource(sourceBucket.getKeyAsNumber().intValue());
value.setTarget(destBucket.getKeyAsNumber().intValue());
calls.add(value);
}
}
return calls;
}
@Override public List<ServiceMapping> loadServiceMappings(Step step, long startTB, long endTB) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, ServiceMappingIndicator.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceMappingIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
sourceBuilder.size(0);
TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(ServiceMappingIndicator.SERVICE_ID).field(ServiceMappingIndicator.SERVICE_ID).size(1000);
sourceAggregation.subAggregation(AggregationBuilders.terms(ServiceMappingIndicator.MAPPING_SERVICE_ID).field(ServiceMappingIndicator.MAPPING_SERVICE_ID).size(1000));
sourceBuilder.aggregation(sourceAggregation);
SearchResponse response = getClient().search(indexName, sourceBuilder);
List<ServiceMapping> serviceMappings = new ArrayList<>();
Terms serviceIdTerms = response.getAggregations().get(ServiceMappingIndicator.SERVICE_ID);
for (Terms.Bucket serviceIdBucket : serviceIdTerms.getBuckets()) {
Terms mappingServiceIdTerms = serviceIdBucket.getAggregations().get(ServiceMappingIndicator.MAPPING_SERVICE_ID);
for (Terms.Bucket mappingServiceIdBucket : mappingServiceIdTerms.getBuckets()) {
ServiceMapping serviceMapping = new ServiceMapping();
serviceMapping.setServiceId(serviceIdBucket.getKeyAsNumber().intValue());
serviceMapping.setMappingServiceId(mappingServiceIdBucket.getKeyAsNumber().intValue());
serviceMappings.add(serviceMapping);
}
}
return serviceMappings;
}
@Override
public List<ServiceComponent> loadServiceComponents(Step step, long startTB, long endTB) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, ServiceComponentIndicator.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(ServiceComponentIndicator.TIME_BUCKET).lte(startTB).gte(endTB));
sourceBuilder.size(0);
TermsAggregationBuilder sourceAggregation = AggregationBuilders.terms(ServiceComponentIndicator.SERVICE_ID).field(ServiceComponentIndicator.SERVICE_ID).size(1000);
sourceAggregation.subAggregation(AggregationBuilders.terms(ServiceComponentIndicator.COMPONENT_ID).field(ServiceComponentIndicator.COMPONENT_ID).size(1000));
sourceBuilder.aggregation(sourceAggregation);
SearchResponse response = getClient().search(indexName, sourceBuilder);
List<ServiceComponent> serviceComponents = new ArrayList<>();
Terms serviceIdTerms = response.getAggregations().get(ServiceComponentIndicator.SERVICE_ID);
for (Terms.Bucket serviceIdBucket : serviceIdTerms.getBuckets()) {
Terms componentIdTerms = serviceIdBucket.getAggregations().get(ServiceComponentIndicator.COMPONENT_ID);
for (Terms.Bucket componentIdBucket : componentIdTerms.getBuckets()) {
ServiceComponent serviceComponent = new ServiceComponent();
serviceComponent.setServiceId(serviceIdBucket.getKeyAsNumber().intValue());
serviceComponent.setComponentId(componentIdBucket.getKeyAsNumber().intValue());
serviceComponents.add(serviceComponent);
}
}
return serviceComponents;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册