diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 2602544bb46b07298829cbc43f599eccb8705902..5b17e53904fe5ca6b7127e2b27bbae06063b7b2a 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -318,6 +318,7 @@ int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); +int tsdbAsyncCommit(STsdbRepo* pRepo); // ------------------ tsdbFile.c #define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index e6cf8653994746427b55fdafa7a7b3b30c9ecbb4..36d8901bdfe26bdcc41ef9381f38600127ab0471 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -135,12 +135,14 @@ _err: return NULL; } +// Note: all working thread and query thread must stopped when calling this function void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { if (repo == NULL) return; STsdbRepo *pRepo = (STsdbRepo *)repo; - // TODO: wait for commit over + tsdbAsyncCommit(pRepo); + if (pRepo->commit) pthread_join(pRepo->commitThread, NULL); tsdbCloseFileH(pRepo); tsdbCloseBufPool(pRepo); diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 4b8c76702ead17e5ff119c4540cad3fc7af6cc0d..ce57111125eacb913e606baa3beae6ac9a679b22 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -173,42 +173,10 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) { void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { STsdbCfg * pCfg = &pRepo->config; STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); - int code = 0; if (pBufBlock != NULL && pBufBlock->remain < bytes) { if (listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 2) { // need to commit mem - if (pRepo->imem) { - code = pthread_join(pRepo->commitThread, NULL); - if (code != 0) { - tsdbError("vgId:%d failed to thread join since %s", REPO_ID(pRepo), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } - - if (tsdbUnRefMemTable(pRepo, pRepo->imem) < 0) { - tsdbError("vgId:%d failed to unref memtable since %s", REPO_ID(pRepo), tstrerror(terrno)) - return NULL; - } - } - - ASSERT(pRepo->commit == 0); - SMemTable *pImem = pRepo->imem; - if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START); - - if (tsdbLockRepo(pRepo) < 0) return NULL; - pRepo->imem = pRepo->mem; - pRepo->mem = NULL; - pRepo->commit = 1; - code = pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo); - if (code != 0) { - tsdbError("vgId:%d failed to create commit thread since %s", REPO_ID(pRepo), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(code); - tsdbUnlockRepo(pRepo); - return NULL; - } - if (tsdbUnlockRepo(pRepo) < 0) return NULL; - - if (pImem && tsdbUnRefMemTable(pRepo, pImem) < 0) return NULL; + if (tsdbAsyncCommit(pRepo) < 0) return NULL; } else { if (tsdbLockRepo(pRepo) < 0) return NULL; SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo); @@ -242,6 +210,42 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { return ptr; } +int tsdbAsyncCommit(STsdbRepo *pRepo) { + SMemTable *pIMem = pRepo->imem; + int code = 0; + + if (pIMem != NULL) { + ASSERT(pRepo->commit); + code = pthread_join(pRepo->commitThread, NULL); + if (code != 0) { + tsdbError("vgId:%d failed to thread join since %s", REPO_ID(pRepo), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + } + + ASSERT(pRepo->commit == 0); + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START); + if (pRepo->mem != NULL) { + if (tsdbLockRepo(pRepo) < 0) return -1; + pRepo->imem = pRepo->mem; + pRepo->mem = NULL; + pRepo->commit = 1; + code = pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo); + if (code != 0) { + tsdbError("vgId:%d failed to create commit thread since %s", REPO_ID(pRepo), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(code); + tsdbUnlockRepo(pRepo); + return -1; + } + if (tsdbUnlockRepo(pRepo) < 0) return -1; + } + + if (pIMem && tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1; + + return 0; +} + // ---------------- LOCAL FUNCTIONS ---------------- static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) { ASSERT(pRepo != NULL);