From a05383e29e8b0d1f7eead1aea52ca3d864f56a5b Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 17 Jul 2023 15:07:10 +0800 Subject: [PATCH] tsdb/cache: new load tomb callback from tsdb mergetree --- source/dnode/vnode/src/inc/tsdb.h | 6 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 20 ++++++ source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 1 - source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 80 +++++++++++---------- source/dnode/vnode/src/tsdb/tsdbRead2.c | 1 + source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 1 - 6 files changed, 68 insertions(+), 41 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 99353f5485..ff157afdf1 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -802,6 +802,10 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter); +struct SSttFileReader; +typedef int32_t (*_load_tomb_fn)(STsdbReader *pReader, struct SSttFileReader *pSttFileReader, + SSttBlockLoadInfo *pLoadInfo); + typedef struct { int8_t backward; STsdb *pTsdb; @@ -815,6 +819,7 @@ typedef struct { STSchema *pSchema; int16_t *pCols; int32_t numOfCols; + _load_tomb_fn loadTombFn; void *pReader; void *idstr; } SMergeTreeConf; @@ -851,7 +856,6 @@ typedef struct STsdbReaderInfo { typedef struct SCacheRowsReader { STsdb *pTsdb; STsdbReaderInfo info; - int8_t cacheReader; // always true for cache reader TdThreadMutex readerMutex; SVnode *pVnode; STSchema *pSchema; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index ee117d723f..feaff677da 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1720,6 +1720,24 @@ typedef struct { SMergeTree *pMergeTree; } SFSLastIter; +static int32_t loadSttTombData(STsdbReader *pReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) { + int32_t code = 0; + /* + if (pLoadInfo->pTombBlockArray == NULL) { + pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES); + } + + const TTombBlkArray *pBlkArray = NULL; + int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + return doLoadTombDataFromTombBlk(pBlkArray, pReader, pSttFileReader, false); + */ + return code; +} + static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) { int32_t code = 0; @@ -1742,6 +1760,7 @@ static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb .pSttFileBlockIterArray = pr->pLDataIterArray, .pCols = aCols, .numOfCols = nCols, + .loadTombFn = loadSttTombData, .pReader = pr, .idstr = pr->idstr, }; @@ -2742,6 +2761,7 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI for (int i = 0; i < nMax; ++i) { TSDBKEY maxKey1 = TSDBROW_KEY(max[i]); + // TODO: build skyline here bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline); if (!deleted) { iMerge[nMerge] = iMax[i]; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 6b06831a2e..1b6fc2c9e4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -191,7 +191,6 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, taosThreadMutexInit(&p->readerMutex, NULL); p->lastTs = INT64_MIN; - p->cacheReader = 1; *pReader = p; return TSDB_CODE_SUCCESS; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index d856f2ae9f..c8672e5b9d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -107,8 +107,8 @@ void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double } } -static void freeTombBlock(void* param) { - STombBlock** pTombBlock = (STombBlock**) param; +static void freeTombBlock(void *param) { + STombBlock **pTombBlock = (STombBlock **)param; tTombBlockDestroy(*pTombBlock); taosMemoryFree(*pTombBlock); } @@ -135,22 +135,22 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { return NULL; } -static void destroyLDataIter(SLDataIter* pIter) { +static void destroyLDataIter(SLDataIter *pIter) { tLDataIterClose2(pIter); destroyLastBlockLoadInfo(pIter->pBlockLoadInfo); taosMemoryFree(pIter); } -void* destroySttBlockReader(SArray* pLDataIterArray, int64_t* blocks, double* el) { +void *destroySttBlockReader(SArray *pLDataIterArray, int64_t *blocks, double *el) { if (pLDataIterArray == NULL) { return NULL; } int32_t numOfLevel = taosArrayGetSize(pLDataIterArray); - for(int32_t i = 0; i < numOfLevel; ++i) { - SArray* pList = taosArrayGetP(pLDataIterArray, i); - for(int32_t j = 0; j < taosArrayGetSize(pList); ++j) { - SLDataIter* pIter = taosArrayGetP(pList, j); + for (int32_t i = 0; i < numOfLevel; ++i) { + SArray *pList = taosArrayGetP(pLDataIterArray, i); + for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) { + SLDataIter *pIter = taosArrayGetP(pList, j); *el += pIter->pBlockLoadInfo->elapsedTime; *blocks += pIter->pBlockLoadInfo->loadBlocks; destroyLDataIter(pIter); @@ -193,7 +193,8 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { int64_t st = taosGetTimestampUs(); SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex]; - code = tsdbSttFileReadBlockDataByColumn(pIter->pReader, pIter->pSttBlk, pBlock, pInfo->pSchema, &pInfo->colIds[1], pInfo->numOfCols - 1); + code = tsdbSttFileReadBlockDataByColumn(pIter->pReader, pIter->pSttBlk, pBlock, pInfo->pSchema, &pInfo->colIds[1], + pInfo->numOfCols - 1); if (code != TSDB_CODE_SUCCESS) { goto _exit; } @@ -311,13 +312,13 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint } int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, - uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo, - const char *idStr, bool strictTimeRange) { + uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo, + const char *idStr, bool strictTimeRange) { return 0; } -static int32_t loadSttBlockInfo(SLDataIter *pIter, SSttBlockLoadInfo* pBlockLoadInfo, uint64_t suid) { - TSttBlkArray* pArray = pBlockLoadInfo->pBlockArray; +static int32_t loadSttBlockInfo(SLDataIter *pIter, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid) { + TSttBlkArray *pArray = pBlockLoadInfo->pBlockArray; if (TARRAY2_SIZE(pArray) <= 0) { return TSDB_CODE_SUCCESS; } @@ -327,7 +328,7 @@ static int32_t loadSttBlockInfo(SLDataIter *pIter, SSttBlockLoadInfo* pBlockLoad // all identical if (pStart->suid == pEnd->suid) { - if (pStart->suid != suid) { // no qualified stt block existed + if (pStart->suid != suid) { // no qualified stt block existed taosArrayClear(pBlockLoadInfo->aSttBlk); pIter->iSttBlk = -1; return TSDB_CODE_SUCCESS; @@ -338,7 +339,7 @@ static int32_t loadSttBlockInfo(SLDataIter *pIter, SSttBlockLoadInfo* pBlockLoad } else { SArray *pTmp = taosArrayInit(TARRAY2_SIZE(pArray), sizeof(SSttBlk)); for (int32_t i = 0; i < TARRAY2_SIZE(pArray); ++i) { - SSttBlk* p = &pArray->data[i]; + SSttBlk *p = &pArray->data[i]; if (p->suid < suid) { continue; } @@ -357,19 +358,19 @@ static int32_t loadSttBlockInfo(SLDataIter *pIter, SSttBlockLoadInfo* pBlockLoad return TSDB_CODE_SUCCESS; } -static int32_t loadSttTombBlockData(SSttFileReader* pSttFileReader, uint64_t suid, SSttBlockLoadInfo* pLoadInfo) { +static int32_t loadSttTombBlockData(SSttFileReader *pSttFileReader, uint64_t suid, SSttBlockLoadInfo *pLoadInfo) { if (pLoadInfo->pTombBlockArray == NULL) { pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES); } - const TTombBlkArray* pBlkArray = NULL; - int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray); + const TTombBlkArray *pBlkArray = NULL; + int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray); if (code != TSDB_CODE_SUCCESS) { return code; } - for(int32_t j = 0; j < pBlkArray->size; ++j) { - STombBlk* pTombBlk = &pBlkArray->data[j]; + for (int32_t j = 0; j < pBlkArray->size; ++j) { + STombBlk *pTombBlk = &pBlkArray->data[j]; if (pTombBlk->maxTbid.suid < suid) { continue; // todo use binary search instead } @@ -378,13 +379,13 @@ static int32_t loadSttTombBlockData(SSttFileReader* pSttFileReader, uint64_t sui break; } - STombBlock* pTombBlock = taosMemoryCalloc(1, sizeof(STombBlock)); + STombBlock *pTombBlock = taosMemoryCalloc(1, sizeof(STombBlock)); code = tsdbSttFileReadTombBlock(pSttFileReader, pTombBlk, pTombBlock); if (code != TSDB_CODE_SUCCESS) { // todo handle error } - void* p = taosArrayPush(pLoadInfo->pTombBlockArray, &pTombBlock); + void *p = taosArrayPush(pLoadInfo->pTombBlockArray, &pTombBlock); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -393,9 +394,10 @@ static int32_t loadSttTombBlockData(SSttFileReader* pSttFileReader, uint64_t sui return TSDB_CODE_SUCCESS; } -int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward, uint64_t suid, - uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo, - const char *idStr, bool strictTimeRange, void* pReader1) { +int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward, + uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, + SSttBlockLoadInfo *pBlockLoadInfo, const char *idStr, bool strictTimeRange, + _load_tomb_fn loadTombFn, void *pReader1) { int32_t code = TSDB_CODE_SUCCESS; pIter->uid = uid; @@ -424,7 +426,8 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader return code; } - code = loadSttTombDataForAll(pReader1, pIter->pReader, pBlockLoadInfo); + // code = loadSttTombDataForAll(pReader1, pIter->pReader, pBlockLoadInfo); + code = loadTombFn(pReader1, pIter->pReader, pBlockLoadInfo); double el = (taosGetTimestampUs() - st) / 1000.0; tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr); @@ -507,8 +510,8 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) { if (index != -1) { pIter->iSttBlk = index; pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk); - tsdbDebug("try next last file block:%d from stt fileIdx:%d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk, - oldIndex, pIter->uid, pIter->iStt, idStr); + tsdbDebug("try next last file block:%d from stt fileIdx:%d, trigger by uid:%" PRIu64 ", file index:%d, %s", + pIter->iSttBlk, oldIndex, pIter->uid, pIter->iStt, idStr); } else { tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, %s", pIter->uid, oldIndex, idStr); } @@ -679,7 +682,7 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, - bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter) { + bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter) { int32_t code = TSDB_CODE_SUCCESS; pMTree->backward = backward; @@ -721,7 +724,7 @@ _end: return code; } -int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf* pConf) { +int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) { int32_t code = TSDB_CODE_SUCCESS; pMTree->pIter = NULL; @@ -743,30 +746,30 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf* pConf) { } while (taosArrayGetSize(pConf->pSttFileBlockIterArray) < size) { - SArray* pList = taosArrayInit(4, POINTER_BYTES); + SArray *pList = taosArrayInit(4, POINTER_BYTES); taosArrayPush(pConf->pSttFileBlockIterArray, &pList); } - for(int32_t j = 0; j < size; ++j) { + for (int32_t j = 0; j < size; ++j) { SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j]; ASSERT(pSttLevel->level == j); - SArray* pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j); + SArray *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j); int32_t numOfIter = taosArrayGetSize(pList); if (numOfIter < TARRAY2_SIZE(pSttLevel->fobjArr)) { int32_t inc = TARRAY2_SIZE(pSttLevel->fobjArr) - numOfIter; - for(int32_t k = 0; k < inc; ++k) { + for (int32_t k = 0; k < inc; ++k) { SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); taosArrayPush(pList, &pIter); } } for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) { // open all last file - SLDataIter* pIter = taosArrayGetP(pList, i); + SLDataIter *pIter = taosArrayGetP(pList, i); - SSttFileReader *pSttFileReader = pIter->pReader; - SSttBlockLoadInfo* pLoadInfo = pIter->pBlockLoadInfo; + SSttFileReader *pSttFileReader = pIter->pReader; + SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo; // open stt file reader if not if (pSttFileReader == NULL) { @@ -785,7 +788,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf* pConf) { memset(pIter, 0, sizeof(SLDataIter)); code = tLDataIterOpen2(pIter, pSttFileReader, i, pMTree->backward, pConf->suid, pConf->uid, &pConf->timewindow, - &pConf->verRange, pLoadInfo, pMTree->idStr, pConf->strictTimeRange, pConf->pReader); + &pConf->verRange, pLoadInfo, pMTree->idStr, pConf->strictTimeRange, pConf->loadTombFn, + pConf->pReader); if (code != TSDB_CODE_SUCCESS) { goto _end; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index e8abaa4e33..113d939bb8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2108,6 +2108,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan .pSttFileBlockIterArray = pReader->status.pLDataIterArray, .pCols = pReader->suppInfo.colId, .numOfCols = pReader->suppInfo.numOfCols, + .loadTombFn = loadSttTombDataForAll, .pReader = pReader, .idstr = pReader->idStr, }; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 33bb343e01..c343eafb54 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -186,7 +186,6 @@ typedef struct SReaderStatus { struct STsdbReader { STsdb* pTsdb; STsdbReaderInfo info; - int8_t cacheReader; // always false for tsdb reader TdThreadMutex readerMutex; EReaderStatus flag; int32_t code; -- GitLab