提交 705310a9 编写于 作者: H Hongze Cheng

refact commit

上级 95d94887
...@@ -28,6 +28,8 @@ typedef struct { ...@@ -28,6 +28,8 @@ typedef struct {
int niters; // memory iterators int niters; // memory iterators
SCommitIter *iters; SCommitIter *iters;
bool isRFileSet; // read and commit FSET bool isRFileSet; // read and commit FSET
int32_t fid;
SDFileSet *pSet;
SReadH readh; SReadH readh;
SDFileSet wSet; SDFileSet wSet;
bool isDFileSame; bool isDFileSame;
...@@ -71,7 +73,6 @@ static void tsdbDestroyCommitH(SCommitH *pCommith); ...@@ -71,7 +73,6 @@ static void tsdbDestroyCommitH(SCommitH *pCommith);
static int32_t tsdbCreateCommitIters(SCommitH *pCommith); static int32_t tsdbCreateCommitIters(SCommitH *pCommith);
static void tsdbDestroyCommitIters(SCommitH *pCommith); static void tsdbDestroyCommitIters(SCommitH *pCommith);
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static void tsdbResetCommitFile(SCommitH *pCommith);
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static int tsdbCommitToTable(SCommitH *pCommith, int tid); static int tsdbCommitToTable(SCommitH *pCommith, int tid);
static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx); static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx);
...@@ -404,34 +405,73 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) { ...@@ -404,34 +405,73 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
} }
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { static int32_t tsdbCommitToFileStart(SCommitH *pCHandle, SDFileSet *pSet, int32_t fid) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); int32_t code = 0;
STsdb *pRepo = TSDB_COMMIT_REPO(pCHandle);
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
ASSERT(pSet == NULL || pSet->fid == fid); ASSERT(pSet == NULL || pSet->fid == fid);
tsdbResetCommitFile(pCommith); pCHandle->fid = fid;
tsdbGetFidKeyRange(pCfg->days, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey)); pCHandle->pSet = pSet;
pCHandle->isRFileSet = false;
pCHandle->isDFileSame = false;
pCHandle->isLFileSame = false;
taosArrayClear(pCHandle->aBlkIdx);
tsdbGetFidKeyRange(pCfg->days, pCfg->precision, fid, &(pCHandle->minKey), &(pCHandle->maxKey));
code = tsdbSetAndOpenCommitFile(pCHandle, pSet, fid);
return code;
}
static int32_t tsdbCommitToFileImpl(SCommitH *pCHandle) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitToFileEnd(SCommitH *pCommith) {
int32_t code = 0;
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
// Set and open files if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) <
if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) { 0) {
tsdbError("vgId:%d, failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), pCommith->fid,
tstrerror(terrno));
tsdbCloseCommitFile(pCommith, true);
// revert the file change
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pCommith->pSet);
return -1; return -1;
} }
#if 0
// Loop to commit each table data
for (int tid = 0; tid < pCommith->niters; tid++) {
SCommitIter *pIter = pCommith->iters + tid;
if (pIter->pTable == NULL) continue; if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) {
tsdbError("vgId:%d, failed to update FSET %d header since %s", REPO_ID(pRepo), pCommith->fid, tstrerror(terrno));
tsdbCloseCommitFile(pCommith, true);
// revert the file change
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pCommith->pSet);
return -1;
}
if (tsdbCommitToTable(pCommith, tid) < 0) { // Close commit file
tsdbCloseCommitFile(pCommith, true); tsdbCloseCommitFile(pCommith, false);
// revert the file change
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) {
return -1; return -1;
}
} }
#endif
return code;
}
static int32_t tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
int32_t code = 0;
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
// commit to file start
code = tsdbCommitToFileStart(pCommith, pSet, fid);
if (code) {
goto _err;
}
// Loop to commit each table data in mem and file // Loop to commit each table data in mem and file
int mIter = 0, fIter = 0; int mIter = 0, fIter = 0;
int nBlkIdx = taosArrayGetSize(pCommith->readh.aBlkIdx); int nBlkIdx = taosArrayGetSize(pCommith->readh.aBlkIdx);
...@@ -476,31 +516,16 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { ...@@ -476,31 +516,16 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
} }
} }
if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) < // commit to file end
0) { code = tsdbCommitToFileEnd(pCommith);
tsdbError("vgId:%d, failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); if (code) {
tsdbCloseCommitFile(pCommith, true); goto _err;
// revert the file change
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
return -1;
}
if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) {
tsdbError("vgId:%d, failed to update FSET %d header since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
tsdbCloseCommitFile(pCommith, true);
// revert the file change
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
return -1;
} }
// Close commit file return code;
tsdbCloseCommitFile(pCommith, false);
if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) {
return -1;
}
return 0; _err:
return code;
} }
static int32_t tsdbCreateCommitIters(SCommitH *pCommith) { static int32_t tsdbCreateCommitIters(SCommitH *pCommith) {
...@@ -557,13 +582,6 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) { ...@@ -557,13 +582,6 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) {
pCommith->niters = 0; pCommith->niters = 0;
} }
static void tsdbResetCommitFile(SCommitH *pCommith) {
pCommith->isRFileSet = false;
pCommith->isDFileSame = false;
pCommith->isLFileSame = false;
taosArrayClear(pCommith->aBlkIdx);
}
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
SDiskID did; SDiskID did;
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
......
...@@ -196,33 +196,6 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) { ...@@ -196,33 +196,6 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
} else { } else {
pReadh->pBlkIdx = (SBlockIdx *)p; pReadh->pBlkIdx = (SBlockIdx *)p;
} }
// size_t size = taosArrayGetSize(pReadh->aBlkIdx);
// if (size > 0) {
// while (true) {
// if (pReadh->cidx >= size) {
// pReadh->pBlkIdx = NULL;
// break;
// }
// SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx);
// if (pBlkIdx->uid == TABLE_TID(pTable)) {
// if (pBlkIdx->uid == TABLE_UID(pTable)) {
// pReadh->pBlkIdx = pBlkIdx;
// } else {
// pReadh->pBlkIdx = NULL;
// }
// pReadh->cidx++;
// break;
// } else if (pBlkIdx->uid > TABLE_TID(pTable)) {
// pReadh->pBlkIdx = NULL;
// break;
// } else {
// pReadh->cidx++;
// }
// }
// } else {
// pReadh->pBlkIdx = NULL;
// }
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册