From 7c7644b545f3ed11ca066326af4d0b10c356b9f9 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 17 Jan 2021 13:00:04 +0800 Subject: [PATCH] more progress --- src/tsdb/inc/tsdbFile.h | 2 + src/tsdb/src/tsdbFS.c | 171 ++++++++++++++++++++++++++++++++++++++-- src/tsdb/src/tsdbFile.c | 77 ++++++++++++++++++ 3 files changed, 242 insertions(+), 8 deletions(-) diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index af57139f82..2719e7aeec 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -60,6 +60,7 @@ void* tsdbDecodeSMFile(void* buf, SMFile* pMFile); int tsdbApplyMFileChange(SMFile* from, SMFile* to); int tsdbCreateMFile(SMFile* pMFile); int tsdbUpdateMFileHeader(SMFile* pMFile); +int tsdbScanAndTryFixMFile(SMFile* pMFile); static FORCE_INLINE void tsdbSetMFileInfo(SMFile* pMFile, SMFInfo* pInfo) { pMFile->info = *pInfo; } @@ -300,6 +301,7 @@ void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet); int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to); int tsdbCreateDFileSet(SDFileSet* pSet); int tsdbUpdateDFileSetHeader(SDFileSet* pSet); +int tsdbScanAndTryFixDFileSet(SDFileSet* pSet); static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index 566e5d4b12..771c1a4eeb 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -15,14 +15,17 @@ #include "tsdbint.h" -#define TSDB_FS_CURRENT_FNAME "current" -#define TSDB_FS_TEMP_FNAME "current.t" +typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T; +static const char *tsdbTxnFname[] = {"current.t", "current"}; #define TSDB_MAX_FSETS(keep, days) ((keep) / (days) + 3) static int tsdbComparFidFSet(const void *arg1, const void *arg2); static void tsdbResetFSStatus(SFSStatus *pStatus); static int tsdbApplyFSTxn(STsdbFS *pfs, int vid); static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo); +static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]); +static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo); +static int tsdbScanAndTryFixFS(STsdbRepo *pRepo); // ================== CURRENT file header info static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) { @@ -100,7 +103,7 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) { return tlen; } -static UNUSED_FUNC void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) { +static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) { tsdbResetFSStatus(pStatus); pStatus->pmf = &(pStatus->mf); @@ -228,7 +231,23 @@ void *tsdbFreeFS(STsdbFS *pfs) { // TODO int tsdbOpenFS(STsdbRepo *pRepo) { - // TODO + STsdbFS * pfs = REPO_FS(pRepo); + char current[TSDB_FILENAME_LEN] = "\0"; + + ASSERT(pfs != NULL); + + tsdbGetTxnFname(REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current); + + if (access(current, F_OK) == 0) { + if (tsdbOpenFSFromCurrent(pRepo) < 0) { + tsdbError("vgId:%d failed to open FS since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + } else { + // TODO: current file not exists, try to recover it + + } + return 0; } @@ -245,9 +264,13 @@ void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd) { pfs->intxn = true; tsdbResetFSStatus(pfs->nstatus); pfs->nstatus->meta = pfs->cstatus->meta; - pfs->nstatus->meta.version = pfs->cstatus->meta.version + 1; + if (pfs->cstatus->pmf == NULL) { + pfs->nstatus->meta.version = 0; + } else { + pfs->nstatus->meta.version = pfs->cstatus->meta.version + 1; + } pfs->nstatus->meta.totalPoints = pfs->cstatus->meta.totalPoints + pointsAdd; - pfs->nstatus->meta.version = pfs->cstatus->meta.totalStorage += storageAdd; + pfs->nstatus->meta.totalStorage = pfs->cstatus->meta.totalStorage += storageAdd; } void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) { pfs->nstatus->meta = *pMeta; } @@ -297,8 +320,8 @@ static int tsdbApplyFSTxn(STsdbFS *pfs, int vid) { char tfname[TSDB_FILENAME_LEN] = "\0"; char cfname[TSDB_FILENAME_LEN] = "\0"; - snprintf(tfname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), vid, TSDB_FS_TEMP_FNAME); - snprintf(cfname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), vid, TSDB_FS_CURRENT_FNAME); + tsdbGetTxnFname(vid, TSDB_TXN_TEMP_FILE, tfname); + tsdbGetTxnFname(vid, TSDB_TXN_CURR_FILE, cfname); int fd = open(tfname, O_WRONLY | O_CREAT | O_TRUNC, 0755); if (fd < 0) { @@ -527,4 +550,136 @@ static int tsdbComparFidFSet(const void *arg1, const void *arg2) { } else { return 1; } +} + +static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]) { + snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), repoid, tsdbTxnFname[ftype]); +} + +static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) { + STsdbFS * pfs = REPO_FS(pRepo); + int fd = -1; + void * buffer = NULL; + SFSHeader fsheader; + char current[TSDB_FILENAME_LEN] = "\0"; + void * ptr; + + tsdbGetTxnFname(REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current); + + // current file exists, try to recover + fd = open(current, O_RDONLY); + if (fd < 0) { + tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), current, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (tsdbMakeRoom(&buffer, TSDB_FILE_HEAD_SIZE) < 0) { + goto _err; + } + + int nread = taosRead(fd, buffer, TSDB_FILE_HEAD_SIZE); + if (nread < 0) { + tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), TSDB_FILENAME_LEN, current, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (nread < TSDB_FILE_HEAD_SIZE) { + tsdbError("vgId:%d failed to read header of file %s, read bytes:%d", REPO_ID(pRepo), current, nread); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + goto _err; + } + + if (!taosCheckChecksumWhole((uint8_t *)buffer, TSDB_FILE_HEAD_SIZE)) { + tsdbError("vgId:%d header of file %s failed checksum check", REPO_ID(pRepo), current); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + goto _err; + } + + ptr = buffer; + ptr = tsdbDecodeFSHeader(ptr, &fsheader); + + if (fsheader.version != TSDB_FS_VERSION) { + // TODO: handle file version change + } + + SFSStatus *pStatus = pfs->cstatus; + + if (fsheader.len > 0) { + if (tsdbMakeRoom(&buffer, fsheader.len) < 0) { + goto _err; + } + + nread = taosRead(fd, buffer, fsheader.len); + if (nread < 0) { + tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), current, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (nread < fsheader.len) { + tsdbError("vgId:%d failed to read %d bytes from file %s", REPO_ID(pRepo), fsheader.len, current); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + goto _err; + } + + if (!taosCheckChecksumWhole((uint8_t *)buffer, fsheader.len)) { + tsdbError("vgId:%d file %s is corrupted since wrong checksum", REPO_ID(pRepo), current); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + goto _err; + } + + ptr = buffer; + ptr = tsdbDecodeFSMeta(ptr, &(pStatus->meta)); + ptr = tsdbDecodeFSStatus(ptr, pStatus); + } else { + tsdbResetFSStatus(pStatus); + } + + taosTZfree(buffer); + close(fd); + + if (tsdbScanAndTryFixFS(pRepo) < 0) { + return -1; + } + + return 0; + +_err: + if (fd >= 0) { + close(fd); + } + taosTZfree(buffer); + return -1; +} + +// Scan and try to fix incorrect files +static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) { + STsdbFS * pfs = REPO_FS(pRepo); + SFSStatus *pStatus = pfs->cstatus; + + if (pStatus->pmf) { + if (tsdbScanAndTryFixMFile(pStatus->pmf) < 0) { + tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + } + + size_t size = taosArrayGetSize(pStatus->df); + + for (size_t i = 0; i < size; i++) { + SDFileSet *pSet = (SDFileSet *)taosArrayGet(pStatus->df, i); + + if (tsdbScanAndTryFixDFileSet(pSet) < 0) { + tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + } + + // TODO: remove those unused files + {} + + return 0; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 36dff0f5f4..bfd84e1aa6 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -128,6 +128,40 @@ int tsdbUpdateMFileHeader(SMFile *pMFile) { return 0; } +int tsdbScanAndTryFixMFile(SMFile *pMFile) { + struct stat mfstat; + SMFile mf = *pMFile; + + if (stat(TSDB_FILE_FULL_NAME(&mf), &mfstat) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (pMFile->info.size > mfstat.st_size) { + if (tsdbOpenMFile(&mf, O_WRONLY) < 0) { + return -1; + } + + if (taosFtruncate(mf.fd, mf.info.size) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + tsdbCloseMFile(&mf); + return -1; + } + + if (tsdbUpdateMFileHeader(&mf) < 0) { + tsdbCloseMFile(&mf); + return -1; + } + + tsdbCloseMFile(&mf); + } else if (pMFile->info.size < mfstat.st_size) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + return 0; +} + static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) { int tlen = 0; @@ -251,6 +285,40 @@ int tsdbUpdateDFileHeader(SDFile *pDFile) { return 0; } +static int tsdbScanAndTryFixDFile(SDFile *pDFile) { + struct stat dfstat; + SDFile df = *pDFile; + + if (stat(TSDB_FILE_FULL_NAME(&df), &dfstat) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (pDFile->info.size > dfstat.st_size) { + if (tsdbOpenDFile(&df, O_WRONLY) < 0) { + return -1; + } + + if (taosFtruncate(df.fd, df.info.size) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + tsdbCloseDFile(&df); + return -1; + } + + if (tsdbUpdateDFileHeader(&df) < 0) { + tsdbCloseDFile(&df); + return -1; + } + + tsdbCloseDFile(&df); + } else if (pDFile->info.size < dfstat.st_size) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + return 0; +} + static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) { int tlen = 0; @@ -388,6 +456,15 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) { return 0; } +int tsdbScanAndTryFixDFileSet(SDFileSet *pSet) { + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + if (tsdbScanAndTryFixDFile(TSDB_DFILE_IN_SET(pSet, ftype)) < 0) { + return -1; + } + } + return 0; +} + static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname) { ASSERT(ftype != TSDB_FILE_MAX); -- GitLab