/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #define _DEFAULT_SOURCE #include "tfsInt.h" #define TMPNAME_LEN (TSDB_FILENAME_LEN * 2 + 32) typedef struct { pthread_spinlock_t lock; SFSMeta meta; int32_t nlevel; STier tiers[TSDB_MAX_TIERS]; SHashObj *map; // name to did map } SFS; typedef struct { SDisk *pDisk; } SDiskIter; #define TFS_META() (pfs->meta) #define TFS_NLEVEL() (pfs->nlevel) #define TFS_TIERS() (pfs->tiers) #define TFS_TIER_AT(level) (TFS_TIERS() + (level)) #define TFS_DISK_AT(level, id) DISK_AT_TIER(TFS_TIER_AT(level), id) #define TFS_PRIMARY_DISK() TFS_DISK_AT(TFS_PRIMARY_LEVEL, TFS_PRIMARY_ID) #define TFS_IS_VALID_LEVEL(level) (((level) >= 0) && ((level) < TFS_NLEVEL())) #define TFS_IS_VALID_ID(level, id) (((id) >= 0) && ((id) < TIER_NDISKS(TFS_TIER_AT(level)))) #define TFS_IS_VALID_DISK(level, id) (TFS_IS_VALID_LEVEL(level) && TFS_IS_VALID_ID(level, id)) #define tfsLock() pthread_spin_lock(&(pfs->lock)) #define tfsUnLock() pthread_spin_unlock(&(pfs->lock)) static SFS tfs = {0}; static SFS *pfs = &tfs; // STATIC DECLARATION static int32_t tfsMount(SDiskCfg *pCfg); static int32_t tfsCheck(); static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg); static int32_t tfsFormatDir(char *idir, char *odir); static SDisk *tfsGetDiskByID(SDiskID did); static SDisk *tfsGetDiskByName(const char *dir); static int32_t tfsOpendirImpl(TDIR *tdir); static void tfsInitDiskIter(SDiskIter *pIter); static SDisk *tfsNextDisk(SDiskIter *pIter); // FS APIs ==================================== int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk) { if (ndisk < 0) { terrno = TSDB_CODE_INVALID_PARA; return -1; } for (int32_t level = 0; level < TSDB_MAX_TIERS; level++) { if (tfsInitTier(TFS_TIER_AT(level), level) < 0) { while (true) { level--; if (level < 0) break; tfsDestroyTier(TFS_TIER_AT(level)); } return -1; } } pthread_spin_init(&(pfs->lock), 0); pfs->map = taosHashInit(TSDB_MAX_TIERS * TSDB_MAX_DISKS_PER_TIER * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (pfs->map == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tfsCleanup(); return -1; } for (int32_t idisk = 0; idisk < ndisk; idisk++) { if (tfsMount(pDiskCfg + idisk) < 0) { tfsCleanup(); return -1; } } if (tfsCheck() < 0) { tfsCleanup(); return -1; } tfsUpdateSize(NULL); for (int32_t level = 0; level < TFS_NLEVEL(); level++) { tfsPosNextId(TFS_TIER_AT(level)); } return 0; } void tfsCleanup() { taosHashCleanup(pfs->map); pfs->map = NULL; pthread_spin_destroy(&(pfs->lock)); for (int32_t level = 0; level < TFS_NLEVEL(); level++) { tfsDestroyTier(TFS_TIER_AT(level)); } } void tfsUpdateSize(SFSMeta *pFSMeta) { SFSMeta fsMeta = {0}; SDiskSize size = {0}; if (pFSMeta == NULL) { pFSMeta = &fsMeta; } memset(pFSMeta, 0, sizeof(SFSMeta)); for (int32_t level = 0; level < TFS_NLEVEL(); level++) { STier *pTier = TFS_TIER_AT(level); tfsUpdateTierSize(pTier); pFSMeta->total += pTier->size.total; pFSMeta->avail += pTier->size.avail; pFSMeta->used += pTier->size.used; } tfsLock(); pfs->meta = *pFSMeta; tfsUnLock(); } /* Allocate an existing available tier level */ void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id) { ASSERT(expLevel >= 0); *level = expLevel; *id = TFS_UNDECIDED_ID; if (*level >= TFS_NLEVEL()) { *level = TFS_NLEVEL() - 1; } while (*level >= 0) { *id = tfsAllocDiskOnTier(TFS_TIER_AT(*level)); if (*id == TFS_UNDECIDED_ID) { (*level)--; continue; } return; } *level = TFS_UNDECIDED_LEVEL; *id = TFS_UNDECIDED_ID; } const char *TFS_PRIMARY_PATH() { return DISK_DIR(TFS_PRIMARY_DISK()); } const char *TFS_DISK_PATH(int32_t level, int32_t id) { return DISK_DIR(TFS_DISK_AT(level, id)); } // TFILE APIs ==================================== void tfsInitFile(TFILE *pf, int32_t level, int32_t id, const char *bname) { ASSERT(TFS_IS_VALID_DISK(level, id)); SDisk *pDisk = TFS_DISK_AT(level, id); pf->level = level; pf->id = id; tstrncpy(pf->rname, bname, TSDB_FILENAME_LEN); char tmpName[TMPNAME_LEN] = {0}; snprintf(tmpName, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), bname); tstrncpy(pf->aname, tmpName, TSDB_FILENAME_LEN); } bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2) { ASSERT(pf1 != NULL || pf2 != NULL); if (pf1 == NULL || pf2 == NULL) return false; if (pf1->level != pf2->level) return false; if (pf1->id != pf2->id) return false; if (strncmp(pf1->rname, pf2->rname, TSDB_FILENAME_LEN) != 0) return false; return true; } int32_t tfsEncodeFile(void **buf, TFILE *pf) { int32_t tlen = 0; tlen += taosEncodeVariantI32(buf, pf->level); tlen += taosEncodeVariantI32(buf, pf->id); tlen += taosEncodeString(buf, pf->rname); return tlen; } void *tfsDecodeFile(void *buf, TFILE *pf) { int32_t level, id; char *rname; buf = taosDecodeVariantI32(buf, &(level)); buf = taosDecodeVariantI32(buf, &(id)); buf = taosDecodeString(buf, &rname); tfsInitFile(pf, level, id, rname); tfree(rname); return buf; } void tfsbasename(const TFILE *pf, char *dest) { char tname[TSDB_FILENAME_LEN] = "\0"; tstrncpy(tname, pf->aname, TSDB_FILENAME_LEN); tstrncpy(dest, basename(tname), TSDB_FILENAME_LEN); } void tfsdirname(const TFILE *pf, char *dest) { char tname[TSDB_FILENAME_LEN] = "\0"; tstrncpy(tname, pf->aname, TSDB_FILENAME_LEN); tstrncpy(dest, dirname(tname), TSDB_FILENAME_LEN); } // DIR APIs ==================================== int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id) { SDisk *pDisk = TFS_DISK_AT(level, id); char aname[TMPNAME_LEN]; snprintf(aname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), rname); if (taosMkDir(aname) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } return 0; } int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id) { if (tfsMkdirAt(rname, level, id) < 0) { if (errno == ENOENT) { // Try to create upper char *s = strdup(rname); // Make a copy of dirname(s) because the implementation of 'dirname' differs on different platforms. // Some platform may modify the contents of the string passed into dirname(). Others may return a pointer to // internal static storage space that will be overwritten by next call. For case like that, we should not use // the pointer directly in this recursion. // See // https://developer.apple.com/library/archive/documentation/System/Conceptual/ManPages_iPhoneOS/man3/dirname.3.html char *dir = strdup(dirname(s)); if (tfsMkdirRecurAt(dir, level, id) < 0) { free(s); free(dir); return -1; } free(s); free(dir); if (tfsMkdirAt(rname, level, id) < 0) { return -1; } } else { return -1; } } return 0; } int32_t tfsMkdir(const char *rname) { for (int32_t level = 0; level < TFS_NLEVEL(); level++) { STier *pTier = TFS_TIER_AT(level); for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) { if (tfsMkdirAt(rname, level, id) < 0) { return -1; } } } return 0; } int32_t tfsRmdir(const char *rname) { char aname[TMPNAME_LEN] = "\0"; for (int32_t level = 0; level < TFS_NLEVEL(); level++) { STier *pTier = TFS_TIER_AT(level); for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) { SDisk *pDisk = pTier->disks[id]; snprintf(aname, TMPNAME_LEN, "%s%s%s", DISK_DIR(pDisk), TS_PATH_DELIMITER, rname); taosRemoveDir(aname); } } return 0; } #if 0 int32_t tfsRename(char *orname, char *nrname) { char oaname[TMPNAME_LEN] = "\0"; char naname[TMPNAME_LEN] = "\0"; for (int32_t level = 0; level < pfs->nlevel; level++) { STier *pTier = TFS_TIER_AT(level); for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) { SDisk *pDisk = DISK_AT_TIER(pTier, id); snprintf(oaname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), orname); snprintf(naname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), nrname); taosRenameFile(oaname, naname); } } return 0; } #endif struct TDIR { SDiskIter iter; int32_t level; int32_t id; char dirname[TSDB_FILENAME_LEN]; TFILE tfile; DIR *dir; }; TDIR *tfsOpendir(const char *rname) { TDIR *tdir = (TDIR *)calloc(1, sizeof(*tdir)); if (tdir == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } tfsInitDiskIter(&(tdir->iter)); tstrncpy(tdir->dirname, rname, TSDB_FILENAME_LEN); if (tfsOpendirImpl(tdir) < 0) { free(tdir); return NULL; } return tdir; } const TFILE *tfsReaddir(TDIR *tdir) { if (tdir == NULL || tdir->dir == NULL) return NULL; char bname[TMPNAME_LEN * 2] = "\0"; while (true) { struct dirent *dp = NULL; dp = readdir(tdir->dir); if (dp != NULL) { // Skip . and .. if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue; snprintf(bname, TMPNAME_LEN * 2, "%s/%s", tdir->dirname, dp->d_name); tfsInitFile(&(tdir->tfile), tdir->level, tdir->id, bname); return &(tdir->tfile); } if (tfsOpendirImpl(tdir) < 0) { return NULL; } if (tdir->dir == NULL) { terrno = TSDB_CODE_SUCCESS; return NULL; } } } void tfsClosedir(TDIR *tdir) { if (tdir) { if (tdir->dir != NULL) { closedir(tdir->dir); tdir->dir = NULL; } free(tdir); } } // private static int32_t tfsMount(SDiskCfg *pCfg) { SDiskID did; SDisk *pDisk = NULL; if (tfsCheckAndFormatCfg(pCfg) < 0) return -1; did.level = pCfg->level; pDisk = tfsMountDiskToTier(TFS_TIER_AT(did.level), pCfg); if (pDisk == NULL) { fError("failed to mount disk %s to level %d since %s", pCfg->dir, pCfg->level, tstrerror(terrno)); return -1; } did.id = pDisk->id; taosHashPut(pfs->map, (void *)(pCfg->dir), strnlen(pCfg->dir, TSDB_FILENAME_LEN), (void *)(&did), sizeof(did)); if (pfs->nlevel < pCfg->level + 1) pfs->nlevel = pCfg->level + 1; return 0; } static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg) { char dirName[TSDB_FILENAME_LEN] = "\0"; struct stat pstat; if (pCfg->level < 0 || pCfg->level >= TSDB_MAX_TIERS) { fError("failed to mount %s to FS since invalid level %d", pCfg->dir, pCfg->level); terrno = TSDB_CODE_FS_INVLD_CFG; return -1; } if (pCfg->primary) { if (pCfg->level != 0) { fError("failed to mount %s to FS since disk is primary but level %d not 0", pCfg->dir, pCfg->level); terrno = TSDB_CODE_FS_INVLD_CFG; return -1; } if (TFS_PRIMARY_DISK() != NULL) { fError("failed to mount %s to FS since duplicate primary mount", pCfg->dir); terrno = TSDB_CODE_FS_DUP_PRIMARY; return -1; } } if (tfsFormatDir(pCfg->dir, dirName) < 0) { fError("failed to mount %s to FS since invalid dir format", pCfg->dir); terrno = TSDB_CODE_FS_INVLD_CFG; return -1; } if (tfsGetDiskByName(dirName) != NULL) { fError("failed to mount %s to FS since duplicate mount", pCfg->dir); terrno = TSDB_CODE_FS_INVLD_CFG; return -1; } if (access(dirName, W_OK | R_OK | F_OK) != 0) { fError("failed to mount %s to FS since no R/W access rights", pCfg->dir); terrno = TSDB_CODE_FS_INVLD_CFG; return -1; } if (stat(dirName, &pstat) < 0) { fError("failed to mount %s to FS since %s", pCfg->dir, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } if (!S_ISDIR(pstat.st_mode)) { fError("failed to mount %s to FS since not a directory", pCfg->dir); terrno = TSDB_CODE_FS_INVLD_CFG; return -1; } strncpy(pCfg->dir, dirName, TSDB_FILENAME_LEN); return 0; } static int32_t tfsFormatDir(char *idir, char *odir) { wordexp_t wep = {0}; int32_t code = wordexp(idir, &wep, 0); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); return -1; } char tmp[PATH_MAX] = {0}; if (realpath(wep.we_wordv[0], tmp) == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); wordfree(&wep); return -1; } strcpy(odir, tmp); wordfree(&wep); return 0; } static int32_t tfsCheck() { if (TFS_PRIMARY_DISK() == NULL) { fError("no primary disk is set"); terrno = TSDB_CODE_FS_NO_PRIMARY_DISK; return -1; } for (int32_t level = 0; level < TFS_NLEVEL(); level++) { if (TIER_NDISKS(TFS_TIER_AT(level)) == 0) { fError("no disk at level %d", level); terrno = TSDB_CODE_FS_NO_MOUNT_AT_TIER; return -1; } } return 0; } static SDisk *tfsGetDiskByID(SDiskID did) { return TFS_DISK_AT(did.level, did.id); } static SDisk *tfsGetDiskByName(const char *dir) { SDiskID did; SDisk *pDisk = NULL; void *pr = NULL; pr = taosHashGet(pfs->map, (void *)dir, strnlen(dir, TSDB_FILENAME_LEN)); if (pr == NULL) return NULL; did = *(SDiskID *)pr; pDisk = tfsGetDiskByID(did); ASSERT(pDisk != NULL); return pDisk; } static int32_t tfsOpendirImpl(TDIR *tdir) { SDisk *pDisk = NULL; char adir[TMPNAME_LEN * 2] = "\0"; if (tdir->dir != NULL) { closedir(tdir->dir); tdir->dir = NULL; } while (true) { pDisk = tfsNextDisk(&(tdir->iter)); if (pDisk == NULL) return 0; tdir->level = pDisk->level; tdir->id = pDisk->id; snprintf(adir, TMPNAME_LEN * 2, "%s%s%s", pDisk->path, TS_PATH_DELIMITER,tdir->dirname); tdir->dir = opendir(adir); if (tdir->dir != NULL) break; } return 0; } static void tfsInitDiskIter(SDiskIter *pIter) { pIter->pDisk = TFS_DISK_AT(0, 0); } static SDisk *tfsNextDisk(SDiskIter *pIter) { SDisk *pDisk = pIter->pDisk; if (pDisk == NULL) return NULL; int32_t level = pDisk->level; int32_t id = pDisk->id; id++; if (id < TIER_NDISKS(TFS_TIER_AT(level))) { pIter->pDisk = TFS_DISK_AT(level, id); ASSERT(pIter->pDisk != NULL); } else { level++; id = 0; if (level < TFS_NLEVEL()) { pIter->pDisk = TFS_DISK_AT(level, id); ASSERT(pIter->pDisk != NULL); } else { pIter->pDisk = NULL; } } return pDisk; } // OTHER FUNCTIONS =================================== void taosGetDisk() { const double unit = 1024 * 1024 * 1024; SDiskSize diskSize; SFSMeta fsMeta; tfsUpdateSize(&fsMeta); tsTotalDataDirGB = (float)(fsMeta.total / unit); tsUsedDataDirGB = (float)(fsMeta.used / unit); tsAvailDataDirGB = (float)(fsMeta.avail / unit); if (taosGetDiskSize(tsLogDir, &diskSize) == 0) { tsTotalLogDirGB = (float)(diskSize.total / unit); tsAvailLogDirGB = (float)(diskSize.avail / unit); } if (taosGetDiskSize(tsTempDir, &diskSize) == 0) { tsTotalTmpDirGB = (float)(diskSize.total / unit); tsAvailTmpDirectorySpace = (float)(diskSize.avail / unit); } }