未验证 提交 f38aad5f 编写于 作者: P pg.yang 提交者: GitHub

Optimize MQ Topology analysis. (#9911)

Use entry span's peer from the consumer side as source service when no producer instrumentation(no cross-process reference).
上级 6984bbc6
......@@ -96,6 +96,7 @@
* Optimize data binary parse methods in *LogQueryDAO
* Support different indexType
* Support configuration for TTL and (block|segment) intervals
* Optimize MQ Topology analysis. Use entry span's peer from the consumer side as source service when no producer instrumentation(no cross-process reference).
#### UI
......
......@@ -12,4 +12,5 @@ The MQ operation span should have
- **Span's layer == MQ**
- Tag key = `mq.queue`, value = MQ queue name
- Tag key = `mq.topic`, value = MQ queue topic , it's optional as some MQ don't have topic concept.
- Tag key = `transmission.latency`, value = Transmission latency from consumer to producer
\ No newline at end of file
- Tag key = `transmission.latency`, value = Transmission latency from consumer to producer
- Set `peer` at both sides(producer and consumer). And the value of peer should represent the MQ server cluster.
\ No newline at end of file
......@@ -125,6 +125,21 @@ public class RPCAnalysisListener extends CommonAnalysisListener implements Entry
setPublicAttrs(sourceBuilder, span);
callingInTraffic.add(sourceBuilder);
}
} else if (span.getSpanLayer() == SpanLayer.MQ && StringUtil.isNotBlank(span.getPeer())) {
// For MQ, if there is no producer-side instrumentation, we set the existing peer as the source service name.
RPCTrafficSourceBuilder sourceBuilder = new RPCTrafficSourceBuilder(namingControl);
sourceBuilder.setSourceServiceName(span.getPeer());
sourceBuilder.setSourceServiceInstanceName(span.getPeer());
sourceBuilder.setDestEndpointName(span.getOperationName());
sourceBuilder.setSourceLayer(Layer.MQ);
sourceBuilder.setDestEndpointName(span.getOperationName());
sourceBuilder.setDestServiceInstanceName(segmentObject.getServiceInstance());
sourceBuilder.setDestServiceName(segmentObject.getService());
sourceBuilder.setDestLayer(identifyServiceLayer(span.getSpanLayer()));
sourceBuilder.setDetectPoint(DetectPoint.SERVER);
sourceBuilder.setComponentId(span.getComponentId());
setPublicAttrs(sourceBuilder, span);
callingInTraffic.add(sourceBuilder);
} else {
RPCTrafficSourceBuilder sourceBuilder = new RPCTrafficSourceBuilder(namingControl);
sourceBuilder.setSourceServiceName(Const.USER_SERVICE_NAME);
......@@ -249,7 +264,7 @@ public class RPCAnalysisListener extends CommonAnalysisListener implements Entry
sourceReceiver.receive(callingIn.toServiceInstanceRelation());
// Service is equivalent to endpoint in FaaS (function as a service)
// Don't generate endpoint and endpoint dependency to avoid unnecessary costs.
if (Layer.FAAS != callingIn.getDestLayer()) {
if (Layer.FAAS != callingIn.getDestLayer()) {
sourceReceiver.receive(callingIn.toEndpoint());
EndpointRelation endpointRelation = callingIn.toEndpointRelation();
/*
......
......@@ -54,21 +54,25 @@ public class VirtualMQProcessor implements VirtualServiceProcessor {
if (!(span.getSpanType() == SpanType.Exit || span.getSpanType() == SpanType.Entry)) {
return;
}
MQTags mqTags = collectTags(span.getTagsList());
final String peer;
final MQOperation mqOperation;
final String serviceName;
if (span.getSpanType() == SpanType.Entry) {
mqOperation = MQOperation.Consume;
final String peer = span.getRefsList()
.stream()
.findFirst()
.map(SegmentReference::getNetworkAddressUsedAtPeer)
.orElse(null);
serviceName = namingControl.formatServiceName(peer);
peer = span.getRefsList()
.stream()
.findFirst()
.map(SegmentReference::getNetworkAddressUsedAtPeer)
.filter(StringUtil::isNotBlank)
.orElse(span.getPeer());
} else {
mqOperation = MQOperation.Produce;
serviceName = namingControl.formatServiceName(span.getPeer());
peer = span.getPeer();
}
if (StringUtil.isBlank(peer)) {
return;
}
MQTags mqTags = collectTags(span.getTagsList());
String serviceName = namingControl.formatServiceName(peer);
long timeBucket = TimeBucket.getMinuteTimeBucket(span.getStartTime());
sourceList.add(toServiceMeta(serviceName, timeBucket));
String endpoint = buildEndpointName(mqTags.topic, mqTags.queue);
......
......@@ -40,6 +40,7 @@ import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
import org.apache.skywalking.oap.server.core.source.Endpoint;
import org.apache.skywalking.oap.server.core.source.EndpointRelation;
import org.apache.skywalking.oap.server.core.source.ISource;
import org.apache.skywalking.oap.server.core.source.RequestType;
import org.apache.skywalking.oap.server.core.source.Service;
import org.apache.skywalking.oap.server.core.source.ServiceInstance;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceRelation;
......@@ -466,4 +467,58 @@ public class RPCAnalysisListenerTest {
Assert.assertEquals("target-instance", serviceInstanceRelation.getDestServiceInstanceName());
mockReceiver.clear();
}
@Test
public void testMQEntryWithoutRef() {
final MockReceiver mockReceiver = new MockReceiver();
RPCAnalysisListener listener = new RPCAnalysisListener(
mockReceiver,
CONFIG,
CACHE,
NAMING_CONTROL
);
final long startTime = System.currentTimeMillis();
SpanObject spanObject = SpanObject.newBuilder()
.setOperationName("/MQ/consumer")
.setStartTime(startTime)
.setEndTime(startTime + 1000L)
.setIsError(true)
.setSpanType(SpanType.Entry)
.setSpanLayer(SpanLayer.MQ)
.setPeer("mq-server:9090")
.addTags(
KeyStringValuePair.newBuilder()
.setKey(SpanTags.MQ_QUEUE)
.setValue("queue")
.build()
).build();
final SegmentObject segment = SegmentObject.newBuilder()
.setService("mock-service")
.setServiceInstance("mock-instance")
.addSpans(spanObject)
.build();
listener.parseEntry(spanObject, segment);
listener.build();
final List<ISource> receivedSources = mockReceiver.getReceivedSources();
Assert.assertEquals(5, receivedSources.size());
final Service service = (Service) receivedSources.get(0);
final ServiceInstance serviceInstance = (ServiceInstance) receivedSources.get(1);
final ServiceRelation serviceRelation = (ServiceRelation) receivedSources.get(2);
final ServiceInstanceRelation serviceInstanceRelation = (ServiceInstanceRelation) receivedSources.get(3);
final Endpoint endpoint = (Endpoint) receivedSources.get(4);
Assert.assertEquals("mock-service", service.getName());
Assert.assertEquals("/MQ/consumer", service.getEndpointName());
Assert.assertEquals(RequestType.MQ, service.getType());
Assert.assertFalse(service.isStatus());
Assert.assertEquals("mock-instance", serviceInstance.getName());
Assert.assertEquals("/MQ/consumer", endpoint.getName());
Assert.assertEquals("mq-server:9090", serviceRelation.getSourceServiceName());
Assert.assertEquals("mock-service", serviceRelation.getDestServiceName());
Assert.assertEquals("mq-server:9090", serviceInstanceRelation.getSourceServiceInstanceName());
Assert.assertEquals("mock-instance", serviceInstanceRelation.getDestServiceInstanceName());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册