未验证 提交 8b66fa15 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4244 from taosdata/feature/TD-2066

Feature/td 2066
...@@ -83,7 +83,7 @@ STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo); ...@@ -83,7 +83,7 @@ STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo);
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg); int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg);
int32_t tsdbDropRepo(char *rootDir); int32_t tsdbDropRepo(char *rootDir);
TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH); TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH);
void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit); int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit);
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg); int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg);
int tsdbGetState(TSDB_REPO_T *repo); int tsdbGetState(TSDB_REPO_T *repo);
......
...@@ -235,6 +235,7 @@ typedef struct { ...@@ -235,6 +235,7 @@ typedef struct {
sem_t readyToCommit; sem_t readyToCommit;
pthread_mutex_t mutex; pthread_mutex_t mutex;
bool repoLocked; bool repoLocked;
int32_t code; // Commit code
} STsdbRepo; } STsdbRepo;
// ------------------ tsdbRWHelper.c // ------------------ tsdbRWHelper.c
......
...@@ -28,6 +28,8 @@ void *tsdbCommitData(STsdbRepo *pRepo) { ...@@ -28,6 +28,8 @@ void *tsdbCommitData(STsdbRepo *pRepo) {
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d", tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d",
REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList)); REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList));
pRepo->code = TSDB_CODE_SUCCESS;
// Commit to update meta file // Commit to update meta file
if (tsdbCommitMeta(pRepo) < 0) { if (tsdbCommitMeta(pRepo) < 0) {
tsdbError("vgId:%d error occurs while committing META data since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d error occurs while committing META data since %s", REPO_ID(pRepo), tstrerror(terrno));
...@@ -49,6 +51,7 @@ void *tsdbCommitData(STsdbRepo *pRepo) { ...@@ -49,6 +51,7 @@ void *tsdbCommitData(STsdbRepo *pRepo) {
_err: _err:
ASSERT(terrno != TSDB_CODE_SUCCESS); ASSERT(terrno != TSDB_CODE_SUCCESS);
pRepo->code = terrno;
tsdbInfo("vgId:%d commit over, failed", REPO_ID(pRepo)); tsdbInfo("vgId:%d commit over, failed", REPO_ID(pRepo));
tsdbEndCommit(pRepo, terrno); tsdbEndCommit(pRepo, terrno);
......
...@@ -134,17 +134,20 @@ _err: ...@@ -134,17 +134,20 @@ _err:
} }
// Note: all working thread and query thread must stopped when calling this function // Note: all working thread and query thread must stopped when calling this function
void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
if (repo == NULL) return; if (repo == NULL) return 0;
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
int vgId = REPO_ID(pRepo); int vgId = REPO_ID(pRepo);
terrno = TSDB_CODE_SUCCESS;
tsdbStopStream(pRepo); tsdbStopStream(pRepo);
if (toCommit) { if (toCommit) {
tsdbAsyncCommit(pRepo); tsdbAsyncCommit(pRepo);
sem_wait(&(pRepo->readyToCommit)); sem_wait(&(pRepo->readyToCommit));
terrno = pRepo->code;
} }
tsdbUnRefMemTable(pRepo, pRepo->mem); tsdbUnRefMemTable(pRepo, pRepo->mem);
tsdbUnRefMemTable(pRepo, pRepo->imem); tsdbUnRefMemTable(pRepo, pRepo->imem);
...@@ -156,6 +159,12 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { ...@@ -156,6 +159,12 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
tsdbCloseMeta(pRepo); tsdbCloseMeta(pRepo);
tsdbFreeRepo(pRepo); tsdbFreeRepo(pRepo);
tsdbDebug("vgId:%d repository is closed", vgId); tsdbDebug("vgId:%d repository is closed", vgId);
if (terrno != TSDB_CODE_SUCCESS) {
return -1;
} else {
return 0;
}
} }
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) { uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) {
...@@ -619,6 +628,7 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { ...@@ -619,6 +628,7 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
} }
pRepo->state = TSDB_STATE_OK; pRepo->state = TSDB_STATE_OK;
pRepo->code = TSDB_CODE_SUCCESS;
int code = pthread_mutex_init(&pRepo->mutex, NULL); int code = pthread_mutex_init(&pRepo->mutex, NULL);
if (code != 0) { if (code != 0) {
......
...@@ -209,6 +209,10 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { ...@@ -209,6 +209,10 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
sem_wait(&(pRepo->readyToCommit)); sem_wait(&(pRepo->readyToCommit));
if (pRepo->code != TSDB_CODE_SUCCESS) {
tsdbWarn("vgId:%d try to commit when TSDB not in good state: %s", REPO_ID(pRepo), tstrerror(terrno));
}
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START, TSDB_CODE_SUCCESS); if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START, TSDB_CODE_SUCCESS);
if (tsdbLockRepo(pRepo) < 0) return -1; if (tsdbLockRepo(pRepo) < 0) return -1;
pRepo->imem = pRepo->mem; pRepo->imem = pRepo->mem;
...@@ -223,10 +227,18 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { ...@@ -223,10 +227,18 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
int tsdbSyncCommit(TSDB_REPO_T *repo) { int tsdbSyncCommit(TSDB_REPO_T *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
tsdbAsyncCommit(pRepo); tsdbAsyncCommit(pRepo);
sem_wait(&(pRepo->readyToCommit)); sem_wait(&(pRepo->readyToCommit));
sem_post(&(pRepo->readyToCommit)); sem_post(&(pRepo->readyToCommit));
return 0;
if (pRepo->code != TSDB_CODE_SUCCESS) {
terrno = pRepo->code;
return -1;
} else {
terrno = TSDB_CODE_SUCCESS;
return 0;
}
} }
/** /**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册