diff --git a/src/balance/src/bnMain.c b/src/balance/src/bnMain.c index 383f98191313d3a40f0899787c0f421f2d741cef..d80488fe9fa35df7af217b26cb93c85f9b11192a 100644 --- a/src/balance/src/bnMain.c +++ b/src/balance/src/bnMain.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "tref.h" #include "tsync.h" #include "tglobal.h" #include "dnode.h" @@ -28,7 +29,9 @@ #include "mnodeUser.h" #include "mnodeVgroup.h" -static SBnMgmt tsBnMgmt;; +extern int64_t tsDnodeRid; +extern int64_t tsSdbRid; +static SBnMgmt tsBnMgmt; static void bnMonitorDnodeModule(); static void bnLock() { @@ -529,6 +532,9 @@ void bnCheckStatus() { void * pIter = NULL; SDnodeObj *pDnode = NULL; + void *dnodeSdb = taosAcquireRef(tsSdbRid, tsDnodeRid); + if (dnodeSdb == NULL) return; + while (1) { pIter = mnodeGetNextDnode(pIter, &pDnode); if (pDnode == NULL) break; @@ -543,6 +549,8 @@ void bnCheckStatus() { } mnodeDecDnodeRef(pDnode); } + + taosReleaseRef(tsSdbRid, tsDnodeRid); } void bnCheckModules() { diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index c15ae729ed06bc21a1f08faa26e23b4b96145d96..d72bc5f4121ec0cd71c1c1dad5d9db5421c66a37 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -366,7 +366,7 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand queue->numOfItems--; atomic_sub_fetch_32(&qset->numOfItems, 1); code = 1; - uTrace("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, *type, queue->numOfItems); + uTrace("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, pNode->type, queue->numOfItems); } pthread_mutex_unlock(&queue->mutex); diff --git a/src/vnode/inc/vnodeMain.h b/src/vnode/inc/vnodeMain.h index 058b6bd09065b539a44c305374886c6d81d822b1..e1ddcdc36aa1fbf434b138f8d6fef5966e1fbc3e 100644 --- a/src/vnode/inc/vnodeMain.h +++ b/src/vnode/inc/vnodeMain.h @@ -28,6 +28,7 @@ int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg); int32_t vnodeClose(int32_t vgId); int32_t vnodeReset(SVnodeObj *pVnode); +void vnodeCleanUp(SVnodeObj *pVnode); void vnodeDestroy(SVnodeObj *pVnode); #ifdef __cplusplus diff --git a/src/vnode/inc/vnodeWorker.h b/src/vnode/inc/vnodeWorker.h index abb0aa80ab6da4be202730f70050ece9ebee1178..01d9d42900ef7e0c56bd396f5c698a43c4b501f8 100644 --- a/src/vnode/inc/vnodeWorker.h +++ b/src/vnode/inc/vnodeWorker.h @@ -23,8 +23,8 @@ extern "C" { int32_t vnodeInitMWorker(); void vnodeCleanupMWorker(); -int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle); -int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle); +int32_t vnodeCleanupInMWorker(SVnodeObj *pVnode); +int32_t vnodeDestroyInMWorker(SVnodeObj *pVnode); #ifdef __cplusplus } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index b37a0b568e3c2a68596deea2b551e2dc78de8c2a..e286a972dc97f94613614fdefea273cae7459f26 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -26,8 +26,9 @@ #include "vnodeSync.h" #include "vnodeVersion.h" #include "vnodeMgmt.h" +#include "vnodeWorker.h" +#include "vnodeMain.h" -static void vnodeCleanUp(SVnodeObj *pVnode); static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno); int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) { @@ -110,8 +111,10 @@ int32_t vnodeDrop(int32_t vgId) { vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); pVnode->dropped = 1; + // remove from hash, so new messages wont be consumed + vnodeRemoveFromHash(pVnode); vnodeRelease(pVnode); - vnodeCleanUp(pVnode); + vnodeCleanupInMWorker(pVnode); return TSDB_CODE_SUCCESS; } @@ -309,6 +312,7 @@ int32_t vnodeOpen(int32_t vgId) { if (pVnode->sync <= 0) { vError("vgId:%d, failed to open sync, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, tstrerror(terrno)); + vnodeRemoveFromHash(pVnode); vnodeCleanUp(pVnode); return terrno; } @@ -322,6 +326,7 @@ int32_t vnodeClose(int32_t vgId) { if (pVnode == NULL) return 0; vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode); + vnodeRemoveFromHash(pVnode); vnodeRelease(pVnode); vnodeCleanUp(pVnode); @@ -398,11 +403,7 @@ void vnodeDestroy(SVnodeObj *pVnode) { tsdbDecCommitRef(vgId); } - -static void vnodeCleanUp(SVnodeObj *pVnode) { - // remove from hash, so new messages wont be consumed - vnodeRemoveFromHash(pVnode); - +void vnodeCleanUp(SVnodeObj *pVnode) { if (!vnodeInInitStatus(pVnode)) { // it may be in updateing or reset state, then it shall wait int32_t i = 0; diff --git a/src/vnode/src/vnodeMgmt.c b/src/vnode/src/vnodeMgmt.c index 631ec13ee8e0f1569bbedd0ee6647a74efca5c88..cf42690d7d0208f50b781e20a4c7c8885f8c5fba 100644 --- a/src/vnode/src/vnodeMgmt.c +++ b/src/vnode/src/vnodeMgmt.c @@ -114,7 +114,7 @@ void vnodeRelease(void *vparam) { } } else { vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", pVnode->vgId, refCount, pVnode); - vnodeDestroy(pVnode); + vnodeDestroyInMWorker(pVnode); int32_t count = taosHashGetSize(tsVnodesHash); vDebug("vgId:%d, vnode is destroyed, vnodes:%d", pVnode->vgId, count); } diff --git a/src/vnode/src/vnodeWorker.c b/src/vnode/src/vnodeWorker.c index 4608d5e1267000781cafe3b63f3018bc448e7a5f..d6053cf18e2569a197c4bdf96ff6394ede716e42 100644 --- a/src/vnode/src/vnodeWorker.c +++ b/src/vnode/src/vnodeWorker.c @@ -21,10 +21,11 @@ #include "tqueue.h" #include "tglobal.h" #include "vnodeWorker.h" +#include "vnodeMain.h" typedef enum { - VNODE_WORKER_ACTION_CREATE, - VNODE_WORKER_ACTION_DELETE + VNODE_WORKER_ACTION_CLEANUP, + VNODE_WORKER_ACTION_DESTROUY } EVMWorkerAction; typedef struct { @@ -132,14 +133,11 @@ void vnodeCleanupMWorker() { vnodeStopMWorker(); } -static int32_t vnodeWriteIntoMWorker(int32_t vgId, EVMWorkerAction action,void *rpcHandle) { +static int32_t vnodeWriteIntoMWorker(SVnodeObj *pVnode, EVMWorkerAction action, void *rpcHandle) { SVMWorkerMsg *pMsg = taosAllocateQitem(sizeof(SVMWorkerMsg)); if (pMsg == NULL) return TSDB_CODE_VND_OUT_OF_MEMORY; - SVnodeObj *pVnode = vnodeAcquire(vgId); - if (pVnode == NULL) return TSDB_CODE_VND_INVALID_VGROUP_ID; - - pMsg->vgId = vgId; + pMsg->vgId = pVnode->vgId; pMsg->pVnode = pVnode; pMsg->rpcHandle = rpcHandle; pMsg->action = action; @@ -150,29 +148,27 @@ static int32_t vnodeWriteIntoMWorker(int32_t vgId, EVMWorkerAction action,void * return code; } -int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle) { - vTrace("vgId:%d, will open in vmworker", vgId); - return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_CREATE, rpcHandle); +int32_t vnodeCleanupInMWorker(SVnodeObj *pVnode) { + vTrace("vgId:%d, will cleanup in vmworker", pVnode->vgId); + return vnodeWriteIntoMWorker(pVnode, VNODE_WORKER_ACTION_CLEANUP, NULL); } -int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle) { - vTrace("vgId:%d, will cleanup in vmworker", vgId); - return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_DELETE, rpcHandle); +int32_t vnodeDestroyInMWorker(SVnodeObj *pVnode) { + vTrace("vgId:%d, will destroy in vmworker", pVnode->vgId); + return vnodeWriteIntoMWorker(pVnode, VNODE_WORKER_ACTION_DESTROUY, NULL); } static void vnodeFreeMWorkerMsg(SVMWorkerMsg *pMsg) { vTrace("vgId:%d, disposed in vmworker", pMsg->vgId); - vnodeRelease(pMsg->pVnode); taosFreeQitem(pMsg); } static void vnodeSendVMWorkerRpcRsp(SVMWorkerMsg *pMsg) { - SRpcMsg rpcRsp = { - .handle = pMsg->rpcHandle, - .code = pMsg->code, - }; + if (pMsg->rpcHandle != NULL) { + SRpcMsg rpcRsp = {.handle = pMsg->rpcHandle, .code = pMsg->code}; + rpcSendResponse(&rpcRsp); + } - rpcSendResponse(&rpcRsp); vnodeFreeMWorkerMsg(pMsg); } @@ -180,11 +176,11 @@ static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) { pMsg->code = 0; switch (pMsg->action) { - case VNODE_WORKER_ACTION_CREATE: - pMsg->code = vnodeOpen(pMsg->vgId); + case VNODE_WORKER_ACTION_CLEANUP: + vnodeCleanUp(pMsg->pVnode); break; - case VNODE_WORKER_ACTION_DELETE: - pMsg->code = vnodeDrop(pMsg->vgId); + case VNODE_WORKER_ACTION_DESTROUY: + vnodeDestroy(pMsg->pVnode); break; default: break;