diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index e2b7327e8f7cea8851f70d5145b45e8f3a7c0a90..b62b8b533e80a2509802bd200f7e13bad21c427c 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -77,7 +77,7 @@ void vnodeBufPoolReset(SVBufPool* pPool); // vnodeQuery.c int32_t vnodeQueryOpen(SVnode* pVnode); -void vnodeQueryPreClose(SVnode *pVnode); +void vnodeQueryPreClose(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct); @@ -86,7 +86,6 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeShouldCommit(SVnode* pVnode); -int32_t vnodeCommit(SVnode* pVnode); void vnodeRollback(SVnode* pVnode); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index f229b3b12748d4f0bdab9205e2bbc3f3b6cfc946..f8a9522d7fb3df1d6d8d00c8165ca446b910880a 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -154,6 +154,7 @@ int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo); int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback); int tsdbClose(STsdb** pTsdb); int32_t tsdbBegin(STsdb* pTsdb); +int32_t tsdbPrepareCommit(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb); int32_t tsdbFinishCommit(STsdb* pTsdb); int32_t tsdbRollbackCommit(STsdb* pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 391e10e223620bd892034bb7ae83785d811562bd..f3acfca39c542654829b0c48d1f5ed67b050afcd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -150,20 +150,26 @@ _exit: return code; } +int32_t tsdbPrepareCommit(STsdb *pTsdb) { + taosThreadRwlockWrlock(&pTsdb->rwLock); + ASSERT(pTsdb->imem == NULL); + pTsdb->imem = pTsdb->mem; + pTsdb->mem = NULL; + taosThreadRwlockUnlock(&pTsdb->rwLock); + + return 0; +} + int32_t tsdbCommit(STsdb *pTsdb) { if (!pTsdb) return 0; int32_t code = 0; int32_t lino = 0; SCommitter commith; - SMemTable *pMemTable = pTsdb->mem; + SMemTable *pMemTable = pTsdb->imem; // check if (pMemTable->nRow == 0 && pMemTable->nDel == 0) { - taosThreadRwlockWrlock(&pTsdb->rwLock); - pTsdb->mem = NULL; - taosThreadRwlockUnlock(&pTsdb->rwLock); - tsdbUnrefMemTable(pMemTable); goto _exit; } @@ -811,12 +817,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { int32_t lino = 0; memset(pCommitter, 0, sizeof(*pCommitter)); - ASSERT(pTsdb->mem && pTsdb->imem == NULL && "last tsdb commit incomplete"); - - taosThreadRwlockWrlock(&pTsdb->rwLock); - pTsdb->imem = pTsdb->mem; - pTsdb->mem = NULL; - taosThreadRwlockUnlock(&pTsdb->rwLock); + ASSERT(pTsdb->imem && "last tsdb commit incomplete"); pCommitter->pTsdb = pTsdb; pCommitter->commitID = pTsdb->pVnode->state.commitID; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index ce9404348c0f424522ec1de09e7cae7c5c7d2706..79e4f46241dada394ee6b28c7788ec7b02b6d7d4 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -15,12 +15,17 @@ #include "vnd.h" +typedef struct { + SVnodeInfo info; + SVnode *pVnode; +} SCommitInfo; + #define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json" -static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); -static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo); -static void vnodeWaitCommit(SVnode *pVnode); +static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); +static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo); +static int vnodeCommitImpl(SCommitInfo *pInfo); int vnodeBegin(SVnode *pVnode) { // alloc buffer pool @@ -185,27 +190,28 @@ _err: return -1; } -typedef struct { - SVnodeInfo info; - SVnode *pVnode; -} SCommitInfo; static void vnodePrepareCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); vnodeBufPoolUnRef(pVnode->inUse); pVnode->inUse = NULL; + + tsdbPrepareCommit(pVnode->pTsdb); } static int32_t vnodeCommitTask(void *arg) { int32_t code = 0; - SVnode *pVnode = (SVnode *)pVnode; + SCommitInfo *pInfo = (SCommitInfo *)arg; - code = vnodeCommit(pVnode); + // commit + code = vnodeCommitImpl(pInfo); if (code) goto _exit; - tsem_post(&pVnode->canCommit); + // end commit + tsem_post(&pInfo->pVnode->canCommit); _exit: + taosMemoryFree(pInfo); return code; } int vnodeAsyncCommit(SVnode *pVnode) { @@ -215,16 +221,18 @@ int vnodeAsyncCommit(SVnode *pVnode) { vnodePrepareCommit(pVnode); // schedule the task - SVnodeInfo *pInfo = (SVnodeInfo *)taosMemoryCalloc(1, sizeof(*pInfo)); + pVnode->state.commitTerm = pVnode->state.applyTerm; + + SCommitInfo *pInfo = (SCommitInfo *)taosMemoryCalloc(1, sizeof(*pInfo)); if (NULL == pInfo) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - pInfo->config = pVnode->config; - pInfo->state.committed = pVnode->state.applied; - pInfo->state.commitTerm = pVnode->state.applyTerm; - pInfo->state.commitID = pVnode->state.commitID; - vnodeScheduleTask(vnodeCommitTask, pVnode); + pInfo->info.config = pVnode->config; + pInfo->info.state.committed = pVnode->state.applied; + pInfo->info.state.commitTerm = pVnode->state.applyTerm; + pInfo->info.state.commitID = pVnode->state.commitID; + vnodeScheduleTask(vnodeCommitTask, pInfo); _exit: if (code) { @@ -243,11 +251,12 @@ int vnodeSyncCommit(SVnode *pVnode) { return 0; } -int vnodeCommit(SVnode *pVnode) { - int32_t code = 0; - int32_t lino = 0; - SVnodeInfo info = {0}; - char dir[TSDB_FILENAME_LEN]; +static int vnodeCommitImpl(SCommitInfo *pInfo) { + int32_t code = 0; + int32_t lino = 0; + + char dir[TSDB_FILENAME_LEN] = {0}; + SVnode *pVnode = pInfo->pVnode; vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applied, pVnode->state.applyTerm); @@ -258,19 +267,13 @@ int vnodeCommit(SVnode *pVnode) { return -1; } - pVnode->state.commitTerm = pVnode->state.applyTerm; - // save info - info.config = pVnode->config; - info.state.committed = pVnode->state.applied; - info.state.commitTerm = pVnode->state.applyTerm; - info.state.commitID = pVnode->state.commitID; if (pVnode->pTfs) { snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); } else { snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); } - if (vnodeSaveInfo(dir, &info) < 0) { + if (vnodeSaveInfo(dir, &pInfo->info) < 0) { code = terrno; TSDB_CHECK_CODE(code, lino, _exit); } @@ -281,9 +284,6 @@ int vnodeCommit(SVnode *pVnode) { code = smaPreCommit(pVnode->pSma); TSDB_CHECK_CODE(code, lino, _exit); - vnodeBufPoolUnRef(pVnode->inUse); - pVnode->inUse = NULL; - // commit each sub-system if (metaCommit(pVnode->pMeta) < 0) { code = TSDB_CODE_FAILED; @@ -304,7 +304,7 @@ int vnodeCommit(SVnode *pVnode) { } // commit info - if (vnodeCommitInfo(dir, &info) < 0) { + if (vnodeCommitInfo(dir, &pInfo->info) < 0) { code = terrno; TSDB_CHECK_CODE(code, lino, _exit); } @@ -322,14 +322,13 @@ int vnodeCommit(SVnode *pVnode) { TSDB_CHECK_CODE(code, lino, _exit); } - pVnode->state.committed = info.state.committed; + pVnode->state.committed = pInfo->info.state.committed; if (smaPostCommit(pVnode->pSma) < 0) { vError("vgId:%d, failed to post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } - // apply the commit (TODO) // walEndSnapshot(pVnode->pWal); syncEndSnapshot(pVnode->sync);