From 09cb05fb6a0fe02fc157c7c2bba11c5f255ff7ea Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 24 Nov 2020 10:25:25 +0000 Subject: [PATCH] refactor --- src/tfs/src/tfs.c | 8 ++- src/tsdb/inc/tsdbMain.h | 2 +- src/tsdb/src/tsdbCommit.c | 124 ++++++++++++++++++++++---------------- src/tsdb/src/tsdbFile.c | 60 +----------------- 4 files changed, 82 insertions(+), 112 deletions(-) diff --git a/src/tfs/src/tfs.c b/src/tfs/src/tfs.c index c9ce560c69..19d80e45f9 100644 --- a/src/tfs/src/tfs.c +++ b/src/tfs/src/tfs.c @@ -120,15 +120,19 @@ void tfsDestroy() { } void tfsUpdateInfo() { + SFSMeta tmeta = {0}; + tfsLock(); for (int level = 0; level < TFS_NLEVEL(); level++) { STier *pTier = TFS_TIER_AT(level); tfsUpdateTierInfo(pTier); - pfs->meta.tsize = TIER_SIZE(pTier); - pfs->meta.avail = TIER_FREE_SIZE(pTier); + tmeta.tsize += TIER_SIZE(pTier); + tmeta.avail += TIER_FREE_SIZE(pTier); } + pfs->meta = tmeta; + tfsUnLock(); } diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 5757cde2aa..d6182ac090 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -512,7 +512,7 @@ STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg); void tsdbFreeFileH(STsdbFileH* pFileH); int tsdbOpenFileH(STsdbRepo* pRepo); void tsdbCloseFileH(STsdbRepo* pRepo); -SFileGroup* tsdbCreateFGroup(STsdbRepo* pRepo, int fid); +SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level); void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction); void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid); SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter); diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 5b2af8db1e..5f4a918b7c 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -14,14 +14,23 @@ */ #include "tsdbMain.h" -static int tsdbCommitTSData(STsdbRepo *pRepo); -static int tsdbCommitMeta(STsdbRepo *pRepo); -static void tsdbEndCommit(STsdbRepo *pRepo, int eno); -static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); +typedef struct { + SFidGroup fidg; + SCommitIter *iters; + SRWHelper whelper; + SDataCols * pDataCols; +} SCommitH; + +static int tsdbCommitTSData(STsdbRepo *pRepo); +static int tsdbCommitMeta(STsdbRepo *pRepo); +static void tsdbEndCommit(STsdbRepo *pRepo, int eno); +static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch); static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key); +static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pch); +static void tsdbDestroyCommitH(SCommitH *pch, int niter); void *tsdbCommitData(STsdbRepo *pRepo) { SMemTable * pMem = pRepo->imem; @@ -58,68 +67,45 @@ _err: } static int tsdbCommitTSData(STsdbRepo *pRepo) { - SMemTable * pMem = pRepo->imem; - SDataCols * pDataCols = NULL; - STsdbMeta * pMeta = pRepo->tsdbMeta; - SCommitIter *iters = NULL; - SRWHelper whelper = {0}; - STsdbCfg * pCfg = &(pRepo->config); - SFidGroup fidGroup = {0}; - TSKEY minKey = 0; - TSKEY maxKey = 0; + SMemTable *pMem = pRepo->imem; + SCommitH ch = {0}; + STsdbCfg * pCfg = &(pRepo->config); + // SFidGroup fidGroup = {0}; + TSKEY minKey = 0; + TSKEY maxKey = 0; if (pMem->numOfRows <= 0) return 0; - tsdbGetFidGroup(pCfg, &fidGroup); - tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fidGroup.minFid, &minKey, &maxKey); - tsdbRemoveFilesBeyondRetention(pRepo, &fidGroup); + tsdbGetFidGroup(pCfg, &(ch.fidg)); + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, ch.fidg.minFid, &minKey, &maxKey); + tsdbRemoveFilesBeyondRetention(pRepo, &(ch.fidg)); - iters = tsdbCreateCommitIters(pRepo); - if (iters == NULL) { - tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - - if (tsdbInitWriteHelper(&whelper, pRepo) < 0) { - tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - - if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", - REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); + if (tsdbInitCommitH(pRepo, &ch) < 0) { goto _err; } int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); - tsdbSeekCommitIter(iters, pMem->maxTables, minKey); + tsdbSeekCommitIter(ch.iters, pMem->maxTables, minKey); // Loop to commit to each file for (int fid = sfid; fid <= efid; fid++) { - if (fid < fidGroup.minFid) continue; + if (fid < ch.fidg.minFid) continue; - if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { + if (tsdbCommitToFile(pRepo, fid, &(ch)) < 0) { tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); goto _err; } } - tsdbApplyRetention(pRepo, &fidGroup); - - tdFreeDataCols(pDataCols); - tsdbDestroyCommitIters(iters, pMem->maxTables); - tsdbDestroyHelper(&whelper); + tsdbApplyRetention(pRepo, &(ch.fidg)); + tsdbDestroyCommitH(&ch, pMem->maxTables); return 0; _err: - tdFreeDataCols(pDataCols); - tsdbDestroyCommitIters(iters, pMem->maxTables); - tsdbDestroyHelper(&whelper); - + tsdbDestroyCommitH(&ch, pMem->maxTables); return -1; } @@ -184,14 +170,17 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS return false; } -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) { - STsdbCfg * pCfg = &pRepo->config; - STsdbFileH *pFileH = pRepo->tsdbFileH; - SFileGroup *pGroup = NULL; - SMemTable * pMem = pRepo->imem; - bool newLast = false; - TSKEY minKey = 0; - TSKEY maxKey = 0; +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) { + STsdbCfg * pCfg = &pRepo->config; + STsdbFileH * pFileH = pRepo->tsdbFileH; + SFileGroup * pGroup = NULL; + SMemTable * pMem = pRepo->imem; + bool newLast = false; + TSKEY minKey = 0; + TSKEY maxKey = 0; + SCommitIter *iters = pch->iters; + SRWHelper * pHelper = &(pch->whelper); + SDataCols * pDataCols = pch->pDataCols; tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); @@ -352,4 +341,35 @@ static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key) { tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key-1, INT32_MAX, NULL, NULL, 0, true, NULL); } +} + +static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pch) { + STsdbMeta *pMeta = pRepo->tsdbMeta; + STsdbCfg * pCfg = &(pRepo->config); + + pch->iters = tsdbCreateCommitIters(pRepo); + if (pch->iters == NULL) { + tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + + if (tsdbInitWriteHelper(&(pch->whelper), pRepo) < 0) { + tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + + if ((pch->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", + REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); + return -1; + } + + return 0; +} + +static void tsdbDestroyCommitH(SCommitH *pch, int niter) { + tdFreeDataCols(pch->pDataCols); + tsdbDestroyCommitIters(pch->iters, niter); + tsdbDestroyHelper(&(pch->whelper)); } \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 238351b350..a6db0ebbf1 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -62,69 +62,15 @@ void tsdbFreeFileH(STsdbFileH *pFileH) { } } -int tsdbOpenFileH(STsdbRepo *pRepo) { // TODO +int tsdbOpenFileH(STsdbRepo *pRepo) { ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL); - char dataDir[TSDB_FILENAME_LEN] = "\0"; - // 1. scan and get all files corresponds - TDIR *tdir = NULL; - char fname[TSDB_FILENAME_LEN] = "\0"; - regex_t regex = {0}; - int code = 0; - int vid = 0; - int fid = 0; - - const TFILE *pfile = NULL; - - code = regcomp(®ex, "^v[0-9]+f[0-9]+\\.(head|data|last|h|d|l)$", REG_EXTENDED); - if (code != 0) { - // TODO: deal the error - } - - snprintf(dataDir, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", REPO_ID(pRepo)); - tdir = tfsOpendir(dataDir); - if (tdir == NULL) { - // TODO: deal the error - } - - while ((pfile = tfsReaddir(tdir)) != NULL) { - tfsBaseName(pfile, fname); - - if (strcmp(fname, ".") == 0 || strcmp(fname, "..") == 0) continue; - - code = regexec(®ex, fname, 0, NULL, 0); - if (code == 0) { - sscanf(fname, "v%df%d", &vid, &fid); - - if (vid != REPO_ID(pRepo)) { - tfsAbsName(pfile, fname); - tsdbError("vgId:%d invalid file %s exists, ignore", REPO_ID(pRepo), fname); - continue; - } - - // TODO - {} - } else if (code == REG_NOMATCH) { - tfsAbsName(pfile, fname); - tsdbWarn("vgId:%d unrecognizable file %s exists, ignore", REPO_ID(pRepo), fname); - continue; - } else { - tsdbError("vgId:%d regexec failed since %s", REPO_ID(pRepo), strerror(code)); - // TODO: deal with error - } - } - - // 2. Sort all files according to fid - - // 3. Recover all files of each fid - while (true) { - // TODO - } + // TODO return 0; } -void tsdbCloseFileH(STsdbRepo *pRepo) { // TODO +void tsdbCloseFileH(STsdbRepo *pRepo) { STsdbFileH *pFileH = pRepo->tsdbFileH; for (int i = 0; i < pFileH->nFGroups; i++) { -- GitLab