diff --git a/.github/workflows/skywalking.yaml b/.github/workflows/skywalking.yaml index 36d774e45e018b61d8f99fefc5c132d6f88be49b..82aaeabfc34442f389d4acb293caefbf3fdb03df 100644 --- a/.github/workflows/skywalking.yaml +++ b/.github/workflows/skywalking.yaml @@ -503,6 +503,8 @@ jobs: config: test/e2e-v2/cases/zipkin/h2/e2e.yaml - name: Zipkin Postgres config: test/e2e-v2/cases/zipkin/postgres/e2e.yaml + - name: Zipkin Kafka + config: test/e2e-v2/cases/zipkin/kafka/e2e.yaml steps: - uses: actions/checkout@v3 with: diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 982f3dcd9cec164aa147b1438fbb33873fc1edd7..0c8442494db1b5d55f5300964d538f2e2ef87346 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -10,6 +10,7 @@ * ElasticSearch: scroll id should be updated when scrolling as it may change. * Mesh: fix only last rule works when multiple rules are defined in metadata-service-mapping.yaml. * Support sending alarm messages to PagerDuty. +* Support Zipkin kafka collector. #### UI diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md index 04ecd0ba745f50fb5db08406ff95cc33a430d978..85ec7ecec494bfb216f0edb5a93bc3a000fc0e70 100644 --- a/docs/en/setup/backend/configuration-vocabulary.md +++ b/docs/en/setup/backend/configuration-vocabulary.md @@ -35,8 +35,8 @@ The Configuration Vocabulary lists all available configurations provided by `app | - | - | searchableTracesTags | Defines a set of span tag keys which are searchable through GraphQL. Multiple values are separated by commas. | SW_SEARCHABLE_TAG_KEYS | http.method,http.status_code,rpc.status_code,db.type,db.instance,mq.queue,mq.topic,mq.broker | | - | - | searchableLogsTags | Defines a set of log tag keys which are searchable through GraphQL. Multiple values are separated by commas. | SW_SEARCHABLE_LOGS_TAG_KEYS | level | | - | - | searchableAlarmTags | Defines a set of alarm tag keys which are searchable through GraphQL. Multiple values are separated by commas. | SW_SEARCHABLE_ALARM_TAG_KEYS | level | -| - | - | autocompleteTagKeysQueryMaxSize | The max size of tags keys for autocomplete select. | SW_AUTOCOMPLETE_TAG_KEYS_QUERY_MAX_SIZE | 100 | -| - | - | autocompleteTagValuesQueryMaxSize | The max size of tags values for autocomplete select. | SW_AUTOCOMPLETE_TAG_VALUES_QUERY_MAX_SIZE | 100 | +| - | - | autocompleteTagKeysQueryMaxSize | The max size of tags keys for autocomplete select. | SW_AUTOCOMPLETE_TAG_KEYS_QUERY_MAX_SIZE | 100 | +| - | - | autocompleteTagValuesQueryMaxSize | The max size of tags values for autocomplete select. | SW_AUTOCOMPLETE_TAG_VALUES_QUERY_MAX_SIZE | 100 | | - | - | gRPCThreadPoolSize | Pool size of gRPC server. | SW_CORE_GRPC_THREAD_POOL_SIZE | CPU core * 4 | | - | - | gRPCThreadPoolQueueSize | Queue size of gRPC server. | SW_CORE_GRPC_POOL_QUEUE_SIZE | 10000 | | - | - | maxConcurrentCallsPerConnection | The maximum number of concurrent calls permitted for each incoming connection. Defaults to no limit. | SW_CORE_GRPC_MAX_CONCURRENT_CALL | - | @@ -193,14 +193,23 @@ The Configuration Vocabulary lists all available configurations provided by `app | - | - | enabledHandlers | Enabled handlers for otel. | SW_OTEL_RECEIVER_ENABLED_HANDLERS | - | | - | - | enabledOcRules | Enabled metric rules for OC handler. | SW_OTEL_RECEIVER_ENABLED_OC_RULES | - | | receiver-zipkin | default | A receiver for Zipkin traces. | - | - | | +| - | - | sampleRate | The sample rate precision is 1/10000, should be between 0 and 10000 | SW_ZIPKIN_SAMPLE_RATE | 10000 | +| - | - | searchableTracesTags | Defines a set of span tag keys which are searchable. Multiple values are separated by commas. | SW_ZIPKIN_SEARCHABLE_TAG_KEYS | http.method | +| - | - | enableHttpCollector | Enable Http Collector. | SW_ZIPKIN_HTTP_COLLECTOR_ENABLED | true | | - | - | restHost | Binding IP of RESTful services. | SW_RECEIVER_ZIPKIN_REST_HOST | 0.0.0.0 | | - | - | restPort | Binding port of RESTful services. | SW_RECEIVER_ZIPKIN_REST_PORT | 9411 | | - | - | restContextPath | Web context path of RESTful services. | SW_RECEIVER_ZIPKIN_REST_CONTEXT_PATH | / | | - | - | restMaxThreads | Maximum thread number of RESTful services. | SW_RECEIVER_ZIPKIN_REST_MAX_THREADS | 200 | | - | - | restIdleTimeOut | Connector idle timeout of RESTful services (in milliseconds). | SW_RECEIVER_ZIPKIN_REST_IDLE_TIMEOUT | 30000 | | - | - | restAcceptQueueSize | Maximum request header size accepted. | SW_RECEIVER_ZIPKIN_REST_QUEUE_SIZE | 0 | -| - | - | sampleRate | The sample rate precision is 1/10000, should be between 0 and 10000 | SW_ZIPKIN_SAMPLE_RATE | 10000 | -| - | - | searchableTracesTags | Defines a set of span tag keys which are searchable. Multiple values are separated by commas. | SW_ZIPKIN_SEARCHABLE_TAG_KEYS | http.method | +| - | - | enableKafkaCollector | Enable Kafka Collector. | SW_ZIPKIN_KAFKA_COLLECTOR_ENABLED | false | +| - | - | kafkaBootstrapServers | Kafka ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG. | SW_ZIPKIN_KAFKA_SERVERS | localhost:9092 | +| - | - | kafkaGroupId | Kafka ConsumerConfig.GROUP_ID_CONFIG. | SW_ZIPKIN_KAFKA_GROUP_ID | zipkin | +| - | - | kafkaTopic | Kafka Topics. | SW_ZIPKIN_KAFKA_TOPIC | zipkin | +| - | - | kafkaConsumerConfig | Kafka consumer config, JSON format as Properties. If it contains the same key with above, would override. | SW_ZIPKIN_KAFKA_CONSUMER_CONFIG | "{\"auto.offset.reset\":\"earliest\",\"enable.auto.commit\":true}" | +| - | - | kafkaConsumers | The number of consumers to create. | SW_ZIPKIN_KAFKA_CONSUMERS | 1 | +| - | - | kafkaHandlerThreadPoolSize | Pool size of Kafka message handler executor. | SW_ZIPKIN_KAFKA_HANDLER_THREAD_POOL_SIZE | CPU core * 2 | +| - | - | kafkaHandlerThreadPoolQueueSize | Queue size of Kafka message handler executor. | SW_ZIPKIN_KAFKA_HANDLER_THREAD_POOL_QUEUE_SIZE | 10000 | | prometheus-fetcher | default | Prometheus fetcher reads metrics from Prometheus endpoint, and transfer the metrics into SkyWalking native format for the MAL engine. | - | - | | | - | - | enabledRules | Enabled rules. | SW_PROMETHEUS_FETCHER_ENABLED_RULES | self | | - | - | maxConvertWorker | The maximize meter convert worker. | SW_PROMETHEUS_FETCHER_NUM_CONVERT_WORKER | -1(by default, half the number of CPU core(s)) | diff --git a/docs/en/setup/backend/zipkin-trace.md b/docs/en/setup/backend/zipkin-trace.md index 829f322b81aa6668dfebd2c4686c42ddcfb2eaae..962eae97d854012448a0c48eca619fdeb560a188 100644 --- a/docs/en/setup/backend/zipkin-trace.md +++ b/docs/en/setup/backend/zipkin-trace.md @@ -1,22 +1,37 @@ ## Zipkin receiver The Zipkin receiver makes the OAP server work as an alternative Zipkin server implementation for collecting traces. -It supports Zipkin v1/v2 formats through the HTTP service. +It supports Zipkin v1/v2 formats through the HTTP collector and Kafka collector. Use the following config to activate it. +Set `enableHttpCollector` to enable HTTP collector and `enableKafkaCollector` to enable Kafka collector. + ```yaml receiver-zipkin: selector: ${SW_RECEIVER_ZIPKIN:default} default: - # For HTTP server + searchableTracesTags: ${SW_ZIPKIN_SEARCHABLE_TAG_KEYS:http.method} + # The sample rate precision is 1/10000, should be between 0 and 10000 + sampleRate: ${SW_ZIPKIN_SAMPLE_RATE:10000} + ## The below configs are for OAP collect zipkin trace from HTTP + enableHttpCollector: ${SW_ZIPKIN_HTTP_COLLECTOR_ENABLED:true} restHost: ${SW_RECEIVER_ZIPKIN_REST_HOST:0.0.0.0} restPort: ${SW_RECEIVER_ZIPKIN_REST_PORT:9411} restContextPath: ${SW_RECEIVER_ZIPKIN_REST_CONTEXT_PATH:/} restMaxThreads: ${SW_RECEIVER_ZIPKIN_REST_MAX_THREADS:200} restIdleTimeOut: ${SW_RECEIVER_ZIPKIN_REST_IDLE_TIMEOUT:30000} restAcceptQueueSize: ${SW_RECEIVER_ZIPKIN_REST_QUEUE_SIZE:0} - searchableTracesTags: ${SW_ZIPKIN_SEARCHABLE_TAG_KEYS:http.method} - # The sample rate precision is 1/10000, should be between 0 and 10000 - sampleRate: ${SW_ZIPKIN_SAMPLE_RATE:10000} + ## The below configs are for OAP collect zipkin trace from kafka + enableKafkaCollector: ${SW_ZIPKIN_KAFKA_COLLECTOR_ENABLED:true} + kafkaBootstrapServers: ${SW_ZIPKIN_KAFKA_SERVERS:localhost:9092} + kafkaGroupId: ${SW_ZIPKIN_KAFKA_Group_Id:zipkin} + kafkaTopic: ${SW_ZIPKIN_KAFKA_TOPIC:zipkin} + # Kafka consumer config, JSON format as Properties. If it contains the same key with above, would override. + kafkaConsumerConfig: ${SW_ZIPKIN_KAFKA_CONSUMER_CONFIG:"{\"auto.offset.reset\":\"earliest\",\"enable.auto.commit\":true}"} + # The Count of the topic consumers + kafkaConsumers: ${SW_ZIPKIN_KAFKA_CONSUMERS:1} + kafkaHandlerThreadPoolSize: ${SW_ZIPKIN_KAFKA_HANDLER_THREAD_POOL_SIZE:-1} + kafkaHandlerThreadPoolQueueSize: ${SW_ZIPKIN_KAFKA_HANDLER_THREAD_POOL_QUEUE_SIZE:-1} + ``` ## Zipkin query diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java index a6c419a0b59439665235e29738f82c8960ee1978..5bc72871ac4d0873f646755e4c89936cf3fbafd8 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java @@ -155,7 +155,7 @@ public class ZipkinSpanRecord extends Record { @Override public String id() { - return traceId + "-" + spanId; + return spanId + Const.LINE + kind; } public static class Builder implements StorageBuilder { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/source/ZipkinSpan.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/source/ZipkinSpan.java index bb19b31f6af360494ff20960d8de5038d7331bc1..c795cb53bd31501039d648c1b40eed76bd72aa66 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/source/ZipkinSpan.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/source/ZipkinSpan.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import lombok.Getter; import lombok.Setter; +import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.ScopeDeclaration; import org.apache.skywalking.oap.server.core.source.Source; @@ -37,7 +38,7 @@ public class ZipkinSpan extends Source { @Override public String getEntityId() { - return traceId + "-" + spanId; + return spanId + Const.LINE + kind; } @Setter diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml index c8e0542861129ac1ecebbe55b1d505cfe73fdeb5..e3dda9f7c67c0f2bbc52b784f202935368b8e270 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml @@ -33,5 +33,10 @@ io.zipkin.zipkin2 zipkin + + org.apache.kafka + kafka-clients + ${kafka-clients.version} + diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java index 0cd6e86695d92889bb9ec3a61f4bb5d9527e9ea9..00d527625432dfed0f5a3d5ba473b44825e5eae3 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java @@ -26,6 +26,8 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig; @Setter @Getter public class ZipkinReceiverConfig extends ModuleConfig { + // HTTP collector config + private boolean enableHttpCollector = true; private String restHost; private int restPort; private String restContextPath; @@ -39,5 +41,28 @@ public class ZipkinReceiverConfig extends ModuleConfig { Const.COMMA, "http.method" ); + // kafka collector config + private boolean enableKafkaCollector = false; + /** + * A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. + */ + private String kafkaBootstrapServers; + + private String kafkaGroupId = "zipkin"; + + private String kafkaTopic = "zipkin"; + + /** + * Kafka consumer config,JSON format as Properties. If it contains the same key with above, would override. + */ + private String kafkaConsumerConfig = "{\"auto.offset.reset\":\"earliest\",\"enable.auto.commit\":true}"; + + private int kafkaConsumers = 1; + + private int kafkaHandlerThreadPoolSize; + + private int kafkaHandlerThreadPoolQueueSize; + + } diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java index 95574ce856f0e79f27ac9da17e15e1024a584db0..4f63bcd7f97e5bc14fcad835104299cafab89a34 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java @@ -29,12 +29,14 @@ import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedExcepti import org.apache.skywalking.oap.server.library.server.http.HTTPServer; import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig; import org.apache.skywalking.oap.server.receiver.zipkin.handler.ZipkinSpanHTTPHandler; +import org.apache.skywalking.oap.server.receiver.zipkin.kafka.KafkaHandler; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; public class ZipkinReceiverProvider extends ModuleProvider { public static final String NAME = "default"; private final ZipkinReceiverConfig config; private HTTPServer httpServer; + private KafkaHandler kafkaHandler; public ZipkinReceiverProvider() { config = new ZipkinReceiverConfig(); @@ -57,7 +59,6 @@ public class ZipkinReceiverProvider extends ModuleProvider { @Override public void prepare() throws ServiceNotProvidedException { - } @Override @@ -66,27 +67,40 @@ public class ZipkinReceiverProvider extends ModuleProvider { throw new IllegalArgumentException( "sampleRate: " + config.getSampleRate() + ", should be between 0 and 10000"); } - HTTPServerConfig httpServerConfig = HTTPServerConfig.builder() - .host(config.getRestHost()) - .port(config.getRestPort()) - .contextPath(config.getRestContextPath()) - .idleTimeOut(config.getRestIdleTimeOut()) - .maxThreads(config.getRestMaxThreads()) - .acceptQueueSize(config.getRestAcceptQueueSize()) - .build(); - - httpServer = new HTTPServer(httpServerConfig); - httpServer.initialize(); - - httpServer.addHandler( - new ZipkinSpanHTTPHandler(config, getManager()), - Arrays.asList(HttpMethod.POST, HttpMethod.GET) - ); + + if (config.isEnableHttpCollector()) { + HTTPServerConfig httpServerConfig = HTTPServerConfig.builder() + .host(config.getRestHost()) + .port(config.getRestPort()) + .contextPath(config.getRestContextPath()) + .idleTimeOut(config.getRestIdleTimeOut()) + .maxThreads(config.getRestMaxThreads()) + .acceptQueueSize(config.getRestAcceptQueueSize()) + .build(); + + httpServer = new HTTPServer(httpServerConfig); + httpServer.initialize(); + + httpServer.addHandler( + new ZipkinSpanHTTPHandler(config, getManager()), + Arrays.asList(HttpMethod.POST, HttpMethod.GET) + ); + } + + if (config.isEnableKafkaCollector()) { + kafkaHandler = new KafkaHandler(config, getManager()); + } } @Override - public void notifyAfterCompleted() { - httpServer.start(); + public void notifyAfterCompleted() throws ModuleStartException { + if (config.isEnableHttpCollector()) { + httpServer.start(); + } + + if (config.isEnableKafkaCollector()) { + kafkaHandler.start(); + } } @Override diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/ZipkinSpanHTTPHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/ZipkinSpanHTTPHandler.java index dd31ed7ef2ed9bc9396a186e57d20ebc470b2051..02dc0f8a81575fe26b96ec257a08ce3d914a7b66 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/ZipkinSpanHTTPHandler.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/ZipkinSpanHTTPHandler.java @@ -26,12 +26,8 @@ import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.server.annotation.ConsumesJson; import com.linecorp.armeria.server.annotation.ConsumesProtobuf; import com.linecorp.armeria.server.annotation.Post; -import java.util.ArrayList; import java.util.List; import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.config.NamingControl; -import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; @@ -42,35 +38,26 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; import zipkin2.Span; import zipkin2.codec.SpanBytesDecoder; -import zipkin2.internal.HexCodec; - import static java.util.Objects.nonNull; @Slf4j public class ZipkinSpanHTTPHandler { - private final ZipkinReceiverConfig config; - private final SourceReceiver sourceReceiver; - private final NamingControl namingControl; private final HistogramMetrics histogram; private final CounterMetrics errorCounter; - private final long samplerBoundary; + private final SpanForward spanForward; public ZipkinSpanHTTPHandler(ZipkinReceiverConfig config, ModuleManager manager) { - sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); - namingControl = manager.find(CoreModule.NAME).provider().getService(NamingControl.class); - this.config = config; - float sampleRate = (float) config.getSampleRate() / 10000; - samplerBoundary = (long) (Long.MAX_VALUE * sampleRate); + this.spanForward = new SpanForward(config, manager); MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME) .provider() .getService(MetricsCreator.class); histogram = metricsCreator.createHistogramMetric( "trace_in_latency", "The process latency of trace data", - new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin") + new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-http") ); errorCounter = metricsCreator.createCounter( "trace_analysis_error_count", "The error number of trace analysis", - new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin") + new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-http") ); } @@ -115,8 +102,7 @@ public class ZipkinSpanHTTPHandler { final HttpResponse response = HttpResponse.from(req.aggregate().thenApply(request -> { final HttpData httpData = UnzippingBytesRequestConverter.convertRequest(ctx, request); final List spanList = decoder.decodeList(httpData.byteBuf().nioBuffer()); - final SpanForward forward = new SpanForward(namingControl, sourceReceiver, config); - forward.send(getSampledTraces(spanList)); + spanForward.send(spanList); return HttpResponse.of(HttpStatus.OK); })); response.whenComplete().handle((unused, throwable) -> { @@ -128,24 +114,4 @@ public class ZipkinSpanHTTPHandler { }); return response; } - - private List getSampledTraces(List input) { - //100% sampleRate - if (config.getSampleRate() == 10000) { - return input; - } - List sampledTraces = new ArrayList<>(input.size()); - for (Span span : input) { - if (Boolean.TRUE.equals(span.debug())) { - sampledTraces.add(span); - continue; - } - long traceId = HexCodec.lowerHexToUnsignedLong(span.traceId()); - traceId = traceId == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(traceId); - if (traceId <= samplerBoundary) { - sampledTraces.add(span); - } - } - return sampledTraces; - } } diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/kafka/KafkaHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/kafka/KafkaHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..49f3839890182117a1140f50838bd54ef88d84a5 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/kafka/KafkaHandler.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.kafka; + +import com.google.gson.Gson; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.library.server.pool.CustomThreadFactory; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; +import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; +import org.apache.skywalking.oap.server.telemetry.TelemetryModule; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; +import zipkin2.Span; +import zipkin2.SpanBytesDecoderDetector; +import zipkin2.codec.BytesDecoder; + +@Slf4j +public class KafkaHandler { + private final ZipkinReceiverConfig config; + private final SpanForward spanForward; + private final Properties properties; + private final ThreadPoolExecutor executor; + private final boolean enableKafkaMessageAutoCommit; + private final List topics; + private final CounterMetrics msgDroppedIncr; + private final CounterMetrics errorCounter; + private final HistogramMetrics histogram; + + public KafkaHandler(final ZipkinReceiverConfig config, ModuleManager manager) { + this.config = config; + this.spanForward = new SpanForward(config, manager); + + properties = new Properties(); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, config.getKafkaGroupId()); + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getKafkaBootstrapServers()); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + Gson gson = new Gson(); + Properties override = gson.fromJson(config.getKafkaConsumerConfig(), Properties.class); + properties.putAll(override); + + int threadPoolSize = Runtime.getRuntime().availableProcessors() * 2; + if (config.getKafkaHandlerThreadPoolSize() > 0) { + threadPoolSize = config.getKafkaHandlerThreadPoolSize(); + } + int threadPoolQueueSize = 10000; + if (config.getKafkaHandlerThreadPoolQueueSize() > 0) { + threadPoolQueueSize = config.getKafkaHandlerThreadPoolQueueSize(); + } + enableKafkaMessageAutoCommit = new ConsumerConfig(properties).getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); + executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(threadPoolQueueSize), + new CustomThreadFactory("Zipkin-Kafka-Consumer"), + new ThreadPoolExecutor.CallerRunsPolicy() + ); + + topics = Arrays.asList(config.getKafkaTopic().split(",")); + MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME) + .provider() + .getService(MetricsCreator.class); + histogram = metricsCreator.createHistogramMetric( + "trace_in_latency", + "The process latency of trace data", + new MetricsTag.Keys("protocol"), + new MetricsTag.Values("zipkin-kafka") + ); + msgDroppedIncr = metricsCreator.createCounter( + "trace_dropped_count", "The dropped number of traces", + new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-kafka")); + errorCounter = metricsCreator.createCounter( + "trace_analysis_error_count", "The error number of trace analysis", + new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-kafka") + ); + } + + public void start() throws ModuleStartException { + for (int i = 0; i < config.getKafkaConsumers(); i++) { + KafkaConsumer consumer = new KafkaConsumer<>(properties); + consumer.subscribe(topics); + consumer.seekToEnd(consumer.assignment()); + executor.submit(() -> runTask(consumer)); + } + } + + private void runTask(final KafkaConsumer consumer) { + if (log.isDebugEnabled()) { + log.debug("Start Consume zipkin trace records from kafka."); + } + while (true) { + try { + ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1000L)); + if (log.isDebugEnabled()) { + log.debug( + "Consume zipkin trace records from kafka, records count:[{}].", + consumerRecords.count() + ); + } + if (!consumerRecords.isEmpty()) { + for (final ConsumerRecord record : consumerRecords) { + final byte[] bytes = record.value(); + //empty or illegal message + if (bytes.length < 2) { + msgDroppedIncr.inc(); + continue; + } + executor.submit(() -> handleRecord(bytes)); + } + if (!enableKafkaMessageAutoCommit) { + consumer.commitAsync(); + } + } + } catch (Exception e) { + log.error("Kafka handle message error.", e); + errorCounter.inc(); + } + } + } + + private void handleRecord(byte[] bytes) { + try (HistogramMetrics.Timer ignored = histogram.createTimer()) { + BytesDecoder decoder = SpanBytesDecoderDetector.decoderForListMessage(bytes); + final List spanList = decoder.decodeList(bytes); + spanForward.send(spanList); + } + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java index 4167cdc98d2e85141b7cdfba6e049634e8be5f50..9548b67a7ba7ad3c93cd64478a24bb8366b30529 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java @@ -19,11 +19,13 @@ package org.apache.skywalking.oap.server.receiver.zipkin.trace; import com.google.gson.JsonObject; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType; import org.apache.skywalking.oap.server.core.source.TagAutocomplete; import org.apache.skywalking.oap.server.core.zipkin.source.ZipkinService; @@ -33,27 +35,36 @@ import org.apache.skywalking.oap.server.core.zipkin.source.ZipkinSpan; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.config.NamingControl; import org.apache.skywalking.oap.server.core.source.SourceReceiver; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.library.util.StringUtil; import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; import zipkin2.Annotation; import zipkin2.Span; +import zipkin2.internal.HexCodec; import zipkin2.internal.RecyclableBuffers; public class SpanForward { + private final ZipkinReceiverConfig config; private final NamingControl namingControl; private final SourceReceiver receiver; private final List searchTagKeys; + private final long samplerBoundary; - public SpanForward(final NamingControl namingControl, - final SourceReceiver receiver, - final ZipkinReceiverConfig config) { - this.namingControl = namingControl; - this.receiver = receiver; - this.searchTagKeys = Arrays.asList(config.getSearchableTracesTags().split(Const.COMMA)); + public SpanForward(final ZipkinReceiverConfig config, final ModuleManager manager) { + this.config = config; + this.namingControl = manager.find(CoreModule.NAME).provider().getService(NamingControl.class); + this.receiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); + this.searchTagKeys = Arrays.asList(config.getSearchableTracesTags().split(Const.COMMA)); + float sampleRate = (float) config.getSampleRate() / 10000; + samplerBoundary = (long) (Long.MAX_VALUE * sampleRate); } public void send(List spanList) { - spanList.forEach(span -> { + if (CollectionUtils.isEmpty(spanList)) { + return; + } + getSampledTraces(spanList).forEach(span -> { ZipkinSpan zipkinSpan = new ZipkinSpan(); String serviceName = span.localServiceName(); if (StringUtil.isEmpty(serviceName)) { @@ -73,12 +84,14 @@ public class SpanForward { if (localPort != null) { zipkinSpan.setLocalEndpointPort(localPort); } - zipkinSpan.setRemoteEndpointServiceName(namingControl.formatServiceName(span.remoteServiceName())); - zipkinSpan.setRemoteEndpointIPV4(span.remoteEndpoint().ipv4()); - zipkinSpan.setRemoteEndpointIPV6(span.remoteEndpoint().ipv6()); - Integer remotePort = span.remoteEndpoint().port(); - if (remotePort != null) { - zipkinSpan.setRemoteEndpointPort(remotePort); + if (span.remoteEndpoint() != null) { + zipkinSpan.setRemoteEndpointServiceName(namingControl.formatServiceName(span.remoteServiceName())); + zipkinSpan.setRemoteEndpointIPV4(span.remoteEndpoint().ipv4()); + zipkinSpan.setRemoteEndpointIPV6(span.remoteEndpoint().ipv6()); + Integer remotePort = span.remoteEndpoint().port(); + if (remotePort != null) { + zipkinSpan.setRemoteEndpointPort(remotePort); + } } zipkinSpan.setTimestamp(span.timestampAsLong()); zipkinSpan.setDebug(span.debug()); @@ -159,4 +172,24 @@ public class SpanForward { relation.setTimeBucket(minuteTimeBucket); receiver.receive(relation); } + + private List getSampledTraces(List input) { + //100% sampleRate + if (config.getSampleRate() == 10000) { + return input; + } + List sampledTraces = new ArrayList<>(input.size()); + for (Span span : input) { + if (Boolean.TRUE.equals(span.debug())) { + sampledTraces.add(span); + continue; + } + long traceId = HexCodec.lowerHexToUnsignedLong(span.traceId()); + traceId = traceId == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(traceId); + if (traceId <= samplerBoundary) { + sampledTraces.add(span); + } + } + return sampledTraces; + } } diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml index 36be7641771ff2beeb9b2ac7aa3929211cc4cfa4..632bc702a9b5b43d19e20df4b9743dff16019446 100755 --- a/oap-server/server-starter/src/main/resources/application.yml +++ b/oap-server/server-starter/src/main/resources/application.yml @@ -342,16 +342,28 @@ receiver-otel: receiver-zipkin: selector: ${SW_RECEIVER_ZIPKIN:-} default: - # For HTTP server + searchableTracesTags: ${SW_ZIPKIN_SEARCHABLE_TAG_KEYS:http.method} + # The sample rate precision is 1/10000, should be between 0 and 10000 + sampleRate: ${SW_ZIPKIN_SAMPLE_RATE:10000} + ## The below configs are for OAP collect zipkin trace from HTTP + enableHttpCollector: ${SW_ZIPKIN_HTTP_COLLECTOR_ENABLED:true} restHost: ${SW_RECEIVER_ZIPKIN_REST_HOST:0.0.0.0} restPort: ${SW_RECEIVER_ZIPKIN_REST_PORT:9411} restContextPath: ${SW_RECEIVER_ZIPKIN_REST_CONTEXT_PATH:/} restMaxThreads: ${SW_RECEIVER_ZIPKIN_REST_MAX_THREADS:200} restIdleTimeOut: ${SW_RECEIVER_ZIPKIN_REST_IDLE_TIMEOUT:30000} restAcceptQueueSize: ${SW_RECEIVER_ZIPKIN_REST_QUEUE_SIZE:0} - searchableTracesTags: ${SW_ZIPKIN_SEARCHABLE_TAG_KEYS:http.method} - # The sample rate precision is 1/10000, should be between 0 and 10000 - sampleRate: ${SW_ZIPKIN_SAMPLE_RATE:10000} + ## The below configs are for OAP collect zipkin trace from kafka + enableKafkaCollector: ${SW_ZIPKIN_KAFKA_COLLECTOR_ENABLED:false} + kafkaBootstrapServers: ${SW_ZIPKIN_KAFKA_SERVERS:localhost:9092} + kafkaGroupId: ${SW_ZIPKIN_KAFKA_GROUP_ID:zipkin} + kafkaTopic: ${SW_ZIPKIN_KAFKA_TOPIC:zipkin} + # Kafka consumer config, JSON format as Properties. If it contains the same key with above, would override. + kafkaConsumerConfig: ${SW_ZIPKIN_KAFKA_CONSUMER_CONFIG:"{\"auto.offset.reset\":\"earliest\",\"enable.auto.commit\":true}"} + # The Count of the topic consumers + kafkaConsumers: ${SW_ZIPKIN_KAFKA_CONSUMERS:1} + kafkaHandlerThreadPoolSize: ${SW_ZIPKIN_KAFKA_HANDLER_THREAD_POOL_SIZE:-1} + kafkaHandlerThreadPoolQueueSize: ${SW_ZIPKIN_KAFKA_HANDLER_THREAD_POOL_QUEUE_SIZE:-1} receiver-browser: selector: ${SW_RECEIVER_BROWSER:default} diff --git a/test/e2e-v2/cases/zipkin/kafka/docker-compose.yml b/test/e2e-v2/cases/zipkin/kafka/docker-compose.yml new file mode 100644 index 0000000000000000000000000000000000000000..c13d119093d8708794623ad3545b868b9bc497eb --- /dev/null +++ b/test/e2e-v2/cases/zipkin/kafka/docker-compose.yml @@ -0,0 +1,119 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: '2.1' + +services: + zookeeper: + image: zookeeper:3.4 + hostname: zookeeper + expose: + - 2181 + networks: + - e2e + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + healthcheck: + test: [ "CMD", "sh", "-c", "nc -nz 127.0.0.1 2181" ] + interval: 5s + timeout: 60s + retries: 120 + + broker-a: + image: bitnami/kafka:2.4.1 + hostname: broker-a + expose: + - 9092 + networks: + - e2e + environment: + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_BROKER_ID=10 + - ALLOW_PLAINTEXT_LISTENER=yes + depends_on: + zookeeper: + condition: service_healthy + healthcheck: + test: [ "CMD", "kafka-topics.sh", "--list", "--zookeeper", "zookeeper:2181" ] + interval: 5s + timeout: 60s + retries: 120 + + broker-b: + image: bitnami/kafka:2.4.1 + hostname: broker-b + expose: + - 9092 + networks: + - e2e + environment: + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_BROKER_ID=24 + - ALLOW_PLAINTEXT_LISTENER=yes + depends_on: + zookeeper: + condition: service_healthy + healthcheck: + test: [ "CMD", "kafka-topics.sh", "--list", "--zookeeper", "zookeeper:2181" ] + interval: 5s + timeout: 60s + retries: 120 + + oap: + extends: + file: ../../../script/docker-compose/base-compose.yml + service: oap + environment: + SW_QUERY_ZIPKIN: default + SW_RECEIVER_ZIPKIN: default + SW_ZIPKIN_KAFKA_SERVERS: broker-a:9092,broker-b:9092 + SW_ZIPKIN_KAFKA_COLLECTOR_ENABLED: "true" + expose: + - 9411 + ports: + - 9412:9412 + depends_on: + broker-a: + condition: service_healthy + broker-b: + condition: service_healthy + networks: + - e2e + + sender: + image: "eclipse-temurin:8-jre" + volumes: + - ./../../../java-test-service/e2e-mock-sender/target/e2e-mock-sender-2.0.0.jar:/e2e-mock-sender-2.0.0.jar + command: [ "java", "-jar", "/e2e-mock-sender-2.0.0.jar" ] + environment: + ZIPKIN_KAFKA_BOOTSTRAP_SERVERS: broker-a:9092,broker-b:9092 + networks: + - e2e + ports: + - 9093 + healthcheck: + test: ["CMD", "sh", "-c", "nc -nz 127.0.0.1 9093"] + interval: 5s + timeout: 60s + retries: 120 + depends_on: + oap: + condition: service_healthy + broker-a: + condition: service_healthy + broker-b: + condition: service_healthy +networks: + e2e: diff --git a/test/e2e-v2/cases/zipkin/kafka/e2e.yaml b/test/e2e-v2/cases/zipkin/kafka/e2e.yaml new file mode 100644 index 0000000000000000000000000000000000000000..90d877db1405f72e8b3647fac3c5fa56d3e2f9dd --- /dev/null +++ b/test/e2e-v2/cases/zipkin/kafka/e2e.yaml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file is used to show how to write configuration files and can be used to test. + +setup: + env: compose + file: docker-compose.yml + timeout: 20m + init-system-environment: ../../../script/env + steps: + - name: set PATH + command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH + - name: install yq + command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq + - name: install swctl + command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl + +trigger: + action: http + interval: 3s + times: 10 + url: http://${sender_host}:${sender_9093}/sendZipkinTrace2Kafka + method: POST + +verify: + # verify with retry strategy + retry: + # max retry count + count: 20 + # the interval between two retries, in millisecond. + interval: 10s + cases: + - includes: + - ../zipkin-cases.yaml diff --git a/test/e2e-v2/java-test-service/e2e-mock-sender/pom.xml b/test/e2e-v2/java-test-service/e2e-mock-sender/pom.xml index 9ea4961d75d24052a1a36abed3c15d8322102fab..dc17f07598604b0d28da26ec9335d6095e73cded 100644 --- a/test/e2e-v2/java-test-service/e2e-mock-sender/pom.xml +++ b/test/e2e-v2/java-test-service/e2e-mock-sender/pom.xml @@ -39,6 +39,15 @@ e2e-protocol ${project.version} + + org.apache.kafka + kafka-clients + + + io.zipkin.zipkin2 + zipkin + 2.23.16 + diff --git a/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/E2EConfiguration.java b/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/E2EConfiguration.java index cf0bdbeaae484074e688cefbfcc48e1676959a91..7099c2b88b2a100cef89205837db899f48679d81 100644 --- a/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/E2EConfiguration.java +++ b/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/E2EConfiguration.java @@ -28,4 +28,7 @@ import org.springframework.context.annotation.Configuration; public class E2EConfiguration { private String oapHost; private String oapGrpcPort; + private String zipkinKafkaBootstrapServers; + private String zipkinKafkaGroupId; + private String zipkinKafkaTopic; } diff --git a/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/controller/ZipkinKafkaProduceController.java b/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/controller/ZipkinKafkaProduceController.java new file mode 100644 index 0000000000000000000000000000000000000000..46184f6e516c39084d2840fd072eb9ed2df72c9e --- /dev/null +++ b/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/controller/ZipkinKafkaProduceController.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.e2e.controller; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.skywalking.e2e.E2EConfiguration; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RestController; +import zipkin2.Endpoint; +import zipkin2.Span; +import zipkin2.codec.SpanBytesEncoder; + +@RestController +public class ZipkinKafkaProduceController { + private final KafkaProducer producer; + private final E2EConfiguration config; + + public ZipkinKafkaProduceController(final E2EConfiguration config) { + this.config = config; + Properties properties = new Properties(); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getZipkinKafkaBootstrapServers()); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, config.getZipkinKafkaGroupId()); + producer = new KafkaProducer<>(properties, new ByteArraySerializer(), new ByteArraySerializer()); + } + + @PostMapping("/sendZipkinTrace2Kafka") + public String sendTrace() { + producer.send(new ProducerRecord<>(config.getZipkinKafkaTopic(), 0, null, SpanBytesEncoder.JSON_V2.encodeList(makeTrace()))); + producer.flush(); + return "Trace send success!"; + } + + private static List makeTrace() { + String traceId = generateHexId(16); + String span1Id = traceId; + String span2Id = generateHexId(16); + String span3Id = generateHexId(16); + List trace = new ArrayList<>(); + trace.add(Span.newBuilder() + .traceId(traceId) + .id(span1Id) + .name("post /") + .kind(Span.Kind.SERVER) + .localEndpoint(Endpoint.newBuilder().serviceName("frontend").ip("192.168.0.1").build()) + .remoteEndpoint(Endpoint.newBuilder().ip("127.0.0.1").port(63720).build()) + .timestamp(System.currentTimeMillis() * 1000L + 1000L) + .duration(16683) + .addAnnotation(System.currentTimeMillis() * 1000L + 1100L, "wr") + .addAnnotation(System.currentTimeMillis() * 1000L + 1200L, "ws") + .putTag("http.method", "POST") + .putTag("http.path", "/") + .build()); + trace.add(Span.newBuilder() + .traceId(traceId) + .parentId(span1Id) + .id(span2Id) + .name("get") + .kind(Span.Kind.CLIENT) + .localEndpoint(Endpoint.newBuilder().serviceName("frontend").ip("192.168.0.1").build()) + .remoteEndpoint(Endpoint.newBuilder().serviceName("backend").ip("127.0.0.1").port(9000).build()) + .timestamp(System.currentTimeMillis() * 1000L + 2000L) + .duration(15380) + .addAnnotation(System.currentTimeMillis() * 1000L + 2100L, "wr") + .addAnnotation(System.currentTimeMillis() * 1000L + 2600L, "ws") + .putTag("http.method", "GET") + .putTag("http.path", "/api") + .build()); + trace.add(Span.newBuilder() + .traceId(traceId) + .parentId(span2Id) + .id(span3Id) + .name("get /api") + .kind(Span.Kind.SERVER) + .localEndpoint(Endpoint.newBuilder().serviceName("backend").ip("192.168.0.1").build()) + .remoteEndpoint(Endpoint.newBuilder().ip("127.0.0.1").port(63722).build()) + .timestamp(System.currentTimeMillis() * 1000L + 3000L) + .duration(1557) + .addAnnotation(System.currentTimeMillis() * 1000L + 3100L, "wr") + .addAnnotation(System.currentTimeMillis() * 1000L + 3300L, "ws") + .putTag("http.method", "GET") + .putTag("http.path", "/api") + .build()); + return trace; + } + + private static String generateHexId(int bound) { + Random r = new Random(); + StringBuffer buffer = new StringBuffer(); + for (int i = 0; i < bound; i++) { + buffer.append(Integer.toHexString(r.nextInt(bound))); + } + return buffer.toString(); + } +} diff --git a/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/resources/application.yml b/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/resources/application.yml index cfe28fa57448009b534869af8e24b1565958fc29..116b35982a10458ebe619ffb7b4ff563193d2b25 100644 --- a/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/resources/application.yml +++ b/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/resources/application.yml @@ -22,4 +22,7 @@ spring: e2e: oap-host: ${OAP_HOST:127.0.0.1} - oap-grpc-port: ${OAP_GRPC-PORT:11800} + oap-grpc-port: ${OAP_GRPC_PORT:11800} + zipkin-kafka-bootstrapServers: ${ZIPKIN_KAFKA_BOOTSTRAP_SERVERS:127.0.0.1:9092} + zipkin-kafka-groupId: ${ZIPKIN_KAFKA_GROUPID:zipkin} + zipkin-kafka-topic: ${ZIPKIN_KAFKA_TOPIC:zipkin}