提交 c3e222bf 编写于 作者: S Shengliang Guan

refact tfs module

上级 ec66739d
...@@ -381,7 +381,7 @@ static void *dnodeOpenVnodeFunc(void *param) { ...@@ -381,7 +381,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
pMgmt->openVnodes, pMgmt->totalVnodes); pMgmt->openVnodes, pMgmt->totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc); dndReportStartup(pDnode, "open-vnodes", stepDesc);
SVnodeCfg cfg = {.pDnode = pDnode, .vgId = pCfg->vgId}; SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId};
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
...@@ -587,6 +587,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -587,6 +587,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
} }
vnodeCfg.pDnode = pDnode; vnodeCfg.pDnode = pDnode;
vnodeCfg.pTfs = pDnode->pTfs;
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr()); dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
......
...@@ -23,9 +23,9 @@ static const char *tsdbTxnFname[] = {"current.t", "current"}; ...@@ -23,9 +23,9 @@ static const char *tsdbTxnFname[] = {"current.t", "current"};
static int tsdbComparFidFSet(const void *arg1, const void *arg2); static int tsdbComparFidFSet(const void *arg1, const void *arg2);
static void tsdbResetFSStatus(SFSStatus *pStatus); static void tsdbResetFSStatus(SFSStatus *pStatus);
static int tsdbSaveFSStatus(STfs *pTfs, SFSStatus *pStatus, int vid); static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus);
static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo); static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo);
static void tsdbGetTxnFname(STfs *pTfs, int repoid, TSDB_TXN_FILE_T ftype, char fname[]); static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]);
static int tsdbOpenFSFromCurrent(STsdb *pRepo); static int tsdbOpenFSFromCurrent(STsdb *pRepo);
static int tsdbScanAndTryFixFS(STsdb *pRepo); static int tsdbScanAndTryFixFS(STsdb *pRepo);
static int tsdbScanRootDir(STsdb *pRepo); static int tsdbScanRootDir(STsdb *pRepo);
...@@ -311,7 +311,7 @@ int tsdbOpenFS(STsdb *pRepo) { ...@@ -311,7 +311,7 @@ int tsdbOpenFS(STsdb *pRepo) {
ASSERT(pfs != NULL); ASSERT(pfs != NULL);
tsdbGetTxnFname(pRepo->pTfs, REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current); tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, current);
tsdbGetRtnSnap(pRepo, &pRepo->rtn); tsdbGetRtnSnap(pRepo, &pRepo->rtn);
if (access(current, F_OK) == 0) { if (access(current, F_OK) == 0) {
...@@ -375,7 +375,7 @@ int tsdbEndFSTxn(STsdb *pRepo) { ...@@ -375,7 +375,7 @@ int tsdbEndFSTxn(STsdb *pRepo) {
SFSStatus *pStatus; SFSStatus *pStatus;
// Write current file system snapshot // Write current file system snapshot
if (tsdbSaveFSStatus(pRepo->pTfs, pfs->nstatus, REPO_ID(pRepo)) < 0) { if (tsdbSaveFSStatus(pRepo, pfs->nstatus) < 0) {
tsdbEndFSTxnWithError(pfs); tsdbEndFSTxnWithError(pfs);
return -1; return -1;
} }
...@@ -405,7 +405,7 @@ int tsdbEndFSTxnWithError(STsdbFS *pfs) { ...@@ -405,7 +405,7 @@ int tsdbEndFSTxnWithError(STsdbFS *pfs) {
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); } int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); }
static int tsdbSaveFSStatus(STfs *pTfs, SFSStatus *pStatus, int vid) { static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) {
SFSHeader fsheader; SFSHeader fsheader;
void * pBuf = NULL; void * pBuf = NULL;
void * ptr; void * ptr;
...@@ -413,8 +413,8 @@ static int tsdbSaveFSStatus(STfs *pTfs, SFSStatus *pStatus, int vid) { ...@@ -413,8 +413,8 @@ static int tsdbSaveFSStatus(STfs *pTfs, SFSStatus *pStatus, int vid) {
char tfname[TSDB_FILENAME_LEN] = "\0"; char tfname[TSDB_FILENAME_LEN] = "\0";
char cfname[TSDB_FILENAME_LEN] = "\0"; char cfname[TSDB_FILENAME_LEN] = "\0";
tsdbGetTxnFname(pTfs, vid, TSDB_TXN_TEMP_FILE, tfname); tsdbGetTxnFname(pRepo, TSDB_TXN_TEMP_FILE, tfname);
tsdbGetTxnFname(pTfs, vid, TSDB_TXN_CURR_FILE, cfname); tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, cfname);
int fd = open(tfname, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755); int fd = open(tfname, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755);
if (fd < 0) { if (fd < 0) {
...@@ -645,8 +645,9 @@ static int tsdbComparFidFSet(const void *arg1, const void *arg2) { ...@@ -645,8 +645,9 @@ static int tsdbComparFidFSet(const void *arg1, const void *arg2) {
} }
} }
static void tsdbGetTxnFname(STfs *pTfs, int repoid, TSDB_TXN_FILE_T ftype, char fname[]) { static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", tfsGetPrimaryPath(pTfs), repoid, tsdbTxnFname[ftype]); snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", tfsGetPrimaryPath(pRepo->pTfs), pRepo->vgId,
tsdbTxnFname[ftype]);
} }
static int tsdbOpenFSFromCurrent(STsdb *pRepo) { static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
...@@ -657,7 +658,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) { ...@@ -657,7 +658,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
char current[TSDB_FILENAME_LEN] = "\0"; char current[TSDB_FILENAME_LEN] = "\0";
void * ptr; void * ptr;
tsdbGetTxnFname(pRepo->pTfs, REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current); tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, current);
// current file exists, try to recover // current file exists, try to recover
fd = open(current, O_RDONLY | O_BINARY); fd = open(current, O_RDONLY | O_BINARY);
...@@ -1287,7 +1288,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) { ...@@ -1287,7 +1288,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) {
return -1; return -1;
} }
if (tsdbSaveFSStatus(pRepo->pTfs, pRepo->fs->cstatus, REPO_ID(pRepo)) < 0) { if (tsdbSaveFSStatus(pRepo, pRepo->fs->cstatus) < 0) {
tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; return -1;
} }
......
...@@ -21,7 +21,8 @@ static void tsdbFree(STsdb *pTsdb); ...@@ -21,7 +21,8 @@ static void tsdbFree(STsdb *pTsdb);
static int tsdbOpenImpl(STsdb *pTsdb); static int tsdbOpenImpl(STsdb *pTsdb);
static void tsdbCloseImpl(STsdb *pTsdb); static void tsdbCloseImpl(STsdb *pTsdb);
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, STfs *pTfs) { STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta,
STfs *pTfs) {
STsdb *pTsdb = NULL; STsdb *pTsdb = NULL;
// Set default TSDB Options // Set default TSDB Options
......
...@@ -28,6 +28,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { ...@@ -28,6 +28,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
if (pVnodeCfg != NULL) { if (pVnodeCfg != NULL) {
cfg.vgId = pVnodeCfg->vgId; cfg.vgId = pVnodeCfg->vgId;
cfg.pDnode = pVnodeCfg->pDnode; cfg.pDnode = pVnodeCfg->pDnode;
cfg.pTfs = pVnodeCfg->pTfs;
} }
// Validate options // Validate options
......
...@@ -58,12 +58,12 @@ typedef struct { ...@@ -58,12 +58,12 @@ typedef struct {
} SDiskIter; } SDiskIter;
typedef struct STfsDir { typedef struct STfsDir {
SDiskIter *iter; SDiskIter iter;
SDiskID did; SDiskID did;
char dirname[TSDB_FILENAME_LEN]; char dirname[TSDB_FILENAME_LEN];
STfsFile tfile; STfsFile tfile;
DIR *dir; DIR *dir;
STfs *pTfs; STfs *pTfs;
} STfsDir; } STfsDir;
typedef struct STfs { typedef struct STfs {
......
...@@ -152,7 +152,6 @@ void tfsInitFile(STfs *pTfs, STfsFile *pFile, SDiskID diskId, const char *rname) ...@@ -152,7 +152,6 @@ void tfsInitFile(STfs *pTfs, STfsFile *pFile, SDiskID diskId, const char *rname)
} }
bool tfsIsSameFile(const STfsFile *pFile1, const STfsFile *pFile2) { bool tfsIsSameFile(const STfsFile *pFile1, const STfsFile *pFile2) {
ASSERT(pFile1 != NULL || pFile2 != NULL);
if (pFile1 == NULL || pFile2 == NULL || pFile1->pTfs != pFile2->pTfs) return false; if (pFile1 == NULL || pFile2 == NULL || pFile1->pTfs != pFile2->pTfs) return false;
if (pFile1->did.level != pFile2->did.level) return false; if (pFile1->did.level != pFile2->did.level) return false;
if (pFile1->did.id != pFile2->did.id) return false; if (pFile1->did.id != pFile2->did.id) return false;
...@@ -308,7 +307,7 @@ STfsDir *tfsOpendir(STfs *pTfs, const char *rname) { ...@@ -308,7 +307,7 @@ STfsDir *tfsOpendir(STfs *pTfs, const char *rname) {
} }
SDiskID diskId = {.id = 0, .level = 0}; SDiskID diskId = {.id = 0, .level = 0};
pDir->iter->pDisk = TFS_DISK_AT(pTfs, diskId); pDir->iter.pDisk = TFS_DISK_AT(pTfs, diskId);
pDir->pTfs = pTfs; pDir->pTfs = pTfs;
tstrncpy(pDir->dirname, rname, TSDB_FILENAME_LEN); tstrncpy(pDir->dirname, rname, TSDB_FILENAME_LEN);
...@@ -331,7 +330,7 @@ const STfsFile *tfsReaddir(STfsDir *pDir) { ...@@ -331,7 +330,7 @@ const STfsFile *tfsReaddir(STfsDir *pDir) {
// Skip . and .. // Skip . and ..
if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue; if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue;
snprintf(bname, TMPNAME_LEN * 2, "%s/%s", pDir->dirname, dp->d_name); snprintf(bname, TMPNAME_LEN * 2, "%s%s%s", pDir->dirname, TD_DIRSEP, dp->d_name);
tfsInitFile(pDir->pTfs, &pDir->tfile, pDir->did, bname); tfsInitFile(pDir->pTfs, &pDir->tfile, pDir->did, bname);
return &pDir->tfile; return &pDir->tfile;
} }
...@@ -496,7 +495,7 @@ static int32_t tfsOpendirImpl(STfs *pTfs, STfsDir *pDir) { ...@@ -496,7 +495,7 @@ static int32_t tfsOpendirImpl(STfs *pTfs, STfsDir *pDir) {
} }
while (true) { while (true) {
pDisk = tfsNextDisk(pTfs, pDir->iter); pDisk = tfsNextDisk(pTfs, &pDir->iter);
if (pDisk == NULL) return 0; if (pDisk == NULL) return 0;
pDir->did.level = pDisk->level; pDir->did.level = pDisk->level;
...@@ -514,7 +513,9 @@ static STfsDisk *tfsNextDisk(STfs *pTfs, SDiskIter *pIter) { ...@@ -514,7 +513,9 @@ static STfsDisk *tfsNextDisk(STfs *pTfs, SDiskIter *pIter) {
if (pIter == NULL) return NULL; if (pIter == NULL) return NULL;
STfsDisk *pDisk = pIter->pDisk; STfsDisk *pDisk = pIter->pDisk;
SDiskID did = {.level = pDisk->level, .id = pDisk->id + 1}; if (pDisk == NULL) return NULL;
SDiskID did = {.level = pDisk->level, .id = pDisk->id + 1};
if (did.id < TFS_TIER_AT(pTfs, did.level)->ndisk) { if (did.id < TFS_TIER_AT(pTfs, did.level)->ndisk) {
pIter->pDisk = TFS_DISK_AT(pTfs, did); pIter->pDisk = TFS_DISK_AT(pTfs, did);
......
...@@ -46,7 +46,7 @@ STfsDisk *tfsFreeDisk(STfsDisk *pDisk) { ...@@ -46,7 +46,7 @@ STfsDisk *tfsFreeDisk(STfsDisk *pDisk) {
} }
int32_t tfsUpdateDiskSize(STfsDisk *pDisk) { int32_t tfsUpdateDiskSize(STfsDisk *pDisk) {
if (taosGetDiskSize(pDisk->path, &pDisk->size) != 0) { if (taosGetDiskSize(pDisk->path, &pDisk->size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
fError("failed to get disk:%s size, level:%d id:%d since %s", pDisk->path, pDisk->level, pDisk->id, terrstr()); fError("failed to get disk:%s size, level:%d id:%d since %s", pDisk->path, pDisk->level, pDisk->id, terrstr());
return -1; return -1;
......
...@@ -82,6 +82,7 @@ void tfsUpdateTierSize(STfsTier *pTier) { ...@@ -82,6 +82,7 @@ void tfsUpdateTierSize(STfsTier *pTier) {
for (int32_t id = 0; id < pTier->ndisk; id++) { for (int32_t id = 0; id < pTier->ndisk; id++) {
STfsDisk *pDisk = pTier->disks[id]; STfsDisk *pDisk = pTier->disks[id];
if (pDisk == NULL) continue; if (pDisk == NULL) continue;
if (tfsUpdateDiskSize(pDisk) < 0) continue;
size.total += pDisk->size.total; size.total += pDisk->size.total;
size.used += pDisk->size.used; size.used += pDisk->size.used;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册