未验证 提交 ff6fb90d 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Add jaeger traces support (#2434)

* Make Jaeger receiver works in 12450 as default.

* Finish compile and codebase.

* Make codes runnable.

* Finish Jaeger query tests.

* Add document for jaeger receiver.

* Add readme.

* Fix a break link.

* Finish doc.
上级 a160c39f
......@@ -39,6 +39,7 @@ including
1. Java, .NET Core, NodeJS and PHP auto-instrument agents in SkyWalking format
1. Istio telemetry format
1. Zipkin v1/v2 format
1. Jaeger gRPC format.
1. Envoy metrics format (the metric entries itself is prometheus client [metric family](https://github.com/prometheus/client_model/blob/fd36f4220a901265f90734c3183c5f0c91daa0b8/metrics.proto#L77))
......
......@@ -12,6 +12,7 @@ We have following receivers, and `default` implementors are provided in our Apac
1. **istio-telemetry**. Istio telemetry is from Istio official bypass adaptor, this receiver match its gRPC services.
1. **envoy-metric**. Envoy `metrics_service` supported by this receiver. OAL script support all GAUGE type metrics.
1. **receiver_zipkin**. See [details](#zipkin-receiver).
1. **receiver_jaeger**. See [details](#jaeger-receiver).
The sample settings of these receivers should be already in default `application.yml`, and also list here
```yaml
......@@ -69,7 +70,17 @@ But it wouldn't analysis metric from them. In most case, I suggest you could use
Notice, in this mode, Zipkin receiver requires `zipkin-elasticsearch` storage implementation active.
Read [this](backend-storage.md#elasticsearch-6-with-zipkin-trace-extension) to know
how to active.
1. Analysis mode(Not production ready), receive Zipkin v1/v2 formats through HTTP service. Transform the trace to skywalking
Use following config to active.
```yaml
receiver_zipkin:
default:
host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0}
port: ${SW_RECEIVER_ZIPKIN_PORT:9411}
contextPath: ${SW_RECEIVER_ZIPKIN_CONTEXT_PATH:/}
```
2. Analysis mode(Not production ready), receive Zipkin v1/v2 formats through HTTP service. Transform the trace to skywalking
native format, and analysis like skywalking trace. This feature can't work in production env right now,
because of Zipkin tag/endpoint value unpredictable, we can't make sure it fits production env requirements.
......@@ -81,4 +92,22 @@ receiver_zipkin:
port: ${SW_RECEIVER_ZIPKIN_PORT:9411}
contextPath: ${SW_RECEIVER_ZIPKIN_CONTEXT_PATH:/}
needAnalysis: true
```
\ No newline at end of file
```
## Jaeger receiver
Jaeger receiver right now only works in `Tracing Mode`, and no analysis.
Jaeger receiver provides extra gRPC host/port, if absent, sharing-server host/port will be used, then core gRPC host/port.
Receiver requires `jaeger-elasticsearch` storage implementation active.
Read [this](backend-storage.md#elasticsearch-6-with-jaeger-trace-extension) to know how to active.
Right now, you need [jaeger agent](https://www.jaegertracing.io/docs/1.11/architecture/#agent) to batch
send spans to SkyWalking oap server. Read [Jaeger Architecture](https://www.jaegertracing.io/docs/1.11/architecture/)
to get more details.
Active the receiver.
```yaml
receiver_jaeger:
default:
gRPCHost: ${SW_RECEIVER_JAEGER_HOST:0.0.0.0}
gRPCPort: ${SW_RECEIVER_JAEGER_PORT:14250}
```
\ No newline at end of file
......@@ -68,6 +68,25 @@ storage:
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
```
### ElasticSearch 6 with Jaeger trace extension
This implementation shares most of `elasticsearch`, just extend to support zipkin span storage.
It has all same configs.
```yaml
storage:
jaeger-elasticsearch:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
```
### About Namespace
When namespace is set, names of all indexes in ElasticSearch will use it as prefix.
......
......@@ -52,6 +52,9 @@ SRC_SERVICE_RELATION_SERVER_SIDE: 'service_relation_server_side';
SRC_SERVICE_RELATION_CLIENT_SIDE: 'service_relation_client_side';
SRC_ALARM_RECORD: 'alarm_record';
SRC_ZIPKIN_SPAN: 'zipkin_span';
SRC_JAEGER_SPAN: 'jaeger_span';
// Literals
BOOL_LITERAL: 'true'
......
......@@ -54,7 +54,8 @@ source
SRC_SERVICE_RELATION | SRC_SERVICE_INSTANCE_RELATION | SRC_ENDPOINT_RELATION |
SRC_SERVICE_INSTANCE_JVM_CPU | SRC_SERVICE_INSTANCE_JVM_MEMORY | SRC_SERVICE_INSTANCE_JVM_MEMORY_POOL | SRC_SERVICE_INSTANCE_JVM_GC |// JVM source of service instance
SRC_SERVICE_INSTANCE_CLR_CPU | SRC_SERVICE_INSTANCE_CLR_GC | SRC_SERVICE_INSTANCE_CLR_THREAD |
SRC_ENVOY_INSTANCE_METRIC
SRC_ENVOY_INSTANCE_METRIC |
SRC_ZIPKIN_SPAN | SRC_JAEGER_SPAN
;
disableSource
......
......@@ -101,4 +101,6 @@ envoy_parent_connections_used = from(EnvoyInstanceMetric.value).filter(metricNam
/////////
// disable(segment);
// disable(endpoint_relation_server_side);
// disable(top_n_database_statement);
\ No newline at end of file
// disable(top_n_database_statement);
// disable(zipkin_span);
// disable(jaeger_span);
\ No newline at end of file
......@@ -91,6 +91,7 @@ public class CoreModuleProvider extends ModuleProvider {
AnnotationScan scopeScan = new AnnotationScan();
scopeScan.registerListener(new DefaultScopeDefine.Listener());
scopeScan.registerListener(DisableRegister.INSTANCE);
scopeScan.registerListener(new DisableRegister.SingleDisableScanListener());
try {
scopeScan.scan(null);
} catch (IOException e) {
......
......@@ -51,4 +51,15 @@ public class DisableRegister implements AnnotationListener {
public boolean include(String name) {
return disableEntitySet.contains(name);
}
public static class SingleDisableScanListener implements AnnotationListener {
@Override public Class<? extends Annotation> annotation() {
return Disable.class;
}
@Override public void notify(Class aClass) {
String name = ((Disable)aClass.getAnnotation(Disable.class)).value();
DisableRegister.INSTANCE.disableEntitySet.add(name);
}
}
}
......@@ -60,6 +60,7 @@ public class DefaultScopeDefine {
public static final int SERVICE_INSTANCE_CLR_THREAD = 21;
public static final int ENVOY_INSTANCE_METRIC = 22;
public static final int ZIPKIN_SPAN = 23;
public static final int JAEGER_SPAN = 24;
/**
* Catalog of scope, the indicator processor could use this to group all generated indicators by oal tool.
......
......@@ -40,5 +40,10 @@
<artifactId>skywalking-sharing-server-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-jaeger-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -18,9 +18,18 @@
package org.apache.skywalking.aop.server.receiver.jaeger;
import com.google.gson.JsonObject;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import io.jaegertracing.api_v2.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import java.time.Instant;
import java.util.Base64;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.library.util.*;
import org.apache.skywalking.oap.server.receiver.sharing.server.CoreRegisterLinker;
import org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpan;
import org.slf4j.*;
/**
......@@ -29,18 +38,106 @@ import org.slf4j.*;
public class JaegerGRPCHandler extends CollectorServiceGrpc.CollectorServiceImplBase {
private static final Logger logger = LoggerFactory.getLogger(JaegerGRPCHandler.class);
public JaegerGRPCHandler(ModuleManager manager) {
private SourceReceiver receiver;
private JaegerReceiverConfig config;
public JaegerGRPCHandler(SourceReceiver receiver,
JaegerReceiverConfig config) {
this.receiver = receiver;
this.config = config;
}
public void postSpans(Collector.PostSpansRequest request,
StreamObserver<Collector.PostSpansResponse> responseObserver) {
request.getBatch().getSpansList().forEach(span -> {
logger.debug(span.toString());
try {
if (logger.isDebugEnabled()) {
logger.debug(span.toString());
}
JaegerSpan jaegerSpan = new JaegerSpan();
jaegerSpan.setTraceId(format(span.getTraceId()));
jaegerSpan.setSpanId(format(span.getSpanId()));
Model.Process process = span.getProcess();
int serviceId = Const.NONE;
String serviceName = null;
if (process != null) {
serviceName = process.getServiceName();
}
if (StringUtil.isEmpty(serviceName)) {
serviceName = "UNKNOWN";
}
serviceId = CoreRegisterLinker.getServiceInventoryCache().getServiceId(serviceName);
if (serviceId != Const.NONE) {
jaegerSpan.setServiceId(serviceId);
} else {
JsonObject properties = new JsonObject();
if (process != null) {
process.getTagsList().forEach(keyValue -> {
String key = keyValue.getKey();
Model.ValueType valueVType = keyValue.getVType();
switch (valueVType) {
case STRING:
properties.addProperty(key, keyValue.getVStr());
break;
case INT64:
properties.addProperty(key, keyValue.getVInt64());
break;
case BOOL:
properties.addProperty(key, keyValue.getVBool());
break;
case FLOAT64:
properties.addProperty(key, keyValue.getVFloat64());
break;
}
});
}
CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(serviceName, properties);
}
long duration = span.getDuration().getNanos() / 1_000_000;
jaegerSpan.setStartTime(Instant.ofEpochSecond(span.getStartTime().getSeconds(), span.getStartTime().getNanos()).toEpochMilli());
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(jaegerSpan.getStartTime());
jaegerSpan.setTimeBucket(timeBucket);
jaegerSpan.setEndTime(jaegerSpan.getStartTime() + duration);
jaegerSpan.setLatency((int)duration);
jaegerSpan.setDataBinary(span.toByteArray());
jaegerSpan.setEndpointName(span.getOperationName());
int finalServiceId = serviceId;
span.getTagsList().forEach(tag -> {
String key = tag.getKey();
if ("error".equals(key)) {
boolean status = tag.getVBool();
jaegerSpan.setIsError(BooleanUtils.booleanToValue(status));
} else if ("span.kind".equals(key)) {
String kind = tag.getVStr();
if ("server".equals(kind) || "consumer".equals(kind)) {
String endpointName = span.getOperationName();
jaegerSpan.setEndpointName(endpointName);
int endpointId = CoreRegisterLinker.getEndpointInventoryCache().getEndpointId(finalServiceId, endpointName,
DetectPoint.SERVER.ordinal());
if (endpointId != Const.NONE) {
CoreRegisterLinker.getEndpointInventoryRegister().getOrCreate(finalServiceId, endpointName, DetectPoint.SERVER);
}
}
}
});
receiver.receive(jaegerSpan);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
});
responseObserver.onNext(Collector.PostSpansResponse.newBuilder().build());
responseObserver.onCompleted();
}
private String format(ByteString bytes) {
Base64.Encoder encoder = Base64.getEncoder();
return encoder.encodeToString(bytes.toByteArray());
}
}
......@@ -24,5 +24,9 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig;
@Setter
@Getter
public class JaegerReceiverConfig extends ModuleConfig {
private String gRPCHost = null;
private int gRPCPort = -1;
private int maxConcurrentCallsPerConnection;
private int maxMessageSize;
private boolean registerJaegerEndpoint = true;
}
......@@ -18,15 +18,23 @@
package org.apache.skywalking.aop.server.receiver.jaeger;
import java.util.Objects;
import org.apache.logging.log4j.util.Strings;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.receiver.sharing.server.*;
/**
* @author wusheng
*/
public class JaegerReceiverProvider extends ModuleProvider {
public static final String NAME = "default";
private JaegerReceiverConfig config;
private GRPCServer grpcServer = null;
@Override public String name() {
return NAME;
......@@ -37,20 +45,45 @@ public class JaegerReceiverProvider extends ModuleProvider {
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
return null;
config = new JaegerReceiverConfig();
return config;
}
@Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
if (config.getGRPCPort() > 0) {
grpcServer = new GRPCServer(Strings.isBlank(config.getGRPCHost()) ? "0.0.0.0" : config.getGRPCHost(), config.getGRPCPort());
if (config.getMaxMessageSize() > 0) {
grpcServer.setMaxMessageSize(config.getMaxMessageSize());
}
if (config.getMaxConcurrentCallsPerConnection() > 0) {
grpcServer.setMaxConcurrentCallsPerConnection(config.getMaxConcurrentCallsPerConnection());
}
grpcServer.initialize();
}
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(GRPCHandlerRegister.class);
grpcHandlerRegister.addHandler(new JaegerGRPCHandler(getManager()));
CoreRegisterLinker.setModuleManager(getManager());
SourceReceiver sourceReceiver = getManager().find(CoreModule.NAME).provider().getService(SourceReceiver.class);
if (Objects.nonNull(grpcServer)) {
grpcServer.addHandler(new JaegerGRPCHandler(sourceReceiver, config));
} else {
GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(GRPCHandlerRegister.class);
grpcHandlerRegister.addHandler(new JaegerGRPCHandler(sourceReceiver, config));
}
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
try {
if (Objects.nonNull(grpcServer)) {
grpcServer.start();
}
} catch (ServerException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
@Override public String[] requiredModules() {
......
......@@ -30,14 +30,6 @@
<artifactId>receiver-proto</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-sharing-server-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
......
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.receiver.sharing.server;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
......@@ -27,6 +28,8 @@ public class CoreRegisterLinker {
private static volatile IServiceInventoryRegister SERVICE_INVENTORY_REGISTER;
private static volatile IServiceInstanceInventoryRegister SERVICE_INSTANCE_INVENTORY_REGISTER;
private static volatile IEndpointInventoryRegister ENDPOINT_INVENTORY_REGISTER;
private static volatile ServiceInventoryCache SERVICE_INVENTORY_CACHE;
private static volatile EndpointInventoryCache ENDPOINT_INVENTORY_CACHE;
public static void setModuleManager(ModuleManager moduleManager) {
CoreRegisterLinker.MODULE_MANAGER = moduleManager;
......@@ -52,4 +55,18 @@ public class CoreRegisterLinker {
}
return ENDPOINT_INVENTORY_REGISTER;
}
public static ServiceInventoryCache getServiceInventoryCache() {
if (SERVICE_INVENTORY_CACHE == null) {
SERVICE_INVENTORY_CACHE = MODULE_MANAGER.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
}
return SERVICE_INVENTORY_CACHE;
}
public static EndpointInventoryCache getEndpointInventoryCache() {
if (ENDPOINT_INVENTORY_CACHE == null) {
ENDPOINT_INVENTORY_CACHE = MODULE_MANAGER.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
}
return ENDPOINT_INVENTORY_CACHE;
}
}
......@@ -103,8 +103,10 @@ envoy-metric:
# host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0}
# port: ${SW_RECEIVER_ZIPKIN_PORT:9411}
# contextPath: ${SW_RECEIVER_ZIPKIN_CONTEXT_PATH:/}
receiver_jaeger:
default:
#receiver_jaeger:
# default:
# gRPCHost: ${SW_RECEIVER_JAEGER_HOST:0.0.0.0}
# gRPCPort: ${SW_RECEIVER_JAEGER_PORT:14250}
query:
graphql:
path: ${SW_QUERY_GRAPHQL_PATH:/graphql}
......
......@@ -31,6 +31,7 @@
<module>storage-jdbc-hikaricp-plugin</module>
<module>storage-elasticsearch-plugin</module>
<module>storage-zipkin-plugin</module>
<module>storage-jaeger-plugin</module>
</modules>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-storage-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>storage-jaeger-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-elasticsearch-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>receiver-proto</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jaeger;
import lombok.*;
import org.apache.skywalking.oap.server.core.source.*;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.*;
/**
* @author peng-yongsheng
*/
@ScopeDeclaration(id = JAEGER_SPAN, name = "JaegerSpan")
public class JaegerSpan extends Source {
@Override public int scope() {
return DefaultScopeDefine.JAEGER_SPAN;
}
@Override public String getEntityId() {
return traceId + spanId;
}
@Setter @Getter private String traceId;
@Setter @Getter private String spanId;
@Setter @Getter private int serviceId;
@Setter @Getter private int serviceInstanceId;
@Setter @Getter private String endpointName;
@Setter @Getter private int endpointId;
@Setter @Getter private long startTime;
@Setter @Getter private long endTime;
@Setter @Getter private int latency;
@Setter @Getter private int isError;
@Setter @Getter private byte[] dataBinary;
@Setter @Getter private int encode;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jaeger;
import java.util.*;
import lombok.*;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordType;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@RecordType
@StorageEntity(name = JaegerSpanRecord.INDEX_NAME, builder = JaegerSpanRecord.Builder.class, sourceScopeId = DefaultScopeDefine.JAEGER_SPAN)
public class JaegerSpanRecord extends Record {
public static final String INDEX_NAME = "jaeger_span";
public static final String TRACE_ID = "trace_id";
public static final String SPAN_ID = "span_id";
public static final String SERVICE_ID = "service_id";
public static final String SERVICE_INSTANCE_ID = "service_instance_id";
public static final String ENDPOINT_NAME = "endpoint_name";
public static final String ENDPOINT_ID = "endpoint_id";
public static final String START_TIME = "start_time";
public static final String END_TIME = "end_time";
public static final String LATENCY = "latency";
public static final String IS_ERROR = "is_error";
public static final String DATA_BINARY = "data_binary";
public static final String ENCODE = "encode";
@Setter @Getter @Column(columnName = TRACE_ID) @IDColumn private String traceId;
@Setter @Getter @Column(columnName = SPAN_ID) @IDColumn private String spanId;
@Setter @Getter @Column(columnName = SERVICE_ID) @IDColumn private int serviceId;
@Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) @IDColumn private int serviceInstanceId;
@Setter @Getter @Column(columnName = ENDPOINT_NAME, matchQuery = true) @IDColumn private String endpointName;
@Setter @Getter @Column(columnName = ENDPOINT_ID) @IDColumn private int endpointId;
@Setter @Getter @Column(columnName = START_TIME) @IDColumn private long startTime;
@Setter @Getter @Column(columnName = END_TIME) @IDColumn private long endTime;
@Setter @Getter @Column(columnName = LATENCY) @IDColumn private int latency;
@Setter @Getter @Column(columnName = IS_ERROR) @IDColumn private int isError;
@Setter @Getter @Column(columnName = DATA_BINARY) @IDColumn private byte[] dataBinary;
@Setter @Getter @Column(columnName = ENCODE) @IDColumn private int encode;
@Override public String id() {
return traceId + "-" + spanId;
}
public static class Builder implements StorageBuilder<JaegerSpanRecord> {
@Override public Map<String, Object> data2Map(JaegerSpanRecord storageData) {
Map<String, Object> map = new HashMap<>();
map.put(TRACE_ID, storageData.getTraceId());
map.put(SPAN_ID, storageData.getSpanId());
map.put(SERVICE_ID, storageData.getServiceId());
map.put(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId());
map.put(ENDPOINT_NAME, storageData.getEndpointName());
map.put(ENDPOINT_ID, storageData.getEndpointId());
map.put(START_TIME, storageData.getStartTime());
map.put(END_TIME, storageData.getEndTime());
map.put(LATENCY, storageData.getLatency());
map.put(IS_ERROR, storageData.getIsError());
map.put(TIME_BUCKET, storageData.getTimeBucket());
if (CollectionUtils.isEmpty(storageData.getDataBinary())) {
map.put(DATA_BINARY, Const.EMPTY_STRING);
} else {
map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
}
map.put(ENCODE, storageData.getEncode());
return map;
}
@Override public JaegerSpanRecord map2Data(Map<String, Object> dbMap) {
JaegerSpanRecord record = new JaegerSpanRecord();
record.setTraceId((String)dbMap.get(TRACE_ID));
record.setSpanId((String)dbMap.get(SPAN_ID));
record.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
record.setServiceInstanceId(((Number)dbMap.get(SERVICE_INSTANCE_ID)).intValue());
record.setEndpointName((String)dbMap.get(ENDPOINT_NAME));
record.setEndpointId(((Number)dbMap.get(ENDPOINT_ID)).intValue());
record.setStartTime(((Number)dbMap.get(START_TIME)).longValue());
record.setEndTime(((Number)dbMap.get(END_TIME)).longValue());
record.setLatency(((Number)dbMap.get(LATENCY)).intValue());
record.setIsError(((Number)dbMap.get(IS_ERROR)).intValue());
record.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
if (StringUtil.isEmpty((String)dbMap.get(DATA_BINARY))) {
record.setDataBinary(new byte[] {});
} else {
record.setDataBinary(Base64.getDecoder().decode((String)dbMap.get(DATA_BINARY)));
}
record.setEncode(((Number)dbMap.get(ENCODE)).intValue());
return record;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jaeger;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
/**
* Dispatch for Zipkin native mode spans.
*
* @author wusheng
*/
public class JaegerSpanRecordDispatcher implements SourceDispatcher<JaegerSpan> {
@Override public void dispatch(JaegerSpan source) {
JaegerSpanRecord segment = new JaegerSpanRecord();
segment.setTraceId(source.getTraceId());
segment.setSpanId(source.getSpanId());
segment.setServiceId(source.getServiceId());
segment.setServiceInstanceId(source.getServiceInstanceId());
segment.setEndpointName(source.getEndpointName());
segment.setEndpointId(source.getEndpointId());
segment.setStartTime(source.getStartTime());
segment.setEndTime(source.getEndTime());
segment.setLatency(source.getLatency());
segment.setIsError(source.getIsError());
segment.setDataBinary(source.getDataBinary());
segment.setTimeBucket(source.getTimeBucket());
segment.setEncode(source.getEncode());
RecordProcess.INSTANCE.in(segment);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jaeger.elasticsearch;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchProvider;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class JaegerStorageModuleElasticsearchProvider extends StorageModuleElasticsearchProvider {
private static final Logger logger = LoggerFactory.getLogger(JaegerStorageModuleElasticsearchProvider.class);
private JaegerTraceQueryEsDAO traceQueryEsDAO;
@Override
public String name() {
return "jaeger-elasticsearch";
}
@Override
public void prepare() throws ServiceNotProvidedException {
super.prepare();
traceQueryEsDAO = new JaegerTraceQueryEsDAO(elasticSearchClient);
this.registerServiceImplementation(ITraceQueryDAO.class, traceQueryEsDAO);
}
@Override public void notifyAfterCompleted() {
super.notifyAfterCompleted();
traceQueryEsDAO.setServiceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class));
}
@Override
public String[] requiredModules() {
return new String[] {CoreModule.NAME};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jaeger.elasticsearch;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import io.jaegertracing.api_v2.Model;
import java.io.IOException;
import java.time.Instant;
import java.util.*;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.*;
import org.elasticsearch.search.aggregations.bucket.terms.*;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_BUCKET;
import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.*;
public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
@Setter
private ServiceInventoryCache serviceInventoryCache;
public JaegerTraceQueryEsDAO(
ElasticSearchClient client) {
super(client);
}
@Override
public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration,
String endpointName, int serviceId, int serviceInstanceId, int endpointId, String traceId, int limit, int from,
TraceState traceState, QueryOrder queryOrder) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(boolQueryBuilder);
List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
if (startSecondTB != 0 && endSecondTB != 0) {
mustQueryList.add(QueryBuilders.rangeQuery(TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
}
if (minDuration != 0 || maxDuration != 0) {
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(LATENCY);
if (minDuration != 0) {
rangeQueryBuilder.gte(minDuration);
}
if (maxDuration != 0) {
rangeQueryBuilder.lte(maxDuration);
}
boolQueryBuilder.must().add(rangeQueryBuilder);
}
if (!Strings.isNullOrEmpty(endpointName)) {
mustQueryList.add(QueryBuilders.matchPhraseQuery(ENDPOINT_NAME, endpointName));
}
if (serviceId != 0) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_ID, serviceId));
}
if (serviceInstanceId != 0) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_INSTANCE_ID, serviceInstanceId));
}
if (endpointId != 0) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(ENDPOINT_ID, endpointId));
}
if (!Strings.isNullOrEmpty(traceId)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(TRACE_ID, traceId));
}
switch (traceState) {
case ERROR:
mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.TRUE));
break;
case SUCCESS:
mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.FALSE));
break;
}
TermsAggregationBuilder builder = AggregationBuilders.terms(TRACE_ID).field(TRACE_ID).size(limit)
.subAggregation(
AggregationBuilders.max(LATENCY).field(LATENCY)
)
.subAggregation(
AggregationBuilders.min(START_TIME).field(START_TIME)
);
switch (queryOrder) {
case BY_START_TIME:
builder.order(BucketOrder.aggregation(START_TIME, false));
break;
case BY_DURATION:
builder.order(BucketOrder.aggregation(LATENCY, false));
break;
}
sourceBuilder.aggregation(builder);
SearchResponse response = getClient().search(JaegerSpanRecord.INDEX_NAME, sourceBuilder);
TraceBrief traceBrief = new TraceBrief();
Terms terms = response.getAggregations().get(TRACE_ID);
for (Terms.Bucket termsBucket : terms.getBuckets()) {
BasicTrace basicTrace = new BasicTrace();
basicTrace.setSegmentId(termsBucket.getKeyAsString());
Min startTime = termsBucket.getAggregations().get(START_TIME);
Max latency = termsBucket.getAggregations().get(LATENCY);
basicTrace.setStart(String.valueOf((long)startTime.getValue()));
basicTrace.getEndpointNames().add("");
basicTrace.setDuration((int)latency.getValue());
basicTrace.setError(false);
basicTrace.getTraceIds().add(termsBucket.getKeyAsString());
traceBrief.getTraces().add(basicTrace);
}
return traceBrief;
}
@Override public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
return Collections.emptyList();
}
@Override public List<Span> doFlexibleTraceQuery(
String traceId) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.termQuery(TRACE_ID, traceId));
sourceBuilder.sort(START_TIME, SortOrder.ASC);
sourceBuilder.size(1000);
SearchResponse response = getClient().search(JaegerSpanRecord.INDEX_NAME, sourceBuilder);
List<Span> spanList = new ArrayList<>();
for (SearchHit searchHit : response.getHits().getHits()) {
int serviceId = ((Number)searchHit.getSourceAsMap().get(SERVICE_ID)).intValue();
long startTime = ((Number)searchHit.getSourceAsMap().get(START_TIME)).longValue();
long endTime = ((Number)searchHit.getSourceAsMap().get(END_TIME)).longValue();
String dataBinaryBase64 = (String)searchHit.getSourceAsMap().get(SegmentRecord.DATA_BINARY);
Model.Span jaegerSpan = Model.Span.newBuilder().mergeFrom(Base64.getDecoder().decode(dataBinaryBase64)).build();
Span swSpan = new Span();
swSpan.setTraceId(format(jaegerSpan.getTraceId()));
swSpan.setEndpointName(jaegerSpan.getOperationName());
swSpan.setStartTime(startTime);
swSpan.setEndTime(endTime);
jaegerSpan.getTagsList().forEach(keyValue -> {
String key = keyValue.getKey();
Model.ValueType valueVType = keyValue.getVType();
switch (valueVType) {
case STRING:
swSpan.getTags().add(new KeyValue(key, keyValue.getVStr()));
break;
case INT64:
swSpan.getTags().add(new KeyValue(key, keyValue.getVInt64() + ""));
break;
case BOOL:
swSpan.getTags().add(new KeyValue(key, keyValue.getVBool() + ""));
break;
case FLOAT64:
swSpan.getTags().add(new KeyValue(key, keyValue.getVFloat64() + ""));
break;
}
swSpan.setType("Local");
if ("span.kind".equals(key)) {
String kind = keyValue.getVStr();
if ("server".equals(kind) || "consumer".equals(kind)) {
swSpan.setType("Entry");
} else if ("client".equals(kind) || "producer".equals(kind)) {
swSpan.setType("Exit");
}
}
});
jaegerSpan.getLogsList().forEach(log -> {
LogEntity entity = new LogEntity();
boolean hasTimestamp = log.hasTimestamp();
if (hasTimestamp) {
long time = Instant.ofEpochSecond(log.getTimestamp().getSeconds(), log.getTimestamp().getNanos()).toEpochMilli();
entity.setTime(time);
}
log.getFieldsList().forEach(field -> {
String key = field.getKey();
Model.ValueType valueVType = field.getVType();
switch (valueVType) {
case STRING:
entity.getData().add(new KeyValue(key, field.getVStr()));
break;
case INT64:
entity.getData().add(new KeyValue(key, field.getVInt64() + ""));
break;
case BOOL:
entity.getData().add(new KeyValue(key, field.getVBool() + ""));
break;
case FLOAT64:
entity.getData().add(new KeyValue(key, field.getVFloat64() + ""));
break;
}
});
swSpan.getLogs().add(entity);
});
if (serviceId != Const.NONE) {
swSpan.setServiceCode(serviceInventoryCache.get(serviceId).getName());
} else {
swSpan.setServiceCode("UNKNOWN");
}
swSpan.setSpanId(0);
swSpan.setParentSpanId(-1);
String spanId = id(format(jaegerSpan.getTraceId()), format(jaegerSpan.getSpanId()));
swSpan.setSegmentSpanId(spanId);
swSpan.setSegmentId(spanId);
List<Model.SpanRef> spanReferencesList = jaegerSpan.getReferencesList();
if (spanReferencesList.size() > 0) {
spanReferencesList.forEach(jaegerRef -> {
Ref ref = new Ref();
ref.setTraceId(format(jaegerRef.getTraceId()));
String parentId = id(format(jaegerRef.getTraceId()), format(jaegerRef.getSpanId()));
ref.setParentSegmentId(parentId);
ref.setType(RefType.CROSS_PROCESS);
ref.setParentSpanId(0);
swSpan.getRefs().add(ref);
swSpan.setSegmentParentSpanId(parentId);
});
} else {
swSpan.setRoot(true);
swSpan.setSegmentParentSpanId("");
}
spanList.add(swSpan);
}
return spanList;
}
private String id(String traceId, String spanId) {
return traceId + "_" + spanId;
}
private String format(ByteString bytes) {
Base64.Encoder encoder = Base64.getEncoder();
return encoder.encodeToString(bytes.toByteArray());
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.storage.plugin.jaeger.elasticsearch.JaegerStorageModuleElasticsearchProvider
\ No newline at end of file
......@@ -22,6 +22,7 @@ import com.google.common.base.Strings;
import java.io.IOException;
import java.util.*;
import lombok.Setter;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
......@@ -158,7 +159,6 @@ public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
List<org.apache.skywalking.oap.server.core.query.entity.Span> spanList = new ArrayList<>();
boolean isFirst = true;
for (SearchHit searchHit : response.getHits().getHits()) {
int serviceId = ((Number)searchHit.getSourceAsMap().get(SERVICE_ID)).intValue();
String dataBinaryBase64 = (String)searchHit.getSourceAsMap().get(SegmentRecord.DATA_BINARY);
......@@ -201,10 +201,9 @@ public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
}
if (isFirst) {
if (StringUtil.isEmpty(span.parentId())) {
swSpan.setRoot(true);
swSpan.setSegmentParentSpanId("");
isFirst = false;
} else {
Ref ref = new Ref();
ref.setTraceId(span.traceId());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册