提交 99fdf02b 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Feature/trace query (#1685)

* Implementation of trace query.

* Implementation of trace query.
上级 13fa3030
Subproject commit e259efbc20a69e23a5c8542394579f006396e6a5
Subproject commit e7fc69462955c86d70f3f7f33712dfe33ecefbc6
......@@ -60,6 +60,11 @@
<artifactId>apm-datacarrier</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
......
......@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core;
import java.util.*;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
......@@ -58,6 +58,8 @@ public class CoreModule extends ModuleDefine {
private void addQueryService(List<Class> classes) {
classes.add(TopologyQueryService.class);
classes.add(MetricQueryService.class);
classes.add(TraceQueryService.class);
}
private void addServerInterface(List<Class> classes) {
......
......@@ -25,7 +25,7 @@ import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.config.*;
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.*;
......@@ -111,6 +111,8 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(INetworkAddressInventoryRegister.class, new NetworkAddressInventoryRegister(getManager()));
this.registerServiceImplementation(TopologyQueryService.class, new TopologyQueryService(getManager()));
this.registerServiceImplementation(MetricQueryService.class, new MetricQueryService(getManager()));
this.registerServiceImplementation(TraceQueryService.class, new TraceQueryService(getManager()));
annotationScan.registerListener(storageAnnotationListener);
annotationScan.registerListener(streamAnnotationListener);
......
......@@ -153,7 +153,7 @@ public class AllHeatmapIndicator extends ThermodynamicIndicator implements Alarm
AllHeatmapIndicator indicator = new AllHeatmapIndicator();
indicator.setStep(((Number)dbMap.get("step")).intValue());
indicator.setNumOfSteps(((Number)dbMap.get("num_of_steps")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -153,7 +153,7 @@ public class AllP50Indicator extends P50Indicator implements AlarmSupported {
AllP50Indicator indicator = new AllP50Indicator();
indicator.setValue(((Number)dbMap.get("value")).intValue());
indicator.setPrecision(((Number)dbMap.get("precision")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -153,7 +153,7 @@ public class AllP75Indicator extends P75Indicator implements AlarmSupported {
AllP75Indicator indicator = new AllP75Indicator();
indicator.setValue(((Number)dbMap.get("value")).intValue());
indicator.setPrecision(((Number)dbMap.get("precision")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -153,7 +153,7 @@ public class AllP90Indicator extends P90Indicator implements AlarmSupported {
AllP90Indicator indicator = new AllP90Indicator();
indicator.setValue(((Number)dbMap.get("value")).intValue());
indicator.setPrecision(((Number)dbMap.get("precision")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -153,7 +153,7 @@ public class AllP95Indicator extends P95Indicator implements AlarmSupported {
AllP95Indicator indicator = new AllP95Indicator();
indicator.setValue(((Number)dbMap.get("value")).intValue());
indicator.setPrecision(((Number)dbMap.get("precision")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -153,7 +153,7 @@ public class AllP99Indicator extends P99Indicator implements AlarmSupported {
AllP99Indicator indicator = new AllP99Indicator();
indicator.setValue(((Number)dbMap.get("value")).intValue());
indicator.setPrecision(((Number)dbMap.get("precision")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -184,7 +184,7 @@ public class EndpointP50Indicator extends P50Indicator implements AlarmSupported
indicator.setServiceInstanceId(((Number)dbMap.get("service_instance_id")).intValue());
indicator.setValue(((Number)dbMap.get("value")).intValue());
indicator.setPrecision(((Number)dbMap.get("precision")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -184,7 +184,7 @@ public class EndpointP75Indicator extends P75Indicator implements AlarmSupported
indicator.setServiceInstanceId(((Number)dbMap.get("service_instance_id")).intValue());
indicator.setValue(((Number)dbMap.get("value")).intValue());
indicator.setPrecision(((Number)dbMap.get("precision")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -184,7 +184,7 @@ public class EndpointP90Indicator extends P90Indicator implements AlarmSupported
indicator.setServiceInstanceId(((Number)dbMap.get("service_instance_id")).intValue());
indicator.setValue(((Number)dbMap.get("value")).intValue());
indicator.setPrecision(((Number)dbMap.get("precision")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -184,7 +184,7 @@ public class EndpointP95Indicator extends P95Indicator implements AlarmSupported
indicator.setServiceInstanceId(((Number)dbMap.get("service_instance_id")).intValue());
indicator.setValue(((Number)dbMap.get("value")).intValue());
indicator.setPrecision(((Number)dbMap.get("precision")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -184,7 +184,7 @@ public class EndpointP99Indicator extends P99Indicator implements AlarmSupported
indicator.setServiceInstanceId(((Number)dbMap.get("service_instance_id")).intValue());
indicator.setValue(((Number)dbMap.get("value")).intValue());
indicator.setPrecision(((Number)dbMap.get("precision")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -168,7 +168,7 @@ public class ServiceP50Indicator extends P50Indicator implements AlarmSupported
indicator.setEntityId((String)dbMap.get("entity_id"));
indicator.setValue(((Number)dbMap.get("value")).intValue());
indicator.setPrecision(((Number)dbMap.get("precision")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -168,7 +168,7 @@ public class ServiceP75Indicator extends P75Indicator implements AlarmSupported
indicator.setEntityId((String)dbMap.get("entity_id"));
indicator.setValue(((Number)dbMap.get("value")).intValue());
indicator.setPrecision(((Number)dbMap.get("precision")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -168,7 +168,7 @@ public class ServiceP90Indicator extends P90Indicator implements AlarmSupported
indicator.setEntityId((String)dbMap.get("entity_id"));
indicator.setValue(((Number)dbMap.get("value")).intValue());
indicator.setPrecision(((Number)dbMap.get("precision")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -168,7 +168,7 @@ public class ServiceP95Indicator extends P95Indicator implements AlarmSupported
indicator.setEntityId((String)dbMap.get("entity_id"));
indicator.setValue(((Number)dbMap.get("value")).intValue());
indicator.setPrecision(((Number)dbMap.get("precision")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -168,7 +168,7 @@ public class ServiceP99Indicator extends P99Indicator implements AlarmSupported
indicator.setEntityId((String)dbMap.get("entity_id"));
indicator.setValue(((Number)dbMap.get("value")).intValue());
indicator.setPrecision(((Number)dbMap.get("precision")).intValue());
indicator.setDetailGroup((org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray)dbMap.get("detail_group"));
indicator.setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.indicator.IntKeyLongValueArray((String)dbMap.get("detail_group")));
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
......
......@@ -31,8 +31,9 @@ public class IntKeyLongValueArray extends ArrayList<IntKeyLongValue> implements
super(initialCapacity);
}
public IntKeyLongValueArray() {
public IntKeyLongValueArray(String data) {
super();
toObject(data);
}
@Override public String toStorageData() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.query;
import org.apache.skywalking.oap.server.core.query.entity.Pagination;
/**
* @author peng-yongsheng
*/
public enum PaginationUtils {
INSTANCE;
public Page exchange(Pagination paging) {
int limit = paging.getPageSize();
int from = paging.getPageSize() * ((paging.getPageNum() == 0 ? 1 : paging.getPageNum()) - 1);
return new Page(from, limit);
}
public class Page {
private int from;
private int limit;
Page(int from, int limit) {
this.from = from;
this.limit = limit;
}
public int getFrom() {
return from;
}
public int getLimit() {
return limit;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.query;
import java.io.IOException;
import java.util.*;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.query.entity.RefType;
import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.Service;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import static java.util.Objects.nonNull;
/**
* @author peng-yongsheng
*/
public class TraceQueryService implements Service {
private final ModuleManager moduleManager;
private ITraceQueryDAO traceQueryDAO;
private ServiceInventoryCache serviceInventoryCache;
private EndpointInventoryCache endpointInventoryCache;
private NetworkAddressInventoryCache networkAddressInventoryCache;
private IComponentLibraryCatalogService componentLibraryCatalogService;
public TraceQueryService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private ITraceQueryDAO getTraceQueryDAO() {
if (traceQueryDAO == null) {
this.traceQueryDAO = moduleManager.find(StorageModule.NAME).getService(ITraceQueryDAO.class);
}
return traceQueryDAO;
}
private ServiceInventoryCache getServiceInventoryCache() {
if (serviceInventoryCache == null) {
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInventoryCache.class);
}
return serviceInventoryCache;
}
private EndpointInventoryCache getEndpointInventoryCache() {
if (endpointInventoryCache == null) {
this.endpointInventoryCache = moduleManager.find(CoreModule.NAME).getService(EndpointInventoryCache.class);
}
return endpointInventoryCache;
}
private NetworkAddressInventoryCache getNetworkAddressInventoryCache() {
if (networkAddressInventoryCache == null) {
this.networkAddressInventoryCache = moduleManager.find(CoreModule.NAME).getService(NetworkAddressInventoryCache.class);
}
return networkAddressInventoryCache;
}
private IComponentLibraryCatalogService getComponentLibraryCatalogService() {
if (componentLibraryCatalogService == null) {
this.componentLibraryCatalogService = moduleManager.find(CoreModule.NAME).getService(IComponentLibraryCatalogService.class);
}
return componentLibraryCatalogService;
}
public TraceBrief queryBasicTraces(final int serviceId, final String traceId, final String endpointName,
final int minTraceDuration, int maxTraceDuration, final TraceState traceState, final QueryOrder queryOrder,
final Pagination paging, final long startTB, final long endTB) throws IOException {
PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(paging);
return getTraceQueryDAO().queryBasicTraces(startTB, endTB, minTraceDuration, maxTraceDuration, endpointName, serviceId, traceId, page.getLimit(), page.getFrom(), traceState, queryOrder);
}
public Trace queryTrace(final String traceId) throws IOException {
Trace trace = new Trace();
List<SegmentRecord> segmentRecords = getTraceQueryDAO().queryByTraceId(traceId);
for (SegmentRecord segment : segmentRecords) {
if (nonNull(segment)) {
TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getDataBinary());
trace.getSpans().addAll(buildSpanList(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
}
}
List<Span> sortedSpans = new LinkedList<>();
if (CollectionUtils.isNotEmpty(trace.getSpans())) {
List<Span> rootSpans = findRoot(trace.getSpans());
if (CollectionUtils.isNotEmpty(rootSpans)) {
rootSpans.forEach(span -> {
List<Span> childrenSpan = new ArrayList<>();
childrenSpan.add(span);
findChildren(trace.getSpans(), span, childrenSpan);
sortedSpans.addAll(childrenSpan);
});
}
}
trace.getSpans().addAll(sortedSpans);
return trace;
}
private List<Span> buildSpanList(String traceId, String segmentId, int serviceId,
List<SpanObject> spanObjects) {
List<Span> spans = new ArrayList<>();
spanObjects.forEach(spanObject -> {
Span span = new Span();
span.setTraceId(traceId);
span.setSegmentId(segmentId);
span.setSpanId(spanObject.getSpanId());
span.setParentSpanId(spanObject.getParentSpanId());
span.setStartTime(spanObject.getStartTime());
span.setEndTime(spanObject.getEndTime());
span.setError(spanObject.getIsError());
span.setLayer(spanObject.getSpanLayer().name());
span.setType(spanObject.getSpanType().name());
String segmentSpanId = segmentId + Const.SEGMENT_SPAN_SPLIT + String.valueOf(spanObject.getSpanId());
span.setSegmentSpanId(segmentSpanId);
String segmentParentSpanId = segmentId + Const.SEGMENT_SPAN_SPLIT + String.valueOf(spanObject.getParentSpanId());
span.setSegmentParentSpanId(segmentParentSpanId);
if (spanObject.getPeerId() == 0) {
span.setPeer(spanObject.getPeer());
} else {
span.setPeer(getNetworkAddressInventoryCache().get(spanObject.getPeerId()).getName());
}
String endpointName = spanObject.getOperationName();
if (spanObject.getOperationNameId() != 0) {
EndpointInventory endpointInventory = getEndpointInventoryCache().get(spanObject.getOperationNameId());
if (nonNull(endpointInventory)) {
endpointName = endpointInventory.getName();
} else {
endpointName = Const.EMPTY_STRING;
}
}
span.setEndpointName(endpointName);
String serviceCode = getServiceInventoryCache().get(serviceId).getName();
span.setServiceCode(serviceCode);
if (spanObject.getComponentId() == 0) {
span.setComponent(spanObject.getComponent());
} else {
span.setComponent(getComponentLibraryCatalogService().getComponentName(spanObject.getComponentId()));
}
spanObject.getRefsList().forEach(reference -> {
Ref ref = new Ref();
ref.setTraceId(traceId);
switch (reference.getRefType()) {
case CrossThread:
ref.setType(RefType.CROSS_THREAD);
break;
case CrossProcess:
ref.setType(RefType.CROSS_PROCESS);
break;
}
ref.setParentSpanId(reference.getParentSpanId());
UniqueId uniqueId = reference.getParentTraceSegmentId();
StringBuilder segmentIdBuilder = new StringBuilder();
for (int i = 0; i < uniqueId.getIdPartsList().size(); i++) {
if (i == 0) {
segmentIdBuilder.append(String.valueOf(uniqueId.getIdPartsList().get(i)));
} else {
segmentIdBuilder.append(".").append(String.valueOf(uniqueId.getIdPartsList().get(i)));
}
}
ref.setParentSegmentId(segmentIdBuilder.toString());
span.setSegmentParentSpanId(ref.getParentSegmentId() + Const.SEGMENT_SPAN_SPLIT + String.valueOf(ref.getParentSpanId()));
span.getRefs().add(ref);
});
spanObject.getTagsList().forEach(tag -> {
KeyValue keyValue = new KeyValue();
keyValue.setKey(tag.getKey());
keyValue.setValue(tag.getValue());
span.getTags().add(keyValue);
});
spanObject.getLogsList().forEach(log -> {
LogEntity logEntity = new LogEntity();
logEntity.setTime(log.getTime());
log.getDataList().forEach(data -> {
KeyValue keyValue = new KeyValue();
keyValue.setKey(data.getKey());
keyValue.setValue(data.getValue());
logEntity.getData().add(keyValue);
});
span.getLogs().add(logEntity);
});
spans.add(span);
});
return spans;
}
private List<Span> findRoot(List<Span> spans) {
List<Span> rootSpans = new ArrayList<>();
spans.forEach(span -> {
String segmentParentSpanId = span.getSegmentParentSpanId();
boolean hasParent = false;
for (Span subSpan : spans) {
if (segmentParentSpanId.equals(subSpan.getSegmentSpanId())) {
hasParent = true;
}
}
if (!hasParent) {
span.setRoot(true);
rootSpans.add(span);
}
});
return rootSpans;
}
private void findChildren(List<Span> spans, Span parentSpan, List<Span> childrenSpan) {
spans.forEach(span -> {
if (span.getSegmentParentSpanId().equals(parentSpan.getSegmentSpanId())) {
childrenSpan.add(span);
findChildren(spans, span, childrenSpan);
}
});
}
}
......@@ -16,15 +16,18 @@
*
*/
package org.apache.skywalking.oap.query.graphql.type;
package org.apache.skywalking.oap.server.core.query.entity;
import java.util.List;
import java.util.*;
import lombok.*;
@Getter
@Setter
public class BasicTrace {
private String segmentId;
private List<String> operationNames;
private List<String> endpointNames = new ArrayList<>();
private int duration;
private String start;
private boolean isError;
private List<String> traceIds;
private List<String> traceIds = new ArrayList<>();
}
......@@ -16,8 +16,12 @@
*
*/
package org.apache.skywalking.oap.query.graphql.type;
package org.apache.skywalking.oap.server.core.query.entity;
import lombok.*;
@Getter
@Setter
public class KeyValue {
private String key;
private String value;
......
......@@ -16,11 +16,14 @@
*
*/
package org.apache.skywalking.oap.query.graphql.type;
package org.apache.skywalking.oap.server.core.query.entity;
import java.util.List;
import java.util.*;
import lombok.*;
@Getter
@Setter
public class LogEntity {
private long time;
private List<KeyValue> data;
private List<KeyValue> data = new ArrayList<>();
}
......@@ -16,8 +16,12 @@
*
*/
package org.apache.skywalking.oap.query.graphql.type;
package org.apache.skywalking.oap.server.core.query.entity;
import lombok.*;
@Getter
@Setter
public class Pagination {
private int pageNum;
private int pageSize;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.query.graphql.type;
package org.apache.skywalking.oap.server.core.query.entity;
public enum QueryOrder {
BY_START_TIME,
......
......@@ -16,8 +16,12 @@
*
*/
package org.apache.skywalking.oap.query.graphql.type;
package org.apache.skywalking.oap.server.core.query.entity;
import lombok.*;
@Setter
@Getter
public class Ref {
private String traceId;
private String parentSegmentId;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.query.graphql.type;
package org.apache.skywalking.oap.server.core.query.entity;
public enum RefType {
CROSS_PROCESS,
......
......@@ -16,26 +16,31 @@
*
*/
package org.apache.skywalking.oap.query.graphql.type;
package org.apache.skywalking.oap.server.core.query.entity;
import java.util.List;
import java.util.*;
import lombok.*;
@Getter
@Setter
public class Span {
private String traceId;
private String segmentId;
private int spanId;
private int parentSpanId;
private List<Ref> refs;
private String applicationCode;
private List<Ref> refs = new ArrayList<>();
private String serviceCode;
private long startTime;
private long endTime;
private String operationName;
private String endpointName;
private String type;
private String peer;
private String component;
private boolean isError;
private String layer;
private List<KeyValue> tags;
private List<LogEntity> logs;
private String serviceCode;
private List<KeyValue> tags = new ArrayList<>();
private List<LogEntity> logs = new ArrayList<>();
private boolean isRoot;
private String segmentSpanId;
private String segmentParentSpanId;
}
......@@ -16,10 +16,12 @@
*
*/
package org.apache.skywalking.oap.query.graphql.type;
package org.apache.skywalking.oap.server.core.query.entity;
import java.util.List;
import java.util.*;
import lombok.Getter;
@Getter
public class Trace {
private List<Span> spans;
private List<Span> spans = new ArrayList<>();
}
......@@ -16,11 +16,14 @@
*
*/
package org.apache.skywalking.oap.query.graphql.type;
package org.apache.skywalking.oap.server.core.query.entity;
import java.util.List;
import java.util.*;
import lombok.*;
@Setter
@Getter
public class TraceBrief {
private List<BasicTrace> traces;
private List<BasicTrace> traces = new ArrayList<>();
private int total;
}
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.query.graphql.type;
package org.apache.skywalking.oap.server.core.query.entity;
public enum TraceState {
ALL,
......
......@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
......@@ -38,6 +38,6 @@ public class StorageModule extends ModuleDefine {
IBatchDAO.class, StorageDAO.class, IRegisterLockDAO.class,
IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class,
IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class,
ITopologyQueryDAO.class};
ITopologyQueryDAO.class, IMetricQueryDAO.class, ITraceQueryDAO.class};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.query;
import java.io.IOException;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
public interface ITraceQueryDAO extends Service {
TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration,
long maxDuration, String endpointName, int serviceId, String traceId, int limit, int from,
TraceState traceState, QueryOrder queryOrder) throws IOException;
List<SegmentRecord> queryByTraceId(String traceId) throws IOException;
}
......@@ -61,7 +61,7 @@ public class GraphQLQueryProvider extends ModuleProvider {
.file("query-protocol/topology.graphqls")
.resolvers(new TopologyQuery(getManager()))
.file("query-protocol/trace.graphqls")
.resolvers(new TraceQuery())
.resolvers(new TraceQuery(getManager()))
.file("query-protocol/aggregation.graphqls")
.resolvers(new AggregationQuery())
.file("query-protocol/alarm.graphqls")
......
......@@ -19,18 +19,15 @@
package org.apache.skywalking.oap.query.graphql.resolver;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import org.apache.skywalking.oap.query.graphql.type.AlarmTrend;
import org.apache.skywalking.oap.query.graphql.type.Alarms;
import org.apache.skywalking.oap.query.graphql.type.Duration;
import org.apache.skywalking.oap.query.graphql.type.Pagination;
import org.apache.skywalking.oap.query.graphql.type.Scope;
import org.apache.skywalking.oap.query.graphql.type.*;
import org.apache.skywalking.oap.server.core.query.entity.Pagination;
public class AlarmQuery implements GraphQLQueryResolver {
public AlarmTrend getAlarmTrend(final Duration duration) {
return new AlarmTrend();
}
public Alarms getAlarm(final Duration duration, final Scope scope,final String keyword, final Pagination paging) {
public Alarms getAlarm(final Duration duration, final Scope scope, final String keyword, final Pagination paging) {
return new Alarms();
}
}
......@@ -19,16 +19,58 @@
package org.apache.skywalking.oap.query.graphql.resolver;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import org.apache.skywalking.oap.query.graphql.type.Trace;
import org.apache.skywalking.oap.query.graphql.type.TraceBrief;
import java.io.IOException;
import org.apache.skywalking.oap.query.graphql.type.TraceQueryCondition;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.StringUtils;
import static java.util.Objects.nonNull;
public class TraceQuery implements GraphQLQueryResolver {
public TraceBrief queryBasicTraces(final TraceQueryCondition condition) {
return new TraceBrief();
private final ModuleManager moduleManager;
private TraceQueryService queryService;
public TraceQuery(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private TraceQueryService getQueryService() {
if (queryService == null) {
this.queryService = moduleManager.find(CoreModule.NAME).getService(TraceQueryService.class);
}
return queryService;
}
public TraceBrief queryBasicTraces(final TraceQueryCondition condition) throws IOException {
long startSecondTB = 0;
long endSecondTB = 0;
String traceId = Const.EMPTY_STRING;
if (StringUtils.isNotEmpty(condition.getTraceId())) {
traceId = condition.getTraceId();
} else if (nonNull(condition.getQueryDuration())) {
startSecondTB = DurationUtils.INSTANCE.startTimeDurationToSecondTimeBucket(condition.getQueryDuration().getStep(), condition.getQueryDuration().getStart());
endSecondTB = DurationUtils.INSTANCE.endTimeDurationToSecondTimeBucket(condition.getQueryDuration().getStep(), condition.getQueryDuration().getEnd());
} else {
throw new UnexpectedException("The condition must contains either queryDuration or traceId.");
}
int minDuration = condition.getMinTraceDuration();
int maxDuration = condition.getMaxTraceDuration();
String endpointName = condition.getEndpointName();
int serviceId = condition.getServiceId();
TraceState traceState = condition.getTraceState();
QueryOrder queryOrder = condition.getQueryOrder();
Pagination pagination = condition.getPaging();
return getQueryService().queryBasicTraces(serviceId, traceId, endpointName, minDuration, maxDuration, traceState, queryOrder, pagination, startSecondTB, endSecondTB);
}
public Trace queryTrace(final String traceId) {
return new Trace();
public Trace queryTrace(final String traceId) throws IOException {
return getQueryService().queryTrace(traceId);
}
}
......@@ -18,18 +18,19 @@
package org.apache.skywalking.oap.query.graphql.type;
import lombok.Getter;
import lombok.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
@Getter
@Setter
public class TraceQueryCondition {
private int applicationId;
private int serviceId;
private String traceId;
private String operationName;
private String endpointName;
private Duration queryDuration;
private int minTraceDuration;
private int maxTraceDuration;
private TraceState traceState;
private QueryOrder queryOrder;
private Pagination paging;
}
Subproject commit b93baa58852595164aae50e3e7be37683c220175
Subproject commit 9eafa5a6c5b3b9a61002391485799ed89be96b9c
......@@ -20,14 +20,14 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.library.client.NameSpace;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.*;
import org.slf4j.*;
/**
......@@ -76,6 +76,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient));
this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IMetricQueryDAO.class, new MetricQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient));
}
@Override
......
......@@ -112,7 +112,7 @@ public class MetricQueryEsDAO extends EsDAO implements IMetricQueryDAO {
int numOfSteps = ((Number)itemResponse.getResponse().getSource().get(ThermodynamicIndicator.NUM_OF_STEPS)).intValue();
String value = (String)itemResponse.getResponse().getSource().get(ThermodynamicIndicator.DETAIL_GROUP);
IntKeyLongValueArray intKeyLongValues = new IntKeyLongValueArray();
IntKeyLongValueArray intKeyLongValues = new IntKeyLongValueArray(5);
intKeyLongValues.toObject(value);
List<Long> axisYValues = new ArrayList<>();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
/**
* @author peng-yongsheng
*/
public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
public TraceQueryEsDAO(ElasticSearchClient client) {
super(client);
}
@Override
public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration,
long maxDuration, String endpointName, int serviceId, String traceId, int limit, int from,
TraceState traceState, QueryOrder queryOrder) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(boolQueryBuilder);
List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
if (startSecondTB != 0 && endSecondTB != 0) {
mustQueryList.add(QueryBuilders.rangeQuery(SegmentRecord.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
}
if (minDuration != 0 || maxDuration != 0) {
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(SegmentRecord.LATENCY);
if (minDuration != 0) {
rangeQueryBuilder.gte(minDuration);
}
if (maxDuration != 0) {
rangeQueryBuilder.lte(maxDuration);
}
boolQueryBuilder.must().add(rangeQueryBuilder);
}
if (StringUtils.isNotEmpty(endpointName)) {
mustQueryList.add(QueryBuilders.matchPhraseQuery(SegmentRecord.ENDPOINT_NAME, endpointName));
}
if (serviceId != 0) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(SegmentRecord.SERVICE_ID, serviceId));
}
if (StringUtils.isNotEmpty(traceId)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(SegmentRecord.TRACE_ID, traceId));
}
switch (traceState) {
case ERROR:
mustQueryList.add(QueryBuilders.matchQuery(SegmentRecord.IS_ERROR, BooleanUtils.TRUE));
break;
case SUCCESS:
mustQueryList.add(QueryBuilders.matchQuery(SegmentRecord.IS_ERROR, BooleanUtils.FALSE));
break;
}
switch (queryOrder) {
case BY_START_TIME:
sourceBuilder.sort(SegmentRecord.START_TIME, SortOrder.DESC);
break;
case BY_DURATION:
sourceBuilder.sort(SegmentRecord.START_TIME, SortOrder.DESC);
break;
}
sourceBuilder.size(limit);
sourceBuilder.from(from);
SearchResponse response = getClient().search(SegmentRecord.INDEX_NAME, sourceBuilder);
TraceBrief traceBrief = new TraceBrief();
traceBrief.setTotal((int)response.getHits().totalHits);
for (SearchHit searchHit : response.getHits().getHits()) {
BasicTrace basicTrace = new BasicTrace();
basicTrace.setSegmentId((String)searchHit.getSourceAsMap().get(SegmentRecord.SEGMENT_ID));
basicTrace.setStart(String.valueOf(searchHit.getSourceAsMap().get(SegmentRecord.START_TIME)));
basicTrace.getEndpointNames().add((String)searchHit.getSourceAsMap().get(SegmentRecord.ENDPOINT_NAME));
basicTrace.setDuration(((Number)searchHit.getSourceAsMap().get(SegmentRecord.LATENCY)).intValue());
basicTrace.setError(BooleanUtils.valueToBoolean(((Number)searchHit.getSourceAsMap().get(SegmentRecord.IS_ERROR)).intValue()));
traceBrief.getTraces().add(basicTrace);
}
return traceBrief;
}
@Override public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.termQuery(SegmentRecord.TRACE_ID, traceId));
sourceBuilder.size(20);
SearchResponse response = getClient().search(SegmentRecord.INDEX_NAME, sourceBuilder);
List<SegmentRecord> segmentRecords = new ArrayList<>();
for (SearchHit searchHit : response.getHits().getHits()) {
SegmentRecord segmentRecord = new SegmentRecord();
segmentRecord.setSegmentId((String)searchHit.getSourceAsMap().get(SegmentRecord.SEGMENT_ID));
segmentRecord.setTraceId((String)searchHit.getSourceAsMap().get(SegmentRecord.TRACE_ID));
segmentRecord.setServiceId(((Number)searchHit.getSourceAsMap().get(SegmentRecord.SERVICE_ID)).intValue());
segmentRecord.setEndpointName((String)searchHit.getSourceAsMap().get(SegmentRecord.ENDPOINT_NAME));
segmentRecord.setStartTime(((Number)searchHit.getSourceAsMap().get(SegmentRecord.START_TIME)).longValue());
segmentRecord.setEndTime(((Number)searchHit.getSourceAsMap().get(SegmentRecord.END_TIME)).longValue());
segmentRecord.setLatency(((Number)searchHit.getSourceAsMap().get(SegmentRecord.LATENCY)).intValue());
segmentRecord.setIsError(((Number)searchHit.getSourceAsMap().get(SegmentRecord.IS_ERROR)).intValue());
String dataBinaryBase64 = (String)searchHit.getSourceAsMap().get(SegmentRecord.DATA_BINARY);
if (StringUtils.isNotEmpty(dataBinaryBase64)) {
segmentRecord.setDataBinary(Base64.getDecoder().decode(dataBinaryBase64));
}
segmentRecords.add(segmentRecord);
}
return segmentRecords;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册