From 9e98c64b1e75ba07858dab856f1f969533d520de Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 30 Jun 2023 22:57:49 +0800 Subject: [PATCH] fix(tsdb): set correct stt load info. update the test case. --- source/dnode/vnode/src/inc/tsdb.h | 2 +- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 21 ++++---- source/dnode/vnode/src/tsdb/tsdbRead2.c | 53 ++++++++------------- tests/system-test/2-query/smaBasic.py | 2 +- 4 files changed, 34 insertions(+), 44 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index fc4513c896..49df3f662a 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -816,7 +816,7 @@ SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colLi void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el); void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); -void* destroySttBlockReader(SArray* pLDataIterArray); +void *destroySttBlockReader(SArray *pLDataIterArray, int64_t *blocks, double *el); // tsdbCache ============================================================================================== typedef struct SCacheRowsReader { diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index c5b35cfd36..45a20315b1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -134,14 +134,13 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { return NULL; } -static void destroyLDataIterFn(void* param) { - SLDataIter** pIter = (SLDataIter**) param; - tLDataIterClose2(*pIter); - destroyLastBlockLoadInfo((*pIter)->pBlockLoadInfo); - taosMemoryFree(*pIter); +static void destroyLDataIter(SLDataIter* pIter) { + tLDataIterClose2(pIter); + destroyLastBlockLoadInfo(pIter->pBlockLoadInfo); + taosMemoryFree(pIter); } -void* destroySttBlockReader(SArray* pLDataIterArray) { +void* destroySttBlockReader(SArray* pLDataIterArray, int64_t* blocks, double* el) { if (pLDataIterArray == NULL) { return NULL; } @@ -149,7 +148,13 @@ void* destroySttBlockReader(SArray* pLDataIterArray) { int32_t numOfLevel = taosArrayGetSize(pLDataIterArray); for(int32_t i = 0; i < numOfLevel; ++i) { SArray* pList = taosArrayGetP(pLDataIterArray, i); - taosArrayDestroyEx(pList, destroyLDataIterFn); + for(int32_t j = 0; j < taosArrayGetSize(pList); ++j) { + SLDataIter* pIter = taosArrayGetP(pList, j); + *el += pIter->pBlockLoadInfo->elapsedTime; + *blocks += pIter->pBlockLoadInfo->loadBlocks; + destroyLDataIter(pIter); + } + taosArrayDestroy(pList); } taosArrayDestroy(pLDataIterArray); @@ -499,7 +504,7 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) { if (index != -1) { pIter->iSttBlk = index; pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk); - tsdbDebug("try next last file block:%d from %d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk, + tsdbDebug("try next last file block:%d from stt fileIdx:%d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk, oldIndex, pIter->uid, pIter->iStt, idStr); } else { tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, %s", pIter->uid, oldIndex, idStr); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 134b717093..00a613eded 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -564,17 +564,17 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA pLReader->uid = 0; tMergeTreeClose(&pLReader->mergeTree); - if (pLReader->pInfo == NULL) { - // here we ignore the first column, which is always be the primary timestamp column - SBlockLoadSuppInfo* pInfo = &pReader->suppInfo; - // todo dynamic number of stt - int32_t numOfStt = pReader->pTsdb->pVnode->config.sttTrigger; - pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pInfo->colId[1], pInfo->numOfCols - 1, numOfStt); - if (pLReader->pInfo == NULL) { - tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr); - return terrno; - } - } +// if (pLReader->pInfo == NULL) { +// // here we ignore the first column, which is always be the primary timestamp column +// SBlockLoadSuppInfo* pInfo = &pReader->suppInfo; +// // todo dynamic number of stt +// int32_t numOfStt = pReader->pTsdb->pVnode->config.sttTrigger; +// pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pInfo->colId[1], pInfo->numOfCols - 1, numOfStt); +// if (pLReader->pInfo == NULL) { +// tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr); +// return terrno; +// } +// } tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr); return TSDB_CODE_SUCCESS; @@ -592,16 +592,13 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo } SIOCostSummary* pSum = &pReader->cost; - getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pReader->cost.lastBlockLoadTime); pIter->pLastBlockReader->uid = 0; tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); - pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray); + pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime); pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); - resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo); - // check file the time range of coverage STimeWindow win = {0}; @@ -1580,17 +1577,11 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI } SBlockLoadSuppInfo* pSup = &pReader->suppInfo; - TABLEID tid = {.suid = pReader->suid, .uid = uid}; - code = tBlockDataInit(pBlockData, &tid, pSchema, &pSup->colId[1], pSup->numOfCols - 1); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SBrinRecord* pRecord = &pBlockInfo->record; - code = tsdbDataFileReadBlockDataByColumn(pReader->pFileReader, pRecord, pBlockData, pReader->pSchema, &pSup->colId[1], + code = tsdbDataFileReadBlockDataByColumn(pReader->pFileReader, pRecord, pBlockData, pSchema, &pSup->colId[1], pSup->numOfCols - 1); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 @@ -2010,8 +2001,8 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* // 4. output buffer should be large enough to hold all rows in current block // 5. delete info should not overlap with current block data // 6. current block should not contain the duplicated ts -static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, - STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) { +static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, + TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) { SDataBlockToLoadInfo info = {0}; getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader); @@ -2022,8 +2013,8 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock // log the reason why load the datablock for profile if (loadDataBlock) { tsdbDebug("%p uid:%" PRIu64 - " need to load the datablock, overlapneighbor:%d, hasDup:%d, partiallyRequired:%d, " - "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s", + " need to load the datablock, overlapneighbor:%d, hasDup:%d, partiallyRequired:%d, " + "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s", pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired, info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock, pReader->idStr); @@ -3566,8 +3557,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // update the last key for the corresponding table pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey; - tsdbDebug("%p uid:%" PRIu64 - " clean file block retrieved from file, global index:%d, " + tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, " "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow, pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr); @@ -3893,8 +3883,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { initBlockDumpInfo(pReader, pBlockIter); } else { // all data blocks in files are checked, let's check the data in last files. -// ASSERT(pReader->status.pCurrentFileset->nSttF > 0); - // data blocks in current file are exhausted, let's try the next file now SBlockData* pBlockData = &pReader->status.fileBlockData; if (pBlockData->uid != 0) { @@ -4913,13 +4901,10 @@ void tsdbReaderClose2(STsdbReader* pReader) { if (pFilesetIter->pLastBlockReader != NULL) { SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader; tMergeTreeClose(&pLReader->mergeTree); - - getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime); - pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo); taosMemoryFree(pLReader); } - destroySttBlockReader(pReader->status.pLDataIterArray); + destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime); taosMemoryFreeClear(pReader->status.uidList.tableUidList); tsdbDebug( diff --git a/tests/system-test/2-query/smaBasic.py b/tests/system-test/2-query/smaBasic.py index 43c379ee53..c221a70605 100644 --- a/tests/system-test/2-query/smaBasic.py +++ b/tests/system-test/2-query/smaBasic.py @@ -127,7 +127,7 @@ class TDTestCase: self.c2Sum = None # create database db - sql = f"create database db vgroups 5 replica 3" + sql = f"create database db vgroups 5 replica 3 stt_trigger 1" tdLog.info(sql) tdSql.execute(sql) sql = f"use db" -- GitLab