From 3a40080d1676b4d8841add2cabe72f7e177c4b45 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 6 Jan 2021 10:21:26 +0000 Subject: [PATCH] partial work --- src/tsdb/inc/tsdbMain.h | 2 + src/tsdb/src/tsdbCommit.c | 190 ++++++++++++++++++++++++++------------ src/tsdb/src/tsdbFile.c | 12 ++- 3 files changed, 141 insertions(+), 63 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 6142fd1880..86c8966445 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -368,9 +368,11 @@ typedef struct { #define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) void tsdbInitDFileSet(SDFileSet* pSet, int vid, int fid, int ver, int level, int id); +void tsdbInitDFileSetWithOld(SDFileSet *pSet, SDFileSet *pOldSet); int tsdbOpenDFileSet(SDFileSet* pSet, int flags); void tsdbCloseDFileSet(SDFileSet* pSet); int tsdbUpdateDFileSetHeader(SDFileSet* pSet); +int tsdbCopyDFileSet(SDFileSet* pFromSet, SDFileSet* pToSet); /* Statistic information of the TSDB file system. */ diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 397f5707ad..b98dfed698 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -35,20 +35,6 @@ typedef struct { SDataCols * pDataCols; } SCommitH; -static int tsdbCommitTSData(STsdbRepo *pRepo); -static int tsdbCommitMeta(STsdbRepo *pRepo); -static int tsdbStartCommit(STsdbRepo *pRepo); -static void tsdbEndCommit(STsdbRepo *pRepo, int eno); -static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch); -static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); -static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); -static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key); -static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pch); -static void tsdbDestroyCommitH(SCommitH *pch, int niter); -static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn); -static int tsdbGetFidLevel(int fid, SRtn *pRtn); - void *tsdbCommitData(STsdbRepo *pRepo) { if (tsdbStartCommit(pRepo) < 0) { tsdbError("vgId:%d failed to commit data while startting to commit since %s", REPO_ID(pRepo), tstrerror(terrno)); @@ -84,39 +70,61 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { SCommitH ch = {0}; SFSIter fsIter = {0}; SDFileSet *pOldSet = NULL; + SDFileSet nSet; + int level, id; int fid; if (pMem->numOfRows <= 0) return 0; + // Resource initialization if (tsdbInitCommitH(pRepo, &ch) < 0) { + // TODO return -1; } + tsdbInitFSIter(pRepo, &fsIter); + // Skip expired memory data and expired FSET tsdbSeekCommitIter(ch.iters, pMem->maxTables, ch.rtn.minKey); - - tsdbInitFSIter(pRepo, &fsIter); - pOldSet = tsdbFSIterNext(&fsIter); fid = tsdbNextCommitFid(ch.iters, pMem->maxTables); + while (true) { + pOldSet = tsdbFSIterNext(&fsIter); + if (pOldSet == NULL || pOldSet->fid >= ch.rtn.minFid) break; + } + // Loop to commit to each file while (true) { + // Loop over both on disk and memory if (pOldSet == NULL && fid == TSDB_IVLD_FID) break; - if (pOldSet == NULL || (fid != TSDB_IVLD_FID && pOldSet->fid > fid)) { - ASSERT(fid >= ch.rtn.minFid); - // commit to new SDFileSet fid - tsdbCommitToFile(pRepo, NULL, &ch, fid); - fid = tsdbNextCommitFid(ch.iters, pMem->maxTables); - } else if (fid != TSDB_IVLD_FID && pOldSet->fid == fid) { - ASSERT(fid >= ch.rtn.minFid); - // commit to fid with old SDFileSet - tsdbCommitToFile(pRepo, pOldSet, &ch, fid); - fid = tsdbNextCommitFid(ch.iters, pMem->maxTables); + // Only has existing FSET but no memory data to commit in this + // existing FSET, only check if file in correct retention + if (pOldSet && (fid == TSDB_IVLD_FID || pOldSet->fid < fid)) { + if (tsdbApplyRtn(*pOldSet, &(ch.rtn), &nSet) < 0) { + return -1; + } + + tsdbUpdateDFileSet(pRepo, &nSet); + pOldSet = tsdbFSIterNext(&fsIter); + continue; + } + + SDFileSet *pCSet; + int cfid; + + if (pOldSet == NULL || pOldSet->fid > fid) { + // Commit to a new FSET with fid: fid + pCSet = NULL; + cfid = fid; } else { - // check if pOldSet need to be changed - tsdbCommitToFile(pRepo, pOldSet, &ch, TSDB_IVLD_FID); - pOldSet = tsdbFSIterNext(&fsIter) + // Commit to an existing FSET + pCSet = pOldSet; + cfid = pOldSet->fid; + pOldSet = tsdbFSIterNext(&fsIter); } + fid = tsdbNextCommitFid(ch.iters, pMem->maxTables); + + tsdbCommitToFile(pCSet, &ch, cfid); } tsdbDestroyCommitH(&ch, pMem->maxTables); @@ -212,47 +220,60 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS return false; } -static int tsdbCommitToFile(STsdbRepo *pRepo, SDFileSet *pOldSet, SCommitH *pch, int fid) { - SDFileSet rSet; - SDFileSet wSet; - int level, id; +static int tsdbCommitToFile(SCommitH *pch, SDFileSet *pOldSet, int fid) { + int level, id; + int nSet, ver; + STsdbRepo *pRepo; - // ASSERT(pOldSet != NULL || fid != TSDB_IVLD_FID); + ASSERT(pOldSet == NULL || pOldSet->fid == fid); - // file should be deleted, do nothing and return - if (pOldSet && pOldSet->fid < pch->rtn.minFid) { - ASSERT(fid == TSDB_IVLD_FID); - return 0; + tfsAllocDisk(tsdbGetFidLevel(fid, &(pch->rtn)), &level, &id); + if (level == TFS_UNDECIDED_LEVEL) { + // TODO + return -1; } - if (pOldSet == NULL) { - ASSERT(fid != TSDB_IVLD_FID); - - tfsAllocDisk(tsdbGetFidLevel(fid, &(pch->rtn)), &level, &id); - if (level == TFS_UNDECIDED_LEVEL) { - // terrno = TSDB_CODE_TDB_NO_INVALID_DISK; + if (pOldSet == NULL || level > TSDB_FSET_LEVEL(pOldSet)) { + // Create new fset to commit + tsdbInitDFileSet(&nSet, pRepo, fid, ver, level, id); + if (tsdbOpenDFileSet(&nSet, O_WRONLY | O_CREAT) < 0) { + // TODO: return -1; } - // wSet here is the file to write, no read set - tsdbInitDFileSet(&wSet, REPO_ID(pRepo), fid, 0 /*TODO*/, level, id); - } else { - tfsAllocDisk(tsdbGetFidLevel(pOldSet->fid, &(pch->rtn)), &level, &fid); - if (level == TFS_UNDECIDED_LEVEL) { - // terrno = TSDB_CODE_TDB_NO_INVALID_DISK; + if (tsdbUpdateDFileSetHeader(&nSet) < 0) { + // TODO return -1; } + } else { + level = TSDB_FSET_LEVEL(pOldSet); - if (level > TSDB_FSET_LEVEL(pOldSet)) { - // wSet here is the file to write, pOldSet here is the read set - tsdbInitDFileSet(&wSet, REPO_ID(pRepo), fid, 0 /*TODO*/, level, id); + tsdbInitDFile(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_HEAD), ...); + + tsdbInitDFileWithOld(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_DATA), TSDB_DFILE_IN_SET(pOldSet, TSDB_FILE_DATA)) + + SDFile *pDFile = TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_LAST); + if (pDFile->info.size < 32 * 1024 * 1024) { + tsdbInitDFileWithOld(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_LAST), TSDB_DFILE_IN_SET(pOldSet, TSDB_FILE_LAST)) } else { - // get wSet with pOldSet + tsdbInitDFile(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_LAST), ...); + } + + tsdbOpenDFileSet(&nSet, O_WRONLY | O_CREAT); + + // TODO: update file header + } + + tsdbSetCommitFile(pch, pOldSet, &nSet); + + for (size_t tid = 0; tid < pMem->maxTables; tid++) { + SCommitIter *pIter = pch->iters + tid; + if (pIter->pTable == NULL) continue; + + if (tsdbCommitToTable(pch, tid) < 0) { + // TODO + return -1; } - // if (level == TSDB_FSET_LEVEL(pOldSet)) { - // } else { - // // TODO - // } } tsdbUpdateDFileSet(pRepo, &wSet); @@ -385,4 +406,55 @@ static int tsdbNextCommitFid(SCommitIter *iters, int niters) { // TODO return fid; +} + +static int tsdbApplyRtn(const SDFileSet oSet, const SRtn *pRtn, SDFileSet *pRSet) { + int level, id; + int vid, ver; + + tfsAllocDisk(tsdbGetFidLevel(oSet.fid, pRtn), &level, &id); + + if (level == TFS_UNDECIDED_LEVEL) { + // terrno = TSDB_CODE_TDB_NO_AVAILABLE_DISK; + return -1; + } + + if (level > TSDB_FSET_LEVEL(pSet)) { + tsdbInitDFileSet(pRSet, vid, TSDB_FSET_FID(&oSet), ver, level, id); + if (tsdbCopyDFileSet(&oSet, pRSet) < 0) { + return -1; + } + } else { + tsdbInitDFileSetWithOld(pRSet, &oSet); + } + + return 0; +} + +static int tsdbCommitToTable(SCommitH *pch, int tid) { + SCommitIter *pIter = pch->iters + tid; + if (pIter->pTable == NULL) return 0; + + TSDB_RLOCK_TABLE(pIter->pTable); + + tsdbSetCommitTable(pch, pIter->pTable); + + if (pIter->pIter == NULL && pch->readh.pBlockIdx == NULL) { + TSDB_RUNLOCK_TABLE(pIter->pTable); + return 0; + } + + if (tsdbLoadBlockInfo(pch, NULL) < 0) { + TSDB_RUNLOCK_TABLE(pIter->pTable); + return -1; + } + + // Loop to merge disk data and + while (true) { + // TODO + } + + TSDB_RUNLOCK_TABLE(pIter->pTable); + + return 0; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 88581e44c4..c5330852f5 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -241,7 +241,12 @@ void tsdbInitDFileSet(SDFileSet *pSet, int vid, int fid, int ver, int level, int for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype); tsdbInitDFile(pDFile, vid, fid, ver, level, id, NULL, ftype); - // TODO: reset level and id + } +} + +void tsdbInitDFileSetWithOld(SDFileSet *pSet, SDFileSet *pOldSet) { + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + tsdbInitDFileWithOld(TSDB_DFILE_IN_SET(pSet, ftype), TSDB_DFILE_IN_SET(pOldSet, ftype)); } } @@ -267,7 +272,6 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) { return 0; } -int tsdbMoveDFileSet(SDFileSet *pOldSet, SDFileSet *pNewSet) { - // TODO - return 0; +int tsdbCopyDFileSet(SDFileSet *pFromSet, SDFileSet *pToSet) { + // return 0; } \ No newline at end of file -- GitLab