From e2f1102622a34be4ff8813338a013a1e893fbcc4 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Wed, 19 May 2021 15:27:16 +0800 Subject: [PATCH] init --- src/query/src/qExecutor.c | 33 +++++++ src/tsdb/src/tsdbRead.c | 194 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 217 insertions(+), 10 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 40759c93c4..b48436a617 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1979,6 +1979,37 @@ static bool isFirstLastRowQuery(SQueryAttr *pQueryAttr) { return false; } +static bool isCachedLastQuery(SQueryAttr *pQueryAttr) { + for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { + int32_t functionID = pQueryAttr->pExpr1[i].base.functionId; + if (functionID == TSDB_FUNC_LAST || functionID == TSDB_FUNC_LAST_DST) { + continue; + } + + return false; + } + + if (!TSWINDOW_IS_EQUAL(pQueryAttr->window, TSWINDOW_INITIALIZER)) { + return false; + } + + if (pQueryAttr->groupbyColumn) { + return false; + } + + if (pQueryAttr->interval.interval > 0) { + return false; + } + + if (pQueryAttr->numOfFilterCols > 0 || pQueryAttr->havingNum > 0) { + return false; + } + + return true; +} + + + /** * The following 4 kinds of query are treated as the tags query * tagprj, tid_tag query, count(tbname), 'abc' (user defined constant value column) query @@ -3963,6 +3994,8 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64 } } } + } else if (isCachedLastQuery(pQueryAttr)) { + pRuntimeEnv->pQueryHandle = tsdbQueryCacheLast(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); } else if (pQueryAttr->pointInterpQuery) { pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); } else { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 9df25409de..b3460ac3b2 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -62,12 +62,20 @@ typedef struct SLoadCompBlockInfo { int32_t fileId; } SLoadCompBlockInfo; +typedef struct SCacheLastColInfo { + int16_t size; + int16_t num; + int16_t fetchIdx; + int16_t *idx; +} SCacheLastColInfo; + typedef struct STableCheckInfo { STableId tableId; TSKEY lastKey; STable* pTableObj; SBlockInfo* pCompInfo; int32_t compSize; + SCacheLastColInfo cacheLast; // cache last column chosen int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks int8_t chosen:2; // indicate which iterator should move forward bool initBuf; // whether to initialize the in-memory skip list iterator or not @@ -107,7 +115,7 @@ typedef struct STsdbQueryHandle { SArray* pTableCheckInfo; // SArray int32_t activeIndex; bool checkFiles; // check file stage - bool cachelastrow; // check if last row cached + int8_t cachelastrow; // check if last row cached bool loadExternalRow; // load time window external data rows bool currentLoadExternalRows; // current load external rows int32_t loadType; // block load type @@ -512,6 +520,8 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next); } + + TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) { pCond->twindow = updateLastrowForEachGroup(groupList); @@ -528,10 +538,111 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable } assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey); - pQueryHandle->type = TSDB_QUERY_TYPE_LAST; + if (pQueryHandle->cachelastrow) { + pQueryHandle->type = TSDB_QUERY_TYPE_LAST; + } + return pQueryHandle; } + + +STimeWindow updateCacheLastForEachGroup(STableGroupInfo *groupList) { + STimeWindow window = {INT64_MAX, INT64_MIN}; + + int32_t totalNumOfTable = 0; + + // NOTE: starts from the buffer in case of descending timestamp order check data blocks + size_t numOfGroups = taosArrayGetSize(groupList->pGroupList); + for(int32_t j = 0; j < numOfGroups; ++j) { + SArray* pGroup = taosArrayGetP(groupList->pGroupList, j); + TSKEY key = TSKEY_INITIAL_VAL; + + STableKeyInfo keyInfo = {0}; + + size_t numOfTables = taosArrayGetSize(pGroup); + for(int32_t i = 0; i < numOfTables; ++i) { + STableKeyInfo* pInfo = (STableKeyInfo*) taosArrayGet(pGroup, i); + + // if the lastKey equals to INT64_MIN, there is no data in this table + TSKEY lastKey = ((STable*)(pInfo->pTable))->lastKey; + if (key < lastKey) { + key = lastKey; + + keyInfo.pTable = pInfo->pTable; + keyInfo.lastKey = key; + pInfo->lastKey = key; + + if (key < window.skey) { + window.skey = key; + } + + if (key > window.ekey) { + window.ekey = key; + } + } + } + + // clear current group, unref unused table + for (int32_t i = 0; i < numOfTables; ++i) { + STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i); + + // keyInfo.pTable may be NULL here. + if (pInfo->pTable != keyInfo.pTable) { + tsdbUnRefTable(pInfo->pTable); + } + } + + taosArrayClear(pGroup); + + // more than one table in each group, only one table left for each group + if (keyInfo.pTable != NULL) { + totalNumOfTable++; + taosArrayPush(pGroup, &keyInfo); + } else { + taosArrayDestroy(pGroup); + + taosArrayRemove(groupList->pGroupList, j); + numOfGroups -= 1; + j -= 1; + } + } + + // window does not being updated, so set the original + if (window.skey == INT64_MAX && window.ekey == INT64_MIN) { + window = TSWINDOW_INITIALIZER; + assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == 0); + } + + groupList->numOfTables = totalNumOfTable; + return window; +} + + +TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) { + pCond->twindow = updateCacheLastForEachGroup(groupList); + + // no qualified table + if (groupList->numOfTables == 0) { + return NULL; + } + + STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef); + int32_t code = checkForCachedLastRow(pQueryHandle, groupList); + if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 + terrno = code; + return NULL; + } + + assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey); + if (pQueryHandle->cachelastrow == 2) { + pQueryHandle->type = TSDB_QUERY_TYPE_LAST; + } + + return pQueryHandle; +} + + SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) { assert(pHandle != NULL); @@ -2460,6 +2571,58 @@ static bool loadCachedLastRow(STsdbQueryHandle* pQueryHandle) { return false; } + +static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) { + // the last row is cached in buffer, return it directly. + // here note that the pQueryHandle->window must be the TS_INITIALIZER + int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)); + size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); + assert(numOfTables > 0 && numOfCols > 0); + + SQueryFilePos* cur = &pQueryHandle->cur; + + SDataRow pRow = NULL; + TSKEY key = TSKEY_INITIAL_VAL; + int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; + + if (pQueryHandle->activeIndex < 0) { + updateCacheLastForEachGroup(pQueryHandle); + } + + if (pQueryHandle->activeIndex < numOfTables) { + STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex); + + if (pQueryHandle->cachelastrow == 1) { + int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key); + if (ret != TSDB_CODE_SUCCESS) { + return false; + } + + copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL); + tfree(pRow); + + // update the last key value + pCheckInfo->lastKey = key + step; + + cur->rows = 1; // only one row + cur->lastKey = key + step; + cur->mixBlock = true; + cur->win.skey = key; + cur->win.ekey = key; + } else if (pQueryHandle->cachelastrow == 2) { + + } else { + tsdbError("invalid cachelastrow:%d", pQueryHandle->cachelastrow); + return false; + } + + return true; + } + + return false; +} + + static bool loadDataBlockFromTableSeq(STsdbQueryHandle* pQueryHandle) { size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); assert(numOfTables > 0); @@ -2496,8 +2659,12 @@ bool tsdbNextDataBlock(TsdbQueryHandleT pHandle) { int64_t stime = taosGetTimestampUs(); int64_t elapsedTime = stime; - if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST && pQueryHandle->cachelastrow) { - return loadCachedLastRow(pQueryHandle); + if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST) { + if (pQueryHandle->cachelastrow == 1) { + return loadCachedLastRow(pQueryHandle); + } if (pQueryHandle->cachelastrow == 2) else { + return loadCachedLast(pQueryHandle); + } } if (pQueryHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) { @@ -2683,7 +2850,7 @@ int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) { TSDB_RLOCK_TABLE(pTable); *lastKey = pTable->lastKey; - if ((*lastKey) != TSKEY_INITIAL_VAL && pTable->lastRow) { + if ((*lastKey) != TSKEY_INITIAL_VAL && pTable->lastRow == 1) { *pRes = tdDataRowDup(pTable->lastRow); if (*pRes == NULL) { TSDB_RUNLOCK_TABLE(pTable); @@ -2706,12 +2873,19 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0); - int32_t code = tsdbGetCachedLastRow(pInfo->pTable, &pRow, &key); - if (code != TSDB_CODE_SUCCESS) { - pQueryHandle->cachelastrow = false; - } else { - pQueryHandle->cachelastrow = (pRow != NULL); + int32_t code = 0; + + if (((STable*)pInfo->pTable)->lastRow == 1) { + code = tsdbGetCachedLastRow(pInfo->pTable, &pRow, &key); + if (code != TSDB_CODE_SUCCESS) { + pQueryHandle->cachelastrow = 0; + } else { + pQueryHandle->cachelastrow = ((STable*)pInfo->pTable)->lastRow; + } + } else if (((STable*)pInfo->pTable)->lastCols && ((STable*)pInfo->pTable)->lastColNum > 0 && ((STable*)pInfo->pTable)->lastRow == 2){ + pQueryHandle->cachelastrow = ((STable*)pInfo->pTable)->lastRow; } + // update the tsdb query time range if (pQueryHandle->cachelastrow) { -- GitLab