From e843619b87bc3c3fb1785bd8f448dea62fba3858 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Jan 2021 07:43:55 +0000 Subject: [PATCH] refact tsdb fs --- src/tsdb/inc/tsdbFS.h | 9 +++++---- src/tsdb/inc/tsdbFile.h | 3 ++- src/tsdb/src/tsdbCommit.c | 6 +++--- src/tsdb/src/tsdbFS.c | 36 +++++++++++++++++++++++------------- 4 files changed, 33 insertions(+), 21 deletions(-) diff --git a/src/tsdb/inc/tsdbFS.h b/src/tsdb/inc/tsdbFS.h index 3a4f244c0e..608171bc23 100644 --- a/src/tsdb/inc/tsdbFS.h +++ b/src/tsdb/inc/tsdbFS.h @@ -24,7 +24,7 @@ extern "C" { // ================== CURRENT file header info typedef struct { - uint32_t version; // Current file version + uint32_t version; // Current file system version (relating to code) uint32_t len; // Encode content length (including checksum) } SFSHeader; @@ -72,9 +72,10 @@ STsdbFS *tsdbNewFS(int keep, int days); void * tsdbFreeFS(STsdbFS *pfs); int tsdbOpenFS(STsdbFS *pFs, int keep, int days); void tsdbCloseFS(STsdbFS *pFs); -int tsdbStartTxn(STsdbFS *pfs); -int tsdbEndTxn(STsdbFS *pfs); -int tsdbEndTxnWithError(STsdbFS *pfs); +uint32_t tsdbStartFSTxn(STsdbFS *pfs); +int tsdbEndFSTxn(STsdbFS *pfs); +int tsdbEndFSTxnWithError(STsdbFS *pfs); +void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta); void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile); int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet); diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index 33c7b89c54..f5051aff43 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -57,10 +57,11 @@ void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver); void tsdbInitMFileEx(SMFile* pMFile, SMFile* pOMFile); int tsdbEncodeSMFile(void** buf, SMFile* pMFile); void* tsdbDecodeSMFile(void* buf, SMFile* pMFile); +int tsdbApplyMFileChange(SMFile* from, SMFile* to); int tsdbCreateMFile(SMFile* pMFile); int tsdbUpdateMFileHeader(SMFile* pMFile); -static FORCE_INLINE void tsdbSetMFileInfo(SMFile* pMFile, SMInfo* pInfo) { pMFile->info = *pInfo; } +static FORCE_INLINE void tsdbSetMFileInfo(SMFile* pMFile, SMFInfo* pInfo) { pMFile->info = *pInfo; } static FORCE_INLINE int tsdbOpenMFile(SMFile* pMFile, int flags) { ASSERT(TSDB_FILE_CLOSED(pMFile)); diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 2f034873a4..24ec062611 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -361,7 +361,7 @@ static int tsdbStartCommit(STsdbRepo *pRepo) { tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d", REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList)); - if (tsdbStartTxn(REPO_FS(pRepo)) < 0) return -1; + if (tsdbStartFSTxn(REPO_FS(pRepo)) < 0) return -1; pRepo->code = TSDB_CODE_SUCCESS; return 0; @@ -369,9 +369,9 @@ static int tsdbStartCommit(STsdbRepo *pRepo) { static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { if (eno != TSDB_CODE_SUCCESS) { - tsdbEndTxnWithError(REPO_FS(pRepo)); + tsdbEndFSTxnWithError(REPO_FS(pRepo)); } else { - tsdbEndTxn(REPO_FS(pRepo)); + tsdbEndFSTxn(REPO_FS(pRepo)); } tsdbInfo("vgId:%d commit over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed"); diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index 1e748c0286..ac80e82623 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -40,7 +40,7 @@ static void *tsdbDecodeFSHeader(void *buf, SFSHeader *pHeader) { static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) { int tlen = 0; - tlen += taosEncodeFixedU64(buf, pMeta->version); + tlen += taosEncodeFixedU32(buf, pMeta->version); tlen += taosEncodeFixedI64(buf, pMeta->totalPoints); tlen += taosEncodeFixedI64(buf, pMeta->totalStorage); @@ -48,7 +48,7 @@ static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) { } static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) { - buf = taosDecodeFixedU64(buf, &(pMeta->version)); + buf = taosDecodeFixedU32(buf, &(pMeta->version)); buf = taosDecodeFixedI64(buf, &(pMeta->totalPoints)); buf = taosDecodeFixedI64(buf, &(pMeta->totalStorage)); @@ -70,7 +70,7 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) { return tlen; } -static int tsdbDecodeDFileSetArray(void *buf, SArray *pArray) { +static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray) { uint64_t nset; SDFileSet dset; @@ -89,7 +89,7 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) { int tlen = 0; - tlen += tsdbEncodeSMFile(buf, &(pStatus->pmf)); + tlen += tsdbEncodeSMFile(buf, pStatus->pmf); tlen += tsdbEncodeDFileSetArray(buf, pStatus->df); return tlen; @@ -113,6 +113,8 @@ static SFSStatus *tsdbNewFSStatus(int maxFSet) { return NULL; } + TSDB_FSET_SET_CLOSED(&(pStatus->mf)); + pStatus->df = taosArrayInit(maxFSet, sizeof(SDFileSet)); if (pStatus->df == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -137,6 +139,8 @@ static void tsdbResetFSStatus(SFSStatus *pStatus) { return; } + TSDB_FSET_SET_CLOSED(&(pStatus->mf)); + pStatus->pmf = NULL; taosArrayClear(pStatus->df); } @@ -162,6 +166,7 @@ static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) { } // ================== STsdbFS +// TODO STsdbFS *tsdbNewFS(int keep, int days) { int maxFSet = TSDB_MAX_FSETS(keep, days); STsdbFS *pfs; @@ -201,6 +206,7 @@ STsdbFS *tsdbNewFS(int keep, int days) { return pfs; } +// TODO void *tsdbFreeFS(STsdbFS *pfs) { if (pfs) { pfs->nstatus = tsdbFreeFSStatus(pfs->nstatus); @@ -213,33 +219,37 @@ void *tsdbFreeFS(STsdbFS *pfs) { return NULL; } +// TODO int tsdbOpenFS(STsdbFS *pFs, int keep, int days) { // TODO return 0; } +// TODO void tsdbCloseFS(STsdbFS *pFs) { // TODO } // Start a new transaction to modify the file system -int tsdbStartTxn(STsdbFS *pfs) { +uint32_t tsdbStartFSTxn(STsdbFS *pfs) { ASSERT(pfs->intxn == false); pfs->intxn = true; tsdbResetFSStatus(pfs->nstatus); - return 0; + return pfs->cstatus->meta.version + 1; } -int tsdbEndTxn(STsdbFS *pfs) { +void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) { pfs->nstatus->meta = *pMeta; } + +int tsdbEndFSTxn(STsdbFS *pfs) { ASSERT(FS_IN_TXN(pfs)); SFSStatus *pStatus; // Write current file system snapshot - if (tsdbUpdateFS(pfs) < 0) { - tsdbEndTxnWithError(pfs); + if (tsdbApplyFSTxn(pfs) < 0) { + tsdbEndFSTxnWithError(pfs); return -1; } @@ -251,13 +261,13 @@ int tsdbEndTxn(STsdbFS *pfs) { tsdbUnLockFS(pfs); // Apply actual change to each file and SDFileSet - tsdbApplyFSChangeOnDisk(pfs); + tsdbApplyFSTxnOnDisk(pfs->nstatus, pfs->cstatus); pfs->intxn = false; return 0; } -int tsdbEndTxnWithError(STsdbFS *pfs) { +int tsdbEndFSTxnWithError(STsdbFS *pfs) { // TODO pfs->intxn = false; return 0; @@ -267,7 +277,7 @@ void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile) { tsdbSetStatusMFile(pf int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); } -static int tsdbUpdateFS(STsdbFS *pfs) { +static int tsdbApplyFSTxn(STsdbFS *pfs) { ASSERT(FS_IN_TXN(pfs)); SFSHeader fsheader; void * pBuf = NULL; @@ -339,7 +349,7 @@ static int tsdbUpdateFS(STsdbFS *pfs) { return 0; } -static void tsdbApplyFSChangeOnDisk(SFSStatus *pFrom, SFSStatus *pTo) { +static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo) { int ifrom = 0; int ito = 0; size_t sizeFrom, sizeTo; -- GitLab