提交 431998ee 编写于 作者: H Hongze Cheng

add to commit option to tsdbCloseRepo

上级 b9ac23bc
...@@ -72,7 +72,7 @@ typedef void TsdbRepoT; // use void to hide implementation details from outside ...@@ -72,7 +72,7 @@ typedef void TsdbRepoT; // use void to hide implementation details from outside
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter); int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter);
int32_t tsdbDropRepo(TsdbRepoT *repo); int32_t tsdbDropRepo(TsdbRepoT *repo);
TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH); TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH);
int32_t tsdbCloseRepo(TsdbRepoT *repo); int32_t tsdbCloseRepo(TsdbRepoT *repo, int toCommit);
int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg); int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg);
// --------- TSDB TABLE DEFINITION // --------- TSDB TABLE DEFINITION
......
...@@ -258,7 +258,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { ...@@ -258,7 +258,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
* *
* @return 0 for success, -1 for failure and the error number is set * @return 0 for success, -1 for failure and the error number is set
*/ */
int32_t tsdbCloseRepo(TsdbRepoT *repo) { int32_t tsdbCloseRepo(TsdbRepoT *repo, int toCommit) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
if (pRepo == NULL) return 0; if (pRepo == NULL) return 0;
int id = pRepo->config.tsdbId; int id = pRepo->config.tsdbId;
...@@ -285,7 +285,7 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) { ...@@ -285,7 +285,7 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) {
tsdbUnLockRepo(repo); tsdbUnLockRepo(repo);
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START); if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
tsdbCommitData((void *)repo); if (toCommit) tsdbCommitData((void *)repo);
tsdbCloseFileH(pRepo->tsdbFileH); tsdbCloseFileH(pRepo->tsdbFileH);
...@@ -1018,10 +1018,16 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -1018,10 +1018,16 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
// Create and open files for commit // Create and open files for commit
tsdbGetDataDirName(pRepo, dataDir); tsdbGetDataDirName(pRepo, dataDir);
if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) goto _err; if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) {
tsdbError("vgId:%d, failed to create file group %d", pRepo->config.tsdbId, fid);
goto _err;
}
// Open files for write/read // Open files for write/read
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) goto _err; if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
tsdbError("vgId:%d, failed to set helper file", pRepo->config.tsdbId);
goto _err;
}
// Loop to commit data in each table // Loop to commit data in each table
for (int tid = 1; tid < pCfg->maxTables; tid++) { for (int tid = 1; tid < pCfg->maxTables; tid++) {
...@@ -1058,13 +1064,22 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -1058,13 +1064,22 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
ASSERT(pDataCols->numOfPoints == 0); ASSERT(pDataCols->numOfPoints == 0);
// Move the last block to the new .l file if neccessary // Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) goto _err; if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
tsdbError("vgId:%d, failed to move last block", pRepo->config.tsdbId);
goto _err;
}
// Write the SCompBlock part // Write the SCompBlock part
if (tsdbWriteCompInfo(pHelper) < 0) goto _err; if (tsdbWriteCompInfo(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compInfo part", pRepo->config.tsdbId);
goto _err;
}
} }
if (tsdbWriteCompIdx(pHelper) < 0) goto _err; if (tsdbWriteCompIdx(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compIdx part", pRepo->config.tsdbId);
goto _err;
}
tsdbCloseHelperFile(pHelper, 0); tsdbCloseHelperFile(pHelper, 0);
// TODO: make it atomic with some methods // TODO: make it atomic with some methods
......
...@@ -376,7 +376,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { ...@@ -376,7 +376,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
cqClose(pVnode->cq); cqClose(pVnode->cq);
pVnode->cq = NULL; pVnode->cq = NULL;
tsdbCloseRepo(pVnode->tsdb); tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL; pVnode->tsdb = NULL;
walClose(pVnode->wal); walClose(pVnode->wal);
...@@ -427,7 +427,7 @@ static void vnodeNotifyFileSynced(void *ahandle) { ...@@ -427,7 +427,7 @@ static void vnodeNotifyFileSynced(void *ahandle) {
char rootDir[128] = "\0"; char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir); sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
// clsoe tsdb, then open tsdb // clsoe tsdb, then open tsdb
tsdbCloseRepo(pVnode->tsdb); tsdbCloseRepo(pVnode->tsdb, 0);
STsdbAppH appH = {0}; STsdbAppH appH = {0};
appH.appH = (void *)pVnode; appH.appH = (void *)pVnode;
appH.notifyStatus = vnodeProcessTsdbStatus; appH.notifyStatus = vnodeProcessTsdbStatus;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册