提交 3a40080d 编写于 作者: H Hongze Cheng

partial work

上级 e82a6c84
...@@ -368,9 +368,11 @@ typedef struct { ...@@ -368,9 +368,11 @@ typedef struct {
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) #define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
void tsdbInitDFileSet(SDFileSet* pSet, int vid, int fid, int ver, int level, int id); void tsdbInitDFileSet(SDFileSet* pSet, int vid, int fid, int ver, int level, int id);
void tsdbInitDFileSetWithOld(SDFileSet *pSet, SDFileSet *pOldSet);
int tsdbOpenDFileSet(SDFileSet* pSet, int flags); int tsdbOpenDFileSet(SDFileSet* pSet, int flags);
void tsdbCloseDFileSet(SDFileSet* pSet); void tsdbCloseDFileSet(SDFileSet* pSet);
int tsdbUpdateDFileSetHeader(SDFileSet* pSet); int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int tsdbCopyDFileSet(SDFileSet* pFromSet, SDFileSet* pToSet);
/* Statistic information of the TSDB file system. /* Statistic information of the TSDB file system.
*/ */
......
...@@ -35,20 +35,6 @@ typedef struct { ...@@ -35,20 +35,6 @@ typedef struct {
SDataCols * pDataCols; SDataCols * pDataCols;
} SCommitH; } SCommitH;
static int tsdbCommitTSData(STsdbRepo *pRepo);
static int tsdbCommitMeta(STsdbRepo *pRepo);
static int tsdbStartCommit(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo, int eno);
static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch);
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key);
static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pch);
static void tsdbDestroyCommitH(SCommitH *pch, int niter);
static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
static int tsdbGetFidLevel(int fid, SRtn *pRtn);
void *tsdbCommitData(STsdbRepo *pRepo) { void *tsdbCommitData(STsdbRepo *pRepo) {
if (tsdbStartCommit(pRepo) < 0) { if (tsdbStartCommit(pRepo) < 0) {
tsdbError("vgId:%d failed to commit data while startting to commit since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to commit data while startting to commit since %s", REPO_ID(pRepo), tstrerror(terrno));
...@@ -84,39 +70,61 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { ...@@ -84,39 +70,61 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
SCommitH ch = {0}; SCommitH ch = {0};
SFSIter fsIter = {0}; SFSIter fsIter = {0};
SDFileSet *pOldSet = NULL; SDFileSet *pOldSet = NULL;
SDFileSet nSet;
int level, id;
int fid; int fid;
if (pMem->numOfRows <= 0) return 0; if (pMem->numOfRows <= 0) return 0;
// Resource initialization
if (tsdbInitCommitH(pRepo, &ch) < 0) { if (tsdbInitCommitH(pRepo, &ch) < 0) {
// TODO
return -1; return -1;
} }
tsdbInitFSIter(pRepo, &fsIter);
// Skip expired memory data and expired FSET
tsdbSeekCommitIter(ch.iters, pMem->maxTables, ch.rtn.minKey); tsdbSeekCommitIter(ch.iters, pMem->maxTables, ch.rtn.minKey);
tsdbInitFSIter(pRepo, &fsIter);
pOldSet = tsdbFSIterNext(&fsIter);
fid = tsdbNextCommitFid(ch.iters, pMem->maxTables); fid = tsdbNextCommitFid(ch.iters, pMem->maxTables);
while (true) {
pOldSet = tsdbFSIterNext(&fsIter);
if (pOldSet == NULL || pOldSet->fid >= ch.rtn.minFid) break;
}
// Loop to commit to each file
while (true) { while (true) {
// Loop over both on disk and memory
if (pOldSet == NULL && fid == TSDB_IVLD_FID) break; if (pOldSet == NULL && fid == TSDB_IVLD_FID) break;
if (pOldSet == NULL || (fid != TSDB_IVLD_FID && pOldSet->fid > fid)) { // Only has existing FSET but no memory data to commit in this
ASSERT(fid >= ch.rtn.minFid); // existing FSET, only check if file in correct retention
// commit to new SDFileSet fid if (pOldSet && (fid == TSDB_IVLD_FID || pOldSet->fid < fid)) {
tsdbCommitToFile(pRepo, NULL, &ch, fid); if (tsdbApplyRtn(*pOldSet, &(ch.rtn), &nSet) < 0) {
fid = tsdbNextCommitFid(ch.iters, pMem->maxTables); return -1;
} else if (fid != TSDB_IVLD_FID && pOldSet->fid == fid) { }
ASSERT(fid >= ch.rtn.minFid);
// commit to fid with old SDFileSet tsdbUpdateDFileSet(pRepo, &nSet);
tsdbCommitToFile(pRepo, pOldSet, &ch, fid);
fid = tsdbNextCommitFid(ch.iters, pMem->maxTables);
pOldSet = tsdbFSIterNext(&fsIter); pOldSet = tsdbFSIterNext(&fsIter);
continue;
}
SDFileSet *pCSet;
int cfid;
if (pOldSet == NULL || pOldSet->fid > fid) {
// Commit to a new FSET with fid: fid
pCSet = NULL;
cfid = fid;
} else { } else {
// check if pOldSet need to be changed // Commit to an existing FSET
tsdbCommitToFile(pRepo, pOldSet, &ch, TSDB_IVLD_FID); pCSet = pOldSet;
pOldSet = tsdbFSIterNext(&fsIter) cfid = pOldSet->fid;
pOldSet = tsdbFSIterNext(&fsIter);
} }
fid = tsdbNextCommitFid(ch.iters, pMem->maxTables);
tsdbCommitToFile(pCSet, &ch, cfid);
} }
tsdbDestroyCommitH(&ch, pMem->maxTables); tsdbDestroyCommitH(&ch, pMem->maxTables);
...@@ -212,47 +220,60 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS ...@@ -212,47 +220,60 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS
return false; return false;
} }
static int tsdbCommitToFile(STsdbRepo *pRepo, SDFileSet *pOldSet, SCommitH *pch, int fid) { static int tsdbCommitToFile(SCommitH *pch, SDFileSet *pOldSet, int fid) {
SDFileSet rSet; int level, id;
SDFileSet wSet; int nSet, ver;
int level, id; STsdbRepo *pRepo;
// ASSERT(pOldSet != NULL || fid != TSDB_IVLD_FID); ASSERT(pOldSet == NULL || pOldSet->fid == fid);
// file should be deleted, do nothing and return tfsAllocDisk(tsdbGetFidLevel(fid, &(pch->rtn)), &level, &id);
if (pOldSet && pOldSet->fid < pch->rtn.minFid) { if (level == TFS_UNDECIDED_LEVEL) {
ASSERT(fid == TSDB_IVLD_FID); // TODO
return 0; return -1;
} }
if (pOldSet == NULL) { if (pOldSet == NULL || level > TSDB_FSET_LEVEL(pOldSet)) {
ASSERT(fid != TSDB_IVLD_FID); // Create new fset to commit
tsdbInitDFileSet(&nSet, pRepo, fid, ver, level, id);
tfsAllocDisk(tsdbGetFidLevel(fid, &(pch->rtn)), &level, &id); if (tsdbOpenDFileSet(&nSet, O_WRONLY | O_CREAT) < 0) {
if (level == TFS_UNDECIDED_LEVEL) { // TODO:
// terrno = TSDB_CODE_TDB_NO_INVALID_DISK;
return -1; return -1;
} }
// wSet here is the file to write, no read set if (tsdbUpdateDFileSetHeader(&nSet) < 0) {
tsdbInitDFileSet(&wSet, REPO_ID(pRepo), fid, 0 /*TODO*/, level, id); // TODO
} else {
tfsAllocDisk(tsdbGetFidLevel(pOldSet->fid, &(pch->rtn)), &level, &fid);
if (level == TFS_UNDECIDED_LEVEL) {
// terrno = TSDB_CODE_TDB_NO_INVALID_DISK;
return -1; return -1;
} }
} else {
level = TSDB_FSET_LEVEL(pOldSet);
if (level > TSDB_FSET_LEVEL(pOldSet)) { tsdbInitDFile(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_HEAD), ...);
// wSet here is the file to write, pOldSet here is the read set
tsdbInitDFileSet(&wSet, REPO_ID(pRepo), fid, 0 /*TODO*/, level, id); tsdbInitDFileWithOld(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_DATA), TSDB_DFILE_IN_SET(pOldSet, TSDB_FILE_DATA))
SDFile *pDFile = TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_LAST);
if (pDFile->info.size < 32 * 1024 * 1024) {
tsdbInitDFileWithOld(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_LAST), TSDB_DFILE_IN_SET(pOldSet, TSDB_FILE_LAST))
} else { } else {
// get wSet with pOldSet tsdbInitDFile(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_LAST), ...);
}
tsdbOpenDFileSet(&nSet, O_WRONLY | O_CREAT);
// TODO: update file header
}
tsdbSetCommitFile(pch, pOldSet, &nSet);
for (size_t tid = 0; tid < pMem->maxTables; tid++) {
SCommitIter *pIter = pch->iters + tid;
if (pIter->pTable == NULL) continue;
if (tsdbCommitToTable(pch, tid) < 0) {
// TODO
return -1;
} }
// if (level == TSDB_FSET_LEVEL(pOldSet)) {
// } else {
// // TODO
// }
} }
tsdbUpdateDFileSet(pRepo, &wSet); tsdbUpdateDFileSet(pRepo, &wSet);
...@@ -385,4 +406,55 @@ static int tsdbNextCommitFid(SCommitIter *iters, int niters) { ...@@ -385,4 +406,55 @@ static int tsdbNextCommitFid(SCommitIter *iters, int niters) {
// TODO // TODO
return fid; return fid;
}
static int tsdbApplyRtn(const SDFileSet oSet, const SRtn *pRtn, SDFileSet *pRSet) {
int level, id;
int vid, ver;
tfsAllocDisk(tsdbGetFidLevel(oSet.fid, pRtn), &level, &id);
if (level == TFS_UNDECIDED_LEVEL) {
// terrno = TSDB_CODE_TDB_NO_AVAILABLE_DISK;
return -1;
}
if (level > TSDB_FSET_LEVEL(pSet)) {
tsdbInitDFileSet(pRSet, vid, TSDB_FSET_FID(&oSet), ver, level, id);
if (tsdbCopyDFileSet(&oSet, pRSet) < 0) {
return -1;
}
} else {
tsdbInitDFileSetWithOld(pRSet, &oSet);
}
return 0;
}
static int tsdbCommitToTable(SCommitH *pch, int tid) {
SCommitIter *pIter = pch->iters + tid;
if (pIter->pTable == NULL) return 0;
TSDB_RLOCK_TABLE(pIter->pTable);
tsdbSetCommitTable(pch, pIter->pTable);
if (pIter->pIter == NULL && pch->readh.pBlockIdx == NULL) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return 0;
}
if (tsdbLoadBlockInfo(pch, NULL) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
// Loop to merge disk data and
while (true) {
// TODO
}
TSDB_RUNLOCK_TABLE(pIter->pTable);
return 0;
} }
\ No newline at end of file
...@@ -241,7 +241,12 @@ void tsdbInitDFileSet(SDFileSet *pSet, int vid, int fid, int ver, int level, int ...@@ -241,7 +241,12 @@ void tsdbInitDFileSet(SDFileSet *pSet, int vid, int fid, int ver, int level, int
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype); SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
tsdbInitDFile(pDFile, vid, fid, ver, level, id, NULL, ftype); tsdbInitDFile(pDFile, vid, fid, ver, level, id, NULL, ftype);
// TODO: reset level and id }
}
void tsdbInitDFileSetWithOld(SDFileSet *pSet, SDFileSet *pOldSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
tsdbInitDFileWithOld(TSDB_DFILE_IN_SET(pSet, ftype), TSDB_DFILE_IN_SET(pOldSet, ftype));
} }
} }
...@@ -267,7 +272,6 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) { ...@@ -267,7 +272,6 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
return 0; return 0;
} }
int tsdbMoveDFileSet(SDFileSet *pOldSet, SDFileSet *pNewSet) { int tsdbCopyDFileSet(SDFileSet *pFromSet, SDFileSet *pToSet) {
// TODO // return 0;
return 0;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册