From f9e0aad82761db775bc0c1774be58a16fea15e99 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sat, 20 Aug 2022 23:28:48 +0800 Subject: [PATCH] enh: rsma batch process --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 5 +---- source/dnode/vnode/src/sma/smaCommit.c | 4 ++-- source/dnode/vnode/src/sma/smaOpen.c | 1 - source/dnode/vnode/src/sma/smaRollup.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeCommit.c | 27 ++++++++++++++++-------- 5 files changed, 23 insertions(+), 18 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 4987aa42e1..a309aaf1ca 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -91,10 +91,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); - while (!taosQueueEmpty(pVnode->pQueryQ)) { - taosMsleep(10); - dInfo("prop:vgId:%d, query queue size is %d", pVnode->vgId, taosQueueItemSize(pVnode->pQueryQ)); - } + while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); dTrace("vgId:%d, vnode queue is empty", pVnode->vgId); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 2c6e5f5ca1..ac9afdb11f 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -316,6 +316,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { // step 1: set rsma stat atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 1); + pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; + ASSERT(pRSmaStat->commitAppliedVer > 0); // step 2: wait all triggered fetch tasks finished int32_t nLoops = 0; @@ -378,8 +380,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { // unlock // taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); #endif - // step 5: others - pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 7128ecb94a..b2344bc0ec 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -152,7 +152,6 @@ int32_t smaPreClose(SSma *pSma) { for (int32_t i = 0; i < RSMA_EXECUTOR_MAX; ++i) { tsem_post(&(pRSmaStat->notEmpty)); } - smaInfo("prop:vgId:%d post notEmtpy", SMA_VID(pSma)); } return 0; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 6414f822c0..5ef5bd0aea 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1769,7 +1769,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { int64_t itemSize = 0; if ((itemSize = taosQueueItemSize(pInfo->queue)) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { - smaDebug("vgId:%d queueItemSize is %" PRIi64 " execType:%" PRIi8, SMA_VID(pSma), itemSize, type); + smaDebug("vgId:%d, queueItemSize is %" PRIi64 " execType:%" PRIi8, SMA_VID(pSma), itemSize, type); if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) { taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock int32_t qallItemSize = taosQallItemSize(pInfo->qall); @@ -1828,7 +1828,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { } tsem_wait(&pRSmaStat->notEmpty); if (pVnode->inClose && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { - smaInfo("prop:vgId:%d loop end check - break - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose, + smaInfo("vgId:%d, exec task end, inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose, atomic_load_64(&pRSmaStat->nBufItems)); break; } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 64f223b974..8c73499229 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -220,13 +220,6 @@ int vnodeCommit(SVnode *pVnode) { vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applied); - // preCommit - // smaSyncPreCommit(pVnode->pSma); - smaAsyncPreCommit(pVnode->pSma); - - vnodeBufPoolUnRef(pVnode->inUse); - pVnode->inUse = NULL; - pVnode->state.commitTerm = pVnode->state.applyTerm; // save info @@ -241,6 +234,16 @@ int vnodeCommit(SVnode *pVnode) { } walBeginSnapshot(pVnode->pWal, pVnode->state.applied); + // preCommit + // smaSyncPreCommit(pVnode->pSma); + if(smaAsyncPreCommit(pVnode->pSma) < 0){ + ASSERT(0); + return -1; + } + + vnodeBufPoolUnRef(pVnode->inUse); + pVnode->inUse = NULL; + // commit each sub-system if (metaCommit(pVnode->pMeta) < 0) { ASSERT(0); @@ -248,7 +251,10 @@ int vnodeCommit(SVnode *pVnode) { } if (VND_IS_RSMA(pVnode)) { - smaAsyncCommit(pVnode->pSma); + if (smaAsyncCommit(pVnode->pSma) < 0) { + ASSERT(0); + return -1; + } if (tsdbCommit(VND_RSMA0(pVnode)) < 0) { ASSERT(0); @@ -285,7 +291,10 @@ int vnodeCommit(SVnode *pVnode) { // postCommit // smaSyncPostCommit(pVnode->pSma); - smaAsyncPostCommit(pVnode->pSma); + if (smaAsyncPostCommit(pVnode->pSma) < 0) { + ASSERT(0); + return -1; + } // apply the commit (TODO) walEndSnapshot(pVnode->pWal); -- GitLab