未验证 提交 8d33f027 编写于 作者: D Daming 提交者: GitHub

chore: code polish (#6025)

上级 1297d2e1
...@@ -46,17 +46,13 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.ti; ...@@ -46,17 +46,13 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.ti;
*/ */
@Slf4j @Slf4j
public class InfluxClient implements Client, HealthCheckable { public class InfluxClient implements Client, HealthCheckable {
private InfluxStorageConfig config; private final DelegatedHealthChecker healthChecker = new DelegatedHealthChecker();
private final InfluxStorageConfig config;
private InfluxDB influx; private InfluxDB influx;
private DelegatedHealthChecker healthChecker = new DelegatedHealthChecker();
/** /**
* A constant, the name of time field in Time-series database. * A constant, the name of time field in Time-series database.
*/ */
public static final String TIME = "time"; public static final String TIME = "time";
/**
* A constant, the name of tag of time_bucket.
*/
public static final String TAG_TIME_BUCKET = "_time_bucket";
private final String database; private final String database;
...@@ -217,7 +213,7 @@ public class InfluxClient implements Client, HealthCheckable { ...@@ -217,7 +213,7 @@ public class InfluxClient implements Client, HealthCheckable {
this.healthChecker.health(); this.healthChecker.health();
} catch (Throwable e) { } catch (Throwable e) {
healthChecker.unHealth(e); healthChecker.unHealth(e);
throw e; throw new IOException(e);
} }
} }
......
...@@ -69,7 +69,7 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; ...@@ -69,7 +69,7 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
@Slf4j @Slf4j
public class InfluxStorageProvider extends ModuleProvider { public class InfluxStorageProvider extends ModuleProvider {
private InfluxStorageConfig config; private final InfluxStorageConfig config;
private InfluxClient client; private InfluxClient client;
public InfluxStorageProvider() { public InfluxStorageProvider() {
...@@ -123,8 +123,11 @@ public class InfluxStorageProvider extends ModuleProvider { ...@@ -123,8 +123,11 @@ public class InfluxStorageProvider extends ModuleProvider {
@Override @Override
public void start() throws ServiceNotProvidedException, ModuleStartException { public void start() throws ServiceNotProvidedException, ModuleStartException {
MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME)
HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_influxdb", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); .provider()
.getService(MetricsCreator.class);
HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge(
"storage_influxdb", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
client.registerChecker(healthChecker); client.registerChecker(healthChecker);
try { try {
client.connect(); client.connect();
...@@ -137,7 +140,7 @@ public class InfluxStorageProvider extends ModuleProvider { ...@@ -137,7 +140,7 @@ public class InfluxStorageProvider extends ModuleProvider {
} }
@Override @Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { public void notifyAfterCompleted() throws ServiceNotProvidedException {
} }
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb; package org.apache.skywalking.oap.server.storage.plugin.influxdb;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller; import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.client.Client;
...@@ -31,13 +30,13 @@ public class InfluxTableInstaller extends ModelInstaller { ...@@ -31,13 +30,13 @@ public class InfluxTableInstaller extends ModelInstaller {
} }
@Override @Override
protected boolean isExists(final Model model) throws StorageException { protected boolean isExists(final Model model) {
TableMetaInfo.addModel(model); TableMetaInfo.addModel(model);
return true; return true;
} }
@Override @Override
protected void createTable(final Model model) throws StorageException { protected void createTable(final Model model) {
// Automatically create table // Automatically create table
} }
} }
...@@ -41,9 +41,9 @@ import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; ...@@ -41,9 +41,9 @@ import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
public class TableMetaInfo { public class TableMetaInfo {
private static final Map<String, TableMetaInfo> TABLES = new HashMap<>(); private static final Map<String, TableMetaInfo> TABLES = new HashMap<>();
private Map<String, String> storageAndColumnMap; private final Map<String, String> storageAndColumnMap;
private Map<String, String> storageAndTagMap; private final Map<String, String> storageAndTagMap;
private Model model; private final Model model;
public static void addModel(Model model) { public static void addModel(Model model) {
final List<ModelColumn> columns = model.getColumns(); final List<ModelColumn> columns = model.getColumns();
...@@ -88,7 +88,7 @@ public class TableMetaInfo { ...@@ -88,7 +88,7 @@ public class TableMetaInfo {
} }
} }
TableMetaInfo info = TableMetaInfo.builder() final TableMetaInfo info = TableMetaInfo.builder()
.model(model) .model(model)
.storageAndTagMap(storageAndTagMap) .storageAndTagMap(storageAndTagMap)
.storageAndColumnMap(storageAndColumnMap) .storageAndColumnMap(storageAndColumnMap)
......
...@@ -36,17 +36,17 @@ import org.influxdb.dto.Point; ...@@ -36,17 +36,17 @@ import org.influxdb.dto.Point;
* InfluxDB Point wrapper. * InfluxDB Point wrapper.
*/ */
public class InfluxInsertRequest implements InsertRequest, UpdateRequest { public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
private Point.Builder builder; private final Point.Builder builder;
private Map<String, Object> fields = Maps.newHashMap(); private final Map<String, Object> fields = Maps.newHashMap();
public InfluxInsertRequest(Model model, StorageData storageData, StorageBuilder storageBuilder) { public <T extends StorageData> InfluxInsertRequest(Model model, T storageData, StorageBuilder<T> storageBuilder) {
Map<String, Object> objectMap = storageBuilder.data2Map(storageData); final Map<String, Object> objectMap = storageBuilder.data2Map(storageData);
if (SegmentRecord.INDEX_NAME.equals(model.getName())) { if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
objectMap.remove(SegmentRecord.TAGS); objectMap.remove(SegmentRecord.TAGS);
} }
for (ModelColumn column : model.getColumns()) { for (ModelColumn column : model.getColumns()) {
Object value = objectMap.get(column.getColumnName().getName()); final Object value = objectMap.get(column.getColumnName().getName());
if (value instanceof StorageDataComplexObject) { if (value instanceof StorageDataComplexObject) {
fields.put( fields.put(
......
...@@ -44,7 +44,7 @@ public class InfluxStorageDAO implements StorageDAO { ...@@ -44,7 +44,7 @@ public class InfluxStorageDAO implements StorageDAO {
@Override @Override
public IRecordDAO newRecordDao(StorageBuilder<Record> storageBuilder) { public IRecordDAO newRecordDao(StorageBuilder<Record> storageBuilder) {
return new RecordDAO(influxClient, storageBuilder); return new RecordDAO(storageBuilder);
} }
@Override @Override
......
...@@ -40,8 +40,8 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select; ...@@ -40,8 +40,8 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
@Slf4j @Slf4j
public class ManagementDAO implements IManagementDAO { public class ManagementDAO implements IManagementDAO {
private InfluxClient client; private final InfluxClient client;
private StorageBuilder<ManagementData> storageBuilder; private final StorageBuilder<ManagementData> storageBuilder;
public ManagementDAO(InfluxClient client, StorageBuilder<ManagementData> storageBuilder) { public ManagementDAO(InfluxClient client, StorageBuilder<ManagementData> storageBuilder) {
this.client = client; this.client = client;
......
...@@ -58,7 +58,7 @@ public class MetricsDAO implements IMetricsDAO { ...@@ -58,7 +58,7 @@ public class MetricsDAO implements IMetricsDAO {
@Override @Override
public List<Metrics> multiGet(Model model, List<String> ids) throws IOException { public List<Metrics> multiGet(Model model, List<String> ids) throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select() final WhereQueryImpl<SelectQueryImpl> query = select()
.raw(ALL_FIELDS) .raw(ALL_FIELDS)
.from(client.getDatabase(), model.getName()) .from(client.getDatabase(), model.getName())
.where(contains("id", Joiner.on("|").join(ids))); .where(contains("id", Joiner.on("|").join(ids)));
...@@ -72,10 +72,10 @@ public class MetricsDAO implements IMetricsDAO { ...@@ -72,10 +72,10 @@ public class MetricsDAO implements IMetricsDAO {
} }
final List<Metrics> metrics = Lists.newArrayList(); final List<Metrics> metrics = Lists.newArrayList();
List<String> columns = series.getColumns(); final List<String> columns = series.getColumns();
TableMetaInfo metaInfo = TableMetaInfo.get(model.getName()); final TableMetaInfo metaInfo = TableMetaInfo.get(model.getName());
Map<String, String> storageAndColumnMap = metaInfo.getStorageAndColumnMap(); final Map<String, String> storageAndColumnMap = metaInfo.getStorageAndColumnMap();
series.getValues().forEach(values -> { series.getValues().forEach(values -> {
Map<String, Object> data = Maps.newHashMap(); Map<String, Object> data = Maps.newHashMap();
...@@ -96,21 +96,19 @@ public class MetricsDAO implements IMetricsDAO { ...@@ -96,21 +96,19 @@ public class MetricsDAO implements IMetricsDAO {
} }
@Override @Override
public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException { public InsertRequest prepareBatchInsert(Model model, Metrics metrics) {
final long timestamp = TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling()); final long timestamp = TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling());
TableMetaInfo tableMetaInfo = TableMetaInfo.get(model.getName()); final TableMetaInfo tableMetaInfo = TableMetaInfo.get(model.getName());
final InfluxInsertRequest request = new InfluxInsertRequest(model, metrics, storageBuilder) final InfluxInsertRequest request = new InfluxInsertRequest(model, metrics, storageBuilder)
.time(timestamp, TimeUnit.MILLISECONDS); .time(timestamp, TimeUnit.MILLISECONDS);
tableMetaInfo.getStorageAndTagMap().forEach((field, tag) -> { tableMetaInfo.getStorageAndTagMap().forEach(request::addFieldAsTag);
request.addFieldAsTag(field, tag);
});
return request; return request;
} }
@Override @Override
public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException { public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) {
return (UpdateRequest) this.prepareBatchInsert(model, metrics); return (UpdateRequest) this.prepareBatchInsert(model, metrics);
} }
} }
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb.base; package org.apache.skywalking.oap.server.storage.plugin.influxdb.base;
import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger; import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
...@@ -33,8 +32,8 @@ public class NoneStreamDAO implements INoneStreamDAO { ...@@ -33,8 +32,8 @@ public class NoneStreamDAO implements INoneStreamDAO {
private static final int PADDING_SIZE = 1_000_000; private static final int PADDING_SIZE = 1_000_000;
private static final AtomicRangeInteger SUFFIX = new AtomicRangeInteger(0, PADDING_SIZE); private static final AtomicRangeInteger SUFFIX = new AtomicRangeInteger(0, PADDING_SIZE);
private InfluxClient client; private final InfluxClient client;
private StorageBuilder<NoneStream> storageBuilder; private final StorageBuilder<NoneStream> storageBuilder;
public NoneStreamDAO(InfluxClient client, StorageBuilder<NoneStream> storageBuilder) { public NoneStreamDAO(InfluxClient client, StorageBuilder<NoneStream> storageBuilder) {
this.client = client; this.client = client;
...@@ -42,15 +41,13 @@ public class NoneStreamDAO implements INoneStreamDAO { ...@@ -42,15 +41,13 @@ public class NoneStreamDAO implements INoneStreamDAO {
} }
@Override @Override
public void insert(final Model model, final NoneStream noneStream) throws IOException { public void insert(final Model model, final NoneStream noneStream) {
final long timestamp = TimeBucket.getTimestamp(noneStream.getTimeBucket(), model.getDownsampling()) final long timestamp = TimeBucket.getTimestamp(noneStream.getTimeBucket(), model.getDownsampling())
* PADDING_SIZE + SUFFIX.getAndIncrement(); * PADDING_SIZE + SUFFIX.getAndIncrement();
final InfluxInsertRequest request = new InfluxInsertRequest(model, noneStream, storageBuilder) final InfluxInsertRequest request = new InfluxInsertRequest(model, noneStream, storageBuilder)
.time(timestamp, TimeUnit.NANOSECONDS); .time(timestamp, TimeUnit.NANOSECONDS);
TableMetaInfo.get(model.getName()).getStorageAndTagMap().forEach((field, tag) -> { TableMetaInfo.get(model.getName()).getStorageAndTagMap().forEach(request::addFieldAsTag);
request.addFieldAsTag(field, tag);
});
client.write(request.getPoint()); client.write(request.getPoint());
} }
} }
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb.base; package org.apache.skywalking.oap.server.storage.plugin.influxdb.base;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -33,44 +32,38 @@ import org.apache.skywalking.oap.server.core.storage.IRecordDAO; ...@@ -33,44 +32,38 @@ import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo; import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
public class RecordDAO implements IRecordDAO { public class RecordDAO implements IRecordDAO {
private static final int PADDING_SIZE = 1_000_000; private static final int PADDING_SIZE = 1_000_000;
private static final AtomicRangeInteger SUFFIX = new AtomicRangeInteger(0, PADDING_SIZE); private static final AtomicRangeInteger SUFFIX = new AtomicRangeInteger(0, PADDING_SIZE);
private InfluxClient client; private final StorageBuilder<Record> storageBuilder;
private StorageBuilder<Record> storageBuilder;
public RecordDAO(InfluxClient client, StorageBuilder<Record> storageBuilder) { public RecordDAO(StorageBuilder<Record> storageBuilder) {
this.client = client;
this.storageBuilder = storageBuilder; this.storageBuilder = storageBuilder;
} }
@Override @Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException { public InsertRequest prepareBatchInsert(Model model, Record record) {
final long timestamp = TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling()) final long timestamp = TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling())
* PADDING_SIZE + SUFFIX.getAndIncrement(); * PADDING_SIZE
+ SUFFIX.getAndIncrement();
final InfluxInsertRequest request = new InfluxInsertRequest(model, record, storageBuilder) final InfluxInsertRequest request = new InfluxInsertRequest(model, record, storageBuilder)
.time(timestamp, TimeUnit.NANOSECONDS); .time(timestamp, TimeUnit.NANOSECONDS);
TableMetaInfo.get(model.getName()).getStorageAndTagMap().forEach((field, tag) -> {
request.addFieldAsTag(field, tag); TableMetaInfo.get(model.getName()).getStorageAndTagMap().forEach(request::addFieldAsTag);
});
if (SegmentRecord.INDEX_NAME.equals(model.getName())) { if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
Map<String, List<SpanTag>> collect = ((SegmentRecord) record).getTagsRawData() Map<String, List<SpanTag>> collect = ((SegmentRecord) record).getTagsRawData()
.stream() .stream()
.collect( .collect(
Collectors.groupingBy(SpanTag::getKey)); Collectors.groupingBy(SpanTag::getKey));
collect.entrySet().forEach(e -> { collect.forEach((key, value) -> request.tag(
request.tag(e.getKey(), "'" + Joiner.on("'") key,
.join(e.getValue() "'" + Joiner.on("'").join(value.stream().map(SpanTag::getValue).collect(Collectors.toSet())) + "'"
.stream() ));
.map(SpanTag::getValue)
.collect(Collectors.toSet())) + "'");
});
} }
return request; return request;
} }
......
...@@ -44,7 +44,11 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select; ...@@ -44,7 +44,11 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
@Slf4j @Slf4j
public class AggregationQuery implements IAggregationQueryDAO { public class AggregationQuery implements IAggregationQueryDAO {
private InfluxClient client; private static final Comparator<SelectedRecord> ASCENDING =
Comparator.comparingLong(a -> Long.parseLong(a.getValue()));
private static final Comparator<SelectedRecord> DESCENDING = (a, b) ->
Long.compare(Long.parseLong(b.getValue()), Long.parseLong(a.getValue()));
private final InfluxClient client;
public AggregationQuery(InfluxClient client) { public AggregationQuery(InfluxClient client) {
this.client = client; this.client = client;
...@@ -72,11 +76,12 @@ public class AggregationQuery implements IAggregationQueryDAO { ...@@ -72,11 +76,12 @@ public class AggregationQuery implements IAggregationQueryDAO {
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> where = select() WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> where = select()
.fromSubQuery(client.getDatabase()) .fromSubQuery(client.getDatabase())
.mean(valueColumnName) .mean(valueColumnName)
.from(condition.getName()).where(); .from(condition.getName())
.where();
if (additionalConditions != null) { if (additionalConditions != null) {
additionalConditions.forEach(moreCondition -> { additionalConditions.forEach(moreCondition ->
where.and(eq(moreCondition.getKey(), moreCondition.getValue())); where.and(eq(moreCondition.getKey(), moreCondition.getValue()))
}); );
} }
final SelectSubQueryImpl<SelectQueryImpl> subQuery = where final SelectSubQueryImpl<SelectQueryImpl> subQuery = where
.and(gte(InfluxClient.TIME, InfluxClient.timeIntervalTS(duration.getStartTimestamp()))) .and(gte(InfluxClient.TIME, InfluxClient.timeIntervalTS(duration.getStartTimestamp())))
...@@ -102,13 +107,8 @@ public class AggregationQuery implements IAggregationQueryDAO { ...@@ -102,13 +107,8 @@ public class AggregationQuery implements IAggregationQueryDAO {
entities.add(entity); entities.add(entity);
}); });
Collections.sort(entities, comparator); // re-sort by self, because of the result order by time. entities.sort(comparator); // re-sort by self, because of the result order by time.
return entities; return entities;
} }
private static final Comparator<SelectedRecord> ASCENDING = (a, b) -> Long.compare(
Long.parseLong(a.getValue()), Long.parseLong(b.getValue()));
private static final Comparator<SelectedRecord> DESCENDING = (a, b) -> Long.compare(
Long.parseLong(b.getValue()), Long.parseLong(a.getValue()));
} }
...@@ -73,9 +73,7 @@ public class AlarmQuery implements IAlarmQueryDAO { ...@@ -73,9 +73,7 @@ public class AlarmQuery implements IAlarmQueryDAO {
WhereQueryImpl<SelectQueryImpl> countQuery = select().count(AlarmRecord.ID0) WhereQueryImpl<SelectQueryImpl> countQuery = select().count(AlarmRecord.ID0)
.from(client.getDatabase(), AlarmRecord.INDEX_NAME) .from(client.getDatabase(), AlarmRecord.INDEX_NAME)
.where(); .where();
recallQuery.getClauses().forEach(clause -> { recallQuery.getClauses().forEach(countQuery::where);
countQuery.where(clause);
});
Query query = new Query(countQuery.getCommand() + recallQuery.getCommand()); Query query = new Query(countQuery.getCommand() + recallQuery.getCommand());
List<QueryResult.Result> results = client.query(query); List<QueryResult.Result> results = client.query(query);
......
...@@ -80,7 +80,7 @@ public class MetadataQuery implements IMetadataQueryDAO { ...@@ -80,7 +80,7 @@ public class MetadataQuery implements IMetadataQueryDAO {
@Override @Override
public List<Service> getAllBrowserServices() throws IOException { public List<Service> getAllBrowserServices() throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select(ID_COLUMN, NAME, ServiceTraffic.GROUP) final WhereQueryImpl<SelectQueryImpl> query = select(ID_COLUMN, NAME, ServiceTraffic.GROUP)
.from(client.getDatabase(), ServiceTraffic.INDEX_NAME) .from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
.where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Browser.value()))); .where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Browser.value())));
return buildServices(query); return buildServices(query);
...@@ -88,16 +88,16 @@ public class MetadataQuery implements IMetadataQueryDAO { ...@@ -88,16 +88,16 @@ public class MetadataQuery implements IMetadataQueryDAO {
@Override @Override
public List<Database> getAllDatabases() throws IOException { public List<Database> getAllDatabases() throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select(ID_COLUMN, NAME, ServiceTraffic.GROUP) final WhereQueryImpl<SelectQueryImpl> query = select(ID_COLUMN, NAME, ServiceTraffic.GROUP)
.from(client.getDatabase(), ServiceTraffic.INDEX_NAME) .from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
.where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Database.value()))); .where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Database.value())));
QueryResult.Series series = client.queryForSingleSeries(query); final QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", query.getCommand(), series); log.debug("SQL: {} result: {}", query.getCommand(), series);
} }
List<Database> databases = Lists.newArrayList(); final List<Database> databases = Lists.newArrayList();
if (Objects.nonNull(series)) { if (Objects.nonNull(series)) {
for (List<Object> values : series.getValues()) { for (List<Object> values : series.getValues()) {
Database database = new Database(); Database database = new Database();
...@@ -111,8 +111,7 @@ public class MetadataQuery implements IMetadataQueryDAO { ...@@ -111,8 +111,7 @@ public class MetadataQuery implements IMetadataQueryDAO {
@Override @Override
public List<Service> searchServices(String keyword) throws IOException { public List<Service> searchServices(String keyword) throws IOException {
final WhereQueryImpl<SelectQueryImpl> where = select( final WhereQueryImpl<SelectQueryImpl> where = select(ID_COLUMN, NAME, ServiceTraffic.GROUP)
ID_COLUMN, NAME, ServiceTraffic.GROUP)
.from(client.getDatabase(), ServiceTraffic.INDEX_NAME) .from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
.where(eq(TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value()))); .where(eq(TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())));
if (!Strings.isNullOrEmpty(keyword)) { if (!Strings.isNullOrEmpty(keyword)) {
...@@ -123,12 +122,11 @@ public class MetadataQuery implements IMetadataQueryDAO { ...@@ -123,12 +122,11 @@ public class MetadataQuery implements IMetadataQueryDAO {
@Override @Override
public Service searchService(String serviceCode) throws IOException { public Service searchService(String serviceCode) throws IOException {
WhereQueryImpl<SelectQueryImpl> where = select( final WhereQueryImpl<SelectQueryImpl> whereQuery = select(ID_COLUMN, NAME, ServiceTraffic.GROUP)
ID_COLUMN, NAME, ServiceTraffic.GROUP)
.from(client.getDatabase(), ServiceTraffic.INDEX_NAME) .from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
.where(eq(TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value()))) .where(eq(TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())));
.and(eq(ServiceTraffic.NAME, serviceCode)); whereQuery.and(eq(ServiceTraffic.NAME, serviceCode));
return buildServices(where).get(0); return buildServices(whereQuery).get(0);
} }
@Override @Override
...@@ -147,7 +145,7 @@ public class MetadataQuery implements IMetadataQueryDAO { ...@@ -147,7 +145,7 @@ public class MetadataQuery implements IMetadataQueryDAO {
final QueryResult.Series series = client.queryForSingleSeries(where); final QueryResult.Series series = client.queryForSingleSeries(where);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", where.getCommand(), series); log.debug("SQL: {} result: {}.", where.getCommand(), series);
} }
List<Endpoint> list = new ArrayList<>(limit); List<Endpoint> list = new ArrayList<>(limit);
...@@ -189,7 +187,7 @@ public class MetadataQuery implements IMetadataQueryDAO { ...@@ -189,7 +187,7 @@ public class MetadataQuery implements IMetadataQueryDAO {
} }
if (Objects.isNull(series)) { if (Objects.isNull(series)) {
return Collections.EMPTY_LIST; return Collections.emptyList();
} }
List<List<Object>> result = series.getValues(); List<List<Object>> result = series.getValues();
......
...@@ -63,22 +63,20 @@ public class MetricsQuery implements IMetricsQueryDAO { ...@@ -63,22 +63,20 @@ public class MetricsQuery implements IMetricsQueryDAO {
public long readMetricsValue(final MetricsCondition condition, public long readMetricsValue(final MetricsCondition condition,
final String valueColumnName, final String valueColumnName,
final Duration duration) throws IOException { final Duration duration) throws IOException {
int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName()); final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName()); final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
if (function == Function.Latest) { if (function == Function.Latest) {
return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue); return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
} }
final String measurement = condition.getName(); final String measurement = condition.getName();
SelectionQueryImpl query = select(); final SelectionQueryImpl query = select();
switch (function) { if (function == Function.Avg) {
case Avg: query.mean(valueColumnName);
query.mean(valueColumnName); } else {
break; query.sum(valueColumnName);
default:
query.sum(valueColumnName);
} }
WhereQueryImpl<SelectQueryImpl> queryWhereQuery = query.from(client.getDatabase(), measurement).where(); final WhereQueryImpl<SelectQueryImpl> queryWhereQuery = query.from(client.getDatabase(), measurement).where();
final String entityId = condition.getEntity().buildId(); final String entityId = condition.getEntity().buildId();
if (entityId != null) { if (entityId != null) {
...@@ -90,7 +88,7 @@ public class MetricsQuery implements IMetricsQueryDAO { ...@@ -90,7 +88,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
.and(lte(InfluxClient.TIME, InfluxClient.timeIntervalTS(duration.getEndTimestamp()))) .and(lte(InfluxClient.TIME, InfluxClient.timeIntervalTS(duration.getEndTimestamp())))
.groupBy(InfluxConstants.TagName.ENTITY_ID); .groupBy(InfluxConstants.TagName.ENTITY_ID);
List<QueryResult.Series> seriesList = client.queryForSeries(queryWhereQuery); final List<QueryResult.Series> seriesList = client.queryForSeries(queryWhereQuery);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SQL: {} result set: {}", queryWhereQuery.getCommand(), seriesList); log.debug("SQL: {} result set: {}", queryWhereQuery.getCommand(), seriesList);
} }
...@@ -109,12 +107,10 @@ public class MetricsQuery implements IMetricsQueryDAO { ...@@ -109,12 +107,10 @@ public class MetricsQuery implements IMetricsQueryDAO {
final String valueColumnName, final String valueColumnName,
final Duration duration) throws IOException { final Duration duration) throws IOException {
final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints(); final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
List<String> ids = new ArrayList<>(pointOfTimes.size()); final List<String> ids = new ArrayList<>(pointOfTimes.size());
pointOfTimes.forEach(pointOfTime -> { pointOfTimes.forEach(pointOfTime -> ids.add(pointOfTime.id(condition.getEntity().buildId())));
ids.add(pointOfTime.id(condition.getEntity().buildId()));
});
WhereQueryImpl<SelectQueryImpl> query = select() final WhereQueryImpl<SelectQueryImpl> query = select()
.column(ID_COLUMN) .column(ID_COLUMN)
.column(valueColumnName) .column(valueColumnName)
.from(client.getDatabase(), condition.getName()) .from(client.getDatabase(), condition.getName())
...@@ -156,12 +152,10 @@ public class MetricsQuery implements IMetricsQueryDAO { ...@@ -156,12 +152,10 @@ public class MetricsQuery implements IMetricsQueryDAO {
final List<String> labels, final List<String> labels,
final Duration duration) throws IOException { final Duration duration) throws IOException {
final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints(); final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
List<String> ids = new ArrayList<>(pointOfTimes.size()); final List<String> ids = new ArrayList<>(pointOfTimes.size());
pointOfTimes.forEach(pointOfTime -> { pointOfTimes.forEach(pointOfTime -> ids.add(pointOfTime.id(condition.getEntity().buildId())));
ids.add(pointOfTime.id(condition.getEntity().buildId()));
});
WhereQueryImpl<SelectQueryImpl> query = select() final WhereQueryImpl<SelectQueryImpl> query = select()
.column(ID_COLUMN) .column(ID_COLUMN)
.column(valueColumnName) .column(valueColumnName)
.from(client.getDatabase(), condition.getName()) .from(client.getDatabase(), condition.getName())
...@@ -174,12 +168,12 @@ public class MetricsQuery implements IMetricsQueryDAO { ...@@ -174,12 +168,12 @@ public class MetricsQuery implements IMetricsQueryDAO {
query.where(contains(ID_COLUMN, Joiner.on("|").join(ids))); query.where(contains(ID_COLUMN, Joiner.on("|").join(ids)));
} }
} }
List<QueryResult.Series> series = client.queryForSeries(query); final List<QueryResult.Series> series = client.queryForSeries(query);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SQL: {} result set: {}", query.getCommand(), series); log.debug("SQL: {} result set: {}", query.getCommand(), series);
} }
Map<String, DataTable> idMap = new HashMap<>(); final Map<String, DataTable> idMap = new HashMap<>();
if (!CollectionUtils.isEmpty(series)) { if (!CollectionUtils.isEmpty(series)) {
series.get(0).getValues().forEach(values -> { series.get(0).getValues().forEach(values -> {
final String id = (String) values.get(1); final String id = (String) values.get(1);
...@@ -196,26 +190,23 @@ public class MetricsQuery implements IMetricsQueryDAO { ...@@ -196,26 +190,23 @@ public class MetricsQuery implements IMetricsQueryDAO {
final String valueColumnName, final String valueColumnName,
final Duration duration) throws IOException { final Duration duration) throws IOException {
final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints(); final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
List<String> ids = new ArrayList<>(pointOfTimes.size()); final List<String> ids = new ArrayList<>(pointOfTimes.size());
pointOfTimes.forEach(pointOfTime -> { pointOfTimes.forEach(pointOfTime -> ids.add(pointOfTime.id(condition.getEntity().buildId())));
ids.add(pointOfTime.id(condition.getEntity().buildId()));
});
WhereQueryImpl<SelectQueryImpl> query = select() final WhereQueryImpl<SelectQueryImpl> query = select()
.column(ID_COLUMN) .column(ID_COLUMN)
.column(valueColumnName) .column(valueColumnName)
.from(client.getDatabase(), condition.getName()) .from(client.getDatabase(), condition.getName())
.where(contains(ID_COLUMN, Joiner.on("|").join(ids))); .where(contains(ID_COLUMN, Joiner.on("|").join(ids)));
Map<String, List<Long>> thermodynamicValueMatrix = new HashMap<>();
QueryResult.Series series = client.queryForSingleSeries(query); final QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SQL: {} result set: {}", query.getCommand(), series); log.debug("SQL: {} result set: {}", query.getCommand(), series);
} }
final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName()); final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
HeatMap heatMap = new HeatMap(); final HeatMap heatMap = new HeatMap();
if (series != null) { if (series != null) {
for (List<Object> values : series.getValues()) { for (List<Object> values : series.getValues()) {
heatMap.buildColumn(values.get(1).toString(), values.get(2).toString(), defaultValue); heatMap.buildColumn(values.get(1).toString(), values.get(2).toString(), defaultValue);
...@@ -223,7 +214,6 @@ public class MetricsQuery implements IMetricsQueryDAO { ...@@ -223,7 +214,6 @@ public class MetricsQuery implements IMetricsQueryDAO {
} }
heatMap.fixMissingColumns(ids, defaultValue); heatMap.fixMissingColumns(ids, defaultValue);
return heatMap; return heatMap;
} }
} }
\ No newline at end of file
...@@ -40,7 +40,7 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select; ...@@ -40,7 +40,7 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
@Slf4j @Slf4j
public class NetworkAddressAliasDAO implements INetworkAddressAliasDAO { public class NetworkAddressAliasDAO implements INetworkAddressAliasDAO {
private final NetworkAddressAlias.Builder builder = new NetworkAddressAlias.Builder(); private final NetworkAddressAlias.Builder builder = new NetworkAddressAlias.Builder();
private InfluxClient client; private final InfluxClient client;
public NetworkAddressAliasDAO(final InfluxClient client) { public NetworkAddressAliasDAO(final InfluxClient client) {
this.client = client; this.client = client;
......
...@@ -37,8 +37,8 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select; ...@@ -37,8 +37,8 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
@Slf4j @Slf4j
public class ProfileTaskLogQuery implements IProfileTaskLogQueryDAO { public class ProfileTaskLogQuery implements IProfileTaskLogQueryDAO {
private InfluxClient client; private final InfluxClient client;
private int fetchTaskLogMaxSize; private final int fetchTaskLogMaxSize;
public ProfileTaskLogQuery(InfluxClient client, int fetchTaskLogMaxSize) { public ProfileTaskLogQuery(InfluxClient client, int fetchTaskLogMaxSize) {
this.client = client; this.client = client;
...@@ -68,16 +68,14 @@ public class ProfileTaskLogQuery implements IProfileTaskLogQueryDAO { ...@@ -68,16 +68,14 @@ public class ProfileTaskLogQuery implements IProfileTaskLogQueryDAO {
series.getValues().stream() series.getValues().stream()
// re-sort by self, because of the result order by time. // re-sort by self, because of the result order by time.
.sorted((a, b) -> Long.compare(((Number) b.get(1)).longValue(), ((Number) a.get(1)).longValue())) .sorted((a, b) -> Long.compare(((Number) b.get(1)).longValue(), ((Number) a.get(1)).longValue()))
.forEach(values -> { .forEach(values -> taskLogs.add(ProfileTaskLog.builder()
taskLogs.add(ProfileTaskLog.builder() .id((String) values.get(2))
.id((String) values.get(2)) .taskId((String) values.get(3))
.taskId((String) values.get(3)) .instanceId((String) values.get(4))
.instanceId((String) values.get(4)) .operationTime(((Number) values.get(5)).longValue())
.operationTime(((Number) values.get(5)).longValue()) .operationType(ProfileTaskLogOperationType.parse(
.operationType(ProfileTaskLogOperationType.parse( ((Number) values.get(6)).intValue()))
((Number) values.get(6)).intValue())) .build()));
.build());
});
return taskLogs; return taskLogs;
} }
} }
...@@ -52,7 +52,7 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO { ...@@ -52,7 +52,7 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO {
final Long startTimeBucket, final Long startTimeBucket,
final Long endTimeBucket, final Long endTimeBucket,
final Integer limit) throws IOException { final Integer limit) throws IOException {
WhereQueryImpl<SelectQueryImpl> query = final WhereQueryImpl<SelectQueryImpl> query =
select( select(
InfluxConstants.ID_COLUMN, InfluxConstants.ID_COLUMN,
ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.SERVICE_ID,
...@@ -83,15 +83,13 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO { ...@@ -83,15 +83,13 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO {
query.limit(limit); query.limit(limit);
} }
List<ProfileTask> tasks = Lists.newArrayList(); final List<ProfileTask> tasks = Lists.newArrayList();
QueryResult.Series series = client.queryForSingleSeries(query); QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", query.getCommand(), series); log.debug("SQL: {} result: {}", query.getCommand(), series);
} }
if (series != null) { if (series != null) {
series.getValues().forEach(values -> { series.getValues().forEach(values -> tasks.add(profileTaskBuilder(values)));
tasks.add(profileTaskBuilder(values));
});
} }
return tasks; return tasks;
} }
...@@ -101,7 +99,7 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO { ...@@ -101,7 +99,7 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO {
if (StringUtil.isEmpty(id)) { if (StringUtil.isEmpty(id)) {
return null; return null;
} }
SelectQueryImpl query = select( final SelectQueryImpl query = select(
InfluxConstants.ID_COLUMN, InfluxConstants.ID_COLUMN,
ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.SERVICE_ID,
ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.ENDPOINT_NAME,
...@@ -117,7 +115,7 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO { ...@@ -117,7 +115,7 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO {
.and(eq(InfluxConstants.ID_COLUMN, id)) .and(eq(InfluxConstants.ID_COLUMN, id))
.limit(1); .limit(1);
QueryResult.Series series = client.queryForSingleSeries(query); final QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", query.getCommand(), series); log.debug("SQL: {} result: {}", query.getCommand(), series);
} }
...@@ -127,7 +125,7 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO { ...@@ -127,7 +125,7 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO {
return null; return null;
} }
private static final ProfileTask profileTaskBuilder(List<Object> values) { private static ProfileTask profileTaskBuilder(List<Object> values) {
return ProfileTask.builder() return ProfileTask.builder()
.id((String) values.get(1)) .id((String) values.get(1))
.serviceId((String) values.get(2)) .serviceId((String) values.get(2))
......
...@@ -38,6 +38,7 @@ import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; ...@@ -38,6 +38,7 @@ import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.influxdb.dto.QueryResult; import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereQueryImpl; import org.influxdb.querybuilder.WhereQueryImpl;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
...@@ -56,26 +57,25 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA ...@@ -56,26 +57,25 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
@Override @Override
public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException { public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException {
WhereQueryImpl query = select(ProfileThreadSnapshotRecord.SEGMENT_ID) final WhereQueryImpl<SelectQueryImpl> countQuery = select(ProfileThreadSnapshotRecord.SEGMENT_ID)
.from(client.getDatabase(), ProfileThreadSnapshotRecord.INDEX_NAME) .from(client.getDatabase(), ProfileThreadSnapshotRecord.INDEX_NAME)
.where() .where();
.and(eq(ProfileThreadSnapshotRecord.TASK_ID, taskId))
.and(eq(ProfileThreadSnapshotRecord.SEQUENCE, 0)); countQuery.and(eq(ProfileThreadSnapshotRecord.TASK_ID, taskId))
.and(eq(ProfileThreadSnapshotRecord.SEQUENCE, 0));
final LinkedList<String> segments = new LinkedList<>(); final LinkedList<String> segments = new LinkedList<>();
QueryResult.Series series = client.queryForSingleSeries(query); QueryResult.Series series = client.queryForSingleSeries(countQuery);
if (Objects.isNull(series)) { if (Objects.isNull(series)) {
return Collections.emptyList(); return Collections.emptyList();
} }
series.getValues().forEach(values -> { series.getValues().forEach(values -> segments.add((String) values.get(1)));
segments.add((String) values.get(1));
});
if (segments.isEmpty()) { if (segments.isEmpty()) {
return Collections.emptyList(); return Collections.emptyList();
} }
query = select() final WhereQueryImpl<SelectQueryImpl> whereQuery = select()
.function(InfluxConstants.SORT_ASC, SegmentRecord.START_TIME, segments.size()) .function(InfluxConstants.SORT_ASC, SegmentRecord.START_TIME, segments.size())
.column(SegmentRecord.SEGMENT_ID) .column(SegmentRecord.SEGMENT_ID)
.column(SegmentRecord.START_TIME) .column(SegmentRecord.START_TIME)
...@@ -84,16 +84,16 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA ...@@ -84,16 +84,16 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
.column(SegmentRecord.IS_ERROR) .column(SegmentRecord.IS_ERROR)
.column(SegmentRecord.TRACE_ID) .column(SegmentRecord.TRACE_ID)
.from(client.getDatabase(), SegmentRecord.INDEX_NAME) .from(client.getDatabase(), SegmentRecord.INDEX_NAME)
.where() .where();
.and(contains(SegmentRecord.SEGMENT_ID, Joiner.on("|").join(segments))); whereQuery.and(contains(SegmentRecord.SEGMENT_ID, Joiner.on("|").join(segments)));
ArrayList<BasicTrace> result = Lists.newArrayListWithCapacity(segments.size()); ArrayList<BasicTrace> result = Lists.newArrayListWithCapacity(segments.size());
client.queryForSingleSeries(query) client.queryForSingleSeries(whereQuery)
.getValues() .getValues()
.stream() .stream()
.sorted((a, b) -> Long.compare(((Number) b.get(1)).longValue(), ((Number) a.get(1)).longValue())) .sorted((a, b) -> Long.compare(((Number) b.get(1)).longValue(), ((Number) a.get(1)).longValue()))
.forEach(values -> { .forEach(values -> {
BasicTrace basicTrace = new BasicTrace(); final BasicTrace basicTrace = new BasicTrace();
basicTrace.setSegmentId((String) values.get(2)); basicTrace.setSegmentId((String) values.get(2));
basicTrace.setStart(String.valueOf(((Number) values.get(3)).longValue())); basicTrace.setStart(String.valueOf(((Number) values.get(3)).longValue()));
...@@ -122,7 +122,7 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA ...@@ -122,7 +122,7 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
@Override @Override
public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence,
int maxSequence) throws IOException { int maxSequence) throws IOException {
WhereQueryImpl query = select( WhereQueryImpl<SelectQueryImpl> whereQuery = select(
ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.TASK_ID,
ProfileThreadSnapshotRecord.SEGMENT_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.DUMP_TIME,
...@@ -130,18 +130,19 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA ...@@ -130,18 +130,19 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
ProfileThreadSnapshotRecord.STACK_BINARY ProfileThreadSnapshotRecord.STACK_BINARY
) )
.from(client.getDatabase(), ProfileThreadSnapshotRecord.INDEX_NAME) .from(client.getDatabase(), ProfileThreadSnapshotRecord.INDEX_NAME)
.where(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId)) .where(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId));
.and(gte(ProfileThreadSnapshotRecord.SEQUENCE, minSequence))
.and(lte(ProfileThreadSnapshotRecord.SEQUENCE, maxSequence)); whereQuery.and(gte(ProfileThreadSnapshotRecord.SEQUENCE, minSequence))
.and(lte(ProfileThreadSnapshotRecord.SEQUENCE, maxSequence));
QueryResult.Series series = client.queryForSingleSeries(query); final QueryResult.Series series = client.queryForSingleSeries(whereQuery);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", query.getCommand(), series); log.debug("SQL: {} result: {}", whereQuery.getCommand(), series);
} }
if (Objects.isNull(series)) { if (Objects.isNull(series)) {
return Collections.EMPTY_LIST; return Collections.emptyList();
} }
ArrayList<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence - minSequence); final ArrayList<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence - minSequence);
series.getValues().forEach(values -> { series.getValues().forEach(values -> {
ProfileThreadSnapshotRecord record = new ProfileThreadSnapshotRecord(); ProfileThreadSnapshotRecord record = new ProfileThreadSnapshotRecord();
...@@ -162,29 +163,31 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA ...@@ -162,29 +163,31 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
@Override @Override
public SegmentRecord getProfiledSegment(String segmentId) throws IOException { public SegmentRecord getProfiledSegment(String segmentId) throws IOException {
WhereQueryImpl query = select().column(SegmentRecord.SEGMENT_ID) WhereQueryImpl<SelectQueryImpl> whereQuery = select()
.column(SegmentRecord.TRACE_ID) .column(SegmentRecord.SEGMENT_ID)
.column(SegmentRecord.SERVICE_ID) .column(SegmentRecord.TRACE_ID)
.column(SegmentRecord.ENDPOINT_NAME) .column(SegmentRecord.SERVICE_ID)
.column(SegmentRecord.START_TIME) .column(SegmentRecord.ENDPOINT_NAME)
.column(SegmentRecord.END_TIME) .column(SegmentRecord.START_TIME)
.column(SegmentRecord.LATENCY) .column(SegmentRecord.END_TIME)
.column(SegmentRecord.IS_ERROR) .column(SegmentRecord.LATENCY)
.column(SegmentRecord.DATA_BINARY) .column(SegmentRecord.IS_ERROR)
.column(SegmentRecord.VERSION) .column(SegmentRecord.DATA_BINARY)
.from(client.getDatabase(), SegmentRecord.INDEX_NAME) .column(SegmentRecord.VERSION)
.where() .from(client.getDatabase(), SegmentRecord.INDEX_NAME)
.and(eq(SegmentRecord.SEGMENT_ID, segmentId)); .where();
List<QueryResult.Series> series = client.queryForSeries(query);
whereQuery.and(eq(SegmentRecord.SEGMENT_ID, segmentId));
List<QueryResult.Series> series = client.queryForSeries(whereQuery);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SQL: {} result set: {}", query.getCommand(), series); log.debug("SQL: {} result set: {}", whereQuery.getCommand(), series);
} }
if (Objects.isNull(series) || series.isEmpty()) { if (Objects.isNull(series) || series.isEmpty()) {
return null; return null;
} }
List<Object> values = series.get(0).getValues().get(0); final List<Object> values = series.get(0).getValues().get(0);
SegmentRecord segmentRecord = new SegmentRecord(); final SegmentRecord segmentRecord = new SegmentRecord();
segmentRecord.setSegmentId((String) values.get(1)); segmentRecord.setSegmentId((String) values.get(1));
segmentRecord.setTraceId((String) values.get(2)); segmentRecord.setTraceId((String) values.get(2));
...@@ -196,7 +199,7 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA ...@@ -196,7 +199,7 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
segmentRecord.setIsError(((Number) values.get(8)).intValue()); segmentRecord.setIsError(((Number) values.get(8)).intValue());
segmentRecord.setVersion(((Number) values.get(10)).intValue()); segmentRecord.setVersion(((Number) values.get(10)).intValue());
String base64 = (String) values.get(9); final String base64 = (String) values.get(9);
if (!Strings.isNullOrEmpty(base64)) { if (!Strings.isNullOrEmpty(base64)) {
segmentRecord.setDataBinary(Base64.getDecoder().decode(base64)); segmentRecord.setDataBinary(Base64.getDecoder().decode(base64));
} }
...@@ -205,13 +208,14 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA ...@@ -205,13 +208,14 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
} }
private int querySequenceWithAgg(String function, String segmentId, long start, long end) throws IOException { private int querySequenceWithAgg(String function, String segmentId, long start, long end) throws IOException {
WhereQueryImpl query = select() WhereQueryImpl<SelectQueryImpl> query = select()
.function(function, ProfileThreadSnapshotRecord.SEQUENCE) .function(function, ProfileThreadSnapshotRecord.SEQUENCE)
.from(client.getDatabase(), ProfileThreadSnapshotRecord.INDEX_NAME) .from(client.getDatabase(), ProfileThreadSnapshotRecord.INDEX_NAME)
.where() .where();
.and(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
.and(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start)) query.and(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
.and(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end)); .and(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start))
.and(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end));
return client.getCounter(query); return client.getCounter(query);
} }
} }
...@@ -35,6 +35,7 @@ import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO; ...@@ -35,6 +35,7 @@ import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.influxdb.dto.QueryResult; import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereQueryImpl; import org.influxdb.querybuilder.WhereQueryImpl;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
...@@ -62,13 +63,14 @@ public class TopNRecordsQuery implements ITopNRecordsQueryDAO { ...@@ -62,13 +63,14 @@ public class TopNRecordsQuery implements ITopNRecordsQueryDAO {
comparator = DESCENDING; comparator = DESCENDING;
} }
WhereQueryImpl query = select() final WhereQueryImpl<SelectQueryImpl> query = select()
.function(function, valueColumnName, condition.getTopN()) .function(function, valueColumnName, condition.getTopN())
.column(TopN.STATEMENT) .column(TopN.STATEMENT)
.column(TopN.TRACE_ID) .column(TopN.TRACE_ID)
.from(client.getDatabase(), condition.getName()) .from(client.getDatabase(), condition.getName())
.where() .where();
.and(gte(TopN.TIME_BUCKET, duration.getStartTimeBucketInSec()))
query.and(gte(TopN.TIME_BUCKET, duration.getStartTimeBucketInSec()))
.and(lte(TopN.TIME_BUCKET, duration.getEndTimeBucketInSec())); .and(lte(TopN.TIME_BUCKET, duration.getEndTimeBucketInSec()));
if (StringUtil.isNotEmpty(condition.getParentService())) { if (StringUtil.isNotEmpty(condition.getParentService())) {
...@@ -94,12 +96,12 @@ public class TopNRecordsQuery implements ITopNRecordsQueryDAO { ...@@ -94,12 +96,12 @@ public class TopNRecordsQuery implements ITopNRecordsQueryDAO {
records.add(record); records.add(record);
}); });
Collections.sort(records, comparator); // re-sort by self, because of the result order by time. records.sort(comparator); // re-sort by self, because of the result order by time.
return records; return records;
} }
private static final Comparator<SelectedRecord> ASCENDING = (a, b) -> Long.compare( private static final Comparator<SelectedRecord> ASCENDING = Comparator.comparingLong(
Long.parseLong(a.getValue()), Long.parseLong(b.getValue())); a -> Long.parseLong(a.getValue()));
private static final Comparator<SelectedRecord> DESCENDING = (a, b) -> Long.compare( private static final Comparator<SelectedRecord> DESCENDING = (a, b) -> Long.compare(
Long.parseLong(b.getValue()), Long.parseLong(a.getValue())); Long.parseLong(b.getValue()), Long.parseLong(a.getValue()));
......
...@@ -54,11 +54,11 @@ public class TopologyQuery implements ITopologyQueryDAO { ...@@ -54,11 +54,11 @@ public class TopologyQuery implements ITopologyQueryDAO {
} }
@Override @Override
public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long startTB, public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(final long startTB,
long endTB, final long endTB,
List<String> serviceIds) throws IOException { final List<String> serviceIds) throws IOException {
String measurement = ServiceRelationServerSideMetrics.INDEX_NAME; final String measurement = ServiceRelationServerSideMetrics.INDEX_NAME;
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery( final WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
measurement, measurement,
startTB, startTB,
endTB, endTB,
...@@ -71,12 +71,11 @@ public class TopologyQuery implements ITopologyQueryDAO { ...@@ -71,12 +71,11 @@ public class TopologyQuery implements ITopologyQueryDAO {
} }
@Override @Override
public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(long startTB, public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(final long startTB,
long endTB, final long endTB,
List<String> serviceIds) throws IOException { List<String> serviceIds) throws IOException {
String measurement = ServiceRelationClientSideMetrics.INDEX_NAME; final WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery( ServiceRelationClientSideMetrics.INDEX_NAME,
measurement,
startTB, startTB,
endTB, endTB,
ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID, ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID,
...@@ -87,11 +86,10 @@ public class TopologyQuery implements ITopologyQueryDAO { ...@@ -87,11 +86,10 @@ public class TopologyQuery implements ITopologyQueryDAO {
} }
@Override @Override
public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long startTB, public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(final long startTB,
long endTB) throws IOException { final long endTB) throws IOException {
String measurement = ServiceRelationServerSideMetrics.INDEX_NAME; final WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery( ServiceRelationServerSideMetrics.INDEX_NAME,
measurement,
startTB, startTB,
endTB, endTB,
ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID, ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID,
...@@ -102,11 +100,10 @@ public class TopologyQuery implements ITopologyQueryDAO { ...@@ -102,11 +100,10 @@ public class TopologyQuery implements ITopologyQueryDAO {
} }
@Override @Override
public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(long startTB, public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(final long startTB,
long endTB) throws IOException { final long endTB) throws IOException {
String tableName = ServiceRelationClientSideMetrics.INDEX_NAME;
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery( WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
tableName, ServiceRelationClientSideMetrics.INDEX_NAME,
startTB, startTB,
endTB, endTB,
ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID, ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID,
...@@ -117,13 +114,12 @@ public class TopologyQuery implements ITopologyQueryDAO { ...@@ -117,13 +114,12 @@ public class TopologyQuery implements ITopologyQueryDAO {
} }
@Override @Override
public List<Call.CallDetail> loadInstanceRelationDetectedAtServerSide(String clientServiceId, public List<Call.CallDetail> loadInstanceRelationDetectedAtServerSide(final String clientServiceId,
String serverServiceId, final String serverServiceId,
long startTB, final long startTB,
long endTB) throws IOException { final long endTB) throws IOException {
String measurement = ServiceInstanceRelationServerSideMetrics.INDEX_NAME;
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceInstanceCallsQuery( WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceInstanceCallsQuery(
measurement, ServiceInstanceRelationServerSideMetrics.INDEX_NAME,
startTB, startTB,
endTB, endTB,
ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID, ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
...@@ -134,13 +130,12 @@ public class TopologyQuery implements ITopologyQueryDAO { ...@@ -134,13 +130,12 @@ public class TopologyQuery implements ITopologyQueryDAO {
} }
@Override @Override
public List<Call.CallDetail> loadInstanceRelationDetectedAtClientSide(String clientServiceId, public List<Call.CallDetail> loadInstanceRelationDetectedAtClientSide(final String clientServiceId,
String serverServiceId, final String serverServiceId,
long startTB, final long startTB,
long endTB) throws IOException { final long endTB) throws IOException {
String measurement = ServiceInstanceRelationClientSideMetrics.INDEX_NAME;
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceInstanceCallsQuery( WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceInstanceCallsQuery(
measurement, ServiceInstanceRelationClientSideMetrics.INDEX_NAME,
startTB, startTB,
endTB, endTB,
ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID, ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID,
...@@ -151,13 +146,11 @@ public class TopologyQuery implements ITopologyQueryDAO { ...@@ -151,13 +146,11 @@ public class TopologyQuery implements ITopologyQueryDAO {
} }
@Override @Override
public List<Call.CallDetail> loadEndpointRelation(long startTB, public List<Call.CallDetail> loadEndpointRelation(final long startTB,
long endTB, final long endTB,
String destEndpointId) throws IOException { final String destEndpointId) throws IOException {
String measurement = EndpointRelationServerSideMetrics.INDEX_NAME; final WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
EndpointRelationServerSideMetrics.INDEX_NAME,
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
measurement,
startTB, startTB,
endTB, endTB,
EndpointRelationServerSideMetrics.SOURCE_ENDPOINT, EndpointRelationServerSideMetrics.SOURCE_ENDPOINT,
...@@ -166,8 +159,8 @@ public class TopologyQuery implements ITopologyQueryDAO { ...@@ -166,8 +159,8 @@ public class TopologyQuery implements ITopologyQueryDAO {
); );
subQuery.and(eq(EndpointRelationServerSideMetrics.DEST_ENDPOINT, destEndpointId)); subQuery.and(eq(EndpointRelationServerSideMetrics.DEST_ENDPOINT, destEndpointId));
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery2 = buildServiceCallsQuery( final WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery2 = buildServiceCallsQuery(
measurement, EndpointRelationServerSideMetrics.INDEX_NAME,
startTB, startTB,
endTB, endTB,
EndpointRelationServerSideMetrics.SOURCE_ENDPOINT, EndpointRelationServerSideMetrics.SOURCE_ENDPOINT,
...@@ -176,19 +169,20 @@ public class TopologyQuery implements ITopologyQueryDAO { ...@@ -176,19 +169,20 @@ public class TopologyQuery implements ITopologyQueryDAO {
); );
subQuery2.and(eq(EndpointRelationServerSideMetrics.SOURCE_ENDPOINT, destEndpointId)); subQuery2.and(eq(EndpointRelationServerSideMetrics.SOURCE_ENDPOINT, destEndpointId));
List<Call.CallDetail> calls = buildEndpointCalls(buildQuery(subQuery), DetectPoint.SERVER); final List<Call.CallDetail> calls = buildEndpointCalls(buildQuery(subQuery), DetectPoint.SERVER);
calls.addAll(buildEndpointCalls(buildQuery(subQuery), DetectPoint.CLIENT)); calls.addAll(buildEndpointCalls(buildQuery(subQuery), DetectPoint.CLIENT));
return calls; return calls;
} }
private WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> buildServiceCallsQuery( private WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> buildServiceCallsQuery(
String measurement, final String measurement,
long startTB, final long startTB,
long endTB, final long endTB,
String sourceCName, final String sourceCName,
String destCName, final String destCName,
List<String> serviceIds) { final List<String> serviceIds) {
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select()
final WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select()
.fromSubQuery(client.getDatabase()) .fromSubQuery(client.getDatabase())
.function("distinct", ServiceInstanceRelationServerSideMetrics.COMPONENT_ID) .function("distinct", ServiceInstanceRelationServerSideMetrics.COMPONENT_ID)
.as(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID) .as(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID)
...@@ -198,7 +192,8 @@ public class TopologyQuery implements ITopologyQueryDAO { ...@@ -198,7 +192,8 @@ public class TopologyQuery implements ITopologyQueryDAO {
.and(lte(InfluxClient.TIME, InfluxClient.timeIntervalTB(endTB))); .and(lte(InfluxClient.TIME, InfluxClient.timeIntervalTB(endTB)));
if (!serviceIds.isEmpty()) { if (!serviceIds.isEmpty()) {
WhereNested whereNested = subQuery.andNested(); WhereNested<WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl>> whereNested = subQuery
.andNested();
for (String id : serviceIds) { for (String id : serviceIds) {
whereNested.or(eq(sourceCName, id)) whereNested.or(eq(sourceCName, id))
.or(eq(destCName, id)); .or(eq(destCName, id));
...@@ -209,24 +204,25 @@ public class TopologyQuery implements ITopologyQueryDAO { ...@@ -209,24 +204,25 @@ public class TopologyQuery implements ITopologyQueryDAO {
} }
private WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> buildServiceInstanceCallsQuery( private WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> buildServiceInstanceCallsQuery(
String measurement, final String measurement,
long startTB, final long startTB,
long endTB, final long endTB,
String sourceCName, final String sourceCName,
String destCName, final String destCName,
String sourceServiceId, final String sourceServiceId,
String destServiceId) { final String destServiceId) {
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select() final WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select()
.fromSubQuery(client.getDatabase()) .fromSubQuery(client.getDatabase())
.function("distinct", ServiceInstanceRelationServerSideMetrics.COMPONENT_ID) .function("distinct", ServiceInstanceRelationServerSideMetrics.COMPONENT_ID)
.as(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID) .as(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID)
.from(measurement) .from(measurement)
.where() .where();
.and(gte(InfluxClient.TIME, InfluxClient.timeIntervalTB(startTB)))
.and(lte(InfluxClient.TIME, InfluxClient.timeIntervalTB(endTB))); subQuery.and(gte(InfluxClient.TIME, InfluxClient.timeIntervalTB(startTB)))
.and(lte(InfluxClient.TIME, InfluxClient.timeIntervalTB(endTB)));
StringBuilder builder = new StringBuilder("(("); final StringBuilder builder = new StringBuilder("((");
builder.append(sourceCName).append("='").append(sourceServiceId) builder.append(sourceCName).append("='").append(sourceServiceId)
.append("' and ") .append("' and ")
.append(destCName).append("='").append(destServiceId) .append(destCName).append("='").append(destServiceId)
...@@ -242,7 +238,7 @@ public class TopologyQuery implements ITopologyQueryDAO { ...@@ -242,7 +238,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
private List<Call.CallDetail> buildServiceCalls(Query query, private List<Call.CallDetail> buildServiceCalls(Query query,
DetectPoint detectPoint) throws IOException { DetectPoint detectPoint) throws IOException {
QueryResult.Series series = client.queryForSingleSeries(query); final QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SQL: {} result set: {}", query.getCommand(), series); log.debug("SQL: {} result set: {}", query.getCommand(), series);
...@@ -251,7 +247,7 @@ public class TopologyQuery implements ITopologyQueryDAO { ...@@ -251,7 +247,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
return Collections.emptyList(); return Collections.emptyList();
} }
List<Call.CallDetail> calls = new ArrayList<>(); final List<Call.CallDetail> calls = new ArrayList<>();
series.getValues().forEach(values -> { series.getValues().forEach(values -> {
Call.CallDetail call = new Call.CallDetail(); Call.CallDetail call = new Call.CallDetail();
String entityId = String.valueOf(values.get(1)); String entityId = String.valueOf(values.get(1));
...@@ -270,8 +266,7 @@ public class TopologyQuery implements ITopologyQueryDAO { ...@@ -270,8 +266,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
return query; return query;
} }
private List<Call.CallDetail> buildInstanceCalls(Query query, private List<Call.CallDetail> buildInstanceCalls(Query query, DetectPoint detectPoint) throws IOException {
DetectPoint detectPoint) throws IOException {
QueryResult.Series series = client.queryForSingleSeries(query); QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
......
...@@ -178,23 +178,24 @@ public class TraceQuery implements ITraceQueryDAO { ...@@ -178,23 +178,24 @@ public class TraceQuery implements ITraceQueryDAO {
@Override @Override
public List<SegmentRecord> queryByTraceId(String traceId) throws IOException { public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
WhereQueryImpl query = select().column(SegmentRecord.SEGMENT_ID) WhereQueryImpl<SelectQueryImpl> whereQuery = select().column(SegmentRecord.SEGMENT_ID)
.column(SegmentRecord.TRACE_ID) .column(SegmentRecord.TRACE_ID)
.column(SegmentRecord.SERVICE_ID) .column(SegmentRecord.SERVICE_ID)
.column(SegmentRecord.SERVICE_INSTANCE_ID) .column(SegmentRecord.SERVICE_INSTANCE_ID)
.column(SegmentRecord.ENDPOINT_NAME) .column(SegmentRecord.ENDPOINT_NAME)
.column(SegmentRecord.START_TIME) .column(SegmentRecord.START_TIME)
.column(SegmentRecord.END_TIME) .column(SegmentRecord.END_TIME)
.column(SegmentRecord.LATENCY) .column(SegmentRecord.LATENCY)
.column(SegmentRecord.IS_ERROR) .column(SegmentRecord.IS_ERROR)
.column(SegmentRecord.DATA_BINARY) .column(SegmentRecord.DATA_BINARY)
.column(SegmentRecord.VERSION) .column(SegmentRecord.VERSION)
.from(client.getDatabase(), SegmentRecord.INDEX_NAME) .from(client.getDatabase(), SegmentRecord.INDEX_NAME)
.where() .where();
.and(eq(SegmentRecord.TRACE_ID, traceId));
List<QueryResult.Series> series = client.queryForSeries(query); whereQuery.and(eq(SegmentRecord.TRACE_ID, traceId));
List<QueryResult.Series> series = client.queryForSeries(whereQuery);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("SQL: {} result set: {}", query.getCommand(), series); log.debug("SQL: {} result set: {}", whereQuery.getCommand(), series);
} }
if (series == null || series.isEmpty()) { if (series == null || series.isEmpty()) {
return Collections.emptyList(); return Collections.emptyList();
...@@ -226,7 +227,7 @@ public class TraceQuery implements ITraceQueryDAO { ...@@ -226,7 +227,7 @@ public class TraceQuery implements ITraceQueryDAO {
} }
@Override @Override
public List<Span> doFlexibleTraceQuery(String traceId) throws IOException { public List<Span> doFlexibleTraceQuery(String traceId) {
return Collections.emptyList(); return Collections.emptyList();
} }
} }
...@@ -50,9 +50,9 @@ public class UITemplateManagementDAOImpl implements UITemplateManagementDAO { ...@@ -50,9 +50,9 @@ public class UITemplateManagementDAOImpl implements UITemplateManagementDAO {
@Override @Override
public List<DashboardConfiguration> getAllTemplates(final Boolean includingDisabled) throws IOException { public List<DashboardConfiguration> getAllTemplates(final Boolean includingDisabled) throws IOException {
WhereQueryImpl<SelectQueryImpl> where = select().raw("*::field") final WhereQueryImpl<SelectQueryImpl> where = select().raw("*::field")
.from(client.getDatabase(), UITemplate.INDEX_NAME) .from(client.getDatabase(), UITemplate.INDEX_NAME)
.where(); .where();
if (!includingDisabled) { if (!includingDisabled) {
where.and(eq(UITemplate.DISABLED, BooleanUtils.FALSE)); where.and(eq(UITemplate.DISABLED, BooleanUtils.FALSE));
} }
...@@ -82,11 +82,11 @@ public class UITemplateManagementDAOImpl implements UITemplateManagementDAO { ...@@ -82,11 +82,11 @@ public class UITemplateManagementDAOImpl implements UITemplateManagementDAO {
final UITemplate.Builder builder = new UITemplate.Builder(); final UITemplate.Builder builder = new UITemplate.Builder();
final UITemplate uiTemplate = setting.toEntity(); final UITemplate uiTemplate = setting.toEntity();
Point point = Point.measurement(UITemplate.INDEX_NAME) final Point point = Point.measurement(UITemplate.INDEX_NAME)
.tag(InfluxConstants.TagName.ID_COLUMN, uiTemplate.id()) .tag(InfluxConstants.TagName.ID_COLUMN, uiTemplate.id())
.fields(builder.data2Map(uiTemplate)) .fields(builder.data2Map(uiTemplate))
.time(1L, TimeUnit.NANOSECONDS) .time(1L, TimeUnit.NANOSECONDS)
.build(); .build();
client.write(point); client.write(point);
return TemplateChangeStatus.builder().status(true).build(); return TemplateChangeStatus.builder().status(true).build();
} }
...@@ -102,11 +102,11 @@ public class UITemplateManagementDAOImpl implements UITemplateManagementDAO { ...@@ -102,11 +102,11 @@ public class UITemplateManagementDAOImpl implements UITemplateManagementDAO {
QueryResult.Series series = client.queryForSingleSeries(query); QueryResult.Series series = client.queryForSingleSeries(query);
if (Objects.nonNull(series)) { if (Objects.nonNull(series)) {
Point point = Point.measurement(UITemplate.INDEX_NAME) final Point point = Point.measurement(UITemplate.INDEX_NAME)
.fields(builder.data2Map(uiTemplate)) .fields(builder.data2Map(uiTemplate))
.tag(InfluxConstants.TagName.ID_COLUMN, uiTemplate.id()) .tag(InfluxConstants.TagName.ID_COLUMN, uiTemplate.id())
.time(1L, TimeUnit.NANOSECONDS) .time(1L, TimeUnit.NANOSECONDS)
.build(); .build();
client.write(point); client.write(point);
return TemplateChangeStatus.builder().status(true).build(); return TemplateChangeStatus.builder().status(true).build();
} else { } else {
...@@ -121,11 +121,11 @@ public class UITemplateManagementDAOImpl implements UITemplateManagementDAO { ...@@ -121,11 +121,11 @@ public class UITemplateManagementDAOImpl implements UITemplateManagementDAO {
.where(eq(InfluxConstants.TagName.ID_COLUMN, name)); .where(eq(InfluxConstants.TagName.ID_COLUMN, name));
QueryResult.Series series = client.queryForSingleSeries(query); QueryResult.Series series = client.queryForSingleSeries(query);
if (Objects.nonNull(series)) { if (Objects.nonNull(series)) {
Point point = Point.measurement(UITemplate.INDEX_NAME) final Point point = Point.measurement(UITemplate.INDEX_NAME)
.tag(InfluxConstants.TagName.ID_COLUMN, name) .tag(InfluxConstants.TagName.ID_COLUMN, name)
.addField(UITemplate.DISABLED, BooleanUtils.TRUE) .addField(UITemplate.DISABLED, BooleanUtils.TRUE)
.time(1L, TimeUnit.NANOSECONDS) .time(1L, TimeUnit.NANOSECONDS)
.build(); .build();
client.write(point); client.write(point);
return TemplateChangeStatus.builder().status(true).build(); return TemplateChangeStatus.builder().status(true).build();
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册