From b89877b4c1ee124bd8a9de5fcb4fa65fa435f558 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 13 Dec 2022 13:41:06 +0800 Subject: [PATCH] refact: add vnode timer --- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 2 + source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 49 ++++++++++++++++++++++-- source/dnode/vnode/src/vnd/vnodeSync.c | 15 -------- 3 files changed, 47 insertions(+), 19 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index b38dc19361..b5c554e0ca 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -38,6 +38,8 @@ typedef struct SVnodeMgmt { TdThreadRwlock lock; SVnodesStat state; STfs *pTfs; + TdThread thread; + bool stop; } SVnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 07ebd72379..56a9a0e22b 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -334,6 +334,49 @@ static void vmCleanup(SVnodeMgmt *pMgmt) { taosMemoryFree(pMgmt); } +static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {} + +static void *vmThreadFp(void *param) { + SVnodeMgmt *pMgmt = param; + int64_t lastTime = 0; + setThreadName("vnode-timer"); + + while (1) { + lastTime++; + taosMsleep(100); + if (pMgmt->stop) break; + if (lastTime % 10 != 0) continue; + + int64_t sec = lastTime / 10; + if (sec % (tsStatusInterval * 5) == 0) { + vmCheckSyncTimeout(pMgmt); + } + } + + return NULL; +} + +static int32_t vmInitTimer(SVnodeMgmt *pMgmt) { + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) { + dError("failed to create vnode timer thread since %s", strerror(errno)); + return -1; + } + + taosThreadAttrDestroy(&thAttr); + return 0; +} + +static void vmCleanupTimer(SVnodeMgmt *pMgmt) { + pMgmt->stop = true; + if (taosCheckPthreadValid(pMgmt->thread)) { + taosThreadJoin(pMgmt->thread, NULL); + taosThreadClear(&pMgmt->thread); + } +} + static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { int32_t code = -1; @@ -510,12 +553,10 @@ static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) { taosMemoryFree(ppVnodes); } - return 0; + return vmInitTimer(pMgmt); } -static void vmStop(SVnodeMgmt *pMgmt) { - // process inside the vnode -} +static void vmStop(SVnodeMgmt *pMgmt) { vmCleanupTimer(pMgmt); } SMgmtFunc vmGetMgmtFunc() { SMgmtFunc mgmtFunc = {0}; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index aa215a852f..5a0556ba27 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -26,21 +26,6 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { tsem_wait(&pVnode->syncSem); } -static inline void vnodeWaitBlockMsgOld(SVnode *pVnode, const SRpcMsg *pMsg) { - if (vnodeIsMsgBlock(pMsg->msgType)) { - const STraceId *trace = &pMsg->info.traceId; - taosThreadMutexLock(&pVnode->lock); - if (!pVnode->blocked) { - vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); - pVnode->blocked = true; - taosThreadMutexUnlock(&pVnode->lock); - tsem_wait(&pVnode->syncSem); - } else { - taosThreadMutexUnlock(&pVnode->lock); - } - } -} - static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { if (vnodeIsMsgBlock(pMsg->msgType)) { const STraceId *trace = &pMsg->info.traceId; -- GitLab