diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 4987aa42e189d8c9b29b3adbce510e02f4b23696..a309aaf1cad4856110b8bfb95a63826cc583b735 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -91,10 +91,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); - while (!taosQueueEmpty(pVnode->pQueryQ)) { - taosMsleep(10); - dInfo("prop:vgId:%d, query queue size is %d", pVnode->vgId, taosQueueItemSize(pVnode->pQueryQ)); - } + while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); dTrace("vgId:%d, vnode queue is empty", pVnode->vgId); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 2c6e5f5ca1270a80194c670fdc3760b564d9994d..ac9afdb11f2957f54723d15cda63a967e0c5ee4b 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -316,6 +316,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { // step 1: set rsma stat atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 1); + pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; + ASSERT(pRSmaStat->commitAppliedVer > 0); // step 2: wait all triggered fetch tasks finished int32_t nLoops = 0; @@ -378,8 +380,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { // unlock // taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); #endif - // step 5: others - pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 7128ecb94ac1fe38418e0da03252e09659b48818..b2344bc0ec918333ea399c525d7156402d21e095 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -152,7 +152,6 @@ int32_t smaPreClose(SSma *pSma) { for (int32_t i = 0; i < RSMA_EXECUTOR_MAX; ++i) { tsem_post(&(pRSmaStat->notEmpty)); } - smaInfo("prop:vgId:%d post notEmtpy", SMA_VID(pSma)); } return 0; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 6414f822c0b4d804a43c6310df8b06183e5e2c72..5ef5bd0aeaabe8005e9b9da54cfbfcde2d8da7b2 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1769,7 +1769,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { int64_t itemSize = 0; if ((itemSize = taosQueueItemSize(pInfo->queue)) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { - smaDebug("vgId:%d queueItemSize is %" PRIi64 " execType:%" PRIi8, SMA_VID(pSma), itemSize, type); + smaDebug("vgId:%d, queueItemSize is %" PRIi64 " execType:%" PRIi8, SMA_VID(pSma), itemSize, type); if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) { taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock int32_t qallItemSize = taosQallItemSize(pInfo->qall); @@ -1828,7 +1828,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { } tsem_wait(&pRSmaStat->notEmpty); if (pVnode->inClose && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { - smaInfo("prop:vgId:%d loop end check - break - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose, + smaInfo("vgId:%d, exec task end, inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose, atomic_load_64(&pRSmaStat->nBufItems)); break; } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 64f223b974199b99b88867e9d93ddfed6d9d3bd1..8c73499229b89d67edbc21f86ff7f76b570a4275 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -220,13 +220,6 @@ int vnodeCommit(SVnode *pVnode) { vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applied); - // preCommit - // smaSyncPreCommit(pVnode->pSma); - smaAsyncPreCommit(pVnode->pSma); - - vnodeBufPoolUnRef(pVnode->inUse); - pVnode->inUse = NULL; - pVnode->state.commitTerm = pVnode->state.applyTerm; // save info @@ -241,6 +234,16 @@ int vnodeCommit(SVnode *pVnode) { } walBeginSnapshot(pVnode->pWal, pVnode->state.applied); + // preCommit + // smaSyncPreCommit(pVnode->pSma); + if(smaAsyncPreCommit(pVnode->pSma) < 0){ + ASSERT(0); + return -1; + } + + vnodeBufPoolUnRef(pVnode->inUse); + pVnode->inUse = NULL; + // commit each sub-system if (metaCommit(pVnode->pMeta) < 0) { ASSERT(0); @@ -248,7 +251,10 @@ int vnodeCommit(SVnode *pVnode) { } if (VND_IS_RSMA(pVnode)) { - smaAsyncCommit(pVnode->pSma); + if (smaAsyncCommit(pVnode->pSma) < 0) { + ASSERT(0); + return -1; + } if (tsdbCommit(VND_RSMA0(pVnode)) < 0) { ASSERT(0); @@ -285,7 +291,10 @@ int vnodeCommit(SVnode *pVnode) { // postCommit // smaSyncPostCommit(pVnode->pSma); - smaAsyncPostCommit(pVnode->pSma); + if (smaAsyncPostCommit(pVnode->pSma) < 0) { + ASSERT(0); + return -1; + } // apply the commit (TODO) walEndSnapshot(pVnode->pWal);