From 33eaeb743cd57a5ebfdb55190754f35bb03d72a6 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 14 Jun 2023 16:15:40 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbCommit2.c | 4 +- source/dnode/vnode/src/tsdb/tsdbIter.c | 111 +++++++++++++++++----- source/dnode/vnode/src/tsdb/tsdbIter.h | 2 + 3 files changed, 89 insertions(+), 28 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 9cd18078d4..c824076600 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -190,7 +190,7 @@ static int32_t tsdbCommitTSDataOpenIterMerger(SCommitter2 *committer) { ASSERT(committer->iterMerger == NULL); STsdbIter *iter; - STsdbIterConfig config[1]; + STsdbIterConfig config[1] = {0}; // memtable iter config->type = TSDB_ITER_TYPE_MEMT; @@ -479,7 +479,7 @@ static int32_t tsdbCommitTombDataOpenIter(SCommitter2 *committer) { int32_t lino = 0; STsdbIter *iter; - STsdbIterConfig config[1]; + STsdbIterConfig config[1] = {0}; if (committer->sttReader) { const TSttSegReaderArray *readerArray; diff --git a/source/dnode/vnode/src/tsdb/tsdbIter.c b/source/dnode/vnode/src/tsdb/tsdbIter.c index f4296fb34f..d28a217c3e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbIter.c +++ b/source/dnode/vnode/src/tsdb/tsdbIter.c @@ -18,9 +18,9 @@ // STsdbIter ================ struct STsdbIter { EIterType type; - struct { - bool noMoreData; - } ctx[1]; + bool noMoreData; + bool filterByVersion; + int64_t range[2]; union { SRowInfo row[1]; STombRecord record[1]; @@ -74,8 +74,14 @@ struct STsdbIter { }; static int32_t tsdbSttIterNext(STsdbIter *iter, const TABLEID *tbid) { - while (!iter->ctx->noMoreData) { + while (!iter->noMoreData) { for (; iter->sttData->blockDataIdx < iter->sttData->blockData->nRow; iter->sttData->blockDataIdx++) { + int64_t version = iter->sttData->blockData->aVersion[iter->sttData->blockDataIdx]; + + if (iter->filterByVersion && (version < iter->range[0] || version > iter->range[1])) { + continue; + } + iter->row->suid = iter->sttData->blockData->suid; iter->row->uid = iter->sttData->blockData->uid ? iter->sttData->blockData->uid : iter->sttData->blockData->aUid[iter->sttData->blockDataIdx]; @@ -90,13 +96,17 @@ static int32_t tsdbSttIterNext(STsdbIter *iter, const TABLEID *tbid) { } if (iter->sttData->sttBlkArrayIdx >= TARRAY2_SIZE(iter->sttData->sttBlkArray)) { - iter->ctx->noMoreData = true; + iter->noMoreData = true; break; } for (; iter->sttData->sttBlkArrayIdx < TARRAY2_SIZE(iter->sttData->sttBlkArray); iter->sttData->sttBlkArrayIdx++) { const SSttBlk *sttBlk = TARRAY2_GET_PTR(iter->sttData->sttBlkArray, iter->sttData->sttBlkArrayIdx); + if (iter->filterByVersion && (sttBlk->maxVer < iter->range[0] || sttBlk->minVer > iter->range[1])) { + continue; + } + if (tbid && tbid->suid == sttBlk->suid && tbid->uid == sttBlk->minUid && tbid->uid == sttBlk->maxUid) { continue; } @@ -117,10 +127,15 @@ _exit: static int32_t tsdbDataIterNext(STsdbIter *iter, const TABLEID *tbid) { int32_t code; - while (!iter->ctx->noMoreData) { + while (!iter->noMoreData) { for (;;) { // SBlockData for (; iter->dataData->blockDataIdx < iter->dataData->blockData->nRow; iter->dataData->blockDataIdx++) { + int64_t version = iter->dataData->blockData->aVersion[iter->dataData->blockDataIdx]; + if (iter->filterByVersion && (version < iter->range[0] || version > iter->range[1])) { + continue; + } + if (tbid && tbid->suid == iter->dataData->blockData->suid && tbid->uid == iter->dataData->blockData->uid) { iter->dataData->blockDataIdx = iter->dataData->blockData->nRow; break; @@ -141,6 +156,10 @@ static int32_t tsdbDataIterNext(STsdbIter *iter, const TABLEID *tbid) { SBrinRecord record[1]; tBrinBlockGet(iter->dataData->brinBlock, iter->dataData->brinBlockIdx, record); + if (iter->filterByVersion && (record->maxVer < iter->range[0] || record->minVer > iter->range[1])) { + continue; + } + if (tbid && tbid->suid == record->suid && tbid->uid == record->uid) { continue; } @@ -158,7 +177,7 @@ static int32_t tsdbDataIterNext(STsdbIter *iter, const TABLEID *tbid) { } if (iter->dataData->brinBlkArrayIdx >= TARRAY2_SIZE(iter->dataData->brinBlkArray)) { - iter->ctx->noMoreData = true; + iter->noMoreData = true; break; } @@ -166,6 +185,10 @@ static int32_t tsdbDataIterNext(STsdbIter *iter, const TABLEID *tbid) { iter->dataData->brinBlkArrayIdx++) { const SBrinBlk *brinBlk = TARRAY2_GET_PTR(iter->dataData->brinBlkArray, iter->dataData->brinBlkArrayIdx); + if (iter->filterByVersion && (brinBlk->maxVer < iter->range[0] || brinBlk->minVer > iter->range[1])) { + continue; + } + if (tbid && tbid->uid == brinBlk->minTbid.uid && tbid->uid == brinBlk->maxTbid.uid) { continue; } @@ -186,12 +209,20 @@ _exit: static int32_t tsdbMemTableIterNext(STsdbIter *iter, const TABLEID *tbid) { SRBTreeNode *node; - while (!iter->ctx->noMoreData) { + while (!iter->noMoreData) { for (TSDBROW *row; iter->memtData->tbData && (row = tsdbTbDataIterGet(iter->memtData->tbIter));) { if (tbid && tbid->suid == iter->memtData->tbData->suid && tbid->uid == iter->memtData->tbData->uid) { iter->memtData->tbData = NULL; break; } + + if (iter->filterByVersion) { + int64_t version = TSDBROW_VERSION(row); + if (version < iter->range[0] || version > iter->range[1]) { + continue; + } + } + iter->row->row = row[0]; tsdbTbDataIterNext(iter->memtData->tbIter); @@ -201,7 +232,7 @@ static int32_t tsdbMemTableIterNext(STsdbIter *iter, const TABLEID *tbid) { for (;;) { node = tRBTreeIterNext(iter->memtData->iter); if (!node) { - iter->ctx->noMoreData = true; + iter->noMoreData = true; goto _exit; } @@ -222,16 +253,20 @@ _exit: } static int32_t tsdbDataTombIterNext(STsdbIter *iter, const TABLEID *tbid) { - while (!iter->ctx->noMoreData) { + while (!iter->noMoreData) { for (; iter->dataTomb->tombBlockIdx < TOMB_BLOCK_SIZE(iter->dataTomb->tombBlock); iter->dataTomb->tombBlockIdx++) { iter->record->suid = TARRAY2_GET(iter->dataTomb->tombBlock->suid, iter->dataTomb->tombBlockIdx); iter->record->uid = TARRAY2_GET(iter->dataTomb->tombBlock->uid, iter->dataTomb->tombBlockIdx); + iter->record->version = TARRAY2_GET(iter->dataTomb->tombBlock->version, iter->dataTomb->tombBlockIdx); + + if (iter->filterByVersion && (iter->record->version < iter->range[0] || iter->record->version > iter->range[1])) { + continue; + } if (tbid && iter->record->suid == tbid->suid && iter->record->uid == tbid->uid) { continue; } - iter->record->version = TARRAY2_GET(iter->dataTomb->tombBlock->version, iter->dataTomb->tombBlockIdx); iter->record->skey = TARRAY2_GET(iter->dataTomb->tombBlock->skey, iter->dataTomb->tombBlockIdx); iter->record->ekey = TARRAY2_GET(iter->dataTomb->tombBlock->ekey, iter->dataTomb->tombBlockIdx); iter->dataTomb->tombBlockIdx++; @@ -239,7 +274,7 @@ static int32_t tsdbDataTombIterNext(STsdbIter *iter, const TABLEID *tbid) { } if (iter->dataTomb->tombBlkArrayIdx >= TARRAY2_SIZE(iter->dataTomb->tombBlkArray)) { - iter->ctx->noMoreData = true; + iter->noMoreData = true; goto _exit; } @@ -266,13 +301,18 @@ _exit: } static int32_t tsdbMemTombIterNext(STsdbIter *iter, const TABLEID *tbid) { - while (!iter->ctx->noMoreData) { + while (!iter->noMoreData) { for (; iter->memtTomb->delData;) { if (tbid && tbid->uid == iter->memtTomb->tbData->uid) { iter->memtTomb->delData = NULL; break; } + if (iter->filterByVersion && + (iter->memtTomb->delData->version < iter->range[0] || iter->memtTomb->delData->version > iter->range[1])) { + continue; + } + iter->record->suid = iter->memtTomb->tbData->suid; iter->record->uid = iter->memtTomb->tbData->uid; iter->record->version = iter->memtTomb->delData->version; @@ -286,7 +326,7 @@ static int32_t tsdbMemTombIterNext(STsdbIter *iter, const TABLEID *tbid) { for (;;) { SRBTreeNode *node = tRBTreeIterNext(iter->memtTomb->rbtIter); if (node == NULL) { - iter->ctx->noMoreData = true; + iter->noMoreData = true; goto _exit; } @@ -311,7 +351,7 @@ static int32_t tsdbSttIterOpen(STsdbIter *iter) { if (code) return code; if (TARRAY2_SIZE(iter->sttData->sttBlkArray) == 0) { - iter->ctx->noMoreData = true; + iter->noMoreData = true; return 0; } @@ -330,7 +370,7 @@ static int32_t tsdbDataIterOpen(STsdbIter *iter) { if (code) return code; if (TARRAY2_SIZE(iter->dataData->brinBlkArray) == 0) { - iter->ctx->noMoreData = true; + iter->noMoreData = true; return 0; } @@ -348,6 +388,11 @@ static int32_t tsdbDataIterOpen(STsdbIter *iter) { } static int32_t tsdbMemTableIterOpen(STsdbIter *iter) { + if (iter->memtData->memt->nRow == 0) { + iter->noMoreData = true; + return 0; + } + iter->memtData->iter[0] = tRBTreeIterCreate(iter->memtData->memt->tbDataTree, 1); return tsdbMemTableIterNext(iter, NULL); } @@ -364,7 +409,7 @@ static int32_t tsdbDataTombIterOpen(STsdbIter *iter) { if (code) return code; if (TARRAY2_SIZE(iter->dataTomb->tombBlkArray) == 0) { - iter->ctx->noMoreData = true; + iter->noMoreData = true; return 0; } iter->dataTomb->tombBlkArrayIdx = 0; @@ -379,7 +424,7 @@ static int32_t tsdbMemTombIterOpen(STsdbIter *iter) { int32_t code; if (iter->memtTomb->memt->nDel == 0) { - iter->ctx->noMoreData = true; + iter->noMoreData = true; return 0; } @@ -396,16 +441,20 @@ static int32_t tsdbDataIterClose(STsdbIter *iter) { static int32_t tsdbMemTableIterClose(STsdbIter *iter) { return 0; } static int32_t tsdbSttTombIterNext(STsdbIter *iter, const TABLEID *tbid) { - while (!iter->ctx->noMoreData) { + while (!iter->noMoreData) { for (; iter->sttTomb->tombBlockIdx < TOMB_BLOCK_SIZE(iter->sttTomb->tombBlock); iter->sttTomb->tombBlockIdx++) { iter->record->suid = TARRAY2_GET(iter->sttTomb->tombBlock->suid, iter->sttTomb->tombBlockIdx); iter->record->uid = TARRAY2_GET(iter->sttTomb->tombBlock->uid, iter->sttTomb->tombBlockIdx); + iter->record->version = TARRAY2_GET(iter->sttTomb->tombBlock->version, iter->sttTomb->tombBlockIdx); + + if (iter->filterByVersion && (iter->record->version < iter->range[0] || iter->record->version > iter->range[1])) { + continue; + } if (tbid && iter->record->suid == tbid->suid && iter->record->uid == tbid->uid) { continue; } - iter->record->version = TARRAY2_GET(iter->sttTomb->tombBlock->version, iter->sttTomb->tombBlockIdx); iter->record->skey = TARRAY2_GET(iter->sttTomb->tombBlock->skey, iter->sttTomb->tombBlockIdx); iter->record->ekey = TARRAY2_GET(iter->sttTomb->tombBlock->ekey, iter->sttTomb->tombBlockIdx); iter->sttTomb->tombBlockIdx++; @@ -413,7 +462,7 @@ static int32_t tsdbSttTombIterNext(STsdbIter *iter, const TABLEID *tbid) { } if (iter->sttTomb->tombBlkArrayIdx >= TARRAY2_SIZE(iter->sttTomb->tombBlkArray)) { - iter->ctx->noMoreData = true; + iter->noMoreData = true; goto _exit; } @@ -421,6 +470,10 @@ static int32_t tsdbSttTombIterNext(STsdbIter *iter, const TABLEID *tbid) { iter->sttTomb->tombBlkArrayIdx++) { const STombBlk *tombBlk = TARRAY2_GET_PTR(iter->sttTomb->tombBlkArray, iter->sttTomb->tombBlkArrayIdx); + if (iter->filterByVersion && (tombBlk->maxVer < iter->range[0] || tombBlk->minVer > iter->range[1])) { + continue; + } + if (tbid && tbid->suid == tombBlk->minTbid.suid && tbid->uid == tombBlk->minTbid.uid && tbid->suid == tombBlk->maxTbid.suid && tbid->uid == tombBlk->maxTbid.uid) { continue; @@ -446,7 +499,7 @@ static int32_t tsdbSttTombIterOpen(STsdbIter *iter) { if (code) return code; if (TARRAY2_SIZE(iter->sttTomb->tombBlkArray) == 0) { - iter->ctx->noMoreData = true; + iter->noMoreData = true; return 0; } @@ -466,7 +519,13 @@ int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) { } iter[0]->type = config->type; - iter[0]->ctx->noMoreData = false; + iter[0]->noMoreData = false; + iter[0]->filterByVersion = config->filterByVersion; + if (iter[0]->filterByVersion) { + iter[0]->range[0] = config->verRange[0]; + iter[0]->range[1] = config->verRange[1]; + } + switch (config->type) { case TSDB_ITER_TYPE_STT: iter[0]->sttData->reader = config->sttReader; @@ -628,7 +687,7 @@ int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger tRBTreeCreate(merger[0]->iterTree, tsdbIterCmprFn); } TARRAY2_FOREACH(iterArray, iter) { - if (iter->ctx->noMoreData) continue; + if (iter->noMoreData) continue; node = tRBTreePut(merger[0]->iterTree, iter->node); ASSERT(node); } @@ -653,7 +712,7 @@ int32_t tsdbIterMergerNext(SIterMerger *merger) { code = tsdbIterNext(merger->iter); if (code) return code; - if (merger->iter->ctx->noMoreData) { + if (merger->iter->noMoreData) { merger->iter = NULL; } else if ((node = tRBTreeMin(merger->iterTree))) { c = merger->iterTree->cmprFn(merger->iter->node, node); @@ -692,7 +751,7 @@ int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid) { int32_t code = tsdbIterSkipTableData(merger->iter, tbid); if (code) return code; - if (merger->iter->ctx->noMoreData) { + if (merger->iter->noMoreData) { merger->iter = NULL; } else if ((node = tRBTreeMin(merger->iterTree))) { c = merger->iterTree->cmprFn(merger->iter->node, node); diff --git a/source/dnode/vnode/src/tsdb/tsdbIter.h b/source/dnode/vnode/src/tsdb/tsdbIter.h index 8c4b2569f4..aa201d3d4d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbIter.h +++ b/source/dnode/vnode/src/tsdb/tsdbIter.h @@ -48,6 +48,8 @@ typedef struct { TSDBKEY from[1]; }; // TSDB_ITER_TYPE_MEMT }; + bool filterByVersion; + int64_t verRange[2]; } STsdbIterConfig; // STsdbIter =============== -- GitLab