diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index c07bf2d6c841d9b94947bafc5d7e854c4cb49b79..083bb6623f3e03ce6200fc04a40ce1599ecd05ea 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -281,6 +281,8 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap); void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap); +// tsdbMerge.c ============================================================================================== +int32_t tsdbMerge(STsdb *pTsdb); #define TSDB_CACHE_NO(c) ((c).cacheLast == 0) #define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index ea398461aefe94e87b78a7baf7dd5b1ebe437cae..1fbdbdad823a969a73f8a8e9ba4c2f6cb0e2a6cb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -28,6 +28,7 @@ typedef struct { typedef struct { STsdb *pTsdb; + int8_t toMerge; /* commit data */ int64_t commitID; int32_t minutes; @@ -394,6 +395,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { SDFileSet wSet = {0}; if (pRSet) { ASSERT(pRSet->nLastF < pCommitter->maxLast); + fHead = (SHeadFile){.commitID = pCommitter->commitID}; fData = *pRSet->pDataF; fSma = *pRSet->pSmaF; @@ -409,6 +411,10 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { } wSet.nLastF = pRSet->nLastF + 1; wSet.aLastF[wSet.nLastF - 1] = &fLast; // todo + + if (wSet.nLastF == pCommitter->maxLast) { + pCommitter->toMerge = 1; + } } else { fHead = (SHeadFile){.commitID = pCommitter->commitID}; fData = (SDataFile){.commitID = pCommitter->commitID}; @@ -1277,6 +1283,11 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { tsdbFSDestroy(&pCommitter->fs); taosArrayDestroy(pCommitter->aTbDataP); + if (pCommitter->toMerge) { + code = tsdbMerge(pTsdb); + if (code) goto _err; + } + tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode)); return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index 8b2685e97b13cf2a5c92edfcf2827c62839aa78f..9bdc0ba46a0c0df49a8c0b2159d9bd0d932c5dc7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -17,11 +17,36 @@ typedef struct { STsdb *pTsdb; + int8_t maxLast; STsdbFS fs; + struct { + SDataFReader *pReader; + } dReader; + struct { + SDataFWriter *pWriter; + } dWriter; } STsdbMerger; int32_t tsdbMerge(STsdb *pTsdb) { - int32_t code = 0; - // TODO + int32_t code = 0; + STsdbMerger merger = {0}; + STsdbMerger *pMerger = &merger; + + pMerger->pTsdb = pTsdb; + pMerger->maxLast = TSDB_DEFAULT_LAST_FILE; + code = tsdbFSCopy(pTsdb, &pMerger->fs); + if (code) goto _err; + + for (int32_t iSet = 0; iSet < taosArrayGetSize(pMerger->fs.aDFileSet); iSet++) { + SDFileSet *pSet = (SDFileSet *)taosArrayGet(pMerger->fs.aDFileSet, iSet); + if (pSet->nLastF < pMerger->maxLast) continue; + + // do merge the file + } + + return code; + +_err: + tsdbError("vgId:%d tsdb merge failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } \ No newline at end of file