Unverified commit 28530cd7, authored by Daming, committed by GitHub

Upgrade the InfluxDB storage-plugin to protocol V3 (#4641)

Co-authored-by: 吴晟 Wu Sheng <wu.sheng@foxmail.com>
Co-authored-by: kezhenxu94 <kezhenxu94@163.com>
Co-authored-by: kezhenxu94 <kezhenxu94@apache.org>
Parent 21bb638d
......@@ -37,7 +37,7 @@ jobs:
strategy:
matrix:
coordinator: ['zk']
storage: ['mysql', 'es6', 'es7'] #TODO: 'influxdb'
storage: ['mysql', 'es6', 'es7', 'influxdb']
env:
SW_COORDINATOR: ${{ matrix.coordinator }}
SW_STORAGE: ${{ matrix.storage }}
......
......@@ -36,7 +36,7 @@ jobs:
timeout-minutes: 90
strategy:
matrix:
storage: ['h2', 'mysql', 'es6', 'es7'] #TODO: 'influxdb'
storage: ['h2', 'mysql', 'es6', 'es7', 'influxdb']
env:
SW_STORAGE: ${{ matrix.storage }}
steps:
......
......@@ -36,7 +36,7 @@ jobs:
timeout-minutes: 90
strategy:
matrix:
storage: ['mysql', 'es6', 'es7'] #TODO: 'influxdb'
storage: ['mysql', 'es6', 'es7', 'influxdb']
env:
SW_STORAGE: ${{ matrix.storage }}
steps:
......
......@@ -36,7 +36,7 @@ jobs:
timeout-minutes: 90
strategy:
matrix:
storage: ['es6', 'es7'] #TODO: 'influxdb'
storage: ['es6', 'es7', 'influxdb']
env:
SW_STORAGE: ${{ matrix.storage }}
steps:
......
......@@ -46,7 +46,6 @@ jobs:
- { name: 'kotlin-coroutine-scenario', title: 'Kotlin Coroutine 1.0.1-1.3.3 (4)' }
- { name: 'lettuce-scenario', title: 'Lettuce 5.x (17)' }
- { name: 'mongodb-3.x-scenario', title: 'Mongodb 3.4.0-3.11.1 (22)' }
- { name: 'mysql-scenario', title: 'MySQL 5.1.2-8.0.15 (53)' }
- { name: 'netty-socketio-scenario', title: 'Netty-SocketIO 1.x (4)' }
steps:
- uses: actions/checkout@v2
......
......@@ -33,6 +33,7 @@ jobs:
fail-fast: true
matrix:
case:
- { name: 'mysql-scenario', title: 'MySQL 5.1.2-8.0.15 (30)' }
- { name: 'undertow-scenario', title: 'Undertow 1.3.0-2.0.27 (23)' }
- { name: 'webflux-scenario', title: 'Spring-WebFlux 2.x (7)' }
- { name: 'zookeeper-scenario', title: 'Zookeeper 3.4.x (14)' }
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.influxdb;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
......@@ -73,6 +74,7 @@ public class InfluxClient implements Client {
InfluxDB.ResponseFormat.MSGPACK
);
influx.query(new Query("CREATE DATABASE " + database));
influx.enableGzip();
influx.enableBatch(config.getActions(), config.getDuration(), TimeUnit.MILLISECONDS);
influx.setDatabase(database);
......@@ -99,7 +101,7 @@ public class InfluxClient implements Client {
}
try {
QueryResult result = getInflux().query(query);
QueryResult result = getInflux().query(new Query(query.getCommand()));
if (result.hasError()) {
throw new IOException(result.getError());
}
......@@ -136,6 +138,22 @@ public class InfluxClient implements Client {
return series.get(0);
}
/**
* Execute a query against InfluxDB with a `select count(*)` statement and return the count only.
*
* @throws IOException if there is an error on the InfluxDB server or a communication error occurs
*/
public int getCounter(Query query) throws IOException {
QueryResult.Series series = queryForSingleSeries(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", query.getCommand(), series);
}
if (Objects.isNull(series)) {
return 0;
}
return ((Number) series.getValues().get(0).get(1)).intValue();
}
/**
* Data management: drop a time series by the specified measurement and series name. If no exception is
* thrown, the execution succeeded. Note that DROP SERIES does not support dropping series by a time range
......
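For illustration, a minimal usage sketch of the new getCounter() helper, assuming a connected InfluxClient named client (the measurement name here is hypothetical):

import org.influxdb.querybuilder.SelectQueryImpl;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;

// Build a count()-style projection and let getCounter() unwrap the
// single-row result; it returns 0 when the series is empty.
SelectQueryImpl countQuery = select()
    .raw("count(distinct id)")
    .from(client.getDatabase(), "endpoint_traffic"); // hypothetical measurement
int total = client.getCounter(countQuery);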
......@@ -18,9 +18,31 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb;
public interface InfluxModelConstants {
/**
* Override the column name because 'duration' is a reserved identifier in InfluxDB.
*/
String DURATION = "dur";
public interface InfluxConstants {
String ID_COLUMN = "id";
String NAME = "\"name\"";
String ALL_FIELDS = "*::field";
String SORT_DES = "top";
String SORT_ASC = "bottom";
String DURATION = "\"" + "duration" + "\"";
interface TagName {
String ID_COLUMN = "_id";
String NAME = "_name";
String ENTITY_ID = "_entity_id";
String TIME_BUCKET = "_time_bucket";
String NODE_TYPE = "_node_type";
String SERVICE_ID = "_service_id";
}
}
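The underscore-prefixed TagName constants are the InfluxDB tag counterparts of ordinary columns: tags are indexed, so equality filters and GROUP BY on them are cheap, and the prefix presumably keeps each tag from clashing with the field of the same name. A minimal sketch of the pairing, assuming a prepared InfluxInsertRequest named request:

// Mirror the service_id field into the indexed _service_id tag;
// RecordDAO below does exactly this, driven by TableMetaInfo.
request.addFieldAsTag(SegmentRecord.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID);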
......@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
......@@ -46,10 +47,10 @@ import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.HistoryDele
import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.InfluxStorageDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.AggregationQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.AlarmQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.InfluxMetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.InfluxNetworkAddressAlias;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.LogQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.MetadataQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.MetricsQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.NetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileTaskLogQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileTaskQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileThreadSnapshotQuery;
......@@ -60,7 +61,7 @@ import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.TraceQuery
@Slf4j
public class InfluxStorageProvider extends ModuleProvider {
private InfluxStorageConfig config;
private InfluxClient influxClient;
private InfluxClient client;
public InfluxStorageProvider() {
config = new InfluxStorageConfig();
......@@ -83,35 +84,42 @@ public class InfluxStorageProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException {
influxClient = new InfluxClient(config);
client = new InfluxClient(config);
this.registerServiceImplementation(IBatchDAO.class, new BatchDAO(influxClient));
this.registerServiceImplementation(StorageDAO.class, new InfluxStorageDAO(influxClient));
this.registerServiceImplementation(IBatchDAO.class, new BatchDAO(client));
this.registerServiceImplementation(StorageDAO.class, new InfluxStorageDAO(client));
this.registerServiceImplementation(INetworkAddressAliasDAO.class, new InfluxNetworkAddressAlias(influxClient));
this.registerServiceImplementation(IMetadataQueryDAO.class, new InfluxMetadataQueryDAO(influxClient));
this.registerServiceImplementation(INetworkAddressAliasDAO.class, new NetworkAddressAliasDAO(client));
this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQuery(client));
this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQuery(influxClient));
this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQuery(influxClient));
this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQuery(influxClient));
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQuery(influxClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQuery(influxClient));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQuery(influxClient));
this.registerServiceImplementation(ILogQueryDAO.class, new LogQuery(influxClient));
this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQuery(client));
this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQuery(client));
this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQuery(client));
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQuery(client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQuery(client));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQuery(client));
this.registerServiceImplementation(ILogQueryDAO.class, new LogQuery(client));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQuery(influxClient));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQuery(client));
this.registerServiceImplementation(
IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQuery(influxClient));
IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQuery(client));
this.registerServiceImplementation(
IProfileTaskLogQueryDAO.class, new ProfileTaskLogQuery(influxClient, config.getFetchTaskLogMaxSize()));
IProfileTaskLogQueryDAO.class, new ProfileTaskLogQuery(client, config.getFetchTaskLogMaxSize()));
this.registerServiceImplementation(
IHistoryDeleteDAO.class, new HistoryDeleteDAO(influxClient));
IHistoryDeleteDAO.class, new HistoryDeleteDAO(client));
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
influxClient.connect();
client.connect();
InfluxTableInstaller installer = new InfluxTableInstaller(getManager());
try {
installer.install(client);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
@Override
......
......@@ -16,22 +16,28 @@
*
*/
package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
package org.apache.skywalking.oap.server.storage.plugin.influxdb;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
public class InfluxNetworkAddressAlias implements INetworkAddressAliasDAO {
private InfluxClient client;
public class InfluxTableInstaller extends ModelInstaller {
public InfluxNetworkAddressAlias(final InfluxClient client) {
this.client = client;
public InfluxTableInstaller(ModuleManager moduleManager) {
super(moduleManager);
}
@Override
public List<NetworkAddressAlias> loadLastUpdate(final long timeBucket) {
return null;
protected boolean isExists(final Client client, final Model model) throws StorageException {
TableMetaInfo.addModel(model);
return true;
}
@Override
protected void createTable(final Client client, final Model model) throws StorageException {
// No-op: InfluxDB creates measurements automatically on first write.
}
}
......@@ -18,18 +18,80 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.model.ColumnName;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
@Getter
@Builder
@AllArgsConstructor
public class TableMetaInfo {
private static Map<String, Model> TABLES = new HashMap<>();
private static final Map<String, TableMetaInfo> TABLES = new HashMap<>();
private Map<String, String> storageAndColumnMap;
private Map<String, String> storageAndTagMap;
private Model model;
public static void addModel(Model model) {
TABLES.put(model.getName(), model);
final List<ModelColumn> columns = model.getColumns();
final Map<String, String> storageAndTagMap = Maps.newHashMap();
final Map<String, String> storageAndColumnMap = Maps.newHashMap();
columns.forEach(column -> {
ColumnName columnName = column.getColumnName();
storageAndColumnMap.put(columnName.getStorageName(), columnName.getName());
});
if (model.getName().endsWith("_traffic")) {
// instance_traffic name, service_id
// endpoint_traffic name, service_id
storageAndTagMap.put(InstanceTraffic.NAME, InfluxConstants.TagName.NAME);
if (InstanceTraffic.INDEX_NAME.equals(model.getName())
|| EndpointTraffic.INDEX_NAME.equals(model.getName())) {
storageAndTagMap.put(EndpointTraffic.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID);
} else {
// service_traffic name, node_type
storageAndTagMap.put(ServiceTraffic.NODE_TYPE, InfluxConstants.TagName.NODE_TYPE);
}
} else {
// Specify ENTITY_ID, TIME_BUCKET, NODE_TYPE and SERVICE_ID as tags
if (storageAndColumnMap.containsKey(Metrics.ENTITY_ID)) {
storageAndTagMap.put(Metrics.ENTITY_ID, InfluxConstants.TagName.ENTITY_ID);
}
if (storageAndColumnMap.containsKey(Record.TIME_BUCKET)) {
storageAndTagMap.put(Record.TIME_BUCKET, InfluxConstants.TagName.TIME_BUCKET);
}
if (storageAndColumnMap.containsKey(ServiceTraffic.NODE_TYPE)) {
storageAndTagMap.put(ServiceTraffic.NODE_TYPE, InfluxConstants.TagName.NODE_TYPE);
}
if (storageAndColumnMap.containsKey(SegmentRecord.SERVICE_ID)) {
storageAndTagMap.put(SegmentRecord.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID);
}
}
TableMetaInfo info = TableMetaInfo.builder()
.model(model)
.storageAndTagMap(storageAndTagMap)
.storageAndColumnMap(storageAndColumnMap)
.build();
TABLES.put(model.getName(), info);
}
public static Model get(String moduleName) {
public static TableMetaInfo get(String moduleName) {
return TABLES.get(moduleName);
}
}
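Together with InfluxTableInstaller above, the flow is: at startup every Model is registered here instead of creating a table, and at write time the DAOs look the computed mapping back up. A condensed sketch of that round trip (names match the code in this commit; the model instance is assumed to come from the installer):

// Install time: InfluxTableInstaller.isExists() registers each model.
TableMetaInfo.addModel(model);

// Write time: RecordDAO and NoneStreamDAO replay the field-to-tag map.
TableMetaInfo.get(model.getName())
    .getStorageAndTagMap()
    .forEach(request::addFieldAsTag);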
......@@ -21,7 +21,6 @@ package org.apache.skywalking.oap.server.storage.plugin.influxdb.base;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.Model;
......@@ -29,15 +28,13 @@ import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.influxdb.dto.Point;
/**
* InfluxDB Point wrapper.
*/
public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
public static final String ID = "id";
private Point.Builder builder;
private Map<String, Object> fields = Maps.newHashMap();
......@@ -57,9 +54,8 @@ public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
}
}
builder = Point.measurement(model.getName())
.addField(ID, storageData.id())
.fields(fields)
.tag(InfluxClient.TAG_TIME_BUCKET, String.valueOf(fields.get(Metrics.TIME_BUCKET)));
.addField(InfluxConstants.ID_COLUMN, storageData.id())
.fields(fields);
}
public InfluxInsertRequest time(long time, TimeUnit unit) {
......@@ -68,9 +64,7 @@ public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
}
public InfluxInsertRequest addFieldAsTag(String fieldName, String tagName) {
if (fields.containsKey(fieldName)) {
builder.tag(tagName, String.valueOf(fields.get(fieldName)));
}
builder.tag(tagName, String.valueOf(fields.get(fieldName)));
return this;
}
......
......@@ -26,30 +26,27 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereQueryImpl;
import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ALL_FIELDS;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
@Slf4j
public class MetricsDAO implements IMetricsDAO {
public static final String TAG_ENTITY_ID = "_entity_id";
public static final String TAG_ENDPOINT_OWNER_SERVICE = "_service_id";
public static final String TAG_ENDPOINT_NAME = "_endpoint_name";
private final StorageBuilder<Metrics> storageBuilder;
private final InfluxClient client;
......@@ -62,10 +59,13 @@ public class MetricsDAO implements IMetricsDAO {
@Override
public List<Metrics> multiGet(Model model, List<String> ids) throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select()
.regex("*::field")
.raw(ALL_FIELDS)
.from(client.getDatabase(), model.getName())
.where(contains("id", Joiner.on("|").join(ids)));
QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", query.getCommand(), series);
}
if (series == null) {
return Collections.emptyList();
......@@ -73,10 +73,9 @@ public class MetricsDAO implements IMetricsDAO {
final List<Metrics> metrics = Lists.newArrayList();
List<String> columns = series.getColumns();
Map<String, String> storageAndColumnNames = Maps.newHashMap();
for (ModelColumn column : model.getColumns()) {
storageAndColumnNames.put(column.getColumnName().getStorageName(), column.getColumnName().getName());
}
TableMetaInfo metaInfo = TableMetaInfo.get(model.getName());
Map<String, String> storageAndColumnMap = metaInfo.getStorageAndColumnMap();
series.getValues().forEach(values -> {
Map<String, Object> data = Maps.newHashMap();
......@@ -87,7 +86,7 @@ public class MetricsDAO implements IMetricsDAO {
value = ((StorageDataComplexObject) value).toStorageData();
}
data.put(storageAndColumnNames.get(columns.get(i)), value);
data.put(storageAndColumnMap.get(columns.get(i)), value);
}
metrics.add(storageBuilder.map2Data(data));
......@@ -99,16 +98,15 @@ public class MetricsDAO implements IMetricsDAO {
@Override
public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
final long timestamp = TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling());
if (metrics instanceof EndpointTraffic || metrics instanceof ServiceTraffic || metrics instanceof InstanceTraffic) {
return new InfluxInsertRequest(model, metrics, storageBuilder)
.time(timestamp, TimeUnit.MILLISECONDS)
.addFieldAsTag(EndpointTraffic.SERVICE_ID, TAG_ENDPOINT_OWNER_SERVICE)
.addFieldAsTag(EndpointTraffic.NAME, TAG_ENDPOINT_NAME);
} else {
return new InfluxInsertRequest(model, metrics, storageBuilder)
.time(timestamp, TimeUnit.MILLISECONDS)
.addFieldAsTag(Metrics.ENTITY_ID, TAG_ENTITY_ID);
}
TableMetaInfo tableMetaInfo = TableMetaInfo.get(model.getName());
final InfluxInsertRequest request = new InfluxInsertRequest(model, metrics, storageBuilder)
.time(timestamp, TimeUnit.MILLISECONDS);
tableMetaInfo.getStorageAndTagMap().forEach((field, tag) -> {
request.addFieldAsTag(field, tag);
});
return request;
}
@Override
......
......@@ -23,15 +23,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.influxdb.dto.Point;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
public class NoneStreamDAO implements INoneStreamDAO {
public static final String TAG_SERVICE_ID = "_service_id";
private static final int PADDING_SIZE = 1_000_000;
private static final AtomicRangeInteger SUFFIX = new AtomicRangeInteger(0, PADDING_SIZE);
......@@ -45,13 +43,14 @@ public class NoneStreamDAO implements INoneStreamDAO {
@Override
public void insert(final Model model, final NoneStream noneStream) throws IOException {
final long timestamp = TimeBucket.getTimestamp(
noneStream.getTimeBucket(), model.getDownsampling()) * PADDING_SIZE + SUFFIX.getAndIncrement();
Point point = new InfluxInsertRequest(model, noneStream, storageBuilder)
.time(timestamp, TimeUnit.NANOSECONDS)
.addFieldAsTag(ProfileTaskRecord.SERVICE_ID, TAG_SERVICE_ID).getPoint();
client.write(point);
final long timestamp = TimeBucket.getTimestamp(noneStream.getTimeBucket(), model.getDownsampling())
* PADDING_SIZE + SUFFIX.getAndIncrement();
final InfluxInsertRequest request = new InfluxInsertRequest(model, noneStream, storageBuilder)
.time(timestamp, TimeUnit.NANOSECONDS);
TableMetaInfo.get(model.getName()).getStorageAndTagMap().forEach((field, tag) -> {
request.addFieldAsTag(field, tag);
});
client.write(request.getPoint());
}
}
......@@ -22,16 +22,15 @@ import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
public class RecordDAO implements IRecordDAO {
public static final String TAG_SERVICE_ID = "_service_id";
private static final int PADDING_SIZE = 1_000_000;
private static final AtomicRangeInteger SUFFIX = new AtomicRangeInteger(0, PADDING_SIZE);
......@@ -45,11 +44,14 @@ public class RecordDAO implements IRecordDAO {
@Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
final long timestamp = TimeBucket.getTimestamp(
record.getTimeBucket(), model.getDownsampling()) * PADDING_SIZE + SUFFIX.getAndIncrement();
final long timestamp = TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling())
* PADDING_SIZE + SUFFIX.getAndIncrement();
return new InfluxInsertRequest(model, record, storageBuilder)
.time(timestamp, TimeUnit.NANOSECONDS)
.addFieldAsTag(SegmentRecord.SERVICE_ID, TAG_SERVICE_ID);
final InfluxInsertRequest request = new InfluxInsertRequest(model, record, storageBuilder)
.time(timestamp, TimeUnit.NANOSECONDS);
TableMetaInfo.get(model.getName()).getStorageAndTagMap().forEach((field, tag) -> {
request.addFieldAsTag(field, tag);
});
return request;
}
}
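A note on the timestamp arithmetic above: InfluxDB treats a point with the same measurement, tag set, and timestamp as a duplicate and overwrites its fields, so two records landing in the same millisecond would otherwise clobber each other. Multiplying the millisecond timestamp by PADDING_SIZE and adding a rotating suffix yields distinct nanosecond timestamps. A worked sketch (values are illustrative):

long millis = 1_585_000_000_123L;        // TimeBucket timestamp, in ms
long first  = millis * 1_000_000 + 0;    // 1585000000123000000 ns
long second = millis * 1_000_000 + 1;    // 1585000000123000001 ns
// Same millisecond, two distinct nanosecond timestamps: no overwrite,
// which is why the request uses TimeUnit.NANOSECONDS.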
......@@ -25,13 +25,11 @@ import java.util.Comparator;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.query.entity.Order;
import org.apache.skywalking.oap.server.core.query.entity.TopNEntity;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.MetricsDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.SelectSubQueryImpl;
......@@ -68,7 +66,7 @@ public class AggregationQuery implements IAggregationQueryDAO {
long startTB, long endTB, Order order) throws IOException {
return getTopNEntity(
downsampling, indName,
subQuery(InstanceTraffic.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN
subQuery(InfluxConstants.TagName.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN
);
}
......@@ -84,7 +82,7 @@ public class AggregationQuery implements IAggregationQueryDAO {
long startTB, long endTB, Order order) throws IOException {
return getTopNEntity(
downsampling, indName,
subQuery(EndpointTraffic.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN
subQuery(InfluxConstants.TagName.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN
);
}
......@@ -95,14 +93,14 @@ public class AggregationQuery implements IAggregationQueryDAO {
int topN) throws IOException {
// Have to re-sort here, because the top()/bottom() functions return results ordered by `time`.
Comparator<TopNEntity> comparator = DESCENDING;
String functionName = "top";
String functionName = InfluxConstants.SORT_DES;
if (order == Order.ASC) {
functionName = "bottom";
functionName = InfluxConstants.SORT_ASC;
comparator = ASCENDING;
}
SelectQueryImpl query = select().function(functionName, "mean", topN).as("value")
.column(MetricsDAO.TAG_ENTITY_ID)
.column(InfluxConstants.TagName.ENTITY_ID)
.from(client.getDatabase(), measurement);
query.setSubQuery(subQuery);
......@@ -135,7 +133,7 @@ public class AggregationQuery implements IAggregationQueryDAO {
.and(eq(serviceColumnName, serviceId))
.and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB)))
.and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB)))
.groupBy(MetricsDAO.TAG_ENTITY_ID);
.groupBy(InfluxConstants.TagName.ENTITY_ID);
}
private SelectSubQueryImpl<SelectQueryImpl> subQuery(String name, String columnName, long startTB, long endTB) {
......@@ -143,7 +141,7 @@ public class AggregationQuery implements IAggregationQueryDAO {
.where()
.and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB)))
.and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB)))
.groupBy(MetricsDAO.TAG_ENTITY_ID);
.groupBy(InfluxConstants.TagName.ENTITY_ID);
}
private static final Comparator<TopNEntity> ASCENDING = Comparator.comparingLong(TopNEntity::getValue);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
import org.apache.skywalking.oap.server.core.query.entity.Database;
import org.apache.skywalking.oap.server.core.query.entity.Endpoint;
import org.apache.skywalking.oap.server.core.query.entity.Service;
import org.apache.skywalking.oap.server.core.query.entity.ServiceInstance;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.MetricsDAO;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereQueryImpl;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
public class InfluxMetadataQueryDAO implements IMetadataQueryDAO {
private InfluxClient client;
// 'name' is an InfluxDB keyword, so escape it
private static final String ENDPOINT_NAME = '\"' + EndpointTraffic.NAME + '\"';
public InfluxMetadataQueryDAO(final InfluxClient client) {
this.client = client;
}
@Override
public int numOfService(final long startTimestamp, final long endTimestamp) throws IOException {
return 0;
}
@Override
public int numOfEndpoint() throws IOException {
final SelectQueryImpl selectQuery = select()
.count(EndpointTraffic.ENTITY_ID)
.from(client.getDatabase(), EndpointTraffic.INDEX_NAME);
Query query = new Query(selectQuery.getCommand());
final QueryResult.Series series = client.queryForSingleSeries(query);
if (series == null) {
return 0;
}
return ((Number) series.getValues().get(0).get(1)).intValue();
}
@Override
public int numOfConjectural(final int nodeTypeValue) throws IOException {
return 0;
}
@Override
public List<Service> getAllServices(final long startTimestamp, final long endTimestamp) throws IOException {
return null;
}
@Override
public List<Service> getAllBrowserServices(final long startTimestamp, final long endTimestamp) throws IOException {
return null;
}
@Override
public List<Database> getAllDatabases() throws IOException {
return null;
}
@Override
public List<Service> searchServices(final long startTimestamp,
final long endTimestamp,
final String keyword) throws IOException {
return null;
}
@Override
public Service searchService(final String serviceCode) throws IOException {
return null;
}
@Override
public List<Endpoint> searchEndpoint(final String keyword,
final String serviceId,
final int limit) throws IOException {
WhereQueryImpl<SelectQueryImpl> endpointQuery = select()
.column(EndpointTraffic.SERVICE_ID)
.column(ENDPOINT_NAME)
.from(client.getDatabase(), EndpointTraffic.INDEX_NAME)
.where();
endpointQuery.where(eq(MetricsDAO.TAG_ENDPOINT_OWNER_SERVICE, String.valueOf(serviceId)));
if (!Strings.isNullOrEmpty(keyword)) {
endpointQuery.where(contains(MetricsDAO.TAG_ENDPOINT_NAME, keyword.replaceAll("/", "\\\\/")));
}
endpointQuery.limit(limit);
Query query = new Query(endpointQuery.getCommand());
final QueryResult.Series series = client.queryForSingleSeries(query);
List<Endpoint> list = new ArrayList<>(limit);
if (series != null) {
series.getValues().forEach(values -> {
EndpointTraffic endpointTraffic = new EndpointTraffic();
endpointTraffic.setServiceId((String) values.get(1));
endpointTraffic.setName((String) values.get(2));
Endpoint endpoint = new Endpoint();
endpoint.setId(IDManager.EndpointID.buildId(endpointTraffic.getServiceId(), endpointTraffic.getName()));
endpoint.setName(endpointTraffic.getName());
list.add(endpoint);
});
}
return list;
}
@Override
public List<ServiceInstance> getServiceInstances(final long startTimestamp,
final long endTimestamp,
final String serviceId) throws IOException {
return null;
}
}
......@@ -32,8 +32,9 @@ import org.apache.skywalking.oap.server.core.query.entity.LogState;
import org.apache.skywalking.oap.server.core.query.entity.Logs;
import org.apache.skywalking.oap.server.core.query.entity.Pagination;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.RecordDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.elasticsearch.common.Strings;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
......@@ -51,6 +52,7 @@ import static org.apache.skywalking.oap.server.core.analysis.manual.log.Abstract
import static org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord.STATUS_CODE;
import static org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord.TIMESTAMP;
import static org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord.TRACE_ID;
import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ALL_FIELDS;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lte;
......@@ -68,11 +70,11 @@ public class LogQuery implements ILogQueryDAO {
public Logs queryLogs(String metricName, int serviceId, int serviceInstanceId, String endpointId, String traceId,
LogState state, String stateCode, Pagination paging, int from, int limit,
long startTB, long endTB) throws IOException {
WhereQueryImpl<SelectQueryImpl> recallQuery = select().regex("*::field")
WhereQueryImpl<SelectQueryImpl> recallQuery = select().raw(ALL_FIELDS)
.from(client.getDatabase(), metricName)
.where();
if (serviceId != Const.NONE) {
recallQuery.and(eq(RecordDAO.TAG_SERVICE_ID, String.valueOf(serviceId)));
recallQuery.and(eq(InfluxConstants.TagName.SERVICE_ID, String.valueOf(serviceId)));
}
if (serviceInstanceId != Const.NONE) {
recallQuery.and(eq(SERVICE_INSTANCE_ID, serviceInstanceId));
......@@ -131,8 +133,12 @@ public class LogQuery implements ILogQueryDAO {
Map<String, Object> data = Maps.newHashMap();
Log log = new Log();
for (int i = 0; i < columns.size(); i++) {
data.put(columns.get(i), values.get(i));
for (int i = 1; i < columns.size(); i++) {
Object value = values.get(i);
if (value instanceof StorageDataComplexObject) {
value = ((StorageDataComplexObject) value).toStorageData();
}
data.put(columns.get(i), value);
}
log.setContent((String) data.get(CONTENT));
log.setContentType(ContentType.instanceOf((int) data.get(CONTENT_TYPE)));
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
import org.apache.skywalking.oap.server.core.query.entity.Attribute;
import org.apache.skywalking.oap.server.core.query.entity.Database;
import org.apache.skywalking.oap.server.core.query.entity.Endpoint;
import org.apache.skywalking.oap.server.core.query.entity.Language;
import org.apache.skywalking.oap.server.core.query.entity.LanguageTrans;
import org.apache.skywalking.oap.server.core.query.entity.Service;
import org.apache.skywalking.oap.server.core.query.entity.ServiceInstance;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.SelectSubQueryImpl;
import org.influxdb.querybuilder.WhereQueryImpl;
import org.influxdb.querybuilder.WhereSubQueryImpl;
import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ID_COLUMN;
import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.NAME;
import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.TagName;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
@Slf4j
public class MetadataQuery implements IMetadataQueryDAO {
private static final Gson GSON = new Gson();
private final InfluxClient client;
public MetadataQuery(final InfluxClient client) {
this.client = client;
}
@Override
public int numOfService(final long startTimestamp, final long endTimestamp) throws IOException {
WhereQueryImpl query = select().raw("count(distinct " + ID_COLUMN + ")")
.from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
.where()
.and(
eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())
));
return client.getCounter(query);
}
@Override
public int numOfEndpoint() throws IOException {
SelectQueryImpl query = select()
.raw("count(distinct " + ID_COLUMN + ")")
.from(client.getDatabase(), EndpointTraffic.INDEX_NAME);
return client.getCounter(query);
}
@Override
public int numOfConjectural(final int nodeTypeValue) throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select().raw("count(distinct " + ID_COLUMN + ")")
.from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
.where(eq(
InfluxConstants.TagName.NODE_TYPE,
String.valueOf(nodeTypeValue)
));
return client.getCounter(query);
}
@Override
public List<Service> getAllServices(final long startTimestamp, final long endTimestamp) throws IOException {
SelectSubQueryImpl<SelectQueryImpl> subQuery = select()
.fromSubQuery(client.getDatabase())
.column(ID_COLUMN).column(NAME)
.from(ServiceTraffic.INDEX_NAME)
.where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())))
.groupBy(TagName.NAME, TagName.NODE_TYPE);
SelectQueryImpl query = select(ID_COLUMN, NAME).from(client.getDatabase());
query.setSubQuery(subQuery);
return buildServices(query);
}
@Override
public List<Service> getAllBrowserServices(long startTimestamp, long endTimestamp) throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select(ID_COLUMN, NAME)
.from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
.where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())));
return buildServices(query);
}
@Override
public List<Database> getAllDatabases() throws IOException {
SelectSubQueryImpl<SelectQueryImpl> subQuery = select()
.fromSubQuery(client.getDatabase())
.column(ID_COLUMN).column(NAME)
.from(ServiceTraffic.INDEX_NAME)
.where(eq(InfluxConstants.TagName.NODE_TYPE, NodeType.Database.value()))
.groupBy(TagName.NAME, TagName.NODE_TYPE);
SelectQueryImpl query = select(ID_COLUMN, NAME).from(client.getDatabase());
query.setSubQuery(subQuery);
QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", query.getCommand(), series);
}
List<Database> databases = Lists.newArrayList();
if (Objects.nonNull(series)) {
for (List<Object> values : series.getValues()) {
Database database = new Database();
database.setId((String) values.get(1));
database.setName((String) values.get(2));
databases.add(database);
}
}
return databases;
}
@Override
public List<Service> searchServices(long startTimestamp, long endTimestamp, String keyword) throws IOException {
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select()
.fromSubQuery(client.getDatabase())
.column(ID_COLUMN)
.column(NAME)
.from(ServiceTraffic.INDEX_NAME)
.where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())));
if (!Strings.isNullOrEmpty(keyword)) {
subQuery.and(contains(ServiceTraffic.NAME, keyword));
}
subQuery.groupBy(TagName.NAME, TagName.NODE_TYPE);
SelectQueryImpl query = select(ID_COLUMN, NAME).from(client.getDatabase());
query.setSubQuery(subQuery);
return buildServices(query);
}
@Override
public Service searchService(String serviceCode) throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select(ID_COLUMN, NAME)
.from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
.where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())));
query.and(eq(ServiceTraffic.NODE_TYPE, serviceCode));
return buildServices(query).get(0);
}
@Override
public List<Endpoint> searchEndpoint(final String keyword,
final String serviceId,
final int limit) throws IOException {
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select()
.fromSubQuery(client.getDatabase())
.column(ID_COLUMN)
.column(NAME)
.from(EndpointTraffic.INDEX_NAME)
.where(eq(InfluxConstants.TagName.SERVICE_ID, String.valueOf(serviceId)));
if (!Strings.isNullOrEmpty(keyword)) {
subQuery.where(contains(EndpointTraffic.NAME, keyword.replaceAll("/", "\\\\/")));
}
subQuery.groupBy(TagName.NAME, TagName.SERVICE_ID);
SelectQueryImpl query = select(ID_COLUMN, NAME)
.from(client.getDatabase());
query.setSubQuery(subQuery);
query.limit(limit);
final QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", query.getCommand(), series);
}
List<Endpoint> list = new ArrayList<>(limit);
if (series != null) {
series.getValues().forEach(values -> {
Endpoint endpoint = new Endpoint();
endpoint.setId((String) values.get(1));
endpoint.setName((String) values.get(2));
list.add(endpoint);
});
}
return list;
}
@Override
public List<ServiceInstance> getServiceInstances(final long startTimestamp,
final long endTimestamp,
final String serviceId) throws IOException {
final long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp);
SelectSubQueryImpl<SelectQueryImpl> subQuery = select()
.fromSubQuery(client.getDatabase())
.column(ID_COLUMN).column(NAME).column(InstanceTraffic.PROPERTIES)
.from(InstanceTraffic.INDEX_NAME)
.where()
.and(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, minuteTimeBucket))
.and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId))
.groupBy(TagName.NAME, TagName.SERVICE_ID);
SelectQueryImpl query = select().column(ID_COLUMN)
.column(NAME)
.column(InstanceTraffic.PROPERTIES)
.from(client.getDatabase(), InstanceTraffic.INDEX_NAME);
query.setSubQuery(subQuery);
QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", query.getCommand(), series);
}
if (Objects.isNull(series)) {
return Collections.EMPTY_LIST;
}
List<List<Object>> result = series.getValues();
List<ServiceInstance> instances = Lists.newArrayList();
for (List<Object> values : result) {
ServiceInstance serviceInstance = new ServiceInstance();
serviceInstance.setId((String) values.get(1));
serviceInstance.setName((String) values.get(2));
serviceInstance.setInstanceUUID(serviceInstance.getId());
String propertiesString = (String) values.get(3);
if (!Strings.isNullOrEmpty(propertiesString)) {
JsonObject properties = GSON.fromJson(propertiesString, JsonObject.class);
for (Map.Entry<String, JsonElement> property : properties.entrySet()) {
String key = property.getKey();
String value = property.getValue().getAsString();
if (key.equals(InstanceTraffic.PropertyUtil.LANGUAGE)) {
serviceInstance.setLanguage(LanguageTrans.INSTANCE.value(value));
} else {
serviceInstance.getAttributes().add(new Attribute(key, value));
}
}
} else {
serviceInstance.setLanguage(Language.UNKNOWN);
}
instances.add(serviceInstance);
}
return instances;
}
private List<Service> buildServices(Query query) throws IOException {
QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", query.getCommand(), series);
}
ArrayList<Service> services = Lists.newArrayList();
if (Objects.nonNull(series)) {
for (List<Object> values : series.getValues()) {
Service service = new Service();
service.setId((String) values.get(1));
service.setName((String) values.get(2));
services.add(service);
}
}
return services;
}
}
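When reading the log.debug output above, it helps to know roughly what the subquery pattern expands to. A reconstruction of the statement getAllServices builds (assumed shape, derived from the builder calls; exact quoting is up to the influxdb-java query builder):

// Assumed generated InfluxQL:
//   SELECT id, "name" FROM (
//       SELECT id, "name" FROM service_traffic
//       WHERE _node_type = '<NodeType.Normal.value()>'
//       GROUP BY _name, _node_type
//   )
// The inner GROUP BY on the tags appears to fold repeated writes of the
// same service into one series before the outer SELECT flattens them.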
......@@ -38,14 +38,16 @@ import org.apache.skywalking.oap.server.core.query.sql.KeyValues;
import org.apache.skywalking.oap.server.core.query.sql.Where;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.MetricsDAO;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.SelectionQueryImpl;
import org.influxdb.querybuilder.WhereQueryImpl;
import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ID_COLUMN;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
......@@ -74,7 +76,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
WhereQueryImpl<SelectQueryImpl> queryWhereQuery = query.from(client.getDatabase(), measurement).where();
Map<String, Class<?>> columnTypes = Maps.newHashMap();
for (ModelColumn column : TableMetaInfo.get(measurement).getColumns()) {
for (ModelColumn column : TableMetaInfo.get(measurement).getModel().getColumns()) {
columnTypes.put(column.getColumnName().getStorageName(), column.getType());
}
......@@ -84,6 +86,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
StringBuilder clauseBuilder = new StringBuilder();
for (KeyValues kv : whereKeyValues) {
final List<String> values = kv.getValues();
ids.addAll(values);
Class<?> type = columnTypes.get(kv.getKey());
if (values.size() == 1) {
......@@ -93,16 +96,15 @@ public class MetricsQuery implements IMetricsQueryDAO {
}
clauseBuilder.append(kv.getKey()).append("=").append(value).append(" OR ");
} else {
ids.addAll(values);
if (type == String.class) {
clauseBuilder.append(kv.getKey())
.append(" =~ /")
.append(Joiner.on("|").join(values))
.append("/ OR ");
continue;
}
for (String value : values) {
clauseBuilder.append(kv.getKey()).append(" = '").append(value).append("' OR ");
} else {
for (String value : values) {
clauseBuilder.append(kv.getKey()).append(" = '").append(value).append("' OR ");
}
}
}
}
......@@ -111,17 +113,17 @@ public class MetricsQuery implements IMetricsQueryDAO {
queryWhereQuery
.and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB, downsampling)))
.and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB, downsampling)))
.groupBy(MetricsDAO.TAG_ENTITY_ID);
.groupBy(InfluxConstants.TagName.ENTITY_ID);
IntValues intValues = new IntValues();
List<QueryResult.Series> seriesList = client.queryForSeries(queryWhereQuery);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result set: {}", queryWhereQuery.getCommand(), seriesList);
}
if (!(seriesList == null || seriesList.isEmpty())) {
if (CollectionUtils.isNotEmpty(seriesList)) {
for (QueryResult.Series series : seriesList) {
KVInt kv = new KVInt();
kv.setId(series.getTags().get(MetricsDAO.TAG_ENTITY_ID));
kv.setId(series.getTags().get(InfluxConstants.TagName.ENTITY_ID));
Number value = (Number) series.getValues().get(0).get(1);
kv.setValue(value.longValue());
......@@ -139,16 +141,16 @@ public class MetricsQuery implements IMetricsQueryDAO {
String valueCName)
throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select()
.column("id")
.column(ID_COLUMN)
.column(valueCName)
.from(client.getDatabase(), measurement)
.where();
if (ids != null && !ids.isEmpty()) {
if (CollectionUtils.isNotEmpty(ids)) {
if (ids.size() == 1) {
query.where(eq("id", ids.get(0)));
query.where(eq(ID_COLUMN, ids.get(0)));
} else {
query.where(contains("id", Joiner.on("|").join(ids)));
query.where(contains(ID_COLUMN, Joiner.on("|").join(ids)));
}
}
List<QueryResult.Series> seriesList = client.queryForSeries(query);
......@@ -157,7 +159,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
}
IntValues intValues = new IntValues();
if (!(seriesList == null || seriesList.isEmpty())) {
if (CollectionUtils.isNotEmpty(seriesList)) {
seriesList.get(0).getValues().forEach(values -> {
KVInt kv = new KVInt();
kv.setValue(((Number) values.get(2)).longValue());
......@@ -197,7 +199,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
.from(client.getDatabase(), measurement)
.where();
if (ids != null && !ids.isEmpty()) {
if (CollectionUtils.isNotEmpty(ids)) {
if (ids.size() == 1) {
query.where(eq("id", ids.get(0)));
} else {
......@@ -212,7 +214,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
for (int i = 0; i < intValues.length; i++) {
intValues[i] = new IntValues();
}
if (series == null || series.isEmpty()) {
if (CollectionUtils.isEmpty(series)) {
return intValues;
}
series.get(0).getValues().forEach(values -> {
......@@ -253,9 +255,9 @@ public class MetricsQuery implements IMetricsQueryDAO {
.column(ThermodynamicMetrics.STEP)
.column(ThermodynamicMetrics.NUM_OF_STEPS)
.column(ThermodynamicMetrics.DETAIL_GROUP)
.column("id")
.column(ID_COLUMN)
.from(client.getDatabase(), measurement)
.where(contains("id", Joiner.on("|").join(ids)));
.where(contains(ID_COLUMN, Joiner.on("|").join(ids)));
Map<String, List<Long>> thermodynamicValueMatrix = new HashMap<>();
QueryResult.Series series = client.queryForSingleSeries(query);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereQueryImpl;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
@Slf4j
public class NetworkAddressAliasDAO implements INetworkAddressAliasDAO {
private final NetworkAddressAlias.Builder builder = new NetworkAddressAlias.Builder();
private InfluxClient client;
public NetworkAddressAliasDAO(final InfluxClient client) {
this.client = client;
}
@Override
public List<NetworkAddressAlias> loadLastUpdate(final long timeBucket) {
List<NetworkAddressAlias> networkAddressAliases = new ArrayList<>();
WhereQueryImpl<SelectQueryImpl> query = select().raw(InfluxConstants.ALL_FIELDS)
.from(client.getDatabase(), NetworkAddressAlias.INDEX_NAME)
.where(gte(
NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET,
timeBucket
));
try {
QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", query.getCommand(), series);
}
if (Objects.isNull(series)) {
return networkAddressAliases;
}
List<List<Object>> result = series.getValues();
List<String> columns = series.getColumns();
Map<String, String> columnAndFieldMap = TableMetaInfo.get(NetworkAddressAlias.INDEX_NAME)
.getStorageAndColumnMap();
for (List<Object> values : result) {
Map<String, Object> map = Maps.newHashMap();
for (int i = 1; i < columns.size(); i++) {
map.put(columnAndFieldMap.get(columns.get(i)), values.get(i));
}
networkAddressAliases.add(builder.map2Data(map));
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
return networkAddressAliases;
}
}
......@@ -19,17 +19,16 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLog;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLogOperationType;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereQueryImpl;
......@@ -49,8 +48,8 @@ public class ProfileTaskLogQuery implements IProfileTaskLogQueryDAO {
@Override
public List<ProfileTaskLog> getTaskLogList() throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select()
.function("top", ProfileTaskLogRecord.OPERATION_TIME, fetchTaskLogMaxSize)
.column("id")
.function(InfluxConstants.SORT_DES, ProfileTaskLogRecord.OPERATION_TIME, fetchTaskLogMaxSize)
.column(InfluxConstants.ID_COLUMN)
.column(ProfileTaskLogRecord.TASK_ID)
.column(ProfileTaskLogRecord.INSTANCE_ID)
.column(ProfileTaskLogRecord.OPERATION_TIME)
......@@ -65,26 +64,18 @@ public class ProfileTaskLogQuery implements IProfileTaskLogQueryDAO {
if (series == null) {
return Collections.emptyList();
}
List<String> columns = series.getColumns();
Map<String, Integer> columnsMap = Maps.newHashMap();
for (int i = 0; i < columns.size(); i++) {
columnsMap.put(columns.get(i), i);
}
List<ProfileTaskLog> taskLogs = Lists.newArrayList();
final List<ProfileTaskLog> taskLogs = Lists.newArrayList();
series.getValues().stream()
                  // Re-sort manually here, because top() returns its rows ordered by time.
.sorted((a, b) -> Long.compare(((Number) b.get(1)).longValue(), ((Number) a.get(1)).longValue()))
.forEach(values -> {
taskLogs.add(ProfileTaskLog.builder()
.id((String) values.get(columnsMap.get("id")))
.taskId((String) values.get(columnsMap.get(ProfileTaskLogRecord.TASK_ID)))
.instanceId(
(String) values.get(columnsMap.get(ProfileTaskLogRecord.INSTANCE_ID)))
.operationTime(
(Long) values.get(columnsMap.get(ProfileTaskLogRecord.OPERATION_TIME)))
.id((String) values.get(2))
.taskId((String) values.get(3))
.instanceId((String) values.get(4))
.operationTime(((Number) values.get(5)).longValue())
.operationType(ProfileTaskLogOperationType.parse(
(int) values.get(columnsMap.get(ProfileTaskLogRecord.OPERATION_TYPE))))
((Number) values.get(6)).intValue()))
.build());
});
return taskLogs;
......
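The fixed indices above rely on InfluxDB's column layout, assuming SORT_DES maps to the top() selector. The rendered statement looks roughly like the sketch below (the max size of 100 is illustrative):

    SELECT top(operation_time, 100), id, task_id, instance_id, operation_time, operation_type FROM profile_task_log

The series then comes back as [time, top, id, task_id, instance_id, operation_time, operation_type], putting id at index 2 and operation_type at index 6. Since top() orders its output by time rather than by the selected value, the stream above re-sorts by operation_time on the client side.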
......@@ -22,13 +22,13 @@ import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxModelConstants;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.NoneStreamDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereQueryImpl;
......@@ -38,8 +38,9 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
@Slf4j
public class ProfileTaskQuery implements IProfileTaskQueryDAO {
private InfluxClient client;
private final InfluxClient client;
public ProfileTaskQuery(InfluxClient client) {
this.client = client;
......@@ -52,19 +53,22 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO {
final Long endTimeBucket,
final Integer limit) throws IOException {
WhereQueryImpl<SelectQueryImpl> query =
select("id", ProfileTaskRecord.SERVICE_ID,
ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.START_TIME,
ProfileTaskRecord.CREATE_TIME,
InfluxModelConstants.DURATION,
ProfileTaskRecord.MIN_DURATION_THRESHOLD,
ProfileTaskRecord.DUMP_PERIOD,
ProfileTaskRecord.MAX_SAMPLING_COUNT
select(
InfluxConstants.ID_COLUMN,
ProfileTaskRecord.SERVICE_ID,
ProfileTaskRecord.ENDPOINT_NAME,
ProfileTaskRecord.START_TIME,
ProfileTaskRecord.CREATE_TIME,
InfluxConstants.DURATION,
ProfileTaskRecord.MIN_DURATION_THRESHOLD,
ProfileTaskRecord.DUMP_PERIOD,
ProfileTaskRecord.MAX_SAMPLING_COUNT
)
.from(client.getDatabase(), ProfileTaskRecord.INDEX_NAME)
.where();
if (StringUtil.isNotEmpty(serviceId)) {
query.and(eq(NoneStreamDAO.TAG_SERVICE_ID, serviceId));
query.and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId));
}
if (StringUtil.isNotEmpty(endpointName)) {
query.and(eq(ProfileTaskRecord.ENDPOINT_NAME, endpointName));
......@@ -81,6 +85,9 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO {
List<ProfileTask> tasks = Lists.newArrayList();
QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", query.getCommand(), series);
}
if (series != null) {
series.getValues().forEach(values -> {
tasks.add(profileTaskBuilder(values));
......@@ -94,20 +101,26 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO {
if (StringUtil.isEmpty(id)) {
return null;
}
SelectQueryImpl query = select("id", ProfileTaskRecord.SERVICE_ID,
ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.START_TIME,
ProfileTaskRecord.CREATE_TIME,
InfluxModelConstants.DURATION,
ProfileTaskRecord.MIN_DURATION_THRESHOLD,
ProfileTaskRecord.DUMP_PERIOD,
ProfileTaskRecord.MAX_SAMPLING_COUNT
SelectQueryImpl query = select(
InfluxConstants.ID_COLUMN,
ProfileTaskRecord.SERVICE_ID,
ProfileTaskRecord.ENDPOINT_NAME,
ProfileTaskRecord.START_TIME,
ProfileTaskRecord.CREATE_TIME,
InfluxConstants.DURATION,
ProfileTaskRecord.MIN_DURATION_THRESHOLD,
ProfileTaskRecord.DUMP_PERIOD,
ProfileTaskRecord.MAX_SAMPLING_COUNT
)
.from(client.getDatabase(), ProfileTaskRecord.INDEX_NAME)
.where()
.and(eq("id", id))
.and(eq(InfluxConstants.ID_COLUMN, id))
.limit(1);
QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", query.getCommand(), series);
}
if (Objects.nonNull(series)) {
return profileTaskBuilder(series.getValues().get(0));
}
......
......@@ -26,6 +26,8 @@ import java.util.Base64;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
......@@ -33,6 +35,7 @@ import org.apache.skywalking.oap.server.core.query.entity.BasicTrace;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.elasticsearch.common.Strings;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.WhereQueryImpl;
......@@ -43,6 +46,7 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
@Slf4j
public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDAO {
private final InfluxClient client;
......@@ -60,7 +64,7 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
final LinkedList<String> segments = new LinkedList<>();
QueryResult.Series series = client.queryForSingleSeries(query);
if (series == null) {
if (Objects.isNull(series)) {
return Collections.emptyList();
}
series.getValues().forEach(values -> {
......@@ -72,7 +76,7 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
}
query = select()
.function("bottom", SegmentRecord.START_TIME, segments.size())
.function(InfluxConstants.SORT_ASC, SegmentRecord.START_TIME, segments.size())
.column(SegmentRecord.SEGMENT_ID)
.column(SegmentRecord.START_TIME)
.column(SegmentRecord.ENDPOINT_NAME)
......@@ -130,8 +134,15 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
.and(gte(ProfileThreadSnapshotRecord.SEQUENCE, minSequence))
.and(lte(ProfileThreadSnapshotRecord.SEQUENCE, maxSequence));
QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", query.getCommand(), series);
}
if (Objects.isNull(series)) {
            return Collections.emptyList();
}
ArrayList<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence - minSequence);
client.queryForSingleSeries(query).getValues().forEach(values -> {
series.getValues().forEach(values -> {
ProfileThreadSnapshotRecord record = new ProfileThreadSnapshotRecord();
record.setTaskId((String) values.get(1));
......@@ -165,7 +176,10 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
.where()
.and(eq(SegmentRecord.SEGMENT_ID, segmentId));
List<QueryResult.Series> series = client.queryForSeries(query);
if (series == null || series.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("SQL: {} result set: {}", query.getCommand(), series);
}
if (Objects.isNull(series) || series.isEmpty()) {
return null;
}
......@@ -198,10 +212,6 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
.and(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
.and(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start))
.and(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end));
QueryResult.Series series = client.queryForSingleSeries(query);
if (series == null) {
return -1;
}
return ((Number) series.getValues().get(0).get(1)).intValue();
return client.getCounter(query);
}
}
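With the refactor above, the null-series check and value extraction move into client.getCounter. Assuming the truncated select clause is a count over the snapshot records, the statement handed to getCounter renders roughly as (literal values illustrative):

    SELECT count(sequence) FROM profile_thread_snapshot WHERE segment_id = 'seg-1' AND dump_time >= 1585000000000 AND dump_time <= 1585000060000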
......@@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.query.entity.Order;
import org.apache.skywalking.oap.server.core.query.entity.TopNRecord;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.RecordDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.WhereQueryImpl;
......@@ -50,11 +50,11 @@ public class TopNRecordsQuery implements ITopNRecordsQueryDAO {
@Override
public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName,
String serviceId, int topN, Order order) throws IOException {
String function = "bottom";
String function = InfluxConstants.SORT_ASC;
        // Have to re-sort here, because the top()/bottom() functions return results ordered by `time`.
Comparator<TopNRecord> comparator = Comparator.comparingLong(TopNRecord::getLatency);
if (order.equals(Order.DES)) {
function = "top";
function = InfluxConstants.SORT_DES;
comparator = (a, b) -> Long.compare(b.getLatency(), a.getLatency());
}
......@@ -68,7 +68,7 @@ public class TopNRecordsQuery implements ITopNRecordsQueryDAO {
.and(lte(TopN.TIME_BUCKET, endSecondTB));
if (StringUtil.isNotEmpty(serviceId)) {
query.and(eq(RecordDAO.TAG_SERVICE_ID, serviceId));
query.and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId));
}
QueryResult.Series series = client.queryForSingleSeries(query);
......
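Assuming SORT_ASC and SORT_DES map to InfluxQL's bottom() and top() selectors, a descending query from this DAO renders roughly as the sketch below (measurement, columns, and values are illustrative):

    SELECT top(latency, 10), trace_id, statement FROM top_n_database_statement WHERE time_bucket >= 20200325000000 AND time_bucket <= 20200325235900 AND service_id = '5'

Both selectors return their rows ordered by time, which is why the comparator above re-imposes the latency ordering before the records are returned.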
......@@ -29,14 +29,17 @@ import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.S
import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.entity.Call;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.SelectSubQueryImpl;
import org.influxdb.querybuilder.WhereNested;
import org.influxdb.querybuilder.WhereQueryImpl;
import org.influxdb.querybuilder.WhereSubQueryImpl;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
......@@ -56,7 +59,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
long endTB,
List<String> serviceIds) throws IOException {
String measurement = ServiceRelationServerSideMetrics.INDEX_NAME;
WhereQueryImpl query = buildServiceCallsQuery(
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
measurement,
startTB,
endTB,
......@@ -64,7 +67,8 @@ public class TopologyQuery implements ITopologyQueryDAO {
ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
serviceIds
);
return buildServiceCalls(query, DetectPoint.SERVER);
return buildServiceCalls(buildQuery(subQuery), DetectPoint.SERVER);
}
@Override
......@@ -72,7 +76,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
long endTB,
List<String> serviceIds) throws IOException {
String measurement = ServiceRelationClientSideMetrics.INDEX_NAME;
WhereQueryImpl query = buildServiceCallsQuery(
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
measurement,
startTB,
endTB,
......@@ -80,14 +84,14 @@ public class TopologyQuery implements ITopologyQueryDAO {
ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
serviceIds
);
return buildServiceCalls(query, DetectPoint.CLIENT);
return buildServiceCalls(buildQuery(subQuery), DetectPoint.CLIENT);
}
@Override
public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(DownSampling downsampling, long startTB,
long endTB) throws IOException {
String measurement = ServiceRelationServerSideMetrics.INDEX_NAME;
WhereQueryImpl query = buildServiceCallsQuery(
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
measurement,
startTB,
endTB,
......@@ -95,14 +99,14 @@ public class TopologyQuery implements ITopologyQueryDAO {
ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
new ArrayList<>(0)
);
return buildServiceCalls(query, DetectPoint.SERVER);
return buildServiceCalls(buildQuery(subQuery), DetectPoint.SERVER);
}
@Override
public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(DownSampling downsampling, long startTB,
long endTB) throws IOException {
String tableName = ServiceRelationClientSideMetrics.INDEX_NAME;
WhereQueryImpl query = buildServiceCallsQuery(
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
tableName,
startTB,
endTB,
......@@ -110,7 +114,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
new ArrayList<>(0)
);
return buildServiceCalls(query, DetectPoint.CLIENT);
return buildServiceCalls(buildQuery(subQuery), DetectPoint.CLIENT);
}
@Override
......@@ -120,14 +124,15 @@ public class TopologyQuery implements ITopologyQueryDAO {
long startTB,
long endTB) throws IOException {
String measurement = ServiceInstanceRelationServerSideMetrics.INDEX_NAME;
WhereQueryImpl query = buildServiceInstanceCallsQuery(measurement,
startTB,
endTB,
ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
clientServiceId, serverServiceId
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceInstanceCallsQuery(
measurement,
startTB,
endTB,
ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
clientServiceId, serverServiceId
);
return buildInstanceCalls(query, DetectPoint.SERVER);
return buildInstanceCalls(buildQuery(subQuery), DetectPoint.SERVER);
}
@Override
......@@ -137,14 +142,15 @@ public class TopologyQuery implements ITopologyQueryDAO {
long startTB,
long endTB) throws IOException {
String measurement = ServiceInstanceRelationClientSideMetrics.INDEX_NAME;
WhereQueryImpl query = buildServiceInstanceCallsQuery(measurement,
startTB,
endTB,
ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID,
ServiceInstanceRelationClientSideMetrics.DEST_SERVICE_ID,
clientServiceId, serverServiceId
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceInstanceCallsQuery(
measurement,
startTB,
endTB,
ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID,
ServiceInstanceRelationClientSideMetrics.DEST_SERVICE_ID,
clientServiceId, serverServiceId
);
return buildInstanceCalls(query, DetectPoint.CLIENT);
return buildInstanceCalls(buildQuery(subQuery), DetectPoint.CLIENT);
}
@Override
......@@ -154,7 +160,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
String destEndpointId) throws IOException {
String measurement = EndpointRelationServerSideMetrics.INDEX_NAME;
WhereQueryImpl query = buildServiceCallsQuery(
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
measurement,
startTB,
endTB,
......@@ -162,9 +168,9 @@ public class TopologyQuery implements ITopologyQueryDAO {
EndpointRelationServerSideMetrics.DEST_ENDPOINT,
Collections.emptyList()
);
query.and(eq(EndpointRelationServerSideMetrics.DEST_ENDPOINT, destEndpointId));
subQuery.and(eq(EndpointRelationServerSideMetrics.DEST_ENDPOINT, destEndpointId));
WhereQueryImpl query2 = buildServiceCallsQuery(
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery2 = buildServiceCallsQuery(
measurement,
startTB,
endTB,
......@@ -172,61 +178,73 @@ public class TopologyQuery implements ITopologyQueryDAO {
EndpointRelationServerSideMetrics.DEST_ENDPOINT,
Collections.emptyList()
);
query2.and(eq(EndpointRelationServerSideMetrics.SOURCE_ENDPOINT, destEndpointId));
subQuery2.and(eq(EndpointRelationServerSideMetrics.SOURCE_ENDPOINT, destEndpointId));
List<Call.CallDetail> calls = buildEndpointCalls(query, DetectPoint.SERVER);
calls.addAll(buildEndpointCalls(query2, DetectPoint.CLIENT));
List<Call.CallDetail> calls = buildEndpointCalls(buildQuery(subQuery), DetectPoint.SERVER);
        calls.addAll(buildEndpointCalls(buildQuery(subQuery2), DetectPoint.CLIENT));
return calls;
}
private WhereQueryImpl buildServiceCallsQuery(String measurement, long startTB, long endTB, String sourceCName,
String destCName, List<String> serviceIds) {
WhereQueryImpl query = select()
.function("distinct", Metrics.ENTITY_ID, ServiceRelationServerSideMetrics.COMPONENT_ID)
.from(client.getDatabase(), measurement)
private WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> buildServiceCallsQuery(
String measurement,
long startTB,
long endTB,
String sourceCName,
String destCName,
List<String> serviceIds) {
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select()
.fromSubQuery(client.getDatabase())
.function("distinct", ServiceInstanceRelationServerSideMetrics.COMPONENT_ID)
.as(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID)
.from(measurement)
.where()
.and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB)))
.and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB)));
if (!serviceIds.isEmpty()) {
WhereNested whereNested = query.andNested();
WhereNested whereNested = subQuery.andNested();
for (String id : serviceIds) {
whereNested.or(eq(sourceCName, id))
.or(eq(destCName, id));
}
whereNested.close();
}
return query;
return subQuery;
}
private WhereQueryImpl buildServiceInstanceCallsQuery(String measurement,
long startTB,
long endTB,
String sourceCName,
String destCName,
String sourceServiceId,
String destServiceId) {
WhereQueryImpl query = select()
.function("distinct", Metrics.ENTITY_ID, ServiceInstanceRelationServerSideMetrics.COMPONENT_ID)
.from(client.getDatabase(), measurement)
private WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> buildServiceInstanceCallsQuery(
String measurement,
long startTB,
long endTB,
String sourceCName,
String destCName,
String sourceServiceId,
String destServiceId) {
WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select()
.fromSubQuery(client.getDatabase())
.function("distinct", ServiceInstanceRelationServerSideMetrics.COMPONENT_ID)
.as(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID)
.from(measurement)
.where()
.and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB)))
.and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB)));
StringBuilder builder = new StringBuilder("((");
builder.append(sourceCName).append("=").append(sourceServiceId)
.append(" and ")
.append(destCName).append("=").append(destServiceId)
.append(") or (")
.append(sourceCName).append("=").append(destServiceId)
.append(") and (")
.append(destCName).append("=").append(sourceServiceId)
.append("))");
query.where(builder.toString());
return query;
builder.append(sourceCName).append("='").append(sourceServiceId)
.append("' and ")
.append(destCName).append("='").append(destServiceId)
.append("') or (")
.append(sourceCName).append("='").append(destServiceId)
.append("') and (")
.append(destCName).append("='").append(sourceServiceId)
.append("'))");
subQuery.where(builder.toString());
subQuery.groupBy(InfluxConstants.TagName.ENTITY_ID);
return subQuery;
}
private List<Call.CallDetail> buildServiceCalls(WhereQueryImpl query,
private List<Call.CallDetail> buildServiceCalls(Query query,
DetectPoint detectPoint) throws IOException {
QueryResult.Series series = client.queryForSingleSeries(query);
......@@ -240,7 +258,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
List<Call.CallDetail> calls = new ArrayList<>();
series.getValues().forEach(values -> {
Call.CallDetail call = new Call.CallDetail();
String entityId = (String) values.get(1);
String entityId = String.valueOf(values.get(1));
int componentId = (int) values.get(2);
call.buildFromServiceRelation(entityId, componentId, detectPoint);
calls.add(call);
......@@ -248,7 +266,15 @@ public class TopologyQuery implements ITopologyQueryDAO {
return calls;
}
private List<Call.CallDetail> buildInstanceCalls(WhereQueryImpl query,
private Query buildQuery(WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery) {
SelectQueryImpl query = select().column(InfluxConstants.TagName.ENTITY_ID)
.column(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID)
.from(client.getDatabase());
query.setSubQuery(subQuery.groupBy(InfluxConstants.TagName.ENTITY_ID));
return query;
}
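Composed this way, the outer select flattens what the sub-query groups, along these lines (measurement, tag names, and time range are illustrative):

    SELECT entity_id, component_id FROM (
        SELECT distinct(component_id) AS component_id FROM service_relation_server_side
        WHERE time >= '2020-03-25T00:00:00Z' AND time <= '2020-03-25T01:00:00Z'
        GROUP BY entity_id
    )

Without GROUP BY on the entity_id tag, distinct() would collapse component ids across all relations into a single series; grouping keeps one series per entity, and the outer query exposes the tag as a regular column alongside it.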
private List<Call.CallDetail> buildInstanceCalls(Query query,
DetectPoint detectPoint) throws IOException {
QueryResult.Series series = client.queryForSingleSeries(query);
......@@ -270,7 +296,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
return calls;
}
private List<Call.CallDetail> buildEndpointCalls(WhereQueryImpl query,
private List<Call.CallDetail> buildEndpointCalls(Query query,
DetectPoint detectPoint) throws IOException {
QueryResult.Series series = client.queryForSingleSeries(query);
......
......@@ -34,7 +34,7 @@ import org.apache.skywalking.oap.server.core.query.entity.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.RecordDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.elasticsearch.common.Strings;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
......@@ -78,7 +78,7 @@ public class TraceQuery implements ITraceQueryDAO {
}
WhereQueryImpl<SelectQueryImpl> recallQuery = select()
.function("top", orderBy, limit + from)
.function(InfluxConstants.SORT_DES, orderBy, limit + from)
.column(SegmentRecord.SEGMENT_ID)
.column(SegmentRecord.START_TIME)
.column(SegmentRecord.ENDPOINT_NAME)
......@@ -102,7 +102,7 @@ public class TraceQuery implements ITraceQueryDAO {
recallQuery.and(contains(SegmentRecord.ENDPOINT_NAME, endpointName.replaceAll("/", "\\\\/")));
}
if (StringUtil.isNotEmpty(serviceId)) {
recallQuery.and(eq(RecordDAO.TAG_SERVICE_ID, String.valueOf(serviceId)));
recallQuery.and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId));
}
if (StringUtil.isNotEmpty(serviceInstanceId)) {
recallQuery.and(eq(SegmentRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
......@@ -201,9 +201,9 @@ public class TraceQuery implements ITraceQueryDAO {
segmentRecord.setEndTime((long) values.get(7));
segmentRecord.setLatency((int) values.get(8));
segmentRecord.setIsError((int) values.get(9));
segmentRecord.setVersion((int) values.get(10));
segmentRecord.setVersion((int) values.get(11));
String base64 = (String) values.get(9);
String base64 = (String) values.get(10);
if (!Strings.isNullOrEmpty(base64)) {
segmentRecord.setDataBinary(Base64.getDecoder().decode(base64));
}
......
......@@ -22,8 +22,6 @@ services:
- 8086
networks:
- e2e
depends_on:
- h2db
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/8086"]
interval: 5s
......@@ -37,8 +35,6 @@ services:
environment:
SW_STORAGE: influxdb
depends_on:
h2db:
condition: service_healthy
influxdb:
condition: service_healthy
......
......@@ -187,7 +187,6 @@ fi
supported_versions=`grep -v -E "^$|^#" ${supported_version_file}`
for version in ${supported_versions}
do
waitForAvailable
testcase_name="${scenario_name}-${version}"
# testcase working directory, there are logs, data and packages.
......@@ -218,8 +217,10 @@ do
[[ $? -ne 0 ]] && exitWithMessage "${testcase_name}, generate script failure!"
echo "start container of testcase.name=${testcase_name}"
bash ${case_work_base}/scenario.sh ${task_state_house} 1>${case_work_logs_dir}/${testcase_name}.log &
bash ${case_work_base}/scenario.sh ${task_state_house} 1>${case_work_logs_dir}/${testcase_name}.log
sleep 3
waitForAvailable
rm -rf ${case_work_base}
done
echo -e "\033[33m${scenario_name} has already been submitted\033[0m"
......
......@@ -36,13 +36,4 @@
5.1.26
5.1.24
5.1.22
5.1.20
5.1.18
5.1.16
5.1.14
5.1.12
5.1.10
5.1.8
5.1.6
5.1.4
5.1.2
\ No newline at end of file
5.1.20
\ No newline at end of file
......@@ -25,6 +25,7 @@ dependencies:
solr-server:
image: solr:${CASE_SERVER_IMAGE_VERSION}
hostname: solr-server
removeOnExit: true
entrypoint:
- docker-entrypoint.sh
- solr-precreate
......