未验证 提交 e346b618 编写于 作者: W Wan Kai 提交者: GitHub

Support Zipkin kafka collector. (#9252)

上级 bc462f07
......@@ -503,6 +503,8 @@ jobs:
config: test/e2e-v2/cases/zipkin/h2/e2e.yaml
- name: Zipkin Postgres
config: test/e2e-v2/cases/zipkin/postgres/e2e.yaml
- name: Zipkin Kafka
config: test/e2e-v2/cases/zipkin/kafka/e2e.yaml
steps:
- uses: actions/checkout@v3
with:
......
......@@ -10,6 +10,7 @@
* ElasticSearch: scroll id should be updated when scrolling as it may change.
* Mesh: fix only last rule works when multiple rules are defined in metadata-service-mapping.yaml.
* Support sending alarm messages to PagerDuty.
* Support Zipkin kafka collector.
#### UI
......
## Zipkin receiver
The Zipkin receiver makes the OAP server work as an alternative Zipkin server implementation for collecting traces.
It supports Zipkin v1/v2 formats through the HTTP service.
It supports Zipkin v1/v2 formats through the HTTP collector and Kafka collector.
Use the following config to activate it.
Set `enableHttpCollector` to enable HTTP collector and `enableKafkaCollector` to enable Kafka collector.
```yaml
receiver-zipkin:
selector: ${SW_RECEIVER_ZIPKIN:default}
default:
# For HTTP server
searchableTracesTags: ${SW_ZIPKIN_SEARCHABLE_TAG_KEYS:http.method}
# The sample rate precision is 1/10000, should be between 0 and 10000
sampleRate: ${SW_ZIPKIN_SAMPLE_RATE:10000}
## The below configs are for OAP collect zipkin trace from HTTP
enableHttpCollector: ${SW_ZIPKIN_HTTP_COLLECTOR_ENABLED:true}
restHost: ${SW_RECEIVER_ZIPKIN_REST_HOST:0.0.0.0}
restPort: ${SW_RECEIVER_ZIPKIN_REST_PORT:9411}
restContextPath: ${SW_RECEIVER_ZIPKIN_REST_CONTEXT_PATH:/}
restMaxThreads: ${SW_RECEIVER_ZIPKIN_REST_MAX_THREADS:200}
restIdleTimeOut: ${SW_RECEIVER_ZIPKIN_REST_IDLE_TIMEOUT:30000}
restAcceptQueueSize: ${SW_RECEIVER_ZIPKIN_REST_QUEUE_SIZE:0}
searchableTracesTags: ${SW_ZIPKIN_SEARCHABLE_TAG_KEYS:http.method}
# The sample rate precision is 1/10000, should be between 0 and 10000
sampleRate: ${SW_ZIPKIN_SAMPLE_RATE:10000}
## The below configs are for OAP collect zipkin trace from kafka
enableKafkaCollector: ${SW_ZIPKIN_KAFKA_COLLECTOR_ENABLED:true}
kafkaBootstrapServers: ${SW_ZIPKIN_KAFKA_SERVERS:localhost:9092}
kafkaGroupId: ${SW_ZIPKIN_KAFKA_Group_Id:zipkin}
kafkaTopic: ${SW_ZIPKIN_KAFKA_TOPIC:zipkin}
# Kafka consumer config, JSON format as Properties. If it contains the same key with above, would override.
kafkaConsumerConfig: ${SW_ZIPKIN_KAFKA_CONSUMER_CONFIG:"{\"auto.offset.reset\":\"earliest\",\"enable.auto.commit\":true}"}
# The Count of the topic consumers
kafkaConsumers: ${SW_ZIPKIN_KAFKA_CONSUMERS:1}
kafkaHandlerThreadPoolSize: ${SW_ZIPKIN_KAFKA_HANDLER_THREAD_POOL_SIZE:-1}
kafkaHandlerThreadPoolQueueSize: ${SW_ZIPKIN_KAFKA_HANDLER_THREAD_POOL_QUEUE_SIZE:-1}
```
## Zipkin query
......
......@@ -155,7 +155,7 @@ public class ZipkinSpanRecord extends Record {
@Override
public String id() {
return traceId + "-" + spanId;
return spanId + Const.LINE + kind;
}
public static class Builder implements StorageBuilder<ZipkinSpanRecord> {
......
......@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
import org.apache.skywalking.oap.server.core.source.Source;
......@@ -37,7 +38,7 @@ public class ZipkinSpan extends Source {
@Override
public String getEntityId() {
return traceId + "-" + spanId;
return spanId + Const.LINE + kind;
}
@Setter
......
......@@ -33,5 +33,10 @@
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
</dependency>
</dependencies>
</project>
......@@ -26,6 +26,8 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig;
@Setter
@Getter
public class ZipkinReceiverConfig extends ModuleConfig {
// HTTP collector config
private boolean enableHttpCollector = true;
private String restHost;
private int restPort;
private String restContextPath;
......@@ -39,5 +41,28 @@ public class ZipkinReceiverConfig extends ModuleConfig {
Const.COMMA,
"http.method"
);
// kafka collector config
private boolean enableKafkaCollector = false;
/**
* A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
*/
private String kafkaBootstrapServers;
private String kafkaGroupId = "zipkin";
private String kafkaTopic = "zipkin";
/**
* Kafka consumer config,JSON format as Properties. If it contains the same key with above, would override.
*/
private String kafkaConsumerConfig = "{\"auto.offset.reset\":\"earliest\",\"enable.auto.commit\":true}";
private int kafkaConsumers = 1;
private int kafkaHandlerThreadPoolSize;
private int kafkaHandlerThreadPoolQueueSize;
}
......@@ -29,12 +29,14 @@ import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedExcepti
import org.apache.skywalking.oap.server.library.server.http.HTTPServer;
import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.ZipkinSpanHTTPHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.kafka.KafkaHandler;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
public class ZipkinReceiverProvider extends ModuleProvider {
public static final String NAME = "default";
private final ZipkinReceiverConfig config;
private HTTPServer httpServer;
private KafkaHandler kafkaHandler;
public ZipkinReceiverProvider() {
config = new ZipkinReceiverConfig();
......@@ -57,7 +59,6 @@ public class ZipkinReceiverProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException {
}
@Override
......@@ -66,27 +67,40 @@ public class ZipkinReceiverProvider extends ModuleProvider {
throw new IllegalArgumentException(
"sampleRate: " + config.getSampleRate() + ", should be between 0 and 10000");
}
HTTPServerConfig httpServerConfig = HTTPServerConfig.builder()
.host(config.getRestHost())
.port(config.getRestPort())
.contextPath(config.getRestContextPath())
.idleTimeOut(config.getRestIdleTimeOut())
.maxThreads(config.getRestMaxThreads())
.acceptQueueSize(config.getRestAcceptQueueSize())
.build();
httpServer = new HTTPServer(httpServerConfig);
httpServer.initialize();
httpServer.addHandler(
new ZipkinSpanHTTPHandler(config, getManager()),
Arrays.asList(HttpMethod.POST, HttpMethod.GET)
);
if (config.isEnableHttpCollector()) {
HTTPServerConfig httpServerConfig = HTTPServerConfig.builder()
.host(config.getRestHost())
.port(config.getRestPort())
.contextPath(config.getRestContextPath())
.idleTimeOut(config.getRestIdleTimeOut())
.maxThreads(config.getRestMaxThreads())
.acceptQueueSize(config.getRestAcceptQueueSize())
.build();
httpServer = new HTTPServer(httpServerConfig);
httpServer.initialize();
httpServer.addHandler(
new ZipkinSpanHTTPHandler(config, getManager()),
Arrays.asList(HttpMethod.POST, HttpMethod.GET)
);
}
if (config.isEnableKafkaCollector()) {
kafkaHandler = new KafkaHandler(config, getManager());
}
}
@Override
public void notifyAfterCompleted() {
httpServer.start();
public void notifyAfterCompleted() throws ModuleStartException {
if (config.isEnableHttpCollector()) {
httpServer.start();
}
if (config.isEnableKafkaCollector()) {
kafkaHandler.start();
}
}
@Override
......
......@@ -26,12 +26,8 @@ import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.ConsumesJson;
import com.linecorp.armeria.server.annotation.ConsumesProtobuf;
import com.linecorp.armeria.server.annotation.Post;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
......@@ -42,35 +38,26 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.internal.HexCodec;
import static java.util.Objects.nonNull;
@Slf4j
public class ZipkinSpanHTTPHandler {
private final ZipkinReceiverConfig config;
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
private final HistogramMetrics histogram;
private final CounterMetrics errorCounter;
private final long samplerBoundary;
private final SpanForward spanForward;
public ZipkinSpanHTTPHandler(ZipkinReceiverConfig config, ModuleManager manager) {
sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
namingControl = manager.find(CoreModule.NAME).provider().getService(NamingControl.class);
this.config = config;
float sampleRate = (float) config.getSampleRate() / 10000;
samplerBoundary = (long) (Long.MAX_VALUE * sampleRate);
this.spanForward = new SpanForward(config, manager);
MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
histogram = metricsCreator.createHistogramMetric(
"trace_in_latency", "The process latency of trace data",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin")
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-http")
);
errorCounter = metricsCreator.createCounter(
"trace_analysis_error_count", "The error number of trace analysis",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin")
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-http")
);
}
......@@ -115,8 +102,7 @@ public class ZipkinSpanHTTPHandler {
final HttpResponse response = HttpResponse.from(req.aggregate().thenApply(request -> {
final HttpData httpData = UnzippingBytesRequestConverter.convertRequest(ctx, request);
final List<Span> spanList = decoder.decodeList(httpData.byteBuf().nioBuffer());
final SpanForward forward = new SpanForward(namingControl, sourceReceiver, config);
forward.send(getSampledTraces(spanList));
spanForward.send(spanList);
return HttpResponse.of(HttpStatus.OK);
}));
response.whenComplete().handle((unused, throwable) -> {
......@@ -128,24 +114,4 @@ public class ZipkinSpanHTTPHandler {
});
return response;
}
private List<Span> getSampledTraces(List<Span> input) {
//100% sampleRate
if (config.getSampleRate() == 10000) {
return input;
}
List<Span> sampledTraces = new ArrayList<>(input.size());
for (Span span : input) {
if (Boolean.TRUE.equals(span.debug())) {
sampledTraces.add(span);
continue;
}
long traceId = HexCodec.lowerHexToUnsignedLong(span.traceId());
traceId = traceId == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(traceId);
if (traceId <= samplerBoundary) {
sampledTraces.add(span);
}
}
return sampledTraces;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.kafka;
import com.google.gson.Gson;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.server.pool.CustomThreadFactory;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import zipkin2.Span;
import zipkin2.SpanBytesDecoderDetector;
import zipkin2.codec.BytesDecoder;
@Slf4j
public class KafkaHandler {
private final ZipkinReceiverConfig config;
private final SpanForward spanForward;
private final Properties properties;
private final ThreadPoolExecutor executor;
private final boolean enableKafkaMessageAutoCommit;
private final List<String> topics;
private final CounterMetrics msgDroppedIncr;
private final CounterMetrics errorCounter;
private final HistogramMetrics histogram;
public KafkaHandler(final ZipkinReceiverConfig config, ModuleManager manager) {
this.config = config;
this.spanForward = new SpanForward(config, manager);
properties = new Properties();
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, config.getKafkaGroupId());
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getKafkaBootstrapServers());
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
Gson gson = new Gson();
Properties override = gson.fromJson(config.getKafkaConsumerConfig(), Properties.class);
properties.putAll(override);
int threadPoolSize = Runtime.getRuntime().availableProcessors() * 2;
if (config.getKafkaHandlerThreadPoolSize() > 0) {
threadPoolSize = config.getKafkaHandlerThreadPoolSize();
}
int threadPoolQueueSize = 10000;
if (config.getKafkaHandlerThreadPoolQueueSize() > 0) {
threadPoolQueueSize = config.getKafkaHandlerThreadPoolQueueSize();
}
enableKafkaMessageAutoCommit = new ConsumerConfig(properties).getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(threadPoolQueueSize),
new CustomThreadFactory("Zipkin-Kafka-Consumer"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
topics = Arrays.asList(config.getKafkaTopic().split(","));
MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
histogram = metricsCreator.createHistogramMetric(
"trace_in_latency",
"The process latency of trace data",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("zipkin-kafka")
);
msgDroppedIncr = metricsCreator.createCounter(
"trace_dropped_count", "The dropped number of traces",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-kafka"));
errorCounter = metricsCreator.createCounter(
"trace_analysis_error_count", "The error number of trace analysis",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-kafka")
);
}
public void start() throws ModuleStartException {
for (int i = 0; i < config.getKafkaConsumers(); i++) {
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(topics);
consumer.seekToEnd(consumer.assignment());
executor.submit(() -> runTask(consumer));
}
}
private void runTask(final KafkaConsumer<byte[], byte[]> consumer) {
if (log.isDebugEnabled()) {
log.debug("Start Consume zipkin trace records from kafka.");
}
while (true) {
try {
ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1000L));
if (log.isDebugEnabled()) {
log.debug(
"Consume zipkin trace records from kafka, records count:[{}].",
consumerRecords.count()
);
}
if (!consumerRecords.isEmpty()) {
for (final ConsumerRecord<byte[], byte[]> record : consumerRecords) {
final byte[] bytes = record.value();
//empty or illegal message
if (bytes.length < 2) {
msgDroppedIncr.inc();
continue;
}
executor.submit(() -> handleRecord(bytes));
}
if (!enableKafkaMessageAutoCommit) {
consumer.commitAsync();
}
}
} catch (Exception e) {
log.error("Kafka handle message error.", e);
errorCounter.inc();
}
}
}
private void handleRecord(byte[] bytes) {
try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
BytesDecoder<Span> decoder = SpanBytesDecoderDetector.decoderForListMessage(bytes);
final List<Span> spanList = decoder.decodeList(bytes);
spanForward.send(spanList);
}
}
}
......@@ -19,11 +19,13 @@
package org.apache.skywalking.oap.server.receiver.zipkin.trace;
import com.google.gson.JsonObject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.core.source.TagAutocomplete;
import org.apache.skywalking.oap.server.core.zipkin.source.ZipkinService;
......@@ -33,27 +35,36 @@ import org.apache.skywalking.oap.server.core.zipkin.source.ZipkinSpan;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import zipkin2.Annotation;
import zipkin2.Span;
import zipkin2.internal.HexCodec;
import zipkin2.internal.RecyclableBuffers;
public class SpanForward {
private final ZipkinReceiverConfig config;
private final NamingControl namingControl;
private final SourceReceiver receiver;
private final List<String> searchTagKeys;
private final long samplerBoundary;
public SpanForward(final NamingControl namingControl,
final SourceReceiver receiver,
final ZipkinReceiverConfig config) {
this.namingControl = namingControl;
this.receiver = receiver;
this.searchTagKeys = Arrays.asList(config.getSearchableTracesTags().split(Const.COMMA));
public SpanForward(final ZipkinReceiverConfig config, final ModuleManager manager) {
this.config = config;
this.namingControl = manager.find(CoreModule.NAME).provider().getService(NamingControl.class);
this.receiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
this.searchTagKeys = Arrays.asList(config.getSearchableTracesTags().split(Const.COMMA));
float sampleRate = (float) config.getSampleRate() / 10000;
samplerBoundary = (long) (Long.MAX_VALUE * sampleRate);
}
public void send(List<Span> spanList) {
spanList.forEach(span -> {
if (CollectionUtils.isEmpty(spanList)) {
return;
}
getSampledTraces(spanList).forEach(span -> {
ZipkinSpan zipkinSpan = new ZipkinSpan();
String serviceName = span.localServiceName();
if (StringUtil.isEmpty(serviceName)) {
......@@ -73,12 +84,14 @@ public class SpanForward {
if (localPort != null) {
zipkinSpan.setLocalEndpointPort(localPort);
}
zipkinSpan.setRemoteEndpointServiceName(namingControl.formatServiceName(span.remoteServiceName()));
zipkinSpan.setRemoteEndpointIPV4(span.remoteEndpoint().ipv4());
zipkinSpan.setRemoteEndpointIPV6(span.remoteEndpoint().ipv6());
Integer remotePort = span.remoteEndpoint().port();
if (remotePort != null) {
zipkinSpan.setRemoteEndpointPort(remotePort);
if (span.remoteEndpoint() != null) {
zipkinSpan.setRemoteEndpointServiceName(namingControl.formatServiceName(span.remoteServiceName()));
zipkinSpan.setRemoteEndpointIPV4(span.remoteEndpoint().ipv4());
zipkinSpan.setRemoteEndpointIPV6(span.remoteEndpoint().ipv6());
Integer remotePort = span.remoteEndpoint().port();
if (remotePort != null) {
zipkinSpan.setRemoteEndpointPort(remotePort);
}
}
zipkinSpan.setTimestamp(span.timestampAsLong());
zipkinSpan.setDebug(span.debug());
......@@ -159,4 +172,24 @@ public class SpanForward {
relation.setTimeBucket(minuteTimeBucket);
receiver.receive(relation);
}
private List<Span> getSampledTraces(List<Span> input) {
//100% sampleRate
if (config.getSampleRate() == 10000) {
return input;
}
List<Span> sampledTraces = new ArrayList<>(input.size());
for (Span span : input) {
if (Boolean.TRUE.equals(span.debug())) {
sampledTraces.add(span);
continue;
}
long traceId = HexCodec.lowerHexToUnsignedLong(span.traceId());
traceId = traceId == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(traceId);
if (traceId <= samplerBoundary) {
sampledTraces.add(span);
}
}
return sampledTraces;
}
}
......@@ -342,16 +342,28 @@ receiver-otel:
receiver-zipkin:
selector: ${SW_RECEIVER_ZIPKIN:-}
default:
# For HTTP server
searchableTracesTags: ${SW_ZIPKIN_SEARCHABLE_TAG_KEYS:http.method}
# The sample rate precision is 1/10000, should be between 0 and 10000
sampleRate: ${SW_ZIPKIN_SAMPLE_RATE:10000}
## The below configs are for OAP collect zipkin trace from HTTP
enableHttpCollector: ${SW_ZIPKIN_HTTP_COLLECTOR_ENABLED:true}
restHost: ${SW_RECEIVER_ZIPKIN_REST_HOST:0.0.0.0}
restPort: ${SW_RECEIVER_ZIPKIN_REST_PORT:9411}
restContextPath: ${SW_RECEIVER_ZIPKIN_REST_CONTEXT_PATH:/}
restMaxThreads: ${SW_RECEIVER_ZIPKIN_REST_MAX_THREADS:200}
restIdleTimeOut: ${SW_RECEIVER_ZIPKIN_REST_IDLE_TIMEOUT:30000}
restAcceptQueueSize: ${SW_RECEIVER_ZIPKIN_REST_QUEUE_SIZE:0}
searchableTracesTags: ${SW_ZIPKIN_SEARCHABLE_TAG_KEYS:http.method}
# The sample rate precision is 1/10000, should be between 0 and 10000
sampleRate: ${SW_ZIPKIN_SAMPLE_RATE:10000}
## The below configs are for OAP collect zipkin trace from kafka
enableKafkaCollector: ${SW_ZIPKIN_KAFKA_COLLECTOR_ENABLED:false}
kafkaBootstrapServers: ${SW_ZIPKIN_KAFKA_SERVERS:localhost:9092}
kafkaGroupId: ${SW_ZIPKIN_KAFKA_GROUP_ID:zipkin}
kafkaTopic: ${SW_ZIPKIN_KAFKA_TOPIC:zipkin}
# Kafka consumer config, JSON format as Properties. If it contains the same key with above, would override.
kafkaConsumerConfig: ${SW_ZIPKIN_KAFKA_CONSUMER_CONFIG:"{\"auto.offset.reset\":\"earliest\",\"enable.auto.commit\":true}"}
# The Count of the topic consumers
kafkaConsumers: ${SW_ZIPKIN_KAFKA_CONSUMERS:1}
kafkaHandlerThreadPoolSize: ${SW_ZIPKIN_KAFKA_HANDLER_THREAD_POOL_SIZE:-1}
kafkaHandlerThreadPoolQueueSize: ${SW_ZIPKIN_KAFKA_HANDLER_THREAD_POOL_QUEUE_SIZE:-1}
receiver-browser:
selector: ${SW_RECEIVER_BROWSER:default}
......
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: '2.1'
services:
zookeeper:
image: zookeeper:3.4
hostname: zookeeper
expose:
- 2181
networks:
- e2e
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
healthcheck:
test: [ "CMD", "sh", "-c", "nc -nz 127.0.0.1 2181" ]
interval: 5s
timeout: 60s
retries: 120
broker-a:
image: bitnami/kafka:2.4.1
hostname: broker-a
expose:
- 9092
networks:
- e2e
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_BROKER_ID=10
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
zookeeper:
condition: service_healthy
healthcheck:
test: [ "CMD", "kafka-topics.sh", "--list", "--zookeeper", "zookeeper:2181" ]
interval: 5s
timeout: 60s
retries: 120
broker-b:
image: bitnami/kafka:2.4.1
hostname: broker-b
expose:
- 9092
networks:
- e2e
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_BROKER_ID=24
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
zookeeper:
condition: service_healthy
healthcheck:
test: [ "CMD", "kafka-topics.sh", "--list", "--zookeeper", "zookeeper:2181" ]
interval: 5s
timeout: 60s
retries: 120
oap:
extends:
file: ../../../script/docker-compose/base-compose.yml
service: oap
environment:
SW_QUERY_ZIPKIN: default
SW_RECEIVER_ZIPKIN: default
SW_ZIPKIN_KAFKA_SERVERS: broker-a:9092,broker-b:9092
SW_ZIPKIN_KAFKA_COLLECTOR_ENABLED: "true"
expose:
- 9411
ports:
- 9412:9412
depends_on:
broker-a:
condition: service_healthy
broker-b:
condition: service_healthy
networks:
- e2e
sender:
image: "eclipse-temurin:8-jre"
volumes:
- ./../../../java-test-service/e2e-mock-sender/target/e2e-mock-sender-2.0.0.jar:/e2e-mock-sender-2.0.0.jar
command: [ "java", "-jar", "/e2e-mock-sender-2.0.0.jar" ]
environment:
ZIPKIN_KAFKA_BOOTSTRAP_SERVERS: broker-a:9092,broker-b:9092
networks:
- e2e
ports:
- 9093
healthcheck:
test: ["CMD", "sh", "-c", "nc -nz 127.0.0.1 9093"]
interval: 5s
timeout: 60s
retries: 120
depends_on:
oap:
condition: service_healthy
broker-a:
condition: service_healthy
broker-b:
condition: service_healthy
networks:
e2e:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is used to show how to write configuration files and can be used to test.
setup:
env: compose
file: docker-compose.yml
timeout: 20m
init-system-environment: ../../../script/env
steps:
- name: set PATH
command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH
- name: install yq
command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
- name: install swctl
command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
trigger:
action: http
interval: 3s
times: 10
url: http://${sender_host}:${sender_9093}/sendZipkinTrace2Kafka
method: POST
verify:
# verify with retry strategy
retry:
# max retry count
count: 20
# the interval between two retries, in millisecond.
interval: 10s
cases:
- includes:
- ../zipkin-cases.yaml
......@@ -39,6 +39,15 @@
<artifactId>e2e-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin</artifactId>
<version>2.23.16</version>
</dependency>
</dependencies>
<build>
<plugins>
......
......@@ -28,4 +28,7 @@ import org.springframework.context.annotation.Configuration;
public class E2EConfiguration {
private String oapHost;
private String oapGrpcPort;
private String zipkinKafkaBootstrapServers;
private String zipkinKafkaGroupId;
private String zipkinKafkaTopic;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.e2e.controller;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.skywalking.e2e.E2EConfiguration;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import zipkin2.Endpoint;
import zipkin2.Span;
import zipkin2.codec.SpanBytesEncoder;
@RestController
public class ZipkinKafkaProduceController {
private final KafkaProducer<byte[], byte[]> producer;
private final E2EConfiguration config;
public ZipkinKafkaProduceController(final E2EConfiguration config) {
this.config = config;
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getZipkinKafkaBootstrapServers());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, config.getZipkinKafkaGroupId());
producer = new KafkaProducer<>(properties, new ByteArraySerializer(), new ByteArraySerializer());
}
@PostMapping("/sendZipkinTrace2Kafka")
public String sendTrace() {
producer.send(new ProducerRecord<>(config.getZipkinKafkaTopic(), 0, null, SpanBytesEncoder.JSON_V2.encodeList(makeTrace())));
producer.flush();
return "Trace send success!";
}
private static List<Span> makeTrace() {
String traceId = generateHexId(16);
String span1Id = traceId;
String span2Id = generateHexId(16);
String span3Id = generateHexId(16);
List<Span> trace = new ArrayList<>();
trace.add(Span.newBuilder()
.traceId(traceId)
.id(span1Id)
.name("post /")
.kind(Span.Kind.SERVER)
.localEndpoint(Endpoint.newBuilder().serviceName("frontend").ip("192.168.0.1").build())
.remoteEndpoint(Endpoint.newBuilder().ip("127.0.0.1").port(63720).build())
.timestamp(System.currentTimeMillis() * 1000L + 1000L)
.duration(16683)
.addAnnotation(System.currentTimeMillis() * 1000L + 1100L, "wr")
.addAnnotation(System.currentTimeMillis() * 1000L + 1200L, "ws")
.putTag("http.method", "POST")
.putTag("http.path", "/")
.build());
trace.add(Span.newBuilder()
.traceId(traceId)
.parentId(span1Id)
.id(span2Id)
.name("get")
.kind(Span.Kind.CLIENT)
.localEndpoint(Endpoint.newBuilder().serviceName("frontend").ip("192.168.0.1").build())
.remoteEndpoint(Endpoint.newBuilder().serviceName("backend").ip("127.0.0.1").port(9000).build())
.timestamp(System.currentTimeMillis() * 1000L + 2000L)
.duration(15380)
.addAnnotation(System.currentTimeMillis() * 1000L + 2100L, "wr")
.addAnnotation(System.currentTimeMillis() * 1000L + 2600L, "ws")
.putTag("http.method", "GET")
.putTag("http.path", "/api")
.build());
trace.add(Span.newBuilder()
.traceId(traceId)
.parentId(span2Id)
.id(span3Id)
.name("get /api")
.kind(Span.Kind.SERVER)
.localEndpoint(Endpoint.newBuilder().serviceName("backend").ip("192.168.0.1").build())
.remoteEndpoint(Endpoint.newBuilder().ip("127.0.0.1").port(63722).build())
.timestamp(System.currentTimeMillis() * 1000L + 3000L)
.duration(1557)
.addAnnotation(System.currentTimeMillis() * 1000L + 3100L, "wr")
.addAnnotation(System.currentTimeMillis() * 1000L + 3300L, "ws")
.putTag("http.method", "GET")
.putTag("http.path", "/api")
.build());
return trace;
}
private static String generateHexId(int bound) {
Random r = new Random();
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < bound; i++) {
buffer.append(Integer.toHexString(r.nextInt(bound)));
}
return buffer.toString();
}
}
......@@ -22,4 +22,7 @@ spring:
e2e:
oap-host: ${OAP_HOST:127.0.0.1}
oap-grpc-port: ${OAP_GRPC-PORT:11800}
oap-grpc-port: ${OAP_GRPC_PORT:11800}
zipkin-kafka-bootstrapServers: ${ZIPKIN_KAFKA_BOOTSTRAP_SERVERS:127.0.0.1:9092}
zipkin-kafka-groupId: ${ZIPKIN_KAFKA_GROUPID:zipkin}
zipkin-kafka-topic: ${ZIPKIN_KAFKA_TOPIC:zipkin}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册