From a9fcc12c338e9af562213e2a8fc54909a99a7e7d Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 19 Aug 2022 20:08:09 +0800 Subject: [PATCH] enh: rsma batch process --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 6 +++- source/dnode/vnode/src/inc/sma.h | 3 +- source/dnode/vnode/src/inc/vnodeInt.h | 3 ++ source/dnode/vnode/src/sma/smaOpen.c | 11 ++++++ source/dnode/vnode/src/sma/smaRollup.c | 36 +++++++++++-------- source/dnode/vnode/src/vnd/vnodeOpen.c | 3 ++ .../tsim/sma/rsmaPersistenceRecovery.sim | 6 ++++ 7 files changed, 52 insertions(+), 16 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 1f981cc9e0..4987aa42e1 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -87,10 +87,14 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { while (pVnode->refCount > 0) taosMsleep(10); dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId); + while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); - while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pQueryQ)) { + taosMsleep(10); + dInfo("prop:vgId:%d, query queue size is %d", pVnode->vgId, taosQueueItemSize(pVnode->pQueryQ)); + } while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); dTrace("vgId:%d, vnode queue is empty", pVnode->vgId); diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 989d24295e..d32d67e29c 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -32,7 +32,8 @@ extern "C" { #define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -#define RSMA_TASK_INFO_HASH_SLOT 8 +#define RSMA_TASK_INFO_HASH_SLOT (8) +#define RSMA_EXECUTOR_MAX (4) typedef struct SSmaEnv SSmaEnv; typedef struct SSmaStat SSmaStat; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 63d228ec8b..3361b63fd8 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -189,6 +189,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem int32_t smaInit(); void smaCleanUp(); int32_t smaOpen(SVnode* pVnode); +int32_t smaPreClose(SSma* pSma); int32_t smaClose(SSma* pSma); int32_t smaBegin(SSma* pSma); int32_t smaSyncPreCommit(SSma* pSma); @@ -322,10 +323,12 @@ struct SVnode { TdThreadMutex lock; bool blocked; bool restored; + bool inClose; tsem_t syncSem; SQHandle* pQuery; }; + #define TD_VID(PVNODE) ((PVNODE)->config.vgId) #define VND_TSDB(vnd) ((vnd)->pTsdb) diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 235fb1f941..7128ecb94a 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -146,6 +146,17 @@ int32_t smaClose(SSma *pSma) { return 0; } +int32_t smaPreClose(SSma *pSma) { + if (pSma && VND_IS_RSMA(pSma->pVnode)) { + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); + for (int32_t i = 0; i < RSMA_EXECUTOR_MAX; ++i) { + tsem_post(&(pRSmaStat->notEmpty)); + } + smaInfo("prop:vgId:%d post notEmtpy", SMA_VID(pSma)); + } + return 0; +} + /** * @brief rsma env restore * diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index e8018c0f33..049c2fef9d 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -19,7 +19,6 @@ #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt -#define RSMA_EXECUTOR_MAX (4) // cnt #define RSMA_FETCH_DELAY_MAX (1800000) // ms #define RSMA_FETCH_SKIP_MAX (1000) // cnt #define RSMA_FETCH_ACTIVE_MAX (1800) // ms @@ -671,7 +670,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm } else { smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level); } -#if 1 +#if 0 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, pItem->level); blockDebugShowDataBlocks(pResList, flag); @@ -1736,6 +1735,7 @@ _err: * @return int32_t */ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { + SVnode *pVnode = pSma->pVnode; SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SHashObj *infoHash = NULL; @@ -1753,18 +1753,9 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { goto _err; } - int32_t nIdle = 0; + bool isBusy = false; while (true) { - if (++nIdle > 100) { - if (atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1) > 1) { - // free the exec thread if without SubmitReq - break; - } else { - // keep at least 1 exec thread only if without SubmitReq in case of no query thread to use when busy again - atomic_add_fetch_8(&pRSmaStat->nExecutor, 1); - nIdle = 0; - } - } + isBusy = false; // step 1: rsma exec - consume data in buffer queue for all suids if (type == RSMA_EXEC_OVERFLOW || type == RSMA_EXEC_COMMIT) { void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock @@ -1785,7 +1776,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { if (qallItemSize > 0) { // subtract the item size after the task finished, commit should wait for all items be consumed atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); - nIdle = 0; + isBusy = true; } ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0)); } @@ -1826,8 +1817,25 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { ASSERT(0); } + smaInfo("prop:vgId:%d loop end check", SMA_VID(pSma)); if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) { + if (pVnode->inClose) { + smaInfo("prop:vgId:%d loop end check - inClose and break", SMA_VID(pSma)); + break; + } + smaInfo("prop:vgId:%d loop end check - wait for notEmpty", SMA_VID(pSma)); tsem_wait(&pRSmaStat->notEmpty); + smaInfo("prop:vgId:%d loop end check - received notEmpty", SMA_VID(pSma)); + if (pVnode->inClose && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { + smaInfo("prop:vgId:%d loop end check - break - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose, + atomic_load_64(&pRSmaStat->nBufItems)); + break; + } else { + smaInfo("prop:vgId:%d loop end check - continue - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), + pVnode->inClose, atomic_load_64(&pRSmaStat->nBufItems)); + } + } else { + smaInfo("prop:vgId:%d loop end check - continue to run", SMA_VID(pSma)); } } // end of while(true) diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index a4fd984fb7..dcfbd33b90 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -87,6 +87,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->msgCb = msgCb; taosThreadMutexInit(&pVnode->lock, NULL); pVnode->blocked = false; + pVnode->inClose = false; tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&(pVnode->canCommit), 0, 1); @@ -181,6 +182,8 @@ _err: void vnodePreClose(SVnode *pVnode) { if (pVnode) { syncLeaderTransfer(pVnode->sync); + pVnode->inClose = true; + smaPreClose(pVnode->pSma); } } diff --git a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim index f53cd45d48..faff48b61c 100644 --- a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim +++ b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim @@ -35,6 +35,7 @@ sleep 7000 print =============== select * from retention level 2 from memory sql select * from ct1; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 2 file rows $rows > 2 return -1 @@ -51,6 +52,7 @@ endi print =============== select * from retention level 1 from memory sql select * from ct1 where ts > now-8d; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 1 file rows $rows > 2 return -1 @@ -89,6 +91,7 @@ system sh/exec.sh -n dnode1 -s start print =============== select * from retention level 2 from file sql select * from ct1; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 2 file rows $rows > 2 return -1 @@ -104,6 +107,7 @@ endi print =============== select * from retention level 1 from file sql select * from ct1 where ts > now-8d; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 1 file rows $rows > 2 return -1 @@ -141,6 +145,7 @@ sleep 7000 print =============== select * from retention level 2 from file and memory after rsma qtaskinfo recovery sql select * from ct1; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 2 file/mem rows $rows > 2 return -1 @@ -163,6 +168,7 @@ endi print =============== select * from retention level 1 from file and memory after rsma qtaskinfo recovery sql select * from ct1 where ts > now-8d; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 1 file/mem rows $rows > 2 return -1 -- GitLab