提交 6b351135 编写于 作者: wu-sheng's avatar wu-sheng 提交者: 彭勇升 pengys

Restore Zipkin receiver based on new core (#1932)

* Change some codes to make zipkin receiver works, not done yet.

* @adriancole Zipkin receiver is coming back again.

* Fix document and fix for Zipkin receiver.

* Fix some bugs.

* Fix register bug.

* Fix transfer bug for new backend core.
上级 afd61643
...@@ -10,23 +10,32 @@ We have following receivers, and `default` implementors are provided in our Apac ...@@ -10,23 +10,32 @@ We have following receivers, and `default` implementors are provided in our Apac
1. **service-mesh**. gRPC services accept data from inbound mesh probes. 1. **service-mesh**. gRPC services accept data from inbound mesh probes.
1. **istio-telemetry**. Istio telemetry is from Istio official bypass adaptor, this receiver match its gRPC services. 1. **istio-telemetry**. Istio telemetry is from Istio official bypass adaptor, this receiver match its gRPC services.
1. **receiver-jvm**. gRPC services accept JVM metric data. 1. **receiver-jvm**. gRPC services accept JVM metric data.
1. **receiver_zipkin**. HTTP service accepts Span in Zipkin v1 and v2 formats. Notice, this receiver only
works as expected in backend single node mode. Cluster mode is not supported. Welcome anyone to improve this.
The sample settings of these receivers should be already in default `application.yml`, and also list here The sample settings of these receivers should be already in default `application.yml`, and also list here
```yaml ```yaml
receiver-register:
default:
receiver-trace: receiver-trace:
default: default:
bufferPath: ../buffer/ # Path to trace buffer files, suggest to use absolute path bufferPath: ../trace-buffer/ # Path to trace buffer files, suggest to use absolute path
bufferOffsetMaxFileSize: 100 # Unit is MB bufferOffsetMaxFileSize: 100 # Unit is MB
bufferDataMaxFileSize: 500 # Unit is MB bufferDataMaxFileSize: 500 # Unit is MB
bufferFileCleanWhenRestart: false # Clean buffer file when backend restart. bufferFileCleanWhenRestart: false
receiver-jvm: receiver-jvm:
default: default:
service-mesh: service-mesh:
default: default:
bufferPath: ../mesh-buffer/ # Path to mesh telemetry data buffer files, suggest to use absolute path bufferPath: ../mesh-buffer/ # Path to trace buffer files, suggest to use absolute path
bufferOffsetMaxFileSize: 100 # Unit is MB bufferOffsetMaxFileSize: 100 # Unit is MB
bufferDataMaxFileSize: 500 # Unit is MB bufferDataMaxFileSize: 500 # Unit is MB
bufferFileCleanWhenRestart: false bufferFileCleanWhenRestart: false
istio-telemetry: istio-telemetry:
default: default:
receiver_zipkin:
default:
host: 0.0.0.0
port: 9411
contextPath: /
``` ```
\ No newline at end of file
...@@ -61,6 +61,8 @@ ...@@ -61,6 +61,8 @@
<joda-time.version>2.9.9</joda-time.version> <joda-time.version>2.9.9</joda-time.version>
<kubernetes.version>2.0.0</kubernetes.version> <kubernetes.version>2.0.0</kubernetes.version>
<hikaricp.version>3.1.0</hikaricp.version> <hikaricp.version>3.1.0</hikaricp.version>
<zipkin.version>2.9.1</zipkin.version>
<caffeine.version>2.6.2</caffeine.version>
</properties> </properties>
<dependencies> <dependencies>
...@@ -277,6 +279,18 @@ ...@@ -277,6 +279,18 @@
<artifactId>HikariCP</artifactId> <artifactId>HikariCP</artifactId>
<version>${hikaricp.version}</version> <version>${hikaricp.version}</version>
</dependency> </dependency>
<!-- for zipkin receiver -->
<dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin</artifactId>
<version>${zipkin.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
<!-- -->
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
</project> </project>
\ No newline at end of file
...@@ -19,17 +19,19 @@ ...@@ -19,17 +19,19 @@
package org.apache.skywalking.oap.server.receiver.trace.module; package org.apache.skywalking.oap.server.receiver.trace.module;
import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class TraceModule extends ModuleDefine { public class TraceModule extends ModuleDefine {
public static final String NAME = "receiver-trace";
public TraceModule() { public TraceModule() {
super("receiver-trace"); super(NAME);
} }
@Override public Class[] services() { @Override public Class[] services() {
return new Class[0]; return new Class[] {ISegmentParserService.class};
} }
} }
...@@ -37,6 +37,7 @@ import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardi ...@@ -37,6 +37,7 @@ import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardi
public class TraceModuleProvider extends ModuleProvider { public class TraceModuleProvider extends ModuleProvider {
private final TraceServiceModuleConfig moduleConfig; private final TraceServiceModuleConfig moduleConfig;
private SegmentParse.Producer segmentProducer;
public TraceModuleProvider() { public TraceModuleProvider() {
this.moduleConfig = new TraceServiceModuleConfig(); this.moduleConfig = new TraceServiceModuleConfig();
...@@ -54,19 +55,21 @@ public class TraceModuleProvider extends ModuleProvider { ...@@ -54,19 +55,21 @@ public class TraceModuleProvider extends ModuleProvider {
return moduleConfig; return moduleConfig;
} }
@Override public void prepare() { @Override public void prepare() throws ServiceNotProvidedException {
}
@Override public void start() throws ModuleStartException {
SegmentParserListenerManager listenerManager = new SegmentParserListenerManager(); SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
listenerManager.add(new MultiScopesSpanListener.Factory()); listenerManager.add(new MultiScopesSpanListener.Factory());
listenerManager.add(new ServiceMappingSpanListener.Factory()); listenerManager.add(new ServiceMappingSpanListener.Factory());
listenerManager.add(new SegmentSpanListener.Factory()); listenerManager.add(new SegmentSpanListener.Factory());
segmentProducer = new SegmentParse.Producer(getManager(), listenerManager);
this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducer));
}
@Override public void start() throws ModuleStartException {
GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class); GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class);
JettyHandlerRegister jettyHandlerRegister = getManager().find(CoreModule.NAME).provider().getService(JettyHandlerRegister.class); JettyHandlerRegister jettyHandlerRegister = getManager().find(CoreModule.NAME).provider().getService(JettyHandlerRegister.class);
try { try {
SegmentParse.Producer segmentProducer = new SegmentParse.Producer(getManager(), listenerManager);
grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentProducer)); grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentProducer));
jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentProducer)); jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentProducer));
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
/**
* @author wusheng
*/
public interface ISegmentParserListenerManager {
void add(SpanListenerFactory spanListenerFactory);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author wusheng
*/
public interface ISegmentParserService extends Service {
void send(UpstreamSegment segment);
}
...@@ -18,13 +18,14 @@ ...@@ -18,13 +18,14 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser; package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
import java.util.*; import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class SegmentParserListenerManager { public class SegmentParserListenerManager implements ISegmentParserListenerManager {
private List<SpanListenerFactory> spanListenerFactories; private List<SpanListenerFactory> spanListenerFactories;
...@@ -32,6 +33,7 @@ public class SegmentParserListenerManager { ...@@ -32,6 +33,7 @@ public class SegmentParserListenerManager {
this.spanListenerFactories = new LinkedList<>(); this.spanListenerFactories = new LinkedList<>();
} }
@Override
public void add(SpanListenerFactory spanListenerFactory) { public void add(SpanListenerFactory spanListenerFactory) {
spanListenerFactories.add(spanListenerFactory); spanListenerFactories.add(spanListenerFactory);
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
/**
* @author wusheng
*/
public class SegmentParserServiceImpl implements ISegmentParserService {
private final SegmentParse.Producer segmentProducer;
public SegmentParserServiceImpl(
SegmentParse.Producer segmentProducer) {
this.segmentProducer = segmentProducer;
}
@Override
public void send(UpstreamSegment segment) {
segmentProducer.send(segment, SegmentParse.Source.Agent);
}
}
...@@ -35,9 +35,7 @@ import static java.util.Objects.nonNull; ...@@ -35,9 +35,7 @@ import static java.util.Objects.nonNull;
* *
* v5 | v6 * v5 | v6
* *
* 1. Application == Service * 1. Application == Service 2. Server == Service Instance 3. Service == Endpoint
* 2. Server == Service Instance
* 3. Service == Endpoint
* *
* @author peng-yongsheng, wusheng * @author peng-yongsheng, wusheng
*/ */
...@@ -120,6 +118,9 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe ...@@ -120,6 +118,9 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
SourceBuilder sourceBuilder = new SourceBuilder(); SourceBuilder sourceBuilder = new SourceBuilder();
int peerId = spanDecorator.getPeerId(); int peerId = spanDecorator.getPeerId();
if (peerId == 0) {
return;
}
int destServiceId = serviceInventoryCache.getServiceId(peerId); int destServiceId = serviceInventoryCache.getServiceId(peerId);
int mappingServiceId = serviceInventoryCache.get(destServiceId).getMappingServiceId(); int mappingServiceId = serviceInventoryCache.get(destServiceId).getMappingServiceId();
int destInstanceId = instanceInventoryCache.getServiceInstanceId(destServiceId, peerId); int destInstanceId = instanceInventoryCache.getServiceInstanceId(destServiceId, peerId);
......
...@@ -17,7 +17,8 @@ ...@@ -17,7 +17,8 @@
~ ~
--> -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent> <parent>
<artifactId>server-receiver-plugin</artifactId> <artifactId>server-receiver-plugin</artifactId>
<groupId>org.apache.skywalking</groupId> <groupId>org.apache.skywalking</groupId>
...@@ -28,4 +29,28 @@ ...@@ -28,4 +29,28 @@
<artifactId>zipkin-receiver-plugin</artifactId> <artifactId>zipkin-receiver-plugin</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-trace-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-register-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
</dependencies>
</project> </project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
public class CoreRegisterLinker {
private static volatile ModuleManager MODULE_MANAGER;
private static volatile IServiceInventoryRegister SERVICE_INVENTORY_REGISTER;
private static volatile IServiceInstanceInventoryRegister SERVICE_INSTANCE_INVENTORY_REGISTER;
public static void setModuleManager(ModuleManager moduleManager) {
CoreRegisterLinker.MODULE_MANAGER = moduleManager;
}
public static IServiceInventoryRegister getServiceInventoryRegister() {
if (SERVICE_INVENTORY_REGISTER == null) {
SERVICE_INVENTORY_REGISTER = MODULE_MANAGER.find(CoreModule.NAME).provider().getService(IServiceInventoryRegister.class);
}
return SERVICE_INVENTORY_REGISTER;
}
public static IServiceInstanceInventoryRegister getServiceInstanceInventoryRegister() {
if (SERVICE_INSTANCE_INVENTORY_REGISTER == null) {
SERVICE_INSTANCE_INVENTORY_REGISTER = MODULE_MANAGER.find(CoreModule.NAME).provider().getService(IServiceInstanceInventoryRegister.class);
}
return SERVICE_INSTANCE_INVENTORY_REGISTER;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.transform.SegmentListener;
import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
/**
* Send the segments to Analysis module, like receiving segments from native SkyWalking agents.
*/
public class Receiver2AnalysisBridge implements SegmentListener {
private ISegmentParserService segmentParseService;
public Receiver2AnalysisBridge(ISegmentParserService segmentParseService) {
this.segmentParseService = segmentParseService;
}
/**
* Add this bridge as listener to Zipkin span transfer.
*/
public void build() {
Zipkin2SkyWalkingTransfer.INSTANCE.addListener(this);
}
@Override
public void notify(SkyWalkingTrace trace) {
trace.toUpstreamSegment().forEach(upstream -> segmentParseService.send(upstream.build()));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
* @author wusheng
*/
public class ZipkinReceiverConfig extends ModuleConfig {
private String host;
private int port;
private String contextPath;
private int expireTime = 20;
private int maxCacheSize = 1_000_000;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getContextPath() {
return contextPath;
}
public void setContextPath(String contextPath) {
this.contextPath = contextPath;
}
public int getExpireTime() {
return expireTime;
}
public void setExpireTime(int expireTime) {
this.expireTime = expireTime;
}
public int getMaxCacheSize() {
return maxCacheSize;
}
public void setMaxCacheSize(int maxCacheSize) {
this.maxCacheSize = maxCacheSize;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* Zipkin receiver module provides the HTTP, protoc serve for any SDK or agent by following Zipkin format.
*
* At this moment, Zipkin format is not compatible with SkyWalking, especially HEADERs. Please don't consider this as a
* Zipkin-SkyWalking integration, it is provided for adding analysis, aggregation and visualization capabilities to
* zipkin backend.
*
* @author wusheng
*/
public class ZipkinReceiverModule extends ModuleDefine {
public static final String NAME = "receiver_zipkin";
public ZipkinReceiverModule() {
super(NAME);
}
@Override public Class[] services() {
return new Class[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV1JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV2JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
/**
* @author wusheng
*/
public class ZipkinReceiverProvider extends ModuleProvider {
public static final String NAME = "default";
private ZipkinReceiverConfig config;
private JettyServer jettyServer;
public ZipkinReceiverProvider() {
config = new ZipkinReceiverConfig();
}
@Override public String name() {
return NAME;
}
@Override public Class<? extends ModuleDefine> module() {
return ZipkinReceiverModule.class;
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException {
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
CoreRegisterLinker.setModuleManager(getManager());
jettyServer = new JettyServer(config.getHost(), config.getPort(), config.getContextPath());
jettyServer.initialize();
jettyServer.addHandler(new SpanV1JettyHandler(config));
jettyServer.addHandler(new SpanV2JettyHandler(config));
ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME).provider().getService(ISegmentParserService.class);
Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService);
Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge);
}
@Override public void notifyAfterCompleted() throws ModuleStartException {
try {
jettyServer.start();
} catch (ServerException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
@Override public String[] requiredModules() {
return new String[] {TraceModule.NAME};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
/**
* @author wusheng
*/
public class ZipkinTraceOSInfoBuilder {
public static ServiceInstanceInventory.AgentOsInfo getOSInfoForZipkin(String instanceName) {
ServiceInstanceInventory.AgentOsInfo osInfo = new ServiceInstanceInventory.AgentOsInfo();
osInfo.setHostname(instanceName);
return osInfo;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.cache;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine.CaffeineSpanCache;
/**
* @author wusheng
*/
public class CacheFactory {
public static final CacheFactory INSTANCE = new CacheFactory();
private ISpanCache implementor;
private CacheFactory() {
}
public ISpanCache get(ZipkinReceiverConfig config) {
if (implementor == null) {
synchronized (INSTANCE) {
if (implementor == null) {
implementor = new CaffeineSpanCache(config);
}
}
}
return implementor;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.cache;
import zipkin2.Span;
/**
* @author wusheng
*/
public interface ISpanCache {
void addSpan(Span span);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.cache.ISpanCache;
import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.Span;
/**
* NOTICE: FROM my test, Caffeine cache triggers/checks expire only face write/read op.
* In order to make trace finish in time, I have to set a timer to write a meaningless trace, for active expire.
*
* @author wusheng
*/
public class CaffeineSpanCache implements ISpanCache, RemovalListener<String, ZipkinTrace> {
private static final Logger logger = LoggerFactory.getLogger(CaffeineSpanCache.class);
private Cache<String, ZipkinTrace> inProcessSpanCache;
private ReentrantLock newTraceLock;
public CaffeineSpanCache(ZipkinReceiverConfig config) {
newTraceLock = new ReentrantLock();
inProcessSpanCache = Caffeine.newBuilder()
.expireAfterWrite(config.getExpireTime(), TimeUnit.SECONDS)
.maximumSize(config.getMaxCacheSize())
.removalListener(this)
.build();
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
inProcessSpanCache.put("ACTIVE", new ZipkinTrace.TriggerTrace());
}, 2, 3, TimeUnit.SECONDS);
}
/**
* Zipkin trace finished by the expired rule.
*
* @param key
* @param trace
* @param cause
*/
@Override
public void onRemoval(@Nullable String key, @Nullable ZipkinTrace trace, @Nonnull RemovalCause cause) {
if (trace instanceof ZipkinTrace.TriggerTrace) {
return;
}
try {
Zipkin2SkyWalkingTransfer.INSTANCE.transfer(trace);
} catch (Exception e) {
logger.error(e.getMessage(), e);
logger.warn("Zipkin trace:" + trace);
}
}
@Override
public void addSpan(Span span) {
ZipkinTrace trace = inProcessSpanCache.getIfPresent(span.traceId());
if (trace == null) {
newTraceLock.lock();
try {
trace = inProcessSpanCache.getIfPresent(span.traceId());
if (trace == null) {
trace = new ZipkinTrace();
inProcessSpanCache.put(span.traceId(), trace);
}
} finally {
newTraceLock.unlock();
}
}
trace.addSpan(span);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.data;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject;
import org.apache.skywalking.apm.network.language.agent.UniqueId;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
/**
* Each SkyWalkingTrace consists of segments in each application, original from {@link ZipkinTrace}s
*/
public class SkyWalkingTrace {
private UniqueId globalTraceId;
private List<TraceSegmentObject.Builder> segmentList;
public SkyWalkingTrace(UniqueId globalTraceId, List<TraceSegmentObject.Builder> segmentList) {
this.globalTraceId = globalTraceId;
this.segmentList = segmentList;
}
public List<UpstreamSegment.Builder> toUpstreamSegment() {
List<UpstreamSegment.Builder> newUpstreamList = new LinkedList<>();
segmentList.forEach(segment -> {
UpstreamSegment.Builder builder = UpstreamSegment.newBuilder();
builder.addGlobalTraceIds(globalTraceId);
builder.setSegment(segment.build().toByteString());
newUpstreamList.add(builder);
});
return newUpstreamList;
}
public UniqueId getGlobalTraceId() {
return globalTraceId;
}
public List<TraceSegmentObject.Builder> getSegmentList() {
return segmentList;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.data;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import zipkin2.Span;
/**
* @author wusheng
*/
public class ZipkinTrace {
private List<Span> spans;
private ReentrantLock spanWriteLock;
public ZipkinTrace() {
spans = new LinkedList<>();
spanWriteLock = new ReentrantLock();
}
public void addSpan(Span span) {
spanWriteLock.lock();
try {
spans.add(span);
} finally {
spanWriteLock.unlock();
}
}
public List<Span> getSpans() {
return spans;
}
@Override
public String toString() {
return "ZipkinTrace{" +
"spans=" + spans +
'}';
}
public static class TriggerTrace extends ZipkinTrace {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.handler;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.zip.GZIPInputStream;
import javax.servlet.http.HttpServletRequest;
import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinTraceOSInfoBuilder;
import org.apache.skywalking.oap.server.receiver.zipkin.cache.CacheFactory;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
public class SpanProcessor {
void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request) throws IOException {
InputStream inputStream = getInputStream(request);
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buffer = new byte[2048];
int readCntOnce;
while ((readCntOnce = inputStream.read(buffer)) >= 0) {
out.write(buffer, 0, readCntOnce);
}
List<Span> spanList = decoder.decodeList(out.toByteArray());
spanList.forEach(span -> {
// In Zipkin, the local service name represents the application owner.
String applicationCode = span.localServiceName();
if (applicationCode != null) {
int applicationId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(applicationCode);
if (applicationId != 0) {
CoreRegisterLinker.getServiceInstanceInventoryRegister().getOrCreate(applicationId, applicationCode, applicationCode,
span.timestampAsLong(),
ZipkinTraceOSInfoBuilder.getOSInfoForZipkin(applicationCode));
}
}
CacheFactory.INSTANCE.get(config).addSpan(span);
});
}
private InputStream getInputStream(HttpServletRequest request) throws IOException {
InputStream requestInStream;
String headEncoding = request.getHeader("accept-encoding");
if (headEncoding != null && (headEncoding.indexOf("gzip") != -1)) {
requestInStream = new GZIPInputStream(request.getInputStream());
} else {
requestInStream = request.getInputStream();
}
return requestInStream;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.handler;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.codec.SpanBytesDecoder;
public class SpanV1JettyHandler extends JettyHandler {
private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class);
private ZipkinReceiverConfig config;
public SpanV1JettyHandler(ZipkinReceiverConfig config) {
this.config = config;
}
@Override
public String pathSpec() {
return "/api/v1/spans";
}
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) {
response.setContentType("application/json");
response.setCharacterEncoding("utf-8");
try {
String type = request.getHeader("Content-Type");
SpanBytesDecoder decoder = type != null && type.contains("/x-thrift")
? SpanBytesDecoder.THRIFT
: SpanBytesDecoder.JSON_V1;
SpanProcessor processor = new SpanProcessor();
processor.convert(config, decoder, request);
response.setStatus(202);
} catch (Exception e) {
response.setStatus(500);
logger.error(e.getMessage(), e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.handler;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.codec.SpanBytesDecoder;
/**
* @author wusheng
*/
public class SpanV2JettyHandler extends JettyHandler {
private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class);
private ZipkinReceiverConfig config;
public SpanV2JettyHandler(ZipkinReceiverConfig config) {
this.config = config;
}
@Override
public String pathSpec() {
return "/api/v2/spans";
}
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) {
response.setContentType("application/json");
response.setCharacterEncoding("utf-8");
try {
String type = request.getHeader("Content-Type");
SpanBytesDecoder decoder = type != null && type.contains("/x-protobuf")
? SpanBytesDecoder.PROTO3
: SpanBytesDecoder.JSON_V2;
SpanProcessor processor = new SpanProcessor();
processor.convert(config, decoder, request);
response.setStatus(202);
} catch (Exception e) {
response.setStatus(500);
logger.error(e.getMessage(), e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.transform;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.util.Strings;
import org.apache.skywalking.apm.network.language.agent.KeyWithStringValue;
import org.apache.skywalking.apm.network.language.agent.LogMessage;
import org.apache.skywalking.apm.network.language.agent.RefType;
import org.apache.skywalking.apm.network.language.agent.SpanObject;
import org.apache.skywalking.apm.network.language.agent.SpanType;
import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject;
import org.apache.skywalking.apm.network.language.agent.TraceSegmentReference;
import org.apache.skywalking.apm.network.language.agent.UniqueId;
import org.apache.skywalking.oap.server.library.util.StringUtils;
import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinTraceOSInfoBuilder;
import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
import org.eclipse.jetty.util.StringUtil;
import zipkin2.Endpoint;
import zipkin2.Span;
/**
* @author wusheng
*/
public class SegmentBuilder {
private Context context;
private LinkedList<Segment> segments;
private Map<String, ClientSideSpan> clientPartSpan;
private SegmentBuilder() {
segments = new LinkedList<>();
context = new Context();
clientPartSpan = new HashMap<>();
}
public static SkyWalkingTrace build(List<Span> traceSpans) throws Exception {
SegmentBuilder builder = new SegmentBuilder();
// This map groups the spans by their parent id, in order to assist to build tree.
// key: parentId
// value: span
Map<String, List<Span>> childSpanMap = new HashMap<>();
AtomicReference<Span> root = new AtomicReference<>();
traceSpans.forEach(span -> {
if (span.parentId() == null) {
root.set(span);
}
List<Span> spanList = childSpanMap.get(span.parentId());
if (spanList == null) {
spanList = new LinkedList<>();
spanList.add(span);
childSpanMap.put(span.parentId(), spanList);
} else {
spanList.add(span);
}
});
Span rootSpan = root.get();
long timestamp = 0;
if (rootSpan != null) {
String applicationCode = rootSpan.localServiceName();
// If root span doesn't include applicationCode, a.k.a local service name,
// Segment can't be built
// Ignore the whole trace.
// :P Hope anyone could provide better solution.
// Wu Sheng.
if (StringUtils.isNotEmpty(applicationCode)) {
timestamp = rootSpan.timestampAsLong();
builder.context.addApp(applicationCode, rootSpan.timestampAsLong() / 1000);
SpanObject.Builder rootSpanBuilder = builder.initSpan(null, null, rootSpan, true);
builder.context.currentSegment().addSpan(rootSpanBuilder);
builder.scanSpansFromRoot(rootSpanBuilder, rootSpan, childSpanMap);
builder.segments.add(builder.context.removeApp());
}
}
List<TraceSegmentObject.Builder> segmentBuilders = new LinkedList<>();
// microseconds -> million seconds
long finalTimestamp = timestamp / 1000;
builder.segments.forEach(segment -> {
TraceSegmentObject.Builder traceSegmentBuilder = segment.freeze();
segmentBuilders.add(traceSegmentBuilder);
CoreRegisterLinker.getServiceInventoryRegister().heartbeat(traceSegmentBuilder.getApplicationId(), finalTimestamp);
CoreRegisterLinker.getServiceInstanceInventoryRegister().heartbeat(traceSegmentBuilder.getApplicationInstanceId(), finalTimestamp);
});
return new SkyWalkingTrace(builder.generateTraceOrSegmentId(), segmentBuilders);
}
private void scanSpansFromRoot(SpanObject.Builder parentSegmentSpan, Span parent,
Map<String, List<Span>> childSpanMap) throws Exception {
String parentId = parent.id();
// get child spans by parent span id
List<Span> spanList = childSpanMap.get(parentId);
if (spanList == null) {
return;
}
for (Span childSpan : spanList) {
String localServiceName = childSpan.localServiceName();
boolean isNewApp = false;
if (StringUtil.isNotBlank(localServiceName)) {
if (context.isAppChanged(localServiceName)) {
isNewApp = true;
}
}
try {
if (isNewApp) {
context.addApp(localServiceName, childSpan.timestampAsLong() / 1000);
}
SpanObject.Builder childSpanBuilder = initSpan(parentSegmentSpan, parent, childSpan, isNewApp);
context.currentSegment().addSpan(childSpanBuilder);
scanSpansFromRoot(childSpanBuilder, childSpan, childSpanMap);
} finally {
if (isNewApp) {
segments.add(context.removeApp());
}
}
}
}
private SpanObject.Builder initSpan(SpanObject.Builder parentSegmentSpan, Span parentSpan, Span span,
boolean isSegmentRoot) {
SpanObject.Builder spanBuilder = SpanObject.newBuilder();
spanBuilder.setSpanId(context.currentIDs().nextSpanId());
if (isSegmentRoot) {
// spanId = -1, means no parent span
// spanId is considered unique, and from a positive sequence in each segment.
spanBuilder.setParentSpanId(-1);
}
if (!isSegmentRoot && parentSegmentSpan != null) {
spanBuilder.setParentSpanId(parentSegmentSpan.getSpanId());
}
Span.Kind kind = span.kind();
String opName = Strings.isBlank(span.name()) ? "-" : span.name();
spanBuilder.setOperationName(opName);
ClientSideSpan clientSideSpan;
switch (kind) {
case CLIENT:
spanBuilder.setSpanType(SpanType.Exit);
String peer = getPeer(parentSpan, span);
if (peer != null) {
spanBuilder.setPeer(peer);
}
clientSideSpan = new ClientSideSpan(span, spanBuilder);
clientPartSpan.put(span.id(), clientSideSpan);
break;
case SERVER:
spanBuilder.setSpanType(SpanType.Entry);
this.buildRef(spanBuilder, span, parentSegmentSpan, parentSpan);
break;
case CONSUMER:
spanBuilder.setSpanType(SpanType.Entry);
this.buildRef(spanBuilder, span, parentSegmentSpan, parentSpan);
break;
case PRODUCER:
spanBuilder.setSpanType(SpanType.Exit);
peer = getPeer(parentSpan, span);
if (peer != null) {
spanBuilder.setPeer(peer);
}
clientSideSpan = new ClientSideSpan(span, spanBuilder);
clientPartSpan.put(span.id(), clientSideSpan);
break;
default:
spanBuilder.setSpanType(SpanType.Local);
}
// microseconds in Zipkin -> milliseconds in SkyWalking
long startTime = span.timestamp() / 1000;
// Some implement of zipkin client not include duration field in its report
// package when duration's value be 0ms, Causing a null pointer exception here.
Long durationObj = span.duration();
long duration = (durationObj == null) ? 0 : durationObj.longValue() / 1000;
spanBuilder.setStartTime(startTime);
spanBuilder.setEndTime(startTime + duration);
span.tags().forEach((tagKey, tagValue) -> spanBuilder.addTags(
KeyWithStringValue.newBuilder().setKey(tagKey).setValue(tagValue).build())
);
span.annotations().forEach(annotation ->
spanBuilder.addLogs(LogMessage.newBuilder().setTime(annotation.timestamp() / 1000).addData(
KeyWithStringValue.newBuilder().setKey("zipkin.annotation").setValue(annotation.value()).build()
))
);
return spanBuilder;
}
private void buildRef(SpanObject.Builder spanBuilder, Span span, SpanObject.Builder parentSegmentSpan,
Span parentSpan) {
Segment parentSegment = context.parentSegment();
if (parentSegment == null) {
return;
}
Segment rootSegment = context.rootSegment();
if (rootSegment == null) {
return;
}
if (span.shared() != null && span.shared()) {
// using same span id in client and server for RPC
// SkyWalking will build both sides of span
ClientSideSpan clientSideSpan = clientPartSpan.get(span.id());
if (clientSideSpan != null) {
// For the root span, there may be no ref, because of no parent.
parentSegmentSpan = clientSideSpan.getBuilder();
parentSpan = clientSideSpan.getSpan();
}
}
String peer = getPeer(parentSpan, span);
if (StringUtil.isBlank(peer)) {
//The IP is the most important for building the ref at both sides.
return;
}
TraceSegmentReference.Builder refBuilder = TraceSegmentReference.newBuilder();
refBuilder.setEntryApplicationInstanceId(rootSegment.builder().getApplicationInstanceId());
int serviceId = rootSegment.getEntryServiceId();
if (serviceId == 0) {
refBuilder.setEntryServiceName(rootSegment.getEntryServiceName());
} else {
refBuilder.setEntryServiceId(serviceId);
}
refBuilder.setEntryApplicationInstanceId(rootSegment.builder().getApplicationInstanceId());
// parent ref info
refBuilder.setNetworkAddress(peer);
parentSegmentSpan.setPeer(refBuilder.getNetworkAddress());
refBuilder.setParentApplicationInstanceId(parentSegment.builder().getApplicationInstanceId());
refBuilder.setParentSpanId(parentSegmentSpan.getSpanId());
refBuilder.setParentTraceSegmentId(parentSegment.builder().getTraceSegmentId());
int parentServiceId = parentSegment.getEntryServiceId();
if (parentServiceId == 0) {
refBuilder.setParentServiceName(parentSegment.getEntryServiceName());
} else {
refBuilder.setParentServiceId(parentServiceId);
}
refBuilder.setRefType(RefType.CrossProcess);
spanBuilder.addRefs(refBuilder);
}
private String getPeer(Span parentSpan, Span childSpan) {
String peer;
Endpoint serverEndpoint = childSpan == null ? null : childSpan.localEndpoint();
peer = endpoint2Peer(serverEndpoint);
if (peer == null) {
Endpoint clientEndpoint = parentSpan == null ? null : parentSpan.remoteEndpoint();
peer = endpoint2Peer(clientEndpoint);
}
return peer;
}
private String endpoint2Peer(Endpoint endpoint) {
String ip = null;
Integer port = 0;
if (endpoint != null) {
if (StringUtils.isNotEmpty(endpoint.ipv4())) {
ip = endpoint.ipv4();
port = endpoint.port();
} else if (StringUtils.isNotEmpty(endpoint.ipv6())) {
ip = endpoint.ipv6();
port = endpoint.port();
}
}
if (ip == null) {
return null;
} else {
return port == null || port == 0 ? ip : ip + ":" + port;
}
}
/**
* Context holds the values in build process.
*/
private class Context {
private LinkedList<Segment> segmentsStack = new LinkedList<>();
private boolean isAppChanged(String applicationCode) {
return StringUtils.isNotEmpty(applicationCode) && !applicationCode.equals(currentIDs().applicationCode);
}
private Segment addApp(String applicationCode, long registerTime) throws Exception {
int serviceId = waitForExchange(() ->
CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(applicationCode),
10
);
int serviceInstanceId = waitForExchange(() ->
CoreRegisterLinker.getServiceInstanceInventoryRegister().getOrCreate(serviceId, applicationCode, applicationCode,
registerTime, ZipkinTraceOSInfoBuilder.getOSInfoForZipkin(applicationCode)),
10
);
Segment segment = new Segment(applicationCode, serviceId, serviceInstanceId);
segmentsStack.add(segment);
return segment;
}
private IDCollection currentIDs() {
return segmentsStack.getLast().ids;
}
private Segment currentSegment() {
return segmentsStack.getLast();
}
private Segment parentSegment() {
if (segmentsStack.size() < 2) {
return null;
} else {
return segmentsStack.get(segmentsStack.size() - 2);
}
}
private Segment rootSegment() {
if (segmentsStack.size() < 2) {
return null;
} else {
return segmentsStack.getFirst();
}
}
private Segment removeApp() {
return segmentsStack.removeLast();
}
private int waitForExchange(Callable<Integer> callable, int retry) throws Exception {
for (int i = 0; i < retry; i++) {
Integer id = callable.call();
if (id == 0) {
Thread.sleep(1000L);
} else {
return id;
}
}
throw new TimeoutException("ID exchange costs more than expected.");
}
}
private class Segment {
private TraceSegmentObject.Builder segmentBuilder;
private IDCollection ids;
private int entryServiceId = 0;
private String entryServiceName = null;
private List<SpanObject.Builder> spans;
private long endTime = 0;
private Segment(String applicationCode, int serviceId, int serviceInstanceId) {
ids = new IDCollection(applicationCode, serviceId, serviceInstanceId);
spans = new LinkedList<>();
segmentBuilder = TraceSegmentObject.newBuilder();
segmentBuilder.setApplicationId(serviceId);
segmentBuilder.setApplicationInstanceId(serviceInstanceId);
segmentBuilder.setTraceSegmentId(generateTraceOrSegmentId());
}
private TraceSegmentObject.Builder builder() {
return segmentBuilder;
}
private void addSpan(SpanObject.Builder spanBuilder) {
String operationName = spanBuilder.getOperationName();
if (entryServiceId == 0 && StringUtils.isNotEmpty(operationName)) {
if (SpanType.Entry == spanBuilder.getSpanType()) {
if (StringUtils.isNotEmpty(operationName)) {
entryServiceName = operationName;
} else {
entryServiceId = spanBuilder.getOperationNameId();
}
}
}
// init by root span
if (spanBuilder.getSpanId() == 1 && entryServiceId == 0) {
if (StringUtils.isNotEmpty(operationName)) {
entryServiceName = operationName;
} else {
entryServiceId = spanBuilder.getOperationNameId();
}
}
spans.add(spanBuilder);
if (spanBuilder.getEndTime() > endTime) {
endTime = spanBuilder.getEndTime();
}
}
public int getEntryServiceId() {
return entryServiceId;
}
public String getEntryServiceName() {
return entryServiceName;
}
private IDCollection ids() {
return ids;
}
public TraceSegmentObject.Builder freeze() {
for (SpanObject.Builder span : spans) {
segmentBuilder.addSpans(span);
}
return segmentBuilder;
}
public long getEndTime() {
return endTime;
}
}
private class IDCollection {
private String applicationCode;
private int appId;
private int instanceId;
private int spanIdSeq;
private IDCollection(String applicationCode, int appId, int instanceId) {
this.applicationCode = applicationCode;
this.appId = appId;
this.instanceId = instanceId;
this.spanIdSeq = 0;
}
private int nextSpanId() {
return spanIdSeq++;
}
}
private UniqueId generateTraceOrSegmentId() {
return UniqueId.newBuilder()
.addIdParts(ThreadLocalRandom.current().nextLong())
.addIdParts(ThreadLocalRandom.current().nextLong())
.addIdParts(ThreadLocalRandom.current().nextLong())
.build();
}
private class ClientSideSpan {
private Span span;
private SpanObject.Builder builder;
public ClientSideSpan(Span span, SpanObject.Builder builder) {
this.span = span;
this.builder = builder;
}
public Span getSpan() {
return span;
}
public SpanObject.Builder getBuilder() {
return builder;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.transform;
import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
public interface SegmentListener {
void notify(SkyWalkingTrace trace);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.transform;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace;
import zipkin2.Span;
/**
* @author wusheng
*/
public class Zipkin2SkyWalkingTransfer {
public static Zipkin2SkyWalkingTransfer INSTANCE = new Zipkin2SkyWalkingTransfer();
private List<SegmentListener> listeners = new LinkedList<>();
private Zipkin2SkyWalkingTransfer() {
}
public void addListener(SegmentListener listener) {
listeners.add(listener);
}
public void transfer(ZipkinTrace trace) throws Exception {
List<Span> traceSpans = trace.getSpans();
if (traceSpans.size() > 0) {
SkyWalkingTrace skyWalkingTrace = SegmentBuilder.build(traceSpans);
listeners.forEach(listener ->
listener.notify(skyWalkingTrace)
);
}
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverProvider
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.transform;
import org.apache.skywalking.apm.network.language.agent.SpanObject;
import org.apache.skywalking.apm.network.language.agent.SpanType;
import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject;
import org.apache.skywalking.apm.network.language.agent.TraceSegmentReference;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace;
import org.junit.Assert;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* @author wusheng
*/
public class SpringSleuthSegmentBuilderTest implements SegmentListener {
private Map<String, Integer> applicationInstRegister = new HashMap<>();
private Map<String, Integer> applicationRegister = new HashMap<>();
private int appIdSeg = 1;
private int appInstIdSeq = 1;
@Test
public void testTransform() throws Exception {
IServiceInventoryRegister applicationIDService = new IServiceInventoryRegister() {
@Override public int getOrCreate(String serviceName) {
String key = "AppCode:" + serviceName;
if (applicationRegister.containsKey(key)) {
return applicationRegister.get(key);
} else {
int id = appIdSeg++;
applicationRegister.put(key, id);
return id;
}
}
@Override public int getOrCreate(int addressId, String serviceName) {
String key = "Address:" + serviceName;
if (applicationRegister.containsKey(key)) {
return applicationRegister.get(key);
} else {
int id = appIdSeg++;
applicationRegister.put(key, id);
return id;
}
}
@Override public void heartbeat(int serviceId, long heartBeatTime) {
}
@Override public void updateMapping(int serviceId, int mappingServiceId) {
}
};
IServiceInstanceInventoryRegister instanceIDService = new IServiceInstanceInventoryRegister() {
@Override public int getOrCreate(int serviceId, String serviceInstanceName, String uuid, long registerTime,
ServiceInstanceInventory.AgentOsInfo osInfo) {
String key = "AppCode:" + serviceId + ",UUID:" + uuid;
if (applicationInstRegister.containsKey(key)) {
return applicationInstRegister.get(key);
} else {
int id = appInstIdSeq++;
applicationInstRegister.put(key, id);
return id;
}
}
@Override public int getOrCreate(int serviceId, int addressId, long registerTime) {
String key = "VitualAppCode:" + serviceId + ",address:" + addressId;
if (applicationInstRegister.containsKey(key)) {
return applicationInstRegister.get(key);
} else {
int id = appInstIdSeq++;
applicationInstRegister.put(key, id);
return id;
}
}
@Override public void heartbeat(int serviceInstanceId, long heartBeatTime) {
}
};
Whitebox.setInternalState(CoreRegisterLinker.class, "SERVICE_INVENTORY_REGISTER", applicationIDService);
Whitebox.setInternalState(CoreRegisterLinker.class, "SERVICE_INSTANCE_INVENTORY_REGISTER", instanceIDService);
Zipkin2SkyWalkingTransfer.INSTANCE.addListener(this);
List<Span> spanList = buildSpringSleuthExampleTrace();
Assert.assertEquals(3, spanList.size());
ZipkinTrace trace = new ZipkinTrace();
spanList.forEach(span -> trace.addSpan(span));
Zipkin2SkyWalkingTransfer.INSTANCE.transfer(trace);
}
private List<Span> buildSpringSleuthExampleTrace() throws UnsupportedEncodingException {
List<Span> spans = new LinkedList<>();
String span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"id\":\"1a8a1b5bdd791b8a\",\"kind\":\"SERVER\",\"name\":\"get /\",\"timestamp\":1527669813700123,\"duration\":11295,\"localEndpoint\":{\"serviceName\":\"frontend\",\"ipv4\":\"192.168.72.220\"},\"remoteEndpoint\":{\"ipv6\":\"::1\",\"port\":55146},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/\",\"mvc.controller.class\":\"Frontend\",\"mvc.controller.method\":\"callBackend\"}}";
spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8")));
span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"parentId\":\"1a8a1b5bdd791b8a\",\"id\":\"d7d5b93dcda767c8\",\"kind\":\"CLIENT\",\"name\":\"get\",\"timestamp\":1527669813702456,\"duration\":6672,\"localEndpoint\":{\"serviceName\":\"frontend\",\"ipv4\":\"192.168.72.220\"},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/api\"}}";
spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8")));
span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"parentId\":\"1a8a1b5bdd791b8a\",\"id\":\"d7d5b93dcda767c8\",\"kind\":\"SERVER\",\"name\":\"get /api\",\"timestamp\":1527669813705106,\"duration\":4802,\"localEndpoint\":{\"serviceName\":\"backend\",\"ipv4\":\"192.168.72.220\"},\"remoteEndpoint\":{\"ipv4\":\"127.0.0.1\",\"port\":55147},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/api\",\"mvc.controller.class\":\"Backend\",\"mvc.controller.method\":\"printDate\"},\"shared\":true}";
spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8")));
return SpanBytesDecoder.JSON_V2.decodeList(spans.toString().getBytes("UTF-8"));
}
@Override
public void notify(SkyWalkingTrace trace) {
List<TraceSegmentObject.Builder> segments = trace.getSegmentList();
Assert.assertEquals(2, segments.size());
TraceSegmentObject.Builder builder = segments.get(0);
TraceSegmentObject.Builder builder1 = segments.get(1);
TraceSegmentObject.Builder front, end;
if (builder.getApplicationId() == applicationRegister.get("AppCode:frontend")) {
front = builder;
end = builder1;
Assert.assertEquals(applicationRegister.get("AppCode:backend").longValue(), builder1.getApplicationId());
} else if (builder.getApplicationId() == applicationRegister.get("AppCode:backend")) {
end = builder;
front = builder1;
Assert.assertEquals(applicationRegister.get("AppCode:frontend").longValue(), builder1.getApplicationId());
} else {
Assert.fail("Can't find frontend and backend applications. ");
return;
}
Assert.assertEquals(2, front.getSpansCount());
Assert.assertEquals(1, end.getSpansCount());
front.getSpansList().forEach(spanObject -> {
if (spanObject.getSpanId() == 0) {
// span id = 1, means incoming http of frontend
Assert.assertEquals(SpanType.Entry, spanObject.getSpanType());
Assert.assertEquals("get /", spanObject.getOperationName());
Assert.assertEquals(0, spanObject.getSpanId());
Assert.assertEquals(-1, spanObject.getParentSpanId());
} else if (spanObject.getSpanId() == 1) {
Assert.assertEquals("192.168.72.220", spanObject.getPeer());
Assert.assertEquals(SpanType.Exit, spanObject.getSpanType());
Assert.assertEquals(1, spanObject.getSpanId());
Assert.assertEquals(0, spanObject.getParentSpanId());
} else {
Assert.fail("Only two spans expected");
}
Assert.assertTrue(spanObject.getTagsCount() > 0);
});
SpanObject spanObject = end.getSpans(0);
Assert.assertEquals(1, spanObject.getRefsCount());
TraceSegmentReference spanObjectRef = spanObject.getRefs(0);
Assert.assertEquals("get", spanObjectRef.getEntryServiceName());
Assert.assertEquals("get", spanObjectRef.getParentServiceName());
//Assert.assertEquals("192.168.72.220", spanObjectRef.getNetworkAddress());
Assert.assertEquals(1, spanObjectRef.getParentSpanId());
Assert.assertEquals(front.getTraceSegmentId(), spanObjectRef.getParentTraceSegmentId());
Assert.assertTrue(spanObject.getTagsCount() > 0);
Assert.assertEquals("get /api", spanObject.getOperationName());
Assert.assertEquals(0, spanObject.getSpanId());
Assert.assertEquals(-1, spanObject.getParentSpanId());
Assert.assertEquals(SpanType.Entry, spanObject.getSpanType());
}
}
\ No newline at end of file
...@@ -78,6 +78,11 @@ ...@@ -78,6 +78,11 @@
<artifactId>skywalking-trace-receiver-plugin</artifactId> <artifactId>skywalking-trace-receiver-plugin</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>zipkin-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- receiver module --> <!-- receiver module -->
<!-- storage module --> <!-- storage module -->
......
...@@ -77,6 +77,11 @@ service-mesh: ...@@ -77,6 +77,11 @@ service-mesh:
bufferFileCleanWhenRestart: false bufferFileCleanWhenRestart: false
istio-telemetry: istio-telemetry:
default: default:
#receiver_zipkin:
# default:
# host: 0.0.0.0
# port: 9411
# contextPath: /
query: query:
graphql: graphql:
path: /graphql path: /graphql
......
...@@ -77,6 +77,11 @@ service-mesh: ...@@ -77,6 +77,11 @@ service-mesh:
bufferFileCleanWhenRestart: false bufferFileCleanWhenRestart: false
istio-telemetry: istio-telemetry:
default: default:
receiver_zipkin:
default:
host: 0.0.0.0
port: 9411
contextPath: /
query: query:
graphql: graphql:
path: /graphql path: /graphql
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册