diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 0639cd91a5ad4f611cc13fbad5923a8df762832a..d4fa4de510997dfaa269a2a8e75884a5a410b899 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -49,7 +49,7 @@ typedef struct { } ctx[1]; // reader - SSttFileReader *sttReader; + TSttFileReaderArray sttReaderArray[1]; // iter TTsdbIterArray dataIterArray[1]; @@ -226,7 +226,7 @@ static int32_t tsdbCommitOpenReader(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; - ASSERT(committer->sttReader == NULL); + ASSERT(TARRAY2_SIZE(committer->sttReaderArray) == 0); if (committer->ctx->fset == NULL // || committer->sttTrigger > 1 // @@ -241,30 +241,31 @@ static int32_t tsdbCommitOpenReader(SCommitter2 *committer) { ASSERT(lvl->level == 0); - if (TARRAY2_SIZE(lvl->fobjArr) == 0) { - return 0; - } + STFileObj *fobj = NULL; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + SSttFileReader *sttReader; - ASSERT(TARRAY2_SIZE(lvl->fobjArr) == 1); + SSttFileReaderConfig config = { + .tsdb = committer->tsdb, + .szPage = committer->szPage, + .file = fobj->f[0], + }; - STFileObj *fobj = TARRAY2_FIRST(lvl->fobjArr); + code = tsdbSttFileReaderOpen(fobj->fname, &config, &sttReader); + TSDB_CHECK_CODE(code, lino, _exit); - SSttFileReaderConfig config = { - .tsdb = committer->tsdb, - .szPage = committer->szPage, - .file = fobj->f[0], - }; - code = tsdbSttFileReaderOpen(fobj->fname, &config, &committer->sttReader); - TSDB_CHECK_CODE(code, lino, _exit); + code = TARRAY2_APPEND(committer->sttReaderArray, sttReader); + TSDB_CHECK_CODE(code, lino, _exit); - STFileOp op = { - .optype = TSDB_FOP_REMOVE, - .fid = fobj->f->fid, - .of = fobj->f[0], - }; + STFileOp op = { + .optype = TSDB_FOP_REMOVE, + .fid = fobj->f->fid, + .of = fobj->f[0], + }; - code = TARRAY2_APPEND(committer->fopArray, op); - TSDB_CHECK_CODE(code, lino, _exit); + code = TARRAY2_APPEND(committer->fopArray, op); + TSDB_CHECK_CODE(code, lino, _exit); + } _exit: if (code) { @@ -273,7 +274,10 @@ _exit: return code; } -static int32_t tsdbCommitCloseReader(SCommitter2 *committer) { return tsdbSttFileReaderClose(&committer->sttReader); } +static int32_t tsdbCommitCloseReader(SCommitter2 *committer) { + TARRAY2_CLEAR(committer->sttReaderArray, tsdbSttFileReaderClose); + return 0; +} static int32_t tsdbCommitOpenIter(SCommitter2 *committer) { int32_t code = 0; @@ -310,10 +314,11 @@ static int32_t tsdbCommitOpenIter(SCommitter2 *committer) { TSDB_CHECK_CODE(code, lino, _exit); // STT - if (committer->sttReader) { + SSttFileReader *sttReader; + TARRAY2_FOREACH(committer->sttReaderArray, sttReader) { // data iter config.type = TSDB_ITER_TYPE_STT; - config.sttReader = committer->sttReader; + config.sttReader = sttReader; code = tsdbIterOpen(&config, &iter); TSDB_CHECK_CODE(code, lino, _exit); @@ -323,7 +328,7 @@ static int32_t tsdbCommitOpenIter(SCommitter2 *committer) { // tomb iter config.type = TSDB_ITER_TYPE_STT_TOMB; - config.sttReader = committer->sttReader; + config.sttReader = sttReader; code = tsdbIterOpen(&config, &iter); TSDB_CHECK_CODE(code, lino, _exit);