提交 ed9709c3 编写于 作者: S Shengliang Guan

refact tfs module

上级 d6359f3a
...@@ -16,77 +16,226 @@ ...@@ -16,77 +16,226 @@
#ifndef _TD_TFS_H_ #ifndef _TD_TFS_H_
#define _TD_TFS_H_ #define _TD_TFS_H_
#include "tglobal.h" #include "tcfg.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct STfs STfs;
typedef struct STfsDir STfsDir;
typedef struct { typedef struct {
int32_t level; int32_t level;
int32_t id; int32_t id;
} SDiskID; } SDiskID;
#define TFS_UNDECIDED_LEVEL -1
#define TFS_UNDECIDED_ID -1
#define TFS_PRIMARY_LEVEL 0
#define TFS_PRIMARY_ID 0
#define TFS_MIN_LEVEL 0
#define TFS_MAX_LEVEL (TSDB_MAX_TIERS - 1)
// FS APIs ====================================
typedef struct { typedef struct {
int64_t total; SDiskID did;
int64_t used; char aname[TSDB_FILENAME_LEN]; // ABS name
int64_t avail; char rname[TSDB_FILENAME_LEN]; // REL name
} SFSMeta; STfs *pTfs;
} STfsFile;
int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk); /**
void tfsCleanup(); * @brief Open a fs.
void tfsUpdateSize(SFSMeta *pFSMeta); *
void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id); * @param pCfg Config of the fs.
* @param ndisk Length of the config.
* @return STfs* The fs object.
*/
STfs *tfsOpen(SDiskCfg *pCfg, int32_t ndisk);
const char *TFS_PRIMARY_PATH(); /**
const char *TFS_DISK_PATH(int32_t level, int32_t id); * @brief Close a fs.
*
* @param pTfs The fs object to close.
*/
void tfsClose(STfs *pTfs);
// TFILE APIs ==================================== /**
typedef struct { * @brief Update the disk size.
int32_t level; *
int32_t id; * @param pTfs The fs object.
char rname[TSDB_FILENAME_LEN]; // REL name */
char aname[TSDB_FILENAME_LEN]; // ABS name void tfsUpdateSize(STfs *pTfs);
} TFILE;
/**
#define TFILE_LEVEL(pf) ((pf)->level) * @brief Get the disk size.
#define TFILE_ID(pf) ((pf)->id) *
#define TFILE_NAME(pf) ((pf)->aname) * @param pTfs The fs object.
#define TFILE_REL_NAME(pf) ((pf)->rname) */
SDiskSize tfsGetSize(STfs *pTfs);
#define tfsopen(pf, flags) open(TFILE_NAME(pf), flags)
#define tfsclose(fd) close(fd) /**
#define tfsremove(pf) remove(TFILE_NAME(pf)) * @brief Allocate an existing available tier level from fs.
#define tfscopy(sf, df) taosCopyFile(TFILE_NAME(sf), TFILE_NAME(df)) *
#define tfsrename(sf, df) taosRename(TFILE_NAME(sf), TFILE_NAME(df)) * @param pTfs The fs object.
* @param expLevel Disk level want to allocate.
void tfsInitFile(TFILE *pf, int32_t level, int32_t id, const char *bname); * @param pDiskId The disk ID after allocation.
bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2); * @return int32_t 0 for success, -1 for failure.
int32_t tfsEncodeFile(void **buf, TFILE *pf); */
void *tfsDecodeFile(void *buf, TFILE *pf); int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, SDiskID *pDiskId);
void tfsbasename(const TFILE *pf, char *dest);
void tfsdirname(const TFILE *pf, char *dest); /**
* @brief Get the primary path.
// DIR APIs ==================================== *
int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id); * @param pTfs The fs object.
int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id); * @return const char * The primary path.
int32_t tfsMkdir(const char *rname); */
int32_t tfsRmdir(const char *rname); const char *tfsGetPrimaryPath(STfs *pTfs);
int32_t tfsRename(char *orname, char *nrname);
/**
typedef struct TDIR TDIR; * @brief Get the disk path.
*
TDIR *tfsOpendir(const char *rname); * @param pTfs The fs object.
const TFILE *tfsReaddir(TDIR *tdir); * @param diskId The diskId.
void tfsClosedir(TDIR *tdir); * @return const char * The primary path.
*/
const char *tfsGetDiskPath(STfs *pTfs, SDiskID diskId);
/**
* @brief Make directory at all levels in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsMkdir(STfs *pTfs, const char *rname);
/**
* @brief Create directories in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @param diskId The disk ID.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsMkdirAt(STfs *pTfs, const char *rname, SDiskID diskId);
/**
* @brief Recursive create directories in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @param diskId The disk ID.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId);
/**
* @brief Remove directory at all levels in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsRmdir(STfs *pTfs, const char *rname);
/**
* @brief Rename file/directory in tfs.
*
* @param pTfs The fs object.
* @param orname The rel name of old file.
* @param nrname The rel name of new file.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsRename(STfs *pTfs, char *orname, char *nrname);
/**
* @brief Init file object in tfs.
*
* @param pTfs The fs object.
* @param pFile The file object.
* @param diskId The disk ID.
* @param rname The rel name of file.
*/
void tfsInitFile(STfs *pTfs, STfsFile *pFile, SDiskID diskId, const char *rname);
/**
* @brief Determine whether they are the same file.
*
* @param pFile1 The file object.
* @param pFile2 The file object.
* @param bool The compare result.
*/
bool tfsIsSameFile(const STfsFile *pFile1, const STfsFile *pFile2);
/**
* @brief Encode file name to a buffer.
*
* @param buf The buffer where file name are saved.
* @param pFile The file object.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsEncodeFile(void **buf, STfsFile *pFile);
/**
* @brief Decode file name from a buffer.
*
* @param pTfs The fs object.
* @param buf The buffer where file name are saved.
* @param pFile The file object.
* @return void * Buffer address after decode.
*/
void *tfsDecodeFile(STfs *pTfs, void *buf, STfsFile *pFile);
/**
* @brief Get the basename of the file.
*
* @param pFile The file object.
* @param dest The buffer where basename will be saved.
*/
void tfsBasename(const STfsFile *pFile, char *dest);
/**
* @brief Get the dirname of the file.
*
* @param pFile The file object.
* @param dest The buffer where dirname will be saved.
*/
void tfsDirname(const STfsFile *pFile, char *dest);
/**
* @brief Remove file in tfs.
*
* @param pFile The file to be removed.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsRemoveFile(const STfsFile *pFile);
/**
* @brief Copy file in tfs.
*
* @param pFile1 The src file.
* @param pFile2 The dest file.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsCopyFile(const STfsFile *pFile1, const STfsFile *pFile2);
/**
* @brief Open a directory for traversal.
*
* @param rname The rel name of file.
* @return STfsDir* The dir object.
*/
STfsDir *tfsOpendir(STfs *pTfs, const char *rname);
/**
* @brief Get a file from dir and move to next pos.
*
* @param pDir The dir object.
* @return STfsFile* The file in dir.
*/
const STfsFile *tfsReaddir(STfsDir *pDir);
/**
* @brief Close a directory.
*
* @param pDir The dir object.
*/
void tfsClosedir(STfsDir *pDir);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -372,10 +372,14 @@ do { \ ...@@ -372,10 +372,14 @@ do { \
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P #define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
#define TSDB_MAX_TIERS 3 #define TFS_MAX_TIERS 3
#define TSDB_MAX_DISKS_PER_TIER 16 #define TFS_MAX_DISKS_PER_TIER 16
#define TSDB_MAX_DISKS (TSDB_MAX_TIERS * TSDB_MAX_DISKS_PER_TIER) #define TFS_MAX_DISKS (TFS_MAX_TIERS * TFS_MAX_DISKS_PER_TIER)
#define TFS_MIN_LEVEL 0
#define TFS_MAX_LEVEL (TFS_MAX_TIERS - 1)
#define TFS_PRIMARY_LEVEL 0
#define TFS_PRIMARY_ID 0
#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024
enum { TRANS_STAT_INIT = 0, TRANS_STAT_EXECUTING, TRANS_STAT_EXECUTED, TRANS_STAT_ROLLBACKING, TRANS_STAT_ROLLBACKED }; enum { TRANS_STAT_INIT = 0, TRANS_STAT_EXECUTING, TRANS_STAT_EXECUTED, TRANS_STAT_ROLLBACKING, TRANS_STAT_ROLLBACKED };
enum { TRANS_OPER_INIT = 0, TRANS_OPER_EXECUTE, TRANS_OPER_ROLLBACK }; enum { TRANS_OPER_INIT = 0, TRANS_OPER_EXECUTE, TRANS_OPER_ROLLBACK };
......
...@@ -137,7 +137,7 @@ int32_t tsDiskCfgNum = 0; ...@@ -137,7 +137,7 @@ int32_t tsDiskCfgNum = 0;
#ifndef _STORAGE #ifndef _STORAGE
SDiskCfg tsDiskCfg[1]; SDiskCfg tsDiskCfg[1];
#else #else
SDiskCfg tsDiskCfg[TSDB_MAX_DISKS]; SDiskCfg tsDiskCfg[TFS_MAX_DISKS];
#endif #endif
/* /*
......
...@@ -134,6 +134,7 @@ typedef struct SDnode { ...@@ -134,6 +134,7 @@ typedef struct SDnode {
SBnodeMgmt bmgmt; SBnodeMgmt bmgmt;
SVnodesMgmt vmgmt; SVnodesMgmt vmgmt;
STransMgmt tmgmt; STransMgmt tmgmt;
STfs *pTfs;
SStartupReq startup; SStartupReq startup;
} SDnode; } SDnode;
......
...@@ -43,6 +43,7 @@ extern "C" { ...@@ -43,6 +43,7 @@ extern "C" {
#include "qnode.h" #include "qnode.h"
#include "snode.h" #include "snode.h"
#include "vnode.h" #include "vnode.h"
#include "tfs.h"
extern int32_t dDebugFlag; extern int32_t dDebugFlag;
......
...@@ -173,11 +173,12 @@ SDnode *dndCreate(SDnodeObjCfg *pCfg) { ...@@ -173,11 +173,12 @@ SDnode *dndCreate(SDnodeObjCfg *pCfg) {
return NULL; return NULL;
} }
SDiskCfg dCfg; SDiskCfg dCfg = {0};
strcpy(dCfg.dir, pDnode->cfg.dataDir); tstrncpy(dCfg.dir, pDnode->cfg.dataDir, TSDB_FILENAME_LEN);
dCfg.level = 0; dCfg.level = 0;
dCfg.primary = 1; dCfg.primary = 1;
if (tfsInit(&dCfg, 1) != 0) { pDnode->pTfs = tfsOpen(&dCfg, 1);
if (pDnode->pTfs == NULL) {
dError("failed to init tfs since %s", terrstr()); dError("failed to init tfs since %s", terrstr());
dndClose(pDnode); dndClose(pDnode);
return NULL; return NULL;
...@@ -251,7 +252,7 @@ void dndClose(SDnode *pDnode) { ...@@ -251,7 +252,7 @@ void dndClose(SDnode *pDnode) {
dndCleanupQnode(pDnode); dndCleanupQnode(pDnode);
dndCleanupVnodes(pDnode); dndCleanupVnodes(pDnode);
dndCleanupMgmt(pDnode); dndCleanupMgmt(pDnode);
tfsCleanup(); tfsClose(pDnode->pTfs);
dndCloseImp(pDnode); dndCloseImp(pDnode);
free(pDnode); free(pDnode);
...@@ -314,3 +315,27 @@ void dndCleanup() { ...@@ -314,3 +315,27 @@ void dndCleanup() {
taosStopCacheRefreshWorker(); taosStopCacheRefreshWorker();
dInfo("dnode env is cleaned up"); dInfo("dnode env is cleaned up");
} }
// OTHER FUNCTIONS ===================================
void taosGetDisk() {
#if 0
const double unit = 1024 * 1024 * 1024;
SDiskSize diskSize = tfsGetSize(pTfs);
tfsUpdateSize(&fsMeta);
tsTotalDataDirGB = (float)(fsMeta.total / unit);
tsUsedDataDirGB = (float)(fsMeta.used / unit);
tsAvailDataDirGB = (float)(fsMeta.avail / unit);
if (taosGetDiskSize(tsLogDir, &diskSize) == 0) {
tsTotalLogDirGB = (float)(diskSize.total / unit);
tsAvailLogDirGB = (float)(diskSize.avail / unit);
}
if (taosGetDiskSize(tsTempDir, &diskSize) == 0) {
tsTotalTmpDirGB = (float)(diskSize.total / unit);
tsAvailTmpDirectorySpace = (float)(diskSize.avail / unit);
}
#endif
}
\ No newline at end of file
...@@ -80,7 +80,7 @@ typedef struct { ...@@ -80,7 +80,7 @@ typedef struct {
} STableKeyInfo; } STableKeyInfo;
// STsdb // STsdb
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta); STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, STfs *pTfs);
void tsdbClose(STsdb *); void tsdbClose(STsdb *);
void tsdbRemove(const char *path); void tsdbRemove(const char *path);
int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg, SSubmitRsp *pRsp); int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg, SSubmitRsp *pRsp);
......
...@@ -36,7 +36,8 @@ typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); ...@@ -36,7 +36,8 @@ typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
typedef struct SVnodeCfg { typedef struct SVnodeCfg {
int32_t vgId; int32_t vgId;
SDnode * pDnode; SDnode *pDnode;
STfs *pTfs;
uint64_t wsize; uint64_t wsize;
uint64_t ssize; uint64_t ssize;
uint64_t lsize; uint64_t lsize;
......
...@@ -50,6 +50,7 @@ struct STsdb { ...@@ -50,6 +50,7 @@ struct STsdb {
SMemAllocatorFactory *pmaf; SMemAllocatorFactory *pmaf;
STsdbFS * fs; STsdbFS * fs;
SMeta * pMeta; SMeta * pMeta;
STfs * pTfs;
}; };
#define REPO_ID(r) ((r)->vgId) #define REPO_ID(r) ((r)->vgId)
......
...@@ -55,7 +55,7 @@ typedef struct { ...@@ -55,7 +55,7 @@ typedef struct {
typedef struct { typedef struct {
SMFInfo info; SMFInfo info;
TFILE f; STfsFile f;
int fd; int fd;
uint8_t state; uint8_t state;
} SMFile; } SMFile;
...@@ -176,7 +176,7 @@ typedef struct { ...@@ -176,7 +176,7 @@ typedef struct {
typedef struct { typedef struct {
SDFInfo info; SDFInfo info;
TFILE f; STfsFile f;
int fd; int fd;
uint8_t state; uint8_t state;
} SDFile; } SDFile;
......
...@@ -78,6 +78,7 @@ struct SVnode { ...@@ -78,6 +78,7 @@ struct SVnode {
tsem_t canCommit; tsem_t canCommit;
SQHandle* pQuery; SQHandle* pQuery;
SDnode* pDnode; SDnode* pDnode;
STfs* pTfs;
}; };
int vnodeScheduleTask(SVnodeTask* task); int vnodeScheduleTask(SVnodeTask* task);
......
...@@ -97,8 +97,7 @@ int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { ...@@ -97,8 +97,7 @@ int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
level = tsdbGetFidLevel(pSet->fid, pRtn); level = tsdbGetFidLevel(pSet->fid, pRtn);
tfsAllocDisk(level, &(did.level), &(did.id)); if (tfsAllocDisk(pRepo->pTfs, level, &did) < 0) {
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1; return -1;
} }
...@@ -456,8 +455,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid ...@@ -456,8 +455,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith); STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith); SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &(did.level), &(did.id)); if (tfsAllocDisk(pRepo->pTfs, tsdbGetFidLevel(fid, &(pCommith->rtn)), &did) < 0) {
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1; return -1;
} }
......
...@@ -186,8 +186,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { ...@@ -186,8 +186,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
} }
} else { } else {
// Create new fset as compacted fset // Create new fset as compacted fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pComph->rtn)), &(did.level), &(did.id)); if (tfsAllocDisk(pRepo->pTfs, tsdbGetFidLevel(pSet->fid, &(pComph->rtn)), &did) < 0) {
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbCompactFSetEnd(pComph); tsdbCompactFSetEnd(pComph);
......
...@@ -23,14 +23,14 @@ static const char *tsdbTxnFname[] = {"current.t", "current"}; ...@@ -23,14 +23,14 @@ static const char *tsdbTxnFname[] = {"current.t", "current"};
static int tsdbComparFidFSet(const void *arg1, const void *arg2); static int tsdbComparFidFSet(const void *arg1, const void *arg2);
static void tsdbResetFSStatus(SFSStatus *pStatus); static void tsdbResetFSStatus(SFSStatus *pStatus);
static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid); static int tsdbSaveFSStatus(STfs *pTfs, SFSStatus *pStatus, int vid);
static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo); static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo);
static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]); static void tsdbGetTxnFname(STfs *pTfs, int repoid, TSDB_TXN_FILE_T ftype, char fname[]);
static int tsdbOpenFSFromCurrent(STsdb *pRepo); static int tsdbOpenFSFromCurrent(STsdb *pRepo);
static int tsdbScanAndTryFixFS(STsdb *pRepo); static int tsdbScanAndTryFixFS(STsdb *pRepo);
static int tsdbScanRootDir(STsdb *pRepo); static int tsdbScanRootDir(STsdb *pRepo);
static int tsdbScanDataDir(STsdb *pRepo); static int tsdbScanDataDir(STsdb *pRepo);
static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf); static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf);
static int tsdbRestoreCurrent(STsdb *pRepo); static int tsdbRestoreCurrent(STsdb *pRepo);
static int tsdbComparTFILE(const void *arg1, const void *arg2); static int tsdbComparTFILE(const void *arg1, const void *arg2);
static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired); static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired);
...@@ -311,7 +311,7 @@ int tsdbOpenFS(STsdb *pRepo) { ...@@ -311,7 +311,7 @@ int tsdbOpenFS(STsdb *pRepo) {
ASSERT(pfs != NULL); ASSERT(pfs != NULL);
tsdbGetTxnFname(REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current); tsdbGetTxnFname(pRepo->pTfs, REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current);
tsdbGetRtnSnap(pRepo, &pRepo->rtn); tsdbGetRtnSnap(pRepo, &pRepo->rtn);
if (access(current, F_OK) == 0) { if (access(current, F_OK) == 0) {
...@@ -375,7 +375,7 @@ int tsdbEndFSTxn(STsdb *pRepo) { ...@@ -375,7 +375,7 @@ int tsdbEndFSTxn(STsdb *pRepo) {
SFSStatus *pStatus; SFSStatus *pStatus;
// Write current file system snapshot // Write current file system snapshot
if (tsdbSaveFSStatus(pfs->nstatus, REPO_ID(pRepo)) < 0) { if (tsdbSaveFSStatus(pRepo->pTfs, pfs->nstatus, REPO_ID(pRepo)) < 0) {
tsdbEndFSTxnWithError(pfs); tsdbEndFSTxnWithError(pfs);
return -1; return -1;
} }
...@@ -405,7 +405,7 @@ int tsdbEndFSTxnWithError(STsdbFS *pfs) { ...@@ -405,7 +405,7 @@ int tsdbEndFSTxnWithError(STsdbFS *pfs) {
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); } int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); }
static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) { static int tsdbSaveFSStatus(STfs *pTfs, SFSStatus *pStatus, int vid) {
SFSHeader fsheader; SFSHeader fsheader;
void * pBuf = NULL; void * pBuf = NULL;
void * ptr; void * ptr;
...@@ -413,8 +413,8 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) { ...@@ -413,8 +413,8 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
char tfname[TSDB_FILENAME_LEN] = "\0"; char tfname[TSDB_FILENAME_LEN] = "\0";
char cfname[TSDB_FILENAME_LEN] = "\0"; char cfname[TSDB_FILENAME_LEN] = "\0";
tsdbGetTxnFname(vid, TSDB_TXN_TEMP_FILE, tfname); tsdbGetTxnFname(pTfs, vid, TSDB_TXN_TEMP_FILE, tfname);
tsdbGetTxnFname(vid, TSDB_TXN_CURR_FILE, cfname); tsdbGetTxnFname(pTfs, vid, TSDB_TXN_CURR_FILE, cfname);
int fd = open(tfname, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755); int fd = open(tfname, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755);
if (fd < 0) { if (fd < 0) {
...@@ -645,8 +645,8 @@ static int tsdbComparFidFSet(const void *arg1, const void *arg2) { ...@@ -645,8 +645,8 @@ static int tsdbComparFidFSet(const void *arg1, const void *arg2) {
} }
} }
static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]) { static void tsdbGetTxnFname(STfs *pTfs, int repoid, TSDB_TXN_FILE_T ftype, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), repoid, tsdbTxnFname[ftype]); snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", tfsGetPrimaryPath(pTfs), repoid, tsdbTxnFname[ftype]);
} }
static int tsdbOpenFSFromCurrent(STsdb *pRepo) { static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
...@@ -657,7 +657,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) { ...@@ -657,7 +657,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
char current[TSDB_FILENAME_LEN] = "\0"; char current[TSDB_FILENAME_LEN] = "\0";
void * ptr; void * ptr;
tsdbGetTxnFname(REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current); tsdbGetTxnFname(pRepo->pTfs, REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current);
// current file exists, try to recover // current file exists, try to recover
fd = open(current, O_RDONLY | O_BINARY); fd = open(current, O_RDONLY | O_BINARY);
...@@ -910,17 +910,17 @@ static int tsdbScanRootDir(STsdb *pRepo) { ...@@ -910,17 +910,17 @@ static int tsdbScanRootDir(STsdb *pRepo) {
char rootDir[TSDB_FILENAME_LEN]; char rootDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN];
STsdbFS * pfs = REPO_FS(pRepo); STsdbFS * pfs = REPO_FS(pRepo);
const TFILE *pf; const STfsFile *pf;
tsdbGetRootDir(REPO_ID(pRepo), rootDir); tsdbGetRootDir(REPO_ID(pRepo), rootDir);
TDIR *tdir = tfsOpendir(rootDir); STfsDir *tdir = tfsOpendir(rootDir);
if (tdir == NULL) { if (tdir == NULL) {
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno)); tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno));
return -1; return -1;
} }
while ((pf = tfsReaddir(tdir))) { while ((pf = tfsReaddir(tdir))) {
tfsbasename(pf, bname); tfsBasename(pf, bname);
if (strcmp(bname, tsdbTxnFname[TSDB_TXN_CURR_FILE]) == 0 || strcmp(bname, "data") == 0) { if (strcmp(bname, tsdbTxnFname[TSDB_TXN_CURR_FILE]) == 0 || strcmp(bname, "data") == 0) {
// Skip current file and data directory // Skip current file and data directory
...@@ -944,17 +944,17 @@ static int tsdbScanDataDir(STsdb *pRepo) { ...@@ -944,17 +944,17 @@ static int tsdbScanDataDir(STsdb *pRepo) {
char dataDir[TSDB_FILENAME_LEN]; char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN];
STsdbFS * pfs = REPO_FS(pRepo); STsdbFS * pfs = REPO_FS(pRepo);
const TFILE *pf; const STfsFile *pf;
tsdbGetDataDir(REPO_ID(pRepo), dataDir); tsdbGetDataDir(REPO_ID(pRepo), dataDir);
TDIR *tdir = tfsOpendir(dataDir); STfsDir *tdir = tfsOpendir(dataDir);
if (tdir == NULL) { if (tdir == NULL) {
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno)); tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno));
return -1; return -1;
} }
while ((pf = tfsReaddir(tdir))) { while ((pf = tfsReaddir(tdir))) {
tfsbasename(pf, bname); tfsBasename(pf, bname);
if (!tsdbIsTFileInFS(pfs, pf)) { if (!tsdbIsTFileInFS(pfs, pf)) {
(void)tfsremove(pf); (void)tfsremove(pf);
...@@ -967,7 +967,7 @@ static int tsdbScanDataDir(STsdb *pRepo) { ...@@ -967,7 +967,7 @@ static int tsdbScanDataDir(STsdb *pRepo) {
return 0; return 0;
} }
static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) { static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf) {
SFSIter fsiter; SFSIter fsiter;
tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD); tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD);
SDFileSet *pSet; SDFileSet *pSet;
...@@ -987,8 +987,8 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) { ...@@ -987,8 +987,8 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
// static int tsdbRestoreMeta(STsdb *pRepo) { // static int tsdbRestoreMeta(STsdb *pRepo) {
// char rootDir[TSDB_FILENAME_LEN]; // char rootDir[TSDB_FILENAME_LEN];
// char bname[TSDB_FILENAME_LEN]; // char bname[TSDB_FILENAME_LEN];
// TDIR * tdir = NULL; // STfsDir * tdir = NULL;
// const TFILE *pf = NULL; // const STfsFile *pf = NULL;
// const char * pattern = "^meta(-ver[0-9]+)?$"; // const char * pattern = "^meta(-ver[0-9]+)?$";
// regex_t regex; // regex_t regex;
// STsdbFS * pfs = REPO_FS(pRepo); // STsdbFS * pfs = REPO_FS(pRepo);
...@@ -1007,7 +1007,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) { ...@@ -1007,7 +1007,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
// } // }
// while ((pf = tfsReaddir(tdir))) { // while ((pf = tfsReaddir(tdir))) {
// tfsbasename(pf, bname); // tfsBasename(pf, bname);
// if (strcmp(bname, "data") == 0) { // if (strcmp(bname, "data") == 0) {
// // Skip the data/ directory // // Skip the data/ directory
...@@ -1108,8 +1108,8 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) { ...@@ -1108,8 +1108,8 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
static int tsdbRestoreDFileSet(STsdb *pRepo) { static int tsdbRestoreDFileSet(STsdb *pRepo) {
char dataDir[TSDB_FILENAME_LEN]; char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN];
TDIR * tdir = NULL; STfsDir * tdir = NULL;
const TFILE *pf = NULL; const STfsFile *pf = NULL;
const char * pattern = "^v[0-9]+f[0-9]+\\.(head|data|last)(-ver[0-9]+)?$"; const char * pattern = "^v[0-9]+f[0-9]+\\.(head|data|last)(-ver[0-9]+)?$";
SArray * fArray = NULL; SArray * fArray = NULL;
regex_t regex; regex_t regex;
...@@ -1120,7 +1120,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) { ...@@ -1120,7 +1120,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
// Resource allocation and init // Resource allocation and init
regcomp(&regex, pattern, REG_EXTENDED); regcomp(&regex, pattern, REG_EXTENDED);
fArray = taosArrayInit(1024, sizeof(TFILE)); fArray = taosArrayInit(1024, sizeof(STfsFile));
if (fArray == NULL) { if (fArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir, tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
...@@ -1139,7 +1139,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) { ...@@ -1139,7 +1139,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
} }
while ((pf = tfsReaddir(tdir))) { while ((pf = tfsReaddir(tdir))) {
tfsbasename(pf, bname); tfsBasename(pf, bname);
int code = regexec(&regex, bname, 0, NULL, 0); int code = regexec(&regex, bname, 0, NULL, 0);
if (code == 0) { if (code == 0) {
...@@ -1200,7 +1200,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) { ...@@ -1200,7 +1200,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
uint32_t tversion; uint32_t tversion;
char _bname[TSDB_FILENAME_LEN]; char _bname[TSDB_FILENAME_LEN];
tfsbasename(pf, _bname); tfsBasename(pf, _bname);
tsdbParseDFilename(_bname, &tvid, &tfid, &ttype, &tversion); tsdbParseDFilename(_bname, &tvid, &tfid, &ttype, &tversion);
ASSERT(tvid == REPO_ID(pRepo)); ASSERT(tvid == REPO_ID(pRepo));
...@@ -1287,7 +1287,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) { ...@@ -1287,7 +1287,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) {
return -1; return -1;
} }
if (tsdbSaveFSStatus(pRepo->fs->cstatus, REPO_ID(pRepo)) < 0) { if (tsdbSaveFSStatus(pRepo->pTfs, pRepo->fs->cstatus, REPO_ID(pRepo)) < 0) {
tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; return -1;
} }
...@@ -1296,8 +1296,8 @@ static int tsdbRestoreCurrent(STsdb *pRepo) { ...@@ -1296,8 +1296,8 @@ static int tsdbRestoreCurrent(STsdb *pRepo) {
} }
static int tsdbComparTFILE(const void *arg1, const void *arg2) { static int tsdbComparTFILE(const void *arg1, const void *arg2) {
TFILE *pf1 = (TFILE *)arg1; STfsFile *pf1 = (STfsFile *)arg1;
TFILE *pf2 = (TFILE *)arg2; STfsFile *pf2 = (STfsFile *)arg2;
int vid1, fid1, vid2, fid2; int vid1, fid1, vid2, fid2;
TSDB_FILE_T ftype1, ftype2; TSDB_FILE_T ftype1, ftype2;
...@@ -1305,8 +1305,8 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) { ...@@ -1305,8 +1305,8 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) {
char bname1[TSDB_FILENAME_LEN]; char bname1[TSDB_FILENAME_LEN];
char bname2[TSDB_FILENAME_LEN]; char bname2[TSDB_FILENAME_LEN];
tfsbasename(pf1, bname1); tfsBasename(pf1, bname1);
tfsbasename(pf2, bname2); tfsBasename(pf2, bname2);
tsdbParseDFilename(bname1, &vid1, &fid1, &ftype1, &version1); tsdbParseDFilename(bname1, &vid1, &fid1, &ftype1, &version1);
tsdbParseDFilename(bname2, &vid2, &fid2, &ftype2, &version2); tsdbParseDFilename(bname2, &vid2, &fid2, &ftype2, &version2);
......
...@@ -16,12 +16,12 @@ ...@@ -16,12 +16,12 @@
#include "tsdbDef.h" #include "tsdbDef.h"
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
SMeta *pMeta); SMeta *pMeta, STfs *pTfs);
static void tsdbFree(STsdb *pTsdb); static void tsdbFree(STsdb *pTsdb);
static int tsdbOpenImpl(STsdb *pTsdb); static int tsdbOpenImpl(STsdb *pTsdb);
static void tsdbCloseImpl(STsdb *pTsdb); static void tsdbCloseImpl(STsdb *pTsdb);
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta) { STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, STfs *pTfs) {
STsdb *pTsdb = NULL; STsdb *pTsdb = NULL;
// Set default TSDB Options // Set default TSDB Options
...@@ -36,7 +36,7 @@ STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAl ...@@ -36,7 +36,7 @@ STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAl
} }
// Create the handle // Create the handle
pTsdb = tsdbNew(path, vgId, pTsdbCfg, pMAF, pMeta); pTsdb = tsdbNew(path, vgId, pTsdbCfg, pMAF, pMeta, pTfs);
if (pTsdb == NULL) { if (pTsdb == NULL) {
// TODO: handle error // TODO: handle error
return NULL; return NULL;
...@@ -64,7 +64,7 @@ void tsdbRemove(const char *path) { taosRemoveDir(path); } ...@@ -64,7 +64,7 @@ void tsdbRemove(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
SMeta *pMeta) { SMeta *pMeta, STfs *pTfs) {
STsdb *pTsdb = NULL; STsdb *pTsdb = NULL;
pTsdb = (STsdb *)calloc(1, sizeof(STsdb)); pTsdb = (STsdb *)calloc(1, sizeof(STsdb));
...@@ -78,6 +78,7 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, ...@@ -78,6 +78,7 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg,
tsdbOptionsCopy(&(pTsdb->config), pTsdbCfg); tsdbOptionsCopy(&(pTsdb->config), pTsdbCfg);
pTsdb->pmaf = pMAF; pTsdb->pmaf = pMAF;
pTsdb->pMeta = pMeta; pTsdb->pMeta = pMeta;
pTsdb->pTfs = pTfs;
pTsdb->fs = tsdbNewFS(pTsdbCfg); pTsdb->fs = tsdbNewFS(pTsdbCfg);
...@@ -494,7 +495,7 @@ uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t ...@@ -494,7 +495,7 @@ uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t
} }
} else { // get the named file at the specified index. If not there, return 0 } else { // get the named file at the specified index. If not there, return 0
fname = malloc(256); fname = malloc(256);
sprintf(fname, "%s/vnode/vnode%d/%s", TFS_PRIMARY_PATH(), REPO_ID(pRepo), name); sprintf(fname, "%s/vnode/vnode%d/%s", tfsGetPrimaryPath(pRepo->pTfs), REPO_ID(pRepo), name);
if (access(fname, F_OK) != 0) { if (access(fname, F_OK) != 0) {
tfree(fname); tfree(fname);
return 0; return 0;
......
...@@ -75,6 +75,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { ...@@ -75,6 +75,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
pVnode->vgId = pVnodeCfg->vgId; pVnode->vgId = pVnodeCfg->vgId;
pVnode->pDnode = pVnodeCfg->pDnode; pVnode->pDnode = pVnodeCfg->pDnode;
pVnode->pTfs = pVnodeCfg->pTfs;
pVnode->path = strdup(path); pVnode->path = strdup(path);
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);
...@@ -109,7 +110,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { ...@@ -109,7 +110,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open tsdb // Open tsdb
sprintf(dir, "%s/tsdb", pVnode->path); sprintf(dir, "%s/tsdb", pVnode->path);
pVnode->pTsdb = tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta); pVnode->pTsdb = tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs);
if (pVnode->pTsdb == NULL) { if (pVnode->pTsdb == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;
......
...@@ -33,28 +33,73 @@ extern int32_t fsDebugFlag; ...@@ -33,28 +33,73 @@ extern int32_t fsDebugFlag;
#define fError(...) { if (fsDebugFlag & DEBUG_ERROR) { taosPrintLog("TFS ERROR ", 255, __VA_ARGS__); }} #define fError(...) { if (fsDebugFlag & DEBUG_ERROR) { taosPrintLog("TFS ERROR ", 255, __VA_ARGS__); }}
#define fWarn(...) { if (fsDebugFlag & DEBUG_WARN) { taosPrintLog("TFS WARN ", 255, __VA_ARGS__); }} #define fWarn(...) { if (fsDebugFlag & DEBUG_WARN) { taosPrintLog("TFS WARN ", 255, __VA_ARGS__); }}
#define fInfo(...) { if (fsDebugFlag & DEBUG_INFO) { taosPrintLog("TFS ", 255, __VA_ARGS__); }} #define fInfo(...) { if (fsDebugFlag & DEBUG_INFO) { taosPrintLog("TFS ", 255, __VA_ARGS__); }}
#define fDebug(...) { if (fsDebugFlag & DEBUG_DEBUG) { taosPrintLog("TFS ", cqDebugFlag, __VA_ARGS__); }} #define fDebug(...) { if (fsDebugFlag & DEBUG_DEBUG) { taosPrintLog("TFS ", fsDebugFlag, __VA_ARGS__); }}
#define fTrace(...) { if (fsDebugFlag & DEBUG_TRACE) { taosPrintLog("TFS ", cqDebugFlag, __VA_ARGS__); }} #define fTrace(...) { if (fsDebugFlag & DEBUG_TRACE) { taosPrintLog("TFS ", fsDebugFlag, __VA_ARGS__); }}
// Global Definitions typedef struct {
#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024
typedef struct SDisk {
int32_t level; int32_t level;
int32_t id; int32_t id;
char *path; char *path;
SDiskSize size; SDiskSize size;
} SDisk; } STfsDisk;
typedef struct STier { typedef struct {
pthread_spinlock_t lock; pthread_spinlock_t lock;
int32_t level; int32_t level;
int16_t nextid; // next disk id to allocate int32_t nextid; // next disk id to allocate
int16_t ndisk; // # of disks mounted to this tier int32_t ndisk; // # of disks mounted to this tier
int16_t nAvailDisks; // # of Available disks int32_t nAvailDisks; // # of Available disks
SDisk *disks[TSDB_MAX_DISKS_PER_TIER]; STfsDisk *disks[TFS_MAX_DISKS_PER_TIER];
SDiskSize size;
} STfsTier;
typedef struct {
STfsDisk *pDisk;
} SDiskIter;
typedef struct STfsDir {
SDiskIter *iter;
SDiskID did;
char dirname[TSDB_FILENAME_LEN];
STfsFile tfile;
DIR *dir;
STfs *pTfs;
} STfsDir;
typedef struct STfs {
pthread_spinlock_t lock;
SDiskSize size; SDiskSize size;
} STier; int32_t nlevel;
STfsTier tiers[TFS_MAX_TIERS];
SHashObj *hash; // name to did map
} STfs;
STfsDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir);
STfsDisk *tfsFreeDisk(STfsDisk *pDisk);
int32_t tfsUpdateDiskSize(STfsDisk *pDisk);
int32_t tfsInitTier(STfsTier *pTier, int32_t level);
void tfsDestroyTier(STfsTier *pTier);
STfsDisk *tfsMountDiskToTier(STfsTier *pTier, SDiskCfg *pCfg);
void tfsUpdateTierSize(STfsTier *pTier);
int32_t tfsAllocDiskOnTier(STfsTier *pTier);
void tfsPosNextId(STfsTier *pTier);
#define tfsLockTier(pTier) pthread_spin_lock(&(pTier)->lock)
#define tfsUnLockTier(pTier) pthread_spin_unlock(&(pTier)->lock)
#define TMPNAME_LEN (TSDB_FILENAME_LEN * 2 + 32)
#define tfsLock(pTfs) pthread_spin_lock(&(pTfs)->lock)
#define tfsUnLock(pTfs) pthread_spin_unlock(&(pTfs)->lock)
#define TFS_TIER_AT(pTfs, level) (&pTfs->tiers[level])
#define TFS_DISK_AT(pTfs, did) (pTfs->tiers[(did).level].disks[(did).id])
#define TFS_PRIMARY_DISK(pTfs) (pTfs->tiers[0].disks[0])
#define TFS_IS_VALID_LEVEL(level) (((level) >= 0) && ((level) < TFS_NLEVEL()))
#define TFS_IS_VALID_ID(level, id) (((id) >= 0) && ((id) < TIER_NDISKS(TFS_TIER_AT(level))))
#define TFS_IS_VALID_DISK(level, id) (TFS_IS_VALID_LEVEL(level) && TFS_IS_VALID_ID(level, id))
#define TIER_LEVEL(pt) ((pt)->level) #define TIER_LEVEL(pt) ((pt)->level)
#define TIER_NDISKS(pt) ((pt)->ndisk) #define TIER_NDISKS(pt) ((pt)->ndisk)
...@@ -64,17 +109,6 @@ typedef struct STier { ...@@ -64,17 +109,6 @@ typedef struct STier {
#define DISK_AT_TIER(pt, id) ((pt)->disks[id]) #define DISK_AT_TIER(pt, id) ((pt)->disks[id])
#define DISK_DIR(pd) ((pd)->path) #define DISK_DIR(pd) ((pd)->path)
SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir);
SDisk *tfsFreeDisk(SDisk *pDisk);
int32_t tfsUpdateDiskSize(SDisk *pDisk);
int32_t tfsInitTier(STier *pTier, int32_t level);
void tfsDestroyTier(STier *pTier);
SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg);
void tfsUpdateTierSize(STier *pTier);
int32_t tfsAllocDiskOnTier(STier *pTier);
void tfsPosNextId(STier *pTier);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
此差异已折叠。
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tfsInt.h" #include "tfsInt.h"
SDisk *tfsNewDisk(int32_t level, int32_t id, const char *path) { STfsDisk *tfsNewDisk(int32_t level, int32_t id, const char *path) {
SDisk *pDisk = calloc(1, sizeof(SDisk)); STfsDisk *pDisk = calloc(1, sizeof(STfsDisk));
if (pDisk == NULL) { if (pDisk == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
...@@ -36,7 +36,7 @@ SDisk *tfsNewDisk(int32_t level, int32_t id, const char *path) { ...@@ -36,7 +36,7 @@ SDisk *tfsNewDisk(int32_t level, int32_t id, const char *path) {
return pDisk; return pDisk;
} }
SDisk *tfsFreeDisk(SDisk *pDisk) { STfsDisk *tfsFreeDisk(STfsDisk *pDisk) {
if (pDisk != NULL) { if (pDisk != NULL) {
free(pDisk->path); free(pDisk->path);
free(pDisk); free(pDisk);
...@@ -45,7 +45,7 @@ SDisk *tfsFreeDisk(SDisk *pDisk) { ...@@ -45,7 +45,7 @@ SDisk *tfsFreeDisk(SDisk *pDisk) {
return NULL; return NULL;
} }
int32_t tfsUpdateDiskSize(SDisk *pDisk) { int32_t tfsUpdateDiskSize(STfsDisk *pDisk) {
if (taosGetDiskSize(pDisk->path, &pDisk->size) != 0) { if (taosGetDiskSize(pDisk->path, &pDisk->size) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
fError("failed to get disk:%s size, level:%d id:%d since %s", pDisk->path, pDisk->level, pDisk->id, terrstr()); fError("failed to get disk:%s size, level:%d id:%d since %s", pDisk->path, pDisk->level, pDisk->id, terrstr());
......
...@@ -16,11 +16,8 @@ ...@@ -16,11 +16,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tfsInt.h" #include "tfsInt.h"
#define tfsLockTier(pTier) pthread_spin_lock(&(pTier)->lock) int32_t tfsInitTier(STfsTier *pTier, int32_t level) {
#define tfsUnLockTier(pTier) pthread_spin_unlock(&(pTier)->lock) memset(pTier, 0, sizeof(STfsTier));
int32_t tfsInitTier(STier *pTier, int32_t level) {
memset(pTier, 0, sizeof(STier));
if (pthread_spin_init(&pTier->lock, 0) != 0) { if (pthread_spin_init(&pTier->lock, 0) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -31,17 +28,17 @@ int32_t tfsInitTier(STier *pTier, int32_t level) { ...@@ -31,17 +28,17 @@ int32_t tfsInitTier(STier *pTier, int32_t level) {
return 0; return 0;
} }
void tfsDestroyTier(STier *pTier) { void tfsDestroyTier(STfsTier *pTier) {
for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) { for (int32_t id = 0; id < TFS_MAX_DISKS_PER_TIER; id++) {
pTier->disks[id] = tfsFreeDisk(pTier->disks[id]); pTier->disks[id] = tfsFreeDisk(pTier->disks[id]);
} }
pTier->ndisk = 0; pTier->ndisk = 0;
pthread_spin_destroy(&(pTier->lock)); pthread_spin_destroy(&pTier->lock);
} }
SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) { STfsDisk *tfsMountDiskToTier(STfsTier *pTier, SDiskCfg *pCfg) {
if (pTier->ndisk >= TSDB_MAX_DISKS_PER_TIER) { if (pTier->ndisk >= TFS_MAX_DISKS_PER_TIER) {
terrno = TSDB_CODE_FS_TOO_MANY_MOUNT; terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
return NULL; return NULL;
} }
...@@ -61,12 +58,12 @@ SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) { ...@@ -61,12 +58,12 @@ SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
id = pTier->ndisk; id = pTier->ndisk;
} }
if (id >= TSDB_MAX_DISKS_PER_TIER) { if (id >= TFS_MAX_DISKS_PER_TIER) {
terrno = TSDB_CODE_FS_TOO_MANY_MOUNT; terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
return NULL; return NULL;
} }
SDisk *pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir); STfsDisk *pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir);
if (pDisk == NULL) return NULL; if (pDisk == NULL) return NULL;
pTier->disks[id] = pDisk; pTier->disks[id] = pDisk;
...@@ -76,14 +73,14 @@ SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) { ...@@ -76,14 +73,14 @@ SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
return pTier->disks[id]; return pTier->disks[id];
} }
void tfsUpdateTierSize(STier *pTier) { void tfsUpdateTierSize(STfsTier *pTier) {
SDiskSize size = {0}; SDiskSize size = {0};
int16_t nAvailDisks = 0; int32_t nAvailDisks = 0;
tfsLockTier(pTier); tfsLockTier(pTier);
for (int32_t id = 0; id < pTier->ndisk; id++) { for (int32_t id = 0; id < pTier->ndisk; id++) {
SDisk *pDisk = pTier->disks[id]; STfsDisk *pDisk = pTier->disks[id];
if (pDisk == NULL) continue; if (pDisk == NULL) continue;
size.total += pDisk->size.total; size.total += pDisk->size.total;
...@@ -99,7 +96,7 @@ void tfsUpdateTierSize(STier *pTier) { ...@@ -99,7 +96,7 @@ void tfsUpdateTierSize(STier *pTier) {
} }
// Round-Robin to allocate disk on a tier // Round-Robin to allocate disk on a tier
int32_t tfsAllocDiskOnTier(STier *pTier) { int32_t tfsAllocDiskOnTier(STfsTier *pTier) {
terrno = TSDB_CODE_FS_NO_VALID_DISK; terrno = TSDB_CODE_FS_NO_VALID_DISK;
tfsLockTier(pTier); tfsLockTier(pTier);
...@@ -110,9 +107,9 @@ int32_t tfsAllocDiskOnTier(STier *pTier) { ...@@ -110,9 +107,9 @@ int32_t tfsAllocDiskOnTier(STier *pTier) {
} }
int32_t retId = -1; int32_t retId = -1;
for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; ++id) { for (int32_t id = 0; id < TFS_MAX_DISKS_PER_TIER; ++id) {
int32_t diskId = (pTier->nextid + id) % pTier->ndisk; int32_t diskId = (pTier->nextid + id) % pTier->ndisk;
SDisk *pDisk = pTier->disks[diskId]; STfsDisk *pDisk = pTier->disks[diskId];
if (pDisk == NULL) continue; if (pDisk == NULL) continue;
...@@ -128,12 +125,12 @@ int32_t tfsAllocDiskOnTier(STier *pTier) { ...@@ -128,12 +125,12 @@ int32_t tfsAllocDiskOnTier(STier *pTier) {
return retId; return retId;
} }
void tfsPosNextId(STier *pTier) { void tfsPosNextId(STfsTier *pTier) {
int32_t nextid = 0; int32_t nextid = 0;
for (int32_t id = 1; id < pTier->ndisk; id++) { for (int32_t id = 1; id < pTier->ndisk; id++) {
SDisk *pLDisk = pTier->disks[nextid]; STfsDisk *pLDisk = pTier->disks[nextid];
SDisk *pDisk = pTier->disks[id]; STfsDisk *pDisk = pTier->disks[id];
if (pDisk->size.avail > TFS_MIN_DISK_FREE_SIZE && pDisk->size.avail > pLDisk->size.avail) { if (pDisk->size.avail > TFS_MIN_DISK_FREE_SIZE && pDisk->size.avail > pLDisk->size.avail) {
nextid = id; nextid = id;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册