diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index ce88aa55d35f13ef08c94da687050a2af7dde841..7e5db55ebac5a271f3e3172bc386ffb042d8aafc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -21,8 +21,8 @@ typedef struct { TFileSetArray *fsetArr; TFileOpArray fopArray[1]; - SSkmInfo skmTb[1]; - SSkmInfo skmRow[1]; + // SSkmInfo skmTb[1]; + // SSkmInfo skmRow[1]; int32_t minutes; int8_t precision; @@ -56,45 +56,29 @@ typedef struct { SIterMerger *tombIterMerger; // writer - SBlockData blockData[2]; - int32_t blockDataIdx; - SDataFileWriter *dataWriter; - SSttFileWriter *sttWriter; + SFSetWriter *writer; } SCommitter2; static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; - // stt writer - SSttFileWriterConfig config[1] = {{ + SFSetWriterConfig config = { .tsdb = committer->tsdb, + .toSttOnly = true, + .compactVersion = committer->compactVersion, + .minRow = committer->minRow, .maxRow = committer->maxRow, .szPage = committer->szPage, .cmprAlg = committer->cmprAlg, - .compactVersion = committer->compactVersion, - .did = committer->ctx->did, .fid = committer->ctx->fid, .cid = committer->ctx->cid, + .did = committer->ctx->did, .level = 0, - }}; - - code = tsdbSttFileWriterOpen(config, &committer->sttWriter); - TSDB_CHECK_CODE(code, lino, _exit); + }; - // data writer if (committer->sttTrigger == 1) { - // data writer - SDataFileWriterConfig config = { - .tsdb = committer->tsdb, - .cmprAlg = committer->cmprAlg, - .maxRow = committer->maxRow, - .szPage = committer->szPage, - .fid = committer->ctx->fid, - .cid = committer->ctx->cid, - .did = committer->ctx->did, - .compactVersion = committer->compactVersion, - }; + config.toSttOnly = false; if (committer->ctx->fset) { for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ftype++) { @@ -104,11 +88,11 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) { } } } - - code = tsdbDataFileWriterOpen(&config, &committer->dataWriter); - TSDB_CHECK_CODE(code, lino, _exit); } + code = tsdbFSetWriterOpen(&config, &committer->writer); + TSDB_CHECK_CODE(code, lino, _exit); + _exit: if (code) { TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); @@ -117,22 +101,10 @@ _exit: } static int32_t tsdbCommitCloseWriter(SCommitter2 *committer) { - int32_t code = 0; - int32_t lino = 0; - - code = tsdbSttFileWriterClose(&committer->sttWriter, 0, committer->fopArray); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbDataFileWriterClose(&committer->dataWriter, 0, committer->fopArray); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); - } - return code; + return tsdbFSetWriterClose(&committer->writer, 0, committer->fopArray); } +#if 0 static int32_t tsdbCommitTSDataToDataTableBegin(SCommitter2 *committer, const TABLEID *tbid) { int32_t code = 0; int32_t lino = 0; @@ -328,17 +300,30 @@ _exit: } return code; } +#endif static int32_t tsdbCommitTSData(SCommitter2 *committer) { - int32_t code = 0; - int32_t lino = 0; + int32_t code = 0; + int32_t lino = 0; + SMetaInfo info; - // loop iter - if (committer->sttTrigger == 1) { - code = tsdbCommitTSDataToData(committer); + for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL;) { + if (row->uid != committer->ctx->tbid->uid) { + // Ignore table of obsolescence + if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) { + code = tsdbIterMergerSkipTableData(committer->dataIterMerger, (TABLEID *)row); + TSDB_CHECK_CODE(code, lino, _exit); + continue; + } + + committer->ctx->tbid->suid = row->suid; + committer->ctx->tbid->uid = row->uid; + } + + code = tsdbFSetWriteRow(committer->writer, row); TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbCommitTSDataToStt(committer); + + code = tsdbIterMergerNext(committer->dataIterMerger); TSDB_CHECK_CODE(code, lino, _exit); } @@ -353,22 +338,12 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; - if (committer->dataWriter == NULL || tsdbSttFileWriterIsOpened(committer->sttWriter)) { - for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) { - code = tsdbSttFileWriteTombRecord(committer->sttWriter, record); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbIterMergerNext(committer->tombIterMerger); - TSDB_CHECK_CODE(code, lino, _exit); - } - } else { - for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) { - code = tsdbDataFileWriteTombRecord(committer->dataWriter, record); - TSDB_CHECK_CODE(code, lino, _exit); + for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) { + code = tsdbFSetWriteTombRecord(committer->writer, record); + TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbIterMergerNext(committer->tombIterMerger); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbIterMergerNext(committer->tombIterMerger); + TSDB_CHECK_CODE(code, lino, _exit); } _exit: @@ -529,8 +504,7 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { ASSERT(TARRAY2_SIZE(committer->dataIterArray) == 0); ASSERT(committer->dataIterMerger == NULL); - ASSERT(committer->sttWriter == NULL); - ASSERT(committer->dataWriter == NULL); + ASSERT(committer->writer == NULL); code = tsdbCommitOpenReader(committer); TSDB_CHECK_CODE(code, lino, _exit); @@ -660,8 +634,7 @@ static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) { ASSERT(0); } - ASSERT(committer->dataWriter == NULL); - ASSERT(committer->sttWriter == NULL); + ASSERT(committer->writer == NULL); ASSERT(committer->dataIterMerger == NULL); ASSERT(committer->tombIterMerger == NULL); TARRAY2_DESTROY(committer->dataIterArray, NULL); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.h b/source/dnode/vnode/src/tsdb/tsdbCommit2.h index 72d7eb48eeb119de5918dfb2096383f75cbada65..41f72f345b4575f90c0632b857e0d0eae6f89a7a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.h +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.h @@ -15,6 +15,7 @@ #include "tsdbDataFileRW.h" #include "tsdbFS2.h" +#include "tsdbFSetRW.h" #include "tsdbIter.h" #include "tsdbSttFileRW.h" diff --git a/source/dnode/vnode/src/tsdb/tsdbFSetRW.h b/source/dnode/vnode/src/tsdb/tsdbFSetRW.h index 76f734c16f2eede744f14b4580e4137762095dcf..b5710407cfe40e28736c2949d3f3421e131c6624 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSetRW.h +++ b/source/dnode/vnode/src/tsdb/tsdbFSetRW.h @@ -41,7 +41,6 @@ typedef struct { bool exist; STFile file; } files[TSDB_FTYPE_MAX]; - STFile sttFile; } SFSetWriterConfig; int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer); diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index 358d735b1dc033fe9d2e21fba9ffffb53c8c737a..97229714e878130f9b3c27b89b6bd6a3a287eb58 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -19,15 +19,13 @@ typedef struct { STsdb *tsdb; TFileSetArray *fsetArr; - int32_t sttTrigger; - int32_t maxRow; - int32_t minRow; - int32_t szPage; - int8_t cmprAlg; - int64_t compactVersion; - int64_t cid; - SSkmInfo skmTb[1]; - SSkmInfo skmRow[1]; + int32_t sttTrigger; + int32_t maxRow; + int32_t minRow; + int32_t szPage; + int8_t cmprAlg; + int64_t compactVersion; + int64_t cid; // context struct { @@ -37,10 +35,7 @@ typedef struct { bool toData; int32_t level; SSttLvl *lvl; - // STFileObj *fobj; TABLEID tbid[1]; - int32_t blockDataIdx; - SBlockData blockData[2]; } ctx[1]; TFileOpArray fopArr[1]; @@ -53,8 +48,7 @@ typedef struct { TTsdbIterArray tombIterArr[1]; SIterMerger *tombIterMerger; // writer - SSttFileWriter *sttWriter; - SDataFileWriter *dataWriter; + SFSetWriter *writer; } SMerger; static int32_t tsdbMergerOpen(SMerger *merger) { @@ -86,8 +80,7 @@ static int32_t tsdbMergerClose(SMerger *merger) { } taosThreadRwlockUnlock(&merger->tsdb->rwLock); - ASSERT(merger->dataWriter == NULL); - ASSERT(merger->sttWriter == NULL); + ASSERT(merger->writer == NULL); ASSERT(merger->dataIterMerger == NULL); ASSERT(TARRAY2_SIZE(merger->dataIterArr) == 0); ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0); @@ -96,11 +89,6 @@ static int32_t tsdbMergerClose(SMerger *merger) { TARRAY2_DESTROY(merger->dataIterArr, NULL); TARRAY2_DESTROY(merger->sttReaderArr, NULL); TARRAY2_DESTROY(merger->fopArr, NULL); - for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->blockData); i++) { - tBlockDataDestroy(merger->ctx->blockData + i); - } - tDestroyTSchema(merger->skmTb->pTSchema); - tDestroyTSchema(merger->skmRow->pTSchema); _exit: if (code) { @@ -109,6 +97,7 @@ _exit: return code; } +#if 0 static int32_t tsdbMergeToDataTableEnd(SMerger *merger) { if (merger->ctx->blockData[0].nRow + merger->ctx->blockData[1].nRow == 0) return 0; @@ -297,6 +286,7 @@ _exit: } return code; } +#endif static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) { int32_t code = 0; @@ -413,49 +403,36 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { code = TSDB_CODE_FS_NO_VALID_DISK; TSDB_CHECK_CODE(code, lino, _exit); } - - { - // to new level - SSttFileWriterConfig config[1] = {{ - .tsdb = merger->tsdb, - .maxRow = merger->maxRow, - .szPage = merger->szPage, - .cmprAlg = merger->cmprAlg, - .compactVersion = merger->compactVersion, - .did = did, - .fid = merger->ctx->fset->fid, - .cid = merger->cid, - .level = merger->ctx->level, - }}; - code = tsdbSttFileWriterOpen(config, &merger->sttWriter); - TSDB_CHECK_CODE(code, lino, _exit); - } + SFSetWriterConfig config = { + .tsdb = merger->tsdb, + .toSttOnly = true, + .compactVersion = merger->compactVersion, + .minRow = merger->minRow, + .maxRow = merger->maxRow, + .szPage = merger->szPage, + .cmprAlg = merger->cmprAlg, + .fid = merger->ctx->fset->fid, + .cid = merger->cid, + .did = did, + .level = merger->ctx->level, + }; if (merger->ctx->toData) { - SDataFileWriterConfig config[1] = {{ - .tsdb = merger->tsdb, - .cmprAlg = merger->cmprAlg, - .maxRow = merger->maxRow, - .szPage = merger->szPage, - .fid = merger->ctx->fset->fid, - .cid = merger->cid, - .did = did, - .compactVersion = merger->compactVersion, - }}; + config.toSttOnly = false; - for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) { - if (merger->ctx->fset->farr[i]) { - config->files[i].exist = true; - config->files[i].file = merger->ctx->fset->farr[i]->f[0]; + for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) { + if (merger->ctx->fset->farr[ftype]) { + config.files[ftype].exist = true; + config.files[ftype].file = merger->ctx->fset->farr[ftype]->f[0]; } else { - config->files[i].exist = false; + config.files[ftype].exist = false; } } - - code = tsdbDataFileWriterOpen(config, &merger->dataWriter); - TSDB_CHECK_CODE(code, lino, _exit); } + code = tsdbFSetWriterOpen(&config, &merger->writer); + TSDB_CHECK_CODE(code, lino, _exit); + _exit: if (code) { TSDB_ERROR_LOG(vid, lino, code); @@ -470,15 +447,10 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0); ASSERT(TARRAY2_SIZE(merger->dataIterArr) == 0); ASSERT(merger->dataIterMerger == NULL); - ASSERT(merger->sttWriter == NULL); - ASSERT(merger->dataWriter == NULL); + ASSERT(merger->writer == NULL); merger->ctx->tbid->suid = 0; merger->ctx->tbid->uid = 0; - merger->ctx->blockDataIdx = 0; - for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->blockData); ++i) { - tBlockDataReset(merger->ctx->blockData + i); - } // open reader code = tsdbMergeFileSetBeginOpenReader(merger); @@ -500,23 +472,7 @@ _exit: } static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) { - int32_t code = 0; - int32_t lino = 0; - int32_t vid = TD_VID(merger->tsdb->pVnode); - - code = tsdbSttFileWriterClose(&merger->sttWriter, 0, merger->fopArr); - TSDB_CHECK_CODE(code, lino, _exit); - - if (merger->ctx->toData) { - code = tsdbDataFileWriterClose(&merger->dataWriter, 0, merger->fopArr); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - TSDB_ERROR_LOG(vid, lino, code); - } - return code; + return tsdbFSetWriterClose(&merger->writer, 0, merger->fopArr); } static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) { @@ -560,12 +516,49 @@ static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) { code = tsdbMergeFileSetBegin(merger); TSDB_CHECK_CODE(code, lino, _exit); - // do merge - if (merger->ctx->toData) { - code = tsdbMergeToDataLevel(merger); + // data + SMetaInfo info; + SRowInfo *row; + merger->ctx->tbid->suid = 0; + merger->ctx->tbid->uid = 0; + while ((row = tsdbIterMergerGetData(merger->dataIterMerger)) != NULL) { + if (row->uid != merger->ctx->tbid->uid) { + if (metaGetInfo(merger->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) { + code = tsdbIterMergerSkipTableData(merger->dataIterMerger, (TABLEID *)row); + TSDB_CHECK_CODE(code, lino, _exit); + continue; + } + + merger->ctx->tbid->uid = row->uid; + merger->ctx->tbid->suid = row->suid; + } + + code = tsdbFSetWriteRow(merger->writer, row); TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbMergeToUpperLevel(merger); + + code = tsdbIterMergerNext(merger->dataIterMerger); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // tomb + STombRecord *record; + merger->ctx->tbid->suid = 0; + merger->ctx->tbid->uid = 0; + while ((record = tsdbIterMergerGetTombRecord(merger->tombIterMerger)) != NULL) { + if (record->uid != merger->ctx->tbid->uid) { + merger->ctx->tbid->uid = record->uid; + merger->ctx->tbid->suid = record->suid; + + if (metaGetInfo(merger->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) { + code = tsdbIterMergerSkipTableData(merger->tombIterMerger, merger->ctx->tbid); + TSDB_CHECK_CODE(code, lino, _exit); + continue; + } + } + code = tsdbFSetWriteTombRecord(merger->writer, record); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbIterMergerNext(merger->tombIterMerger); TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.h b/source/dnode/vnode/src/tsdb/tsdbMerge.h index e4c7aef614a44c7309dfe15e0aedf4884213151d..69d802fd2776eddba8d65090dfe5717ba4bb76bc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.h +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.h @@ -15,6 +15,7 @@ #include "tsdbDataFileRW.h" #include "tsdbFS2.h" +#include "tsdbFSetRW.h" #include "tsdbIter.h" #include "tsdbSttFileRW.h" #include "tsdbUtil2.h"