diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index eeff90bd5399c1ff2e08b1254fc63c9e53d3cbc3..4406a3ae5486b612cf9a3255b6d35f4e24823ad6 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -432,6 +432,16 @@ void* getJsonTagValueElment(void* data, char* key, int32_t keyLen, char* out, in void getJsonTagValueAll(void* data, void* dst, int16_t bytes); char* parseTagDatatoJson(void *p); +// +// scan callback +// + +// type define +#define READ_TABLE 1 +#define READ_QUERY 2 +typedef bool (*readover_callback)(void* param, int8_t type, int32_t tid); +void tsdbAddScanCallback(TsdbQueryHandleT* queryHandle, readover_callback callback, void* param); + #ifdef __cplusplus } #endif diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 0b938078e39e8a61d3c2d871192717fdc4dc82e7..cf1c0d544fab59698c683fbc8a2e351469256c89 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -325,6 +325,8 @@ typedef struct SQueryRuntimeEnv { SHashObj *pTableRetrieveTsMap; SUdfInfo *pUdfInfo; bool udfIsCopy; + SHashObj *pTablesRead; // record child tables already read rows by tid hash + int32_t cntTableReadOver; // read table over count } SQueryRuntimeEnv; enum { @@ -721,4 +723,12 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t int32_t getColumnDataFromId(void *param, int32_t id, void **data); void qInfoLogSSDataBlock(SSDataBlock* block, char* location); + +// add table read rows count. pHashTables must not be NULL +void addTableReadRows(SQueryRuntimeEnv* pEnv, int32_t tid, int32_t rows); +// tsdb scan table callback table or query is over. 
param is SQueryRuntimeEnv* +#define READ_TABLE 1 +#define READ_QUERY 2 +bool qReadOverCB(void* param, int8_t type, int32_t tid); + #endif // TDENGINE_QEXECUTOR_H diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index caa199f2ab2a463baaa8c222bec93fa826f8d145..6d3751d9035a55f275aeca68b8d8a0af9d0594ad 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2039,6 +2039,22 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQueryAttr->numOfCols + pQueryAttr->srcRowSize); pRuntimeEnv->tagVal = malloc(pQueryAttr->tagLen); + // malloc pTablesRead value if super table && project query and && has order by && limit is true + if( pRuntimeEnv->pQueryHandle && // client merge no tsdb query, so pQueryHandle is NULL, except client merge case in here + pQueryAttr->limit.limit > 0 && + pQueryAttr->limit.offset == 0 && // if have offset, ignore limit optimization + pQueryAttr->stableQuery && + isProjQuery(pQueryAttr) && + pQueryAttr->order.orderColId != -1 ) { + // can be optimizate limit + pRuntimeEnv->pTablesRead = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + if (pRuntimeEnv->pTablesRead) // must malloc ok, set callback to tsdb + tsdbAddScanCallback(pRuntimeEnv->pQueryHandle, qReadOverCB, pRuntimeEnv); + } else { + pRuntimeEnv->pTablesRead = NULL; + } + pRuntimeEnv->cntTableReadOver= 0; + // NOTE: pTableCheckInfo need to update the query time range and the lastKey info pRuntimeEnv->pTableRetrieveTsMap = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); @@ -5771,6 +5787,9 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; SSDataBlock* pBlock = pProjectInfo->existDataBlock; + // record table read rows + addTableReadRows(pRuntimeEnv, pBlock->info.tid, pBlock->info.rows); + 
pProjectInfo->existDataBlock = NULL; *newgroup = true; @@ -5817,6 +5836,9 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { break; } + // record table read rows + addTableReadRows(pRuntimeEnv, pBlock->info.tid, pBlock->info.rows); + // Return result of the previous group in the firstly. if (*newgroup) { if (pRes->info.rows > 0) { @@ -9489,3 +9511,69 @@ void freeQueryAttr(SQueryAttr* pQueryAttr) { filterFreeInfo(pQueryAttr->pFilters); } } + +// accumulate the rows read so far for table tid in pEnv->pTablesRead (no-op when that hash is NULL) and bump pEnv->cntTableReadOver the first time a table's total reaches limit.limit +void addTableReadRows(SQueryRuntimeEnv* pEnv, int32_t tid, int32_t rows) { + SHashObj* pHashObj = pEnv->pTablesRead; + int32_t limit = pEnv->pQueryAttr->limit.limit; + if (pHashObj == NULL) { + return ; + } + + // read old value + int32_t v = 0; + int32_t* pv = (int32_t* )taosHashGet(pHashObj, &tid, sizeof(int32_t)); + if (pv && *pv > 0) { + v = *pv; + } + + bool over = v >= limit; + // add new rows and save the accumulated total (not just this block's rows) + v += rows; + taosHashPut(pHashObj, &tid, sizeof(int32_t), &v, sizeof(int32_t)); + + // count this table once, on the call where its total first reaches the limit + if (!over && v >= limit) { + pEnv->cntTableReadOver += 1; + } +} + +// tsdb scan callback, param is SQueryRuntimeEnv*. false when pTablesRead is NULL; true when cntTableReadOver >= numOfTables; otherwise for READ_TABLE true only if table tid has read at least limit rows +bool qReadOverCB(void* param, int8_t type, int32_t tid) { + SQueryRuntimeEnv* pEnv = (SQueryRuntimeEnv* )param; + if (pEnv->pTablesRead == NULL) { + return false; + } + + // check query is over + if (pEnv->cntTableReadOver >= pEnv->pQueryAttr->tableGroupInfo.numOfTables) { + return true; + } + + // READ_QUERY only asks whether the whole query is over, no per-table check + if (type == READ_QUERY) { + return false; + } + + // read tid value + int32_t* pv = (int32_t* )taosHashGet(pEnv->pTablesRead, &tid, sizeof(int32_t)); + if (pv == NULL) { + return false; + } + + // compare + if (pEnv->pQueryAttr->limit.limit > 0 && *pv >= pEnv->pQueryAttr->limit.limit ) { + return true; // this table has already read at least limit rows + } + + return false; +} + +// check query read is over, return true if over.
param is SQueryRuntimeEnv* +bool queryReadOverCB(void* param) { + SQueryRuntimeEnv* pEnv = (SQueryRuntimeEnv* )param; + if (pEnv->cntTableReadOver >= pEnv->pQueryAttr->tableGroupInfo.numOfTables) { + return true; + } + return false; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 1423b68ce59b2d7f22b40c884ad144d6496e69d8..ae74e11190a49df063c281cd83751d61c9e23ac7 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -39,6 +39,9 @@ .tid = (_checkInfo)->tableId.tid, \ .uid = (_checkInfo)->tableId.uid}) +#define IS_END_BLOCK(cur, numOfBlocks, ascTrav) \ + (((cur)->slot == (numOfBlocks) - 1 && (ascTrav)) || ((cur)->slot == 0 && !(ascTrav))) + // limit offset start optimization for rows read over this value #define OFFSET_SKIP_THRESHOLD 5000 @@ -153,6 +156,10 @@ typedef struct STsdbQueryHandle { SArray *prev; // previous row which is before than time window SArray *next; // next row which is after the query time window SIOCostSummary cost; + + // read-over callback and the opaque param handed back to it (both set by tsdbAddScanCallback) + readover_callback readover_cb; + void* param; } STsdbQueryHandle; typedef struct STableGroupSupporter { @@ -182,6 +189,7 @@ static void* doFreeColumnInfoData(SArray* pColumnInfoData); static void* destroyTableCheckInfo(SArray* pTableCheckInfo); static bool tsdbGetExternalRow(TsdbQueryHandleT pHandle); static int32_t tsdbQueryTableList(STable* pTable, SArray* pRes, void* filterInfo); +static STableBlockInfo* moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle); static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { pBlockLoadInfo->slot = -1; @@ -2560,26 +2568,25 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exists); static int32_t getDataBlockRv(STsdbQueryHandle* pQueryHandle, STableBlockInfo* pNext, bool *exists) { - int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)?
1 : -1; SQueryFilePos* cur = &pQueryHandle->cur; - while(1) { + while(pNext) { int32_t code = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists); + // load error or have data, return if (code != TSDB_CODE_SUCCESS || *exists) { return code; } - if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) || - (cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) { + // no data, continue to find next block until one has data + if (IS_END_BLOCK(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) { // all data blocks in current file has been checked already, try next file if exists return getFirstFileDataBlock(pQueryHandle, exists); } else { // next block of the same file - cur->slot += step; - cur->mixBlock = false; - cur->blockCompleted = false; - pNext = &pQueryHandle->pDataBlockInfo[cur->slot]; + pNext = moveToNextDataBlockInCurrentFile(pQueryHandle); } } + + return getFirstFileDataBlock(pQueryHandle, exists); // pNext == NULL: no block left in this file, try next file as getDataBlocksInFiles does } static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exists) { @@ -2594,6 +2601,15 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; STimeWindow win = TSWINDOW_INITIALIZER; + // ask the callback (READ_QUERY) whether the limit query already has all the data it needs + if (pQueryHandle->readover_cb && pQueryHandle->readover_cb(pQueryHandle->param, READ_QUERY, -1)) { + // query scan data is over, no need to read more + cur->fid = INT32_MIN; + *exists = false; + tsdbInfo("%p LIMIT_READ query is over and stop read.
tables=%d qId=0x%"PRIx64, pQueryHandle, numOfTables, pQueryHandle->qId); + return TSDB_CODE_SUCCESS; + } + while (true) { tsdbRLockFS(REPO_FS(pQueryHandle->pTsdb)); @@ -2670,20 +2686,52 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist return getDataBlockRv(pQueryHandle, pBlockInfo, exists); } -static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) { - assert(cur != NULL && numOfBlocks > 0); - return (cur->slot == numOfBlocks - 1 && ascTrav) || (cur->slot == 0 && !ascTrav); -} - -static void moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle) { +static STableBlockInfo* moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle) { // returns the next block to read in the current file, or NULL when none is left int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1; SQueryFilePos* cur = &pQueryHandle->cur; + if (IS_END_BLOCK(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) { + return NULL; + } assert(cur->slot < pQueryHandle->numOfBlocks && cur->slot >= 0); cur->slot += step; cur->mixBlock = false; cur->blockCompleted = false; + + // no callback registered: hand back the next block as is + STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; + if(pQueryHandle->readover_cb == NULL) { + return pBlockInfo; + } + + // callback registered: skip blocks of tables the callback reports as over + int32_t tid = -1; + bool over = false; + do { + // tid changed: ask the callback (READ_TABLE) once for the new tid + if(tid != pBlockInfo->pTableCheckInfo->tableId.tid) { + tid = pBlockInfo->pTableCheckInfo->tableId.tid; + over = pQueryHandle->readover_cb(pQueryHandle->param, READ_TABLE, pBlockInfo->pTableCheckInfo->tableId.tid); + if (!over) // this tid is not over, read this block + return pBlockInfo; + } + + // + // this tid is over: skip following blocks while they belong to the same tid + // + + // already at the last block of the file: nothing left to return + if (IS_END_BLOCK(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) + return NULL; + // advance to the next slot + cur->slot += step; + cur->mixBlock = false; + cur->blockCompleted = false; + pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; + }
while(1); + + return NULL; } int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo) { @@ -2816,12 +2864,15 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists // current block is empty, try next block in file // all data blocks in current file has been checked already, try next file if exists - if (isEndFileDataBlock(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) { + if (IS_END_BLOCK(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) { return getFirstFileDataBlock(pQueryHandle, exists); } else { - moveToNextDataBlockInCurrentFile(pQueryHandle); - STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot]; - return getDataBlockRv(pQueryHandle, pNext, exists); + // next block in the current file; NULL means no block is left in this file + STableBlockInfo* pNext = moveToNextDataBlockInCurrentFile(pQueryHandle); + if (pNext == NULL) // file end + return getFirstFileDataBlock(pQueryHandle, exists); + else + return getDataBlockRv(pQueryHandle, pNext, exists); } } } @@ -4600,3 +4651,11 @@ int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle) { } return 0; } + +// register the callback and its param on the query handle; the file-block scan calls it with READ_TABLE / READ_QUERY +void tsdbAddScanCallback(TsdbQueryHandleT* queryHandle, readover_callback callback, void* param) { + STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle; + pQueryHandle->readover_cb = callback; + pQueryHandle->param = param; + return ; +} \ No newline at end of file diff --git a/tests/pytest/query/queryLimit.py b/tests/pytest/query/queryLimit.py index b7761ddf2a5594637140ae2b4748df1b1df157f5..a0baad8b0e59a616c1f332732efab3f8537aca4b 100644 --- a/tests/pytest/query/queryLimit.py +++ b/tests/pytest/query/queryLimit.py @@ -56,6 +56,13 @@ class TDTestCase: self.test_case2() tdLog.debug(" LIMIT test_case2 ............
[OK]") + # insert data + self.insert_data("t2", self.ts, 100*10000, 30000); + self.insert_data("t3", self.ts, 200*10000, 30000); + # test super table + self.test_limit() + tdLog.debug(" LIMIT test super table ............ [OK]") + # stop def stop(self): @@ -186,6 +193,32 @@ class TDTestCase: tdSql.waitedQuery(sql, 3, WAITS) tdSql.checkData(0, 1, 1) + # test limit on super table st with order by ts + def test_limit(self): + # + # base test + # + + # limit with asc/desc order, with and without a ts range, and with offset + sql = "select * from st order by ts limit 20" + tdSql.waitedQuery(sql, 20, WAITS) + tdSql.checkData(19, 1, 6) + sql = "select * from st order by ts desc limit 20" + tdSql.waitedQuery(sql, 20, WAITS) + tdSql.checkData(18, 1, 2999980) + sql = "select * from st where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' order by ts limit 16;" + tdSql.waitedQuery(sql, 16, WAITS) + tdSql.checkData(15, 1, 15) + sql = "select * from st where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' order by ts desc limit 16;" + tdSql.waitedQuery(sql, 16, WAITS) + tdSql.checkData(15, 1, 720004) + sql = "select * from st where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' order by ts desc limit 16;" + tdSql.waitedQuery(sql, 16, WAITS) + tdSql.checkData(15, 1, 720004) + sql = "select * from st where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' order by ts desc limit 16 offset 2;" + tdSql.waitedQuery(sql, 16, WAITS) + tdSql.checkData(0, 15, 720004) + # # add case with filename