diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 21da615825b89817087a879cd5a9cd929d0647ec..7efb63f216ffbba3448e22268dba75d30c01e26a 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -16,15 +16,33 @@ #include "inc/tsdbMerge.h" typedef struct { - bool launched; + bool launched; + bool toData; + int32_t level; + STFileSet *fset; } SMergeCtx; typedef struct { - STsdb *tsdb; - SMergeCtx ctx; + STsdb *tsdb; + // config + int32_t maxRow; + int32_t szPage; + int8_t cmprAlg; + int64_t cid; + SSkmInfo skmTb; + SSkmInfo skmRow; + uint8_t *aBuf[5]; + + // context + SMergeCtx ctx; + // reader + TARRAY2(SSttFileReader *) sttReaderArr; + SDataFileReader *dataReader; + // writer SSttFileWriter *sttWriter; SDataFileWriter *dataWriter; - TFileOpArray fopArr; + // operations + TFileOpArray fopArr; } SMerger; static int32_t tsdbMergerOpen(SMerger *merger) { @@ -58,57 +76,117 @@ _exit: return 0; } -static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) { - int32_t code = 0; - int32_t lino = 0; +static int32_t tsdbDoMergeFileSet(SMerger *merger) { + // TODO + return 0; +} - if (merger->ctx.launched == false) { - code = tsdbMergerOpen(merger); - TSDB_CHECK_CODE(code, lino, _exit); - } +static int32_t tsdbMergeFileSetBegin(SMerger *merger) { + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(merger->tsdb->pVnode); + STFileSet *fset = merger->ctx.fset; + + // prepare the merger file set + SSttLvl *lvl; + STFileObj *fobj; + merger->ctx.toData = true; + merger->ctx.level = 0; + + TARRAY2_FOREACH(&fset->lvlArr, lvl) { + if (lvl->level != merger->ctx.level) { + lvl = NULL; + break; + } - { // prepare the merger file set - SSttLvl *lvl; - STFileObj *fobj; - bool mergerToData = true; - int32_t level = -1; + fobj = TARRAY2_GET(&lvl->farr, 0); + if (fobj->f.stt.nseg < merger->tsdb->pVnode->config.sttTrigger) { + merger->ctx.toData = false; + break; + } else { + ASSERT(lvl->level == 0 || TARRAY2_SIZE(&lvl->farr) == 1); + merger->ctx.level++; - TARRAY2_FOREACH(&fset->lvlArr, lvl) { - if (lvl->level - level > 1) { - mergerToData = false; - break; - } + // open the reader + SSttFileReader *reader; + // code = tsdbSttFileReaderOpen(&fobj->f.stt, &reader); + // TSDB_CHECK_CODE(code, lino, _exit); - if (lvl->level == 0) { - } else { - ASSERT(TARRAY2_SIZE(&lvl->farr) == 1); + code = TARRAY2_APPEND(&merger->sttReaderArr, reader); + TSDB_CHECK_CODE(code, lino, _exit); - fobj = TARRAY2_FIRST(&lvl->farr); - } + // add the operation + STFileOp op = { + .fid = fobj->f.fid, + .optype = TSDB_FOP_REMOVE, + .of = fobj->f, + }; + code = TARRAY2_APPEND(&merger->fopArr, op); + TSDB_CHECK_CODE(code, lino, _exit); } + } - // merge to level - level = level + 1; - lvl = tsdbTFileSetGetLvl(fset, level); - if (lvl == NULL) { - // open new stt file to merge to - } else { - // open existing stt file to merge to - } + // open stt file writer + SSttFileWriterConfig config = { + .pTsdb = merger->tsdb, + .maxRow = merger->maxRow, + .szPage = merger->szPage, + .cmprAlg = merger->cmprAlg, + .pSkmTb = &merger->skmTb, + .pSkmRow = &merger->skmRow, + .aBuf = merger->aBuf, + }; + if (lvl) { + config.file = fobj->f; + } else { + config.file = (STFile){ + .type = TSDB_FTYPE_STT, + .did = {.level = 0, .id = 0}, + .fid = fset->fid, + .cid = merger->cid, + .size = 0, + .stt = {.level = merger->ctx.level, .nseg = 0}, + }; + } + code = tsdbSttFWriterOpen(&config, &merger->sttWriter); + TSDB_CHECK_CODE(code, lino, _exit); - if (mergerToData) { - // code = tsdbDataFWriterOpen(SDataFWriter * *ppWriter, STsdb * pTsdb, SDFileSet * pSet); - // TSDB_CHECK_CODE(code, lino, _exit); - } + // open data file writer + if (merger->ctx.toData) { + // code = tsdbDataFWriterOpen(); + // TSDB_CHECK_CODE(code, lino, _exit); } - { - // do merge the file set +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); } + return code; +} +static int32_t tsdbMergeFileSetEnd(SMerger *merger) { + // TODO + return 0; +} +static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) { + int32_t code = 0; + int32_t lino = 0; - { // end merge the file set + if (merger->ctx.launched == false) { + code = tsdbMergerOpen(merger); + TSDB_CHECK_CODE(code, lino, _exit); } + merger->ctx.fset = fset; + + code = tsdbMergeFileSetBegin(merger); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbDoMergeFileSet(merger); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbMergeFileSetEnd(merger); + TSDB_CHECK_CODE(code, lino, _exit); + _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(merger->tsdb->pVnode), __func__, lino, tstrerror(code));