未验证 提交 350e971d 编写于 作者: M Minglei Jin 提交者: GitHub

Merge pull request #9895 from taosdata/fix/TS-575-V24

[TS-575]<fix>(query): support super table do ts order and limit optimization 
......@@ -7541,9 +7541,8 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlN
*/
if (pQueryInfo->limit.limit > 0) {
pQueryInfo->vgroupLimit = pQueryInfo->limit.limit + pQueryInfo->limit.offset;
pQueryInfo->limit.limit = -1;
pQueryInfo->limit.limit += pQueryInfo->limit.offset;
}
pQueryInfo->limit.offset = 0;
}
} else {
......
......@@ -3167,6 +3167,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
const int32_t table_index = 0;
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pPQueryInfo = tscGetQueryInfo(pCmd); // Parent SQueryInfo
SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj);
if (pNew != NULL) { // the sub query of two-stage super table query
......@@ -3176,8 +3178,14 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
// clear the limit/offset info, since it should not be sent to vnode to be executed.
pQueryInfo->limit.limit = -1;
if (pQueryInfo->limit.offset > 0 && pQueryInfo->limit.limit > 0) {
pQueryInfo->limit.limit += pQueryInfo->limit.offset;
}
pQueryInfo->limit.offset = 0;
// if groupby must retrieve all subquery data
if(pPQueryInfo->groupbyColumn || pPQueryInfo->groupbyTag) {
pQueryInfo->limit.limit = -1;
}
assert(trsupport->subqueryIndex < pSql->subState.numOfSub);
......
......@@ -432,6 +432,16 @@ void* getJsonTagValueElment(void* data, char* key, int32_t keyLen, char* out, in
void getJsonTagValueAll(void* data, void* dst, int16_t bytes);
char* parseTagDatatoJson(void *p);
//
// scan callback
//
// type define
#define READ_TABLE 1
#define READ_QUERY 2
typedef bool (*readover_callback)(void* param, int8_t type, int32_t tid);
void tsdbAddScanCallback(TsdbQueryHandleT* queryHandle, readover_callback callback, void* param);
#ifdef __cplusplus
}
#endif
......
......@@ -325,6 +325,8 @@ typedef struct SQueryRuntimeEnv {
SHashObj *pTableRetrieveTsMap;
SUdfInfo *pUdfInfo;
bool udfIsCopy;
SHashObj *pTablesRead; // record child tables already read rows by tid hash
int32_t cntTableReadOver; // read table over count
} SQueryRuntimeEnv;
enum {
......@@ -721,4 +723,10 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t
int32_t getColumnDataFromId(void *param, int32_t id, void **data);
void qInfoLogSSDataBlock(SSDataBlock* block, char* location);
// add table read rows count. pHashTables must not be NULL
void addTableReadRows(SQueryRuntimeEnv* pEnv, int32_t tid, int32_t rows);
// tsdb scan table callback table or query is over. param is SQueryRuntimeEnv*
bool qReadOverCB(void* param, int8_t type, int32_t tid);
#endif // TDENGINE_QEXECUTOR_H
......@@ -2057,6 +2057,22 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQueryAttr->numOfCols + pQueryAttr->srcRowSize);
pRuntimeEnv->tagVal = malloc(pQueryAttr->tagLen);
// malloc pTablesRead value if super table && project query and && has order by && limit is true
if( pRuntimeEnv->pQueryHandle && // client merge no tsdb query, so pQueryHandle is NULL, except client merge case in here
pQueryAttr->limit.limit > 0 &&
pQueryAttr->limit.offset == 0 && // if have offset, ignore limit optimization
pQueryAttr->stableQuery &&
isProjQuery(pQueryAttr) &&
pQueryAttr->order.orderColId != -1 ) {
// can be optimizate limit
pRuntimeEnv->pTablesRead = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pRuntimeEnv->pTablesRead) // must malloc ok, set callback to tsdb
tsdbAddScanCallback(pRuntimeEnv->pQueryHandle, qReadOverCB, pRuntimeEnv);
} else {
pRuntimeEnv->pTablesRead = NULL;
}
pRuntimeEnv->cntTableReadOver= 0;
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info
pRuntimeEnv->pTableRetrieveTsMap = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
......@@ -5990,6 +6006,9 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
SSDataBlock* pBlock = pProjectInfo->existDataBlock;
// record table read rows
addTableReadRows(pRuntimeEnv, pBlock->info.tid, pBlock->info.rows);
pProjectInfo->existDataBlock = NULL;
*newgroup = true;
......@@ -6036,6 +6055,9 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
break;
}
// record table read rows
addTableReadRows(pRuntimeEnv, pBlock->info.tid, pBlock->info.rows);
// Return result of the previous group in the firstly.
if (*newgroup) {
if (pRes->info.rows > 0) {
......@@ -10015,3 +10037,69 @@ void freeQueryAttr(SQueryAttr* pQueryAttr) {
filterFreeInfo(pQueryAttr->pFilters);
}
}
// add table read rows count. pHashTables must not be NULL
void addTableReadRows(SQueryRuntimeEnv* pEnv, int32_t tid, int32_t rows) {
SHashObj* pHashObj = pEnv->pTablesRead;
int32_t limit = (int32_t)pEnv->pQueryAttr->limit.limit;
if (pHashObj == NULL) {
return ;
}
// read old value
int32_t v = 0;
int32_t* pv = (int32_t* )taosHashGet(pHashObj, &tid, sizeof(int32_t));
if (pv && *pv > 0) {
v = *pv;
}
bool over = v >= limit;
// add new and save
v += rows;
taosHashPut(pHashObj, &tid, sizeof(int32_t), &rows, sizeof(int32_t));
// update read table over cnt
if (!over && v >= limit) {
pEnv->cntTableReadOver += 1;
}
}
// tsdb scan table callback table or query is over. param is SQueryRuntimeEnv*
bool qReadOverCB(void* param, int8_t type, int32_t tid) {
SQueryRuntimeEnv* pEnv = (SQueryRuntimeEnv* )param;
if (pEnv->pTablesRead == NULL) {
return false;
}
// check query is over
if (pEnv->cntTableReadOver >= pEnv->pQueryAttr->tableGroupInfo.numOfTables) {
return true;
}
// if type is read_query can return
if (type == READ_QUERY) {
return false;
}
// read tid value
int32_t* pv = (int32_t* )taosHashGet(pEnv->pTablesRead, &tid, sizeof(int32_t));
if (pv == NULL) {
return false;
}
// compare
if (pEnv->pQueryAttr->limit.limit > 0 && *pv >= pEnv->pQueryAttr->limit.limit ) {
return true; // need data is read ok
}
return false;
}
// check query read is over, retur true over. param is SQueryRuntimeEnv*
bool queryReadOverCB(void* param) {
SQueryRuntimeEnv* pEnv = (SQueryRuntimeEnv* )param;
if (pEnv->cntTableReadOver >= pEnv->pQueryAttr->tableGroupInfo.numOfTables) {
return true;
}
return false;
}
\ No newline at end of file
......@@ -39,6 +39,9 @@
.tid = (_checkInfo)->tableId.tid, \
.uid = (_checkInfo)->tableId.uid})
#define IS_END_BLOCK(cur, numOfBlocks, ascTrav) \
((cur->slot == numOfBlocks - 1 && ascTrav) || (cur->slot == 0 && !ascTrav))
// limit offset start optimization for rows read over this value
#define OFFSET_SKIP_THRESHOLD 5000
......@@ -153,6 +156,10 @@ typedef struct STsdbQueryHandle {
SArray *prev; // previous row which is before than time window
SArray *next; // next row which is after the query time window
SIOCostSummary cost;
// callback
readover_callback readover_cb;
void* param;
} STsdbQueryHandle;
typedef struct STableGroupSupporter {
......@@ -182,6 +189,7 @@ static void* doFreeColumnInfoData(SArray* pColumnInfoData);
static void* destroyTableCheckInfo(SArray* pTableCheckInfo);
static bool tsdbGetExternalRow(TsdbQueryHandleT pHandle);
static int32_t tsdbQueryTableList(STable* pTable, SArray* pRes, void* filterInfo);
static STableBlockInfo* moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle);
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
pBlockLoadInfo->slot = -1;
......@@ -2560,26 +2568,25 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exists);
static int32_t getDataBlockRv(STsdbQueryHandle* pQueryHandle, STableBlockInfo* pNext, bool *exists) {
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
SQueryFilePos* cur = &pQueryHandle->cur;
while(1) {
while(pNext) {
int32_t code = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
// load error or have data, return
if (code != TSDB_CODE_SUCCESS || *exists) {
return code;
}
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// no data, continue to find next block util have data
if (IS_END_BLOCK(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) {
// all data blocks in current file has been checked already, try next file if exists
return getFirstFileDataBlock(pQueryHandle, exists);
} else { // next block of the same file
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
pNext = moveToNextDataBlockInCurrentFile(pQueryHandle);
}
}
return TSDB_CODE_SUCCESS; // pNext == NULL no other blocks to move to
}
static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exists) {
......@@ -2594,6 +2601,15 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
STimeWindow win = TSWINDOW_INITIALIZER;
// check query scan data is over for limit query
if (pQueryHandle->readover_cb && pQueryHandle->readover_cb(pQueryHandle->param, READ_QUERY, -1)) {
// query scan data is over , no need read more
cur->fid = INT32_MIN;
*exists = false;
tsdbInfo("%p LIMIT_READ query is over and stop read. tables=%d qId=0x%"PRIx64, pQueryHandle, numOfTables, pQueryHandle->qId);
return TSDB_CODE_SUCCESS;
}
while (true) {
tsdbRLockFS(REPO_FS(pQueryHandle->pTsdb));
......@@ -2670,20 +2686,52 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist
return getDataBlockRv(pQueryHandle, pBlockInfo, exists);
}
static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
assert(cur != NULL && numOfBlocks > 0);
return (cur->slot == numOfBlocks - 1 && ascTrav) || (cur->slot == 0 && !ascTrav);
}
static void moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle) {
static STableBlockInfo* moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle) {
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
SQueryFilePos* cur = &pQueryHandle->cur;
if (IS_END_BLOCK(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) {
return NULL;
}
assert(cur->slot < pQueryHandle->numOfBlocks && cur->slot >= 0);
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
// no callback check
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
if(pQueryHandle->readover_cb == NULL) {
return pBlockInfo;
}
// have callback check
int32_t tid = -1;
bool over = false;
do {
// tid changed, re-get over of tid status
if(tid != pBlockInfo->pTableCheckInfo->tableId.tid) {
tid = pBlockInfo->pTableCheckInfo->tableId.tid;
over = pQueryHandle->readover_cb(pQueryHandle->param, READ_TABLE, pBlockInfo->pTableCheckInfo->tableId.tid);
if (!over) // this tid not over
return pBlockInfo;
}
//
// this tid is over, skip all blocks of this tid in following
//
// check end
if (IS_END_BLOCK(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order)))
return NULL;
// move next
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
} while(1);
return NULL;
}
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo) {
......@@ -2816,11 +2864,14 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
// current block is empty, try next block in file
// all data blocks in current file has been checked already, try next file if exists
if (isEndFileDataBlock(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (IS_END_BLOCK(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) {
return getFirstFileDataBlock(pQueryHandle, exists);
} else {
moveToNextDataBlockInCurrentFile(pQueryHandle);
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
// get next block in currentfile. return NULL if no block in current file
STableBlockInfo* pNext = moveToNextDataBlockInCurrentFile(pQueryHandle);
if (pNext == NULL) // file end
return getFirstFileDataBlock(pQueryHandle, exists);
else
return getDataBlockRv(pQueryHandle, pNext, exists);
}
}
......@@ -4600,3 +4651,11 @@ int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle) {
}
return 0;
}
// add scan table need callback
void tsdbAddScanCallback(TsdbQueryHandleT* queryHandle, readover_callback callback, void* param) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
pQueryHandle->readover_cb = callback;
pQueryHandle->param = param;
return ;
}
\ No newline at end of file
......@@ -56,6 +56,13 @@ class TDTestCase:
self.test_case2()
tdLog.debug(" LIMIT test_case2 ............ [OK]")
# insert data
self.insert_data("t2", self.ts, 100*10000, 30000);
self.insert_data("t3", self.ts, 200*10000, 30000);
# test supper table
self.test_limit()
tdLog.debug(" LIMIT test super table ............ [OK]")
# stop
def stop(self):
......@@ -186,6 +193,31 @@ class TDTestCase:
tdSql.waitedQuery(sql, 3, WAITS)
tdSql.checkData(0, 1, 1)
# test limit
def test_limit(self):
#
# base test
#
# offset
sql = "select * from st order by ts limit 20"
tdSql.waitedQuery(sql, 20, WAITS)
tdSql.checkData(19, 1, 6)
sql = "select * from st order by ts desc limit 20"
tdSql.waitedQuery(sql, 20, WAITS)
tdSql.checkData(19, 1, 2999980)
sql = "select * from st where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' order by ts limit 16;"
tdSql.waitedQuery(sql, 16, WAITS)
tdSql.checkData(15, 1, 15)
sql = "select * from st where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' order by ts desc limit 16;"
tdSql.waitedQuery(sql, 16, WAITS)
tdSql.checkData(15, 1, 720004)
sql = "select * from st where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' order by ts desc limit 16;"
tdSql.waitedQuery(sql, 16, WAITS)
tdSql.checkData(15, 1, 720004)
sql = "select * from st where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' order by ts desc limit 16 offset 3;"
tdSql.waitedQuery(sql, 16, WAITS)
tdSql.checkData(15, 1, 720003)
#
# add case with filename
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册