未验证 提交 b367c36d 编写于 作者: 彭勇升 pengys 提交者: GitHub

Fixed the endpoint topology bug. (#1778)

* Delete the client side endpoint relation indicator.
Fixed the endpoint topology bug.

* Fixed package failure bug.
上级 c838d161
......@@ -23,15 +23,12 @@ import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
import org.apache.skywalking.oap.server.core.source.EndpointRelation;
/**
* @author wusheng
* @author wusheng, peng-yongsheng
*/
public class EndpointCallRelationDispatcher implements SourceDispatcher<EndpointRelation> {
@Override
public void dispatch(EndpointRelation source) {
switch (source.getDetectPoint()) {
case CLIENT:
clientSide(source);
break;
case SERVER:
serverSide(source);
break;
......@@ -45,12 +42,4 @@ public class EndpointCallRelationDispatcher implements SourceDispatcher<Endpoint
indicator.setDestEndpointId(source.getChildEndpointId());
IndicatorProcess.INSTANCE.in(indicator);
}
private void clientSide(EndpointRelation source) {
EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
indicator.setTimeBucket(source.getTimeBucket());
indicator.setSourceEndpointId(source.getEndpointId());
indicator.setDestEndpointId(source.getChildEndpointId());
IndicatorProcess.INSTANCE.in(indicator);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
@IndicatorType
@StreamData
@StorageEntity(name = EndpointRelationClientSideIndicator.INDEX_NAME, builder = EndpointRelationClientSideIndicator.Builder.class)
public class EndpointRelationClientSideIndicator extends Indicator {
public static final String INDEX_NAME = "endpoint_relation_client_side";
public static final String SOURCE_ENDPOINT_ID = "source_endpoint_id";
public static final String DEST_ENDPOINT_ID = "dest_endpoint_id";
@Setter @Getter @Column(columnName = SOURCE_ENDPOINT_ID) @IDColumn private int sourceEndpointId;
@Setter @Getter @Column(columnName = DEST_ENDPOINT_ID) @IDColumn private int destEndpointId;
@Override public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + String.valueOf(sourceEndpointId);
splitJointId += Const.ID_SPLIT + String.valueOf(destEndpointId);
return splitJointId;
}
@Override public void combine(Indicator indicator) {
}
@Override public void calculate() {
}
@Override public Indicator toHour() {
EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
indicator.setTimeBucket(toTimeBucketInHour());
indicator.setSourceEndpointId(getSourceEndpointId());
indicator.setDestEndpointId(getDestEndpointId());
return indicator;
}
@Override public Indicator toDay() {
EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
indicator.setTimeBucket(toTimeBucketInDay());
indicator.setSourceEndpointId(getSourceEndpointId());
indicator.setDestEndpointId(getDestEndpointId());
return indicator;
}
@Override public Indicator toMonth() {
EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
indicator.setTimeBucket(toTimeBucketInMonth());
indicator.setSourceEndpointId(getSourceEndpointId());
indicator.setDestEndpointId(getDestEndpointId());
return indicator;
}
@Override public int remoteHashCode() {
int result = 17;
result = 31 * result + sourceEndpointId;
result = 31 * result + destEndpointId;
return result;
}
@Override public void deserialize(RemoteData remoteData) {
setSourceEndpointId(remoteData.getDataIntegers(0));
setDestEndpointId(remoteData.getDataIntegers(1));
setTimeBucket(remoteData.getDataLongs(0));
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataIntegers(1, getDestEndpointId());
remoteBuilder.setDataIntegers(0, getSourceEndpointId());
remoteBuilder.setDataLongs(0, getTimeBucket());
return remoteBuilder;
}
@Override public int hashCode() {
int result = 17;
result = 31 * result + sourceEndpointId;
result = 31 * result + destEndpointId;
result = 31 * result + (int)getTimeBucket();
return result;
}
@Override public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
EndpointRelationClientSideIndicator indicator = (EndpointRelationClientSideIndicator)obj;
if (sourceEndpointId != indicator.sourceEndpointId)
return false;
if (destEndpointId != indicator.destEndpointId)
return false;
if (getTimeBucket() != indicator.getTimeBucket())
return false;
return true;
}
public static class Builder implements StorageBuilder<EndpointRelationClientSideIndicator> {
@Override public EndpointRelationClientSideIndicator map2Data(Map<String, Object> dbMap) {
EndpointRelationClientSideIndicator indicator = new EndpointRelationClientSideIndicator();
indicator.setSourceEndpointId(((Number)dbMap.get(SOURCE_ENDPOINT_ID)).intValue());
indicator.setDestEndpointId(((Number)dbMap.get(DEST_ENDPOINT_ID)).intValue());
indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
return indicator;
}
@Override public Map<String, Object> data2Map(EndpointRelationClientSideIndicator storageData) {
Map<String, Object> map = new HashMap<>();
map.put(TIME_BUCKET, storageData.getTimeBucket());
map.put(SOURCE_ENDPOINT_ID, storageData.getSourceEndpointId());
map.put(DEST_ENDPOINT_ID, storageData.getDestEndpointId());
return map;
}
}
}
......@@ -109,15 +109,35 @@ public class TopologyQueryService implements Service {
Map<Integer, String> components = new HashMap<>();
serviceComponents.forEach(component -> components.put(component.getServiceId(), getComponentLibraryCatalogService().getComponentName(component.getComponentId())));
List<Call> calls = getTopologyQueryDAO().loadSpecifiedDestOfServerSideEndpointRelations(step, startTB, endTB, endpointId);
calls.addAll(getTopologyQueryDAO().loadSpecifiedSourceOfClientSideEndpointRelations(step, startTB, endTB, endpointId));
List<Call> serverSideCalls = getTopologyQueryDAO().loadSpecifiedDestOfServerSideEndpointRelations(step, startTB, endTB, endpointId);
serverSideCalls.forEach(call -> call.setDetectPoint(DetectPoint.SERVER));
calls.forEach(call -> {
call.setCallType(components.getOrDefault(getEndpointInventoryCache().get(call.getTarget()).getServiceId(), Const.UNKNOWN));
});
serverSideCalls.forEach(call -> call.setCallType(components.getOrDefault(getEndpointInventoryCache().get(call.getTarget()).getServiceId(), Const.UNKNOWN)));
Topology topology = new Topology();
topology.getCalls().addAll(calls);
topology.getCalls().addAll(serverSideCalls);
Set<Integer> nodeIds = new HashSet<>();
serverSideCalls.forEach(call -> {
if (!nodeIds.contains(call.getSource())) {
topology.getNodes().add(buildEndpointNode(components, call.getSource()));
nodeIds.add(call.getSource());
}
if (!nodeIds.contains(call.getTarget())) {
topology.getNodes().add(buildEndpointNode(components, call.getTarget()));
nodeIds.add(call.getTarget());
}
});
return topology;
}
private Node buildEndpointNode(Map<Integer, String> components, int endpointId) {
Node node = new Node();
node.setId(endpointId);
node.setName(getEndpointInventoryCache().get(endpointId).getName());
node.setType(components.getOrDefault(getEndpointInventoryCache().get(endpointId).getServiceId(), Const.UNKNOWN));
node.setReal(true);
return node;
}
}
......@@ -45,7 +45,4 @@ public interface ITopologyQueryDAO extends Service {
List<Call> loadSpecifiedDestOfServerSideEndpointRelations(Step step, long startTB, long endTB,
int destEndpointId) throws IOException;
List<Call> loadSpecifiedSourceOfClientSideEndpointRelations(Step step, long startTB, long endTB,
int sourceEndpointId) throws IOException;
}
......@@ -182,7 +182,6 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
exitSourceBuilder.setTimeBucket(minuteTimeBucket);
sourceReceiver.receive(exitSourceBuilder.toServiceRelation());
sourceReceiver.receive(exitSourceBuilder.toServiceInstanceRelation());
sourceReceiver.receive(exitSourceBuilder.toEndpointRelation());
});
}
......
......@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.*;
import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointRelationServerSideIndicator;
import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
......@@ -175,26 +175,15 @@ public class TopologyQueryEsDAO extends EsDAO implements ITopologyQueryDAO {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.rangeQuery(EndpointRelationServerSideIndicator.TIME_BUCKET).gte(startTB).lte(endTB));
boolQuery.must().add(QueryBuilders.termQuery(EndpointRelationServerSideIndicator.DEST_ENDPOINT_ID, destEndpointId));
sourceBuilder.query(boolQuery);
return load(sourceBuilder, indexName, EndpointRelationServerSideIndicator.SOURCE_ENDPOINT_ID, EndpointRelationServerSideIndicator.DEST_ENDPOINT_ID, Source.Endpoint);
}
@Override
public List<Call> loadSpecifiedSourceOfClientSideEndpointRelations(Step step, long startTB, long endTB,
int sourceEndpointId) throws IOException {
String indexName = DownsampleingModelNameBuilder.build(step, EndpointRelationClientSideIndicator.INDEX_NAME);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.size(0);
BoolQueryBuilder serviceIdBoolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(serviceIdBoolQuery);
serviceIdBoolQuery.should().add(QueryBuilders.termQuery(EndpointRelationServerSideIndicator.SOURCE_ENDPOINT_ID, destEndpointId));
serviceIdBoolQuery.should().add(QueryBuilders.termQuery(EndpointRelationServerSideIndicator.DEST_ENDPOINT_ID, destEndpointId));
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.rangeQuery(EndpointRelationClientSideIndicator.TIME_BUCKET).gte(startTB).lte(endTB));
boolQuery.must().add(QueryBuilders.termQuery(EndpointRelationClientSideIndicator.SOURCE_ENDPOINT_ID, sourceEndpointId));
sourceBuilder.query(boolQuery);
return load(sourceBuilder, indexName, EndpointRelationClientSideIndicator.SOURCE_ENDPOINT_ID, EndpointRelationClientSideIndicator.DEST_ENDPOINT_ID, Source.Endpoint);
return load(sourceBuilder, indexName, EndpointRelationServerSideIndicator.SOURCE_ENDPOINT_ID, EndpointRelationServerSideIndicator.DEST_ENDPOINT_ID, Source.Endpoint);
}
private List<Call> load(SearchSourceBuilder sourceBuilder, String indexName, String sourceCName,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册