未验证 提交 399917af 编写于 作者: H Haimei Guo 提交者: GitHub

[IOTDB-670] raw data query interface (#1704)

上级 287f3fa9
......@@ -60,6 +60,7 @@ public class SessionExample {
insertRecords();
nonQuery();
query();
rawDataQuery();
queryByIterator();
deleteData();
deleteTimeseries();
......@@ -408,6 +409,24 @@ public class SessionExample {
dataSet.closeOperationHandle();
}
private static void rawDataQuery() throws IoTDBConnectionException, StatementExecutionException {
List<String> paths = new ArrayList<>();
paths.add("root.sg1.d1.s1");
paths.add("root.sg1.d1.s2");
paths.add("root.sg1.d1.s3");
long startTime = 10L;
long endTime = 200L;
SessionDataSet dataSet;
dataSet = session.executeRawDataQuery(paths, startTime, endTime);
System.out.println(dataSet.getColumnNames());
dataSet.setFetchSize(1024);
while (dataSet.hasNext()) {
System.out.println(dataSet.next());
}
dataSet.closeOperationHandle();
}
private static void queryByIterator()
throws IoTDBConnectionException, StatementExecutionException {
SessionDataSet dataSet;
......
......@@ -85,6 +85,7 @@ public class IoTDBConstant {
public static final String COLUMN_DONE = "done";
public static final String PATH_WILDCARD = "*";
public static final String TIME = "time";
// data folder name
public static final String SEQUENCE_FLODER_NAME = "sequence";
......
......@@ -36,7 +36,18 @@ import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
import org.apache.iotdb.db.qp.strategy.optimizer.DnfFilterOptimizer;
import org.apache.iotdb.db.qp.strategy.optimizer.MergeSingleFilterOptimizer;
import org.apache.iotdb.db.qp.strategy.optimizer.RemoveNotOptimizer;
import org.apache.iotdb.db.qp.logical.crud.SelectOperator;
import org.apache.iotdb.db.qp.logical.crud.FromOperator;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import org.apache.iotdb.tsfile.read.common.Path;
import static org.apache.iotdb.db.conf.IoTDBConstant.TIME;
import java.util.HashSet;
import java.util.List;
/**
* provide a integration method for other user.
......@@ -64,6 +75,50 @@ public class Planner {
return physicalGenerator.transformToPhysicalPlan(operator);
}
/**
* convert raw data query to physical plan directly
*/
public PhysicalPlan rawDataQueryReqToPhysicalPlan(TSRawDataQueryReq rawDataQueryReq)
throws QueryProcessException, IllegalPathException {
List<String> paths = rawDataQueryReq.getPaths();
long startTime = rawDataQueryReq.getStartTime();
long endTime = rawDataQueryReq.getEndTime();
//construct query operator and set its global time filter
QueryOperator queryOp = new QueryOperator(SQLConstant.TOK_QUERY);
FromOperator fromOp = new FromOperator(SQLConstant.TOK_FROM);
SelectOperator selectOp = new SelectOperator(SQLConstant.TOK_SELECT);
//iterate the path list and add it to from operator
for (String p : paths) {
PartialPath path = new PartialPath(p);
fromOp.addPrefixTablePath(path);
}
selectOp.addSelectPath(new PartialPath(""));
queryOp.setSelectOperator(selectOp);
queryOp.setFromOperator(fromOp);
//set time filter operator
FilterOperator filterOp = new FilterOperator(SQLConstant.KW_AND);
PartialPath timePath = new PartialPath(TIME);
filterOp.setSinglePath(timePath);
Set<PartialPath> pathSet = new HashSet<>();
pathSet.add(timePath);
filterOp.setIsSingle(true);
filterOp.setPathSet(pathSet);
BasicFunctionOperator left = new BasicFunctionOperator(SQLConstant.GREATERTHANOREQUALTO, timePath, Long.toString(startTime));
BasicFunctionOperator right = new BasicFunctionOperator(SQLConstant.LESSTHAN, timePath, Long.toString(endTime));
filterOp.addChildOperator(left);
filterOp.addChildOperator(right);
queryOp.setFilterOperator(filterOp);
SFWOperator op = (SFWOperator) logicalOptimize(queryOp);
PhysicalGenerator physicalGenerator = new PhysicalGenerator();
return physicalGenerator.transformToPhysicalPlan(op);
}
/**
* given an unoptimized logical operator tree and return a optimized result.
......
......@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import static org.apache.iotdb.db.conf.IoTDBConstant.TIME;
/**
* all basic operator in filter.
......@@ -36,7 +37,7 @@ public enum BasicOperatorType {
EQ {
@Override
public <T extends Comparable<T>> IUnaryExpression getUnaryExpression(Path path, T value) {
if (path.equals("time")) {
if (path.equals(TIME)) {
return new GlobalTimeExpression(TimeFilter.eq((Long) value));
} else {
return new SingleSeriesExpression(path, ValueFilter.eq(value));
......@@ -56,7 +57,7 @@ public enum BasicOperatorType {
LTEQ {
@Override
public <T extends Comparable<T>> IUnaryExpression getUnaryExpression(Path path, T value) {
if (path.equals("time")) {
if (path.equals(TIME)) {
return new GlobalTimeExpression(TimeFilter.ltEq((Long) value));
} else {
return new SingleSeriesExpression(path, ValueFilter.ltEq(value));
......@@ -76,7 +77,7 @@ public enum BasicOperatorType {
LT {
@Override
public <T extends Comparable<T>> IUnaryExpression getUnaryExpression(Path path, T value) {
if (path.equals("time")) {
if (path.equals(TIME)) {
return new GlobalTimeExpression(TimeFilter.lt((Long) value));
} else {
return new SingleSeriesExpression(path, ValueFilter.lt(value));
......@@ -96,7 +97,7 @@ public enum BasicOperatorType {
GTEQ {
@Override
public <T extends Comparable<T>> IUnaryExpression getUnaryExpression(Path path, T value) {
if (path.equals("time")) {
if (path.equals(TIME)) {
return new GlobalTimeExpression(TimeFilter.gtEq((Long) value));
} else {
return new SingleSeriesExpression(path, ValueFilter.gtEq(value));
......@@ -116,7 +117,7 @@ public enum BasicOperatorType {
GT {
@Override
public <T extends Comparable<T>> IUnaryExpression getUnaryExpression(Path path, T value) {
if (path.equals("time")) {
if (path.equals(TIME)) {
return new GlobalTimeExpression(TimeFilter.gt((Long) value));
} else {
return new SingleSeriesExpression(path, ValueFilter.gt(value));
......@@ -136,7 +137,7 @@ public enum BasicOperatorType {
NOTEQUAL {
@Override
public <T extends Comparable<T>> IUnaryExpression getUnaryExpression(Path path, T value) {
if (path.equals("time")) {
if (path.equals(TIME)) {
return new GlobalTimeExpression(TimeFilter.notEq((Long) value));
} else {
return new SingleSeriesExpression(path, ValueFilter.notEq(value));
......
......@@ -129,6 +129,7 @@ import org.apache.thrift.TException;
import org.apache.thrift.server.ServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
/**
......@@ -542,6 +543,45 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
try {
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.NOT_LOGIN_ERROR);
}
PhysicalPlan physicalPlan;
try {
physicalPlan =
processor.rawDataQueryReqToPhysicalPlan(req);
} catch (QueryProcessException | SQLParserException e) {
logger.info(ERROR_PARSING_SQL, e.getMessage());
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR, e.getMessage());
}
if (!physicalPlan.isQuery()) {
return RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
}
return internalExecuteQueryStatement("", generateQueryId(true), physicalPlan, req.fetchSize,
sessionIdUsernameMap.get(req.getSessionId()));
} catch (ParseCancellationException e) {
logger.warn(ERROR_PARSING_SQL, e.getMessage());
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR,
ERROR_PARSING_SQL + e.getMessage());
} catch (SQLParserException e) {
logger.error("check metadata error: ", e);
return RpcUtils.getTSExecuteStatementResp(
TSStatusCode.METADATA_ERROR, "Check metadata error: " + e.getMessage());
} catch (Exception e) {
logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getTSExecuteStatementResp(
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
}
}
/**
* @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, some
* AuthorPlan
......
......@@ -66,6 +66,7 @@ import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
public class Session {
......@@ -929,6 +930,36 @@ public class Session {
}
}
/**
* query eg. select * from paths where time >= startTime and time < endTime
* time interval include startTime and exclude endTime
* @param paths
* @param startTime included
* @param endTime excluded
* @return
* @throws StatementExecutionException
* @throws IoTDBConnectionException
*/
public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
throws StatementExecutionException, IoTDBConnectionException {
TSRawDataQueryReq execReq = new TSRawDataQueryReq(sessionId, paths, startTime, endTime);
execReq.setFetchSize(fetchSize);
TSExecuteStatementResp execResp;
try {
execResp = client.executeRawDataQuery(execReq);
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
RpcUtils.verifySuccess(execResp.getStatus());
return new SessionDataSet("", execResp.getColumns(), execResp.getDataTypeList(),
execResp.columnNameIndexMap,
execResp.getQueryId(), client, sessionId, execResp.queryDataSet,
execResp.isIgnoreTimeStamp());
}
/**
* check whether the batch has been sorted
*
......
......@@ -269,6 +269,20 @@ public class IoTDBSessionComplexIT {
session.close();
}
@Test
public void testRawDataQuery() throws IoTDBConnectionException, StatementExecutionException {
session = new Session("127.0.0.1", 6667, "root", "root");
session.open();
session.setStorageGroup("root.sg1");
createTimeseries();
insertRecords();
rawDataQuery();
}
@Test
public void test() throws ClassNotFoundException, SQLException,
IoTDBConnectionException, StatementExecutionException {
......@@ -537,6 +551,32 @@ public class IoTDBSessionComplexIT {
session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
}
private void rawDataQuery()
throws StatementExecutionException, IoTDBConnectionException {
List<String> paths = new ArrayList<>();
paths.add("root.sg1.d2.*");
paths.add("root.sg1.d2.s1");
paths.add("root.sg1.d2.s2");
SessionDataSet sessionDataSet = session
.executeRawDataQuery(paths, 450L, 600L);
sessionDataSet.setFetchSize(1024);
int count = 0;
System.out.println(sessionDataSet.getColumnNames());
while (sessionDataSet.hasNext()) {
count++;
StringBuilder sb = new StringBuilder();
List<Field> fields = sessionDataSet.next().getFields();
for (Field f : fields) {
sb.append(f.getStringValue()).append(",");
}
System.out.println(sb.toString());
}
Assert.assertEquals(50, count);
sessionDataSet.closeOperationHandle();
}
private void insertTablet(String deviceId)
throws IoTDBConnectionException, StatementExecutionException {
......
......@@ -250,6 +250,14 @@ struct TSCreateTimeseriesReq {
9: optional string measurementAlias
}
struct TSRawDataQueryReq {
1: required i64 sessionId
2: required list<string> paths
3: optional i32 fetchSize
4: required i64 startTime
5: required i64 endTime
}
struct TSCreateMultiTimeseriesReq {
1: required i64 sessionId
2: required list<string> paths
......@@ -331,5 +339,7 @@ service TSIService {
TSStatus deleteData(1:TSDeleteDataReq req);
TSExecuteStatementResp executeRawDataQuery(1:TSRawDataQueryReq req);
i64 requestStatementId(1:i64 sessionId);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册