提交 924f88dc 编写于 作者: H Hongze Cheng

refact tfs

上级 17b1de5d
......@@ -33,14 +33,16 @@ typedef struct {
#define TFS_PRIMARY_ID 0
// FS APIs ====================================
int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
void tfsDestroy();
void tfsUpdateInfo();
int64_t tfsTotalSize();
int64_t tfsAvailSize();
void tfsIncDiskFile(int level, int id, int num);
void tfsDecDiskFile(int level, int id, int num);
void tfsAllocDisk(int expLevel, int *level, int *id);
typedef struct {
int64_t tsize;
int64_t avail;
} SFSMeta;
int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
void tfsDestroy();
void tfsUpdateInfo();
void tfsGetMeta(SFSMeta *pMeta);
void tfsAllocDisk(int expLevel, int *level, int *id);
const char *TFS_PRIMARY_PATH();
const char *TFS_DISK_PATH(int level, int id);
......@@ -58,14 +60,14 @@ typedef struct {
#define TFILE_NAME(pf) ((pf)->aname)
#define TFILE_REL_NAME(pf) ((pf)->rname)
#define tfsopen(pf, flags) open(TFILE_NAME(pf), flags)
#define tfsclose(fd) close(fd)
#define tfsremove(pf) remove(TFILE_NAME(pf))
#define tfscopy(sf, df) taosCopy(TFILE_NAME(sf), TFILE_NAME(df))
#define tfsrename(sf, df) rename(TFILE_NAME(sf), TFILE_NAME(df))
void tfsInitFile(TFILE *pf, int level, int id, const char *bname);
bool tfsIsSameFile(TFILE *pf1, TFILE *pf2);
void tfsSetLevel(TFILE *pf, int level);
void tfsSetID(TFILE *pf, int id);
int tfsopen(TFILE *pf, int flags);
int tfsclose(int fd);
int tfsremove(TFILE *pf);
int tfscopy(TFILE *sf, TFILE *df);
void tfsbasename(const TFILE *pf, char *dest);
void tfsdirname(const TFILE *pf, char *dest);
......
......@@ -18,6 +18,7 @@
#include "tlog.h"
#include "tglobal.h"
#include "tfs.h"
#ifdef __cplusplus
extern "C" {
......@@ -33,9 +34,11 @@ extern int fsDebugFlag;
#define fDebug(...) { if (fsDebugFlag & DEBUG_DEBUG) { taosPrintLog("TFS ", cqDebugFlag, __VA_ARGS__); }}
#define fTrace(...) { if (fsDebugFlag & DEBUG_TRACE) { taosPrintLog("TFS ", cqDebugFlag, __VA_ARGS__); }}
// Global Definitions
#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024
// tdisk.c ======================================================
typedef struct {
int32_t nfiles;
int64_t size;
int64_t free;
} SDiskMeta;
......@@ -53,34 +56,40 @@ typedef struct SDisk {
#define DISK_META(pd) ((pd)->dmeta)
#define DISK_SIZE(pd) ((pd)->dmeta.size)
#define DISK_FREE_SIZE(pd) ((pd)->dmeta.free)
#define DISK_NFILES(pd) ((pd)->dmeta.nfiles)
SDisk *tfsNewDisk(int level, int id, const char *dir);
SDisk *tfsFreeDisk(SDisk *pDisk);
void tfsUpdateDiskInfo(SDisk *pDisk);
int tfsUpdateDiskInfo(SDisk *pDisk);
// ttier.c ======================================================
typedef struct {
int64_t size;
int64_t free;
int16_t nAvailDisks; // # of Available disks
} STierMeta;
typedef struct STier {
int level;
int32_t ndisk;
STierMeta tmeta;
SDisk * disks[TSDB_MAX_DISKS_PER_TIER];
pthread_spinlock_t lock;
int level;
int16_t ndisk; // # of disks mounted to this tier
int16_t nextid; // next disk id to allocate
STierMeta tmeta;
SDisk * disks[TSDB_MAX_DISKS_PER_TIER];
} STier;
#define TIER_LEVEL(pt) ((pt)->level)
#define TIER_NDISKS(pt) ((pt)->ndisk)
#define TIER_SIZE(pt) ((pt)->tmeta.size)
#define TIER_FREE_SIZE(pt) ((pt)->tmeta.free)
#define TIER_AVAIL_DISKS(pt) ((pt)->tmeta.nAvailDisks)
#define DISK_AT_TIER(pt, id) ((pt)->disks[id])
void tfsInitTier(STier *pTier, int level);
int tfsInitTier(STier *pTier, int level);
void tfsDestroyTier(STier *pTier);
SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg);
void tfsUpdateTierInfo(STier *pTier);
void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta);
int tfsAllocDiskOnTier(STier *pTier);
void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta);
void tfsPosNextId(STier *pTier);
#ifdef __cplusplus
}
......
......@@ -39,7 +39,7 @@ SDisk *tfsFreeDisk(SDisk *pDisk) {
return NULL;
}
void tfsUpdateDiskInfo(SDisk *pDisk) {
int tfsUpdateDiskInfo(SDisk *pDisk) {
ASSERT(pDisk != NULL);
SysDiskSize dstat;
if (taosGetDiskSize(pDisk->dir, &dstat) < 0) {
......@@ -48,8 +48,10 @@ void tfsUpdateDiskInfo(SDisk *pDisk) {
terrno = TAOS_SYSTEM_ERROR(errno);
pDisk->dmeta.size = 0;
pDisk->dmeta.free = 0;
return -1;
} else {
pDisk->dmeta.size = dstat.tsize;
pDisk->dmeta.free = dstat.avail;
return 0;
}
}
\ No newline at end of file
......@@ -15,8 +15,8 @@
#include "os.h"
#include "taosdef.h"
#include "hash.h"
#include "taosdef.h"
#include "taoserror.h"
#include "tfs.h"
#include "tfsint.h"
......@@ -25,28 +25,20 @@
#pragma GCC diagnostic ignored "-Wformat-truncation"
typedef struct {
int64_t tsize;
int64_t avail;
} SFSMeta;
typedef struct {
pthread_mutex_t lock;
bool locked;
SFSMeta meta;
int nlevel;
STier tiers[TSDB_MAX_TIERS];
SHashObj * map; // name to did map
pthread_spinlock_t lock;
SFSMeta meta;
int nlevel;
STier tiers[TSDB_MAX_TIERS];
SHashObj * map; // name to did map
} SFS;
typedef struct {
SDisk *pDisk;
} SDiskIter;
#define TFS_LOCKED() (pfs->locked)
#define TFS_META() (pfs->meta)
#define TFS_NLEVEL() (pfs->nlevel)
#define TFS_TIERS() (pfs->tiers)
#define TFS_TIER_AT(level) (TFS_TIERS() + (level))
#define TFS_DISK_AT(level, id) DISK_AT_TIER(TFS_TIER_AT(level), id)
#define TFS_PRIMARY_DISK() TFS_DISK_AT(TFS_PRIMARY_LEVEL, TFS_PRIMARY_ID)
......@@ -54,7 +46,8 @@ typedef struct {
#define TFS_IS_VALID_ID(level, id) (((id) >= 0) && ((id) < TIER_NDISKS(TFS_TIER_AT(level))))
#define TFS_IS_VALID_DISK(level, id) (TFS_IS_VALID_LEVEL(level) && TFS_IS_VALID_ID(level, id))
#define TFS_MIN_DISK_FREE_SIZE 50*1024*1024
#define tfsLock() pthread_spin_lock(&(pfs->lock))
#define tfsUnLock() pthread_spin_unlock(&(pfs->lock))
static SFS tfs = {0};
static SFS *pfs = &tfs;
......@@ -66,27 +59,29 @@ static int tfsCheckAndFormatCfg(SDiskCfg *pCfg);
static int tfsFormatDir(char *idir, char *odir);
static SDisk *tfsGetDiskByID(SDiskID did);
static SDisk *tfsGetDiskByName(const char *dir);
static int tfsLock();
static int tfsUnLock();
static int tfsOpendirImpl(TDIR *tdir);
static void tfsInitDiskIter(SDiskIter *pIter);
static SDisk *tfsNextDisk(SDiskIter *pIter);
static int tfsAssignDisk(int level);
// FS APIs ====================================
int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
ASSERT(ndisk > 0);
for (int level = 0; level < TSDB_MAX_TIERS; level++) {
tfsInitTier(TFS_TIER_AT(level), level);
}
if (tfsInitTier(TFS_TIER_AT(level), level) < 0) {
while (true) {
level--;
if (level < 0) break;
int ret = pthread_mutex_init(&(pfs->lock), NULL);
if (ret != 0) {
terrno = TAOS_SYSTEM_ERROR(ret);
return -1;
tfsDestroyTier(TFS_TIER_AT(level));
}
return -1;
}
}
pthread_spin_init(&(pfs->lock), 0);
pfs->map = taosHashInit(TSDB_MAX_TIERS * TSDB_MAX_DISKS_PER_TIER * 2,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (pfs->map == NULL) {
......@@ -108,6 +103,9 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
}
tfsUpdateInfo();
for (int level = 0; level < TFS_NLEVEL(); level++) {
tfsPosNextId(TFS_TIER_AT(level));
}
return 0;
}
......@@ -115,44 +113,40 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
void tfsDestroy() {
taosHashCleanup(pfs->map);
pfs->map = NULL;
pthread_mutex_destroy(&(pfs->lock));
pthread_spin_destroy(&(pfs->lock));
for (int level = 0; level < TFS_NLEVEL(); level++) {
tfsDestroyTier(TFS_TIER_AT(level));
}
}
void tfsUpdateInfo() {
SFSMeta tmeta = {0};
void tfsUpdateInfo(SFSMeta *pFSMeta) {
SFSMeta fsMeta;
STierMeta tierMeta;
tfsLock();
if (pFSMeta == NULL) {
pFSMeta = &fsMeta;
}
memset(pFSMeta, 0, sizeof(*pFSMeta));
for (int level = 0; level < TFS_NLEVEL(); level++) {
STier *pTier = TFS_TIER_AT(level);
tfsUpdateTierInfo(pTier);
tmeta.tsize += TIER_SIZE(pTier);
tmeta.avail += TIER_FREE_SIZE(pTier);
tfsUpdateTierInfo(pTier, &tierMeta);
pFSMeta->tsize += tierMeta.size;
pFSMeta->avail += tierMeta.free;
}
pfs->meta = tmeta;
tfsUnLock();
}
int64_t tfsTotalSize() { return pfs->meta.tsize; }
int64_t tfsAvailSize() { return pfs->meta.avail; }
void tfsIncDiskFile(int level, int id, int num) {
tfsLock();
TFS_DISK_AT(level, id)->dmeta.nfiles += num;
pfs->meta = *pFSMeta;
tfsUnLock();
}
void tfsDecDiskFile(int level, int id, int num) {
void tfsGetMeta(SFSMeta *pMeta) {
ASSERT(pMeta);
tfsLock();
TFS_DISK_AT(level, id)->dmeta.nfiles -= num;
ASSERT(TFS_DISK_AT(level, id)->dmeta.nfiles >= 0);
*pMeta = pfs->meta;
tfsUnLock();
}
......@@ -163,12 +157,12 @@ void tfsAllocDisk(int expLevel, int *level, int *id) {
*id = TFS_UNDECIDED_ID;
if (*level > TFS_NLEVEL()) {
*level = TFS_NLEVEL();
*level = TFS_NLEVEL() - 1;
}
while (*level >= 0) {
*id = tfsAssignDisk(*level);
if (*id < 0) {
*id = tfsAllocDiskOnTier(TFS_TIER_AT(*level));
if (*id == TFS_UNDECIDED_ID) {
(*level)--;
continue;
}
......@@ -184,19 +178,15 @@ const char *TFS_PRIMARY_PATH() { return DISK_DIR(TFS_PRIMARY_DISK()); }
const char *TFS_DISK_PATH(int level, int id) { return DISK_DIR(TFS_DISK_AT(level, id)); }
// TFILE APIs ====================================
static void tfsSetFileAname(TFILE *pf) {
if (TFS_IS_VALID_DISK(pf->level, pf->id)) {
SDisk *pDisk = TFS_DISK_AT(pf->level, pf->id);
ASSERT(pDisk != NULL);
snprintf(pf->aname, TSDB_FILENAME_LEN, "%s/%s", DISK_DIR(pDisk), pf->rname);
}
}
void tfsInitFile(TFILE *pf, int level, int id, const char *bname) {
ASSERT(TFS_IS_VALID_DISK(level, id));
SDisk *pDisk = TFS_DISK_AT(level, id);
pf->level = level;
pf->id = id;
strncpy(pf->rname, bname, TSDB_FILENAME_LEN);
tfsSetFileAname(pf);
snprintf(pf->aname, TSDB_FILENAME_LEN, "%s/%s", DISK_DIR(pDisk), bname);
}
bool tfsIsSameFile(TFILE *pf1, TFILE *pf2) {
......@@ -206,96 +196,6 @@ bool tfsIsSameFile(TFILE *pf1, TFILE *pf2) {
return true;
}
void tfsSetLevel(TFILE *pf, int level) {
pf->level = level;
tfsSetFileAname(pf);
}
void tfsSetID(TFILE *pf, int id) {
pf->id = id;
tfsSetFileAname(pf);
}
int tfsopen(TFILE *pf, int flags) {
int fd = -1;
if (flags & O_CREAT) {
if (pf->level >= TFS_NLEVEL()) {
tfsSetLevel(pf, TFS_NLEVEL() - 1);
}
if (pf->id == TFS_UNDECIDED_ID) {
int id = tfsAssignDisk(pf->level);
if (id < 0) {
fError("failed to assign disk at level %d", pf->level);
return -1;
}
tfsSetID(pf, id);
}
tfsIncDiskFile(pf->level, pf->id, 1);
}
fd = open(pf->aname, flags, 0755);
if (fd < 0) {
fError("failed to open file %s since %s", pf->aname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return fd;
}
int tfsclose(int fd) {
int code = close(fd);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
int tfsremove(TFILE *pf) {
int code = remove(pf->aname);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
tfsDecDiskFile(pf->level, pf->id, 1);
return 0;
}
int tfscopy(TFILE *sf, TFILE *df) {
if (df->level >= TFS_NLEVEL()) {
tfsSetLevel(df, TFS_NLEVEL() - 1);
}
if (sf->level == df->level) {
terrno = TSDB_CODE_FS_INVLD_LEVEL;
return -1;
}
if (df->id == TFS_UNDECIDED_ID) {
int id = tfsAssignDisk(df->level);
if (id < 0) {
terrno = TSDB_CODE_FS_NO_VALID_DISK;
return -1;
}
tfsSetID(df, id);
}
tfsIncDiskFile(df->level, df->id, 1);
taosCopy(sf->aname, df->aname);
return 0;
}
void tfsbasename(const TFILE *pf, char *dest) {
char tname[TSDB_FILENAME_LEN] = "\0";
......@@ -428,31 +328,6 @@ void tfsClosedir(TDIR *tdir) {
}
}
// Static functions
static int tfsLock() {
int code = pthread_mutex_lock(&(pfs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
pfs->locked = true;
return 0;
}
static int tfsUnLock() {
pfs->locked = false;
int code = pthread_mutex_unlock(&(pfs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
// private
static int tfsMount(SDiskCfg *pCfg) {
SDiskID did;
......@@ -639,43 +514,16 @@ static SDisk *tfsNextDisk(SDiskIter *pIter) {
return pDisk;
}
static int tfsAssignDisk(int level) {
if (!TFS_IS_VALID_LEVEL(level)) return -1;
STier *pTier = TFS_TIER_AT(level);
int id = -1;
tfsLock();
for (int tid = 0; tid < TIER_NDISKS(pTier); tid++) {
SDisk *pDisk = DISK_AT_TIER(pTier, tid);
if (DISK_FREE_SIZE(pDisk) < TFS_MIN_DISK_FREE_SIZE) continue;
if (id == -1) {
id = tid;
continue;
}
if (DISK_NFILES(DISK_AT_TIER(pTier, id)) > DISK_NFILES(DISK_AT_TIER(pTier, tid))) {
id = tid;
}
}
tfsUnLock();
return id;
}
// OTHER FUNCTIONS ===================================
void taosGetDisk() {
const double unit = 1024 * 1024 * 1024;
SysDiskSize diskSize;
SFSMeta fsMeta;
if (tscEmbedded) {
tfsUpdateInfo();
tsTotalDataDirGB = (float)tfsTotalSize() / unit;
tsAvailDataDirGB = (float)tfsAvailSize() / unit;
tfsUpdateInfo(&fsMeta);
tsTotalDataDirGB = (float)fsMeta.tsize / unit;
tsAvailDataDirGB = (float)fsMeta.avail / unit;
}
if (taosGetDiskSize(tsLogDir, &diskSize)) {
......
......@@ -18,19 +18,39 @@
#include "taoserror.h"
#include "tfsint.h"
#define tfsLockTier(pTier) pthread_spin_lock(&((pTier)->lock))
#define tfsUnLockTier(pTier) pthread_spin_unlock(&((pTier)->lock))
// PROTECTED ==========================================
void tfsInitTier(STier *pTier, int level) { pTier->level = level; }
int tfsInitTier(STier *pTier, int level) {
memset((void *)pTier, 0, sizeof(*pTier));
int code = pthread_spin_init(&(pTier->lock), 0);
if (code) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
pTier->level = level;
return 0;
}
void tfsDestroyTier(STier *pTier) {
for (int id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) {
DISK_AT_TIER(pTier, id) = tfsFreeDisk(DISK_AT_TIER(pTier, id));
}
pTier->ndisk = 0;
pthread_spin_destroy(&(pTier->lock));
}
SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
ASSERT(pTier->level == pCfg->level);
int id = 0;
int id = 0;
SDisk *pDisk;
if (TIER_NDISKS(pTier) >= TSDB_MAX_DISKS_PER_TIER) {
terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
......@@ -55,8 +75,9 @@ SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
id = pTier->ndisk;
}
DISK_AT_TIER(pTier, id) = tfsNewDisk(pCfg->level, id, pCfg->dir);
if (DISK_AT_TIER(pTier, id) == NULL) return NULL;
pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir);
if (pDisk == NULL) return NULL;
DISK_AT_TIER(pTier, id) = pDisk;
pTier->ndisk++;
fDebug("disk %s is mounted to tier level %d id %d", pCfg->dir, pCfg->level, id);
......@@ -64,14 +85,85 @@ SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
return DISK_AT_TIER(pTier, id);
}
void tfsUpdateTierInfo(STier *pTier) {
STierMeta tmeta = {0};
void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta) {
STierMeta tmeta;
if (pTierMeta == NULL) {
pTierMeta = &tmeta;
}
memset(pTierMeta, 0, sizeof(*pTierMeta));
tfsLockTier(pTier);
for (int id = 0; id < pTier->ndisk; id++) {
tfsUpdateDiskInfo(DISK_AT_TIER(pTier, id));
tmeta.size += DISK_SIZE(DISK_AT_TIER(pTier, id));
tmeta.free += DISK_FREE_SIZE(DISK_AT_TIER(pTier, id));
if (tfsUpdateDiskInfo(DISK_AT_TIER(pTier, id)) < 0) {
continue;
}
pTierMeta->size += DISK_SIZE(DISK_AT_TIER(pTier, id));
pTierMeta->free += DISK_FREE_SIZE(DISK_AT_TIER(pTier, id));
pTierMeta->nAvailDisks++;
}
pTier->tmeta = *pTierMeta;
tfsUnLockTier(pTier);
}
// Round-Robin to allocate disk on a tier
int tfsAllocDiskOnTier(STier *pTier) {
ASSERT(pTier->ndisk > 0);
int id = TFS_UNDECIDED_ID;
SDisk *pDisk;
tfsLockTier(pTier);
if (TIER_AVAIL_DISKS(pTier) <= 0) {
tfsUnLockTier(pTier);
return id;
}
id = pTier->nextid;
while (true) {
pDisk = DISK_AT_TIER(pTier, id);
ASSERT(pDisk != NULL);
if (DISK_FREE_SIZE(pDisk) < TFS_MIN_DISK_FREE_SIZE) {
id = (id + 1) % pTier->ndisk;
if (id == pTier->nextid) {
tfsUnLockTier(pTier);
return TFS_UNDECIDED_ID;
} else {
continue;
}
} else {
pTier->nextid = (id + 1) % pTier->ndisk;
break;
}
}
tfsUnLockTier(pTier);
return id;
}
void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta) {
ASSERT(pTierMeta != NULL);
tfsLockTier(pTier);
*pTierMeta = pTier->tmeta;
tfsUnLockTier(pTier);
}
void tfsPosNextId(STier *pTier) {
ASSERT(pTier->ndisk > 0);
int nextid = 0;
for (int id = 1; id < pTier->ndisk; id++) {
SDisk *pLDisk = DISK_AT_TIER(pTier, nextid);
SDisk *pDisk = DISK_AT_TIER(pTier, id);
if (DISK_FREE_SIZE(pDisk) > TFS_MIN_DISK_FREE_SIZE && DISK_FREE_SIZE(pDisk) > DISK_FREE_SIZE(pLDisk)) {
nextid = id;
}
}
pTier->tmeta = tmeta;
pTier->nextid = nextid;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册