diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 7fc10c4237cd2af28cc649644b39d9d134361c49..022d6204efae05731c24acd02dbbfb0dbd26780f 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -26,21 +26,21 @@ extern "C" { #endif typedef struct SVnodeMgmt { - SDnodeData *pData; - SMsgCb msgCb; - const char *path; - const char *name; - SQWorkerPool queryPool; - SQWorkerPool fetchPool; - SWWorkerPool syncPool; - SWWorkerPool writePool; - SWWorkerPool mergePool; - SSingleWorker mgmtWorker; - SSingleWorker monitorWorker; - SHashObj *hash; - SRWLatch latch; - SVnodesStat state; - STfs *pTfs; + SDnodeData *pData; + SMsgCb msgCb; + const char *path; + const char *name; + SQWorkerPool queryPool; + SQWorkerPool fetchPool; + SWWorkerPool syncPool; + SWWorkerPool writePool; + SWWorkerPool mergePool; + SSingleWorker mgmtWorker; + SSingleWorker monitorWorker; + SHashObj *hash; + TdThreadRwlock lock; + SVnodesStat state; + STfs *pTfs; } SVnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index 7a6c5f982ed13317a4d047f95b8893f9798d4a09..cf5a7ad88544bad3e9fbe21e5605b621148183fe 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -17,7 +17,7 @@ #include "vmInt.h" SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { - taosRLockLatch(&pMgmt->latch); + taosThreadRwlockRdlock(&pMgmt->lock); int32_t num = 0; int32_t size = taosHashGetSize(pMgmt->hash); @@ -38,7 +38,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { } } - taosRUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); *numOfVnodes = num; return pVnodes; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index a4da6d089c6b2d21db01207747d1b291ffbbd5fd..602922feebc974dfea03a40eb2f7d011a66f6601 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -20,7 +20,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); if (pInfo->pVloads == NULL) return; - taosRLockLatch(&pMgmt->latch); + taosThreadRwlockRdlock(&pMgmt->lock); void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { @@ -34,7 +34,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { pIter = taosHashIterate(pMgmt->hash, pIter); } - taosRUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); } void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 41c0b3086b19118a90be1687c66073860f8d31d9..0c8d492ef449624e7462b736fcdd9c2ffb9c2ac2 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -20,14 +20,14 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { SVnodeObj *pVnode = NULL; int32_t refCount = 0; - taosRLockLatch(&pMgmt->latch); + taosThreadRwlockRdlock(&pMgmt->lock); taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); if (pVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; } else { refCount = atomic_add_fetch_32(&pVnode->refCount, 1); } - taosRUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); if (pVnode != NULL) { dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); @@ -39,9 +39,9 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { if (pVnode == NULL) return; - taosRLockLatch(&pMgmt->latch); + taosThreadRwlockRdlock(&pMgmt->lock); int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); - taosRUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); } @@ -70,9 +70,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { return -1; } - taosWLockLatch(&pMgmt->latch); + taosThreadRwlockWrlock(&pMgmt->lock); int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); - taosWUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); return code; } @@ -80,9 +80,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { char path[TSDB_FILENAME_LEN] = {0}; - taosWLockLatch(&pMgmt->latch); + taosThreadRwlockWrlock(&pMgmt->lock); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); - taosWUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); vmReleaseVnode(pMgmt, pVnode); while (pVnode->refCount > 0) taosMsleep(10); @@ -239,6 +239,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) { vmStopWorker(pMgmt); vnodeCleanup(); tfsClose(pMgmt->pTfs); + taosThreadRwlockDestroy(&pMgmt->lock); taosMemoryFree(pMgmt); } @@ -260,7 +261,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->msgCb.queueFps[MERGE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToMergeQueue; pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize; pMgmt->msgCb.mgmt = pMgmt; - taosInitRWLatch(&pMgmt->latch); + taosThreadRwlockInit(&pMgmt->lock, NULL); SDiskCfg dCfg = {0}; tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);