提交 09cb05fb 编写于 作者: H Hongze Cheng

refactor

上级 c9e7117b
...@@ -120,15 +120,19 @@ void tfsDestroy() { ...@@ -120,15 +120,19 @@ void tfsDestroy() {
} }
void tfsUpdateInfo() { void tfsUpdateInfo() {
SFSMeta tmeta = {0};
tfsLock(); tfsLock();
for (int level = 0; level < TFS_NLEVEL(); level++) { for (int level = 0; level < TFS_NLEVEL(); level++) {
STier *pTier = TFS_TIER_AT(level); STier *pTier = TFS_TIER_AT(level);
tfsUpdateTierInfo(pTier); tfsUpdateTierInfo(pTier);
pfs->meta.tsize = TIER_SIZE(pTier); tmeta.tsize += TIER_SIZE(pTier);
pfs->meta.avail = TIER_FREE_SIZE(pTier); tmeta.avail += TIER_FREE_SIZE(pTier);
} }
pfs->meta = tmeta;
tfsUnLock(); tfsUnLock();
} }
......
...@@ -512,7 +512,7 @@ STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg); ...@@ -512,7 +512,7 @@ STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg);
void tsdbFreeFileH(STsdbFileH* pFileH); void tsdbFreeFileH(STsdbFileH* pFileH);
int tsdbOpenFileH(STsdbRepo* pRepo); int tsdbOpenFileH(STsdbRepo* pRepo);
void tsdbCloseFileH(STsdbRepo* pRepo); void tsdbCloseFileH(STsdbRepo* pRepo);
SFileGroup* tsdbCreateFGroup(STsdbRepo* pRepo, int fid); SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level);
void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction); void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction);
void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid); void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid);
SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter); SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter);
......
...@@ -14,14 +14,23 @@ ...@@ -14,14 +14,23 @@
*/ */
#include "tsdbMain.h" #include "tsdbMain.h"
static int tsdbCommitTSData(STsdbRepo *pRepo); typedef struct {
static int tsdbCommitMeta(STsdbRepo *pRepo); SFidGroup fidg;
static void tsdbEndCommit(STsdbRepo *pRepo, int eno); SCommitIter *iters;
static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); SRWHelper whelper;
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); SDataCols * pDataCols;
} SCommitH;
static int tsdbCommitTSData(STsdbRepo *pRepo);
static int tsdbCommitMeta(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo, int eno);
static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch);
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key); static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key);
static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pch);
static void tsdbDestroyCommitH(SCommitH *pch, int niter);
void *tsdbCommitData(STsdbRepo *pRepo) { void *tsdbCommitData(STsdbRepo *pRepo) {
SMemTable * pMem = pRepo->imem; SMemTable * pMem = pRepo->imem;
...@@ -58,68 +67,45 @@ _err: ...@@ -58,68 +67,45 @@ _err:
} }
static int tsdbCommitTSData(STsdbRepo *pRepo) { static int tsdbCommitTSData(STsdbRepo *pRepo) {
SMemTable * pMem = pRepo->imem; SMemTable *pMem = pRepo->imem;
SDataCols * pDataCols = NULL; SCommitH ch = {0};
STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbCfg * pCfg = &(pRepo->config);
SCommitIter *iters = NULL; // SFidGroup fidGroup = {0};
SRWHelper whelper = {0}; TSKEY minKey = 0;
STsdbCfg * pCfg = &(pRepo->config); TSKEY maxKey = 0;
SFidGroup fidGroup = {0};
TSKEY minKey = 0;
TSKEY maxKey = 0;
if (pMem->numOfRows <= 0) return 0; if (pMem->numOfRows <= 0) return 0;
tsdbGetFidGroup(pCfg, &fidGroup); tsdbGetFidGroup(pCfg, &(ch.fidg));
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fidGroup.minFid, &minKey, &maxKey); tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, ch.fidg.minFid, &minKey, &maxKey);
tsdbRemoveFilesBeyondRetention(pRepo, &fidGroup); tsdbRemoveFilesBeyondRetention(pRepo, &(ch.fidg));
iters = tsdbCreateCommitIters(pRepo); if (tsdbInitCommitH(pRepo, &ch) < 0) {
if (iters == NULL) {
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
goto _err; goto _err;
} }
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
tsdbSeekCommitIter(iters, pMem->maxTables, minKey); tsdbSeekCommitIter(ch.iters, pMem->maxTables, minKey);
// Loop to commit to each file // Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) { for (int fid = sfid; fid <= efid; fid++) {
if (fid < fidGroup.minFid) continue; if (fid < ch.fidg.minFid) continue;
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { if (tsdbCommitToFile(pRepo, fid, &(ch)) < 0) {
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err; goto _err;
} }
} }
tsdbApplyRetention(pRepo, &fidGroup); tsdbApplyRetention(pRepo, &(ch.fidg));
tdFreeDataCols(pDataCols);
tsdbDestroyCommitIters(iters, pMem->maxTables);
tsdbDestroyHelper(&whelper);
tsdbDestroyCommitH(&ch, pMem->maxTables);
return 0; return 0;
_err: _err:
tdFreeDataCols(pDataCols); tsdbDestroyCommitH(&ch, pMem->maxTables);
tsdbDestroyCommitIters(iters, pMem->maxTables);
tsdbDestroyHelper(&whelper);
return -1; return -1;
} }
...@@ -184,14 +170,17 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS ...@@ -184,14 +170,17 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS
return false; return false;
} }
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) { static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) {
STsdbCfg * pCfg = &pRepo->config; STsdbCfg * pCfg = &pRepo->config;
STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbFileH * pFileH = pRepo->tsdbFileH;
SFileGroup *pGroup = NULL; SFileGroup * pGroup = NULL;
SMemTable * pMem = pRepo->imem; SMemTable * pMem = pRepo->imem;
bool newLast = false; bool newLast = false;
TSKEY minKey = 0; TSKEY minKey = 0;
TSKEY maxKey = 0; TSKEY maxKey = 0;
SCommitIter *iters = pch->iters;
SRWHelper * pHelper = &(pch->whelper);
SDataCols * pDataCols = pch->pDataCols;
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
...@@ -352,4 +341,35 @@ static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key) { ...@@ -352,4 +341,35 @@ static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key) {
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key-1, INT32_MAX, NULL, NULL, 0, true, NULL); tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key-1, INT32_MAX, NULL, NULL, 0, true, NULL);
} }
}
static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pch) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbCfg * pCfg = &(pRepo->config);
pch->iters = tsdbCreateCommitIters(pRepo);
if (pch->iters == NULL) {
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
if (tsdbInitWriteHelper(&(pch->whelper), pRepo) < 0) {
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
if ((pch->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
return -1;
}
return 0;
}
static void tsdbDestroyCommitH(SCommitH *pch, int niter) {
tdFreeDataCols(pch->pDataCols);
tsdbDestroyCommitIters(pch->iters, niter);
tsdbDestroyHelper(&(pch->whelper));
} }
\ No newline at end of file
...@@ -62,69 +62,15 @@ void tsdbFreeFileH(STsdbFileH *pFileH) { ...@@ -62,69 +62,15 @@ void tsdbFreeFileH(STsdbFileH *pFileH) {
} }
} }
int tsdbOpenFileH(STsdbRepo *pRepo) { // TODO int tsdbOpenFileH(STsdbRepo *pRepo) {
ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL); ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL);
char dataDir[TSDB_FILENAME_LEN] = "\0";
// 1. scan and get all files corresponds // TODO
TDIR *tdir = NULL;
char fname[TSDB_FILENAME_LEN] = "\0";
regex_t regex = {0};
int code = 0;
int vid = 0;
int fid = 0;
const TFILE *pfile = NULL;
code = regcomp(&regex, "^v[0-9]+f[0-9]+\\.(head|data|last|h|d|l)$", REG_EXTENDED);
if (code != 0) {
// TODO: deal the error
}
snprintf(dataDir, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", REPO_ID(pRepo));
tdir = tfsOpendir(dataDir);
if (tdir == NULL) {
// TODO: deal the error
}
while ((pfile = tfsReaddir(tdir)) != NULL) {
tfsBaseName(pfile, fname);
if (strcmp(fname, ".") == 0 || strcmp(fname, "..") == 0) continue;
code = regexec(&regex, fname, 0, NULL, 0);
if (code == 0) {
sscanf(fname, "v%df%d", &vid, &fid);
if (vid != REPO_ID(pRepo)) {
tfsAbsName(pfile, fname);
tsdbError("vgId:%d invalid file %s exists, ignore", REPO_ID(pRepo), fname);
continue;
}
// TODO
{}
} else if (code == REG_NOMATCH) {
tfsAbsName(pfile, fname);
tsdbWarn("vgId:%d unrecognizable file %s exists, ignore", REPO_ID(pRepo), fname);
continue;
} else {
tsdbError("vgId:%d regexec failed since %s", REPO_ID(pRepo), strerror(code));
// TODO: deal with error
}
}
// 2. Sort all files according to fid
// 3. Recover all files of each fid
while (true) {
// TODO
}
return 0; return 0;
} }
void tsdbCloseFileH(STsdbRepo *pRepo) { // TODO void tsdbCloseFileH(STsdbRepo *pRepo) {
STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbFileH *pFileH = pRepo->tsdbFileH;
for (int i = 0; i < pFileH->nFGroups; i++) { for (int i = 0; i < pFileH->nFGroups; i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册