未验证 提交 1fb2b01b 编写于 作者: K kl 提交者: GitHub

fix: Receiver_zipkin missing ServiceInstanceID bug #6749 (#6763)

上级 10c47471
......@@ -370,6 +370,7 @@ receiver_zipkin:
jettyIdleTimeOut: ${SW_RECEIVER_ZIPKIN_JETTY_IDLE_TIMEOUT:30000}
jettyAcceptorPriorityDelta: ${SW_RECEIVER_ZIPKIN_JETTY_DELTA:0}
jettyAcceptQueueSize: ${SW_RECEIVER_ZIPKIN_QUEUE_SIZE:0}
instanceNameRule: ${SW_RECEIVER_ZIPKIN_INSTANCE_NAME_RULE:[spring.instance_id,node_id]}
receiver_jaeger:
selector: ${SW_RECEIVER_JAEGER:-}
......
......@@ -18,13 +18,16 @@
package org.apache.skywalking.oap.server.starter.config;
import java.util.Properties;
import org.apache.skywalking.oap.server.library.module.ApplicationConfiguration;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
public class ApplicationConfigLoaderTestCase {
......@@ -34,6 +37,7 @@ public class ApplicationConfigLoaderTestCase {
@Before
public void setUp() throws ConfigFileNotFoundException {
System.setProperty("SW_STORAGE", "mysql");
System.setProperty("SW_RECEIVER_ZIPKIN", "default");
ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
applicationConfiguration = configLoader.load();
}
......@@ -47,4 +51,13 @@ public class ApplicationConfigLoaderTestCase {
Properties properties = (Properties) providerConfig.get("properties");
assertThat(properties.get("jdbcUrl"), is("jdbc:mysql://localhost:3306/swtest"));
}
@Test
public void testLoadListTypeConfig() {
Properties providerConfig = applicationConfiguration.getModuleConfiguration("receiver_zipkin")
.getProviderConfiguration("default");
List<String> instanceNameRule = (List<String>) providerConfig.get("instanceNameRule");
assertEquals(2, instanceNameRule.size());
}
}
......@@ -22,6 +22,9 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import java.util.ArrayList;
import java.util.List;
@Setter
@Getter
public class ZipkinReceiverConfig extends ModuleConfig {
......@@ -33,4 +36,5 @@ public class ZipkinReceiverConfig extends ModuleConfig {
private long jettyIdleTimeOut = 30000;
private int jettyAcceptorPriorityDelta = 0;
private int jettyAcceptQueueSize = 0;
private List<String> instanceNameRule = new ArrayList<>();
}
......@@ -24,6 +24,7 @@ import java.io.InputStream;
import java.util.List;
import java.util.zip.GZIPInputStream;
import javax.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
......@@ -49,7 +50,7 @@ public class SpanProcessor {
List<Span> spanList = decoder.decodeList(out.toByteArray());
SpanForward forward = new SpanForward(namingControl, receiver);
SpanForward forward = new SpanForward(namingControl, receiver, config);
forward.send(spanList);
}
}
......
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.receiver.zipkin.trace;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
......@@ -29,6 +30,7 @@ import org.apache.skywalking.oap.server.core.source.EndpointMeta;
import org.apache.skywalking.oap.server.core.source.ServiceMeta;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanEncode;
import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpan;
import zipkin2.Span;
......@@ -36,8 +38,10 @@ import zipkin2.codec.SpanBytesEncoder;
@RequiredArgsConstructor
public class SpanForward {
private static final String DEFAULT_SERVICE_INSTANCE_NAME = " unknown_instance";
private final NamingControl namingControl;
private final SourceReceiver receiver;
private final ZipkinReceiverConfig config;
public void send(List<Span> spanList) {
spanList.forEach(span -> {
......@@ -49,7 +53,11 @@ public class SpanForward {
serviceName = "Unknown";
}
serviceName = namingControl.formatServiceName(serviceName);
zipkinSpan.setServiceId(IDManager.ServiceID.buildId(serviceName, NodeType.Normal));
String serviceId = IDManager.ServiceID.buildId(serviceName, NodeType.Normal);
zipkinSpan.setServiceId(serviceId);
String serviceInstanceName = this.getServiceInstanceName(span);
serviceInstanceName = namingControl.formatInstanceName(serviceInstanceName);
zipkinSpan.setServiceInstanceId(IDManager.ServiceInstanceID.buildId(serviceId, serviceInstanceName));
long startTime = span.timestampAsLong() / 1000;
zipkinSpan.setStartTime(startTime);
......@@ -93,4 +101,14 @@ public class SpanForward {
receiver.receive(serviceMeta);
});
}
private String getServiceInstanceName(Span span) {
for (String tagName : config.getInstanceNameRule()) {
String serviceInstanceName = span.tags().get(tagName);
if (StringUtil.isNotEmpty(serviceInstanceName)) {
return serviceInstanceName;
}
}
return DEFAULT_SERVICE_INSTANCE_NAME;
}
}
......@@ -19,11 +19,13 @@
package org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
......@@ -157,7 +159,7 @@ public class ZipkinTraceQueryEs7DAO extends EsDAO implements ITraceQueryDAO {
BasicTrace basicTrace = new BasicTrace();
final ZipkinSpanRecord zipkinSpanRecord = new ZipkinSpanRecord.Builder().storage2Entity(
searchHit.getSourceAsMap());
searchHit.getSourceAsMap());
basicTrace.setSegmentId(zipkinSpanRecord.getSpanId());
basicTrace.setStart(String.valueOf((long) zipkinSpanRecord.getStartTime()));
......@@ -179,7 +181,7 @@ public class ZipkinTraceQueryEs7DAO extends EsDAO implements ITraceQueryDAO {
@Override
public List<org.apache.skywalking.oap.server.core.query.type.Span> doFlexibleTraceQuery(
String traceId) throws IOException {
String traceId) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.termQuery(TRACE_ID, traceId));
sourceBuilder.sort(START_TIME, SortOrder.ASC);
......@@ -191,6 +193,7 @@ public class ZipkinTraceQueryEs7DAO extends EsDAO implements ITraceQueryDAO {
for (SearchHit searchHit : response.getHits().getHits()) {
String serviceId = (String) searchHit.getSourceAsMap().get(SERVICE_ID);
String serviceInstanceId = (String) searchHit.getSourceAsMap().get(SERVICE_INSTANCE_ID);
String dataBinaryBase64 = (String) searchHit.getSourceAsMap().get(SegmentRecord.DATA_BINARY);
Span span = SpanBytesDecoder.PROTO3.decodeOne(Base64.getDecoder().decode(dataBinaryBase64));
......@@ -211,9 +214,14 @@ public class ZipkinTraceQueryEs7DAO extends EsDAO implements ITraceQueryDAO {
});
if (StringUtil.isNotEmpty(serviceId)) {
final IDManager.ServiceID.ServiceIDDefinition serviceIDDefinition = IDManager.ServiceID.analysisId(
serviceId);
serviceId);
swSpan.setServiceCode(serviceIDDefinition.getName());
}
if (StringUtil.isNotEmpty(serviceInstanceId)) {
final IDManager.ServiceInstanceID.InstanceIDDefinition instanceIDDefinition = IDManager.ServiceInstanceID.analysisId(
serviceInstanceId);
swSpan.setServiceInstanceName(instanceIDDefinition.getName());
}
swSpan.setSpanId(0);
swSpan.setParentSpanId(-1);
swSpan.setSegmentSpanId(span.id());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册