From 4328ac123cd3542dec7d54a7629a637c7d28d3ec Mon Sep 17 00:00:00 2001 From: jtao1735 Date: Wed, 13 May 2020 04:04:55 +0000 Subject: [PATCH] tsdb notify the vnode about it status changes --- src/inc/tsdb.h | 6 ++++-- src/tsdb/src/tsdbMain.c | 8 +++++--- src/vnode/src/vnodeMain.c | 19 ++++++++++++------- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 4468ee4262..32e3541692 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -34,12 +34,14 @@ extern "C" { #define TSDB_INVALID_SUPER_TABLE_ID -1 +#define TSDB_STATUS_COMMIT_START 1 +#define TSDB_STATUS_COMMIT_OVER 2 + // --------- TSDB APPLICATION HANDLE DEFINITION typedef struct { - // WAL handle void *appH; void *cqH; - int (*walCallBack)(void *); + int (*notifyStatus)(void *, int status); int (*eventCallBack)(void *); } STsdbAppH; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index a353f54353..df09b26c71 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -330,7 +330,7 @@ int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg) { int32_t tsdbTriggerCommit(TsdbRepoT *repo) { STsdbRepo *pRepo = (STsdbRepo *)repo; - if (pRepo->appH.walCallBack) pRepo->appH.walCallBack(pRepo->appH.appH); + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START); tsdbLockRepo(repo); if (pRepo->commit) { @@ -942,7 +942,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable) { // Commit to file static void *tsdbCommitData(void *arg) { - printf("Starting to commit....\n"); STsdbRepo * pRepo = (STsdbRepo *)arg; STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbCache *pCache = pRepo->tsdbCache; @@ -951,6 +950,8 @@ static void *tsdbCommitData(void *arg) { SRWHelper whelper = {0}; if (pCache->imem == NULL) return NULL; + tsdbPrint("vgId: %d, starting to commit....", pRepo->config.tsdbId); + // Create the iterator to read from cache SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables); if (iters == NULL) { @@ -974,6 +975,7 @@ static void *tsdbCommitData(void *arg) { // Do retention actions tsdbFitRetention(pRepo); + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER); _exit: tdFreeDataCols(pDataCols); @@ -1176,4 +1178,4 @@ uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, int32_t * magic = *size; return magic; -} \ No newline at end of file +} diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 8d35be9640..ec5ac40d04 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -33,12 +33,11 @@ static int32_t tsOpennedVnodes; static void *tsDnodeVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); static void vnodeBuildVloadMsg(char *pNode, void * param); -static int vnodeWalCallback(void *arg); static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); static int32_t vnodeReadCfg(SVnodeObj *pVnode); static int32_t vnodeSaveVersion(SVnodeObj *pVnode); static bool vnodeReadVersion(SVnodeObj *pVnode); -static int vnodeWalCallback(void *arg); +static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); @@ -206,7 +205,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { STsdbAppH appH = {0}; appH.appH = (void *)pVnode; - appH.walCallBack = vnodeWalCallback; + appH.notifyStatus = vnodeProcessTsdbStatus; appH.cqH = pVnode->cq; sprintf(temp, "%s/tsdb", rootDir); @@ -374,14 +373,20 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { walClose(pVnode->wal); pVnode->wal = NULL; - vnodeSaveVersion(pVnode); vnodeRelease(pVnode); } // TODO: this is a simple implement -static int vnodeWalCallback(void *arg) { +static int vnodeProcessTsdbStatus(void *arg, int status) { SVnodeObj *pVnode = arg; - return walRenew(pVnode->wal); + + if (status == TSDB_STATUS_COMMIT_START) + return walRenew(pVnode->wal); + + if (status == TSDB_STATUS_COMMIT_OVER) + return vnodeSaveVersion(pVnode); + + return 0; } static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) { @@ -414,7 +419,7 @@ static void vnodeNotifyFileSynced(void *ahandle) { tsdbCloseRepo(pVnode->tsdb); STsdbAppH appH = {0}; appH.appH = (void *)pVnode; - appH.walCallBack = vnodeWalCallback; + appH.notifyStatus = vnodeProcessTsdbStatus; appH.cqH = pVnode->cq; pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); } -- GitLab