提交 c5d18b5a 编写于 作者: S Shengliang Guan

refactor: changel lockfree to rwlock

上级 568c6269
......@@ -26,21 +26,21 @@ extern "C" {
#endif
typedef struct SVnodeMgmt {
SDnodeData *pData;
SMsgCb msgCb;
const char *path;
const char *name;
SQWorkerPool queryPool;
SQWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool writePool;
SWWorkerPool mergePool;
SSingleWorker mgmtWorker;
SSingleWorker monitorWorker;
SHashObj *hash;
SRWLatch latch;
SVnodesStat state;
STfs *pTfs;
SDnodeData *pData;
SMsgCb msgCb;
const char *path;
const char *name;
SQWorkerPool queryPool;
SQWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool writePool;
SWWorkerPool mergePool;
SSingleWorker mgmtWorker;
SSingleWorker monitorWorker;
SHashObj *hash;
TdThreadRwlock lock;
SVnodesStat state;
STfs *pTfs;
} SVnodeMgmt;
typedef struct {
......
......@@ -17,7 +17,7 @@
#include "vmInt.h"
SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
taosRLockLatch(&pMgmt->latch);
taosThreadRwlockRdlock(&pMgmt->lock);
int32_t num = 0;
int32_t size = taosHashGetSize(pMgmt->hash);
......@@ -38,7 +38,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
}
}
taosRUnLockLatch(&pMgmt->latch);
taosThreadRwlockUnlock(&pMgmt->lock);
*numOfVnodes = num;
return pVnodes;
......
......@@ -20,7 +20,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
if (pInfo->pVloads == NULL) return;
taosRLockLatch(&pMgmt->latch);
taosThreadRwlockRdlock(&pMgmt->lock);
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
......@@ -34,7 +34,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pIter = taosHashIterate(pMgmt->hash, pIter);
}
taosRUnLockLatch(&pMgmt->latch);
taosThreadRwlockUnlock(&pMgmt->lock);
}
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
......
......@@ -20,14 +20,14 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
SVnodeObj *pVnode = NULL;
int32_t refCount = 0;
taosRLockLatch(&pMgmt->latch);
taosThreadRwlockRdlock(&pMgmt->lock);
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
} else {
refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
}
taosRUnLockLatch(&pMgmt->latch);
taosThreadRwlockUnlock(&pMgmt->lock);
if (pVnode != NULL) {
dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
......@@ -39,9 +39,9 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
if (pVnode == NULL) return;
taosRLockLatch(&pMgmt->latch);
taosThreadRwlockRdlock(&pMgmt->lock);
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
taosRUnLockLatch(&pMgmt->latch);
taosThreadRwlockUnlock(&pMgmt->lock);
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
}
......@@ -70,9 +70,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
return -1;
}
taosWLockLatch(&pMgmt->latch);
taosThreadRwlockWrlock(&pMgmt->lock);
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
taosWUnLockLatch(&pMgmt->latch);
taosThreadRwlockUnlock(&pMgmt->lock);
return code;
}
......@@ -80,9 +80,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
char path[TSDB_FILENAME_LEN] = {0};
taosWLockLatch(&pMgmt->latch);
taosThreadRwlockWrlock(&pMgmt->lock);
taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
taosWUnLockLatch(&pMgmt->latch);
taosThreadRwlockUnlock(&pMgmt->lock);
vmReleaseVnode(pMgmt, pVnode);
while (pVnode->refCount > 0) taosMsleep(10);
......@@ -239,6 +239,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
vmStopWorker(pMgmt);
vnodeCleanup();
tfsClose(pMgmt->pTfs);
taosThreadRwlockDestroy(&pMgmt->lock);
taosMemoryFree(pMgmt);
}
......@@ -260,7 +261,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.queueFps[MERGE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToMergeQueue;
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
pMgmt->msgCb.mgmt = pMgmt;
taosInitRWLatch(&pMgmt->latch);
taosThreadRwlockInit(&pMgmt->lock, NULL);
SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册