diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index d45a6f19f0159c335a9066ca8082f658cdedf067..8c7f9be062a4c47d1b4352fe12f06262867f8d1d 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -643,20 +643,34 @@ typedef struct { TSDBROW row; } SRowInfo; +typedef struct SSttBlockLoadInfo { + int32_t sttFileIndex; + SBlockData blockData[2]; + SArray *aSttBlk; + int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. + int32_t currentLoadBlockIndex; +} SSttBlockLoadInfo; + typedef struct SMergeTree { int8_t backward; SRBTree rbt; SArray *pIterList; SLDataIter *pIter; + bool destroyLoadInfo; + SSttBlockLoadInfo* pLoadInfo; } SMergeTree; int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange); + STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pLoadInfo); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree); +SSttBlockLoadInfo* tCreateLastBlockLoadInfo(); +void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo); +void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo); + // ========== inline functions ========== static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { TSDBKEY *pKey1 = (TSDBKEY *)p1; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 9ac62b4b59c51a397776789d9b80d66d978d45d4..8af764c4bcafee9e8793431c29307d3f5b446fe4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -457,7 +457,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid, &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, 
- &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}); + &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL); bool hasVal = tMergeTreeNext(&state->mergeTree); if (!hasVal) { state->state = SFSLASTNEXTROW_FILESET; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index a072f22fa985f6715f14469fd2b88b54d2e8b225..beb0004e24a7367c82deaf1d1239780b73982c14 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -22,26 +22,105 @@ struct SLDataIter { SDataFReader *pReader; int32_t iStt; int8_t backward; - SArray *aSttBlk; int32_t iSttBlk; - SBlockData bData[2]; - int32_t loadIndex; int32_t iRow; SRowInfo rInfo; uint64_t uid; STimeWindow timeWindow; SVersionRange verRange; + + SSttBlockLoadInfo* pBlockLoadInfo; }; -static SBlockData *getCurrentBlock(SLDataIter *pIter) { return &pIter->bData[pIter->loadIndex]; } +SSttBlockLoadInfo* tCreateLastBlockLoadInfo() { + SSttBlockLoadInfo* pLoadInfo = taosMemoryCalloc(TSDB_DEFAULT_STT_FILE, sizeof(SSttBlockLoadInfo)); + if (pLoadInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) { + pLoadInfo[i].blockIndex[0] = -1; + pLoadInfo[i].blockIndex[1] = -1; + pLoadInfo[i].currentLoadBlockIndex = 1; + + int32_t code = tBlockDataCreate(&pLoadInfo[i].blockData[0]); + if (code) { + terrno = code; + } + + code = tBlockDataCreate(&pLoadInfo[i].blockData[1]); + if (code) { + terrno = code; + } + + pLoadInfo[i].aSttBlk = taosArrayInit(4, sizeof(SSttBlk)); + } + + return pLoadInfo; +} + +void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) { + for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) { + pLoadInfo[i].currentLoadBlockIndex = 1; + pLoadInfo[i].blockIndex[0] = -1; + pLoadInfo[i].blockIndex[1] = -1; -static SBlockData *getNextBlock(SLDataIter *pIter) { - pIter->loadIndex ^= 1; - return getCurrentBlock(pIter); + taosArrayClear(pLoadInfo[i].aSttBlk); + } 
+} + +void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) { + for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) { + pLoadInfo[i].currentLoadBlockIndex = 1; + pLoadInfo[i].blockIndex[0] = -1; + pLoadInfo[i].blockIndex[1] = -1; + + tBlockDataDestroy(&pLoadInfo[i].blockData[0], true); + tBlockDataDestroy(&pLoadInfo[i].blockData[1], true); + + taosArrayDestroy(pLoadInfo[i].aSttBlk); + } + + taosMemoryFree(pLoadInfo); + return NULL; +} + +static SBlockData* loadBlockIfMissing(SLDataIter *pIter) { + int32_t code = 0; + + SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo; + if (pInfo->blockIndex[0] == pIter->iSttBlk) { /* cache slots are keyed by stt-block index (iSttBlk), not by stt-file index (iStt) */ + return &pInfo->blockData[0]; + } + + if (pInfo->blockIndex[1] == pIter->iSttBlk) { + return &pInfo->blockData[1]; + } + + pInfo->currentLoadBlockIndex ^= 1; + if (pIter->pSttBlk != NULL) { // current block not loaded yet + code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, &pInfo->blockData[pInfo->currentLoadBlockIndex]); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } + + pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk; + pIter->iRow = (pIter->backward) ? 
pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1; + } + + return &pInfo->blockData[pInfo->currentLoadBlockIndex]; + + _exit: + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + } + + return NULL; } int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, - uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange) { + uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo* pBlockLoadInfo) { int32_t code = 0; *pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); if (*pIter == NULL) { @@ -55,34 +134,35 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t (*pIter)->backward = backward; (*pIter)->verRange = *pRange; (*pIter)->timeWindow = *pTimeWindow; - (*pIter)->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); - if ((*pIter)->aSttBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - code = tBlockDataCreate(&(*pIter)->bData[0]); - if (code) { - goto _exit; - } + (*pIter)->pBlockLoadInfo = pBlockLoadInfo; - code = tBlockDataCreate(&(*pIter)->bData[1]); - if (code) { - goto _exit; + if (pBlockLoadInfo->aSttBlk == NULL) { + // loaded into the common shared objects + pBlockLoadInfo->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); + if (pBlockLoadInfo->aSttBlk == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk); + if (code) { + goto _exit; + } } - code = tsdbReadSttBlk(pReader, iStt, (*pIter)->aSttBlk); + code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk); if (code) { goto _exit; } - size_t size = taosArrayGetSize((*pIter)->aSttBlk); + size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); // find the start block int32_t index = -1; if (!backward) { // asc for (int32_t i = 0; i < size; ++i) { - SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i); + SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); if (p->suid != suid) { continue; } @@ -94,7 +174,7 
@@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t } } else { // desc for (int32_t i = size - 1; i >= 0; --i) { - SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i); + SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); if (p->suid != suid) { continue; } @@ -108,7 +188,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t (*pIter)->iSttBlk = index; if (index != -1) { - (*pIter)->pSttBlk = taosArrayGet((*pIter)->aSttBlk, (*pIter)->iSttBlk); + (*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk); } _exit: @@ -116,9 +196,6 @@ _exit: } void tLDataIterClose(SLDataIter *pIter) { - tBlockDataDestroy(&pIter->bData[0], 1); - tBlockDataDestroy(&pIter->bData[1], 1); - taosArrayDestroy(pIter->aSttBlk); taosMemoryFree(pIter); } @@ -127,9 +204,9 @@ void tLDataIterNextBlock(SLDataIter *pIter) { pIter->iSttBlk += step; int32_t index = -1; - size_t size = taosArrayGetSize(pIter->aSttBlk); + size_t size = taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk); for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) { - SSttBlk *p = taosArrayGet(pIter->aSttBlk, i); + SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i); if ((!pIter->backward) && p->minUid > pIter->uid) { break; } @@ -169,7 +246,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) { if (index == -1) { pIter->pSttBlk = NULL; } else { - pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); + pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk); } } @@ -178,7 +255,8 @@ static void findNextValidRow(SLDataIter *pIter) { bool hasVal = false; int32_t i = pIter->iRow; - SBlockData *pBlockData = getCurrentBlock(pIter); + + SBlockData *pBlockData = loadBlockIfMissing(pIter); for (; i < pBlockData->nRow && i >= 0; i += step) { if (pBlockData->aUid != NULL) { @@ -238,18 +316,8 @@ bool tLDataIterNextRow(SLDataIter *pIter) { return false; } - int32_t iBlockL = pIter->iSttBlk; - 
SBlockData *pBlockData = getCurrentBlock(pIter); - - if (pBlockData->nRow == 0 && pIter->pSttBlk != NULL) { // current block not loaded yet - pBlockData = getNextBlock(pIter); - code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData); - if (code != TSDB_CODE_SUCCESS) { - goto _exit; - } - - pIter->iRow = (pIter->backward) ? pBlockData->nRow : -1; - } + int32_t iBlockL = pIter->iSttBlk; + SBlockData *pBlockData = loadBlockIfMissing(pIter); pIter->iRow += step; @@ -266,12 +334,7 @@ bool tLDataIterNextRow(SLDataIter *pIter) { } if (iBlockL != pIter->iSttBlk) { - pBlockData = getNextBlock(pIter); - code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData); - if (code) { - goto _exit; - } - pIter->iRow = pIter->backward ? (pBlockData->nRow - 1) : 0; + pBlockData = loadBlockIfMissing(pIter); } } @@ -313,7 +376,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { } int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange) { + STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo) { pMTree->backward = backward; pMTree->pIter = NULL; pMTree->pIterList = taosArrayInit(4, POINTER_BYTES); @@ -322,21 +385,33 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead } tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); - int32_t code = TSDB_CODE_OUT_OF_MEMORY; + int32_t code = TSDB_CODE_SUCCESS; + + SSttBlockLoadInfo* pLoadInfo = NULL; + if (pBlockLoadInfo == NULL) { + if (pMTree->pLoadInfo == NULL) { + pMTree->destroyLoadInfo = true; + pMTree->pLoadInfo = tCreateLastBlockLoadInfo(); + } + + pLoadInfo = pMTree->pLoadInfo; + } else { + pLoadInfo = pBlockLoadInfo; + } - struct SLDataIter *pIterList[TSDB_DEFAULT_STT_FILE] = {0}; for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file - code = tLDataIterOpen(&pIterList[i], pFReader, i, 
pMTree->backward, suid, uid, pTimeWindow, pVerRange); + struct SLDataIter* pIter = NULL; + code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pLoadInfo[i]); if (code != TSDB_CODE_SUCCESS) { goto _end; } - bool hasVal = tLDataIterNextRow(pIterList[i]); + bool hasVal = tLDataIterNextRow(pIter); if (hasVal) { - taosArrayPush(pMTree->pIterList, &pIterList[i]); - tMergeTreeAddIter(pMTree, pIterList[i]); + taosArrayPush(pMTree->pIterList, &pIter); + tMergeTreeAddIter(pMTree, pIter); } else { - tLDataIterClose(pIterList[i]); + tLDataIterClose(pIter); } } @@ -393,4 +468,9 @@ void tMergeTreeClose(SMergeTree *pMTree) { pMTree->pIterList = taosArrayDestroy(pMTree->pIterList); pMTree->pIter = NULL; + + if (pMTree->destroyLoadInfo) { + pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo); + pMTree->destroyLoadInfo = false; + } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 8a51fc73a6380975dccc212ba1bf3c7bdc782a81..e404dd3d3a001c7da93049f96f800e59655c1430 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -17,8 +17,6 @@ #include "tsdb.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) -#define ALL_ROWS_CHECKED_INDEX (INT16_MIN) -#define INITIAL_ROW_INDEX_VAL (-1) typedef enum { EXTERNAL_ROWS_PREV = 0x1, @@ -88,6 +86,7 @@ typedef struct SLastBlockReader { int32_t order; uint64_t uid; SMergeTree mergeTree; + SSttBlockLoadInfo* pInfo; } SLastBlockReader; typedef struct SFilesetIter { @@ -226,13 +225,12 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK return NULL; } - int32_t step = ASCENDING_TRAVERSE(pTsdbReader->order)? 
1:-1; for (int32_t j = 0; j < numOfTables; ++j) { STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; if (ASCENDING_TRAVERSE(pTsdbReader->order)) { - info.lastKey = pTsdbReader->window.skey - step; + info.lastKey = pTsdbReader->window.skey - 1; } else { - info.lastKey = pTsdbReader->window.ekey - step; + info.lastKey = pTsdbReader->window.ekey + 1; } taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); @@ -319,8 +317,7 @@ static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* cap } // init file iterator -static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, - STsdbReader* pReader /*int32_t order, const char* idstr*/) { +static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) { size_t numOfFileset = taosArrayGetSize(aDFileSet); pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset; @@ -345,6 +342,12 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, pLReader->uid = 0; tMergeTreeClose(&pLReader->mergeTree); + pLReader->pInfo = tCreateLastBlockLoadInfo(); + if (pLReader->pInfo == NULL) { + tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr); + return terrno; + } + tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -360,6 +363,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) { pIter->pLastBlockReader->uid = 0; tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); + resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo); // check file the time range of coverage STimeWindow win = {0}; @@ -1377,7 +1381,6 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, bool mergeBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - // SBlockData* pLastBlockData = 
&pLastBlockReader->lastBlockData; int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); STSRow* pTSRow = NULL; @@ -1866,36 +1869,35 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc } } -static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo, - STsdbReader* pReader) { +static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { // the last block reader has been initialized for this table. - if (pLastBlockReader->uid == pBlockScanInfo->uid) { + if (pLBlockReader->uid == pScanInfo->uid) { return true; } - if (pLastBlockReader->uid != 0) { - tMergeTreeClose(&pLastBlockReader->mergeTree); + if (pLBlockReader->uid != 0) { + tMergeTreeClose(&pLBlockReader->mergeTree); } - initMemDataIterator(pBlockScanInfo, pReader); - pLastBlockReader->uid = pBlockScanInfo->uid; + initMemDataIterator(pScanInfo, pReader); + pLBlockReader->uid = pScanInfo->uid; - int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order)? 1:-1; - STimeWindow w = pLastBlockReader->window; - if (ASCENDING_TRAVERSE(pLastBlockReader->order)) { - w.skey = pBlockScanInfo->lastKey + step; + int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order)? 
1:-1; + STimeWindow w = pLBlockReader->window; + if (ASCENDING_TRAVERSE(pLBlockReader->order)) { + w.skey = pScanInfo->lastKey + step; } else { - w.ekey = pBlockScanInfo->lastKey + step; + w.ekey = pScanInfo->lastKey + step; } int32_t code = - tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader, - pReader->suid, pBlockScanInfo->uid, &w, &pLastBlockReader->verRange); + tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader, + pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo); if (code != TSDB_CODE_SUCCESS) { return false; } - return nextRowFromLastBlocks(pLastBlockReader, pBlockScanInfo); + return nextRowFromLastBlocks(pLBlockReader, pScanInfo); } static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { @@ -3305,6 +3307,7 @@ void tsdbReaderClose(STsdbReader* pReader) { SFilesetIter* pFilesetIter = &pReader->status.fileIter; if (pFilesetIter->pLastBlockReader != NULL) { tMergeTreeClose(&pFilesetIter->pLastBlockReader->mergeTree); + pFilesetIter->pLastBlockReader->pInfo = destroyLastBlockLoadInfo(pFilesetIter->pLastBlockReader->pInfo); taosMemoryFree(pFilesetIter->pLastBlockReader); }