diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 4bdaf8d3531aefcbcd74eddecbc55152cfcbc798..ce9404348c0f424522ec1de09e7cae7c5c7d2706 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -15,12 +15,11 @@ #include "vnd.h" -#define VND_INFO_FNAME "vnode.json" +#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json" static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo); -static int vnodeCommitImpl(void *arg); static void vnodeWaitCommit(SVnode *pVnode); int vnodeBegin(SVnode *pVnode) { @@ -107,7 +106,8 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { // free info binary taosMemoryFree(data); - vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d", pInfo->config.vgId, fname, pInfo->config.syncCfg.replicaNum); + vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d", pInfo->config.vgId, fname, + pInfo->config.syncCfg.replicaNum); return 0; @@ -185,21 +185,61 @@ _err: return -1; } +typedef struct { + SVnodeInfo info; + SVnode *pVnode; +} SCommitInfo; +static void vnodePrepareCommit(SVnode *pVnode) { + tsem_wait(&pVnode->canCommit); + + vnodeBufPoolUnRef(pVnode->inUse); + pVnode->inUse = NULL; +} +static int32_t vnodeCommitTask(void *arg) { + int32_t code = 0; + + SVnode *pVnode = (SVnode *)pVnode; + + code = vnodeCommit(pVnode); + if (code) goto _exit; + + tsem_post(&pVnode->canCommit); + +_exit: + return code; +} int vnodeAsyncCommit(SVnode *pVnode) { - vnodeWaitCommit(pVnode); + int32_t code = 0; - // vnodeBufPoolSwitch(pVnode); - // tsdbPrepareCommit(pVnode->pTsdb); + // prepare to commit + vnodePrepareCommit(pVnode); - vnodeScheduleTask(vnodeCommitImpl, pVnode); + // schedule the task + SVnodeInfo *pInfo = (SVnodeInfo *)taosMemoryCalloc(1, sizeof(*pInfo)); + if (NULL == pInfo) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + pInfo->config = pVnode->config; + pInfo->state.committed = pVnode->state.applied; + pInfo->state.commitTerm = pVnode->state.applyTerm; + pInfo->state.commitID = pVnode->state.commitID; + vnodeScheduleTask(vnodeCommitTask, pVnode); - return 0; +_exit: + if (code) { + vError("vgId:%d %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), + pVnode->state.commitID); + } else { + vDebug("vgId:%d %s done", TD_VID(pVnode), __func__); + } + return code; } int vnodeSyncCommit(SVnode *pVnode) { vnodeAsyncCommit(pVnode); - vnodeWaitCommit(pVnode); - tsem_post(&(pVnode->canCommit)); + tsem_wait(&pVnode->canCommit); + tsem_post(&pVnode->canCommit); return 0; } @@ -318,20 +358,6 @@ void vnodeRollback(SVnode *pVnode) { (void)taosRemoveFile(tFName); } -static int vnodeCommitImpl(void *arg) { - SVnode *pVnode = (SVnode *)arg; - - // metaCommit(pVnode->pMeta); - tqCommit(pVnode->pTq); - // tsdbCommit(pVnode->pTsdb, ); - - // vnodeBufPoolRecycle(pVnode); - tsem_post(&(pVnode->canCommit)); - return 0; -} - -static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); } - static int vnodeEncodeState(const void *pObj, SJson *pJson) { const SVState *pState = (SVState *)pObj; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 0ff4a46d4418dd4156876c61b6e0286ccf01dd26..e09fafb75690f13dd160763126d139dd21ae8c1b 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -242,14 +242,14 @@ _err: return NULL; } -void vnodePreClose(SVnode *pVnode) { +void vnodePreClose(SVnode *pVnode) { vnodeQueryPreClose(pVnode); - vnodeSyncPreClose(pVnode); + vnodeSyncPreClose(pVnode); } void vnodeClose(SVnode *pVnode) { if (pVnode) { - vnodeCommit(pVnode); + vnodeSyncCommit(pVnode); vnodeSyncClose(pVnode); vnodeQueryClose(pVnode); walClose(pVnode->pWal); diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index e8cdf9513f8fc80d479114273a5bf4984bb71337..a34744a1da9055f5131ef96afdca0e3e79c9e90e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -259,7 +259,7 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr pWriter->ever = ever; // commit it - code = vnodeCommit(pVnode); + code = vnodeSyncCommit(pVnode); if (code) { taosMemoryFree(pWriter); goto _err; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index d63c45fc0e21d07834a02f7cba6ea6a983f7efe8..d8079ca437208bf897093e9309904a3e6bc13eab 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -311,11 +311,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp if (vnodeShouldCommit(pVnode)) { _do_commit: vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); - // commit current change - if (vnodeCommit(pVnode) < 0) { - vError("vgId:%d, failed to commit vnode since %s.", TD_VID(pVnode), tstrerror(terrno)); - goto _err; - } + vnodeAsyncCommit(pVnode); // start a new one if (vnodeBegin(pVnode) < 0) {