diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 4468ee4262c7fda28b42c89b1b184bcc6703a43f..32e35416927d144cdf3dabaa51e29ada6d390ac9 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -34,12 +34,14 @@ extern "C" { #define TSDB_INVALID_SUPER_TABLE_ID -1 +#define TSDB_STATUS_COMMIT_START 1 +#define TSDB_STATUS_COMMIT_OVER 2 + // --------- TSDB APPLICATION HANDLE DEFINITION typedef struct { - // WAL handle void *appH; void *cqH; - int (*walCallBack)(void *); + int (*notifyStatus)(void *, int status); int (*eventCallBack)(void *); } STsdbAppH; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index a353f54353b8e01055403f1130c2efa7429aa62d..df09b26c717f8f5549d9c889cbb602c0b8042cfa 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -330,7 +330,7 @@ int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg) { int32_t tsdbTriggerCommit(TsdbRepoT *repo) { STsdbRepo *pRepo = (STsdbRepo *)repo; - if (pRepo->appH.walCallBack) pRepo->appH.walCallBack(pRepo->appH.appH); + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START); tsdbLockRepo(repo); if (pRepo->commit) { @@ -942,7 +942,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable) { // Commit to file static void *tsdbCommitData(void *arg) { - printf("Starting to commit....\n"); STsdbRepo * pRepo = (STsdbRepo *)arg; STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbCache *pCache = pRepo->tsdbCache; @@ -951,6 +950,8 @@ static void *tsdbCommitData(void *arg) { SRWHelper whelper = {0}; if (pCache->imem == NULL) return NULL; + tsdbPrint("vgId: %d, starting to commit....", pRepo->config.tsdbId); + // Create the iterator to read from cache SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables); if (iters == NULL) { @@ -974,6 +975,7 @@ static void *tsdbCommitData(void *arg) { // Do retention actions tsdbFitRetention(pRepo); + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER); _exit: tdFreeDataCols(pDataCols); @@ -1176,4 +1178,4 @@ uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, int32_t * magic = *size; return magic; -} \ No newline at end of file +} diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 8d35be96409141625848c4be8c31550c16f49df1..ec5ac40d041e11404c4872eaec5880b147f26d4a 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -33,12 +33,11 @@ static int32_t tsOpennedVnodes; static void *tsDnodeVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); static void vnodeBuildVloadMsg(char *pNode, void * param); -static int vnodeWalCallback(void *arg); static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); static int32_t vnodeReadCfg(SVnodeObj *pVnode); static int32_t vnodeSaveVersion(SVnodeObj *pVnode); static bool vnodeReadVersion(SVnodeObj *pVnode); -static int vnodeWalCallback(void *arg); +static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); @@ -206,7 +205,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { STsdbAppH appH = {0}; appH.appH = (void *)pVnode; - appH.walCallBack = vnodeWalCallback; + appH.notifyStatus = vnodeProcessTsdbStatus; appH.cqH = pVnode->cq; sprintf(temp, "%s/tsdb", rootDir); @@ -374,14 +373,20 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { walClose(pVnode->wal); pVnode->wal = NULL; - vnodeSaveVersion(pVnode); vnodeRelease(pVnode); } // TODO: this is a simple implement -static int vnodeWalCallback(void *arg) { +static int vnodeProcessTsdbStatus(void *arg, int status) { SVnodeObj *pVnode = arg; - return walRenew(pVnode->wal); + + if (status == TSDB_STATUS_COMMIT_START) + return walRenew(pVnode->wal); + + if (status == TSDB_STATUS_COMMIT_OVER) + return vnodeSaveVersion(pVnode); + + return 0; } static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) { @@ -414,7 +419,7 @@ static void vnodeNotifyFileSynced(void *ahandle) { tsdbCloseRepo(pVnode->tsdb); STsdbAppH appH = {0}; appH.appH = (void *)pVnode; - appH.walCallBack = vnodeWalCallback; + appH.notifyStatus = vnodeProcessTsdbStatus; appH.cqH = pVnode->cq; pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); }