diff --git a/src/tsdb/inc/tsdbFS.h b/src/tsdb/inc/tsdbFS.h index 608171bc23dd90c655f1cb588502f5fe65f4d7f1..a1b3e455e1f48bebeb9f9b4d397018062001a1f8 100644 --- a/src/tsdb/inc/tsdbFS.h +++ b/src/tsdb/inc/tsdbFS.h @@ -55,6 +55,7 @@ typedef struct { #define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus) #define FS_NEW_STATUS(pfs) ((pfs)->nstatus) #define FS_IN_TXN(pfs) (pfs)->intxn +#define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version) typedef struct { int direction; @@ -72,7 +73,7 @@ STsdbFS *tsdbNewFS(int keep, int days); void * tsdbFreeFS(STsdbFS *pfs); int tsdbOpenFS(STsdbFS *pFs, int keep, int days); void tsdbCloseFS(STsdbFS *pFs); -uint32_t tsdbStartFSTxn(STsdbFS *pfs); +void tsdbStartFSTxn(STsdbFS *pfs, int64_t pointsAdd, int64_t storageAdd); int tsdbEndFSTxn(STsdbFS *pfs); int tsdbEndFSTxnWithError(STsdbFS *pfs); void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta); diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index f5051aff437e57f034928eb22afdec4f393780a8..fa453a25be1a84f52b86fb89c97cad574f8e67bf 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -286,6 +286,12 @@ typedef struct { TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \ } \ } while (0); +#define TSDB_FSET_FSYNC(s) \ + do { \ + for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \ + TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \ + } \ + } while (0); void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver); void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet); diff --git a/src/tsdb/inc/tsdbMemTable.h b/src/tsdb/inc/tsdbMemTable.h index 82cb579514753380c580fb552422d5b245b64342..1ec770e51331f1229460f6049f4ac7f33c03a4db 100644 --- a/src/tsdb/inc/tsdbMemTable.h +++ b/src/tsdb/inc/tsdbMemTable.h @@ -54,6 +54,8 @@ typedef struct { SList* actList; SList* extraBuffList; SList* bufBlockList; + int64_t pointsAdd; // TODO + int64_t storageAdd; // TODO } SMemTable; enum { TSDB_UPDATE_META, TSDB_DROP_META }; diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 80cf4cc51545aafdbab33460a6c162b4757d4aee..344df35c316317fd9f9636c5d25ee5f9fcac153b 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -89,7 +89,6 @@ struct STsdbRepo { #define REPO_ID(r) (r)->config.tsdbId #define REPO_CFG(r) (&((r)->config)) #define REPO_FS(r) ((r)->fs) -#define REPO_FS_VERSION(r) // TODO #define IS_REPO_LOCKED(r) (r)->repoLocked #define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 24ec06261172ca87c259f0c414f642191bbb279c..cad88871a8572c82ed150d9e28c6dc8a1767ea6a 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -25,7 +25,6 @@ typedef struct { } SRtn; typedef struct { - uint32_t version; SRtn rtn; // retention snapshot SFSIter fsIter; // tsdb file iterator int niters; // memory iterators @@ -53,6 +52,7 @@ typedef struct { #define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh)) #define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh)) #define TSDB_COMMIT_DEFAULT_ROWS(ch) (TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock * 4 / 5) +#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) static int tsdbCommitMeta(STsdbRepo *pRepo); static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen); @@ -87,6 +87,8 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); +static int tsdbApplyRtn(STsdbRepo *pRepo); +static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn); void *tsdbCommitData(STsdbRepo *pRepo) { if (tsdbStartCommit(pRepo) < 0) { @@ -131,7 +133,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { ASSERT(pOMFile != NULL || listNEles(pMem->actList) > 0); if (listNEles(pMem->actList) <= 0) { - // no + // no meta data to commit, just keep the old meta file tsdbUpdateMFile(pfs, pOMFile); return 0; } else { @@ -140,7 +142,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { // Create a new meta file did.level = TFS_PRIMARY_LEVEL; did.id = TFS_PRIMARY_ID; - tsdbInitMFile(&mf, did, REPO_ID(pRepo), pfs->nstatus->meta.version); + tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); if (tsdbCreateMFile(&mf) < 0) { return -1; @@ -176,6 +178,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { return -1; } + TSDB_FILE_FSYNC(&mf); tsdbCloseMFile(&mf); tsdbUpdateMFile(pfs, &mf); @@ -266,17 +269,20 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) { return 0; } - // =================== Commit Time-Series Data static int tsdbCommitTSData(STsdbRepo *pRepo) { SMemTable *pMem = pRepo->imem; SCommitH commith = {0}; - STsdbFS * pfs = REPO_FS(pRepo); SDFileSet *pSet = NULL; - SDFileSet nSet; int fid; - if (pMem->numOfRows <= 0) return 0; + if (pMem->numOfRows <= 0) { + // No memory data, just apply retention on each file on disk + if (tsdbApplyRtn(pRepo) < 0) { + return -1; + } + return 0; + } // Resource initialization if (tsdbInitCommitH(&commith, pRepo) < 0) { @@ -285,9 +291,8 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { // Skip expired memory data and expired FSET tsdbSeekCommitIter(&commith, commith.rtn.minKey); - while (true) { - pSet = tsdbFSIterNext(&(commith.fsIter)); - if (pSet == NULL || pSet->fid >= commith.rtn.minFid) break; + while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) { + if (pSet->fid >= commith.rtn.minFid) break; } // Loop to commit to each file @@ -299,33 +304,11 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { if (pSet && (fid == TSDB_IVLD_FID || pSet->fid < fid)) { // Only has existing FSET but no memory data to commit in this // existing FSET, only check if file in correct retention - int level, id; - - tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(commith.rtn)), &level, &id); - if (level == TFS_UNDECIDED_LEVEL) { - terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; + if (tsdbApplyRtnOnFSet(pRepo, pSet, &(commith.rtn)) < 0) { tsdbDestroyCommitH(&commith); return -1; } - if (level > TSDB_FSET_LEVEL(pSet)) { - // Need to move the FSET to higher level - if (tsdbCopyDFileSet(pSet, &nSet) < 0) { - tsdbDestroyCommitH(&commith); - return -1; - } - - if (tsdbUpdateDFileSet(pfs, &nSet) < 0) { - tsdbDestroyCommitH(&commith); - return -1; - } - } else { - if (tsdbUpdateDFileSet(pfs, pSet) < 0) { - tsdbDestroyCommitH(&commith); - return -1; - } - } - pSet = tsdbFSIterNext(&(commith.fsIter)); } else { // Has memory data to commit @@ -358,10 +341,12 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { static int tsdbStartCommit(STsdbRepo *pRepo) { SMemTable *pMem = pRepo->imem; + ASSERT(pMem->numOfRows > 0 || listNEles(pMem->actList) > 0); + tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d", REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList)); - if (tsdbStartFSTxn(REPO_FS(pRepo)) < 0) return -1; + tsdbStartFSTxn(REPO_FS(pRepo), pMem->pointsAdd, pMem->storageAdd); pRepo->code = TSDB_CODE_SUCCESS; return 0; @@ -429,10 +414,10 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { return -1; } + // Close commit file tsdbCloseCommitFile(pCommith, false); if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) { - // TODO return -1; } @@ -507,18 +492,9 @@ static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo) { STsdbCfg *pCfg = REPO_CFG(pRepo); memset(pCommith, 0, sizeof(*pCommith)); - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { - TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(pCommith), ftype)); - } - - pCommith->version = REPO_FS_VERSION(pRepo) + 1; - tsdbGetRtnSnap(pRepo, &(pCommith->rtn)); - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { - SDFile *pDFile = TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(pCommith), ftype); - TSDB_FILE_SET_CLOSED(pDFile); - } + TSDB_FSET_SET_CLOSED(TSDB_COMMIT_WRITE_FSET(pCommith)); // Init read handle if (tsdbInitReadH(&(pCommith->readh), pRepo) < 0) { @@ -1365,12 +1341,12 @@ static void tsdbResetCommitTable(SCommitH *pCommith) { } static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { - int level, id; SDiskID did; + STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith); - tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &level, &id); - if (level == TFS_UNDECIDED_LEVEL) { + tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &(did.level), &(did.id)); + if (did.level == TFS_UNDECIDED_LEVEL) { terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; return -1; } @@ -1385,54 +1361,33 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid if (tsdbLoadBlockIdx(&(pCommith->readh)) < 0) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; } - return -1; } else { pCommith->isRFileSet = false; } // Set and open commit FSET - if (pSet == NULL || level > TSDB_FSET_LEVEL(pSet)) { - // Create new FSET - did.level = level; - did.id = id; - tsdbInitDFileSet(pWSet, did, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version); - - if (tsdbOpenDFileSet(pWSet, O_WRONLY | O_CREAT) < 0) { - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { - remove(TSDB_FILE_FULL_NAME(TSDB_DFILE_IN_SET(pWSet, ftype))); - } - - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - } - return -1; - } - - if (tsdbUpdateDFileSetHeader(pWSet) < 0) { - tsdbCloseDFileSet(pWSet); - - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { - remove(TSDB_FILE_FULL_NAME(TSDB_DFILE_IN_SET(pWSet, ftype))); - } + if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) { + // Create a new FSET to write data + tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo))); + if (tsdbCreateDFileSet(pWSet) < 0) { if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); } - return -1; } - // TODO: update file info; + pCommith->isDFileSame = false; + pCommith->isLFileSame = false; } else { - level = TSDB_FSET_LEVEL(pSet); - id = TSDB_FSET_ID(pSet); + did.level = TSDB_FSET_LEVEL(pSet); + did.id = TSDB_FSET_ID(pSet); // TSDB_FILE_HEAD SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); - did.level = level; - did.id = id; - tsdbInitDFile(pWHeadf, did, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version, TSDB_FILE_HEAD); + tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD); if (tsdbCreateDFile(pWHeadf) < 0) { if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); @@ -1446,7 +1401,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid tsdbInitDFileEx(pWHeadf, pRDataf); if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) { tsdbCloseDFile(pWHeadf); - remove(TSDB_FILE_FULL_NAME(pWHeadf)); + tsdbRemoveDFile(pWHeadf); if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); return -1; @@ -1460,17 +1415,26 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid if (pRLastf->info.size < 32 * 1024) { tsdbInitDFileEx(pWLastf, pRLastf); pCommith->isLFileSame = true; + + if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) { + tsdbCloseDFileSet(pWSet); + tsdbRemoveDFile(pWHeadf); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } } else { - tsdbInitDFile(pWLastf, did, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version, TSDB_FILE_LAST); + tsdbInitDFile(pWLastf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST); pCommith->isLFileSame = false; - } - if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) { - tsdbCloseDFile(pWDataf); - tsdbCloseDFile(pWHeadf); - remove(TSDB_FILE_FULL_NAME(pWHeadf)); - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; + + if (tsdbCreateDFile(pWLastf) < 0) { + tsdbCloseDFileSet(pWSet); + tsdbRemoveDFile(pWHeadf); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } } } } @@ -1484,10 +1448,7 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) { } if (!hasError) { - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { - SDFile *pDFile = TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(pCommith), ftype); - fsync(TSDB_FILE_FD(pDFile)); - } + TSDB_FSET_FSYNC(TSDB_COMMIT_WRITE_FSET(pCommith)); } tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); } @@ -1508,4 +1469,59 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p } return false; +} + +static int tsdbApplyRtn(STsdbRepo *pRepo) { + SRtn rtn; + SFSIter fsiter; + STsdbFS * pfs = REPO_FS(pRepo); + SDFileSet *pSet; + + // Get retentioni snapshot + tsdbGetRtnSnap(pRepo, &rtn); + + tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD); + while ((pSet = tsdbFSIterNext(&fsiter))) { + if (pSet->fid < rtn.minFid) continue; + + if (tsdbApplyRtnOnFSet(pRepo, pSet, &rtn) < 0) { + return -1; + } + } + + return 0; +} + +static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) { + SDiskID did; + SDFileSet nSet; + STsdbFS * pfs = REPO_FS(pRepo); + + ASSERT(pSet->fid >= pRtn->minFid); + + tfsAllocDisk(tsdbGetFidLevel(pSet->fid, pRtn), &(did.level), &(did.id)); + if (did.level == TFS_UNDECIDED_LEVEL) { + terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; + return -1; + } + + if (did.level > TSDB_FSET_LEVEL(pSet)) { + // Need to move the FSET to higher level + tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs)); + + if (tsdbCopyDFileSet(pSet, &nSet) < 0) { + return -1; + } + + if (tsdbUpdateDFileSet(pfs, &nSet) < 0) { + return -1; + } + } else { + // On a correct level + if (tsdbUpdateDFileSet(pfs, pSet) < 0) { + return -1; + } + } + + return 0; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index ac80e8262344e1860dbe76da8c484957e35e435a..72922ee7e2ade0460391af1bd72f8c5aac47f4f6 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -232,13 +232,15 @@ void tsdbCloseFS(STsdbFS *pFs) { } // Start a new transaction to modify the file system -uint32_t tsdbStartFSTxn(STsdbFS *pfs) { +void tsdbStartFSTxn(STsdbFS *pfs, int64_t pointsAdd, int64_t storageAdd) { ASSERT(pfs->intxn == false); pfs->intxn = true; tsdbResetFSStatus(pfs->nstatus); - - return pfs->cstatus->meta.version + 1; + pfs->nstatus->meta = pfs->cstatus->meta; + pfs->nstatus->meta.version = pfs->cstatus->meta.version + 1; + pfs->nstatus->meta.totalPoints = pfs->cstatus->meta.totalPoints + pointsAdd; + pfs->nstatus->meta.version = pfs->cstatus->meta.totalStorage += storageAdd; } void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) { pfs->nstatus->meta = *pMeta; } @@ -268,7 +270,8 @@ int tsdbEndFSTxn(STsdbFS *pfs) { } int tsdbEndFSTxnWithError(STsdbFS *pfs) { - // TODO + tsdbApplyFSTxnOnDisk(pfs->nstatus, pfs->cstatus); + // TODO: if mf change, reload pfs->metaCache pfs->intxn = false; return 0; }