From 98349a4265fa59c837b8bf308b2c5e3ab80a2da5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 9 Jul 2023 18:05:24 +0800 Subject: [PATCH] refactor(tsdb): do some internal refactor. --- source/dnode/vnode/src/inc/tsdb.h | 21 +++- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 27 ++--- source/dnode/vnode/src/tsdb/tsdbRead2.c | 24 +++- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 124 ++++++++++++-------- 4 files changed, 120 insertions(+), 76 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 7bd9cc4457..030f7a0950 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -795,10 +795,23 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter); -int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange, const char *idStr, - bool strictTimeRange, SArray *pSttFileBlockIterArray, void *pCurrentFileSet, STSchema* pSchema, - int16_t* pCols, int32_t numOfCols, void* pReader); +typedef struct { + int8_t backward; + STsdb *pTsdb; + uint64_t suid; + uint64_t uid; + STimeWindow timewindow; + SVersionRange verRange; + bool strictTimeRange; + SArray *pSttFileBlockIterArray; + void *pCurrentFileset; + STSchema *pSchema; + int16_t *pCols; + int32_t numOfCols; + void *pReader; + void *idstr; +} SMergeTreeConf; +int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf* pConf); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 6c94cb5cec..d856f2ae9f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -721,15 +721,12 @@ _end: return code; } -int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange, const char *idStr, - bool strictTimeRange, SArray *pSttFileBlockIterArray, void *pCurrentFileSet, STSchema* pSchema, - int16_t* pCols, int32_t numOfCols, void* pReader) { +int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf* pConf) { int32_t code = TSDB_CODE_SUCCESS; - pMTree->backward = backward; pMTree->pIter = NULL; - pMTree->idStr = idStr; + pMTree->backward = pConf->backward; + pMTree->idStr = pConf->idstr; if (!pMTree->backward) { // asc tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); @@ -740,21 +737,21 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint6 pMTree->ignoreEarlierTs = false; // todo handle other level of stt files, here only deal with the first level stt - int32_t size = ((STFileSet *)pCurrentFileSet)->lvlArr->size; + int32_t size = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->size; if (size == 0) { goto _end; } - while (taosArrayGetSize(pSttFileBlockIterArray) < size) { + while (taosArrayGetSize(pConf->pSttFileBlockIterArray) < size) { SArray* pList = taosArrayInit(4, POINTER_BYTES); - taosArrayPush(pSttFileBlockIterArray, &pList); + taosArrayPush(pConf->pSttFileBlockIterArray, &pList); } for(int32_t j = 0; j < size; ++j) { - SSttLvl *pSttLevel = ((STFileSet *)pCurrentFileSet)->lvlArr->data[j]; + SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j]; ASSERT(pSttLevel->level == j); - SArray* pList = taosArrayGetP(pSttFileBlockIterArray, j); + SArray* pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j); int32_t numOfIter = taosArrayGetSize(pList); if (numOfIter < TARRAY2_SIZE(pSttLevel->fobjArr)) { @@ -773,7 +770,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint6 // open stt file reader if not if (pSttFileReader == NULL) { - SSttFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.szPage}; + SSttFileReaderConfig conf = {.tsdb = pConf->pTsdb, .szPage = pConf->pTsdb->pVnode->config.szPage}; conf.file[0] = *pSttLevel->fobjArr->data[i]->f; code = tsdbSttFileReaderOpen(pSttLevel->fobjArr->data[i]->fname, &conf, &pSttFileReader); @@ -783,12 +780,12 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint6 } if (pLoadInfo == NULL) { - pLoadInfo = tCreateOneLastBlockLoadInfo(pSchema, pCols, numOfCols); + pLoadInfo = tCreateOneLastBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols); } memset(pIter, 0, sizeof(SLDataIter)); - code = tLDataIterOpen2(pIter, pSttFileReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, - pLoadInfo, pMTree->idStr, strictTimeRange, pReader); + code = tLDataIterOpen2(pIter, pSttFileReader, i, pMTree->backward, pConf->suid, pConf->uid, &pConf->timewindow, + &pConf->verRange, pLoadInfo, pMTree->idStr, pConf->strictTimeRange, pConf->pReader); if (code != TSDB_CODE_SUCCESS) { goto _end; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 3e84864ce5..f753bf34eb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2095,17 +2095,31 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey, pScanInfo->uid, pReader->idStr); - int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pTsdb, - pReader->info.suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pReader->idStr, - false, pReader->status.pLDataIterArray, pReader->status.pCurrentFileset, - pReader->info.pSchema, pReader->suppInfo.colId, pReader->suppInfo.numOfCols, pReader); + SMergeTreeConf conf = { + .uid = pScanInfo->uid, + .suid = pReader->info.suid, + .pTsdb = pReader->pTsdb, + .timewindow = w, + .verRange = pLBlockReader->verRange, + .strictTimeRange = false, + .pSchema = pReader->info.pSchema, + .pCurrentFileset = pReader->status.pCurrentFileset, + .backward = (pLBlockReader->order == TSDB_ORDER_DESC), + .pSttFileBlockIterArray = pReader->status.pLDataIterArray, + .pCols = pReader->suppInfo.colId, + .numOfCols = pReader->suppInfo.numOfCols, + .pReader = pReader, + .idstr = pReader->idStr, + }; + + int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, &conf); if (code != TSDB_CODE_SUCCESS) { return false; } initMemDataIterator(pScanInfo, pReader); - initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); + code = nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange); int64_t el = taosGetTimestampUs() - st; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 8a00fc24d8..a2f32c86a3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -452,13 +452,77 @@ bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) { return true; } +typedef enum { + BLK_CHECK_CONTINUE = 0x1, + BLK_CHECK_QUIT = 0x2, +} ETombBlkCheckEnum; + +static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j, + STableBlockScanInfo** pScanInfo, ETombBlkCheckEnum* pRet) { + int32_t code = 0; + STombRecord record = {0}; + uint64_t uid = pReader->status.uidList.tableUidList[*j]; + + for (int32_t k = 0; k < TARRAY2_SIZE(pBlock->suid); ++k) { + code = tTombBlockGet(pBlock, k, &record); + if (code != TSDB_CODE_SUCCESS) { + *pRet = BLK_CHECK_QUIT; + return code; + } + + if (record.suid < pReader->info.suid) { + continue; + } + + if (record.suid > pReader->info.suid) { + *pRet = BLK_CHECK_QUIT; + return TSDB_CODE_SUCCESS; + } + + bool newTable = false; + if (uid < record.uid) { + while ((*j) < numOfTables && pReader->status.uidList.tableUidList[*j] < record.uid) { + (*j) += 1; + newTable = true; + } + + if ((*j) >= numOfTables) { + *pRet = BLK_CHECK_QUIT; + return TSDB_CODE_SUCCESS; + } + + uid = pReader->status.uidList.tableUidList[*j]; + } + + if (record.uid < uid) { + continue; + } + + ASSERT(record.suid == pReader->info.suid && uid == record.uid); + + if (newTable) { + (*pScanInfo) = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); + if ((*pScanInfo)->pfileDelData == NULL) { + (*pScanInfo)->pfileDelData = taosArrayInit(4, sizeof(SDelData)); + } + } + + if (record.version <= pReader->info.verRange.maxVer) { + SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; + taosArrayPush((*pScanInfo)->pfileDelData, &delData); + } + } + + *pRet = BLK_CHECK_CONTINUE; + return TSDB_CODE_SUCCESS; +} + // load tomb data API static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STsdbReader* pReader, void* pFileReader, bool isFile) { - int32_t code = 0; - + int32_t code = 0; STableUidList* pList = &pReader->status.uidList; - int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); + int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); int32_t i = 0, j = 0; while (i < pTombBlkArray->size && j < numOfTables) { @@ -496,59 +560,15 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData)); } - STombRecord record = {0}; - for (int32_t k = 0; k < TARRAY2_SIZE(block.suid); ++k) { - code = tTombBlockGet(&block, k, &record); - if (code != TSDB_CODE_SUCCESS) { - tTombBlockDestroy(&block); - return code; - } - - if (record.suid < pReader->info.suid) { - continue; - } - - if (record.suid > pReader->info.suid) { - tTombBlockDestroy(&block); - return TSDB_CODE_SUCCESS; - } - - bool newTable = false; - if (uid < record.uid) { - while (j < numOfTables && pReader->status.uidList.tableUidList[j] < record.uid) { - j += 1; - newTable = true; - } + ETombBlkCheckEnum ret = 0; + code = doCheckTombBlock(&block, pReader, numOfTables, &j, &pScanInfo, &ret); - if (j >= numOfTables) { - tTombBlockDestroy(&block); - return TSDB_CODE_SUCCESS; - } - - uid = pReader->status.uidList.tableUidList[j]; - } - - if (record.uid < uid) { - continue; - } - - ASSERT(record.suid == pReader->info.suid && uid == record.uid); - - if (newTable) { - pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); - if (pScanInfo->pfileDelData == NULL) { - pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData)); - } - } - - if (record.version <= pReader->info.verRange.maxVer) { - SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; - taosArrayPush(pScanInfo->pfileDelData, &delData); - } + tTombBlockDestroy(&block); + if (code != TSDB_CODE_SUCCESS || ret == BLK_CHECK_QUIT) { + return code; } i += 1; - tTombBlockDestroy(&block); } return TSDB_CODE_SUCCESS; -- GitLab