From 5b755288725608f8acefebb5f96a7dc30019ccbb Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 26 Nov 2020 11:47:11 +0800 Subject: [PATCH] refactor --- src/tsdb/src/tsdbCommit.c | 5 +- src/tsdb/src/tsdbFile.c | 176 +++++++++++++++++++++++++++++++------- 2 files changed, 146 insertions(+), 35 deletions(-) diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 2a40d261cc..e361c36ab8 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -259,13 +259,14 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) { pthread_rwlock_wrlock(&(pFileH->fhlock)); - tfsremove(&(helperHeadF(pHelper)->file)); + // tfsremove(&(helperHeadF(pHelper)->file)); (void)rename(TSDB_FILE_NAME(helperNewHeadF(pHelper)), TSDB_FILE_NAME(helperHeadF(pHelper))); + tfsDecDiskFile(helperNewHeadF(pHelper)->file.level, helperNewHeadF(pHelper)->file.id, 1); pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info; if (newLast) { - tfsremove(&(helperLastF(pHelper)->file)); (void)rename(TSDB_FILE_NAME(helperNewLastF(pHelper)), TSDB_FILE_NAME(helperLastF(pHelper))); + tfsDecDiskFile(helperNewLastF(pHelper)->file.level, helperNewLastF(pHelper)->file.id, 1); pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info; } else { pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info; diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 12525b7768..0960c7a4c9 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -21,13 +21,16 @@ #include "tsdbMain.h" #include "tutil.h" #include "tfs.h" +#include "tarray.h" const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"}; -static int compFGroup(const void *arg1, const void *arg2); -static int keyFGroupCompFunc(const void *key, const void *fgroup); -static void tsdbScanAllFiles(STsdbRepo *pRepo, TFILE **pfArray, int *nfiles); -static int tsdbCompareFile(void *arg1, void *arg2); +static int compFGroup(const void *arg1, const void *arg2); +static int keyFGroupCompFunc(const void *key, const void *fgroup); +static void *tsdbScanAllFiles(STsdbRepo *pRepo); +static int tsdbCompareFile(const void *arg1, const void *arg2); +static int tsdbRestoreFile(STsdbRepo *pRepo, TFILE *pfiles, int nfile); +static void tsdbParseFname(const char *bname, int *vid, int *fid, char *suffix); // STsdbFileH =========================================== STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { @@ -70,24 +73,53 @@ void tsdbFreeFileH(STsdbFileH *pFileH) { int tsdbOpenFileH(STsdbRepo *pRepo) { ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL); - TFILE *pfArray = NULL; - int nfiles = 0; + void *pfArray = NULL; // Scan the whole directory and get data - tsdbScanAllFiles(pRepo, &pfArray, &nfiles); + pfArray = tsdbScanAllFiles(pRepo); + if (pfArray == NULL) { + return -1; + } - if (nfiles == 0) return 0; + if (taosArrayGetSize(pfArray) == 0) { + taosArrayDestroy(pfArray); + return 0; + } // Sort the files - qsort((void *)pfArray, nfiles, sizeof(TFILE), tsdbCompareFile); + taosArraySort(pfArray, tsdbCompareFile); // Loop to recover the files int iter = 0; while (true) { - if (iter >= nfiles) break; - // TODO + if (iter >= taosArrayGetSize(pfArray)) break; + + int vid, fid; + char bname[TSDB_FILENAME_LEN] = "\0"; + char suffix[TSDB_FILENAME_LEN] = "\0"; + int count = 0; + + TFILE *pf = taosArrayGet(pfArray, iter); + tsdbParseFname(pf, bname); + tsdbParseFname(bname, &vid, &fid, suffix); + count++; + iter++; + + while (true) { + int nfid = 0; + TFILE *npf = taosArrayGet(pfArray, iter); + tsdbParseFname(npf, bname); + tsdbParseFname(bname, &vid, &nfid, suffix); + + if (nfid != fid) break; + count++; + iter++; + } + + tsdbRestoreFile(pRepo, pf, count); } + taosArrayDestroy(pfArray); return 0; } @@ -481,17 +513,22 @@ int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) { return 0; } -static void tsdbScanAllFiles(STsdbRepo *pRepo, TFILE **pfArray, int *nfiles) { +static void *tsdbScanAllFiles(STsdbRepo *pRepo) { + void * farray = NULL; TDIR * tdir = NULL; char dirName[TSDB_FILENAME_LEN] = "\0"; char bname[TSDB_FILENAME_LEN] = "\0"; int arraySize = 0; regex_t regex1 = {0}; - regex_t regex2 = {0}; const TFILE *pf = NULL; - regcomp(®ex1, "^v[0-9]+f[0-9]+\\.(head|data|last|stat)$", REG_EXTENDED); - regcomp(®ex2, "^v[0-9]+f[0-9]+\\.(h|d|l|s)$", REG_EXTENDED); + farray = taosArrayInit(256, sizeof(TFILE)); + if (farray == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + regcomp(®ex1, "^v[0-9]+f[0-9]+\\.(head|data|last|stat|l|d|h|s)$", REG_EXTENDED); snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", REPO_ID(pRepo)); @@ -503,41 +540,114 @@ static void tsdbScanAllFiles(STsdbRepo *pRepo, TFILE **pfArray, int *nfiles) { int code = regexec(®ex1, bname, 0, NULL, 0); if (code != 0) { tsdbWarn("vgId:%d file %s exists, ignore it", REPO_ID(pRepo), pf->aname); - rename(pf->aname); continue; } - if (nfiles + 1 >= arraySize) { - if (arraySize = 0) { - arraySize = 1024; - } else { - arraySize = arraySize * 2; - } - - *pfArray = realloc(*pfArray, sizeof(TFILE) * arraySize); - } - - (*pfArray)[nfiles++] = *pf; + taosArrayPush(farray, pf); } + regfree(®ex1); tfsClosedir(tdir); + + return farray; } -static int tsdbCompareFile(void *arg1, void *arg2) { +static int tsdbCompareFile(const void *arg1, const void *arg2) { char bname1[TSDB_FILENAME_LEN] = "\0"; char bname2[TSDB_FILENAME_LEN] = "\0"; TFILE *pf1 = (TFILE *)arg1; TFILE *pf2 = (TFILE *)arg2; + int vid1, fid1, vid2, fid2; tfsbasename(pf1, bname1); tfsbasename(pf2, bname2); - // TODO + + sscanf(bname1, "v%df%d", &vid1, &fid1); + sscanf(bname2, "v%df%d", &vid2, &fid2); + + ASSERT(vid1 == vid2); + if (fid1 < fid2) { + return -1; + } else if (fid1 == fid2) { + return 0 + } else { + return 1; + } } -static int tsdbGetTFileFid(TFILE *pf) { - char bname[TSDB_FILENAME_LEN] = "\0"; - int fid = 0; +static int tsdbRestoreFile(STsdbRepo *pRepo, TFILE *pfiles, int nfile) { + char backname[TSDB_FILENAME_LEN] = "\0"; + char bname[TSDB_FILENAME_LEN] = "\0"; + STsdbFileH *pFileH = pRepo->tsdbFileH; + TFILE * pfArray[TSDB_FILE_TYPE_MAX] = {0}; + TFILE * pHf = NULL; + TFILE * pLf = NULL; + SFileGroup fg = {0}; + int vid = 0; + int fid = 0; + char suffix[TSDB_FILENAME_LEN] = "\0"; + + for (int i = 0; i < nfile; i++) { + TFILE *pf = pfiles + i; + + tfsbasename(pf, bname); + tsdbParseFname(bname, &vid, &fid, suffix); + + if (strcmp(suffix, ".head") == 0) { + pfArray[TSDB_FILE_TYPE_HEAD] = pf; + } else if (strcmp(suffix, ".data") == 0) { + pfArray[TSDB_FILE_TYPE_DATA] = pf; + } else if (strcmp(suffix, ".last") == 0) { + pfArray[TSDB_FILE_TYPE_LAST] = pf; + } else if (strcmp(suffix, ".l") == 0) { + pLf = pf; + } else if (strcmp(suffix, ".h") == 0) { + pHf = pf; + } else { + tsdbWarn("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), pf->aname); + } + } + + if (pfArray[TSDB_FILE_TYPE_HEAD] == NULL || pfArray[TSDB_FILE_TYPE_DATA] == NULL || pfArray[TSDB_FILE_TYPE_LAST] == NULL) { + for (int i = 0; i < nfile; i++) { + snprintf(backname, TSDB_FILENAME_LEN, "%s_bak", (pfiles + i)->aname); + rename((pfiles + i)->aname, backname); + } + + return -1; + } + + if (pHf == NULL) { + if (pLf != NULL) { + rename(pLf->aname, pLastf->aname); + } + } else { + if (pLf != NULL) { + remove(pLf->aname); + } + remove(pHf->aname); + } + + // Register file + fg.fileId = fid; - tfsbasename(pf, bname); + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + SFile *pFile = fg.files + type; + + pFile->fd = -1; + pFile->file = *pfArray[type]; // TODO + tsdbOpenFile(pFile, O_RDONLY); + tsdbLoadFileHeader(pFile); + tsdbCloseFile(pFile); + } + + pFileH->pFGroup[pFileH->nFGroups++] = fg; + + tfsIncDiskFile(pHeadf->level, pHeadf->id, TSDB_FILE_TYPE_MAX); + + return 0; +} +static void tsdbParseFname(const char *bname, int *vid, int *fid, char *suffix) { + sscanf(bname, "v%df%d%s", vid, fid, suffix); } \ No newline at end of file -- GitLab