From 71df0a00be2b03dd7a7ec2ea73fe4ed1a9abaf3f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 12 Jun 2023 17:17:00 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/dev/tsdbCommit.c | 159 +++++++++---------- source/dnode/vnode/src/tsdb/dev/tsdbMerge.c | 2 + 2 files changed, 73 insertions(+), 88 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index 9646262049..6c9ad70e66 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -39,6 +39,7 @@ typedef struct { TSKEY nextKey; int32_t fid; int32_t expLevel; + SDiskID did; TSKEY minKey; TSKEY maxKey; STFileSet *fset; @@ -60,12 +61,6 @@ static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; - SDiskID did[1]; - if (tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, did) < 0) { - code = TSDB_CODE_FS_NO_VALID_DISK; - TSDB_CHECK_CODE(code, lino, _exit); - } - SSttFileWriterConfig config[1] = {{ .tsdb = committer->tsdb, .maxRow = committer->maxRow, @@ -75,7 +70,7 @@ static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *committer) { .file = { .type = TSDB_FTYPE_STT, - .did = did[0], + .did = committer->ctx->did, .fid = committer->ctx->fid, .cid = committer->ctx->cid, }, @@ -122,31 +117,62 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; - // if (committer->sttTrigger == 1) { - // SDataFileWriterConfig config = { - // // TODO - // }; - - // code = tsdbDataFileWriterOpen(&config, &committer->dataWriter); - // TSDB_CHECK_CODE(code, lino, _exit); - // // TODO - // } - // stt writer - if (!committer->ctx->fset) { - return tsdbCommitOpenNewSttWriter(committer); - } + if (committer->ctx->fset == NULL) { + code = tsdbCommitOpenNewSttWriter(committer); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + const SSttLvl *lvl0 = tsdbTFileSetGetSttLvl(committer->ctx->fset, 0); + if (lvl0 == NULL || TARRAY2_SIZE(lvl0->fobjArr) == 0) { + code = tsdbCommitOpenNewSttWriter(committer); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + STFileObj *fobj = TARRAY2_LAST(lvl0->fobjArr); + if (fobj->f->stt->nseg >= committer->sttTrigger) { + code = tsdbCommitOpenNewSttWriter(committer); + TSDB_CHECK_CODE(code, lino, _exit); - const SSttLvl *lvl0 = tsdbTFileSetGetSttLvl(committer->ctx->fset, 0); - if (lvl0 == NULL || TARRAY2_SIZE(lvl0->fobjArr) == 0) { - return tsdbCommitOpenNewSttWriter(committer); + if (committer->sttTrigger == 1) { + SSttFileReaderConfig sttFileReaderConfig = { + .tsdb = committer->tsdb, + .szPage = committer->szPage, + .file = fobj->f[0], + }; + code = tsdbSttFileReaderOpen(NULL, &sttFileReaderConfig, &committer->sttReader); + TSDB_CHECK_CODE(code, lino, _exit); + } + } else { + code = tsdbCommitOpenExistSttWriter(committer, fobj->f); + TSDB_CHECK_CODE(code, lino, _exit); + } + } } - STFileObj *fobj = TARRAY2_LAST(lvl0->fobjArr); - if (fobj->f->stt->nseg >= committer->sttTrigger) { - return tsdbCommitOpenNewSttWriter(committer); - } else { - return tsdbCommitOpenExistSttWriter(committer, fobj->f); + // data writer + if (committer->sttTrigger == 1) { + // data writer + SDataFileWriterConfig config = { + .tsdb = committer->tsdb, + .cmprAlg = committer->cmprAlg, + .maxRow = committer->maxRow, + .szPage = committer->szPage, + .fid = committer->ctx->fid, + .cid = committer->ctx->cid, + .did = committer->ctx->did, + .compactVersion = committer->compactVersion, + }; + + if (committer->ctx->fset) { + for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ftype++) { + if (committer->ctx->fset->farr[ftype] != NULL) { + config.files[ftype].exist = true; + config.files[ftype].file = committer->ctx->fset->farr[ftype]->f[0]; + } + } + } + + code = tsdbDataFileWriterOpen(&config, &committer->dataWriter); + TSDB_CHECK_CODE(code, lino, _exit); } _exit: @@ -156,13 +182,6 @@ _exit: return code; } -static int32_t tsdbCommitWriteDelData(SCommitter2 *committer, int64_t suid, int64_t uid, int64_t version, int64_t sKey, - int64_t eKey) { - int32_t code = 0; - // TODO - return code; -} - static int32_t tsdbCommitTSDataOpenIterMerger(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; @@ -455,54 +474,6 @@ _exit: return code; } -static int32_t tsdbCommitTombDataToStt(SCommitter2 *committer) { - int32_t code = 0; - int32_t lino = 0; - - for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) { - code = tsdbSttFileWriteTombRecord(committer->sttWriter, record); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbIterMergerNext(committer->iterMerger); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); - } - return code; -} - -static int32_t tsdbCommitTombDataToData(SCommitter2 *committer) { - int32_t code = 0; - int32_t lino = 0; - - if (committer->dataWriter == NULL || tsdbSttFileWriterIsOpened(committer->sttWriter)) { - for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) { - code = tsdbSttFileWriteTombRecord(committer->sttWriter, record); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbIterMergerNext(committer->iterMerger); - TSDB_CHECK_CODE(code, lino, _exit); - } - } else { - for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) { - code = tsdbDataFileWriteTombRecord(committer->dataWriter, record); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbIterMergerNext(committer->iterMerger); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); - } - return code; -} - static int32_t tsdbCommitTombDataOpenIter(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; @@ -561,12 +532,22 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { code = tsdbCommitTombDataOpenIter(committer); TSDB_CHECK_CODE(code, lino, _exit); - if (committer->sttTrigger > 1) { - code = tsdbCommitTombDataToStt(committer); - TSDB_CHECK_CODE(code, lino, _exit); + if (committer->dataWriter == NULL || tsdbSttFileWriterIsOpened(committer->sttWriter)) { + for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) { + code = tsdbSttFileWriteTombRecord(committer->sttWriter, record); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbIterMergerNext(committer->iterMerger); + TSDB_CHECK_CODE(code, lino, _exit); + } } else { - code = tsdbCommitTombDataToData(committer); - TSDB_CHECK_CODE(code, lino, _exit); + for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) { + code = tsdbDataFileWriteTombRecord(committer->dataWriter, record); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbIterMergerNext(committer->iterMerger); + TSDB_CHECK_CODE(code, lino, _exit); + } } code = tsdbCommitTombDataCloseIter(committer); @@ -588,6 +569,8 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now); tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey, &committer->ctx->maxKey); + code = tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, &committer->ctx->did); + TSDB_CHECK_CODE(code, lino, _exit); STFileSet fset = {.fid = committer->ctx->fid}; committer->ctx->fset = &fset; committer->ctx->fset = TARRAY2_SEARCH_EX(committer->fsetArr, &committer->ctx->fset, tsdbTFileSetCmprFn, TD_EQ); diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 79f2067af5..eafe226992 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -661,6 +661,8 @@ int32_t tsdbMerge(void *arg) { .sttTrigger = tsdb->pVnode->config.sttTrigger, }}; + ASSERT(merger->sttTrigger > 1); + code = tsdbFSCreateCopySnapshot(tsdb->pFS, &merger->fsetArr); TSDB_CHECK_CODE(code, lino, _exit); -- GitLab