From 399917af04b4c2cbbf4b1281b38e4fb9ebf0771a Mon Sep 17 00:00:00 2001 From: Haimei Guo <68632589+haimeiguo@users.noreply.github.com> Date: Wed, 9 Sep 2020 09:28:43 +0800 Subject: [PATCH] [IOTDB-670] raw data query interface (#1704) --- .../java/org/apache/iotdb/SessionExample.java | 19 +++++++ .../apache/iotdb/db/conf/IoTDBConstant.java | 1 + .../java/org/apache/iotdb/db/qp/Planner.java | 55 +++++++++++++++++++ .../db/qp/logical/crud/BasicOperatorType.java | 13 +++-- .../iotdb/db/service/TSServiceImpl.java | 40 ++++++++++++++ .../org/apache/iotdb/session/Session.java | 31 +++++++++++ .../iotdb/session/IoTDBSessionComplexIT.java | 40 ++++++++++++++ thrift/src/main/thrift/rpc.thrift | 10 ++++ 8 files changed, 203 insertions(+), 6 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index a819e3a19f..0534683f32 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -60,6 +60,7 @@ public class SessionExample { insertRecords(); nonQuery(); query(); + rawDataQuery(); queryByIterator(); deleteData(); deleteTimeseries(); @@ -408,6 +409,24 @@ public class SessionExample { dataSet.closeOperationHandle(); } + private static void rawDataQuery() throws IoTDBConnectionException, StatementExecutionException { + List paths = new ArrayList<>(); + paths.add("root.sg1.d1.s1"); + paths.add("root.sg1.d1.s2"); + paths.add("root.sg1.d1.s3"); + long startTime = 10L; + long endTime = 200L; + + SessionDataSet dataSet; + dataSet = session.executeRawDataQuery(paths, startTime, endTime); + System.out.println(dataSet.getColumnNames()); + dataSet.setFetchSize(1024); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + dataSet.closeOperationHandle(); + } + private static void queryByIterator() throws IoTDBConnectionException, StatementExecutionException { SessionDataSet dataSet; diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java index a1c41b0cba..3e8b4df399 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java @@ -85,6 +85,7 @@ public class IoTDBConstant { public static final String COLUMN_DONE = "done"; public static final String PATH_WILDCARD = "*"; + public static final String TIME = "time"; // data folder name public static final String SEQUENCE_FLODER_NAME = "sequence"; diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java index eacd5e9bfd..45d852665e 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java @@ -36,7 +36,18 @@ import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer; import org.apache.iotdb.db.qp.strategy.optimizer.DnfFilterOptimizer; import org.apache.iotdb.db.qp.strategy.optimizer.MergeSingleFilterOptimizer; import org.apache.iotdb.db.qp.strategy.optimizer.RemoveNotOptimizer; +import org.apache.iotdb.db.qp.logical.crud.SelectOperator; +import org.apache.iotdb.db.qp.logical.crud.FromOperator; +import org.apache.iotdb.db.qp.logical.crud.QueryOperator; +import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator; +import org.apache.iotdb.db.qp.constant.SQLConstant; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.utils.TestOnly; +import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq; +import org.apache.iotdb.tsfile.read.common.Path; +import static org.apache.iotdb.db.conf.IoTDBConstant.TIME; +import java.util.HashSet; +import java.util.List; /** * provide a integration method for other user. @@ -64,6 +75,50 @@ public class Planner { return physicalGenerator.transformToPhysicalPlan(operator); } + /** + * convert raw data query to physical plan directly + */ + public PhysicalPlan rawDataQueryReqToPhysicalPlan(TSRawDataQueryReq rawDataQueryReq) + throws QueryProcessException, IllegalPathException { + List paths = rawDataQueryReq.getPaths(); + long startTime = rawDataQueryReq.getStartTime(); + long endTime = rawDataQueryReq.getEndTime(); + + //construct query operator and set its global time filter + QueryOperator queryOp = new QueryOperator(SQLConstant.TOK_QUERY); + FromOperator fromOp = new FromOperator(SQLConstant.TOK_FROM); + SelectOperator selectOp = new SelectOperator(SQLConstant.TOK_SELECT); + + //iterate the path list and add it to from operator + for (String p : paths) { + PartialPath path = new PartialPath(p); + fromOp.addPrefixTablePath(path); + } + selectOp.addSelectPath(new PartialPath("")); + + queryOp.setSelectOperator(selectOp); + queryOp.setFromOperator(fromOp); + + //set time filter operator + FilterOperator filterOp = new FilterOperator(SQLConstant.KW_AND); + PartialPath timePath = new PartialPath(TIME); + filterOp.setSinglePath(timePath); + Set pathSet = new HashSet<>(); + pathSet.add(timePath); + filterOp.setIsSingle(true); + filterOp.setPathSet(pathSet); + + BasicFunctionOperator left = new BasicFunctionOperator(SQLConstant.GREATERTHANOREQUALTO, timePath, Long.toString(startTime)); + BasicFunctionOperator right = new BasicFunctionOperator(SQLConstant.LESSTHAN, timePath, Long.toString(endTime)); + filterOp.addChildOperator(left); + filterOp.addChildOperator(right); + + queryOp.setFilterOperator(filterOp); + + SFWOperator op = (SFWOperator) logicalOptimize(queryOp); + PhysicalGenerator physicalGenerator = new PhysicalGenerator(); + return physicalGenerator.transformToPhysicalPlan(op); + } /** * given an unoptimized logical operator tree and return a optimized result. diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicOperatorType.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicOperatorType.java index d7f7d2c3d2..64681d3344 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicOperatorType.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicOperatorType.java @@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.ValueFilter; import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import static org.apache.iotdb.db.conf.IoTDBConstant.TIME; /** * all basic operator in filter. @@ -36,7 +37,7 @@ public enum BasicOperatorType { EQ { @Override public > IUnaryExpression getUnaryExpression(Path path, T value) { - if (path.equals("time")) { + if (path.equals(TIME)) { return new GlobalTimeExpression(TimeFilter.eq((Long) value)); } else { return new SingleSeriesExpression(path, ValueFilter.eq(value)); @@ -56,7 +57,7 @@ public enum BasicOperatorType { LTEQ { @Override public > IUnaryExpression getUnaryExpression(Path path, T value) { - if (path.equals("time")) { + if (path.equals(TIME)) { return new GlobalTimeExpression(TimeFilter.ltEq((Long) value)); } else { return new SingleSeriesExpression(path, ValueFilter.ltEq(value)); @@ -76,7 +77,7 @@ public enum BasicOperatorType { LT { @Override public > IUnaryExpression getUnaryExpression(Path path, T value) { - if (path.equals("time")) { + if (path.equals(TIME)) { return new GlobalTimeExpression(TimeFilter.lt((Long) value)); } else { return new SingleSeriesExpression(path, ValueFilter.lt(value)); @@ -96,7 +97,7 @@ public enum BasicOperatorType { GTEQ { @Override public > IUnaryExpression getUnaryExpression(Path path, T value) { - if (path.equals("time")) { + if (path.equals(TIME)) { return new GlobalTimeExpression(TimeFilter.gtEq((Long) value)); } else { return new SingleSeriesExpression(path, ValueFilter.gtEq(value)); @@ -116,7 +117,7 @@ public enum BasicOperatorType { GT { @Override public > IUnaryExpression getUnaryExpression(Path path, T value) { - if (path.equals("time")) { + if (path.equals(TIME)) { return new GlobalTimeExpression(TimeFilter.gt((Long) value)); } else { return new SingleSeriesExpression(path, ValueFilter.gt(value)); @@ -136,7 +137,7 @@ public enum BasicOperatorType { NOTEQUAL { @Override public > IUnaryExpression getUnaryExpression(Path path, T value) { - if (path.equals("time")) { + if (path.equals(TIME)) { return new GlobalTimeExpression(TimeFilter.notEq((Long) value)); } else { return new SingleSeriesExpression(path, ValueFilter.notEq(value)); diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 3b1bd73f71..7c7fac12e7 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -129,6 +129,7 @@ import org.apache.thrift.TException; import org.apache.thrift.server.ServerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq; /** @@ -542,6 +543,45 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } } + public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) { + try { + if (!checkLogin(req.getSessionId())) { + logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME); + return RpcUtils.getTSExecuteStatementResp(TSStatusCode.NOT_LOGIN_ERROR); + } + + PhysicalPlan physicalPlan; + try { + physicalPlan = + processor.rawDataQueryReqToPhysicalPlan(req); + } catch (QueryProcessException | SQLParserException e) { + logger.info(ERROR_PARSING_SQL, e.getMessage()); + return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR, e.getMessage()); + } + + if (!physicalPlan.isQuery()) { + return RpcUtils.getTSExecuteStatementResp( + TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement."); + } + + return internalExecuteQueryStatement("", generateQueryId(true), physicalPlan, req.fetchSize, + sessionIdUsernameMap.get(req.getSessionId())); + + } catch (ParseCancellationException e) { + logger.warn(ERROR_PARSING_SQL, e.getMessage()); + return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR, + ERROR_PARSING_SQL + e.getMessage()); + } catch (SQLParserException e) { + logger.error("check metadata error: ", e); + return RpcUtils.getTSExecuteStatementResp( + TSStatusCode.METADATA_ERROR, "Check metadata error: " + e.getMessage()); + } catch (Exception e) { + logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e); + return RpcUtils.getTSExecuteStatementResp( + RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage())); + } + } + /** * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, some * AuthorPlan diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index 6ea2810d75..10954efe97 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -66,6 +66,7 @@ import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq; public class Session { @@ -929,6 +930,36 @@ public class Session { } } + /** + * query eg. select * from paths where time >= startTime and time < endTime + * time interval include startTime and exclude endTime + * @param paths + * @param startTime included + * @param endTime excluded + * @return + * @throws StatementExecutionException + * @throws IoTDBConnectionException + */ + + public SessionDataSet executeRawDataQuery(List paths, long startTime, long endTime) + throws StatementExecutionException, IoTDBConnectionException { + TSRawDataQueryReq execReq = new TSRawDataQueryReq(sessionId, paths, startTime, endTime); + execReq.setFetchSize(fetchSize); + + TSExecuteStatementResp execResp; + try { + execResp = client.executeRawDataQuery(execReq); + } catch (TException e) { + throw new IoTDBConnectionException(e); + } + + RpcUtils.verifySuccess(execResp.getStatus()); + return new SessionDataSet("", execResp.getColumns(), execResp.getDataTypeList(), + execResp.columnNameIndexMap, + execResp.getQueryId(), client, sessionId, execResp.queryDataSet, + execResp.isIgnoreTimeStamp()); + } + /** * check whether the batch has been sorted * diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java index f057b2e423..59b2a0f1c6 100644 --- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java +++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java @@ -269,6 +269,20 @@ public class IoTDBSessionComplexIT { session.close(); } + @Test + public void testRawDataQuery() throws IoTDBConnectionException, StatementExecutionException { + session = new Session("127.0.0.1", 6667, "root", "root"); + session.open(); + + session.setStorageGroup("root.sg1"); + + createTimeseries(); + + insertRecords(); + + rawDataQuery(); + } + @Test public void test() throws ClassNotFoundException, SQLException, IoTDBConnectionException, StatementExecutionException { @@ -537,6 +551,32 @@ public class IoTDBSessionComplexIT { session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); } + private void rawDataQuery() + throws StatementExecutionException, IoTDBConnectionException { + List paths = new ArrayList<>(); + paths.add("root.sg1.d2.*"); + paths.add("root.sg1.d2.s1"); + paths.add("root.sg1.d2.s2"); + + SessionDataSet sessionDataSet = session + .executeRawDataQuery(paths, 450L, 600L); + sessionDataSet.setFetchSize(1024); + + int count = 0; + System.out.println(sessionDataSet.getColumnNames()); + while (sessionDataSet.hasNext()) { + count++; + StringBuilder sb = new StringBuilder(); + List fields = sessionDataSet.next().getFields(); + for (Field f : fields) { + sb.append(f.getStringValue()).append(","); + } + System.out.println(sb.toString()); + } + Assert.assertEquals(50, count); + sessionDataSet.closeOperationHandle(); + } + private void insertTablet(String deviceId) throws IoTDBConnectionException, StatementExecutionException { diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift index 7d44416e79..5087f7a859 100644 --- a/thrift/src/main/thrift/rpc.thrift +++ b/thrift/src/main/thrift/rpc.thrift @@ -250,6 +250,14 @@ struct TSCreateTimeseriesReq { 9: optional string measurementAlias } +struct TSRawDataQueryReq { + 1: required i64 sessionId + 2: required list paths + 3: optional i32 fetchSize + 4: required i64 startTime + 5: required i64 endTime +} + struct TSCreateMultiTimeseriesReq { 1: required i64 sessionId 2: required list paths @@ -331,5 +339,7 @@ service TSIService { TSStatus deleteData(1:TSDeleteDataReq req); + TSExecuteStatementResp executeRawDataQuery(1:TSRawDataQueryReq req); + i64 requestStatementId(1:i64 sessionId); } -- GitLab