From b39ceed07eeec3772cdafb9d759654c17c4d51f6 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 7 Dec 2020 17:42:52 +0800 Subject: [PATCH] TD-2324 --- src/vnode/inc/vnodeMain.h | 1 + src/vnode/inc/vnodeWorker.h | 4 ++-- src/vnode/src/vnodeMain.c | 13 ++++++------ src/vnode/src/vnodeWorker.c | 42 +++++++++++++++++-------------------- 4 files changed, 29 insertions(+), 31 deletions(-) diff --git a/src/vnode/inc/vnodeMain.h b/src/vnode/inc/vnodeMain.h index 058b6bd090..e1ddcdc36a 100644 --- a/src/vnode/inc/vnodeMain.h +++ b/src/vnode/inc/vnodeMain.h @@ -28,6 +28,7 @@ int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg); int32_t vnodeClose(int32_t vgId); int32_t vnodeReset(SVnodeObj *pVnode); +void vnodeCleanUp(SVnodeObj *pVnode); void vnodeDestroy(SVnodeObj *pVnode); #ifdef __cplusplus diff --git a/src/vnode/inc/vnodeWorker.h b/src/vnode/inc/vnodeWorker.h index abb0aa80ab..01d9d42900 100644 --- a/src/vnode/inc/vnodeWorker.h +++ b/src/vnode/inc/vnodeWorker.h @@ -23,8 +23,8 @@ extern "C" { int32_t vnodeInitMWorker(); void vnodeCleanupMWorker(); -int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle); -int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle); +int32_t vnodeCleanupInMWorker(SVnodeObj *pVnode); +int32_t vnodeDestroyInMWorker(SVnodeObj *pVnode); #ifdef __cplusplus } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index b37a0b568e..e95387b62c 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -26,8 +26,9 @@ #include "vnodeSync.h" #include "vnodeVersion.h" #include "vnodeMgmt.h" +#include "vnodeWorker.h" +#include "vnodeMain.h" -static void vnodeCleanUp(SVnodeObj *pVnode); static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno); int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) { @@ -110,6 +111,8 @@ int32_t vnodeDrop(int32_t vgId) { vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); pVnode->dropped = 1; + // remove from hash, so new messages wont be consumed + vnodeRemoveFromHash(pVnode); vnodeRelease(pVnode); vnodeCleanUp(pVnode); @@ -309,6 +312,7 @@ int32_t vnodeOpen(int32_t vgId) { if (pVnode->sync <= 0) { vError("vgId:%d, failed to open sync, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, tstrerror(terrno)); + vnodeRemoveFromHash(pVnode); vnodeCleanUp(pVnode); return terrno; } @@ -322,6 +326,7 @@ int32_t vnodeClose(int32_t vgId) { if (pVnode == NULL) return 0; vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode); + vnodeRemoveFromHash(pVnode); vnodeRelease(pVnode); vnodeCleanUp(pVnode); @@ -398,11 +403,7 @@ void vnodeDestroy(SVnodeObj *pVnode) { tsdbDecCommitRef(vgId); } - -static void vnodeCleanUp(SVnodeObj *pVnode) { - // remove from hash, so new messages wont be consumed - vnodeRemoveFromHash(pVnode); - +void vnodeCleanUp(SVnodeObj *pVnode) { if (!vnodeInInitStatus(pVnode)) { // it may be in updateing or reset state, then it shall wait int32_t i = 0; diff --git a/src/vnode/src/vnodeWorker.c b/src/vnode/src/vnodeWorker.c index 4608d5e126..d6053cf18e 100644 --- a/src/vnode/src/vnodeWorker.c +++ b/src/vnode/src/vnodeWorker.c @@ -21,10 +21,11 @@ #include "tqueue.h" #include "tglobal.h" #include "vnodeWorker.h" +#include "vnodeMain.h" typedef enum { - VNODE_WORKER_ACTION_CREATE, - VNODE_WORKER_ACTION_DELETE + VNODE_WORKER_ACTION_CLEANUP, + VNODE_WORKER_ACTION_DESTROUY } EVMWorkerAction; typedef struct { @@ -132,14 +133,11 @@ void vnodeCleanupMWorker() { vnodeStopMWorker(); } -static int32_t vnodeWriteIntoMWorker(int32_t vgId, EVMWorkerAction action,void *rpcHandle) { +static int32_t vnodeWriteIntoMWorker(SVnodeObj *pVnode, EVMWorkerAction action, void *rpcHandle) { SVMWorkerMsg *pMsg = taosAllocateQitem(sizeof(SVMWorkerMsg)); if (pMsg == NULL) return TSDB_CODE_VND_OUT_OF_MEMORY; - SVnodeObj *pVnode = vnodeAcquire(vgId); - if (pVnode == NULL) return TSDB_CODE_VND_INVALID_VGROUP_ID; - - pMsg->vgId = vgId; + pMsg->vgId = pVnode->vgId; pMsg->pVnode = pVnode; pMsg->rpcHandle = rpcHandle; pMsg->action = action; @@ -150,29 +148,27 @@ static int32_t vnodeWriteIntoMWorker(int32_t vgId, EVMWorkerAction action,void * return code; } -int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle) { - vTrace("vgId:%d, will open in vmworker", vgId); - return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_CREATE, rpcHandle); +int32_t vnodeCleanupInMWorker(SVnodeObj *pVnode) { + vTrace("vgId:%d, will cleanup in vmworker", pVnode->vgId); + return vnodeWriteIntoMWorker(pVnode, VNODE_WORKER_ACTION_CLEANUP, NULL); } -int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle) { - vTrace("vgId:%d, will cleanup in vmworker", vgId); - return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_DELETE, rpcHandle); +int32_t vnodeDestroyInMWorker(SVnodeObj *pVnode) { + vTrace("vgId:%d, will destroy in vmworker", pVnode->vgId); + return vnodeWriteIntoMWorker(pVnode, VNODE_WORKER_ACTION_DESTROUY, NULL); } static void vnodeFreeMWorkerMsg(SVMWorkerMsg *pMsg) { vTrace("vgId:%d, disposed in vmworker", pMsg->vgId); - vnodeRelease(pMsg->pVnode); taosFreeQitem(pMsg); } static void vnodeSendVMWorkerRpcRsp(SVMWorkerMsg *pMsg) { - SRpcMsg rpcRsp = { - .handle = pMsg->rpcHandle, - .code = pMsg->code, - }; + if (pMsg->rpcHandle != NULL) { + SRpcMsg rpcRsp = {.handle = pMsg->rpcHandle, .code = pMsg->code}; + rpcSendResponse(&rpcRsp); + } - rpcSendResponse(&rpcRsp); vnodeFreeMWorkerMsg(pMsg); } @@ -180,11 +176,11 @@ static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) { pMsg->code = 0; switch (pMsg->action) { - case VNODE_WORKER_ACTION_CREATE: - pMsg->code = vnodeOpen(pMsg->vgId); + case VNODE_WORKER_ACTION_CLEANUP: + vnodeCleanUp(pMsg->pVnode); break; - case VNODE_WORKER_ACTION_DELETE: - pMsg->code = vnodeDrop(pMsg->vgId); + case VNODE_WORKER_ACTION_DESTROUY: + vnodeDestroy(pMsg->pVnode); break; default: break; -- GitLab