From 94f4d746b489c65a3dd0f7b88b4a826a29fc9068 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 1 Sep 2022 15:33:01 +0800 Subject: [PATCH] tsdbCache: support multi-last data reading --- source/dnode/vnode/src/tsdb/tsdbCache.c | 121 ++++-------------------- 1 file changed, 18 insertions(+), 103 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 059a3a2d83..158cf2a0c3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -418,31 +418,16 @@ typedef enum { } SFSLASTNEXTROWSTATES; typedef struct { - SFSLASTNEXTROWSTATES state; // [input] - STsdb *pTsdb; // [input] - SBlockIdx *pBlockIdxExp; // [input] - STSchema *pTSchema; // [input] - tb_uid_t suid; + SFSLASTNEXTROWSTATES state; // [input] + STsdb *pTsdb; // [input] tb_uid_t uid; int32_t nFileSet; int32_t iFileSet; SArray *aDFileSet; SDataFReader *pDataFReader; - SArray *aBlockL; - SBlockL *pBlockL; - SBlockData *pBlockDataL; - SBlockData blockDataL; - int32_t nRow; - int32_t iRow; TSDBROW row; - /* - SArray *aBlockIdx; - SBlockIdx *pBlockIdx; - SMapData blockMap; - int32_t nBlock; - int32_t iBlock; - SBlock block; - */ + + SMergeTree mergeTree; } SFSLastNextRowIter; static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { @@ -451,22 +436,16 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { switch (state->state) { case SFSLASTNEXTROW_FS: - // state->aDFileSet = state->pTsdb->pFS->cState->aDFileSet; state->nFileSet = taosArrayGetSize(state->aDFileSet); state->iFileSet = state->nFileSet; - state->pBlockDataL = NULL; - case SFSLASTNEXTROW_FILESET: { SDFileSet *pFileSet = NULL; _next_fileset: if (--state->iFileSet >= 0) { pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); } else { - if (state->pBlockDataL) { - tBlockDataDestroy(state->pBlockDataL, 1); - state->pBlockDataL = NULL; - } + // tMergeTreeClose(&state->mergeTree); *ppRow = NULL; return code; @@ -475,68 +454,24 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet); if (code) goto _err; - if (!state->aBlockL) { - state->aBlockL = taosArrayInit(0, sizeof(SBlockL)); - } else { - taosArrayClear(state->aBlockL); - } - - code = tsdbReadBlockL(state->pDataFReader, 0, state->aBlockL); - if (code) goto _err; - - // SBlockL *pBlockL = (SBlockL *)taosArrayGet(state->aBlockL, state->iBlockL); - - state->pBlockL = taosArraySearch(state->aBlockL, state->pBlockIdxExp, tCmprBlockL, TD_EQ); - if (!state->pBlockL) { + tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->uid, + &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, + &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}); + bool hasVal = tMergeTreeNext(&state->mergeTree); + if (!hasVal) { + state->state = SFSLASTNEXTROW_FILESET; + // tMergeTreeClose(&state->mergeTree); goto _next_fileset; } - - int64_t suid = state->pBlockL->suid; - int64_t uid = state->pBlockL->maxUid; - - if (!state->pBlockDataL) { - state->pBlockDataL = &state->blockDataL; - - tBlockDataCreate(state->pBlockDataL); - } - code = tBlockDataInit(state->pBlockDataL, suid, suid ? 0 : uid, state->pTSchema); - if (code) goto _err; - } - case SFSLASTNEXTROW_BLOCKDATA: - code = tsdbReadLastBlock(state->pDataFReader, 0, state->pBlockL, state->pBlockDataL); - if (code) goto _err; - - state->nRow = state->blockDataL.nRow; - state->iRow = state->nRow - 1; - - if (!state->pBlockDataL->uid) { - while (state->pBlockIdxExp->uid != state->pBlockDataL->aUid[state->iRow]) { - --state->iRow; - } - } - state->state = SFSLASTNEXTROW_BLOCKROW; + } case SFSLASTNEXTROW_BLOCKROW: - if (state->pBlockDataL->uid) { - if (state->iRow >= 0) { - state->row = tsdbRowFromBlockData(state->pBlockDataL, state->iRow); - *ppRow = &state->row; - - if (--state->iRow < 0) { - state->state = SFSLASTNEXTROW_FILESET; - } - } - } else { - if (state->iRow >= 0 && state->pBlockIdxExp->uid == state->pBlockDataL->aUid[state->iRow]) { - state->row = tsdbRowFromBlockData(state->pBlockDataL, state->iRow); - *ppRow = &state->row; - - if (--state->iRow < 0 || state->pBlockIdxExp->uid != state->pBlockDataL->aUid[state->iRow]) { - state->state = SFSLASTNEXTROW_FILESET; - } - } + state->row = tMergeTreeGetRow(&state->mergeTree); + *ppRow = &state->row; + bool hasVal = tMergeTreeNext(&state->mergeTree); + if (!hasVal) { + state->state = SFSLASTNEXTROW_FILESET; } - return code; default: ASSERT(0); @@ -548,15 +483,6 @@ _err: tsdbDataFReaderClose(&state->pDataFReader); state->pDataFReader = NULL; } - if (state->aBlockL) { - taosArrayDestroy(state->aBlockL); - state->aBlockL = NULL; - } - if (state->pBlockDataL) { - tBlockDataDestroy(state->pBlockDataL, 1); - state->pBlockDataL = NULL; - } - *ppRow = NULL; return code; @@ -574,14 +500,6 @@ int32_t clearNextRowFromFSLast(void *iter) { tsdbDataFReaderClose(&state->pDataFReader); state->pDataFReader = NULL; } - if (state->aBlockL) { - taosArrayDestroy(state->aBlockL); - state->aBlockL = NULL; - } - if (state->pBlockDataL) { - tBlockDataDestroy(state->pBlockDataL, 1); - state->pBlockDataL = NULL; - } return code; } @@ -972,9 +890,6 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS; pIter->fsLastState.pTsdb = pTsdb; pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet; - pIter->fsLastState.pBlockIdxExp = &pIter->idx; - pIter->fsLastState.pTSchema = pTSchema; - pIter->fsLastState.suid = suid; pIter->fsLastState.uid = uid; pIter->fsState.state = SFSNEXTROW_FS; -- GitLab