From cf02ac7229c6f9bc7be6b9c6746009cf2a65277f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 21 Jun 2023 18:57:14 +0800 Subject: [PATCH] fix(tsdb): extract tomb data in stt files. --- source/dnode/vnode/src/inc/tsdb.h | 1 + source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 123 +++++++++++++------- source/dnode/vnode/src/tsdb/tsdbRead2.c | 62 +++++++++- tests/system-test/1-insert/delete_stable.py | 2 +- 4 files changed, 140 insertions(+), 48 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index d33b8b00bf..ee7ebfcd4c 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -706,6 +706,7 @@ typedef struct SSttBlockLoadInfo { void *pBlockArray; SArray *aSttBlk; + SArray *pTombBlockArray; // tomb block array list int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. int32_t currentLoadBlockIndex; int32_t loadBlocks; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 87e0df63b9..918f2763b1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -269,6 +269,83 @@ int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t return 0; } +static int32_t loadSttBlockInfo(SLDataIter *pIter, SSttBlockLoadInfo* pBlockLoadInfo, uint64_t suid) { + TSttBlkArray* pArray = pBlockLoadInfo->pBlockArray; + if (TARRAY2_SIZE(pArray) <= 0) { + return TSDB_CODE_SUCCESS; + } + + SSttBlk *pStart = &pArray->data[0]; + SSttBlk *pEnd = &pArray->data[TARRAY2_SIZE(pArray) - 1]; + + // all identical + if (pStart->suid == pEnd->suid) { + if (pStart->suid != suid) { // no qualified stt block existed + taosArrayClear(pBlockLoadInfo->aSttBlk); + pIter->iSttBlk = -1; + return TSDB_CODE_SUCCESS; + } else { // all blocks are qualified + taosArrayClear(pBlockLoadInfo->aSttBlk); + taosArrayAddBatch(pBlockLoadInfo->aSttBlk, pArray->data, pArray->size); + } + } else { + SArray *pTmp = taosArrayInit(TARRAY2_SIZE(pArray), sizeof(SSttBlk)); + for (int32_t i = 0; i < TARRAY2_SIZE(pArray); ++i) { + SSttBlk* p = &pArray->data[i]; + if (p->suid < suid) { + continue; + } + + if (p->suid == suid) { + taosArrayPush(pTmp, p); + } else if (p->suid > suid) { + break; + } + } + + taosArrayDestroy(pBlockLoadInfo->aSttBlk); + pBlockLoadInfo->aSttBlk = pTmp; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t loadSttTombBlockData(SSttFileReader* pSttFileReader, uint64_t suid, SSttBlockLoadInfo* pLoadInfo) { + if (pLoadInfo->pTombBlockArray == NULL) { + pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES); + } + + const TTombBlkArray* pBlkArray = NULL; + int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + for(int32_t j = 0; j < pBlkArray->size; ++j) { + STombBlk* pTombBlk = &pBlkArray->data[j]; + if (pTombBlk->maxTbid.suid < suid) { + continue; // todo use binary search instead + } + + if (pTombBlk->minTbid.suid > suid) { + break; + } + + STombBlock* pTombBlock = taosMemoryCalloc(1, sizeof(STombBlock)); + code = tsdbSttFileReadTombBlock(pSttFileReader, pTombBlk, pTombBlock); + if (code != TSDB_CODE_SUCCESS) { + // todo handle error + } + + void* p = taosArrayPush(pLoadInfo->pTombBlockArray, &pTombBlock); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + return TSDB_CODE_SUCCESS; +} + int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo, const char *idStr, bool strictTimeRange) { @@ -290,51 +367,15 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32 pBlockLoadInfo->sttBlockLoaded = true; code = tsdbSttFileReadSttBlk(pIter->pReader, (const TSttBlkArray **)&pBlockLoadInfo->pBlockArray); - if (code) { + if (code != TSDB_CODE_SUCCESS) { return code; } - // only apply to the child tables, ordinary tables will not incur this filter procedure. - TSttBlkArray* pArray = pBlockLoadInfo->pBlockArray; - size_t size = pArray->size; + code = loadSttBlockInfo(pIter, pBlockLoadInfo, suid); - if (size >= 1) { - SSttBlk *pStart = &pArray->data[0]; - SSttBlk *pEnd = &pArray->data[size - 1]; - - // all identical - if (pStart->suid == pEnd->suid) { - if (pStart->suid != suid) { - // no qualified stt block existed - taosArrayClear(pBlockLoadInfo->aSttBlk); - - pIter->iSttBlk = -1; - double el = (taosGetTimestampUs() - st) / 1000.0; - tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr); - return code; - } else { // all blocks are qualified - taosArrayClear(pBlockLoadInfo->aSttBlk); - taosArrayAddBatch(pBlockLoadInfo->aSttBlk, pArray->data, pArray->size); - } - } else { - SArray *pTmp = taosArrayInit(size, sizeof(SSttBlk)); - for (int32_t i = 0; i < size; ++i) { - SSttBlk* p = &pArray->data[i]; - uint64_t s = p->suid; - if (s < suid) { - continue; - } - - if (s == suid) { - taosArrayPush(pTmp, p); - } else if (s > suid) { - break; - } - } - - taosArrayDestroy(pBlockLoadInfo->aSttBlk); - pBlockLoadInfo->aSttBlk = pTmp; - } + code = loadSttTombBlockData(pReader, suid, pBlockLoadInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; } double el = (taosGetTimestampUs() - st) / 1000.0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index f90969bcaa..71922e82cc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2766,6 +2766,51 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum return true; } +static int32_t loadTomRecordInfoFromSttFiles(SSttBlockLoadInfo* pBlockLoadInfo, uint64_t suid, + STableBlockScanInfo* pBlockScanInfo, uint64_t maxVer) { + int32_t size = taosArrayGetSize(pBlockLoadInfo->pTombBlockArray); + if (size <= 0) { + return TSDB_CODE_SUCCESS; + } + + uint64_t uid = pBlockScanInfo->uid; + if (pBlockScanInfo->pDelData == NULL) { + pBlockScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData)); + } + + for(int32_t i = 0; i < size; ++i) { + STombBlock* pBlock = taosArrayGetP(pBlockLoadInfo->pTombBlockArray, i); + + STombRecord record = {0}; + for(int32_t j = 0; j < pBlock->suid->size; ++j) { + int32_t code = tTombBlockGet(pBlock, j, &record); + if (code != TSDB_CODE_SUCCESS) { + // todo handle error + } + + if (record.suid < suid) { + continue; + } + + // todo use binary search instead here + if (record.uid < uid) { + continue; + } + + if (record.uid > uid) { + break; + } + + if (record.version <= maxVer) { + SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; + taosArrayPush(pBlockScanInfo->pDelData, &delData); + } + } + } + + return TSDB_CODE_SUCCESS; +} + static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { // the last block reader has been initialized for this table. if (pLBlockReader->uid == pScanInfo->uid) { @@ -2776,7 +2821,6 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan tMergeTreeClose(&pLBlockReader->mergeTree); } - initMemDataIterator(pScanInfo, pReader); pLBlockReader->uid = pScanInfo->uid; STimeWindow w = pLBlockReader->window; @@ -2788,14 +2832,20 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey, pScanInfo->uid, pReader->idStr); - int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), - pReader->pTsdb, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, - pLBlockReader->pInfo, false, pReader->idStr, false, pReader->status.pLDataIter, - pReader->status.pCurrentFileset); + int32_t code = + tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pTsdb, + pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, false, + pReader->idStr, false, pReader->status.pLDataIter, pReader->status.pCurrentFileset); + if (code != TSDB_CODE_SUCCESS) { + return false; + } + + code = loadTomRecordInfoFromSttFiles(pLBlockReader->pInfo, pReader->suid, pScanInfo, pReader->verRange.maxVer); if (code != TSDB_CODE_SUCCESS) { return false; } + initMemDataIterator(pScanInfo, pReader); return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange); } @@ -3212,8 +3262,8 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr STFileObj* pTombFileObj = pReader->status.pCurrentFileset->farr[3]; if (pTombFileObj!= NULL) { const TTombBlkArray* pBlkArray = NULL; - int32_t code = tsdbDataFileReadTombBlk(pReader->pFileReader, &pBlkArray); + int32_t i = 0, j = 0; // todo find the correct start position. diff --git a/tests/system-test/1-insert/delete_stable.py b/tests/system-test/1-insert/delete_stable.py index 8ebe7b6692..fb0c0c6ced 100644 --- a/tests/system-test/1-insert/delete_stable.py +++ b/tests/system-test/1-insert/delete_stable.py @@ -27,7 +27,7 @@ class TDTestCase: def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug("start to execute %s" % __file__) - tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), True) self.dbname = 'db_test' self.ns_dbname = 'ns_test' self.us_dbname = 'us_test' -- GitLab