diff --git a/CMakeLists.txt b/CMakeLists.txt index d96c8da57715631344a971caa30bd81c6fdbcb4f..588526c28614f753f6ff70e898f13f3fd65318db 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,7 +16,6 @@ SET(TD_GRANT FALSE) SET(TD_SYNC TRUE) SET(TD_MQTT TRUE) SET(TD_TSDB_PLUGINS FALSE) -SET(TD_DNODE_PLUGINS FALSE) SET(TD_COVER FALSE) SET(TD_MEM_CHECK FALSE) diff --git a/cmake/define.inc b/cmake/define.inc index 3bd1520065f886a56b95f89875fac7977bc396ce..6e64c2709abee366856a61916c6905363ea2002d 100755 --- a/cmake/define.inc +++ b/cmake/define.inc @@ -25,10 +25,6 @@ IF (TD_TSDB_PLUGINS) ADD_DEFINITIONS(-D_TSDB_PLUGINS) ENDIF () -IF (TD_DNODE_PLUGINS) - ADD_DEFINITIONS(-D_DNODE_PLUGINS) -ENDIF () - IF (TD_GODLL) ADD_DEFINITIONS(-D_TD_GO_DLL_) ENDIF () diff --git a/src/common/inc/tpath.h b/src/common/inc/tpath.h index c7557aa30a0dea2e5b0ec85fba258000bd66e772..9acc776278acfb140e6de9f36b94fc4b90cdef77 100644 --- a/src/common/inc/tpath.h +++ b/src/common/inc/tpath.h @@ -22,28 +22,36 @@ extern "C" { #endif -static FORCE_INLINE void tdGetMnodeRootDir(char *rootDir, char *dirName) { - snprintf(dirName, TSDB_FILENAME_LEN, "%s/mnode", rootDir); +static FORCE_INLINE void tdGetMnodeRootDir(char *baseDir, char *dirName) { + snprintf(dirName, TSDB_FILENAME_LEN, "%s/mnode", baseDir); } -static FORCE_INLINE void tdGetDnodeRootDir(char *rootDir, char *dirName) { - snprintf(dirName, TSDB_FILENAME_LEN, "%s/dnode", rootDir); +static FORCE_INLINE void tdGetDnodeRootDir(char *baseDir, char *dirName) { + snprintf(dirName, TSDB_FILENAME_LEN, "%s/dnode", baseDir); } -static FORCE_INLINE void tdGetVnodeRootDir(char *rootDir, char *dirName) { - snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode", rootDir); +static FORCE_INLINE void tdGetVnodeRootDir(char *baseDir, char *dirName) { + snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode", baseDir); } -static FORCE_INLINE void tdGetVnodeBackRootDir(char *rootDir, char *dirName) { - snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode_bak", rootDir); +static FORCE_INLINE void tdGetVnodeBackRootDir(char *baseDir, char *dirName) { + snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode_bak", baseDir); } -static FORCE_INLINE void tdGetVnodeDir(char *rootDir, int vid, char *dirName) { - snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode/vnode%d", rootDir, vid); +static FORCE_INLINE void tdGetVnodeDir(char *baseDir, int vid, char *dirName) { + snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode/vnode%d", baseDir, vid); } -static FORCE_INLINE void tdGetVnodeBackDir(char *rootDir, int vid, char *dirName) { - snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode_bak/vnode%d", rootDir, vid); +static FORCE_INLINE void tdGetVnodeBackDir(char *baseDir, int vid, char *dirName) { + snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode_bak/vnode%d", baseDir, vid); +} + +static FORCE_INLINE void tdGetTsdbRootDir(char *baseDir, int vid, char *dirName) { + snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb", baseDir, vid); +} + +static FORCE_INLINE void tdGetTsdbDataDir(char *baseDir, int vid, char *dirName) { + snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/data", baseDir, vid); } #ifdef __cplusplus diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index d0cbda48a05d1a87563bfc940865a2bf96485f3c..5608cfd6d16826e399f9ea41dc026d2ab2459610 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -36,10 +36,6 @@ IF (TD_LINUX) TARGET_LINK_LIBRARIES(taosd balance sync) ENDIF () - IF (TD_DNODE_PLUGINS) - TARGET_LINK_LIBRARIES(taosd dnodePlugins) - ENDIF() - SET(PREPARE_ENV_CMD "prepare_env_cmd") SET(PREPARE_ENV_TARGET "prepare_env_target") ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD} diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 995734b3315d2fe2e08200c3c312a50c9a7b6167..402c4ebbd2711be09d065eb3eccaeb193c0d5806 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -34,8 +34,6 @@ #include "dnodeTelemetry.h" #include "tpath.h" -struct SDnodeTier *pDnodeTier = NULL; - static int32_t dnodeInitStorage(); static void dnodeCleanupStorage(); static void dnodeSetRunStatus(SDnodeRunStatus status); diff --git a/src/dnode/src/dnodeTier.c b/src/dnode/src/dnodeTier.c index f7b3c1f49e7fcf2bdb1fa55b82a1006e96c73f66..942fd9bf82927ae3896f19b45c02a1d4f1b54687 100644 --- a/src/dnode/src/dnodeTier.c +++ b/src/dnode/src/dnodeTier.c @@ -12,18 +12,293 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ - #include "os.h" + #include "dnode.h" +#include "dnodeInt.h" +#include "taosdef.h" + +#define DISK_MIN_FREE_SPACE 30 * 1024 * 1024 // disk free space less than 100M will not create new file again +#define DNODE_DISK_AVAIL(pDisk) ((pDisk)->dmeta.free > DISK_MIN_FREE_SPACE) + +static int dnodeFormatDir(char *idir, char *odir); +static int dnodeCheckDisk(char *dirName, int level, int primary); +static int dnodeUpdateDiskMeta(SDisk *pDisk); +static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primary); + +SDnodeTier *dnodeNewTier() { + SDnodeTier *pDnodeTier = (SDnodeTier *)calloc(1, sizeof(*pDnodeTier)); + if (pDnodeTier == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } + + int ret = pthread_rwlock_init(&(pDnodeTier->rwlock), NULL); + if (ret != 0) { + terrno = TAOS_SYSTEM_ERROR(ret); + dnodeCloseTier(pDnodeTier); + return NULL; + } + + pDnodeTier->map = taosHashInit(DNODE_MAX_TIERS * DNODE_MAX_DISKS_PER_TIER * 2, + taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if (pDnodeTier->map == NULL) { + terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + dnodeCloseTier(pDnodeTier); + return NULL; + } + + return pDnodeTier; +} + +void *dnodeCloseTier(SDnodeTier *pDnodeTier) { + if (pDnodeTier) { + if (pDnodeTier->map) { + taosHashCleanup(pDnodeTier->map); + pDnodeTier->map = NULL; + } + + pthread_rwlock_destroy(&(pDnodeTier->rwlock)); + + for (int i = 0; i < pDnodeTier->nTiers; i++) { + STier *pTier = pDnodeTier->tiers + i; + for (int j = 0; j < pTier->nDisks; j++) { + if (pTier->disks[j]) { + free(pTier->disks[j]); + pTier->disks[j] = NULL; + } + } + } + free(pDnodeTier); + } + return NULL; +} + +int dnodeAddDisks(SDnodeTier *pDnodeTier, SDiskCfg *pDiskCfgs, int ndisks) { + ASSERT(ndisks > 0); + + for (int i = 0; i < ndisks; i++) { + SDiskCfg *pCfg = pDiskCfgs + i; + dnodeAddDisk(pDnodeTier, pCfg->dir, pCfg->level, pCfg->primary); + } + + if (dnodeCheckTiers(pDnodeTier) < 0) return -1; + + return 0; +} + +int dnodeUpdateTiersInfo(SDnodeTier *pDnodeTier) { + for (int i = 0; i < pDnodeTier->nTiers; i++) { + STier *pTier = pDnodeTier->tiers + i; + + for (int j = 0; j < pTier->nDisks; j++) { + SDisk *pDisk = pTier->disks[j]; + if (dnodeUpdateDiskMeta(pDisk) < 0) return -1; + } + } + return 0; +} + +int dnodeCheckTiers(SDnodeTier *pDnodeTier) { + ASSERT(pDnodeTier->nTiers > 0); + if (DNODE_PRIMARY_DISK(pDnodeTier) == NULL) { + terrno = TSDB_CODE_DND_LACK_PRIMARY_DISK; + return -1; + } + + for (int i = 0; i < pDnodeTier->nTiers; i++) { + if (pDnodeTier->tiers[i].nDisks == 0) { + terrno = TSDB_CODE_DND_NO_DISK_AT_TIER; + return -1; + } + } + + return 0; +} + +SDisk *dnodeAssignDisk(SDnodeTier *pDnodeTier, int level) { + ASSERT(level < pDnodeTier->nTiers); + + STier *pTier = pDnodeTier->tiers + level; + SDisk *pDisk = NULL; + + ASSERT(pTier->nDisks > 0); + + for (int i = 0; i < pTier->nDisks; i++) { + SDisk *iDisk = pTier->disks[i]; + if (dnodeUpdateDiskMeta(iDisk) < 0) return NULL; + if (DNODE_DISK_AVAIL(iDisk)) { + if (pDisk == NULL || pDisk->dmeta.nfiles > iDisk->dmeta.nfiles) { + pDisk = iDisk; + } + } + } + + if (pDisk == NULL) { + terrno = TSDB_CODE_DND_NO_DISK_SPACE; + } + + return NULL; +} + +SDisk *dnodeGetDiskByName(SDnodeTier *pDnodeTier, char *dirName) { + char fdirName[TSDB_FILENAME_LEN] = "\0"; + SDiskID *pDiskID = NULL; + + if (dnodeFormatDir(dirName, fdirName) < 0) { + return NULL; + } + + void *ptr = taosHashGet(pDnodeTier->map, (void *)fdirName, strnlen(fdirName, TSDB_FILENAME_LEN)); + if (ptr == NULL) return NULL; + pDiskID = (SDiskID *)ptr; + + return dnodeGetDisk(pDnodeTier, pDiskID->level, pDiskID->did); +} + +static int dnodeFormatDir(char *idir, char *odir) { + wordexp_t wep; + + int code = wordexp(idir, &wep, 0); + if (code != 0) { + dError("failed to format dir %s since %s", idir, strerror(code)); + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + + if (realpath(wep.we_wordv[0], odir) == NULL) { + dError("failed to format dir %s since %s", idir, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + wordfree(&wep); + return -1; + } + + wordfree(&wep); + return 0; +} + +static int dnodeCheckDisk(char *dirName, int level, int primary) { + if (access(dirName, W_OK | R_OK | F_OK) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + struct stat pstat; + if (stat(dirName, &pstat) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (S_ISDIR(pstat.st_mode)) { + return 0; + } else { + terrno = TSDB_CODE_DND_DISK_NOT_DIRECTORY; + return -1; + } +} + +static int dnodeUpdateDiskMeta(SDisk *pDisk) { + struct statvfs dstat; + if (statvfs(pDisk->dir, &dstat) < 0) { + dError("failed to get dir %s information since %s", pDisk->dir, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + pDisk->dmeta.size = dstat.f_bsize * dstat.f_blocks; + pDisk->dmeta.free = dstat.f_bsize * dstat.f_bavail; + + return 0; +} + +static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primary) { + char dirName[TSDB_FILENAME_LEN] = "\0"; + STier * pTier = NULL; + SDiskID diskid = {0}; + SDisk * pDisk = NULL; + + if (level < 0 || level >= DNODE_MAX_TIERS) { + terrno = TSDB_CODE_DND_INVALID_DISK_TIER; + dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno)); + return -1; + } + + if (dnodeFormatDir(dir, dirName) < 0) { + dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno)); + return -1; + } + + pTier = pDnodeTier->tiers + level; + diskid.level = level; + + if (pTier->nDisks >= DNODE_MAX_DISKS_PER_TIER) { + terrno = TSDB_CODE_DND_TOO_MANY_DISKS; + dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno)); + return -1; + } + + if (dnodeGetDiskByName(pDnodeTier, dirName) != NULL) { + terrno = TSDB_CODE_DND_DISK_ALREADY_EXISTS; + dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno)); + return -1; + } + + if (dnodeCheckDisk(dirName, level, primary) < 0) { + dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno)); + return -1; + } + + if (primary) { + if (level != 0) { + terrno = TSDB_CODE_DND_INVALID_DISK_TIER; + dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno)); + return -1; + } + + if (DNODE_PRIMARY_DISK(pDnodeTier) != NULL) { + terrno = TSDB_CODE_DND_DUPLICATE_PRIMARY_DISK; + dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno)); + return -1; + } + + diskid.did = 0; + } else { + if (level == 0) { + if (DNODE_PRIMARY_DISK(pDnodeTier) != NULL) { + diskid.did = pTier->nDisks; + } else { + diskid.did = pTier->nDisks + 1; + if (diskid.did >= DNODE_MAX_DISKS_PER_TIER) { + terrno = TSDB_CODE_DND_TOO_MANY_DISKS; + dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno)); + return -1; + } + } + } else { + diskid.did = pTier->nDisks; + } + } + + pDisk = (SDisk *)calloc(1, sizeof(SDisk)); + if (pDisk == NULL) { + terrno = TSDB_CODE_DND_OUT_OF_MEMORY; + dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno)); + return -1; + } + + strncpy(pDisk->dir, dirName, TSDB_FILENAME_LEN); -#ifndef _DNODE_PLUGINS + if (taosHashPut(pDnodeTier->map, (void *)dirName, strnlen(dirName, TSDB_FILENAME_LEN), (void *)(&diskid), + sizeof(diskid)) < 0) { + free(pDisk); + terrno = TSDB_CODE_DND_OUT_OF_MEMORY; + dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno)); + return -1; + } -SDnodeTier *dnodeNewTier() { return NULL; } -void * dnodeCloseTier(SDnodeTier *pDnodeTier) { return NULL; } -int dnodeAddDisks(SDnodeTier *pDnodeTier, SDiskCfg *pDiskCfgs, int ndisks) { return 0; } -int dnodeUpdateTiersInfo(SDnodeTier *pDnodeTier) { return 0; } -int dnodeCheckTiers(SDnodeTier *pDnodeTier) { return 0; } -SDisk * dnodeAssignDisk(SDnodeTier *pDnodeTier, int level) { return NULL; } -SDisk * dnodeGetDiskByName(SDnodeTier *pDnodeTier, char *dirName) { return NULL; } + pTier->nDisks++; + pTier->disks[diskid.did] = pDisk; + pDnodeTier->nTiers = MAX(pDnodeTier->nTiers, level); -#endif \ No newline at end of file + return 0; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 626ad77da2eab4be9e94516c4e5c7c0e5a45837e..da4ddee214ff262dc5923050621677e3616676d7 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -18,21 +18,25 @@ #define TAOS_RANDOM_FILE_FAIL_TEST #include "os.h" +#include "tglobal.h" #include "talgo.h" #include "tchecksum.h" #include "tsdbMain.h" #include "tutil.h" +#include "dnode.h" +#include "tpath.h" +struct SDnodeTier *pDnodeTier = NULL; +const char * tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"}; -const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"}; - -static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); -static void tsdbDestroyFile(SFile *pFile); -static int compFGroup(const void *arg1, const void *arg2); -static int keyFGroupCompFunc(const void *key, const void *fgroup); -static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo); -static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep); -static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days); +static void tsdbDestroyFile(SFile *pFile); +static int compFGroup(const void *arg1, const void *arg2); +static int keyFGroupCompFunc(const void *key, const void *fgroup); +static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep); +static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days); +static int tsdbLoadFilesFromDisk(STsdbRepo *pRepo, SDisk *pDisk); +static SHashObj *tsdbGetAllFids(STsdbRepo *pRepo, char *dirName); +static int tsdbRestoreFileGroup(STsdbRepo *pRepo, SDisk *pDisk, int fid, SFileGroup *pFileGroup); // ---------------- INTERNAL FUNCTIONS ---------------- STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { @@ -74,129 +78,25 @@ void tsdbFreeFileH(STsdbFileH *pFileH) { int tsdbOpenFileH(STsdbRepo *pRepo) { ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL); + char dataDir[TSDB_FILENAME_LEN] = "\0"; - char * tDataDir = NULL; - DIR * dir = NULL; - int fid = 0; - int vid = 0; - regex_t regex1, regex2; - int code = 0; - char fname[TSDB_FILENAME_LEN] = "\0"; - - SFileGroup fileGroup = {0}; - STsdbFileH *pFileH = pRepo->tsdbFileH; - STsdbCfg * pCfg = &(pRepo->config); - - tDataDir = tsdbGetDataDirName(pRepo->rootDir); - if (tDataDir == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - dir = opendir(tDataDir); - if (dir == NULL) { - tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), tDataDir, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - code = regcomp(®ex1, "^v[0-9]+f[0-9]+\\.(head|data|last|stat)$", REG_EXTENDED); - if (code != 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - code = regcomp(®ex2, "^v[0-9]+f[0-9]+\\.(h|d|l|s)$", REG_EXTENDED); - if (code != 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile); - - struct dirent *dp = NULL; - while ((dp = readdir(dir)) != NULL) { - if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue; + for (int level = 0; level < pDnodeTier->nTiers; level++) { + STier *pTier = pDnodeTier->tiers + level; + for (int did = 0; did < pTier->nDisks; did++) { + SDisk *pDisk = pTier->disks[did]; - code = regexec(®ex1, dp->d_name, 0, NULL, 0); - if (code == 0) { - sscanf(dp->d_name, "v%df%d", &vid, &fid); - if (vid != REPO_ID(pRepo)) { - tsdbError("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), dp->d_name); + tdGetTsdbDataDir(pDisk->dir, REPO_ID(pRepo), dataDir); + + if (access(dataDir, F_OK) != 0) { + // Skip those disks without data continue; } - if (fid < mfid) { - for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - tsdbGetDataFileName(pRepo->rootDir, pCfg->tsdbId, fid, type, fname); - (void)remove(fname); - } - continue; - } - - if (tsdbSearchFGroup(pFileH, fid, TD_EQ) != NULL) continue; - memset((void *)(&fileGroup), 0, sizeof(SFileGroup)); - fileGroup.fileId = fid; - - tsdbInitFileGroup(&fileGroup, pRepo); - } else if (code == REG_NOMATCH) { - code = regexec(®ex2, dp->d_name, 0, NULL, 0); - if (code == 0) { - size_t tsize = strlen(tDataDir) + strlen(dp->d_name) + 2; - char * fname1 = malloc(tsize); - if (fname1 == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - sprintf(fname1, "%s/%s", tDataDir, dp->d_name); - - tsize = tsize + 64; - char *fname2 = malloc(tsize); - if (fname2 == NULL) { - free(fname1); - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - sprintf(fname2, "%s/%s_back_%" PRId64, tDataDir, dp->d_name, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI)); - - (void)rename(fname1, fname2); - - tsdbDebug("vgId:%d file %s exists, backup it as %s", REPO_ID(pRepo), fname1, fname2); - - free(fname1); - free(fname2); - continue; - } else if (code == REG_NOMATCH) { - tsdbError("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), dp->d_name); - continue; - } else { - goto _err; - } - } else { - goto _err; + tsdbLoadFilesFromDisk(pRepo, pDisk); } - - pFileH->pFGroup[pFileH->nFGroups++] = fileGroup; - qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup); - tsdbDebug("vgId:%d file group %d is restored, nFGroups %d", REPO_ID(pRepo), fileGroup.fileId, pFileH->nFGroups); } - regfree(®ex1); - regfree(®ex2); - taosTFree(tDataDir); - closedir(dir); return 0; - -_err: - for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]); - - regfree(®ex1); - regfree(®ex2); - - taosTFree(tDataDir); - if (dir != NULL) closedir(dir); - tsdbCloseFileH(pRepo); - return -1; } void tsdbCloseFileH(STsdbRepo *pRepo) { @@ -522,37 +422,6 @@ _err: } // ---------------- LOCAL FUNCTIONS ---------------- -static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { - uint32_t version; - - tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname); - - pFile->fd = -1; - if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err; - - if (tsdbLoadFileHeader(pFile, &version) < 0) { - tsdbError("vgId:%d failed to load file %s header part since %s", REPO_ID(pRepo), pFile->fname, tstrerror(terrno)); - goto _err; - } - - if (pFile->info.size == TSDB_FILE_HEAD_SIZE) { - pFile->info.size = lseek(pFile->fd, 0, SEEK_END); - } - - if (version != TSDB_FILE_VERSION) { - // TODO: deal with error - tsdbError("vgId:%d file %s version %u is not the same as program version %u which may cause problem", - REPO_ID(pRepo), pFile->fname, version, TSDB_FILE_VERSION); - } - - tsdbCloseFile(pFile); - - return 0; -_err: - tsdbDestroyFile(pFile); - return -1; -} - static void tsdbDestroyFile(SFile *pFile) { tsdbCloseFile(pFile); } static int compFGroup(const void *arg1, const void *arg2) { @@ -578,22 +447,219 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup) { } } -static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo) { +static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep) { + return (TSKEY)(taosGetTimestamp(precision) - keep * tsMsPerDay[precision]); +} + +static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) { + return (int)(TSDB_KEY_FILEID(tsdbGetCurrMinKey(precision, keep), days, precision)); +} + +static int tsdbLoadFilesFromDisk(STsdbRepo *pRepo, SDisk *pDisk) { + char tsdbDataDir[TSDB_FILENAME_LEN] = "\0"; + char tsdbRootDir[TSDB_FILENAME_LEN] = "\0"; + char fname[TSDB_FILENAME_LEN] = "\0"; + SHashObj * pFids = NULL; + SHashMutableIterator *pIter = NULL; + STsdbFileH * pFileH = pRepo->tsdbFileH; + SFileGroup fgroup = {0}; + STsdbCfg * pCfg = &(pRepo->config); + int mfid = 0; + + tdGetTsdbRootDir(pDisk->dir, REPO_ID(pRepo), tsdbRootDir); + tdGetTsdbDataDir(pDisk->dir, REPO_ID(pRepo), tsdbDataDir); + + pFids = tsdbGetAllFids(pRepo, tsdbDataDir); + if (pFids == NULL) { + goto _err; + } + + pIter = taosHashCreateIter(pFids); + if (pIter == NULL) { + goto _err; + } + + mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile); + + while (taosHashIterNext(pIter)) { + int32_t fid = *(int32_t *)taosHashIterGet(pIter); + + if (fid < mfid) { + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, type, fname); + (void)remove(fname); + } + + tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, TSDB_FILE_TYPE_NHEAD, fname); + (void)remove(fname); + + tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, TSDB_FILE_TYPE_NLAST, fname); + (void)remove(fname); + + continue; + } + + tsdbRestoreFileGroup(pRepo, pDisk, fid, &fgroup); + pFileH->pFGroup[pFileH->nFGroups++] = fgroup; + qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(fgroup), compFGroup); + + // TODO + pDisk->dmeta.nfiles++; + } + + taosHashDestroyIter(pIter); + taosHashCleanup(pFids); + return 0; + +_err: + taosHashDestroyIter(pIter); + taosHashCleanup(pFids); + return -1; +} + +static int tsdbRestoreFileGroup(STsdbRepo *pRepo, SDisk *pDisk, int fid, SFileGroup *pFileGroup) { + char tsdbRootDir[TSDB_FILENAME_LEN] = "\0"; + char nheadF[TSDB_FILENAME_LEN] = "\0"; + char nlastF[TSDB_FILENAME_LEN] = "\0"; + bool newHeadExists = false; + bool newLastExists = false; + + uint32_t version = 0; + + terrno = TSDB_CODE_SUCCESS; + + memset((void *)pFileGroup, 0, sizeof(*pFileGroup)); + pFileGroup->fileId = fid; for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - if (tsdbInitFile(&pFGroup->files[type], pRepo, pFGroup->fileId, type) < 0) { - memset(&pFGroup->files[type].info, 0, sizeof(STsdbFileInfo)); - pFGroup->files[type].info.magic = TSDB_FILE_INIT_MAGIC; - pFGroup->state = 1; - pRepo->state = TSDB_STATE_BAD_FILE; + SFile *pFile = pFileGroup->files + type; + pFile->fd = -1; + } + + tdGetTsdbRootDir(pDisk->dir, REPO_ID(pRepo), tsdbRootDir); + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + SFile *pFile = pFileGroup->files + type; + tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, TSDB_FILE_TYPE_HEAD, pFile->fname); + if (access(pFile->fname, F_OK) != 0) { + memset(&(pFile->info), 0, sizeof(pFile->info)); + pFile->info.magic = TSDB_FILE_INIT_MAGIC; + pFileGroup->state = 1; terrno = TSDB_CODE_TDB_FILE_CORRUPTED; } } -} -static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep) { - return (TSKEY)(taosGetTimestamp(precision) - keep * tsMsPerDay[precision]); + tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, TSDB_FILE_TYPE_NHEAD, nheadF); + tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, TSDB_FILE_TYPE_NLAST, nlastF); + + if (access(nheadF, F_OK) == 0) { + newHeadExists = true; + } + + if (access(nlastF, F_OK) == 0) { + newLastExists = true; + } + + if (newHeadExists) { + (void)remove(nheadF); + (void)remove(nlastF); + } else { + if (newLastExists) { + (void)rename(nlastF, pFileGroup->files[TSDB_FILE_TYPE_LAST].fname); + } + } + + if (terrno != TSDB_CODE_SUCCESS) { + return -1; + } + + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + SFile *pFile = pFileGroup->files + type; + if (tsdbOpenFile(pFile, O_RDONLY) < 0) { + memset(&(pFile->info), 0, sizeof(pFile->info)); + pFile->info.magic = TSDB_FILE_INIT_MAGIC; + pFileGroup->state = 1; + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + continue; + } + + if (tsdbLoadFileHeader(pFile, &version) < 0) { + memset(&(pFile->info), 0, sizeof(pFile->info)); + pFile->info.magic = TSDB_FILE_INIT_MAGIC; + pFileGroup->state = 1; + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbCloseFile(pFile); + continue; + } + + if (version != TSDB_FILE_VERSION) { + tsdbError("vgId:%d file %s version %u is not the same as program version %u which may cause problem", + REPO_ID(pRepo), pFile->fname, version, TSDB_FILE_VERSION); + } + + tsdbCloseFile(pFile); + } + + if (terrno != TSDB_CODE_SUCCESS) { + return -1; + } else { + return 0; + } } -static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) { - return (int)(TSDB_KEY_FILEID(tsdbGetCurrMinKey(precision, keep), days, precision)); +static SHashObj *tsdbGetAllFids(STsdbRepo *pRepo, char *dirName) { + DIR * dir = NULL; + regex_t regex = {0}; + int code = 0; + int32_t vid, fid; + SHashObj *pHash = NULL; + + code = regcomp(®ex, "^v[0-9]+f[0-9]+\\.(head|data|last|h|d|l)$", REG_EXTENDED); + if (code != 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } + + dir = opendir(dirName); + if (dir == NULL) { + tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dirName, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + pHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (pHash == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } + + struct dirent *dp = NULL; + while ((dp = readdir(dir)) != NULL) { + if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue; + + code = regexec(®ex, dp->d_name, 0, NULL, 0); + if (code == 0) { + sscanf(dp->d_name, "v%df%d", &vid, &fid); + + if (vid != REPO_ID(pRepo)) { + tsdbError("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), dp->d_name); + continue; + } + + taosHashPut(pHash, (void *)(&fid), sizeof(fid), (void *)(&fid), sizeof(fid)); + } else if (code == REG_NOMATCH) { + tsdbError("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), dp->d_name); + continue; + } else { + goto _err; + } + } + + closedir(dir); + regfree(®ex); + return pHash; + +_err: + taosHashCleanup(pHash); + if (dir != NULL) closedir(dir); + regfree(®ex); + return NULL; } \ No newline at end of file diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index a05b8aa971e3323af3bfdaeca7a9a42e4a5cce24..1d7967464f7e89240322d9d0ef85b77be15418fa 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -105,7 +105,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { } char rootDir[TSDB_FILENAME_LEN] = {0}; - sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId); + tdGetVnodeDir(tsDataDir, pVnodeCfg->cfg.vgId, rootDir); if (mkdir(rootDir, 0755) != 0 && errno != EEXIST) { vError("vgId:%d, failed to create vnode, reason:%s dir:%s", pVnodeCfg->cfg.vgId, strerror(errno), rootDir); if (errno == EACCES) { @@ -138,7 +138,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { tsdbCfg.compression = pVnodeCfg->cfg.compression; char tsdbDir[TSDB_FILENAME_LEN] = {0}; - sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); + tdGetTsdbRootDir(tsDataDir, pVnodeCfg->cfg.vgId, tsdbDir); if (tsdbCreateRepo(tsdbDir, &tsdbCfg) < 0) { vError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno)); return TSDB_CODE_VND_INIT_FAILED;