From c1c11d5e6f1ddf659ddf638770b69a7cf10669db Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Dec 2020 13:10:58 +0800 Subject: [PATCH] [TD-2512] --- src/query/src/qExecutor.c | 1 - src/tsdb/src/tsdbRead.c | 74 +++++++++++++++++++++++++++++++-------- 2 files changed, 59 insertions(+), 16 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 2b992a931d..ddd313a019 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4662,7 +4662,6 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) // update the query time window pQuery->window = cond.twindow; - if (pQInfo->tableGroupInfo.numOfTables == 0) { pQInfo->tableqinfoGroupInfo.numOfTables = 0; } else { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index e79b033d32..ed0ad76b1e 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -111,6 +111,7 @@ typedef struct STsdbQueryHandle { int32_t activeIndex; bool checkFiles; // check file stage bool cachelastrow; // check if last row cached + SArray* cachedRows; // cached last rows void* qinfo; // query info handle, for debug purpose int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows SFileGroup* pFileGroup; @@ -135,7 +136,8 @@ typedef struct STableGroupSupporter { } STableGroupSupporter; static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); -static void checkCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList); +static int32_t extractCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList); +static int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey); static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock); @@ -380,7 +382,11 @@ TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab } STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo, pMemRef); - checkCachedLastRow(pQueryHandle, groupList); + int32_t code = extractCachedLastRow(pQueryHandle, groupList); + if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 + terrno = code; + return NULL; + } assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey); return pQueryHandle; @@ -2150,7 +2156,36 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { } else if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST && pQueryHandle->cachelastrow) { // the last row is cached in buffer, return it directly. // here note that the pQueryHandle->window may need to be updated. - assert(0); + int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)); + SQueryFilePos* cur = &pQueryHandle->cur; + + SDataRow pRow = NULL; + TSKEY key = TSKEY_INITIAL_VAL; + int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; + + while (pQueryHandle->activeIndex < numOfTables) { + STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex); + int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key); + if (ret != TSDB_CODE_SUCCESS) { + return false; + } + + copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj); + tfree(pRow); + + // update the last key value + pCheckInfo->lastKey = key + step; + + cur->rows = 1; // only one row + cur->lastKey = key + step; + cur->mixBlock = true; + + pQueryHandle->activeIndex += 1; + + return true; + } + + return false; } if (pQueryHandle->checkFiles) { @@ -2188,33 +2223,42 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { * 2. has data but not loaded, just return lastKey but not set pRes * 3. has data and loaded, return lastKey and set pRes */ -TSKEY tsdbGetCachedLastRow(STable* pTable, void** pRes) { - TSKEY lastKey; - +int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) { TSDB_RLOCK_TABLE(pTable); - lastKey = pTable->lastKey; + *lastKey = pTable->lastKey; - if (lastKey != TSKEY_INITIAL_VAL && pTable->lastRow) { + if ((*lastKey) != TSKEY_INITIAL_VAL && pTable->lastRow) { *pRes = tdDataRowDup(pTable->lastRow); if (*pRes == NULL) { - // TODO: handle error + TSDB_RUNLOCK_TABLE(pTable); + return TSDB_CODE_TDB_OUT_OF_MEMORY; } } + TSDB_RUNLOCK_TABLE(pTable); - return lastKey; + return TSDB_CODE_SUCCESS; } -void checkCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList) { +int32_t extractCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList) { assert(pQueryHandle != NULL && groupList != NULL); - void* pRes = NULL; - SArray* group = taosArrayGet(groupList->pGroupList, 0); + SDataRow pRow = NULL; + TSKEY key = TSKEY_INITIAL_VAL; + + SArray* group = taosArrayGetP(groupList->pGroupList, 0); assert(group != NULL); STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0); - int32_t ret = tsdbGetCachedLastRow(pInfo->pTable, &pRes); - pQueryHandle->cachelastrow = (ret == TSDB_CODE_SUCCESS); + int32_t code = tsdbGetCachedLastRow(pInfo->pTable, &pRow, &key); + if (code != TSDB_CODE_SUCCESS) { + pQueryHandle->cachelastrow = false; + } else { + pQueryHandle->cachelastrow = (pRow != NULL); + } + + tfree(pRow); + return code; } STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) { -- GitLab