提交 38078033 编写于 作者: S Shengliang Guan

refactor: change lockfree to rwlock

上级 c5d18b5a
...@@ -19,11 +19,11 @@ ...@@ -19,11 +19,11 @@
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) { if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
taosWLockLatch(&pMgmt->pData->latch); taosThreadRwlockWrlock(&pMgmt->pData->lock);
pMgmt->pData->dnodeId = pCfg->dnodeId; pMgmt->pData->dnodeId = pCfg->dnodeId;
pMgmt->pData->clusterId = pCfg->clusterId; pMgmt->pData->clusterId = pCfg->clusterId;
dmWriteEps(pMgmt->pData); dmWriteEps(pMgmt->pData);
taosWUnLockLatch(&pMgmt->pData->latch); taosThreadRwlockUnlock(&pMgmt->pData->lock);
} }
} }
...@@ -50,7 +50,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { ...@@ -50,7 +50,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
void dmSendStatusReq(SDnodeMgmt *pMgmt) { void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SStatusReq req = {0}; SStatusReq req = {0};
taosRLockLatch(&pMgmt->pData->latch); taosThreadRwlockRdlock(&pMgmt->pData->lock);
req.sver = tsVersion; req.sver = tsVersion;
req.dnodeVer = pMgmt->pData->dnodeVer; req.dnodeVer = pMgmt->pData->dnodeVer;
req.dnodeId = pMgmt->pData->dnodeId; req.dnodeId = pMgmt->pData->dnodeId;
...@@ -69,7 +69,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { ...@@ -69,7 +69,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN); memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN); memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN); memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
taosRUnLockLatch(&pMgmt->pData->latch); taosThreadRwlockUnlock(&pMgmt->pData->lock);
SMonVloadInfo vinfo = {0}; SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsFp)(&vinfo); (*pMgmt->getVnodeLoadsFp)(&vinfo);
......
...@@ -30,7 +30,6 @@ typedef struct SSnodeMgmt { ...@@ -30,7 +30,6 @@ typedef struct SSnodeMgmt {
SMsgCb msgCb; SMsgCb msgCb;
const char *path; const char *path;
const char *name; const char *name;
SRWLatch latch;
int8_t uniqueWorkerInUse; int8_t uniqueWorkerInUse;
SArray *uniqueWorkers; // SArray<SMultiWorker*> SArray *uniqueWorkers; // SArray<SMultiWorker*>
SSingleWorker sharedWorker; SSingleWorker sharedWorker;
......
...@@ -70,7 +70,7 @@ typedef struct SMgmtWrapper { ...@@ -70,7 +70,7 @@ typedef struct SMgmtWrapper {
const char *name; const char *name;
char *path; char *path;
int32_t refCount; int32_t refCount;
SRWLatch latch; TdThreadRwlock lock;
EDndNodeType ntype; EDndNodeType ntype;
bool deployed; bool deployed;
bool required; bool required;
......
...@@ -91,7 +91,7 @@ static int32_t dmInitVars(SDnode *pDnode, EDndNodeType rtype) { ...@@ -91,7 +91,7 @@ static int32_t dmInitVars(SDnode *pDnode, EDndNodeType rtype) {
return -1; return -1;
} }
taosInitRWLatch(&pData->latch); taosThreadRwlockInit(&pData->lock, NULL);
taosThreadMutexInit(&pDnode->mutex, NULL); taosThreadMutexInit(&pDnode->mutex, NULL);
return 0; return 0;
} }
...@@ -100,6 +100,7 @@ static void dmClearVars(SDnode *pDnode) { ...@@ -100,6 +100,7 @@ static void dmClearVars(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
taosMemoryFreeClear(pWrapper->path); taosMemoryFreeClear(pWrapper->path);
taosThreadRwlockDestroy(&pWrapper->lock);
} }
if (pDnode->lockfile != NULL) { if (pDnode->lockfile != NULL) {
taosUnLockFile(pDnode->lockfile); taosUnLockFile(pDnode->lockfile);
...@@ -108,7 +109,7 @@ static void dmClearVars(SDnode *pDnode) { ...@@ -108,7 +109,7 @@ static void dmClearVars(SDnode *pDnode) {
} }
SDnodeData *pData = &pDnode->data; SDnodeData *pData = &pDnode->data;
taosWLockLatch(&pData->latch); taosThreadRwlockWrlock(&pData->lock);
if (pData->dnodeEps != NULL) { if (pData->dnodeEps != NULL) {
taosArrayDestroy(pData->dnodeEps); taosArrayDestroy(pData->dnodeEps);
pData->dnodeEps = NULL; pData->dnodeEps = NULL;
...@@ -117,8 +118,9 @@ static void dmClearVars(SDnode *pDnode) { ...@@ -117,8 +118,9 @@ static void dmClearVars(SDnode *pDnode) {
taosHashCleanup(pData->dnodeHash); taosHashCleanup(pData->dnodeHash);
pData->dnodeHash = NULL; pData->dnodeHash = NULL;
} }
taosWUnLockLatch(&pData->latch); taosThreadRwlockUnlock(&pData->lock);
taosThreadRwlockDestroy(&pData->lock);
taosThreadMutexDestroy(&pDnode->mutex); taosThreadMutexDestroy(&pDnode->mutex);
memset(&pDnode->mutex, 0, sizeof(pDnode->mutex)); memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
} }
...@@ -151,7 +153,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) { ...@@ -151,7 +153,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) {
if (ntype == DNODE) { if (ntype == DNODE) {
pWrapper->proc.ptype = DND_PROC_SINGLE; pWrapper->proc.ptype = DND_PROC_SINGLE;
} }
taosInitRWLatch(&pWrapper->latch); taosThreadRwlockInit(&pWrapper->lock, NULL);
snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name); snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name);
pWrapper->path = strdup(path); pWrapper->path = strdup(path);
...@@ -223,7 +225,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { ...@@ -223,7 +225,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SMgmtWrapper *pRetWrapper = pWrapper; SMgmtWrapper *pRetWrapper = pWrapper;
taosRLockLatch(&pWrapper->latch); taosThreadRwlockRdlock(&pWrapper->lock);
if (pWrapper->deployed) { if (pWrapper->deployed) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is acquired, ref:%d", pWrapper->name, refCount); dTrace("node:%s, is acquired, ref:%d", pWrapper->name, refCount);
...@@ -231,7 +233,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { ...@@ -231,7 +233,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED; terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
pRetWrapper = NULL; pRetWrapper = NULL;
} }
taosRUnLockLatch(&pWrapper->latch); taosThreadRwlockUnlock(&pWrapper->lock);
return pRetWrapper; return pRetWrapper;
} }
...@@ -239,7 +241,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { ...@@ -239,7 +241,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t code = 0; int32_t code = 0;
taosRLockLatch(&pWrapper->latch); taosThreadRwlockRdlock(&pWrapper->lock);
if (pWrapper->deployed || (InParentProc(pWrapper) && pWrapper->required)) { if (pWrapper->deployed || (InParentProc(pWrapper) && pWrapper->required)) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount); dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount);
...@@ -247,7 +249,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { ...@@ -247,7 +249,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED; terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
code = -1; code = -1;
} }
taosRUnLockLatch(&pWrapper->latch); taosThreadRwlockUnlock(&pWrapper->lock);
return code; return code;
} }
...@@ -255,9 +257,9 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { ...@@ -255,9 +257,9 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
void dmReleaseWrapper(SMgmtWrapper *pWrapper) { void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
if (pWrapper == NULL) return; if (pWrapper == NULL) return;
taosRLockLatch(&pWrapper->latch); taosThreadRwlockRdlock(&pWrapper->lock);
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
taosRUnLockLatch(&pWrapper->latch); taosThreadRwlockUnlock(&pWrapper->lock);
dTrace("node:%s, is released, ref:%d", pWrapper->name, refCount); dTrace("node:%s, is released, ref:%d", pWrapper->name, refCount);
} }
......
...@@ -186,12 +186,12 @@ void dmCloseNode(SMgmtWrapper *pWrapper) { ...@@ -186,12 +186,12 @@ void dmCloseNode(SMgmtWrapper *pWrapper) {
} }
} }
taosWLockLatch(&pWrapper->latch); taosThreadRwlockWrlock(&pWrapper->lock);
if (pWrapper->pMgmt != NULL) { if (pWrapper->pMgmt != NULL) {
(*pWrapper->func.closeFp)(pWrapper->pMgmt); (*pWrapper->func.closeFp)(pWrapper->pMgmt);
pWrapper->pMgmt = NULL; pWrapper->pMgmt = NULL;
} }
taosWUnLockLatch(&pWrapper->latch); taosThreadRwlockUnlock(&pWrapper->lock);
if (!OnlyInSingleProc(pWrapper)) { if (!OnlyInSingleProc(pWrapper)) {
dmCleanupProc(pWrapper); dmCleanupProc(pWrapper);
......
...@@ -104,7 +104,7 @@ typedef struct { ...@@ -104,7 +104,7 @@ typedef struct {
SEpSet mnodeEps; SEpSet mnodeEps;
SArray *dnodeEps; SArray *dnodeEps;
SHashObj *dnodeHash; SHashObj *dnodeHash;
SRWLatch latch; TdThreadRwlock lock;
SMsgCb msgCb; SMsgCb msgCb;
} SDnodeData; } SDnodeData;
......
...@@ -21,7 +21,7 @@ static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep); ...@@ -21,7 +21,7 @@ static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep);
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps); static void dmResetEps(SDnodeData *pData, SArray *dnodeEps);
static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
taosRLockLatch(&pData->latch); taosThreadRwlockRdlock(&pData->lock);
SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t)); SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) { if (pDnodeEp != NULL) {
...@@ -36,7 +36,7 @@ static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pF ...@@ -36,7 +36,7 @@ static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pF
} }
} }
taosRUnLockLatch(&pData->latch); taosThreadRwlockUnlock(&pData->lock);
} }
int32_t dmReadEps(SDnodeData *pData) { int32_t dmReadEps(SDnodeData *pData) {
...@@ -232,7 +232,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *eps) { ...@@ -232,7 +232,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *eps) {
int32_t numOfEps = taosArrayGetSize(eps); int32_t numOfEps = taosArrayGetSize(eps);
if (numOfEps <= 0) return; if (numOfEps <= 0) return;
taosWLockLatch(&pData->latch); taosThreadRwlockWrlock(&pData->lock);
int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pData->dnodeEps); int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pData->dnodeEps);
if (numOfEps != numOfEpsOld) { if (numOfEps != numOfEpsOld) {
...@@ -246,7 +246,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *eps) { ...@@ -246,7 +246,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *eps) {
} }
} }
taosWUnLockLatch(&pData->latch); taosThreadRwlockUnlock(&pData->lock);
} }
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) { static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
...@@ -292,7 +292,7 @@ static void dmPrintEps(SDnodeData *pData) { ...@@ -292,7 +292,7 @@ static void dmPrintEps(SDnodeData *pData) {
static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) { static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) {
bool changed = false; bool changed = false;
if (dnodeId == 0) return changed; if (dnodeId == 0) return changed;
taosRLockLatch(&pData->latch); taosThreadRwlockRdlock(&pData->lock);
SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t)); SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) { if (pDnodeEp != NULL) {
...@@ -304,24 +304,23 @@ static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) { ...@@ -304,24 +304,23 @@ static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) {
} }
} }
taosRUnLockLatch(&pData->latch); taosThreadRwlockUnlock(&pData->lock);
return changed; return changed;
} }
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
taosRLockLatch(&pData->latch); taosThreadRwlockRdlock(&pData->lock);
*pEpSet = pData->mnodeEps; *pEpSet = pData->mnodeEps;
taosRUnLockLatch(&pData->latch); taosThreadRwlockUnlock(&pData->lock);
} }
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); taosThreadRwlockWrlock(&pData->lock);
taosWLockLatch(&pData->latch);
pData->mnodeEps = *pEpSet; pData->mnodeEps = *pEpSet;
taosThreadRwlockUnlock(&pData->lock);
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
} }
taosWUnLockLatch(&pData->latch);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册