未验证 提交 df362b18 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Query protocol and codebase for log type record (#2449)

* New query graphql protocol added.

* Finish a new metric, log(abstract) with HTTP Access Log.

* Submit the draft codes of logs.

* Remove @IDColumn

* Fix startup issue.

* Add http_access_log to disable list.

* Finish a wrong service implementation.

* Add trace id in log query condition and result.

* Fix style
上级 c3cc3d2e
......@@ -51,6 +51,7 @@ SRC_ENDPOINT_RELATION_SERVER_SIDE: 'endpoint_relation_server_side';
SRC_SERVICE_RELATION_SERVER_SIDE: 'service_relation_server_side';
SRC_SERVICE_RELATION_CLIENT_SIDE: 'service_relation_client_side';
SRC_ALARM_RECORD: 'alarm_record';
SRC_HTTP_ACCESS_LOG: 'http_access_log';
SRC_ZIPKIN_SPAN: 'zipkin_span';
SRC_JAEGER_SPAN: 'jaeger_span';
......
......@@ -60,7 +60,7 @@ source
disableSource
: SRC_SEGMENT | SRC_TOP_N_DB_STATEMENT | SRC_ENDPOINT_RELATION_SERVER_SIDE | SRC_SERVICE_RELATION_SERVER_SIDE |
SRC_SERVICE_RELATION_CLIENT_SIDE | SRC_ALARM_RECORD
SRC_SERVICE_RELATION_CLIENT_SIDE | SRC_ALARM_RECORD | SRC_HTTP_ACCESS_LOG
;
sourceAttribute
......
......@@ -66,6 +66,7 @@ public class CoreModule extends ModuleDefine {
classes.add(TopologyQueryService.class);
classes.add(MetricQueryService.class);
classes.add(TraceQueryService.class);
classes.add(LogQueryService.class);
classes.add(MetadataQueryService.class);
classes.add(AggregationQueryService.class);
classes.add(AlarmQueryService.class);
......
......@@ -145,6 +145,7 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(TopologyQueryService.class, new TopologyQueryService(getManager()));
this.registerServiceImplementation(MetricQueryService.class, new MetricQueryService(getManager()));
this.registerServiceImplementation(TraceQueryService.class, new TraceQueryService(getManager()));
this.registerServiceImplementation(LogQueryService.class, new LogQueryService(getManager()));
this.registerServiceImplementation(MetadataQueryService.class, new MetadataQueryService(getManager()));
this.registerServiceImplementation(AggregationQueryService.class, new AggregationQueryService(getManager()));
this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager()));
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.log;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.query.entity.ContentType;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
* @author wusheng
*/
public abstract class AbstractLogRecord extends Record {
public static final String SERVICE_ID = "service_id";
public static final String SERVICE_INSTANCE_ID = "service_instance_id";
public static final String ENDPOINT_ID = "endpoint_id";
public static final String TRACE_ID = "trace_id";
public static final String IS_ERROR = "is_error";
public static final String STATUS_CODE = "status_code";
public static final String CONTENT_TYPE = "content_type";
public static final String CONTENT = "content";
public static final String TIMESTAMP = "timestamp";
@Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
@Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) private int serviceInstanceId;
@Setter @Getter @Column(columnName = ENDPOINT_ID) private int endpointId;
@Setter @Getter @Column(columnName = TRACE_ID) private String traceId;
@Setter @Getter @Column(columnName = IS_ERROR) private int isError;
@Setter @Getter @Column(columnName = STATUS_CODE) private String statusCode;
@Setter @Getter @Column(columnName = CONTENT_TYPE) private int contentType = ContentType.NONE.value();
@Setter @Getter @Column(columnName = CONTENT) private String content;
@Setter @Getter @Column(columnName = TIMESTAMP) private long timestamp;
@Override public String id() {
throw new UnexpectedException("AbstractLogRecord doesn't provide id()");
}
public static abstract class Builder<T extends AbstractLogRecord> implements StorageBuilder<T> {
protected void map2Data(T record, Map<String, Object> dbMap) {
record.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
record.setServiceInstanceId(((Number)dbMap.get(SERVICE_INSTANCE_ID)).intValue());
record.setEndpointId(((Number)dbMap.get(ENDPOINT_ID)).intValue());
record.setIsError(((Number)dbMap.get(IS_ERROR)).intValue());
record.setTraceId((String)dbMap.get(TRACE_ID));
record.setStatusCode((String)dbMap.get(STATUS_CODE));
record.setContentType(((Number)dbMap.get(CONTENT_TYPE)).intValue());
record.setContent((String)dbMap.get(CONTENT));
record.setTimestamp(((Number)dbMap.get(TIMESTAMP)).longValue());
record.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
}
@Override public Map<String, Object> data2Map(AbstractLogRecord record) {
Map<String, Object> map = new HashMap<>();
map.put(SERVICE_ID, record.getServiceId());
map.put(SERVICE_INSTANCE_ID, record.getServiceInstanceId());
map.put(ENDPOINT_ID, record.getEndpointId());
map.put(TRACE_ID, record.getTraceId());
map.put(IS_ERROR, record.getIsError());
map.put(STATUS_CODE, record.getStatusCode());
map.put(TIME_BUCKET, record.getTimeBucket());
map.put(CONTENT_TYPE, record.getContentType());
map.put(CONTENT, record.getContent());
map.put(TIMESTAMP, record.getTimestamp());
return map;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.log;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
import org.apache.skywalking.oap.server.core.source.HTTPAccessLog;
/**
* @author wusheng
*/
public class HTTPAccessLogDispatcher implements SourceDispatcher<HTTPAccessLog> {
@Override public void dispatch(HTTPAccessLog source) {
HTTPAccessLogRecord record = new HTTPAccessLogRecord();
record.setTimestamp(source.getTimestamp());
record.setTimeBucket(source.getTimeBucket());
record.setServiceId(source.getServiceId());
record.setServiceInstanceId(source.getServiceInstanceId());
record.setEndpointId(source.getEndpointId());
record.setTraceId(source.getTraceId());
record.setIsError(source.getIsError());
record.setStatusCode(source.getStatusCode());
record.setContentType(source.getContentType().value());
record.setContent(source.getContent());
RecordProcess.INSTANCE.in(record);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.log;
import java.util.Map;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntity;
import static org.apache.skywalking.oap.server.core.analysis.manual.log.HTTPAccessLogRecord.INDEX_NAME;
@StorageEntity(name = INDEX_NAME, builder = HTTPAccessLogRecord.Builder.class, sourceScopeId = DefaultScopeDefine.HTTP_ACCESS_LOG)
public class HTTPAccessLogRecord extends AbstractLogRecord {
public static final String INDEX_NAME = "http_access_log";
public static class Builder extends AbstractLogRecord.Builder<HTTPAccessLogRecord> {
@Override public HTTPAccessLogRecord map2Data(Map<String, Object> dbMap) {
HTTPAccessLogRecord record = new HTTPAccessLogRecord();
super.map2Data(record, dbMap);
return record;
}
}
}
......@@ -50,18 +50,18 @@ public class SegmentRecord extends Record {
public static final String DATA_BINARY = "data_binary";
public static final String VERSION = "version";
@Setter @Getter @Column(columnName = SEGMENT_ID) @IDColumn private String segmentId;
@Setter @Getter @Column(columnName = TRACE_ID) @IDColumn private String traceId;
@Setter @Getter @Column(columnName = SERVICE_ID) @IDColumn private int serviceId;
@Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) @IDColumn private int serviceInstanceId;
@Setter @Getter @Column(columnName = ENDPOINT_NAME, matchQuery = true) @IDColumn private String endpointName;
@Setter @Getter @Column(columnName = ENDPOINT_ID) @IDColumn private int endpointId;
@Setter @Getter @Column(columnName = START_TIME) @IDColumn private long startTime;
@Setter @Getter @Column(columnName = END_TIME) @IDColumn private long endTime;
@Setter @Getter @Column(columnName = LATENCY) @IDColumn private int latency;
@Setter @Getter @Column(columnName = IS_ERROR) @IDColumn private int isError;
@Setter @Getter @Column(columnName = DATA_BINARY) @IDColumn private byte[] dataBinary;
@Setter @Getter @Column(columnName = VERSION) @IDColumn private int version;
@Setter @Getter @Column(columnName = SEGMENT_ID) private String segmentId;
@Setter @Getter @Column(columnName = TRACE_ID) private String traceId;
@Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
@Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) private int serviceInstanceId;
@Setter @Getter @Column(columnName = ENDPOINT_NAME, matchQuery = true) private String endpointName;
@Setter @Getter @Column(columnName = ENDPOINT_ID) private int endpointId;
@Setter @Getter @Column(columnName = START_TIME) private long startTime;
@Setter @Getter @Column(columnName = END_TIME) private long endTime;
@Setter @Getter @Column(columnName = LATENCY) private int latency;
@Setter @Getter @Column(columnName = IS_ERROR) private int isError;
@Setter @Getter @Column(columnName = DATA_BINARY) private byte[] dataBinary;
@Setter @Getter @Column(columnName = VERSION) private int version;
@Override public String id() {
return segmentId;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.query;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.library.module.Service;
import org.apache.skywalking.oap.server.library.module.*;
/**
* @author wusheng
*/
public class LogQueryService implements Service {
private final ModuleManager moduleManager;
private ILogQueryDAO logQueryDAO;
private ServiceInventoryCache serviceInventoryCache;
private ServiceInstanceInventoryCache serviceInstanceInventoryCache;
private EndpointInventoryCache endpointInventoryCache;
public LogQueryService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private ILogQueryDAO getLogQueryDAO() {
if (logQueryDAO == null) {
this.logQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(ILogQueryDAO.class);
}
return logQueryDAO;
}
private ServiceInventoryCache getServiceInventoryCache() {
if (serviceInventoryCache == null) {
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
}
return serviceInventoryCache;
}
private ServiceInstanceInventoryCache getServiceInstanceInventoryCache() {
if (serviceInstanceInventoryCache == null) {
this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
}
return serviceInstanceInventoryCache;
}
private EndpointInventoryCache getEndpointInventoryCache() {
if (endpointInventoryCache == null) {
this.endpointInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
}
return endpointInventoryCache;
}
public Logs queryLogs(final String metricName, int serviceId, int serviceInstanceId, int endpointId,
String traceId, LogState state, String stateCode, Pagination paging, final long startTB,
final long endTB) throws IOException {
PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(paging);
Logs logs = getLogQueryDAO().queryLogs(metricName, serviceId, serviceInstanceId, endpointId,
traceId, state, stateCode, paging, page.getFrom(), page.getLimit(), startTB, endTB);
logs.getLogs().forEach(log -> {
if (log.getServiceId() != Const.NONE) {
log.setServiceName(getServiceInventoryCache().get(log.getServiceId()).getName());
}
if (log.getServiceInstanceId() != Const.NONE) {
log.setServiceInstanceName(getServiceInstanceInventoryCache().get(log.getServiceInstanceId()).getName());
}
if (log.getEndpointId() != Const.NONE) {
log.setEndpointName(getEndpointInventoryCache().get(log.getEndpointId()).getName());
}
});
return logs;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.query.entity;
import org.apache.skywalking.oap.server.core.UnexpectedException;
/**
* @author wusheng
*/
public enum ContentType {
TEXT(1), JSON(2), NONE(0);
private int value;
ContentType(int value) {
this.value = value;
}
public int value() {
return value;
}
public static ContentType instanceOf(int value) {
switch (value) {
case 1:
return TEXT;
case 2:
return JSON;
case 0:
return NONE;
default:
throw new UnexpectedException("unexpected value=" + value);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.query.entity;
import lombok.*;
/**
* @author wusheng
*/
@Setter
@Getter
public class Log {
private String serviceName;
private int serviceId;
private String serviceInstanceName;
private int serviceInstanceId;
private String endpointName;
private int endpointId;
private String traceId;
private String timestamp;
private boolean isError;
private String statusCode;
private ContentType contentType = ContentType.NONE;
private String content;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.query.entity;
/**
* @author wusheng
*/
public enum LogState {
ALL, SUCCESS, ERROR
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.query.entity;
import java.util.*;
import lombok.*;
/**
* @author wusheng
*/
@Setter
@Getter
public class Logs {
private final List<Log> logs;
private int total;
public Logs() {
this.logs = new ArrayList<>();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.source;
import lombok.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.query.entity.ContentType;
@Setter
@Getter
public abstract class AbstractLog extends Source {
private long timeBucket;
private long timestamp;
private int serviceId;
private int serviceInstanceId;
private int endpointId;
private String traceId;
private int isError;
private String statusCode;
private ContentType contentType = ContentType.NONE;
private String content;
@Override public String getEntityId() {
throw new UnexpectedException("getEntityId is not supported in AbstractLog source");
}
}
......@@ -61,6 +61,7 @@ public class DefaultScopeDefine {
public static final int ENVOY_INSTANCE_METRIC = 22;
public static final int ZIPKIN_SPAN = 23;
public static final int JAEGER_SPAN = 24;
public static final int HTTP_ACCESS_LOG = 25;
/**
* Catalog of scope, the indicator processor could use this to group all generated indicators by oal tool.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.source;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.HTTP_ACCESS_LOG;
/**
* @author wusheng
*/
@ScopeDeclaration(id = HTTP_ACCESS_LOG, name = "HTTPAccessLog")
public class HTTPAccessLog extends AbstractLog {
@Override public int scope() {
return HTTP_ACCESS_LOG;
}
}
......@@ -40,6 +40,6 @@ public class StorageModule extends ModuleDefine {
IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class,
IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class,
ITopologyQueryDAO.class, IMetricQueryDAO.class, ITraceQueryDAO.class, IMetadataQueryDAO.class, IAggregationQueryDAO.class, IAlarmQueryDAO.class,
ITopNRecordsQueryDAO.class};
ITopNRecordsQueryDAO.class, ILogQueryDAO.class};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.query;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author wusheng
*/
public interface ILogQueryDAO extends Service {
Logs queryLogs(final String metricName, int serviceId, int serviceInstanceId, int endpointId,
String traceId, LogState state, String stateCode, Pagination paging, int from, int limit, final long startTB, final long endTB) throws IOException;
}
......@@ -68,6 +68,8 @@ public class GraphQLQueryProvider extends ModuleProvider {
.resolvers(new AlarmQuery(getManager()))
.file("query-protocol/top-n-records.graphqls")
.resolvers(new TopNRecordsQuery(getManager()))
.file("query-protocol/log.graphqls")
.resolvers(new LogQuery(getManager()))
.build()
.makeExecutableSchema();
this.graphQL = GraphQL.newGraphQL(schema).build();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.query.graphql.resolver;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import java.io.IOException;
import org.apache.skywalking.oap.query.graphql.type.LogQueryCondition;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.query.entity.Logs;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import static java.util.Objects.nonNull;
/**
* @author wusheng
*/
public class LogQuery implements GraphQLQueryResolver {
private final ModuleManager moduleManager;
private LogQueryService logQueryService;
public LogQuery(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private LogQueryService getQueryService() {
if (logQueryService == null) {
this.logQueryService = moduleManager.find(CoreModule.NAME).provider().getService(LogQueryService.class);
}
return logQueryService;
}
public Logs queryLogs(LogQueryCondition condition) throws IOException {
long startSecondTB = 0;
long endSecondTB = 0;
if (nonNull(condition.getQueryDuration())) {
startSecondTB = DurationUtils.INSTANCE.startTimeDurationToSecondTimeBucket(condition.getQueryDuration().getStep(), condition.getQueryDuration().getStart());
endSecondTB = DurationUtils.INSTANCE.endTimeDurationToSecondTimeBucket(condition.getQueryDuration().getStep(), condition.getQueryDuration().getEnd());
}
return getQueryService().queryLogs(condition.getMetricName(), condition.getServiceId(), condition.getServiceInstanceId(), condition.getEndpointId(),
condition.getTraceId(), condition.getState(), condition.getStateCode(), condition.getPaging(), startSecondTB, endSecondTB);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.query.graphql.type;
import lombok.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
/**
* @author wusheng
*/
@Getter
@Setter
public class LogQueryCondition {
private String metricName;
private int serviceId;
private int serviceInstanceId;
private int endpointId;
private String traceId;
private LogState state;
private String stateCode;
private Duration queryDuration;
private Pagination paging;
}
Subproject commit 6f11e3b829bba4d3532477e968291cf657f0ac0b
Subproject commit a005dd1024c2b7e68fbcab5795824634cb81cde8
......@@ -20,48 +20,16 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsInstaller;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.EndpointInventoryCacheEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.NetworkAddressInventoryCacheEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.ServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.ServiceInventoryCacheEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.RegisterLockDAOImpl;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.RegisterLockInstaller;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AggregationQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AlarmQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.MetadataQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.MetricQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopNRecordsQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -117,6 +85,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEsDAO(elasticSearchClient));
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.List;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import static org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord.TRACE_ID;
/**
* @author wusheng
*/
public class LogQueryEsDAO extends EsDAO implements ILogQueryDAO {
public LogQueryEsDAO(ElasticSearchClient client) {
super(client);
}
@Override
public Logs queryLogs(String metricName, int serviceId, int serviceInstanceId, int endpointId,
String traceId, LogState state, String stateCode, Pagination paging, int from, int limit, long startSecondTB,
long endSecondTB) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(boolQueryBuilder);
List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
if (startSecondTB != 0 && endSecondTB != 0) {
mustQueryList.add(QueryBuilders.rangeQuery(Record.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
}
if (serviceId != Const.NONE) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(AbstractLogRecord.SERVICE_ID, serviceId));
}
if (serviceInstanceId != Const.NONE) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
}
if (endpointId != Const.NONE) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(AbstractLogRecord.ENDPOINT_ID, endpointId));
}
if (!Strings.isNullOrEmpty(stateCode)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(AbstractLogRecord.STATUS_CODE, stateCode));
}
if (!Strings.isNullOrEmpty(traceId)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(TRACE_ID, traceId));
}
if (LogState.ERROR.equals(state)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(AbstractLogRecord.IS_ERROR, BooleanUtils.booleanToValue(true)));
} else if (LogState.ERROR.equals(state)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(AbstractLogRecord.IS_ERROR, BooleanUtils.booleanToValue(false)));
}
sourceBuilder.size(limit);
sourceBuilder.from(from);
SearchResponse response = getClient().search(metricName, sourceBuilder);
Logs logs = new Logs();
logs.setTotal((int)response.getHits().totalHits);
for (SearchHit searchHit : response.getHits().getHits()) {
Log log = new Log();
log.setServiceId(((Number)searchHit.getSourceAsMap().get(AbstractLogRecord.SERVICE_ID)).intValue());
log.setServiceInstanceId(((Number)searchHit.getSourceAsMap().get(AbstractLogRecord.SERVICE_INSTANCE_ID)).intValue());
log.setEndpointId(((Number)searchHit.getSourceAsMap().get(AbstractLogRecord.ENDPOINT_ID)).intValue());
log.setError(BooleanUtils.valueToBoolean(((Number)searchHit.getSourceAsMap().get(AbstractLogRecord.IS_ERROR)).intValue()));
log.setStatusCode((String)searchHit.getSourceAsMap().get(AbstractLogRecord.STATUS_CODE));
log.setContentType(ContentType.instanceOf(((Number)searchHit.getSourceAsMap().get(AbstractLogRecord.CONTENT_TYPE)).intValue()));
log.setContent((String)searchHit.getSourceAsMap().get(AbstractLogRecord.CONTENT));
logs.getLogs().add(log);
}
return logs;
}
}
......@@ -87,6 +87,7 @@ public class H2StorageProvider extends ModuleProvider {
this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO(h2Client));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(h2Client));
this.registerServiceImplementation(ILogQueryDAO.class, new H2LogQueryDAO(h2Client));
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import com.google.common.base.Strings;
import java.io.IOException;
import java.sql.*;
import java.util.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import static org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord.*;
/**
* @author wusheng
*/
public class H2LogQueryDAO implements ILogQueryDAO {
private JDBCHikariCPClient h2Client;
public H2LogQueryDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
@Override
public Logs queryLogs(String metricName, int serviceId, int serviceInstanceId, int endpointId,
String traceId, LogState state,
String stateCode, Pagination paging, int from, int limit, long startSecondTB,
long endSecondTB) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> parameters = new ArrayList<>(10);
sql.append("from ").append(metricName).append(" where ");
sql.append(" 1=1 ");
if (startSecondTB != 0 && endSecondTB != 0) {
sql.append(" and ").append(AbstractLogRecord.TIME_BUCKET).append(" >= ?");
parameters.add(startSecondTB);
sql.append(" and ").append(AbstractLogRecord.TIME_BUCKET).append(" <= ?");
parameters.add(endSecondTB);
}
if (serviceId != Const.NONE) {
sql.append(" and ").append(SERVICE_ID).append(" = ?");
parameters.add(serviceId);
}
if (serviceInstanceId != Const.NONE) {
sql.append(" and ").append(AbstractLogRecord.SERVICE_INSTANCE_ID).append(" = ?");
parameters.add(serviceInstanceId);
}
if (endpointId != Const.NONE) {
sql.append(" and ").append(AbstractLogRecord.ENDPOINT_ID).append(" = ?");
parameters.add(endpointId);
}
if (!Strings.isNullOrEmpty(stateCode)) {
sql.append(" and ").append(AbstractLogRecord.STATUS_CODE).append(" = ?");
parameters.add(stateCode);
}
if (!Strings.isNullOrEmpty(traceId)) {
sql.append(" and ").append(TRACE_ID).append(" = ?");
parameters.add(traceId);
}
if (LogState.ERROR.equals(state)) {
sql.append(" and ").append(AbstractLogRecord.IS_ERROR).append(" = ?");
parameters.add(BooleanUtils.booleanToValue(true));
} else if (LogState.ERROR.equals(state)) {
sql.append(" and ").append(AbstractLogRecord.IS_ERROR).append(" = ?");
parameters.add(BooleanUtils.booleanToValue(false));
}
Logs logs = new Logs();
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, buildCountStatement(sql.toString()), parameters.toArray(new Object[0]))) {
while (resultSet.next()) {
logs.setTotal(resultSet.getInt("total"));
}
}
buildLimit(sql, from, limit);
try (ResultSet resultSet = h2Client.executeQuery(connection, "select * " + sql.toString(), parameters.toArray(new Object[0]))) {
while (resultSet.next()) {
Log log = new Log();
log.setServiceId(resultSet.getInt(SERVICE_ID));
log.setServiceInstanceId(resultSet.getInt(SERVICE_INSTANCE_ID));
log.setEndpointId(resultSet.getInt(ENDPOINT_ID));
log.setTraceId(resultSet.getString(TRACE_ID));
log.setTimestamp(resultSet.getString(TIMESTAMP));
log.setStatusCode(resultSet.getString(STATUS_CODE));
log.setContentType(ContentType.instanceOf(resultSet.getInt(CONTENT_TYPE)));
log.setContent(resultSet.getString(CONTENT));
logs.getLogs().add(log);
}
}
} catch (SQLException e) {
throw new IOException(e);
}
return logs;
}
protected String buildCountStatement(String sql) {
return "select count(1) total from (select 1 " + sql + " )";
}
protected void buildLimit(StringBuilder sql, int from, int limit) {
sql.append(" LIMIT ").append(limit);
sql.append(" OFFSET ").append(from);
}
}
......@@ -103,7 +103,7 @@ public class H2TraceQueryDAO implements ITraceQueryDAO {
TraceBrief traceBrief = new TraceBrief();
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, "select count(1) total from (select 1 " + sql.toString() + " )", parameters.toArray(new Object[0]))) {
try (ResultSet resultSet = h2Client.executeQuery(connection, buildCountStatement(sql.toString()), parameters.toArray(new Object[0]))) {
while (resultSet.next()) {
traceBrief.setTotal(resultSet.getInt("total"));
}
......@@ -132,6 +132,10 @@ public class H2TraceQueryDAO implements ITraceQueryDAO {
return traceBrief;
}
protected String buildCountStatement(String sql) {
return "select count(1) total from (select 1 " + sql + " )";
}
protected void buildLimit(StringBuilder sql, int from, int limit) {
sql.append(" LIMIT ").append(limit);
sql.append(" OFFSET ").append(from);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2LogQueryDAO;
/**
* @author wusheng
*/
public class MySQLLogQueryDAO extends H2LogQueryDAO {
public MySQLLogQueryDAO(JDBCHikariCPClient h2Client) {
super(h2Client);
}
protected String buildCountStatement(String sql) {
return "select count(1) total from (select 1 " + sql + " )";
}
protected void buildLimit(StringBuilder sql, int from, int limit) {
sql.append(" LIMIT ").append(limit);
sql.append(" OFFSET ").append(from);
}
}
......@@ -93,6 +93,7 @@ public class MySQLStorageProvider extends ModuleProvider {
this.registerServiceImplementation(IAlarmQueryDAO.class, new MySQLAlarmQueryDAO(mysqlClient));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(mysqlClient));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(mysqlClient));
this.registerServiceImplementation(ILogQueryDAO.class, new MySQLLogQueryDAO(mysqlClient));
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
......
......@@ -18,16 +18,8 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import com.google.common.base.Strings;
import java.io.IOException;
import java.sql.*;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO;
import org.elasticsearch.search.sort.SortOrder;
/**
* @author wusheng
......@@ -37,97 +29,8 @@ public class MySQLTraceQueryDAO extends H2TraceQueryDAO {
super(mysqlClient);
}
@Override
public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration,
String endpointName, int serviceId, int serviceInstanceId, int endpointId, String traceId, int limit, int from,
TraceState traceState, QueryOrder queryOrder) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> parameters = new ArrayList<>(10);
sql.append("from ").append(SegmentRecord.INDEX_NAME).append(" where ");
sql.append(" 1=1 ");
if (startSecondTB != 0 && endSecondTB != 0) {
sql.append(" and ").append(SegmentRecord.TIME_BUCKET).append(" >= ?");
parameters.add(startSecondTB);
sql.append(" and ").append(SegmentRecord.TIME_BUCKET).append(" <= ?");
parameters.add(endSecondTB);
}
if (minDuration != 0 || maxDuration != 0) {
if (minDuration != 0) {
sql.append(" and ").append(SegmentRecord.LATENCY).append(" >= ?");
parameters.add(minDuration);
}
if (maxDuration != 0) {
sql.append(" and ").append(SegmentRecord.LATENCY).append(" <= ?");
parameters.add(maxDuration);
}
}
if (!Strings.isNullOrEmpty(endpointName)) {
sql.append(" and ").append(SegmentRecord.ENDPOINT_NAME).append(" like '%" + endpointName + "%'");
}
if (serviceId != 0) {
sql.append(" and ").append(SegmentRecord.SERVICE_ID).append(" = ?");
parameters.add(serviceId);
}
if (serviceInstanceId != 0) {
sql.append(" and ").append(SegmentRecord.SERVICE_INSTANCE_ID).append(" = ?");
parameters.add(serviceInstanceId);
}
if (endpointId != 0) {
sql.append(" and ").append(SegmentRecord.ENDPOINT_ID).append(" = ?");
parameters.add(endpointId);
}
if (!Strings.isNullOrEmpty(traceId)) {
sql.append(" and ").append(SegmentRecord.TRACE_ID).append(" = ?");
parameters.add(traceId);
}
switch (traceState) {
case ERROR:
sql.append(" and ").append(SegmentRecord.IS_ERROR).append(" = ").append(BooleanUtils.TRUE);
break;
case SUCCESS:
sql.append(" and ").append(SegmentRecord.IS_ERROR).append(" = ").append(BooleanUtils.FALSE);
break;
}
switch (queryOrder) {
case BY_START_TIME:
sql.append(" order by ").append(SegmentRecord.START_TIME).append(" ").append(SortOrder.DESC);
break;
case BY_DURATION:
sql.append(" order by ").append(SegmentRecord.LATENCY).append(" ").append(SortOrder.DESC);
break;
}
TraceBrief traceBrief = new TraceBrief();
try (Connection connection = getClient().getConnection()) {
try (ResultSet resultSet = getClient().executeQuery(connection, "select count(1) total from (select 1 " + sql.toString() + " ) AS TRACE", parameters.toArray(new Object[0]))) {
while (resultSet.next()) {
traceBrief.setTotal(resultSet.getInt("total"));
}
}
buildLimit(sql, from, limit);
try (ResultSet resultSet = getClient().executeQuery(connection, "select * " + sql.toString(), parameters.toArray(new Object[0]))) {
while (resultSet.next()) {
BasicTrace basicTrace = new BasicTrace();
basicTrace.setSegmentId(resultSet.getString(SegmentRecord.SEGMENT_ID));
basicTrace.setStart(resultSet.getString(SegmentRecord.START_TIME));
basicTrace.getEndpointNames().add(resultSet.getString(SegmentRecord.ENDPOINT_NAME));
basicTrace.setDuration(resultSet.getInt(SegmentRecord.LATENCY));
basicTrace.setError(BooleanUtils.valueToBoolean(resultSet.getInt(SegmentRecord.IS_ERROR)));
String traceIds = resultSet.getString(SegmentRecord.TRACE_ID);
basicTrace.getTraceIds().add(traceIds);
traceBrief.getTraces().add(basicTrace);
}
}
} catch (SQLException e) {
throw new IOException(e);
}
return traceBrief;
@Override protected String buildCountStatement(String sql) {
return "select count(1) total from (select 1 " + sql + " ) AS TRACE";
}
@Override protected void buildLimit(StringBuilder sql, int from, int limit) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册