diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 474214be7e48a74c7e4f87bae39245a1c90f69f5..58859f42bc80daa3317d789950c1625c1533cf5f 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -83,7 +83,7 @@ STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo); int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg); int32_t tsdbDropRepo(char *rootDir); TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH); -void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit); +int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit); int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg); int tsdbGetState(TSDB_REPO_T *repo); diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index a905f9803fd1c217b85840b9243aa3f761903fc6..5c978abd1da5cc5caa17450661d980ca648763c1 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -235,6 +235,7 @@ typedef struct { sem_t readyToCommit; pthread_mutex_t mutex; bool repoLocked; + int32_t code; // Commit code } STsdbRepo; // ------------------ tsdbRWHelper.c diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 0112f40ffd709170fadb9ae80dc90214cc63332a..650a32eede7a49c26acb3c908f9376e3a0e7ac64 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -28,6 +28,8 @@ void *tsdbCommitData(STsdbRepo *pRepo) { tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d", REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList)); + pRepo->code = TSDB_CODE_SUCCESS; + // Commit to update meta file if (tsdbCommitMeta(pRepo) < 0) { tsdbError("vgId:%d error occurs while committing META data since %s", REPO_ID(pRepo), tstrerror(terrno)); @@ -49,6 +51,7 @@ void *tsdbCommitData(STsdbRepo *pRepo) { _err: ASSERT(terrno != TSDB_CODE_SUCCESS); + pRepo->code = terrno; tsdbInfo("vgId:%d commit over, failed", REPO_ID(pRepo)); tsdbEndCommit(pRepo, terrno); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 3a3a824afab55d9e01a70b15dff1d07cfd506db6..3990c0c516d1768135934b098efbcd7746ab424c 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -134,17 +134,20 @@ _err: } // Note: all working thread and query thread must stopped when calling this function -void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { - if (repo == NULL) return; +int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { + if (repo == NULL) return 0; STsdbRepo *pRepo = (STsdbRepo *)repo; int vgId = REPO_ID(pRepo); + terrno = TSDB_CODE_SUCCESS; + tsdbStopStream(pRepo); if (toCommit) { tsdbAsyncCommit(pRepo); sem_wait(&(pRepo->readyToCommit)); + terrno = pRepo->code; } tsdbUnRefMemTable(pRepo, pRepo->mem); tsdbUnRefMemTable(pRepo, pRepo->imem); @@ -156,6 +159,12 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { tsdbCloseMeta(pRepo); tsdbFreeRepo(pRepo); tsdbDebug("vgId:%d repository is closed", vgId); + + if (terrno != TSDB_CODE_SUCCESS) { + return -1; + } else { + return 0; + } } uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) { @@ -619,6 +628,7 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { } pRepo->state = TSDB_STATE_OK; + pRepo->code = TSDB_CODE_SUCCESS; int code = pthread_mutex_init(&pRepo->mutex, NULL); if (code != 0) { diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 6d7835438cc04004dc50a23b77cea185ff149697..71944c87c6d68d530e07a13a05bf5ac89ee3754d 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -209,6 +209,10 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { sem_wait(&(pRepo->readyToCommit)); + if (pRepo->code != TSDB_CODE_SUCCESS) { + tsdbWarn("vgId:%d try to commit when TSDB not in good state: %s", REPO_ID(pRepo), tstrerror(terrno)); + } + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START, TSDB_CODE_SUCCESS); if (tsdbLockRepo(pRepo) < 0) return -1; pRepo->imem = pRepo->mem; @@ -223,10 +227,18 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { int tsdbSyncCommit(TSDB_REPO_T *repo) { STsdbRepo *pRepo = (STsdbRepo *)repo; + tsdbAsyncCommit(pRepo); sem_wait(&(pRepo->readyToCommit)); sem_post(&(pRepo->readyToCommit)); - return 0; + + if (pRepo->code != TSDB_CODE_SUCCESS) { + terrno = pRepo->code; + return -1; + } else { + terrno = TSDB_CODE_SUCCESS; + return 0; + } } /**