From 9c0f05258928aa16b4c88c32b9da050cda4fa729 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 6 Sep 2021 14:27:33 +0800 Subject: [PATCH] process upgrade compatibility --- src/tsdb/inc/tsdbFS.h | 23 +++++++--- src/tsdb/inc/tsdbFile.h | 47 +++++++++++++------- src/tsdb/inc/tsdbReadImpl.h | 10 ++--- src/tsdb/src/tsdbCommit.c | 55 ++++++++++++++++-------- src/tsdb/src/tsdbCompact.c | 2 +- src/tsdb/src/tsdbFS.c | 61 +++++++++++++++----------- src/tsdb/src/tsdbFile.c | 85 +++++++++++++++++++++++++++++-------- src/tsdb/src/tsdbReadImpl.c | 7 ++- 8 files changed, 198 insertions(+), 92 deletions(-) diff --git a/src/tsdb/inc/tsdbFS.h b/src/tsdb/inc/tsdbFS.h index 5efc5b458b..e94cc54d89 100644 --- a/src/tsdb/inc/tsdbFS.h +++ b/src/tsdb/inc/tsdbFS.h @@ -17,12 +17,25 @@ #define _TD_TSDB_FS_H_ /** - * The fileset .head/.data/.last/.sma use the same TSDB_FS_VERSION. - * 0 - original format before 2021.08.25 // TODO update date 2021.08.25 to release version. - * 1 - extract aggregate block data from .data file and save to separate .sma file since 2021.08.25 // TODO update - * date to release version. + * 1. The fileset .head/.data/.last/.sma use the same fver 0 before 2021.09.05. + * 2. .head fver is 1 when extract aggregate block data from .data file and save to separate .sma file since 2021.09.05 + * // TODO update date and add release version. */ -#define TSDB_FS_VERSION 1 +typedef enum { + TSDB_FS_VERSION_0, + TSDB_FS_VERSION_1, +} ETsdbFsVersion; + +static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T ftype) { // for DFile + switch (ftype) { + case TSDB_FILE_HEAD: + return TSDB_FS_VERSION_1; + default: + return TSDB_FS_VERSION_0; + } +} + +static FORCE_INLINE uint32_t tsdbGetSFSVersion() { return TSDB_FS_VERSION_1; } // for current // ================== TSDB global config extern bool tsdbForceKeepFile; diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index 10d119eed1..d4f4fd70c5 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -37,9 +37,11 @@ #define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s)) #define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) #define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD) - +#define TSDB_FSET_NFILES_VALID(s) (((s)->nFiles >= TSDB_FILE_MIN) && ((s)->nFiles <= TSDB_FILE_MAX)) typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_SMA, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T; +#define TSDB_FILE_MIN 3U // min number of files in one DFileSet + // =============== SMFile typedef struct { int64_t size; @@ -166,6 +168,7 @@ typedef struct { uint32_t offset; uint64_t size; uint64_t tombSize; + uint32_t fver; } SDFInfo; typedef struct { @@ -179,8 +182,9 @@ void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile); int tsdbEncodeSDFile(void** buf, SDFile* pDFile); void* tsdbDecodeSDFile(void* buf, SDFile* pDFile); -int tsdbCreateDFile(SDFile* pDFile, bool updateHeader); +int tsdbCreateDFile(SDFile* pDFile, bool updateHeader, TSDB_FILE_T ftype); int tsdbUpdateDFileHeader(SDFile* pDFile); +int tsdbUpdateDFileHeaderEx(SDFile* pDFile, uint32_t fver); int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo); int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version); @@ -283,32 +287,39 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) { // =============== SDFileSet typedef struct { - int fid; - int state; - SDFile files[TSDB_FILE_MAX]; + int fid; + int state; + uint8_t nFiles; + SDFile files[TSDB_FILE_MAX]; } SDFileSet; #define TSDB_FSET_FID(s) ((s)->fid) #define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) #define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0)) #define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0)) -#define TSDB_FSET_SET_CLOSED(s) \ +#define TSDB_FSET_SET_INIT(s) \ do { \ for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \ TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \ } \ } while (0); -#define TSDB_FSET_FSYNC(s) \ - do { \ - for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \ - TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \ - } \ +#define TSDB_FSET_SET_CLOSED(s) \ + do { \ + for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < (s)->nFiles; ftype++) { \ + TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \ + } \ + } while (0); +#define TSDB_FSET_FSYNC(s) \ + do { \ + for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < (s)->nFiles; ftype++) { \ + TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \ + } \ } while (0); void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver); void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet); int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet); -void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet); +void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet, bool containNFiles); int tsdbEncodeDFileSetEx(void** buf, SDFileSet* pSet); void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet); int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to); @@ -317,13 +328,15 @@ int tsdbUpdateDFileSetHeader(SDFileSet* pSet); int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet* pSet); static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) { - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + ASSERT(pSet->nFiles <= TSDB_FILE_MAX); + for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { tsdbCloseDFile(TSDB_DFILE_IN_SET(pSet, ftype)); } } static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) { - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + ASSERT(TSDB_FSET_NFILES_VALID(pSet)); + for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { if (tsdbOpenDFile(TSDB_DFILE_IN_SET(pSet, ftype), flags) < 0) { tsdbCloseDFileSet(pSet); return -1; @@ -333,13 +346,15 @@ static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) { } static FORCE_INLINE void tsdbRemoveDFileSet(SDFileSet* pSet) { - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + ASSERT(TSDB_FSET_NFILES_VALID(pSet)); + for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { (void)tsdbRemoveDFile(TSDB_DFILE_IN_SET(pSet, ftype)); } } static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) { - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + ASSERT(TSDB_FSET_NFILES_VALID(pSrc)); + for (TSDB_FILE_T ftype = 0; ftype < pSrc->nFiles; ftype++) { if (tsdbCopyDFile(TSDB_DFILE_IN_SET(pSrc, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) { tsdbRemoveDFileSet(pDest); return -1; diff --git a/src/tsdb/inc/tsdbReadImpl.h b/src/tsdb/inc/tsdbReadImpl.h index 86063b5563..3df1f7a52a 100644 --- a/src/tsdb/inc/tsdbReadImpl.h +++ b/src/tsdb/inc/tsdbReadImpl.h @@ -84,8 +84,6 @@ typedef struct { int64_t sum; int64_t max; int64_t min; - // uint8_t type; - // char reserved[15]; // Adjust the size of reserved array whenever adding new field of SAggrBlkCol. } SAggrBlkCol; // Code here just for back-ward compatibility @@ -122,11 +120,9 @@ struct SReadH { int cidx; SBlockInfo *pBlkInfo; SBlockData *pBlkData; // Block info -#ifdef __TD_6117__ - SAggrBlkData *pAggrBlkData; // Block info -#endif + SAggrBlkData *pAggrBlkData; // Aggregate Block info SDataCols * pDCols[2]; - void * pRBuf; // buffer + void * pBuf; // buffer void * pCBuf; // compression buffer void * pExBuf; // extra buffer }; @@ -139,7 +135,7 @@ struct SReadH { #define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA) #define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST) #define TSDB_READ_AGGR_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMA) -#define TSDB_READ_BUF(rh) ((rh)->pRBuf) +#define TSDB_READ_BUF(rh) ((rh)->pBuf) #define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf) #define TSDB_READ_EXBUF(rh) ((rh)->pExBuf) diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 2604aab4dc..ed84c41a88 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -844,7 +844,7 @@ static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo) { memset(pCommith, 0, sizeof(*pCommith)); tsdbGetRtnSnap(pRepo, &(pCommith->rtn)); - TSDB_FSET_SET_CLOSED(TSDB_COMMIT_WRITE_FSET(pCommith)); + TSDB_FSET_SET_INIT(TSDB_COMMIT_WRITE_FSET(pCommith)); // Init read handle if (tsdbInitReadH(&(pCommith->readh), pRepo) < 0) { @@ -1571,7 +1571,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid pCommith->isDFileSame = false; pCommith->isLFileSame = false; - + pCommith->isSmaFileSame = false; tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet)); } else { @@ -1580,11 +1580,12 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid pCommith->wSet.fid = fid; pCommith->wSet.state = 0; - + pCommith->wSet.nFiles = TSDB_FILE_MAX; + // TSDB_FILE_HEAD SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD); - if (tsdbCreateDFile(pWHeadf, true) < 0) { + if (tsdbCreateDFile(pWHeadf, true, TSDB_FILE_HEAD) < 0) { tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf), tstrerror(terrno)); @@ -1598,7 +1599,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh)); SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith); tsdbInitDFileEx(pWDataf, pRDataf); - if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) { + if (tsdbOpenDFile(pWDataf, O_RDWR) < 0) { tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWDataf), tstrerror(terrno)); @@ -1618,7 +1619,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid tsdbInitDFileEx(pWLastf, pRLastf); pCommith->isLFileSame = true; - if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) { + if (tsdbOpenDFile(pWLastf, O_RDWR) < 0) { tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), tstrerror(terrno)); @@ -1633,7 +1634,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid tsdbInitDFile(pWLastf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST); pCommith->isLFileSame = false; - if (tsdbCreateDFile(pWLastf, true) < 0) { + if (tsdbCreateDFile(pWLastf, true, TSDB_FILE_LAST) < 0) { tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), tstrerror(terrno)); @@ -1647,21 +1648,41 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid } // TSDB_FILE_SMA + ASSERT(pWSet->nFiles >= TSDB_FILE_SMA); SDFile *pRSmaf = TSDB_READ_AGGR_FILE(&(pCommith->readh)); SDFile *pWSmaf = TSDB_COMMIT_AGGR_FILE(pCommith); - tsdbInitDFileEx(pWSmaf, pRSmaf); - if (tsdbOpenDFile(pWSmaf, O_WRONLY) < 0) { - tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmaf), - tstrerror(terrno)); - tsdbCloseDFileSet(pWSet); - tsdbRemoveDFile(pWHeadf); - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; + if (access(TSDB_FILE_FULL_NAME(pRSmaf), F_OK) != 0) { + tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmaf)); + tsdbInitDFile(pWSmaf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMA); + pCommith->isLFileSame = false; + + if (tsdbCreateDFile(pWSmaf, true, TSDB_FILE_SMA) < 0) { + tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmaf), + tstrerror(terrno)); + + tsdbCloseDFileSet(pWSet); + (void)tsdbRemoveDFile(pWHeadf); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } } + } else { + tsdbInitDFileEx(pWSmaf, pRSmaf); + pCommith->isSmaFileSame = true; + if (tsdbOpenDFile(pWSmaf, O_RDWR) < 0) { + tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmaf), + tstrerror(terrno)); + + tsdbCloseDFileSet(pWSet); + tsdbRemoveDFile(pWHeadf); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } } - pCommith->isSmaFileSame = true; } return 0; diff --git a/src/tsdb/src/tsdbCompact.c b/src/tsdb/src/tsdbCompact.c index 6eb8a35e06..aed216903b 100644 --- a/src/tsdb/src/tsdbCompact.c +++ b/src/tsdb/src/tsdbCompact.c @@ -270,7 +270,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { memset(pComph, 0, sizeof(*pComph)); - TSDB_FSET_SET_CLOSED(TSDB_COMPACT_WSET(pComph)); + TSDB_FSET_SET_INIT(TSDB_COMPACT_WSET(pComph)); tsdbGetRtnSnap(pRepo, &(pComph->rtn)); tsdbFSIterInit(&(pComph->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD); diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index a40e67ca59..ee507f03ef 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -36,6 +36,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2); static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired); static int tsdbProcessExpiredFS(STsdbRepo *pRepo); static int tsdbCreateMeta(STsdbRepo *pRepo); +static int tsdbFetchTFileSet(STsdbRepo *pRepo, SArray **fArray); // For backward compatibility // ================== CURRENT file header info @@ -89,15 +90,16 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) { return tlen; } -static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray) { +static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray, bool containNFiles) { uint64_t nset; SDFileSet dset; + dset.nFiles = TSDB_FILE_MIN; // default value: .head/.data/.last taosArrayClear(pArray); buf = taosDecodeFixedU64(buf, &nset); for (size_t i = 0; i < nset; i++) { - buf = tsdbDecodeDFileSet(buf, &dset); + buf = tsdbDecodeDFileSet(buf, &dset, containNFiles); taosArrayPush(pArray, (void *)(&dset)); } return buf; @@ -114,13 +116,12 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) { return tlen; } -static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) { +static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus, bool containNFiles) { tsdbResetFSStatus(pStatus); - pStatus->pmf = &(pStatus->mf); buf = tsdbDecodeSMFile(buf, pStatus->pmf); - buf = tsdbDecodeDFileSetArray(buf, pStatus->df); + buf = tsdbDecodeDFileSetArray(buf, pStatus->df, containNFiles); return buf; } @@ -414,7 +415,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) { return -1; } - fsheader.version = TSDB_FS_VERSION; + fsheader.version = tsdbGetSFSVersion(); if (pStatus->pmf == NULL) { ASSERT(taosArrayGetSize(pStatus->df) == 0); fsheader.len = 0; @@ -689,7 +690,7 @@ static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) { ptr = tsdbDecodeFSHeader(ptr, &fsheader); ptr = tsdbDecodeFSMeta(ptr, &(pStatus->meta)); - if (fsheader.version != TSDB_FS_VERSION) { + if (fsheader.version != TSDB_FS_VERSION_0) { // TODO: handle file version change } @@ -718,7 +719,7 @@ static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) { } ptr = buffer; - ptr = tsdbDecodeFSStatus(ptr, pStatus); + ptr = tsdbDecodeFSStatus(ptr, pStatus, fsheader.version == TSDB_FS_VERSION_0 ? false : true); } else { tsdbResetFSStatus(pStatus); } @@ -752,7 +753,7 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) { SDFileSet *pSet = (SDFileSet *)taosArrayGet(pStatus->df, i); if (tsdbScanAndTryFixDFileSet(pRepo, pSet) < 0) { - tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbError("vgId:%d failed to fix DFileSet since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } } @@ -1098,25 +1099,23 @@ static int tsdbRestoreMeta(STsdbRepo *pRepo) { return 0; } -static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { +static int tsdbFetchTFileSet(STsdbRepo *pRepo, SArray **fArray) { char dataDir[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN]; TDIR * tdir = NULL; const TFILE *pf = NULL; - const char * pattern = "^v[0-9]+f[0-9]+\\.(head|data|last)(-ver[0-9]+)?$"; - SArray * fArray = NULL; + const char * pattern = "^v[0-9]+f[0-9]+\\.(head|data|last|sma)(-ver[0-9]+)?$"; regex_t regex; - STsdbFS * pfs = REPO_FS(pRepo); tsdbGetDataDir(REPO_ID(pRepo), dataDir); // Resource allocation and init regcomp(®ex, pattern, REG_EXTENDED); - fArray = taosArrayInit(1024, sizeof(TFILE)); - if (fArray == NULL) { + *fArray = taosArrayInit(1024, sizeof(TFILE)); + if (*fArray == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir, + tsdbError("vgId:%d failed to fetch TFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno)); regfree(®ex); return -1; @@ -1124,9 +1123,9 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { tdir = tfsOpendir(dataDir); if (tdir == NULL) { - tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir, + tsdbError("vgId:%d failed to fetch TFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno)); - taosArrayDestroy(fArray); + taosArrayDestroy(*fArray); regfree(®ex); return -1; } @@ -1136,10 +1135,10 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { int code = regexec(®ex, bname, 0, NULL, 0); if (code == 0) { - if (taosArrayPush(fArray, (void *)pf) == NULL) { + if (taosArrayPush(*fArray, (void *)pf) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tfsClosedir(tdir); - taosArrayDestroy(fArray); + taosArrayDestroy(*fArray); regfree(®ex); return -1; } @@ -1150,10 +1149,10 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { continue; } else { // Has other error - tsdbError("vgId:%d failed to restore DFileSet Array while run regexec since %s", REPO_ID(pRepo), strerror(code)); + tsdbError("vgId:%d failed to fetch TFileSet Array while run regexec since %s", REPO_ID(pRepo), strerror(code)); terrno = TAOS_SYSTEM_ERROR(code); tfsClosedir(tdir); - taosArrayDestroy(fArray); + taosArrayDestroy(*fArray); regfree(®ex); return -1; } @@ -1163,7 +1162,19 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { regfree(®ex); // Sort the array according to file name - taosArraySort(fArray, tsdbComparTFILE); + taosArraySort(*fArray, tsdbComparTFILE); + return 0; +} + +static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { + const TFILE *pf = NULL; + SArray * fArray = NULL; + STsdbFS * pfs = REPO_FS(pRepo); + + if (tsdbFetchTFileSet(pRepo, &fArray) < 0) { + tsdbError("vgId:%d failed to fetch TFileSet to restore since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } size_t index = 0; // Loop to recover each file set @@ -1174,7 +1185,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { SDFileSet fset = {0}; - TSDB_FSET_SET_CLOSED(&fset); + TSDB_FSET_SET_INIT(&fset); // Loop to recover ONE fset for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { @@ -1335,7 +1346,7 @@ static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired) { continue; } - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + for (TSDB_FILE_T ftype = 0; ftype < fset.nFiles; ftype++) { SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype); if ((tsdbLoadDFileHeader(pDFile, &info) < 0) || pDFile->info.size != info.size || diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 473ecd1387..4e52ef7ac4 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -199,7 +199,7 @@ int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) { tsdbInitMFileEx(&mf, pMFile); if (access(TSDB_FILE_FULL_NAME(pMFile), F_OK) != 0) { - tsdbError("vgId:%d meta file %s not exit, report to upper layer to fix it", REPO_ID(pRepo), + tsdbError("vgId:%d meta file %s not exist, report to upper layer to fix it", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile)); pRepo->state |= TSDB_STATE_BAD_META; TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_BAD); @@ -302,6 +302,7 @@ void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, memset(&(pDFile->info), 0, sizeof(pDFile->info)); pDFile->info.magic = TSDB_FILE_INIT_MAGIC; + pDFile->info.fver = tsdbGetDFSVersion(ftype); tsdbGetFilename(vid, fid, ver, ftype, fname); tfsInitFile(&(pDFile->f), did.level, did.id, fname); @@ -350,10 +351,10 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) { return buf; } -int tsdbCreateDFile(SDFile *pDFile, bool updateHeader) { +int tsdbCreateDFile(SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) { ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC); - pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755); + pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0755); if (pDFile->fd < 0) { if (errno == ENOENT) { // Try to create directory recursively @@ -364,7 +365,7 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader) { } tfree(s); - pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755); + pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0755); if (pDFile->fd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -381,7 +382,7 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader) { pDFile->info.size += TSDB_FILE_HEAD_SIZE; - if (tsdbUpdateDFileHeader(pDFile) < 0) { + if (tsdbUpdateDFileHeaderEx(pDFile, tsdbGetDFSVersion(fType)) < 0) { tsdbCloseDFile(pDFile); tsdbRemoveDFile(pDFile); return -1; @@ -390,15 +391,53 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader) { return 0; } +// keep the fver in DFileHeader(e.g. during fs check in openFS) int tsdbUpdateDFileHeader(SDFile *pDFile) { char buf[TSDB_FILE_HEAD_SIZE] = "\0"; + if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) { + tsdbDebug("prop:file %s seek to read fail", TSDB_FILE_FULL_NAME(pDFile)); + return -1; + } + + if (tsdbReadDFile(pDFile, buf, TSDB_FILE_HEAD_SIZE) < 0) { + tsdbDebug("prop:file %s read fail, fd is %d", TSDB_FILE_FULL_NAME(pDFile), pDFile->fd); + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbDebug("prop:file %s checksum fail", TSDB_FILE_FULL_NAME(pDFile)); + return -1; + } + + void *ptr = POINTER_SHIFT(buf, sizeof(uint32_t)); + tsdbEncodeDFInfo(&ptr, &(pDFile->info)); + + taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); + + if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) { + tsdbDebug("prop:file %s seek to write fail", TSDB_FILE_FULL_NAME(pDFile)); + return -1; + } + + if (tsdbWriteDFile(pDFile, buf, TSDB_FILE_HEAD_SIZE) < 0) { + tsdbDebug("prop:file %s write fail", TSDB_FILE_FULL_NAME(pDFile)); + return -1; + } + + return 0; +} +// update the fver in DFileHeader +int tsdbUpdateDFileHeaderEx(SDFile *pDFile, uint32_t fver) { + char buf[TSDB_FILE_HEAD_SIZE] = "\0"; + if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) { return -1; } void *ptr = buf; - taosEncodeFixedU32(&ptr, TSDB_FS_VERSION); + taosEncodeFixedU32(&ptr, fver); tsdbEncodeDFInfo(&ptr, &(pDFile->info)); taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); @@ -411,7 +450,7 @@ int tsdbUpdateDFileHeader(SDFile *pDFile) { int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) { char buf[TSDB_FILE_HEAD_SIZE] = "\0"; - uint32_t _version; + // uint32_t _version; ASSERT(TSDB_FILE_OPENED(pDFile)); @@ -429,7 +468,7 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) { } void *pBuf = buf; - pBuf = taosDecodeFixedU32(pBuf, &_version); + // pBuf = taosDecodeFixedU32(pBuf, &_version); pBuf = tsdbDecodeDFInfo(pBuf, pInfo); return 0; } @@ -441,7 +480,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { tsdbInitDFileEx(&df, pDFile); if (access(TSDB_FILE_FULL_NAME(pDFile), F_OK) != 0) { - tsdbError("vgId:%d data file %s not exit, report to upper layer to fix it", REPO_ID(pRepo), + tsdbError("vgId:%d data file %s not exist, report to upper layer to fix it", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile)); pRepo->state |= TSDB_STATE_BAD_DATA; TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD); @@ -454,7 +493,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { } if (pDFile->info.size < dfstat.st_size) { - if (tsdbOpenDFile(&df, O_WRONLY) < 0) { + if (tsdbOpenDFile(&df, O_RDWR) < 0) { return -1; } @@ -489,6 +528,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) { int tlen = 0; + tlen += taosEncodeFixedU32(buf, pInfo->fver); tlen += taosEncodeFixedU32(buf, pInfo->magic); tlen += taosEncodeFixedU32(buf, pInfo->len); tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks); @@ -501,6 +541,7 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) { } static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) { + buf = taosDecodeFixedU32(buf, &(pInfo->fver)); buf = taosDecodeFixedU32(buf, &(pInfo->magic)); buf = taosDecodeFixedU32(buf, &(pInfo->len)); buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks)); @@ -560,6 +601,7 @@ static int tsdbRollBackDFile(SDFile *pDFile) { void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver) { pSet->fid = fid; pSet->state = 0; + pSet->nFiles = TSDB_FILE_MAX; for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype); @@ -568,8 +610,10 @@ void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t v } void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) { + ASSERT(TSDB_FSET_NFILES_VALID(pOSet)); pSet->fid = pOSet->fid; - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + pSet->nFiles = pOSet->nFiles; + for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { tsdbInitDFileEx(TSDB_DFILE_IN_SET(pSet, ftype), TSDB_DFILE_IN_SET(pOSet, ftype)); } } @@ -578,6 +622,7 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) { int tlen = 0; tlen += taosEncodeFixedI32(buf, pSet->fid); + tlen += taosEncodeFixedU8(buf, pSet->nFiles); for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { tlen += tsdbEncodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype)); } @@ -585,13 +630,17 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) { return tlen; } -void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet) { +void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet, bool containNFiles) { int32_t fid; buf = taosDecodeFixedI32(buf, &(fid)); pSet->state = 0; pSet->fid = fid; - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + if (containNFiles) { + buf = taosDecodeFixedU8(buf, &(pSet->nFiles)); + } + ASSERT(TSDB_FSET_NFILES_VALID(pSet)); + for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { buf = tsdbDecodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype)); } return buf; @@ -620,7 +669,8 @@ void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet) { } int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) { - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + ASSERT(from == NULL || TSDB_FSET_NFILES_VALID(from)); + for (TSDB_FILE_T ftype = 0; ftype < from->nFiles; ftype++) { SDFile *pDFileFrom = (from) ? TSDB_DFILE_IN_SET(from, ftype) : NULL; SDFile *pDFileTo = (to) ? TSDB_DFILE_IN_SET(to, ftype) : NULL; if (tsdbApplyDFileChange(pDFileFrom, pDFileTo) < 0) { @@ -633,7 +683,7 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) { int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { - if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype), updateHeader) < 0) { + if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype), updateHeader, ftype) < 0) { tsdbCloseDFileSet(pSet); tsdbRemoveDFileSet(pSet); return -1; @@ -644,7 +694,7 @@ int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) { } int tsdbUpdateDFileSetHeader(SDFileSet *pSet) { - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { if (tsdbUpdateDFileHeader(TSDB_DFILE_IN_SET(pSet, ftype)) < 0) { return -1; } @@ -653,7 +703,8 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) { } int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) { - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + ASSERT(TSDB_FSET_NFILES_VALID(pSet)); + for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { if (tsdbScanAndTryFixDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype)) < 0) { return -1; } diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c index e8345fd91e..3835c8e11c 100644 --- a/src/tsdb/src/tsdbReadImpl.c +++ b/src/tsdb/src/tsdbReadImpl.c @@ -33,8 +33,7 @@ int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) { memset((void *)pReadh, 0, sizeof(*pReadh)); pReadh->pRepo = pRepo; - - TSDB_FSET_SET_CLOSED(TSDB_READ_FSET(pReadh)); + TSDB_FSET_SET_INIT(TSDB_READ_FSET(pReadh)); pReadh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); if (pReadh->aBlkIdx == NULL) { @@ -65,7 +64,7 @@ void tsdbDestroyReadH(SReadH *pReadh) { pReadh->pExBuf = taosTZfree(pReadh->pExBuf); #endif pReadh->pCBuf = taosTZfree(pReadh->pCBuf); - pReadh->pRBuf = taosTZfree(pReadh->pRBuf); + pReadh->pBuf = taosTZfree(pReadh->pBuf); pReadh->pDCols[0] = tdFreeDataCols(pReadh->pDCols[0]); pReadh->pDCols[1] = tdFreeDataCols(pReadh->pDCols[1]); pReadh->pBlkData = taosTZfree(pReadh->pBlkData); @@ -728,7 +727,7 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc return -1; } - if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pRBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows, + if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows, pCfg->maxRowsPerFileBlock, pReadh->pCBuf, (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) { tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), pBlockCol->colId, offset); -- GitLab