未验证 提交 0e1d5c37 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Make Zipkin trace query available (#6454)

上级 258962c6
......@@ -32,13 +32,18 @@ Release Notes.
* Introduce log analysis language (LAL).
* Fix alarm httpclient connection leak.
* Add `sum` function in meter system.
* Remove Jaeger receiver.
* Remove the experimental Zipkin span analyzer.
* Upgrade the Zipkin Elasticsearch storage from 6 to 7.
* Require Zipkin receiver must work with `zipkin-elasticsearch7` storage option.
#### UI
* Update selector scroller to show in all pages.
* Implement searching logs with date.
#### Documentation
* Polish documentation due to we have covered all tracing, logging, and metrics fields.
* Adjust documentation about Zipkin receiver.
All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/76?closed=1)
......
......@@ -43,7 +43,7 @@ including
1. Service Mesh Observability. Control panel and data panel.
1. Metrics system, including Prometheus, OpenTelemetry, Spring Sleuth(Micrometer), Zabbix.
1. Logs.
1. Zipkin v1/v2 and Jaeger gRPC format with limited topology and metrics analysis.(Experimental).
1. Zipkin v1/v2 trace.(No Analysis)
SkyWalking OAP is using the STAM(Streaming Topology Analysis Method) to analysis topology in the tracing based agent scenario
for better performance. Read [the paper of STAM](https://wu-sheng.github.io/STAM/) for more details.
......
......@@ -18,9 +18,8 @@ We have following receivers, and `default` implementors are provided in our Apac
1. **configuration-discovery**. gRPC services handle configurationDiscovery.
1. **receiver-event**. gRPC services to handle events data.
1. **receiver-zabbix**. See [details](backend-zabbix.md).
1. Experimental receivers. All following receivers are in the POC stage, not production ready.
1. **receiver_zipkin**. See [details](#zipkin-receiver). (Experimental)
1. **receiver_jaeger**. See [details](#jaeger-receiver). (Experimental)
1. Experimental receivers.
1. **receiver_zipkin**. See [details](#zipkin-receiver).
The sample settings of these receivers should be already in default `application.yml`, and also list here
```yaml
......@@ -147,18 +146,11 @@ receiver-meter:
default:
```
## Experimental receivers
All following receivers are in the POC stage, not production ready.
### Zipkin receiver
Zipkin receiver could work in two different mode.
1. Tracing mode(default). Tracing mode is that, skywalking OAP acts like zipkin collector,
fully supports Zipkin v1/v2 formats through HTTP service,
also provide persistence and query in skywalking UI.
But it wouldn't analysis metrics from them. In most case, I suggest you could use this feature, when metrics come from service mesh.
Notice, in this mode, Zipkin receiver requires `zipkin-elasticsearch` storage implementation active.
Read [this](backend-storage.md#elasticsearch-6-with-zipkin-trace-extension) to know
how to active.
## Zipkin receiver
Zipkin receiver makes the OAP server as an alternative Zipkin server implementation. It supports Zipkin v1/v2 formats through HTTP service.
Make sure you use this with `SW_STORAGE=zipkin-elasticsearch7` option to activate Zipkin storage implementation.
Once this receiver and storage activated, SkyWalking native traces would be ignored, and SkyWalking wouldn't analysis topology, metrics, endpoint
dependency from Zipkin's trace.
Use following config to active.
```yaml
......@@ -175,45 +167,6 @@ receiver_zipkin:
jettyAcceptQueueSize: ${SW_RECEIVER_ZIPKIN_QUEUE_SIZE:0}
```
2. Analysis mode(Not production ready), receive Zipkin v1/v2 formats through HTTP service. Transform the trace to skywalking
native format, and analysis like skywalking trace. This feature can't work in production env right now,
because of Zipkin tag/endpoint value unpredictable, we can't make sure it fits production env requirements.
Active `analysis mode`, you should set `needAnalysis` config.
```yaml
receiver_zipkin:
selector: ${SW_RECEIVER_ZIPKIN:-}
default:
host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0}
port: ${SW_RECEIVER_ZIPKIN_PORT:9411}
contextPath: ${SW_RECEIVER_ZIPKIN_CONTEXT_PATH:/}
jettyMinThreads: ${SW_RECEIVER_ZIPKIN_JETTY_MIN_THREADS:1}
jettyMaxThreads: ${SW_RECEIVER_ZIPKIN_JETTY_MAX_THREADS:200}
jettyIdleTimeOut: ${SW_RECEIVER_ZIPKIN_JETTY_IDLE_TIMEOUT:30000}
jettyAcceptorPriorityDelta: ${SW_RECEIVER_ZIPKIN_JETTY_DELTA:0}
jettyAcceptQueueSize: ${SW_RECEIVER_ZIPKIN_QUEUE_SIZE:0}
needAnalysis: true
```
NOTICE, Zipkin receiver is only provided in `apache-skywalking-apm-x.y.z.tar.gz` tar.
### Jaeger receiver
Jaeger receiver right now only works in `Tracing Mode`, and no analysis.
Jaeger receiver provides extra gRPC host/port, if absent, sharing-server host/port will be used, then core gRPC host/port.
Receiver requires `jaeger-elasticsearch` storage implementation active.
Read [this](backend-storage.md#elasticsearch-6-with-jaeger-trace-extension) to know how to active.
Right now, you need [jaeger agent](https://www.jaegertracing.io/docs/1.11/architecture/#agent) to batch
send spans to SkyWalking oap server. Read [Jaeger Architecture](https://www.jaegertracing.io/docs/1.11/architecture/)
to get more details.
Active the receiver.
```yaml
receiver_jaeger:
selector: ${SW_RECEIVER_JAEGER:-}
default:
gRPCHost: ${SW_RECEIVER_JAEGER_HOST:0.0.0.0}
gRPCPort: ${SW_RECEIVER_JAEGER_PORT:14250}
```
NOTICE, Jaeger receiver is only provided in `apache-skywalking-apm-x.y.z.tar.gz` tar.
NOTICE, Zipkin receiver is only provided in `apache-skywalking-apm-es7-x.y.z.tar.gz` tar.
And this requires `zipkin-elasticsearch7` storage implementation active.
Read [this](backend-storage.md#elasticsearch-7-with-zipkin-trace-extension) doc to know Zipkin as storage option.
......@@ -15,9 +15,6 @@ Native supported storage
- InfluxDB
- PostgreSQL
Redistribution version with supported storage.
- ElasticSearch 5
## H2
Active H2 as storage, set storage provider to **H2** In-Memory Databases. Default in distribution package.
......@@ -155,13 +152,13 @@ We strongly advice you to read more about these configurations from ElasticSearc
This effects the performance of ElasticSearch very much.
### ElasticSearch 6 with Zipkin trace extension
This implementation shares most of `elasticsearch`, just extend to support zipkin span storage.
### ElasticSearch 7 with Zipkin trace extension
This implementation shares most of `elasticsearch7`, just extends to support zipkin span storage.
It has all same configs.
```yaml
storage:
selector: ${SW_STORAGE:zipkin-elasticsearch}
zipkin-elasticsearch:
selector: ${SW_STORAGE:zipkin-elasticsearch7}
zipkin-elasticsearch7:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
......@@ -176,32 +173,9 @@ storage:
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
```
### ElasticSearch 6 with Jaeger trace extension
This implementation shares most of `elasticsearch`, just extend to support jaeger span storage.
It has all same configs.
```yaml
storage:
selector: ${SW_STORAGE:jaeger-elasticsearch}
jaeger-elasticsearch:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
```
### About Namespace
When namespace is set, names of all indexes in ElasticSearch will use it as prefix.
## MySQL
Active MySQL as storage, set storage provider to **mysql**.
......@@ -289,10 +263,6 @@ storage:
All connection related settings including link url, username and password are in `application.yml`.
Here are some of the settings, please follow [HikariCP](https://github.com/brettwooldridge/HikariCP) connection pool document for all the settings.
## ElasticSearch 5
ElasticSearch 5 is incompatible with ElasticSearch 6 Java client jar, so it could not be included in native distribution.
[OpenSkyWalking/SkyWalking-With-Es5x-Storage](https://github.com/OpenSkywalking/SkyWalking-With-Es5x-Storage) repo includes the distribution version.
## More storage solution extension
Follow [Storage extension development guide](../../guides/storage-extention.md)
in [Project Extensions document](../../guides/README.md#project-extensions) in development guide.
......@@ -194,9 +194,6 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | restHost| Binding IP of restful service. |SW_RECEIVER_ZIPKIN_HOST|0.0.0.0|
| - | - | restPort | Binding port of restful service | SW_RECEIVER_ZIPKIN_PORT|9411|
| - | - | restContextPath| Web context path of restful service| SW_RECEIVER_ZIPKIN_CONTEXT_PATH|/|
| - | - | needAnalysis|Analysis zipkin span to generate metrics| - | false|
| - | - | maxCacheSize| Max cache size for span analysis | - | 1_000_000 |
| - | - | expireTime| The expire time of analysis cache, unit is second. | - | 20|
| receiver_jaeger | default| Read [receiver doc](backend-receivers.md) | - | - |
| - | - | gRPCHost|Binding IP of gRPC service. Services include gRPC data report and internal communication among OAP nodes| SW_RECEIVER_JAEGER_HOST | - |
| - | - | gRPCPort| Binding port of gRPC service | SW_RECEIVER_JAEGER_PORT | - |
......
......@@ -71,7 +71,6 @@
<kubernetes.version>10.0.0</kubernetes.version>
<hikaricp.version>3.1.0</hikaricp.version>
<zipkin.version>2.9.1</zipkin.version>
<caffeine.version>2.6.2</caffeine.version>
<okhttp.version>3.9.0</okhttp.version>
<jackson-core.version>2.9.5</jackson-core.version>
<jackson-annotations.version>2.9.5</jackson-annotations.version>
......@@ -363,11 +362,6 @@
<artifactId>zipkin</artifactId>
<version>${zipkin.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
<!-- -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
......
......@@ -213,6 +213,33 @@ storage:
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
zipkin-elasticsearch7:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""}
trustStorePass: ${SW_STORAGE_ES_SSL_JKS_PASS:""}
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1} # Shard number of new indexes
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:1} # Replicas number of new indexes
# Super data set has been defined in the codes, such as trace segments.The following 3 config would be improve es performance when storage super size data in es.
superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0
superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.
superDatasetIndexReplicasNumber: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represent the replicas number in the super size dataset record index, the default value is 0.
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200}
profileTaskQueryMaxSize: ${SW_STORAGE_ES_QUERY_PROFILE_TASK_SIZE:200}
oapAnalyzer: ${SW_STORAGE_ES_OAP_ANALYZER:"{\"analyzer\":{\"oap_analyzer\":{\"type\":\"stop\"}}}"} # the oap analyzer.
oapLogAnalyzer: ${SW_STORAGE_ES_OAP_LOG_ANALYZER:"{\"analyzer\":{\"oap_log_analyzer\":{\"type\":\"standard\"}}}"} # the oap log analyzer. It could be customized by the ES analyzer configuration to support more language log formats, such as Chinese log, Japanese log and etc.
advanced: ${SW_STORAGE_ES_ADVANCED:""}
agent-analyzer:
selector: ${SW_AGENT_ANALYZER:default}
......
......@@ -48,7 +48,7 @@ public interface ITraceQueryDAO extends Service {
List<SegmentRecord> queryByTraceId(String traceId) throws IOException;
/**
* This method gives more flexible for unnative
* This method gives more flexible for 3rd trace without segment concept, which can't search data through {@link #queryByTraceId(String)}
*/
List<Span> doFlexibleTraceQuery(String traceId) throws IOException;
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-receiver-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>jaeger-receiver-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>receiver-proto</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-sharing-server-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-jaeger-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.aop.server.receiver.jaeger;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import io.jaegertracing.api_v2.Collector;
import io.jaegertracing.api_v2.CollectorServiceGrpc;
import io.jaegertracing.api_v2.Model;
import java.time.Instant;
import java.util.Base64;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JaegerGRPCHandler extends CollectorServiceGrpc.CollectorServiceImplBase {
private static final Logger LOGGER = LoggerFactory.getLogger(JaegerGRPCHandler.class);
private SourceReceiver receiver;
private JaegerReceiverConfig config;
public JaegerGRPCHandler(SourceReceiver receiver, JaegerReceiverConfig config) {
this.receiver = receiver;
this.config = config;
}
@Override
public void postSpans(Collector.PostSpansRequest request,
StreamObserver<Collector.PostSpansResponse> responseObserver) {
request.getBatch().getSpansList().forEach(span -> {
try {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(span.toString());
}
JaegerSpan jaegerSpan = new JaegerSpan();
jaegerSpan.setTraceId(format(span.getTraceId()));
jaegerSpan.setSpanId(format(span.getSpanId()));
Model.Process process = span.getProcess();
String serviceName = null;
if (process != null) {
serviceName = process.getServiceName();
}
if (StringUtil.isEmpty(serviceName)) {
serviceName = "UNKNOWN";
}
final String serviceId = IDManager.ServiceID.buildId(serviceName, NodeType.Normal);
long duration = span.getDuration().getNanos() / 1_000_000;
jaegerSpan.setStartTime(Instant.ofEpochSecond(
span.getStartTime().getSeconds(), span.getStartTime().getNanos()).toEpochMilli());
long timeBucket = TimeBucket.getRecordTimeBucket(jaegerSpan.getStartTime());
jaegerSpan.setTimeBucket(timeBucket);
jaegerSpan.setEndTime(jaegerSpan.getStartTime() + duration);
jaegerSpan.setLatency((int) duration);
jaegerSpan.setDataBinary(span.toByteArray());
jaegerSpan.setEndpointName(span.getOperationName());
jaegerSpan.setServiceId(serviceId);
span.getTagsList().forEach(tag -> {
String key = tag.getKey();
if ("error".equals(key)) {
boolean status = tag.getVBool();
jaegerSpan.setIsError(BooleanUtils.booleanToValue(status));
} else if ("span.kind".equals(key)) {
String kind = tag.getVStr();
if ("server".equals(kind) || "consumer".equals(kind)) {
String endpointName = span.getOperationName();
jaegerSpan.setEndpointName(endpointName);
jaegerSpan.setEndpointId(
IDManager.EndpointID.buildId(serviceId, endpointName));
}
}
});
receiver.receive(jaegerSpan);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
});
responseObserver.onNext(Collector.PostSpansResponse.newBuilder().build());
responseObserver.onCompleted();
}
private String format(ByteString bytes) {
Base64.Encoder encoder = Base64.getEncoder();
return encoder.encodeToString(bytes.toByteArray());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.aop.server.receiver.jaeger;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
@Setter
@Getter
public class JaegerReceiverConfig extends ModuleConfig {
private String gRPCHost = null;
private int gRPCPort = -1;
private int maxConcurrentCallsPerConnection;
private int maxMessageSize;
private int gRPCThreadPoolSize;
private int gRPCThreadPoolQueueSize;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.aop.server.receiver.jaeger;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* Adapt Jaeger gRPC backend service.
*/
public class JaegerReceiverModule extends ModuleDefine {
public static final String NAME = "receiver_jaeger";
public JaegerReceiverModule() {
super(NAME);
}
@Override
public Class[] services() {
return new Class[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.aop.server.receiver.jaeger;
import java.util.Objects;
import org.apache.logging.log4j.util.Strings;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
public class JaegerReceiverProvider extends ModuleProvider {
public static final String NAME = "default";
private JaegerReceiverConfig config;
private GRPCServer grpcServer = null;
@Override
public String name() {
return NAME;
}
@Override
public Class<? extends ModuleDefine> module() {
return JaegerReceiverModule.class;
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
config = new JaegerReceiverConfig();
return config;
}
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
if (config.getGRPCPort() > 0) {
grpcServer = new GRPCServer(Strings.isBlank(config.getGRPCHost()) ? "0.0.0.0" : config.getGRPCHost(), config
.getGRPCPort());
if (config.getMaxMessageSize() > 0) {
grpcServer.setMaxMessageSize(config.getMaxMessageSize());
}
if (config.getMaxConcurrentCallsPerConnection() > 0) {
grpcServer.setMaxConcurrentCallsPerConnection(config.getMaxConcurrentCallsPerConnection());
}
if (config.getGRPCThreadPoolQueueSize() > 0) {
grpcServer.setThreadPoolQueueSize(config.getGRPCThreadPoolQueueSize());
}
if (config.getGRPCThreadPoolSize() > 0) {
grpcServer.setThreadPoolSize(config.getGRPCThreadPoolSize());
}
grpcServer.initialize();
}
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
SourceReceiver sourceReceiver = getManager().find(CoreModule.NAME).provider().getService(SourceReceiver.class);
if (Objects.nonNull(grpcServer)) {
grpcServer.addHandler(new JaegerGRPCHandler(sourceReceiver, config));
} else {
GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME)
.provider()
.getService(GRPCHandlerRegister.class);
grpcHandlerRegister.addHandler(new JaegerGRPCHandler(sourceReceiver, config));
}
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
try {
if (Objects.nonNull(grpcServer)) {
grpcServer.start();
}
} catch (ServerException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
@Override
public String[] requiredModules() {
return new String[] {SharingServerModule.NAME};
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.aop.server.receiver.jaeger.JaegerReceiverModule
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.aop.server.receiver.jaeger.JaegerReceiverProvider
......@@ -36,7 +36,6 @@
<module>envoy-metrics-receiver-plugin</module>
<module>skywalking-sharing-server-plugin</module>
<module>skywalking-clr-receiver-plugin</module>
<module>jaeger-receiver-plugin</module>
<module>receiver-proto</module>
<module>skywalking-profile-receiver-plugin</module>
<module>otel-receiver-plugin</module>
......
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax="proto3";
package jaeger.api_v2;
import "jaeger/model.proto";
import "gogoproto/gogo.proto";
import "google/api/annotations.proto";
import "protoc-gen-swagger/options/annotations.proto";
option go_package = "api_v2";
option java_package = "io.jaegertracing.api_v2";
// Enable gogoprotobuf extensions (https://github.com/gogo/protobuf/blob/master/extensions.md).
// Enable custom Marshal method.
option (gogoproto.marshaler_all) = true;
// Enable custom Unmarshal method.
option (gogoproto.unmarshaler_all) = true;
// Enable custom Size method (Required by Marshal and Unmarshal).
option (gogoproto.sizer_all) = true;
// Enable registration with golang/protobuf for the grpc-gateway.
option (gogoproto.goproto_registration) = true;
option (grpc.gateway.protoc_gen_swagger.options.openapiv2_swagger) = {
info: {
version: "1.0";
};
external_docs: {
url: "https://github.com/jaegertracing/jaeger";
description: "Jaeger API";
}
schemes: HTTP;
schemes: HTTPS;
};
message PostSpansRequest {
Batch batch = 1 [
(gogoproto.nullable) = false
];
}
message PostSpansResponse {
}
service CollectorService {
rpc PostSpans(PostSpansRequest) returns (PostSpansResponse) {
option (google.api.http) = {
post: "/api/v2/spans"
body: "*"
};
}
}
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax="proto3";
package jaeger.api_v2;
import "gogoproto/gogo.proto";
import "google/api/annotations.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
// TODO: document all types and fields
// TODO: once this moves to jaeger-idl repo, we may want to change Go pkg to api_v2
// and rewrite it to model only in this repo. That should make it easier to generate
// classes in other languages.
option go_package = "model";
option java_package = "io.jaegertracing.api_v2";
// Enable gogoprotobuf extensions (https://github.com/gogo/protobuf/blob/master/extensions.md).
// Enable custom Marshal method.
option (gogoproto.marshaler_all) = true;
// Enable custom Unmarshal method.
option (gogoproto.unmarshaler_all) = true;
// Enable custom Size method (Required by Marshal and Unmarshal).
option (gogoproto.sizer_all) = true;
// Enable registration with golang/protobuf for the grpc-gateway.
option (gogoproto.goproto_registration) = true;
enum ValueType {
STRING = 0;
BOOL = 1;
INT64 = 2;
FLOAT64 = 3;
BINARY = 4;
};
message KeyValue {
option (gogoproto.equal) = true;
option (gogoproto.compare) = true;
string key = 1;
ValueType v_type = 2;
string v_str = 3;
bool v_bool = 4;
int64 v_int64 = 5;
double v_float64 = 6;
bytes v_binary = 7;
}
message Log {
google.protobuf.Timestamp timestamp = 1 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false
];
repeated KeyValue fields = 2 [
(gogoproto.nullable) = false
];
}
enum SpanRefType {
CHILD_OF = 0;
FOLLOWS_FROM = 1;
};
message SpanRef {
bytes trace_id = 1 [
(gogoproto.nullable) = false,
(gogoproto.customtype) = "TraceID",
(gogoproto.customname) = "TraceID"
];
bytes span_id = 2 [
(gogoproto.nullable) = false,
(gogoproto.customtype) = "SpanID",
(gogoproto.customname) = "SpanID"
];
SpanRefType ref_type = 3;
}
message Process {
string service_name = 1;
repeated KeyValue tags = 2 [
(gogoproto.nullable) = false
];
}
message Span {
bytes trace_id = 1 [
(gogoproto.nullable) = false,
(gogoproto.customtype) = "TraceID",
(gogoproto.customname) = "TraceID"
];
bytes span_id = 2 [
(gogoproto.nullable) = false,
(gogoproto.customtype) = "SpanID",
(gogoproto.customname) = "SpanID"
];
string operation_name = 3;
repeated SpanRef references = 4 [
(gogoproto.nullable) = false
];
uint32 flags = 5 [
(gogoproto.nullable) = false,
(gogoproto.customtype) = "Flags"
];
google.protobuf.Timestamp start_time = 6 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false
];
google.protobuf.Duration duration = 7 [
(gogoproto.stdduration) = true,
(gogoproto.nullable) = false
];
repeated KeyValue tags = 8 [
(gogoproto.nullable) = false
];
repeated Log logs = 9 [
(gogoproto.nullable) = false
];
Process process = 10;
string process_id = 11 [
(gogoproto.customname) = "ProcessID"
];
repeated string warnings = 12;
}
message Trace {
message ProcessMapping {
string process_id = 1 [
(gogoproto.customname) = "ProcessID"
];
Process process = 2 [
(gogoproto.nullable) = false
];
}
repeated Span spans = 1;
repeated ProcessMapping process_map = 2 [
(gogoproto.nullable) = false
];
repeated string warnings = 3;
}
message Batch {
repeated Span spans = 1;
Process process = 2 [
(gogoproto.nullable) = true
];
}
......@@ -31,23 +31,9 @@
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-trace-receiver-plugin</artifactId>
<artifactId>storage-zipkin-elasticsearch7-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-zipkin-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-management-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin</artifactId>
......
......@@ -33,7 +33,4 @@ public class ZipkinReceiverConfig extends ModuleConfig {
private long jettyIdleTimeOut = 30000;
private int jettyAcceptorPriorityDelta = 0;
private int jettyAcceptQueueSize = 0;
private int expireTime = 20;
private int maxCacheSize = 1_000_000;
private boolean needAnalysis = false;
}
......@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.receiver.zipkin;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.ISegmentParserService;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
......@@ -28,9 +27,6 @@ import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedExcepti
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServerConfig;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.Receiver2AnalysisBridge;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV1JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV2JettyHandler;
......@@ -82,14 +78,6 @@ public class ZipkinReceiverProvider extends ModuleProvider {
jettyServer.addHandler(new SpanV1JettyHandler(config, getManager()));
jettyServer.addHandler(new SpanV2JettyHandler(config, getManager()));
if (config.isNeedAnalysis()) {
ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME)
.provider()
.getService(ISegmentParserService.class);
Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService);
Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge);
}
}
@Override
......@@ -103,13 +91,6 @@ public class ZipkinReceiverProvider extends ModuleProvider {
@Override
public String[] requiredModules() {
if (config.isNeedAnalysis()) {
return new String[] {TraceModule.NAME};
} else {
/**
* In pure trace status, we don't need the trace receiver.
*/
return new String[] {CoreModule.NAME};
}
return new String[] {CoreModule.NAME};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.analysis;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.ISegmentParserService;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.SegmentListener;
/**
* Send the segments to Analysis module, like receiving segments from native SkyWalking agents.
*/
public class Receiver2AnalysisBridge implements SegmentListener {
private ISegmentParserService segmentParseService;
public Receiver2AnalysisBridge(ISegmentParserService segmentParseService) {
this.segmentParseService = segmentParseService;
}
@Override
public void notify(SkyWalkingTrace trace) {
trace.getSegmentList().forEach(upstream -> segmentParseService.send(upstream.build()));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.analysis;
import java.util.List;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.CacheFactory;
import zipkin2.Span;
public class ZipkinSkyWalkingTransfer {
public void doTransfer(ZipkinReceiverConfig config, List<Span> spanList) {
spanList.forEach(span -> {
// In Zipkin, the local service name represents the application owner.
CacheFactory.INSTANCE.get(config).addSpan(span);
});
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.caffeine.CaffeineSpanCache;
public class CacheFactory {
public static final CacheFactory INSTANCE = new CacheFactory();
private ISpanCache implementor;
private CacheFactory() {
}
public ISpanCache get(ZipkinReceiverConfig config) {
if (implementor == null) {
synchronized (INSTANCE) {
if (implementor == null) {
implementor = new CaffeineSpanCache(config);
}
}
}
return implementor;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache;
import zipkin2.Span;
public interface ISpanCache {
void addSpan(Span span);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.caffeine;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.ISpanCache;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.Span;
/**
* NOTICE: FROM my test, Caffeine cache triggers/checks expire only face write/read op. In order to make trace finish in
* time, I have to set a timer to write a meaningless trace, for active expire.
*/
public class CaffeineSpanCache implements ISpanCache, RemovalListener<String, ZipkinTrace> {
private static final Logger LOGGER = LoggerFactory.getLogger(CaffeineSpanCache.class);
private Cache<String, ZipkinTrace> inProcessSpanCache;
private ReentrantLock newTraceLock;
public CaffeineSpanCache(ZipkinReceiverConfig config) {
newTraceLock = new ReentrantLock();
inProcessSpanCache = Caffeine.newBuilder()
.expireAfterWrite(config.getExpireTime(), TimeUnit.SECONDS)
.maximumSize(config.getMaxCacheSize())
.removalListener(this)
.build();
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
inProcessSpanCache.put("ACTIVE", new ZipkinTrace.TriggerTrace());
}, 2, 3, TimeUnit.SECONDS);
}
/**
* Zipkin trace finished by the expired rule.
*/
@Override
public void onRemoval(@Nullable String key, @Nullable ZipkinTrace trace, @Nonnull RemovalCause cause) {
if (trace instanceof ZipkinTrace.TriggerTrace) {
return;
}
try {
Zipkin2SkyWalkingTransfer.INSTANCE.transfer(trace);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
LOGGER.warn("Zipkin trace:" + trace);
}
}
@Override
public void addSpan(Span span) {
ZipkinTrace trace = inProcessSpanCache.getIfPresent(span.traceId());
if (trace == null) {
newTraceLock.lock();
try {
trace = inProcessSpanCache.getIfPresent(span.traceId());
if (trace == null) {
trace = new ZipkinTrace();
inProcessSpanCache.put(span.traceId(), trace);
}
} finally {
newTraceLock.unlock();
}
}
trace.addSpan(span);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.data;
import java.util.List;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
/**
* Each SkyWalkingTrace consists of segments in each application, original from {@link ZipkinTrace}s
*/
@RequiredArgsConstructor
@Getter
public class SkyWalkingTrace {
private final List<SegmentObject.Builder> segmentList;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.data;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import zipkin2.Span;
public class ZipkinTrace {
private List<Span> spans;
private ReentrantLock spanWriteLock;
public ZipkinTrace() {
spans = new LinkedList<>();
spanWriteLock = new ReentrantLock();
}
public void addSpan(Span span) {
spanWriteLock.lock();
try {
spans.add(span);
} finally {
spanWriteLock.unlock();
}
}
public List<Span> getSpans() {
return spans;
}
@Override
public String toString() {
return "ZipkinTrace{" + "spans=" + spans + '}';
}
public static class TriggerTrace extends ZipkinTrace {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
import com.google.common.base.Strings;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
import org.apache.skywalking.apm.network.language.agent.v3.Log;
import org.apache.skywalking.apm.network.language.agent.v3.RefType;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentReference;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanType;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
import zipkin2.Endpoint;
import zipkin2.Span;
public class SegmentBuilder {
private Context context;
private LinkedList<Segment> segments;
private Map<String, ClientSideSpan> clientPartSpan;
private SegmentBuilder() {
segments = new LinkedList<>();
context = new Context();
clientPartSpan = new HashMap<>();
}
public static SkyWalkingTrace build(List<Span> traceSpans) throws Exception {
SegmentBuilder builder = new SegmentBuilder();
// This map groups the spans by their parent id, in order to assist to build tree.
// key: parentId
// value: span
Map<String, List<Span>> childSpanMap = new HashMap<>();
AtomicReference<Span> root = new AtomicReference<>();
traceSpans.forEach(span -> {
if (span.parentId() == null) {
root.set(span);
}
List<Span> spanList = childSpanMap.get(span.parentId());
if (spanList == null) {
spanList = new LinkedList<>();
spanList.add(span);
childSpanMap.put(span.parentId(), spanList);
} else {
spanList.add(span);
}
});
Span rootSpan = root.get();
long timestamp = 0;
if (rootSpan != null) {
String applicationCode = rootSpan.localServiceName();
// If root span doesn't include applicationCode, a.k.a local service name,
// Segment can't be built
// Ignore the whole trace.
// :P Hope anyone could provide better solution.
// Wu Sheng.
if (!Strings.isNullOrEmpty(applicationCode)) {
timestamp = rootSpan.timestampAsLong();
builder.context.addService(applicationCode);
SpanObject.Builder rootSpanBuilder = builder.initSpan(null, null, rootSpan, true);
builder.context.currentSegment().addSpan(rootSpanBuilder);
builder.scanSpansFromRoot(rootSpanBuilder, rootSpan, childSpanMap);
builder.segments.add(builder.context.removeApp());
}
}
List<SegmentObject.Builder> segmentBuilders = new LinkedList<>();
// microseconds -> million seconds
long finalTimestamp = timestamp / 1000;
builder.segments.forEach(segment -> {
SegmentObject.Builder traceSegmentBuilder = segment.freeze();
segmentBuilders.add(traceSegmentBuilder);
});
return new SkyWalkingTrace(segmentBuilders);
}
private void scanSpansFromRoot(SpanObject.Builder parentSegmentSpan, Span parent,
Map<String, List<Span>> childSpanMap) throws Exception {
String parentId = parent.id();
// get child spans by parent span id
List<Span> spanList = childSpanMap.get(parentId);
if (spanList == null) {
return;
}
for (Span childSpan : spanList) {
String localServiceName = childSpan.localServiceName();
boolean isNewApp = false;
if (StringUtil.isNotEmpty(localServiceName)) {
if (context.isServiceChanged(localServiceName)) {
isNewApp = true;
}
}
try {
if (isNewApp) {
context.addService(localServiceName);
}
SpanObject.Builder childSpanBuilder = initSpan(parentSegmentSpan, parent, childSpan, isNewApp);
context.currentSegment().addSpan(childSpanBuilder);
scanSpansFromRoot(childSpanBuilder, childSpan, childSpanMap);
} finally {
if (isNewApp) {
segments.add(context.removeApp());
}
}
}
}
private SpanObject.Builder initSpan(SpanObject.Builder parentSegmentSpan, Span parentSpan, Span span,
boolean isSegmentRoot) {
SpanObject.Builder spanBuilder = SpanObject.newBuilder();
spanBuilder.setSpanId(context.currentIDs().nextSpanId());
if (isSegmentRoot) {
// spanId = -1, means no parent span
// spanId is considered unique, and from a positive sequence in each segment.
spanBuilder.setParentSpanId(-1);
}
if (!isSegmentRoot && parentSegmentSpan != null) {
spanBuilder.setParentSpanId(parentSegmentSpan.getSpanId());
}
Span.Kind kind = span.kind();
String opName = Strings.isNullOrEmpty(span.name()) ? "-" : span.name();
spanBuilder.setOperationName(opName);
ClientSideSpan clientSideSpan;
switch (kind) {
case CLIENT:
spanBuilder.setSpanType(SpanType.Exit);
String peer = getPeer(parentSpan, span);
if (peer != null) {
spanBuilder.setPeer(peer);
}
clientSideSpan = new ClientSideSpan(span, spanBuilder);
clientPartSpan.put(span.id(), clientSideSpan);
break;
case SERVER:
spanBuilder.setSpanType(SpanType.Entry);
this.buildRef(spanBuilder, span, parentSegmentSpan, parentSpan);
break;
case CONSUMER:
spanBuilder.setSpanType(SpanType.Entry);
this.buildRef(spanBuilder, span, parentSegmentSpan, parentSpan);
break;
case PRODUCER:
spanBuilder.setSpanType(SpanType.Exit);
peer = getPeer(parentSpan, span);
if (peer != null) {
spanBuilder.setPeer(peer);
}
clientSideSpan = new ClientSideSpan(span, spanBuilder);
clientPartSpan.put(span.id(), clientSideSpan);
break;
default:
spanBuilder.setSpanType(SpanType.Local);
}
// microseconds in Zipkin -> milliseconds in SkyWalking
long startTime = span.timestamp() / 1000;
// Some implement of zipkin client not include duration field in its report
// package when duration's value be 0ms, Causing a null pointer exception here.
Long durationObj = span.duration();
long duration = (durationObj == null) ? 0 : durationObj.longValue() / 1000;
spanBuilder.setStartTime(startTime);
spanBuilder.setEndTime(startTime + duration);
span.tags()
.forEach((tagKey, tagValue) -> spanBuilder.addTags(KeyStringValuePair.newBuilder()
.setKey(tagKey)
.setValue(tagValue)
.build()));
span.annotations()
.forEach(annotation -> spanBuilder.addLogs(Log.newBuilder()
.setTime(annotation.timestamp() / 1000)
.addData(KeyStringValuePair.newBuilder()
.setKey("zipkin.annotation")
.setValue(annotation.value())
.build())));
return spanBuilder;
}
private void buildRef(SpanObject.Builder spanBuilder, Span span, SpanObject.Builder parentSegmentSpan,
Span parentSpan) {
Segment parentSegment = context.parentSegment();
if (parentSegment == null) {
return;
}
Segment rootSegment = context.rootSegment();
if (rootSegment == null) {
return;
}
if (span.shared() != null && span.shared()) {
// using same span id in client and server for RPC
// SkyWalking will build both sides of span
ClientSideSpan clientSideSpan = clientPartSpan.get(span.id());
if (clientSideSpan != null) {
// For the root span, there may be no ref, because of no parent.
parentSegmentSpan = clientSideSpan.getBuilder();
parentSpan = clientSideSpan.getSpan();
}
}
String peer = getPeer(parentSpan, span);
if (StringUtil.isEmpty(peer)) {
//The IP is the most important for building the ref at both sides.
return;
}
SegmentReference.Builder refBuilder = SegmentReference.newBuilder();
// parent ref info
refBuilder.setNetworkAddressUsedAtPeer(peer);
parentSegmentSpan.setPeer(refBuilder.getNetworkAddressUsedAtPeer());
refBuilder.setParentServiceInstance(parentSegment.builder().getServiceInstance());
refBuilder.setParentSpanId(parentSegmentSpan.getSpanId());
refBuilder.setParentTraceSegmentId(parentSegment.builder().getTraceSegmentId());
refBuilder.setParentEndpoint(parentSegment.getEntryEndpointName());
refBuilder.setRefType(RefType.CrossProcess);
spanBuilder.addRefs(refBuilder);
}
private String getPeer(Span parentSpan, Span childSpan) {
String peer;
Endpoint serverEndpoint = childSpan == null ? null : childSpan.localEndpoint();
peer = endpoint2Peer(serverEndpoint);
if (peer == null) {
Endpoint clientEndpoint = parentSpan == null ? null : parentSpan.remoteEndpoint();
peer = endpoint2Peer(clientEndpoint);
}
return peer;
}
private String endpoint2Peer(Endpoint endpoint) {
String ip = null;
Integer port = 0;
if (endpoint != null) {
if (!Strings.isNullOrEmpty(endpoint.ipv4())) {
ip = endpoint.ipv4();
port = endpoint.port();
} else if (!Strings.isNullOrEmpty(endpoint.ipv6())) {
ip = endpoint.ipv6();
port = endpoint.port();
}
}
if (ip == null) {
return null;
} else {
return port == null || port == 0 ? ip : ip + ":" + port;
}
}
/**
* Context holds the values in build process.
*/
private class Context {
private LinkedList<Segment> segmentsStack = new LinkedList<>();
private boolean isServiceChanged(String service) {
return !Strings.isNullOrEmpty(service) && !service.equals(currentIDs().service);
}
private Segment addService(String serviceCode) throws Exception {
Segment segment = new Segment(serviceCode, serviceCode);
segmentsStack.add(segment);
return segment;
}
private IDCollection currentIDs() {
return segmentsStack.getLast().ids;
}
private Segment currentSegment() {
return segmentsStack.getLast();
}
private Segment parentSegment() {
if (segmentsStack.size() < 2) {
return null;
} else {
return segmentsStack.get(segmentsStack.size() - 2);
}
}
private Segment rootSegment() {
if (segmentsStack.size() < 2) {
return null;
} else {
return segmentsStack.getFirst();
}
}
private Segment removeApp() {
return segmentsStack.removeLast();
}
private int waitForExchange(Callable<Integer> callable, int retry) throws Exception {
for (int i = 0; i < retry; i++) {
Integer id = callable.call();
if (id == 0) {
Thread.sleep(1000L);
} else {
return id;
}
}
throw new TimeoutException("ID exchange costs more than expected.");
}
}
private class Segment {
private SegmentObject.Builder segmentBuilder;
private IDCollection ids;
private String entryEndpointName = null;
private List<SpanObject.Builder> spans;
private long endTime = 0;
private Segment(String service, String serviceInstance) {
ids = new IDCollection(service, serviceInstance);
spans = new LinkedList<>();
segmentBuilder = SegmentObject.newBuilder();
segmentBuilder.setService(service);
segmentBuilder.setServiceInstance(serviceInstance);
segmentBuilder.setTraceSegmentId(UUID.randomUUID().toString().replaceAll("-", ""));
}
private SegmentObject.Builder builder() {
return segmentBuilder;
}
private void addSpan(SpanObject.Builder spanBuilder) {
String operationName = spanBuilder.getOperationName();
if (StringUtil.isEmpty(entryEndpointName) && !Strings.isNullOrEmpty(operationName)) {
if (SpanType.Entry.equals(spanBuilder.getSpanType())) {
if (!Strings.isNullOrEmpty(operationName)) {
entryEndpointName = operationName;
}
}
}
// init by root span
if (spanBuilder.getSpanId() == 1 && StringUtil.isEmpty(entryEndpointName)) {
if (!Strings.isNullOrEmpty(operationName)) {
entryEndpointName = operationName;
}
}
spans.add(spanBuilder);
if (spanBuilder.getEndTime() > endTime) {
endTime = spanBuilder.getEndTime();
}
}
public String getEntryEndpointName() {
return entryEndpointName;
}
private IDCollection ids() {
return ids;
}
public SegmentObject.Builder freeze() {
for (SpanObject.Builder span : spans) {
segmentBuilder.addSpans(span);
}
return segmentBuilder;
}
}
private class IDCollection {
private String service;
private String instanceName;
private int spanIdSeq;
private IDCollection(String service, String instanceName) {
this.service = service;
this.instanceName = instanceName;
this.spanIdSeq = 0;
}
private int nextSpanId() {
return spanIdSeq++;
}
}
private class ClientSideSpan {
private Span span;
private SpanObject.Builder builder;
public ClientSideSpan(Span span, SpanObject.Builder builder) {
this.span = span;
this.builder = builder;
}
public Span getSpan() {
return span;
}
public SpanObject.Builder getBuilder() {
return builder;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
public interface SegmentListener {
void notify(SkyWalkingTrace trace);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace;
import zipkin2.Span;
public class Zipkin2SkyWalkingTransfer {
public static Zipkin2SkyWalkingTransfer INSTANCE = new Zipkin2SkyWalkingTransfer();
private List<SegmentListener> listeners = new LinkedList<>();
private Zipkin2SkyWalkingTransfer() {
}
public void addListener(SegmentListener listener) {
listeners.add(listener);
}
public void transfer(ZipkinTrace trace) throws Exception {
List<Span> traceSpans = trace.getSpans();
if (traceSpans.size() > 0) {
SkyWalkingTrace skyWalkingTrace = SegmentBuilder.build(traceSpans);
listeners.forEach(listener -> listener.notify(skyWalkingTrace));
}
}
}
......@@ -24,19 +24,18 @@ import java.io.InputStream;
import java.util.List;
import java.util.zip.GZIPInputStream;
import javax.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.ZipkinSkyWalkingTransfer;
import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
@RequiredArgsConstructor
public class SpanProcessor {
private SourceReceiver receiver;
public SpanProcessor(SourceReceiver receiver) {
this.receiver = receiver;
}
private final NamingControl namingControl;
private final SourceReceiver receiver;
void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request) throws IOException {
try (InputStream inputStream = getInputStream(request)) {
......@@ -50,13 +49,8 @@ public class SpanProcessor {
List<Span> spanList = decoder.decodeList(out.toByteArray());
if (config.isNeedAnalysis()) {
ZipkinSkyWalkingTransfer transfer = new ZipkinSkyWalkingTransfer();
transfer.doTransfer(config, spanList);
} else {
SpanForward forward = new SpanForward(config, receiver);
forward.send(spanList);
}
SpanForward forward = new SpanForward(namingControl, receiver);
forward.send(spanList);
}
}
......
......@@ -20,23 +20,24 @@ package org.apache.skywalking.oap.server.receiver.zipkin.handler;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.codec.SpanBytesDecoder;
@Slf4j
public class SpanV1JettyHandler extends JettyHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SpanV1JettyHandler.class);
private ZipkinReceiverConfig config;
private SourceReceiver sourceReceiver;
private final ZipkinReceiverConfig config;
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
public SpanV1JettyHandler(ZipkinReceiverConfig config, ModuleManager manager) {
sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
namingControl = manager.find(CoreModule.NAME).provider().getService(NamingControl.class);
this.config = config;
}
......@@ -57,14 +58,14 @@ public class SpanV1JettyHandler extends JettyHandler {
SpanBytesDecoder decoder = SpanEncode.isThrift(encode) ? SpanBytesDecoder.THRIFT : SpanBytesDecoder.JSON_V1;
SpanProcessor processor = new SpanProcessor(sourceReceiver);
SpanProcessor processor = new SpanProcessor(namingControl, sourceReceiver);
processor.convert(config, decoder, request);
response.setStatus(202);
} catch (Exception e) {
response.setStatus(500);
LOGGER.error(e.getMessage(), e);
log.error(e.getMessage(), e);
}
}
......
......@@ -20,23 +20,25 @@ package org.apache.skywalking.oap.server.receiver.zipkin.handler;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.codec.SpanBytesDecoder;
@Slf4j
public class SpanV2JettyHandler extends JettyHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SpanV2JettyHandler.class);
private ZipkinReceiverConfig config;
private SourceReceiver sourceReceiver;
private final ZipkinReceiverConfig config;
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
public SpanV2JettyHandler(ZipkinReceiverConfig config, ModuleManager manager) {
sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
namingControl = manager.find(CoreModule.NAME).provider().getService(NamingControl.class);
this.config = config;
}
......@@ -57,14 +59,14 @@ public class SpanV2JettyHandler extends JettyHandler {
SpanBytesDecoder decoder = SpanEncode.isProto3(encode) ? SpanBytesDecoder.PROTO3 : SpanBytesDecoder.JSON_V2;
SpanProcessor processor = new SpanProcessor(sourceReceiver);
SpanProcessor processor = new SpanProcessor(namingControl, sourceReceiver);
processor.convert(config, decoder, request);
response.setStatus(202);
} catch (Exception e) {
response.setStatus(500);
LOGGER.error(e.getMessage(), e);
log.error(e.getMessage(), e);
}
}
}
......@@ -19,26 +19,25 @@
package org.apache.skywalking.oap.server.receiver.zipkin.trace;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.EndpointMeta;
import org.apache.skywalking.oap.server.core.source.ServiceMeta;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanEncode;
import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpan;
import zipkin2.Span;
import zipkin2.codec.SpanBytesEncoder;
@RequiredArgsConstructor
public class SpanForward {
private ZipkinReceiverConfig config;
private SourceReceiver receiver;
public SpanForward(ZipkinReceiverConfig config, SourceReceiver receiver) {
this.config = config;
this.receiver = receiver;
}
private final NamingControl namingControl;
private final SourceReceiver receiver;
public void send(List<Span> spanList) {
spanList.forEach(span -> {
......@@ -49,27 +48,28 @@ public class SpanForward {
if (StringUtil.isEmpty(serviceName)) {
serviceName = "Unknown";
}
serviceName = namingControl.formatServiceName(serviceName);
zipkinSpan.setServiceId(IDManager.ServiceID.buildId(serviceName, NodeType.Normal));
String spanName = span.name();
Span.Kind kind = span.kind();
switch (kind) {
case SERVER:
case CONSUMER:
if (!StringUtil.isEmpty(spanName)) {
zipkinSpan.setEndpointId(IDManager.EndpointID.buildId(zipkinSpan.getServiceId(), span.name()));
}
}
if (!StringUtil.isEmpty(spanName)) {
zipkinSpan.setEndpointName(spanName);
}
long startTime = span.timestampAsLong() / 1000;
zipkinSpan.setStartTime(startTime);
if (startTime != 0) {
long timeBucket = TimeBucket.getRecordTimeBucket(zipkinSpan.getStartTime());
zipkinSpan.setTimeBucket(timeBucket);
}
long timeBucket = TimeBucket.getRecordTimeBucket(zipkinSpan.getStartTime());
zipkinSpan.setTimeBucket(timeBucket);
String spanName = span.name();
if (!StringUtil.isEmpty(spanName)) {
final String endpointName = namingControl.formatEndpointName(serviceName, spanName);
zipkinSpan.setEndpointName(endpointName);
zipkinSpan.setEndpointId(IDManager.EndpointID.buildId(zipkinSpan.getServiceId(), endpointName));
//Create endpoint meta for the server side span
EndpointMeta endpointMeta = new EndpointMeta();
endpointMeta.setServiceName(serviceName);
endpointMeta.setServiceNodeType(NodeType.Normal);
endpointMeta.setEndpoint(endpointName);
endpointMeta.setTimeBucket(timeBucket);
receiver.receive(endpointMeta);
}
long latency = span.durationAsLong() / 1000;
zipkinSpan.setEndTime(startTime + latency);
......@@ -78,7 +78,19 @@ public class SpanForward {
zipkinSpan.setLatency((int) latency);
zipkinSpan.setDataBinary(SpanBytesEncoder.PROTO3.encode(span));
span.tags().forEach((key, value) -> {
zipkinSpan.getTags().add(key + "=" + value);
});
receiver.receive(zipkinSpan);
// Create the metadata source
// No instance name is required in the Zipkin model.
ServiceMeta serviceMeta = new ServiceMeta();
serviceMeta.setName(serviceName);
serviceMeta.setNodeType(NodeType.Normal);
serviceMeta.setTimeBucket(timeBucket);
receiver.receive(serviceMeta);
});
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
import java.io.UnsupportedEncodingException;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentReference;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanType;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace;
import org.junit.Assert;
import org.junit.Test;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
public class SpringSleuthSegmentBuilderTest implements SegmentListener {
@Test
public void testTransform() throws Exception {
Zipkin2SkyWalkingTransfer.INSTANCE.addListener(this);
List<Span> spanList = buildSpringSleuthExampleTrace();
Assert.assertEquals(3, spanList.size());
ZipkinTrace trace = new ZipkinTrace();
spanList.forEach(span -> trace.addSpan(span));
Zipkin2SkyWalkingTransfer.INSTANCE.transfer(trace);
}
private List<Span> buildSpringSleuthExampleTrace() throws UnsupportedEncodingException {
List<Span> spans = new LinkedList<>();
String span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"id\":\"1a8a1b5bdd791b8a\",\"kind\":\"SERVER\",\"name\":\"get /\",\"timestamp\":1527669813700123,\"duration\":11295,\"localEndpoint\":{\"serviceName\":\"frontend\",\"ipv4\":\"192.168.72.220\"},\"remoteEndpoint\":{\"ipv6\":\"::1\",\"port\":55146},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/\",\"mvc.controller.class\":\"Frontend\",\"mvc.controller.method\":\"callBackend\"}}";
spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8")));
span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"parentId\":\"1a8a1b5bdd791b8a\",\"id\":\"d7d5b93dcda767c8\",\"kind\":\"CLIENT\",\"name\":\"get\",\"timestamp\":1527669813702456,\"duration\":6672,\"localEndpoint\":{\"serviceName\":\"frontend\",\"ipv4\":\"192.168.72.220\"},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/api\"}}";
spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8")));
span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"parentId\":\"1a8a1b5bdd791b8a\",\"id\":\"d7d5b93dcda767c8\",\"kind\":\"SERVER\",\"name\":\"get /api\",\"timestamp\":1527669813705106,\"duration\":4802,\"localEndpoint\":{\"serviceName\":\"backend\",\"ipv4\":\"192.168.72.220\"},\"remoteEndpoint\":{\"ipv4\":\"127.0.0.1\",\"port\":55147},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/api\",\"mvc.controller.class\":\"Backend\",\"mvc.controller.method\":\"printDate\"},\"shared\":true}";
spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8")));
return SpanBytesDecoder.JSON_V2.decodeList(spans.toString().getBytes("UTF-8"));
}
@Override
public void notify(SkyWalkingTrace trace) {
List<SegmentObject.Builder> segments = trace.getSegmentList();
Assert.assertEquals(2, segments.size());
SegmentObject.Builder builder = segments.get(0);
SegmentObject.Builder builder1 = segments.get(1);
SegmentObject.Builder front, end;
if (builder.getService().equals("frontend")) {
front = builder;
end = builder1;
} else if (builder.getService().equals("backend")) {
end = builder;
front = builder1;
} else {
Assert.fail("Can't find frontend and backend applications, " + builder.getService());
return;
}
Assert.assertEquals(2, front.getSpansCount());
Assert.assertEquals(1, end.getSpansCount());
front.getSpansList().forEach(spanObject -> {
if (spanObject.getSpanId() == 0) {
// span id = 1, means incoming http of frontend
Assert.assertEquals(SpanType.Entry, spanObject.getSpanType());
Assert.assertEquals("get /", spanObject.getOperationName());
Assert.assertEquals(0, spanObject.getSpanId());
Assert.assertEquals(-1, spanObject.getParentSpanId());
} else if (spanObject.getSpanId() == 1) {
Assert.assertEquals("192.168.72.220", spanObject.getPeer());
Assert.assertEquals(SpanType.Exit, spanObject.getSpanType());
Assert.assertEquals(1, spanObject.getSpanId());
Assert.assertEquals(0, spanObject.getParentSpanId());
} else {
Assert.fail("Only two spans expected");
}
Assert.assertTrue(spanObject.getTagsCount() > 0);
});
SpanObject spanObject = end.getSpans(0);
Assert.assertEquals(1, spanObject.getRefsCount());
SegmentReference spanObjectRef = spanObject.getRefs(0);
Assert.assertEquals("get /", spanObjectRef.getParentEndpoint());
Assert.assertEquals("192.168.72.220", spanObjectRef.getNetworkAddressUsedAtPeer());
Assert.assertEquals(1, spanObjectRef.getParentSpanId());
Assert.assertEquals(front.getTraceSegmentId(), spanObjectRef.getParentTraceSegmentId());
Assert.assertTrue(spanObject.getTagsCount() > 0);
Assert.assertEquals("get /api", spanObject.getOperationName());
Assert.assertEquals(0, spanObject.getSpanId());
Assert.assertEquals(-1, spanObject.getParentSpanId());
Assert.assertEquals(SpanType.Entry, spanObject.getSpanType());
}
}
......@@ -44,6 +44,11 @@
<artifactId>storage-elasticsearch7-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>zipkin-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- profile exporter -->
<dependency>
......
......@@ -40,21 +40,6 @@
<artifactId>storage-elasticsearch-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-zipkin-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>jaeger-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>zipkin-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- profile exporter -->
<dependency>
......
......@@ -31,8 +31,7 @@
<module>storage-jdbc-hikaricp-plugin</module>
<module>storage-elasticsearch-plugin</module>
<module>storage-elasticsearch7-plugin</module>
<module>storage-zipkin-plugin</module>
<module>storage-jaeger-plugin</module>
<module>storage-zipkin-elasticsearch7-plugin</module>
<module>storage-influxdb-plugin</module>
<module>storage-tidb-plugin</module>
</modules>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-storage-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>storage-jaeger-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-elasticsearch-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>receiver-proto</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jaeger;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
import org.apache.skywalking.oap.server.core.source.Source;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.JAEGER_SPAN;
@ScopeDeclaration(id = JAEGER_SPAN, name = "JaegerSpan")
public class JaegerSpan extends Source {
@Override
public int scope() {
return DefaultScopeDefine.JAEGER_SPAN;
}
@Override
public String getEntityId() {
return traceId + spanId;
}
@Setter
@Getter
private String traceId;
@Setter
@Getter
private String spanId;
@Setter
@Getter
private String serviceId;
@Setter
@Getter
private String serviceInstanceId;
@Setter
@Getter
private String endpointName;
@Setter
@Getter
private String endpointId;
@Setter
@Getter
private long startTime;
@Setter
@Getter
private long endTime;
@Setter
@Getter
private int latency;
@Setter
@Getter
private int isError;
@Setter
@Getter
private byte[] dataBinary;
@Setter
@Getter
private int encode;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jaeger;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@SuperDataset
@Stream(name = JaegerSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.JAEGER_SPAN, builder = JaegerSpanRecord.Builder.class, processor = RecordStreamProcessor.class)
public class JaegerSpanRecord extends Record {
public static final String INDEX_NAME = "jaeger_span";
public static final String TRACE_ID = "trace_id";
public static final String SPAN_ID = "span_id";
public static final String SERVICE_ID = "service_id";
public static final String SERVICE_INSTANCE_ID = "service_instance_id";
public static final String ENDPOINT_NAME = "endpoint_name";
public static final String ENDPOINT_ID = "endpoint_id";
public static final String START_TIME = "start_time";
public static final String END_TIME = "end_time";
public static final String LATENCY = "latency";
public static final String IS_ERROR = "is_error";
public static final String DATA_BINARY = "data_binary";
public static final String ENCODE = "encode";
@Setter
@Getter
@Column(columnName = TRACE_ID)
private String traceId;
@Setter
@Getter
@Column(columnName = SPAN_ID)
private String spanId;
@Setter
@Getter
@Column(columnName = SERVICE_ID)
private String serviceId;
@Setter
@Getter
@Column(columnName = SERVICE_INSTANCE_ID)
private String serviceInstanceId;
@Setter
@Getter
@Column(columnName = ENDPOINT_NAME, matchQuery = true)
private String endpointName;
@Setter
@Getter
@Column(columnName = ENDPOINT_ID)
private String endpointId;
@Setter
@Getter
@Column(columnName = START_TIME)
private long startTime;
@Setter
@Getter
@Column(columnName = END_TIME)
private long endTime;
@Setter
@Getter
@Column(columnName = LATENCY)
private int latency;
@Setter
@Getter
@Column(columnName = IS_ERROR)
private int isError;
@Setter
@Getter
@Column(columnName = DATA_BINARY)
private byte[] dataBinary;
@Setter
@Getter
@Column(columnName = ENCODE)
private int encode;
@Override
public String id() {
return traceId + "-" + spanId;
}
public static class Builder implements StorageHashMapBuilder<JaegerSpanRecord> {
@Override
public Map<String, Object> entity2Storage(JaegerSpanRecord storageData) {
Map<String, Object> map = new HashMap<>();
map.put(TRACE_ID, storageData.getTraceId());
map.put(SPAN_ID, storageData.getSpanId());
map.put(SERVICE_ID, storageData.getServiceId());
map.put(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId());
map.put(ENDPOINT_NAME, storageData.getEndpointName());
map.put(ENDPOINT_ID, storageData.getEndpointId());
map.put(START_TIME, storageData.getStartTime());
map.put(END_TIME, storageData.getEndTime());
map.put(LATENCY, storageData.getLatency());
map.put(IS_ERROR, storageData.getIsError());
map.put(TIME_BUCKET, storageData.getTimeBucket());
if (CollectionUtils.isEmpty(storageData.getDataBinary())) {
map.put(DATA_BINARY, Const.EMPTY_STRING);
} else {
map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
}
map.put(ENCODE, storageData.getEncode());
return map;
}
@Override
public JaegerSpanRecord storage2Entity(Map<String, Object> dbMap) {
JaegerSpanRecord record = new JaegerSpanRecord();
record.setTraceId((String) dbMap.get(TRACE_ID));
record.setSpanId((String) dbMap.get(SPAN_ID));
record.setServiceId((String) dbMap.get(SERVICE_ID));
record.setServiceInstanceId((String) dbMap.get(SERVICE_INSTANCE_ID));
record.setEndpointName((String) dbMap.get(ENDPOINT_NAME));
record.setEndpointId((String) dbMap.get(ENDPOINT_ID));
record.setStartTime(((Number) dbMap.get(START_TIME)).longValue());
record.setEndTime(((Number) dbMap.get(END_TIME)).longValue());
record.setLatency(((Number) dbMap.get(LATENCY)).intValue());
record.setIsError(((Number) dbMap.get(IS_ERROR)).intValue());
record.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
if (StringUtil.isEmpty((String) dbMap.get(DATA_BINARY))) {
record.setDataBinary(new byte[] {});
} else {
record.setDataBinary(Base64.getDecoder().decode((String) dbMap.get(DATA_BINARY)));
}
record.setEncode(((Number) dbMap.get(ENCODE)).intValue());
return record;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jaeger;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
/**
* Dispatch for Zipkin native mode spans.
*/
public class JaegerSpanRecordDispatcher implements SourceDispatcher<JaegerSpan> {
@Override
public void dispatch(JaegerSpan source) {
JaegerSpanRecord segment = new JaegerSpanRecord();
segment.setTraceId(source.getTraceId());
segment.setSpanId(source.getSpanId());
segment.setServiceId(source.getServiceId());
segment.setServiceInstanceId(source.getServiceInstanceId());
segment.setEndpointName(source.getEndpointName());
segment.setEndpointId(source.getEndpointId());
segment.setStartTime(source.getStartTime());
segment.setEndTime(source.getEndTime());
segment.setLatency(source.getLatency());
segment.setIsError(source.getIsError());
segment.setDataBinary(source.getDataBinary());
segment.setTimeBucket(source.getTimeBucket());
segment.setEncode(source.getEncode());
RecordStreamProcessor.getInstance().in(segment);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jaeger.elasticsearch;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchProvider;
@Slf4j
public class JaegerStorageModuleElasticsearchProvider extends StorageModuleElasticsearchProvider {
@Override
public String name() {
return "jaeger-elasticsearch";
}
@Override
public void prepare() throws ServiceNotProvidedException {
super.prepare();
JaegerTraceQueryEsDAO traceQueryEsDAO = new JaegerTraceQueryEsDAO(elasticSearchClient);
this.registerServiceImplementation(ITraceQueryDAO.class, traceQueryEsDAO);
}
@Override
public String[] requiredModules() {
return new String[] {CoreModule.NAME};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jaeger.elasticsearch;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import io.jaegertracing.api_v2.Model;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.LogEntity;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Ref;
import org.apache.skywalking.oap.server.core.query.type.RefType;
import org.apache.skywalking.oap.server.core.query.type.Span;
import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_BUCKET;
import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.ENDPOINT_ID;
import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.ENDPOINT_NAME;
import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.END_TIME;
import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.IS_ERROR;
import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.LATENCY;
import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.SERVICE_ID;
import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.SERVICE_INSTANCE_ID;
import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.START_TIME;
import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.TRACE_ID;
public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
public JaegerTraceQueryEsDAO(ElasticSearchClient client) {
super(client);
}
@Override
public TraceBrief queryBasicTraces(long startSecondTB,
long endSecondTB,
long minDuration,
long maxDuration,
String endpointName,
String serviceId,
String serviceInstanceId,
String endpointId,
String traceId,
int limit,
int from,
TraceState traceState,
QueryOrder queryOrder,
final List<Tag> tags) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(boolQueryBuilder);
List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
if (startSecondTB != 0 && endSecondTB != 0) {
mustQueryList.add(QueryBuilders.rangeQuery(TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
}
if (minDuration != 0 || maxDuration != 0) {
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(LATENCY);
if (minDuration != 0) {
rangeQueryBuilder.gte(minDuration);
}
if (maxDuration != 0) {
rangeQueryBuilder.lte(maxDuration);
}
boolQueryBuilder.must().add(rangeQueryBuilder);
}
if (!Strings.isNullOrEmpty(endpointName)) {
mustQueryList.add(QueryBuilders.matchPhraseQuery(ENDPOINT_NAME, endpointName));
}
if (StringUtil.isNotEmpty(serviceId)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_ID, serviceId));
}
if (StringUtil.isNotEmpty(serviceInstanceId)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_INSTANCE_ID, serviceInstanceId));
}
if (!Strings.isNullOrEmpty(endpointId)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(ENDPOINT_ID, endpointId));
}
if (!Strings.isNullOrEmpty(traceId)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(TRACE_ID, traceId));
}
switch (traceState) {
case ERROR:
mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.TRUE));
break;
case SUCCESS:
mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.FALSE));
break;
}
TermsAggregationBuilder builder = AggregationBuilders.terms(TRACE_ID)
.field(TRACE_ID)
.size(limit)
.subAggregation(AggregationBuilders.max(LATENCY)
.field(LATENCY))
.subAggregation(AggregationBuilders.min(START_TIME)
.field(START_TIME));
switch (queryOrder) {
case BY_START_TIME:
builder.order(BucketOrder.aggregation(START_TIME, false));
break;
case BY_DURATION:
builder.order(BucketOrder.aggregation(LATENCY, false));
break;
}
sourceBuilder.aggregation(builder);
SearchResponse response = getClient().search(JaegerSpanRecord.INDEX_NAME, sourceBuilder);
TraceBrief traceBrief = new TraceBrief();
Terms terms = response.getAggregations().get(TRACE_ID);
for (Terms.Bucket termsBucket : terms.getBuckets()) {
BasicTrace basicTrace = new BasicTrace();
basicTrace.setSegmentId(termsBucket.getKeyAsString());
Min startTime = termsBucket.getAggregations().get(START_TIME);
Max latency = termsBucket.getAggregations().get(LATENCY);
basicTrace.setStart(String.valueOf((long) startTime.getValue()));
basicTrace.getEndpointNames().add("");
basicTrace.setDuration((int) latency.getValue());
basicTrace.setError(false);
basicTrace.getTraceIds().add(termsBucket.getKeyAsString());
traceBrief.getTraces().add(basicTrace);
}
return traceBrief;
}
@Override
public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
return Collections.emptyList();
}
@Override
public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.termQuery(TRACE_ID, traceId));
sourceBuilder.sort(START_TIME, SortOrder.ASC);
sourceBuilder.size(1000);
SearchResponse response = getClient().search(JaegerSpanRecord.INDEX_NAME, sourceBuilder);
List<Span> spanList = new ArrayList<>();
for (SearchHit searchHit : response.getHits().getHits()) {
String serviceId = (String) searchHit.getSourceAsMap().get(SERVICE_ID);
long startTime = ((Number) searchHit.getSourceAsMap().get(START_TIME)).longValue();
long endTime = ((Number) searchHit.getSourceAsMap().get(END_TIME)).longValue();
String dataBinaryBase64 = (String) searchHit.getSourceAsMap().get(SegmentRecord.DATA_BINARY);
Model.Span jaegerSpan = Model.Span.newBuilder()
.mergeFrom(Base64.getDecoder().decode(dataBinaryBase64))
.build();
Span swSpan = new Span();
swSpan.setTraceId(format(jaegerSpan.getTraceId()));
swSpan.setEndpointName(jaegerSpan.getOperationName());
swSpan.setStartTime(startTime);
swSpan.setEndTime(endTime);
jaegerSpan.getTagsList().forEach(keyValue -> {
String key = keyValue.getKey();
Model.ValueType valueVType = keyValue.getVType();
switch (valueVType) {
case STRING:
swSpan.getTags().add(new KeyValue(key, keyValue.getVStr()));
break;
case INT64:
swSpan.getTags().add(new KeyValue(key, keyValue.getVInt64() + ""));
break;
case BOOL:
swSpan.getTags().add(new KeyValue(key, keyValue.getVBool() + ""));
break;
case FLOAT64:
swSpan.getTags().add(new KeyValue(key, keyValue.getVFloat64() + ""));
break;
}
swSpan.setType("Local");
if ("span.kind".equals(key)) {
String kind = keyValue.getVStr();
if ("server".equals(kind) || "consumer".equals(kind)) {
swSpan.setType("Entry");
} else if ("client".equals(kind) || "producer".equals(kind)) {
swSpan.setType("Exit");
}
}
});
jaegerSpan.getLogsList().forEach(log -> {
LogEntity entity = new LogEntity();
boolean hasTimestamp = log.hasTimestamp();
if (hasTimestamp) {
long time = Instant.ofEpochSecond(log.getTimestamp().getSeconds(), log.getTimestamp().getNanos())
.toEpochMilli();
entity.setTime(time);
}
log.getFieldsList().forEach(field -> {
String key = field.getKey();
Model.ValueType valueVType = field.getVType();
switch (valueVType) {
case STRING:
entity.getData().add(new KeyValue(key, field.getVStr()));
break;
case INT64:
entity.getData().add(new KeyValue(key, field.getVInt64() + ""));
break;
case BOOL:
entity.getData().add(new KeyValue(key, field.getVBool() + ""));
break;
case FLOAT64:
entity.getData().add(new KeyValue(key, field.getVFloat64() + ""));
break;
}
});
swSpan.getLogs().add(entity);
});
final IDManager.ServiceID.ServiceIDDefinition serviceIDDefinition = IDManager.ServiceID.analysisId(
serviceId);
swSpan.setServiceCode(serviceIDDefinition.getName());
swSpan.setSpanId(0);
swSpan.setParentSpanId(-1);
String spanId = id(format(jaegerSpan.getTraceId()), format(jaegerSpan.getSpanId()));
swSpan.setSegmentSpanId(spanId);
swSpan.setSegmentId(spanId);
List<Model.SpanRef> spanReferencesList = jaegerSpan.getReferencesList();
if (spanReferencesList.size() > 0) {
spanReferencesList.forEach(jaegerRef -> {
Ref ref = new Ref();
ref.setTraceId(format(jaegerRef.getTraceId()));
String parentId = id(format(jaegerRef.getTraceId()), format(jaegerRef.getSpanId()));
ref.setParentSegmentId(parentId);
ref.setType(RefType.CROSS_PROCESS);
ref.setParentSpanId(0);
swSpan.getRefs().add(ref);
swSpan.setSegmentParentSpanId(parentId);
});
} else {
swSpan.setRoot(true);
swSpan.setSegmentParentSpanId("");
}
spanList.add(swSpan);
}
return spanList;
}
private String id(String traceId, String spanId) {
return traceId + "_" + spanId;
}
private String format(ByteString bytes) {
Base64.Encoder encoder = Base64.getEncoder();
return encoder.encodeToString(bytes.toByteArray());
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.storage.plugin.jaeger.elasticsearch.JaegerStorageModuleElasticsearchProvider
\ No newline at end of file
......@@ -25,14 +25,22 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>storage-zipkin-plugin</artifactId>
<artifactId>storage-zipkin-elasticsearch7-plugin</artifactId>
<properties>
<elasticsearch.version>7.5.0</elasticsearch.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-elasticsearch-plugin</artifactId>
<artifactId>storage-elasticsearch7-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin</artifactId>
......
......@@ -18,6 +18,8 @@
package org.apache.skywalking.oap.server.storage.plugin.zipkin;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
......@@ -75,4 +77,7 @@ public class ZipkinSpan extends Source {
@Setter
@Getter
private int encode;
@Setter
@Getter
private List<String> tags = new ArrayList<>();
}
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.zipkin;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
......@@ -50,6 +51,7 @@ public class ZipkinSpanRecord extends Record {
public static final String IS_ERROR = "is_error";
public static final String DATA_BINARY = "data_binary";
public static final String ENCODE = "encode";
public static final String TAGS = "tags";
@Setter
@Getter
......@@ -99,6 +101,10 @@ public class ZipkinSpanRecord extends Record {
@Getter
@Column(columnName = ENCODE)
private int encode;
@Setter
@Getter
@Column(columnName = TAGS)
private List<String> tags;
@Override
public String id() {
......@@ -127,6 +133,7 @@ public class ZipkinSpanRecord extends Record {
map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
}
map.put(ENCODE, storageData.getEncode());
map.put(TAGS, storageData.getTags());
return map;
}
......@@ -150,6 +157,7 @@ public class ZipkinSpanRecord extends Record {
record.setDataBinary(Base64.getDecoder().decode((String) dbMap.get(DATA_BINARY)));
}
record.setEncode(((Number) dbMap.get(ENCODE)).intValue());
// Don't read the tags as they has been in the data binary already.
return record;
}
}
......
......@@ -42,6 +42,7 @@ public class ZipkinSpanRecordDispatcher implements SourceDispatcher<ZipkinSpan>
segment.setDataBinary(source.getDataBinary());
segment.setTimeBucket(source.getTimeBucket());
segment.setEncode(source.getEncode());
segment.setTags(source.getTags());
RecordStreamProcessor.getInstance().in(segment);
}
......
......@@ -22,20 +22,20 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchProvider;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.StorageModuleElasticsearch7Provider;
@Slf4j
public class ZipkinStorageModuleElasticsearchProvider extends StorageModuleElasticsearchProvider {
public class ZipkinStorageModuleElasticsearchProvider extends StorageModuleElasticsearch7Provider {
@Override
public String name() {
return "zipkin-elasticsearch";
return "zipkin-elasticsearch7";
}
@Override
public void prepare() throws ServiceNotProvidedException {
super.prepare();
final ZipkinTraceQueryEsDAO traceQueryEsDAO = new ZipkinTraceQueryEsDAO(elasticSearchClient);
final ZipkinTraceQueryEs7DAO traceQueryEsDAO = new ZipkinTraceQueryEs7DAO(elasticSearch7Client);
this.registerServiceImplementation(ITraceQueryDAO.class, traceQueryEsDAO);
}
......
......@@ -39,6 +39,7 @@ import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord;
import org.elasticsearch.action.search.SearchResponse;
......@@ -47,12 +48,6 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import zipkin2.Span;
......@@ -65,12 +60,13 @@ import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanR
import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.SERVICE_ID;
import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.SERVICE_INSTANCE_ID;
import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.START_TIME;
import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.TAGS;
import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.TIME_BUCKET;
import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.TRACE_ID;
public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
public class ZipkinTraceQueryEs7DAO extends EsDAO implements ITraceQueryDAO {
public ZipkinTraceQueryEsDAO(ElasticSearchClient client) {
public ZipkinTraceQueryEs7DAO(ElasticSearchClient client) {
super(client);
}
......@@ -108,22 +104,22 @@ public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
if (maxDuration != 0) {
rangeQueryBuilder.lte(maxDuration);
}
boolQueryBuilder.must().add(rangeQueryBuilder);
mustQueryList.add(rangeQueryBuilder);
}
if (!Strings.isNullOrEmpty(endpointName)) {
mustQueryList.add(QueryBuilders.matchPhraseQuery(ENDPOINT_NAME, endpointName));
}
if (StringUtil.isNotEmpty(serviceId)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_ID, serviceId));
mustQueryList.add(QueryBuilders.termQuery(SERVICE_ID, serviceId));
}
if (StringUtil.isNotEmpty(serviceInstanceId)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_INSTANCE_ID, serviceInstanceId));
mustQueryList.add(QueryBuilders.termQuery(SERVICE_INSTANCE_ID, serviceInstanceId));
}
if (!Strings.isNullOrEmpty(endpointId)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(ENDPOINT_ID, endpointId));
mustQueryList.add(QueryBuilders.termQuery(ENDPOINT_ID, endpointId));
}
if (!Strings.isNullOrEmpty(traceId)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(TRACE_ID, traceId));
mustQueryList.add(QueryBuilders.termQuery(TRACE_ID, traceId));
}
switch (traceState) {
case ERROR:
......@@ -133,41 +129,43 @@ public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.FALSE));
break;
}
if (CollectionUtils.isNotEmpty(tags)) {
BoolQueryBuilder tagMatchQuery = QueryBuilders.boolQuery();
tags.forEach(tag -> {
tagMatchQuery.must(QueryBuilders.termQuery(TAGS, tag.toString()));
});
mustQueryList.add(tagMatchQuery);
}
TermsAggregationBuilder builder = AggregationBuilders.terms(TRACE_ID)
.field(TRACE_ID)
.size(limit)
.subAggregation(AggregationBuilders.max(LATENCY)
.field(LATENCY))
.subAggregation(AggregationBuilders.min(START_TIME)
.field(START_TIME));
switch (queryOrder) {
case BY_START_TIME:
builder.order(BucketOrder.aggregation(START_TIME, false));
sourceBuilder.sort(START_TIME, SortOrder.DESC);
break;
case BY_DURATION:
builder.order(BucketOrder.aggregation(LATENCY, false));
sourceBuilder.sort(LATENCY, SortOrder.DESC);
break;
}
sourceBuilder.aggregation(builder);
sourceBuilder.size(limit);
sourceBuilder.from(from);
SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder);
TraceBrief traceBrief = new TraceBrief();
traceBrief.setTotal((int) response.getHits().getTotalHits().value);
Terms terms = response.getAggregations().get(TRACE_ID);
for (Terms.Bucket termsBucket : terms.getBuckets()) {
for (SearchHit searchHit : response.getHits().getHits()) {
BasicTrace basicTrace = new BasicTrace();
basicTrace.setSegmentId(termsBucket.getKeyAsString());
Min startTime = termsBucket.getAggregations().get(START_TIME);
Max latency = termsBucket.getAggregations().get(LATENCY);
basicTrace.setStart(String.valueOf((long) startTime.getValue()));
basicTrace.getEndpointNames().add("");
basicTrace.setDuration((int) latency.getValue());
basicTrace.setError(false);
basicTrace.getTraceIds().add(termsBucket.getKeyAsString());
final ZipkinSpanRecord zipkinSpanRecord = new ZipkinSpanRecord.Builder().storage2Entity(
searchHit.getSourceAsMap());
basicTrace.setSegmentId(zipkinSpanRecord.getSpanId());
basicTrace.setStart(String.valueOf((long) zipkinSpanRecord.getStartTime()));
// Show trace id as the name
basicTrace.getEndpointNames().add(zipkinSpanRecord.getEndpointName());
basicTrace.setDuration(zipkinSpanRecord.getLatency());
basicTrace.setError(BooleanUtils.valueToBoolean(zipkinSpanRecord.getIsError()));
basicTrace.getTraceIds().add(zipkinSpanRecord.getTraceId());
traceBrief.getTraces().add(basicTrace);
}
......
......@@ -166,6 +166,7 @@ swagger-annotations-1.6.2.jar
t-digest-3.2.jar
vavr-0.10.3.jar
vavr-match-0.10.3.jar
zipkin-2.9.1.jar
zookeeper-3.4.10.jar
kafka-clients-2.4.1.jar
lz4-java-1.6.0.jar
......
......@@ -11,7 +11,6 @@ bcpkix-jdk15on-1.66.jar
bcprov-ext-jdk15on-1.66.jar
bcprov-jdk15on-1.66.jar
builder-annotations-0.22.0.jar
caffeine-2.6.2.jar
checker-qual-2.8.1.jar
client-java-10.0.0.jar
client-java-api-10.0.0.jar
......@@ -163,7 +162,6 @@ swagger-annotations-1.6.2.jar
t-digest-3.2.jar
vavr-0.10.3.jar
vavr-match-0.10.3.jar
zipkin-2.9.1.jar
zookeeper-3.4.10.jar
kafka-clients-2.4.1.jar
lz4-java-1.6.0.jar
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册