diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index c824076600ff6d2644fcc92aa2fd360cb5bf141e..6d9e6705985523ada9ed7b58e214a3d2c6e3f089 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -46,9 +46,14 @@ typedef struct { TABLEID tbid[1]; } ctx[1]; + // reader SSttFileReader *sttReader; - TTsdbIterArray iterArray[1]; - SIterMerger *iterMerger; + + // iter + TTsdbIterArray dataIterArray[1]; + SIterMerger *dataIterMerger; + TTsdbIterArray tombIterArray[1]; + SIterMerger *tombIterMerger; // writer SBlockData blockData[2]; @@ -57,10 +62,11 @@ typedef struct { SSttFileWriter *sttWriter; } SCommitter2; -static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *committer) { +static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; + // stt writer SSttFileWriterConfig config[1] = {{ .tsdb = committer->tsdb, .maxRow = committer->maxRow, @@ -79,75 +85,6 @@ static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *committer) { code = tsdbSttFileWriterOpen(config, &committer->sttWriter); TSDB_CHECK_CODE(code, lino, _exit); -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); - } else { - tsdbDebug("vgId:%d %s success", TD_VID(committer->tsdb->pVnode), __func__); - } - return code; -} - -static int32_t tsdbCommitOpenExistSttWriter(SCommitter2 *committer, const STFile *f) { - int32_t code = 0; - int32_t lino = 0; - - SSttFileWriterConfig config[1] = {{ - .tsdb = committer->tsdb, - .maxRow = committer->maxRow, - .szPage = committer->szPage, - .cmprAlg = committer->cmprAlg, - .compactVersion = committer->compactVersion, - .file = f[0], - }}; - - code = tsdbSttFileWriterOpen(config, &committer->sttWriter); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); - } else { - tsdbDebug("vgId:%d %s success", TD_VID(committer->tsdb->pVnode), __func__); - } - return code; -} - -static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) { - int32_t code = 0; - int32_t lino = 0; - - // stt writer - if (committer->ctx->fset == NULL) { - code = tsdbCommitOpenNewSttWriter(committer); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - const SSttLvl *lvl0 = tsdbTFileSetGetSttLvl(committer->ctx->fset, 0); - if (lvl0 == NULL || TARRAY2_SIZE(lvl0->fobjArr) == 0) { - code = tsdbCommitOpenNewSttWriter(committer); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - STFileObj *fobj = TARRAY2_LAST(lvl0->fobjArr); - if (fobj->f->stt->nseg >= committer->sttTrigger) { - code = tsdbCommitOpenNewSttWriter(committer); - TSDB_CHECK_CODE(code, lino, _exit); - - if (committer->sttTrigger == 1) { - SSttFileReaderConfig sttFileReaderConfig = { - .tsdb = committer->tsdb, - .szPage = committer->szPage, - .file = fobj->f[0], - }; - code = tsdbSttFileReaderOpen(NULL, &sttFileReaderConfig, &committer->sttReader); - TSDB_CHECK_CODE(code, lino, _exit); - } - } else { - code = tsdbCommitOpenExistSttWriter(committer, fobj->f); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - } - // data writer if (committer->sttTrigger == 1) { // data writer @@ -182,49 +119,14 @@ _exit: return code; } -static int32_t tsdbCommitTSDataOpenIterMerger(SCommitter2 *committer) { +static int32_t tsdbCommitCloseWriter(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; - ASSERT(TARRAY2_SIZE(committer->iterArray) == 0); - ASSERT(committer->iterMerger == NULL); - - STsdbIter *iter; - STsdbIterConfig config[1] = {0}; - - // memtable iter - config->type = TSDB_ITER_TYPE_MEMT; - config->memt = committer->tsdb->imem; - config->from->ts = committer->ctx->minKey; - config->from->version = VERSION_MIN; - - code = tsdbIterOpen(config, &iter); - TSDB_CHECK_CODE(code, lino, _exit); - - code = TARRAY2_APPEND(committer->iterArray, iter); + code = tsdbSttFileWriterClose(&committer->sttWriter, 0, committer->fopArray); TSDB_CHECK_CODE(code, lino, _exit); - // stt file iter - if (committer->sttReader) { - const TSttSegReaderArray *readerArray; - - tsdbSttFileReaderGetSegReader(committer->sttReader, &readerArray); - - SSttSegReader *segReader; - TARRAY2_FOREACH(readerArray, segReader) { - config->type = TSDB_ITER_TYPE_STT; - config->sttReader = segReader; - } - - code = tsdbIterOpen(config, &iter); - TSDB_CHECK_CODE(code, lino, _exit); - - code = TARRAY2_APPEND(committer->iterArray, iter); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // open iter merger - code = tsdbIterMergerOpen(committer->iterArray, &committer->iterMerger, false); + code = tsdbDataFileWriterClose(&committer->dataWriter, 0, committer->fopArray); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -234,20 +136,6 @@ _exit: return code; } -static int32_t tsdbCommitTSDataCloseIterMerger(SCommitter2 *committer) { - int32_t code = 0; - int32_t lino = 0; - - tsdbIterMergerClose(&committer->iterMerger); - TARRAY2_CLEAR(committer->iterArray, tsdbIterClose); - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); - } - return code; -} - static int32_t tsdbCommitTSDataToDataTableBegin(SCommitter2 *committer, const TABLEID *tbid) { int32_t code = 0; int32_t lino = 0; @@ -342,7 +230,7 @@ static int32_t tsdbCommitTSDataToData(SCommitter2 *committer) { int32_t lino = 0; SMetaInfo info; - for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->iterMerger)) != NULL;) { + for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL;) { if (row->uid != committer->ctx->tbid->uid) { // end last table write code = tsdbCommitTSDataToDataTableEnd(committer); @@ -350,7 +238,7 @@ static int32_t tsdbCommitTSDataToData(SCommitter2 *committer) { // Ignore table of obsolescence if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) { - code = tsdbIterMergerSkipTableData(committer->iterMerger, (TABLEID *)row); + code = tsdbIterMergerSkipTableData(committer->dataIterMerger, (TABLEID *)row); TSDB_CHECK_CODE(code, lino, _exit); continue; } @@ -389,7 +277,7 @@ static int32_t tsdbCommitTSDataToData(SCommitter2 *committer) { TSDB_CHECK_CODE(code, lino, _exit); } - code = tsdbIterMergerNext(committer->iterMerger); + code = tsdbIterMergerNext(committer->dataIterMerger); TSDB_CHECK_CODE(code, lino, _exit); } @@ -410,14 +298,14 @@ static int32_t tsdbCommitTSDataToStt(SCommitter2 *committer) { ASSERT(committer->sttReader == NULL); SMetaInfo info; - for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->iterMerger)) != NULL;) { + for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL;) { if (row->uid != committer->ctx->tbid->uid) { committer->ctx->tbid->suid = row->suid; committer->ctx->tbid->uid = row->uid; // Ignore table of obsolescence if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) { - code = tsdbIterMergerSkipTableData(committer->iterMerger, committer->ctx->tbid); + code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid); TSDB_CHECK_CODE(code, lino, _exit); continue; } @@ -426,13 +314,13 @@ static int32_t tsdbCommitTSDataToStt(SCommitter2 *committer) { TSKEY ts = TSDBROW_TS(&row->row); if (ts > committer->ctx->maxKey) { committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts); - code = tsdbIterMergerSkipTableData(committer->iterMerger, committer->ctx->tbid); + code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid); TSDB_CHECK_CODE(code, lino, _exit); } else { code = tsdbSttFileWriteRow(committer->sttWriter, row); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbIterMergerNext(committer->iterMerger); + code = tsdbIterMergerNext(committer->dataIterMerger); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -448,12 +336,6 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; - if (committer->tsdb->imem->nRow == 0) goto _exit; - - // open iter and iter merger - code = tsdbCommitTSDataOpenIterMerger(committer); - TSDB_CHECK_CODE(code, lino, _exit); - // loop iter if (committer->sttTrigger == 1) { code = tsdbCommitTSDataToData(committer); @@ -463,9 +345,34 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { TSDB_CHECK_CODE(code, lino, _exit); } - // close iter and iter merger - code = tsdbCommitTSDataCloseIterMerger(committer); - TSDB_CHECK_CODE(code, lino, _exit); +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbCommitTombData(SCommitter2 *committer) { + int32_t code = 0; + int32_t lino = 0; + + if (committer->dataWriter == NULL || tsdbSttFileWriterIsOpened(committer->sttWriter)) { + for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) { + code = tsdbSttFileWriteTombRecord(committer->sttWriter, record); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbIterMergerNext(committer->tombIterMerger); + TSDB_CHECK_CODE(code, lino, _exit); + } + } else { + for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) { + code = tsdbDataFileWriteTombRecord(committer->dataWriter, record); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbIterMergerNext(committer->tombIterMerger); + TSDB_CHECK_CODE(code, lino, _exit); + } + } _exit: if (code) { @@ -474,42 +381,48 @@ _exit: return code; } -static int32_t tsdbCommitTombDataOpenIter(SCommitter2 *committer) { +static int32_t tsdbCommitOpenReader(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; - STsdbIter *iter; - STsdbIterConfig config[1] = {0}; + ASSERT(committer->sttReader == NULL); - if (committer->sttReader) { - const TSttSegReaderArray *readerArray; + if (committer->ctx->fset == NULL // + || committer->sttTrigger > 1 // + || TARRAY2_SIZE(committer->ctx->fset->lvlArr) == 0 // + ) { + return 0; + } - tsdbSttFileReaderGetSegReader(committer->sttReader, &readerArray); + ASSERT(TARRAY2_SIZE(committer->ctx->fset->lvlArr) == 1); - SSttSegReader *segReader; - TARRAY2_FOREACH(readerArray, segReader) { - config->type = TSDB_ITER_TYPE_STT_TOMB; - config->sttReader = segReader; + SSttLvl *lvl = TARRAY2_FIRST(committer->ctx->fset->lvlArr); - code = tsdbIterOpen(config, &iter); - TSDB_CHECK_CODE(code, lino, _exit); + ASSERT(lvl->level == 0); - code = TARRAY2_APPEND(committer->iterArray, iter); - TSDB_CHECK_CODE(code, lino, _exit); - } + if (TARRAY2_SIZE(lvl->fobjArr) == 0) { + return 0; } - config->type = TSDB_ITER_TYPE_MEMT_TOMB; - config->memt = committer->tsdb->imem; + ASSERT(TARRAY2_SIZE(lvl->fobjArr) == 1); - code = tsdbIterOpen(config, &iter); - TSDB_CHECK_CODE(code, lino, _exit); + STFileObj *fobj = TARRAY2_FIRST(lvl->fobjArr); - code = TARRAY2_APPEND(committer->iterArray, iter); + SSttFileReaderConfig config = { + .tsdb = committer->tsdb, + .szPage = committer->szPage, + .file = fobj->f[0], + }; + code = tsdbSttFileReaderOpen(fobj->fname, &config, &committer->sttReader); TSDB_CHECK_CODE(code, lino, _exit); - // open iter - code = tsdbIterMergerOpen(committer->iterArray, &committer->iterMerger, true); + STFileOp op = { + .optype = TSDB_FOP_REMOVE, + .fid = fobj->f->fid, + .of = fobj->f[0], + }; + + code = TARRAY2_APPEND(committer->fopArray, op); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -519,38 +432,70 @@ _exit: return code; } -static int32_t tsdbCommitTombDataCloseIter(SCommitter2 *committer) { - tsdbIterMergerClose(&committer->iterMerger); - TARRAY2_CLEAR(committer->iterArray, tsdbIterClose); - return 0; -} +static int32_t tsdbCommitCloseReader(SCommitter2 *committer) { return tsdbSttFileReaderClose(&committer->sttReader); } -static int32_t tsdbCommitTombData(SCommitter2 *committer) { +static int32_t tsdbCommitOpenIter(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; - code = tsdbCommitTombDataOpenIter(committer); + ASSERT(TARRAY2_SIZE(committer->dataIterArray) == 0); + ASSERT(committer->dataIterMerger == NULL); + ASSERT(TARRAY2_SIZE(committer->tombIterArray) == 0); + ASSERT(committer->tombIterMerger == NULL); + + STsdbIter *iter; + STsdbIterConfig config = {0}; + + // mem data iter + config.type = TSDB_ITER_TYPE_MEMT; + config.memt = committer->tsdb->imem; + config.from->ts = committer->ctx->minKey; + config.from->version = VERSION_MIN; + + code = tsdbIterOpen(&config, &iter); TSDB_CHECK_CODE(code, lino, _exit); - if (committer->dataWriter == NULL || tsdbSttFileWriterIsOpened(committer->sttWriter)) { - for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) { - code = tsdbSttFileWriteTombRecord(committer->sttWriter, record); - TSDB_CHECK_CODE(code, lino, _exit); + code = TARRAY2_APPEND(committer->dataIterArray, iter); + TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbIterMergerNext(committer->iterMerger); - TSDB_CHECK_CODE(code, lino, _exit); - } - } else { - for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) { - code = tsdbDataFileWriteTombRecord(committer->dataWriter, record); - TSDB_CHECK_CODE(code, lino, _exit); + // mem tomb iter + config.type = TSDB_ITER_TYPE_MEMT_TOMB; + config.memt = committer->tsdb->imem; - code = tsdbIterMergerNext(committer->iterMerger); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbIterOpen(&config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(committer->tombIterArray, iter); + TSDB_CHECK_CODE(code, lino, _exit); + + // STT + if (committer->sttReader) { + // data iter + config.type = TSDB_ITER_TYPE_STT; + config.sttReader = committer->sttReader; + + code = tsdbIterOpen(&config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(committer->dataIterArray, iter); + TSDB_CHECK_CODE(code, lino, _exit); + + // tomb iter + config.type = TSDB_ITER_TYPE_STT_TOMB; + config.sttReader = committer->sttReader; + + code = tsdbIterOpen(&config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(committer->tombIterArray, iter); + TSDB_CHECK_CODE(code, lino, _exit); } - code = tsdbCommitTombDataCloseIter(committer); + // open merger + code = tsdbIterMergerOpen(committer->dataIterArray, &committer->dataIterMerger, false); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbIterMergerOpen(committer->tombIterArray, &committer->tombIterMerger, true); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -560,6 +505,14 @@ _exit: return code; } +static int32_t tsdbCommitCloseIter(SCommitter2 *committer) { + tsdbIterMergerClose(&committer->tombIterMerger); + tsdbIterMergerClose(&committer->dataIterMerger); + TARRAY2_CLEAR(committer->tombIterArray, tsdbIterClose); + TARRAY2_CLEAR(committer->dataIterArray, tsdbIterClose); + return 0; +} + static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; @@ -577,11 +530,17 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { committer->ctx->tbid->suid = 0; committer->ctx->tbid->uid = 0; - ASSERT(TARRAY2_SIZE(committer->iterArray) == 0); - ASSERT(committer->iterMerger == NULL); + ASSERT(TARRAY2_SIZE(committer->dataIterArray) == 0); + ASSERT(committer->dataIterMerger == NULL); ASSERT(committer->sttWriter == NULL); ASSERT(committer->dataWriter == NULL); + code = tsdbCommitOpenReader(committer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbCommitOpenIter(committer); + TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbCommitOpenWriter(committer); TSDB_CHECK_CODE(code, lino, _exit); @@ -602,21 +561,14 @@ static int32_t tsdbCommitFileSetEnd(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; - if (committer->sttReader) { - code = tsdbSttFileReaderClose(&committer->sttReader); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (committer->dataWriter) { - code = tsdbDataFileWriterClose(&committer->dataWriter, 0, committer->fopArray); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbCommitCloseWriter(committer); + TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbSttFileWriterClose(&committer->sttWriter, 0, committer->fopArray); + code = tsdbCommitCloseIter(committer); TSDB_CHECK_CODE(code, lino, _exit); - tsdbIterMergerClose(&committer->iterMerger); - TARRAY2_CLEAR(committer->iterArray, tsdbIterClose); + code = tsdbCommitCloseReader(committer); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { @@ -713,8 +665,10 @@ static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) { ASSERT(committer->dataWriter == NULL); ASSERT(committer->sttWriter == NULL); - ASSERT(committer->iterMerger == NULL); - TARRAY2_DESTROY(committer->iterArray, NULL); + ASSERT(committer->dataIterMerger == NULL); + ASSERT(committer->tombIterMerger == NULL); + TARRAY2_DESTROY(committer->dataIterArray, NULL); + TARRAY2_DESTROY(committer->tombIterArray, NULL); TARRAY2_DESTROY(committer->fopArray, NULL); tsdbFSDestroyCopySnapshot(&committer->fsetArr); diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 33bd7c4cc6ac391762d3df6ec06041430eb726b4..7500208ed5a1b69d692385582cdc1be09c818ef3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -1522,6 +1522,8 @@ int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWri } int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, TFileOpArray *opArr) { + if (writer[0] == NULL) return 0; + int32_t code = 0; int32_t lino = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 3b8aa075beae7400e5c05bd43f510029165e312c..ab3bbf78a228fc48ad01d706a04fe1cb2b416c50 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -607,11 +607,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { if (TARRAY2_SIZE(fset->lvlArr) == 0) continue; SSttLvl *lvl0 = TARRAY2_FIRST(fset->lvlArr); - if (lvl0->level != 0 || TARRAY2_SIZE(lvl0->fobjArr) == 0) continue; - - STFileObj *fobj = TARRAY2_FIRST(lvl0->fobjArr); - - if (fobj->f->stt->nseg < fs->tsdb->pVnode->config.sttTrigger) continue; + if (lvl0->level != 0 || TARRAY2_SIZE(lvl0->fobjArr) < fs->tsdb->pVnode->config.sttTrigger) continue; code = vnodeScheduleTaskEx(1, tsdbMerge, fs->tsdb); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.c b/source/dnode/vnode/src/tsdb/tsdbFile2.c index 96bfcbb1ddc54ab5d80b21d46b9f194a79018f3b..796bb789904d4ded9a150456f8492cd3b19e9405 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.c @@ -134,11 +134,6 @@ static int32_t stt_to_json(const STFile *file, cJSON *json) { return TSDB_CODE_OUT_OF_MEMORY; } - /* nseg */ - if (cJSON_AddNumberToObject(json, "nseg", file->stt->nseg) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - return 0; } @@ -160,14 +155,6 @@ static int32_t stt_from_json(const cJSON *json, STFile *file) { return TSDB_CODE_FILE_CORRUPTED; } - /* nseg */ - item = cJSON_GetObjectItem(json, "nseg"); - if (cJSON_IsNumber(item)) { - file->stt->nseg = item->valuedouble; - } else { - return TSDB_CODE_FILE_CORRUPTED; - } - return 0; } @@ -290,7 +277,7 @@ bool tsdbIsSameTFile(const STFile *f1, const STFile *f2) { bool tsdbIsTFileChanged(const STFile *f1, const STFile *f2) { if (f1->size != f2->size) return true; - if (f1->type == TSDB_FTYPE_STT && f1->stt->nseg != f2->stt->nseg) return true; + // if (f1->type == TSDB_FTYPE_STT && f1->stt->nseg != f2->stt->nseg) return true; return false; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.h b/source/dnode/vnode/src/tsdb/tsdbFile2.h index 3ac04263fd74c6446d5afb7d6cfae4fe91a772ed..11d08e45e667ddea6f8150239b472c86426f841a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.h @@ -64,7 +64,6 @@ struct STFile { union { struct { int32_t level; - int32_t nseg; } stt[1]; }; }; diff --git a/source/dnode/vnode/src/tsdb/tsdbIter.c b/source/dnode/vnode/src/tsdb/tsdbIter.c index d28a217c3e2d6c67691dc7de2149e6fce7468141..58bac224fea89a8be120ddeaeed5e22a2b6f2f5b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbIter.c +++ b/source/dnode/vnode/src/tsdb/tsdbIter.c @@ -28,7 +28,7 @@ struct STsdbIter { SRBTreeNode node[1]; union { struct { - SSttSegReader *reader; + SSttFileReader *reader; const TSttBlkArray *sttBlkArray; int32_t sttBlkArrayIdx; SBlockData blockData[1]; @@ -51,7 +51,7 @@ struct STsdbIter { STbDataIter tbIter[1]; } memtData[1]; struct { - SSttSegReader *reader; + SSttFileReader *reader; const TTombBlkArray *tombBlkArray; int32_t tombBlkArrayIdx; STombBlock tombBlock[1]; diff --git a/source/dnode/vnode/src/tsdb/tsdbIter.h b/source/dnode/vnode/src/tsdb/tsdbIter.h index aa201d3d4dc4261cdfbee24e514bc45cb948680e..367901bd848df5b752a3439249dba95f76369b45 100644 --- a/source/dnode/vnode/src/tsdb/tsdbIter.h +++ b/source/dnode/vnode/src/tsdb/tsdbIter.h @@ -41,7 +41,7 @@ typedef enum { typedef struct { EIterType type; union { - SSttSegReader *sttReader; // TSDB_ITER_TYPE_STT || TSDB_ITER_TYPE_STT_TOMB + SSttFileReader *sttReader; // TSDB_ITER_TYPE_STT || TSDB_ITER_TYPE_STT_TOMB SDataFileReader *dataReader; // TSDB_ITER_TYPE_DATA || TSDB_ITER_TYPE_DATA_TOMB struct { SMemTable *memt; // TSDB_ITER_TYPE_MEMT_TOMB diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index d1f244d016d16fc4b04aab921046fcfd6f43ca95..f3a599b7a08fbc79e912ab83229e2b8e4e397691 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -37,7 +37,7 @@ typedef struct { bool toData; int32_t level; SSttLvl *lvl; - STFileObj *fobj; + // STFileObj *fobj; TABLEID tbid[1]; int32_t blockDataIdx; SBlockData blockData[2]; @@ -305,8 +305,6 @@ static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) { merger->ctx->toData = true; merger->ctx->level = 0; - // TARRAY2_FOREACH(merger->ctx->fset->lvlArr, merger->ctx->lvl) { - for (int32_t i = 0;; ++i) { if (i >= TARRAY2_SIZE(merger->ctx->fset->lvlArr)) { merger->ctx->lvl = NULL; @@ -314,38 +312,38 @@ static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) { } merger->ctx->lvl = TARRAY2_GET(merger->ctx->fset->lvlArr, i); - if (merger->ctx->lvl->level != merger->ctx->level || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 0) { + if (merger->ctx->lvl->level != merger->ctx->level || + TARRAY2_SIZE(merger->ctx->lvl->fobjArr) + 1 < merger->sttTrigger) { merger->ctx->toData = false; merger->ctx->lvl = NULL; break; } - ASSERT(merger->ctx->lvl->level == 0 || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 1); + merger->ctx->level++; - merger->ctx->fobj = TARRAY2_FIRST(merger->ctx->lvl->fobjArr); - if (merger->ctx->fobj->f->stt->nseg < merger->sttTrigger) { - merger->ctx->toData = false; - break; - } else { - merger->ctx->level++; + STFileObj *fobj; + int32_t numFile = 0; + TARRAY2_FOREACH(merger->ctx->lvl->fobjArr, fobj) { + if (numFile == merger->sttTrigger) { + break; + } - // add remove operation STFileOp op = { .optype = TSDB_FOP_REMOVE, .fid = merger->ctx->fset->fid, - .of = merger->ctx->fobj->f[0], + .of = fobj->f[0], }; code = TARRAY2_APPEND(merger->fopArr, op); TSDB_CHECK_CODE(code, lino, _exit); - // open the reader SSttFileReader *reader; - SSttFileReaderConfig config[1] = {{ + SSttFileReaderConfig config = { .tsdb = merger->tsdb, .szPage = merger->szPage, - .file[0] = merger->ctx->fobj->f[0], - }}; - code = tsdbSttFileReaderOpen(merger->ctx->fobj->fname, config, &reader); + .file[0] = fobj->f[0], + }; + + code = tsdbSttFileReaderOpen(fobj->fname, &config, &reader); TSDB_CHECK_CODE(code, lino, _exit); code = TARRAY2_APPEND(merger->sttReaderArr, reader); @@ -367,33 +365,28 @@ static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) { SSttFileReader *sttReader; TARRAY2_FOREACH(merger->sttReaderArr, sttReader) { - const TSttSegReaderArray *segReaderArr; + STsdbIter *iter; + STsdbIterConfig config = {0}; - code = tsdbSttFileReaderGetSegReader(sttReader, &segReaderArr); + // data iter + config.type = TSDB_ITER_TYPE_STT; + config.sttReader = sttReader; + + code = tsdbIterOpen(&config, &iter); TSDB_CHECK_CODE(code, lino, _exit); - SSttSegReader *segReader; - TARRAY2_FOREACH(segReaderArr, segReader) { - STsdbIter *iter; + code = TARRAY2_APPEND(merger->dataIterArr, iter); + TSDB_CHECK_CODE(code, lino, _exit); - STsdbIterConfig config[1] = {{ - .type = TSDB_ITER_TYPE_STT, - .sttReader = segReader, - }}; + // tomb iter + config.type = TSDB_ITER_TYPE_STT_TOMB; + config.sttReader = sttReader; - // data iter - code = tsdbIterOpen(config, &iter); - TSDB_CHECK_CODE(code, lino, _exit); - code = TARRAY2_APPEND(merger->dataIterArr, iter); - TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbIterOpen(&config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); - // tomb iter - config->type = TSDB_ITER_TYPE_STT_TOMB; - code = tsdbIterOpen(config, &iter); - TSDB_CHECK_CODE(code, lino, _exit); - code = TARRAY2_APPEND(merger->tombIterArr, iter); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = TARRAY2_APPEND(merger->tombIterArr, iter); + TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbIterMergerOpen(merger->dataIterArr, &merger->dataIterMerger, false); @@ -414,26 +407,14 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { int32_t lino = 0; int32_t vid = TD_VID(merger->tsdb->pVnode); - if (merger->ctx->lvl) { - // to existing level - SSttFileWriterConfig config[1] = {{ - .tsdb = merger->tsdb, - .maxRow = merger->maxRow, - .szPage = merger->szPage, - .cmprAlg = merger->cmprAlg, - .compactVersion = merger->compactVersion, - .file = merger->ctx->fobj->f[0], - }}; - code = tsdbSttFileWriterOpen(config, &merger->sttWriter); + SDiskID did; + int32_t level = tsdbFidLevel(merger->ctx->fset->fid, &merger->tsdb->keepCfg, merger->ctx->now); + if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, &did) < 0) { + code = TSDB_CODE_FS_NO_VALID_DISK; TSDB_CHECK_CODE(code, lino, _exit); - } else { - SDiskID did[1]; - int32_t level = tsdbFidLevel(merger->ctx->fset->fid, &merger->tsdb->keepCfg, merger->ctx->now); - if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, did) < 0) { - code = TSDB_CODE_FS_NO_VALID_DISK; - TSDB_CHECK_CODE(code, lino, _exit); - } + } + { // to new level SSttFileWriterConfig config[1] = {{ .tsdb = merger->tsdb, @@ -444,13 +425,12 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { .file = { .type = TSDB_FTYPE_STT, - .did = did[0], + .did = did, .fid = merger->ctx->fset->fid, .cid = merger->cid, .size = 0, .stt = {{ .level = merger->ctx->level, - .nseg = 0, }}, }, }}; @@ -459,14 +439,6 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { } if (merger->ctx->toData) { - SDiskID did; - int32_t level = tsdbFidLevel(merger->ctx->fset->fid, &merger->tsdb->keepCfg, merger->ctx->now); - - if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, &did) < 0) { - code = TSDB_CODE_FS_NO_VALID_DISK; - TSDB_CHECK_CODE(code, lino, _exit); - } - SDataFileWriterConfig config[1] = {{ .tsdb = merger->tsdb, .cmprAlg = merger->cmprAlg, @@ -622,11 +594,11 @@ static int32_t tsdbDoMerge(SMerger *merger) { STFileSet *fset; TARRAY2_FOREACH(merger->fsetArr, fset) { - SSttLvl *lvl = TARRAY2_SIZE(fset->lvlArr) > 0 ? TARRAY2_FIRST(fset->lvlArr) : NULL; - if (!lvl || lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) == 0) continue; + if (TARRAY2_SIZE(fset->lvlArr) == 0) continue; + + SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr); - STFileObj *fobj = TARRAY2_FIRST(lvl->fobjArr); - if (fobj->f->stt->nseg < merger->sttTrigger) continue; + if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < merger->sttTrigger) continue; if (!merger->ctx->opened) { code = tsdbMergerOpen(merger); diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index 683aa1d36ff7150f6e92d5db9dcb7c5966352aa1..3ff0191ce6d736aa136eebdbaf1025e713d5f161 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -16,7 +16,6 @@ #include "tsdbSttFileRW.h" typedef struct { - int64_t prevFooter; SFDataPtr sttBlkPtr[1]; SFDataPtr statisBlkPtr[1]; SFDataPtr tombBlkPtr[1]; @@ -26,14 +25,8 @@ typedef struct { // SSttFReader ============================================================ struct SSttFileReader { SSttFileReaderConfig config[1]; - TSttSegReaderArray readerArray[1]; STsdbFD *fd; - uint8_t *bufArr[5]; -}; - -struct SSttSegReader { - SSttFileReader *reader; - SSttFooter footer[1]; + SSttFooter footer[1]; struct { bool sttBlkLoaded; bool statisBlkLoaded; @@ -42,42 +35,10 @@ struct SSttSegReader { TSttBlkArray sttBlkArray[1]; TStatisBlkArray statisBlkArray[1]; TTombBlkArray tombBlkArray[1]; + uint8_t *bufArr[5]; }; // SSttFileReader -static int32_t tsdbSttSegReaderOpen(SSttFileReader *reader, int64_t offset, SSttSegReader **segReader) { - ASSERT(offset >= TSDB_FHDR_SIZE); - - int32_t code = 0; - int32_t lino = 0; - - segReader[0] = taosMemoryCalloc(1, sizeof(*segReader[0])); - if (!segReader[0]) return TSDB_CODE_OUT_OF_MEMORY; - - segReader[0]->reader = reader; - code = tsdbReadFile(reader->fd, offset, (uint8_t *)(segReader[0]->footer), sizeof(SSttFooter)); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); - taosMemoryFree(segReader[0]); - segReader[0] = NULL; - } - return code; -} - -static int32_t tsdbSttSegReaderClose(SSttSegReader **reader) { - if (reader[0]) { - TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL); - TARRAY2_DESTROY(reader[0]->statisBlkArray, NULL); - TARRAY2_DESTROY(reader[0]->sttBlkArray, NULL); - taosMemoryFree(reader[0]); - reader[0] = NULL; - } - return 0; -} - int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *config, SSttFileReader **reader) { int32_t code = 0; int32_t lino = 0; @@ -101,21 +62,12 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con TSDB_CHECK_CODE(code, lino, _exit); } - // open each segment reader - int64_t size = config->file->size; - while (size > 0) { - SSttSegReader *reader1; - - code = tsdbSttSegReaderOpen(reader[0], size - sizeof(SSttFooter), &reader1); - TSDB_CHECK_CODE(code, lino, _exit); - - code = TARRAY2_APPEND(reader[0]->readerArray, reader1); - TSDB_CHECK_CODE(code, lino, _exit); - - size = reader1->footer->prevFooter; - } + // // open each segment reader + int64_t offset = config->file->size - sizeof(SSttFooter); + ASSERT(offset >= TSDB_FHDR_SIZE); - ASSERT(TARRAY2_SIZE(reader[0]->readerArray) == config->file->stt->nseg); + code = tsdbReadFile(reader[0]->fd, offset, (uint8_t *)(reader[0]->footer), sizeof(SSttFooter)); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { @@ -131,20 +83,17 @@ int32_t tsdbSttFileReaderClose(SSttFileReader **reader) { tFree(reader[0]->bufArr[i]); } tsdbCloseFile(&reader[0]->fd); - TARRAY2_DESTROY(reader[0]->readerArray, tsdbSttSegReaderClose); + TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL); + TARRAY2_DESTROY(reader[0]->statisBlkArray, NULL); + TARRAY2_DESTROY(reader[0]->sttBlkArray, NULL); taosMemoryFree(reader[0]); reader[0] = NULL; } return 0; } -int32_t tsdbSttFileReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **readerArray) { - readerArray[0] = reader->readerArray; - return 0; -} - // SSttFSegReader -int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray) { +int32_t tsdbSttFileReadStatisBlk(SSttFileReader *reader, const TStatisBlkArray **statisBlkArray) { if (!reader->ctx->statisBlkLoaded) { if (reader->footer->statisBlkPtr->size > 0) { ASSERT(reader->footer->statisBlkPtr->size % sizeof(SStatisBlk) == 0); @@ -153,8 +102,8 @@ int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray ** void *data = taosMemoryMalloc(reader->footer->statisBlkPtr->size); if (!data) return TSDB_CODE_OUT_OF_MEMORY; - int32_t code = tsdbReadFile(reader->reader->fd, reader->footer->statisBlkPtr->offset, data, - reader->footer->statisBlkPtr->size); + int32_t code = + tsdbReadFile(reader->fd, reader->footer->statisBlkPtr->offset, data, reader->footer->statisBlkPtr->size); if (code) { taosMemoryFree(data); return code; @@ -172,7 +121,7 @@ int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray ** return 0; } -int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **tombBlkArray) { +int32_t tsdbSttFileReadTombBlk(SSttFileReader *reader, const TTombBlkArray **tombBlkArray) { if (!reader->ctx->tombBlkLoaded) { if (reader->footer->tombBlkPtr->size > 0) { ASSERT(reader->footer->tombBlkPtr->size % sizeof(STombBlk) == 0); @@ -182,7 +131,7 @@ int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **tomb if (!data) return TSDB_CODE_OUT_OF_MEMORY; int32_t code = - tsdbReadFile(reader->reader->fd, reader->footer->tombBlkPtr->offset, data, reader->footer->tombBlkPtr->size); + tsdbReadFile(reader->fd, reader->footer->tombBlkPtr->offset, data, reader->footer->tombBlkPtr->size); if (code) { taosMemoryFree(data); return code; @@ -200,7 +149,7 @@ int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **tomb return 0; } -int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray) { +int32_t tsdbSttFileReadSttBlk(SSttFileReader *reader, const TSttBlkArray **sttBlkArray) { if (!reader->ctx->sttBlkLoaded) { if (reader->footer->sttBlkPtr->size > 0) { ASSERT(reader->footer->sttBlkPtr->size % sizeof(SSttBlk) == 0); @@ -209,8 +158,7 @@ int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlk void *data = taosMemoryMalloc(reader->footer->sttBlkPtr->size); if (!data) return TSDB_CODE_OUT_OF_MEMORY; - int32_t code = - tsdbReadFile(reader->reader->fd, reader->footer->sttBlkPtr->offset, data, reader->footer->sttBlkPtr->size); + int32_t code = tsdbReadFile(reader->fd, reader->footer->sttBlkPtr->offset, data, reader->footer->sttBlkPtr->size); if (code) { taosMemoryFree(data); return code; @@ -228,29 +176,27 @@ int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlk return 0; } -int32_t tsdbSttFileReadBlockData(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData) { +int32_t tsdbSttFileReadBlockData(SSttFileReader *reader, const SSttBlk *sttBlk, SBlockData *bData) { int32_t code = 0; int32_t lino = 0; - code = tRealloc(&reader->reader->config->bufArr[0], sttBlk->bInfo.szBlock); + code = tRealloc(&reader->config->bufArr[0], sttBlk->bInfo.szBlock); TSDB_CHECK_CODE(code, lino, _exit); - code = - tsdbReadFile(reader->reader->fd, sttBlk->bInfo.offset, reader->reader->config->bufArr[0], sttBlk->bInfo.szBlock); + code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset, reader->config->bufArr[0], sttBlk->bInfo.szBlock); TSDB_CHECK_CODE(code, lino, _exit); - code = tDecmprBlockData(reader->reader->config->bufArr[0], sttBlk->bInfo.szBlock, bData, - &reader->reader->config->bufArr[1]); + code = tDecmprBlockData(reader->config->bufArr[0], sttBlk->bInfo.szBlock, bData, &reader->config->bufArr[1]); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code); + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); } return code; } -int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData, +int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *sttBlk, SBlockData *bData, STSchema *pTSchema, int16_t cids[], int32_t ncid) { int32_t code = 0; int32_t lino = 0; @@ -260,17 +206,17 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *s TSDB_CHECK_CODE(code, lino, _exit); // uid + version + tskey - code = tRealloc(&reader->reader->config->bufArr[0], sttBlk->bInfo.szKey); + code = tRealloc(&reader->config->bufArr[0], sttBlk->bInfo.szKey); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbReadFile(reader->reader->fd, sttBlk->bInfo.offset, reader->reader->config->bufArr[0], sttBlk->bInfo.szKey); + code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset, reader->config->bufArr[0], sttBlk->bInfo.szKey); TSDB_CHECK_CODE(code, lino, _exit); // hdr SDiskDataHdr hdr[1]; int32_t size = 0; - size += tGetDiskDataHdr(reader->reader->config->bufArr[0] + size, hdr); + size += tGetDiskDataHdr(reader->config->bufArr[0] + size, hdr); ASSERT(hdr->delimiter == TSDB_FILE_DLMT); @@ -280,8 +226,8 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *s // uid if (hdr->uid == 0) { ASSERT(hdr->szUid); - code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, hdr->szUid, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg, - (uint8_t **)&bData->aUid, sizeof(int64_t) * hdr->nRow, &reader->reader->config->bufArr[1]); + code = tsdbDecmprData(reader->config->bufArr[0] + size, hdr->szUid, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg, + (uint8_t **)&bData->aUid, sizeof(int64_t) * hdr->nRow, &reader->config->bufArr[1]); TSDB_CHECK_CODE(code, lino, _exit); } else { ASSERT(hdr->szUid == 0); @@ -289,14 +235,14 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *s size += hdr->szUid; // version - code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, hdr->szVer, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg, - (uint8_t **)&bData->aVersion, sizeof(int64_t) * hdr->nRow, &reader->reader->config->bufArr[1]); + code = tsdbDecmprData(reader->config->bufArr[0] + size, hdr->szVer, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg, + (uint8_t **)&bData->aVersion, sizeof(int64_t) * hdr->nRow, &reader->config->bufArr[1]); TSDB_CHECK_CODE(code, lino, _exit); size += hdr->szVer; // ts - code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, hdr->szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr->cmprAlg, - (uint8_t **)&bData->aTSKEY, sizeof(TSKEY) * hdr->nRow, &reader->reader->config->bufArr[1]); + code = tsdbDecmprData(reader->config->bufArr[0] + size, hdr->szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr->cmprAlg, + (uint8_t **)&bData->aTSKEY, sizeof(TSKEY) * hdr->nRow, &reader->config->bufArr[1]); TSDB_CHECK_CODE(code, lino, _exit); size += hdr->szKey; @@ -305,11 +251,11 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *s // other columns if (bData->nColData > 0) { if (hdr->szBlkCol > 0) { - code = tRealloc(&reader->reader->config->bufArr[0], hdr->szBlkCol); + code = tRealloc(&reader->config->bufArr[0], hdr->szBlkCol); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbReadFile(reader->reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey, - reader->reader->config->bufArr[0], hdr->szBlkCol); + code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey, reader->config->bufArr[0], + hdr->szBlkCol); TSDB_CHECK_CODE(code, lino, _exit); } @@ -322,7 +268,7 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *s while (blockCol && blockCol->cid < colData->cid) { if (size < hdr->szBlkCol) { - size += tGetBlockCol(reader->reader->config->bufArr[0] + size, blockCol); + size += tGetBlockCol(reader->config->bufArr[0] + size, blockCol); } else { ASSERT(size == hdr->szBlkCol); blockCol = NULL; @@ -346,16 +292,15 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *s } else { int32_t size1 = blockCol->szBitmap + blockCol->szOffset + blockCol->szValue; - code = tRealloc(&reader->reader->config->bufArr[1], size1); + code = tRealloc(&reader->config->bufArr[1], size1); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbReadFile(reader->reader->fd, - sttBlk->bInfo.offset + sttBlk->bInfo.szKey + hdr->szBlkCol + blockCol->offset, - reader->reader->config->bufArr[1], size1); + code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey + hdr->szBlkCol + blockCol->offset, + reader->config->bufArr[1], size1); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbDecmprColData(reader->reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData, - &reader->reader->config->bufArr[2]); + code = tsdbDecmprColData(reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData, + &reader->config->bufArr[2]); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -364,30 +309,29 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *s _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code); + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); } return code; } -int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk, STombBlock *tombBlock) { +int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *tombBlk, STombBlock *tombBlock) { int32_t code = 0; int32_t lino = 0; - code = tRealloc(&reader->reader->config->bufArr[0], tombBlk->dp->size); + code = tRealloc(&reader->config->bufArr[0], tombBlk->dp->size); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbReadFile(reader->reader->fd, tombBlk->dp->offset, reader->reader->config->bufArr[0], tombBlk->dp->size); + code = tsdbReadFile(reader->fd, tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size); if (code) TSDB_CHECK_CODE(code, lino, _exit); int64_t size = 0; tTombBlockClear(tombBlock); for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->dataArr); ++i) { - code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT, - tombBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec, - &reader->reader->config->bufArr[2]); + code = tsdbDecmprData(reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg, + &reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec, &reader->config->bufArr[2]); TSDB_CHECK_CODE(code, lino, _exit); - code = TARRAY2_APPEND_BATCH(&tombBlock->dataArr[i], reader->reader->config->bufArr[1], tombBlk->numRec); + code = TARRAY2_APPEND_BATCH(&tombBlock->dataArr[i], reader->config->bufArr[1], tombBlk->numRec); TSDB_CHECK_CODE(code, lino, _exit); size += tombBlk->size[i]; @@ -396,31 +340,30 @@ int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk, ASSERT(size == tombBlk->dp->size); _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code); + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); } return code; } -int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *statisBlock) { +int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *statisBlock) { int32_t code = 0; int32_t lino = 0; - code = tRealloc(&reader->reader->config->bufArr[0], statisBlk->dp->size); + code = tRealloc(&reader->config->bufArr[0], statisBlk->dp->size); TSDB_CHECK_CODE(code, lino, _exit); - code = - tsdbReadFile(reader->reader->fd, statisBlk->dp->offset, reader->reader->config->bufArr[0], statisBlk->dp->size); + code = tsdbReadFile(reader->fd, statisBlk->dp->offset, reader->config->bufArr[0], statisBlk->dp->size); TSDB_CHECK_CODE(code, lino, _exit); int64_t size = 0; tStatisBlockClear(statisBlock); for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->dataArr); ++i) { - code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, statisBlk->size[i], TSDB_DATA_TYPE_BIGINT, - statisBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * statisBlk->numRec, - &reader->reader->config->bufArr[2]); + code = + tsdbDecmprData(reader->config->bufArr[0] + size, statisBlk->size[i], TSDB_DATA_TYPE_BIGINT, statisBlk->cmprAlg, + &reader->config->bufArr[1], sizeof(int64_t) * statisBlk->numRec, &reader->config->bufArr[2]); TSDB_CHECK_CODE(code, lino, _exit); - code = TARRAY2_APPEND_BATCH(statisBlock->dataArr + i, reader->reader->config->bufArr[1], statisBlk->numRec); + code = TARRAY2_APPEND_BATCH(statisBlock->dataArr + i, reader->config->bufArr[1], statisBlk->numRec); TSDB_CHECK_CODE(code, lino, _exit); size += statisBlk->size[i]; @@ -430,7 +373,7 @@ int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *stat _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code); + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); } return code; } @@ -696,7 +639,6 @@ _exit: } static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) { - writer->footer->prevFooter = writer->config->file.size; int32_t code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->footer, sizeof(writer->footer)); if (code) return code; writer->file->size += sizeof(writer->footer); @@ -708,32 +650,24 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { int32_t lino = 0; // set - writer->file[0] = writer->config->file; - writer->file->stt->nseg++; if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb; if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow; if (!writer->config->bufArr) writer->config->bufArr = writer->bufArr; + writer->file[0] = writer->config->file; + // open file - int32_t flag; + int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; char fname[TSDB_FILENAME_LEN]; - if (writer->file->size > 0) { - flag = TD_FILE_READ | TD_FILE_WRITE; - } else { - flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; - } - tsdbTFileName(writer->config->tsdb, writer->file, fname); code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd); TSDB_CHECK_CODE(code, lino, _exit); - if (writer->file->size == 0) { - uint8_t hdr[TSDB_FHDR_SIZE] = {0}; - code = tsdbWriteFile(writer->fd, 0, hdr, sizeof(hdr)); - TSDB_CHECK_CODE(code, lino, _exit); - writer->file->size += sizeof(hdr); - } + uint8_t hdr[TSDB_FHDR_SIZE] = {0}; + code = tsdbWriteFile(writer->fd, 0, hdr, sizeof(hdr)); + TSDB_CHECK_CODE(code, lino, _exit); + writer->file->size += sizeof(hdr); writer->ctx->opened = true; @@ -798,22 +732,12 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o tsdbCloseFile(&writer->fd); - ASSERT(writer->config->file.size < writer->file->size); - STFileOp op; - if (writer->config->file.size == 0) { - op = (STFileOp){ - .optype = TSDB_FOP_CREATE, - .fid = writer->config->file.fid, - .nf = writer->file[0], - }; - } else { - op = (STFileOp){ - .optype = TSDB_FOP_MODIFY, - .fid = writer->config->file.fid, - .of = writer->config->file, - .nf = writer->file[0], - }; - } + ASSERT(writer->file->size > 0); + STFileOp op = (STFileOp){ + .optype = TSDB_FOP_CREATE, + .fid = writer->config->file.fid, + .nf = writer->file[0], + }; code = TARRAY2_APPEND(opArray, op); TSDB_CHECK_CODE(code, lino, _exit); @@ -826,22 +750,17 @@ _exit: } static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) { - if (writer->config->file.size) { // truncate the file to the original size - ASSERT(writer->config->file.size <= writer->file->size); - if (writer->config->file.size < writer->file->size) { - taosFtruncateFile(writer->fd->pFD, writer->config->file.size); - tsdbCloseFile(&writer->fd); - } - } else { // remove the file - char fname[TSDB_FILENAME_LEN]; - tsdbTFileName(writer->config->tsdb, &writer->config->file, fname); - tsdbCloseFile(&writer->fd); - taosRemoveFile(fname); - } + char fname[TSDB_FILENAME_LEN]; + tsdbTFileName(writer->config->tsdb, &writer->config->file, fname); + tsdbCloseFile(&writer->fd); + taosRemoveFile(fname); return 0; } int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer) { + ASSERT(config->file.type == TSDB_FTYPE_STT); + ASSERT(config->file.size == 0); + writer[0] = taosMemoryCalloc(1, sizeof(*writer[0])); if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h index fdea7527f59e960e0a5a27764c54b2501c21de0e..f3b2e66ab2a68e4eafe4a46e7e7431e8f47c0e1e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h @@ -30,25 +30,22 @@ typedef TARRAY2(STombBlk) TTombBlkArray; // SSttFileReader ========================================== typedef struct SSttFileReader SSttFileReader; typedef struct SSttFileReaderConfig SSttFileReaderConfig; -typedef struct SSttSegReader SSttSegReader; typedef TARRAY2(SSttFileReader *) TSttFileReaderArray; -typedef TARRAY2(SSttSegReader *) TSttSegReaderArray; // SSttFileReader int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *config, SSttFileReader **reader); int32_t tsdbSttFileReaderClose(SSttFileReader **reader); -int32_t tsdbSttFileReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **readerArray); // SSttSegReader -int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray); -int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray); -int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **delBlkArray); +int32_t tsdbSttFileReadSttBlk(SSttFileReader *reader, const TSttBlkArray **sttBlkArray); +int32_t tsdbSttFileReadStatisBlk(SSttFileReader *reader, const TStatisBlkArray **statisBlkArray); +int32_t tsdbSttFileReadTombBlk(SSttFileReader *reader, const TTombBlkArray **delBlkArray); -int32_t tsdbSttFileReadBlockData(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData); -int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData, +int32_t tsdbSttFileReadBlockData(SSttFileReader *reader, const SSttBlk *sttBlk, SBlockData *bData); +int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *sttBlk, SBlockData *bData, STSchema *pTSchema, int16_t cids[], int32_t ncid); -int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *sData); -int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *delBlk, STombBlock *dData); +int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *sData); +int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *delBlk, STombBlock *dData); struct SSttFileReaderConfig { STsdb *tsdb;