diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 83f86a80a78a5d2ace726bc28dd2953e13ab1a74..e81de0893cf221d19b739adb92a6ce03e1b2b4f1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -16,17 +16,41 @@ #include "tsdb.h" typedef struct { - STsdb *pTsdb; - STsdbFS fs; +} SMemDIter; + +typedef struct { + SArray *aBlockIdx; // SArray + SMapData mDataBlk; // SMapData + SBlockData bData +} SDataDIter; + +typedef struct { + SArray *aSttBlk; // SArray + SBlockData bData; +} SSttDIter; + +typedef struct { + STsdb *pTsdb; + STsdbFS fs; + int64_t cid; + int32_t fid; + SDFileSet *pDFileSet; + SRBTree rtree; } STsdbCompactor; #define TSDB_FLG_DEEP_COMPACT 0x1 -static int32_t tsdbBeginCompact(STsdb *pTsdb) { +static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) { int32_t code = 0; int32_t lino = 0; - // TODO + pCompactor->pTsdb = pTsdb; + + code = tsdbFSCopy(pTsdb, &pCompactor->fs); + TSDB_CHECK_CODE(code, lino, _exit); + + pCompactor->fid = INT32_MIN; + _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); @@ -34,10 +58,12 @@ _exit: return code; } -static int32_t tsdbCommitCompact(STsdb *pTsdb) { +static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) { int32_t code = 0; int32_t lino = 0; + STsdb *pTsdb = pCompactor->pTsdb; + // TODO _exit: @@ -47,10 +73,12 @@ _exit: return code; } -static int32_t tsdbAbortCompact(STsdb *pTsdb) { +static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) { int32_t code = 0; int32_t lino = 0; + STsdb *pTsdb = pCompactor->pTsdb; + // TODO _exit: @@ -60,11 +88,11 @@ _exit: return code; } -static int32_t tsdbDeepCompact(STsdb *pTsdb) { +static int32_t tsdbDeepCompact(STsdbCompactor *pCompactor) { int32_t code = 0; int32_t lino = 0; - // TODO + STsdb *pTsdb = pCompactor->pTsdb; _exit: if (code) { @@ -73,10 +101,12 @@ _exit: return code; } -static int32_t tsdbShallowCompact(STsdb *pTsdb) { +static int32_t tsdbShallowCompact(STsdbCompactor *pCompactor) { int32_t code = 0; int32_t lino = 0; + STsdb *pTsdb = pCompactor->pTsdb; + // TODO _exit: @@ -93,23 +123,30 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { // Check if can do compact (TODO) // Do compact - code = tsdbBeginCompact(pTsdb); + STsdbCompactor compactor = {0}; + + code = tsdbBeginCompact(pTsdb, &compactor); TSDB_CHECK_CODE(code, lino, _exit); - if (flag & TSDB_FLG_DEEP_COMPACT) { - code = tsdbDeepCompact(pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbShallowCompact(pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); + while (true) { + compactor.pDFileSet = (SDFileSet *)taosArraySearch(compactor.fs.aDFileSet, &compactor.fid, tDFileSetCmprFn, TD_GT); + if (compactor.pDFileSet == NULL) break; + + if (flag & TSDB_FLG_DEEP_COMPACT) { + code = tsdbDeepCompact(&compactor); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbShallowCompact(&compactor); + TSDB_CHECK_CODE(code, lino, _exit); + } } _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - tsdbAbortCompact(pTsdb); + tsdbAbortCompact(&compactor); } else { - tsdbCommitCompact(pTsdb); + tsdbCommitCompact(&compactor); } return code; }